[MCU] 嘉楠K230AI开发板测评7--AI Demo开发框架

dfjs   2024-11-6 20:55 楼主

嘉楠科K230AI开发板测评7--AI视觉篇

1、AI视觉开发框架

    更高级的机器视觉(AI视觉)需要使用KPU。可以简单类别比计算机的GPU(显卡),本质是实现高速的图像数据运算。

    KPU是K230内部一个神经网络处理器,它可以在低功耗的情况下实现卷积神经网络计算,实时获取被检测目标的大小、坐标和种类,对人脸或者物体进行检测和分类。K230 KPU支持INT8和INT16, 典型网络下实测推理能力可达K210的13.7倍,MAC利用率超70%。

    CanMV官方基于K230专门搭建了配套的AI视觉开发框架,框架结构如下图所示:

图片.png

    这个框架简单来说就是Sensor(摄像头)默认输出两路图像,一路格式为YUV420,直接给到Display显示;另一路格式为RGB888,给到AI部分进行处理。AI主要实现任务的前处理、推理和后处理流程,得到后处理结果后将其绘制在osd image实例上,并送给Display叠加,最后在HDMI、LCD或IDE缓冲区显示识别结果。

这套框架的优势是用户可以直接基于处理结果编程实现自己的功能,同时AI主要实现任务的前处理、推理和后处理流程也是通过Python代码实现,方便用户深入二次开发。充分满足不同用户和开发者的需求。

    AI视觉开发框架主要API接口有:

    PineLine : 将sensor、display封装成固定接口,用于采集图像、画图以及结果图片显示。

    Ai2d : 预处理(Preprocess)相关接口。

    AIBase : 模型推理主要接口。

2、相关接口

    可在该网址查看:https://developer.canaan-creative.com/k230_canmv/main/zh/example/ai/AI_Demo%E8%AF%B4%E6%98%8E%E6%96%87%E6%A1%A3.html

2.1 PipeLine

    将Media部分的代码封装在PipeLine类型中,通过固定的接口实现整个流程操作。

    PipeLine类提供的接口包括:

  1. 初始化参数

    rgb888p_size:list类型,预设给到AI部分的图像分辨率;如rgb888p_size=[1920,1080]。

    display_size:list类型,显示部分Display的分辨率;如display_size=[1920,1080]。

    display_mode:str类型,显示模式,包括”hdmi“和”lcd“;如display_mode=”hdmi“。

    debug_mode:int类型,耗时调试模式,如果大于0,打印操作耗时;如debug_mode=0。

  1. creat(sensor=None,hmirror=None,vfilp=None)

    sensor:参数为可选参数,类型为Sensor对象,可自主配置现有CanMV、01Studio和k230d zero开发板实现了自动探测,可以默认使用create()实现。

    hmirror:默认为None,当主动设置时为bool类型(True/False),表示是否实现水平方向镜像显示。

    vflip: 默认为None,当主动设置时为bool类型(True/False),表示是否实现垂直方向翻转。

  1. get_frame

    返回一帧ulab.numpy.ndarray类型图像数据,分辨率为rgb888p_size,排布为CHW。

  1. show_image

    PipeLine实例中预设一帧OSD图像,该接口将成员变量osd_img显示在屏幕上。

  1. destroy

    销毁PipeLine实例。

    示例代码:

from libs.PipeLine import PipeLine, ScopedTiming
from media.media import *
import gc
import sys,os

if __name__ == "__main__":
    # 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
    display_mode="hdmi"
    if display_mode=="hdmi":
        display_size=[1920,1080]
    else:
        display_size=[800,480]

    # 初始化PipeLine,用于图像处理流程
    pl = PipeLine(rgb888p_size=[1920,1080], display_size=display_size, display_mode=display_mode)
    pl.create()  # 创建PipeLine实例
    try:
        while True:
            os.exitpoint()                      # 检查是否有退出信号
            with ScopedTiming("total",1):
                img = pl.get_frame()            # 获取当前帧数据
                print(img.shape)
                gc.collect()                    # 垃圾回收
    except Exception as e:
        sys.print_exception(e)                  # 打印异常信息
    finally:
        pl.destroy()                            # 销毁PipeLine实例

    通过pl.get_frame()接口获取一帧分辨率为rgb888p_size的图像,类型为ulab.numpy.ndarray,排布为CHW。基于上面的代码得到了一帧图像给AI处理,只关注AI推理部分的操作即可。

    图像AI开发过程包括:图像预处理、模型推理、输出后处理的过程,整个过程封装在Ai2d类和AIBase类中。

2.2 Ai2d

    对于Ai2d类,我们给出了常见的几种预处理方法,包括crop/shift/pad/resize/affine。该类别提供的接口包括:

  1. 初始化参数

    debug_mode:int类型,耗时调试模式,如果大于0,打印操作耗时;如debug_mode=0。

  1. Set_ai2d_dtype(input_format,output_format,input_type,output_type)

    设置ai2d计算过程中的输入输出数据类型,输入输出数据格式。

  1. Crop(start_x,start_y,width,height)预处理crop函数:

    start_x:宽度方向的起始像素,int类型

    start_y: 高度方向的起始像素,int类型

    width: 宽度方向的crop长度,int类型

    height: 高度方向的crop长度,int类型

  1. Shift(shift_va预处理shift函数:

    shift_val:右移的比特数,int类型

  1. Pad(paddings,pad_mode,pad_val)预处理padding函数:

    paddings:各个维度的padding, size=8,分别表示dim0到dim4的前后padding的个数,其中dim0/dim1固定配置{0, 0},list类型

    pad_mode:只支持pad constant,配置0即可,int类型

    pad_val:每个channel的padding value,list类型

  1. Resize(interp_method,interp_mode)预处理resize函数:

    interp_method:resize插值方法,ai2d_interp_method类型

    interp_mode:resize模式,ai2d_interp_mode类型

  1. Affine(interp_method,crop_round,bound_ind,bound_val,bound_smooth,M)预处理affine函数:

    interp_method:Affine采用的插值方法,ai2d_interp_method类型

    cord_round:整数边界0或者1,uint32_t类型

    bound_ind:边界像素模式0或者1,uint32_t类型

    bound_val:边界填充值,uint32_t类型

    bound_smooth:边界平滑0或者1,uint32_t类型

    M:仿射变换矩阵对应的vector,仿射变换为Y=[a_0, a_1; a_2, a_3] \cdot  X + [b_0, b_1] $, 则  M=[a_0,a_1,b_0,a_2,a_3,b_1 ],list类型

  1. Build(ai2d_input_shape,ai2d_output_shape):ai2d构造函数,前面配置的预处理方法起作用。
  2. Run使用ai2d完成预处理

    注意:

        (1) Affine和Resize功能是互斥的,不能同时开启; (2) Shift功能的输入格式只能是Raw16; (3) Pad value是按通道配置的,对应的list元素个数要与channel数相等; (4) 当配置了多个功能时,执行顺序是Crop->Shift->Resize/Affine->Pad, 配置参数时注意要匹配;如果不符合该顺序,需要初始化多个Ai2d实例实现预处理过程;  

    示例代码如下:

from libs.PipeLine import PipeLine, ScopedTiming
from libs.AI2D import Ai2d
from media.media import *
import nncase_runtime as nn
import gc
import sys,os

if __name__ == "__main__":
    # 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
    display_mode="hdmi"
    if display_mode=="hdmi":
        display_size=[1920,1080]
    else:
        display_size=[800,480]

    # 初始化PipeLine,用于图像处理流程
    pl = PipeLine(rgb888p_size=[512,512], display_size=display_size, display_mode=display_mode)
    pl.create()  # 创建PipeLine实例
    my_ai2d=Ai2d(debug_mode=0) #初始化Ai2d实例 
    # 配置resize预处理方法
    my_ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
    # 构建预处理过程
    my_ai2d.build([1,3,512,512],[1,3,640,640])
    try:
        while True:
            os.exitpoint()                      # 检查是否有退出信号
            with ScopedTiming("total",1):
                img = pl.get_frame()            # 获取当前帧数据
                print(img.shape)                # 原图shape为[1,3,512,512]
                ai2d_output_tensor=my_ai2d.run(img) # 执行resize预处理
                ai2d_output_np=ai2d_output_tensor.to_numpy() # 类型转换
                print(ai2d_output_np.shape)        # 预处理后的shape为[1,3,640,640]
                gc.collect()                    # 垃圾回收
    except Exception as e:
        sys.print_exception(e)                  # 打印异常信息
    finally:
        pl.destroy()                            # 销毁PipeLine实例

2.3 AIBase                     

    AIBase部分封装了实现模型推理的主要接口,也是进行AI开发主要关注的部分。用户需要按照自己demo的要求实现前处理和后处理部分。

    AIBase提供的接口包括:

  1. 初始化参数

    kmodel_path:str类型,kmodel路径,用于初始化kpu对象并加载kmodel;

    model_input_size:list类型,可选,模型输入分辨率,在单输入时起作用,格式为[width,height],如:model_input_size=[512,512];

    ​ rgb888p_size:list类型,可选,AI得到的图像的分辨率,在单输入时起作用,格式为[width,height],如:rgb888p_size=[640,640];

    debug_mode:int类型,耗时调试模式,如果大于0,打印操作耗时;如debug_mode=0。

  1. get_kmodel_inputs_num():返回当前模型的输入个数
  2. get_kmodel_outputs_num():返回当前模型的输出个数
  3. preprocess(input_np):使用ai2d对input_np做预处理,,如果不使用单个ai2d实例做预处理,需要在子类重写该函数
  4. inference(tensors):对预处理后得到的kmodel的输入(类型为tensor)进行推理,得到多个输出(类型为ulab.numpy.ndarray)
  5. postprocess(results):模型输出后处理函数,该函数需要用户在任务子类重写,因为不同AI任务的后处理是不同的
  6. run(input_np):模型的前处理、推理、后处理流程,适用于单ai2d实例能解决的前处理的AI任务,其他任务需要用户在子类重写。
  7. deinit():AIBase销毁函数。

2.4 ScopedTiming

    ScopedTiming 类在PipeLine.py模块内,是一个用来测量代码块执行时间的上下文管理器。上下文管理器通过定义包含 __enter__ 和 __exit__ 方法的类来创建。当在 with 语句中使用该类的实例时,__enter__ 在进入 with 块时被调用,__exit__ 在离开时被调用。

    示例代码:

from libs.PipeLine import ScopedTiming

def test_time():
    with ScopedTiming("test",1):
        #####代码#####
        # ...
        ##############

 3、应用方法和示例

    用户可根据具体的AI场景自写任务类继承AIBase,可以将任务分为如下四类:单模型任务、多模型任务,自定义预处理任务、无预处理任务。不同任务需要编写不同的代码实现,具体如下图所示:

图片.png  

3.1 单模型任务

    该任务只有一个模型,只需要关注该模型的前处理、推理、后处理过程,此类任务的前处理使用Ai2d实现,可能使用一个Ai2d实例,也可能使用多个Ai2d实例,后处理基于场景自定义。

编写自定义任务类,主要关注任务类的config_preprocess、postprocess、以及该任务需要的其他方法如:draw_result等。
    如果该任务包含多个Ai2d实例,则需要重写preprocess,按照预处理的顺序设置预处理阶段的计算过程。

    单模型任务的伪代码结构如下:

from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
import sys

# 自定义AI任务类,继承自AIBase基类
class MyAIApp(AIBase):
    def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
        # 调用基类的构造函数
        super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)  
        # 模型文件路径
        self.kmodel_path = kmodel_path  
        # 模型输入分辨率
        self.model_input_size = model_input_size  
        # sensor给到AI的图像分辨率,并对宽度进行16的对齐
        self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]   
        # 显示分辨率,并对宽度进行16的对齐
        self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]] 
        # 是否开启调试模式
        self.debug_mode = debug_mode  
        # 实例化Ai2d,用于实现模型预处理
        self.ai2d = Ai2d(debug_mode)  
        # 设置Ai2d的输入输出格式和类型
        self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)  

    # 配置预处理操作,这里使用了resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
    def config_preprocess(self, input_image_size=None):
        with ScopedTiming("set preprocess config", self.debug_mode > 0): 
            # 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,可以通过设置input_image_size自行修改输入尺寸
            ai2d_input_size = input_image_size if input_image_size else self.rgb888p_size  
            # 配置resize预处理方法
            self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel) 
            # 构建预处理流程
            self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])  

    # 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
    def postprocess(self, results):
        with ScopedTiming("postprocess", self.debug_mode > 0):
           pass

    # 绘制结果到画面上,需要根据任务自己写
    def draw_result(self, pl, dets):
        with ScopedTiming("display_draw", self.debug_mode > 0):
            pass

if __name__ == "__main__":
    # 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
    display_mode="hdmi"
    if display_mode=="hdmi":
        display_size=[1920,1080]
    else:
        display_size=[800,480]
    # 设置模型路径,这里要替换成当前任务模型
    kmodel_path = "example_test.kmodel"
    rgb888p_size = [1920, 1080]
    ###### 其它参数########
    ...
    ######################

    # 初始化PipeLine,用于图像处理流程
    pl = PipeLine(rgb888p_size=rgb888p_size, display_size=display_size, display_mode=display_mode)
    pl.create()  # 创建PipeLine实例
    # 初始化自定义AI任务实例
    my_ai = MyAIApp(kmodel_path, model_input_size=[320, 320],rgb888p_size=rgb888p_size, display_size=display_size, debug_mode=0)
    my_ai.config_preprocess()  # 配置预处理

    try:
        while True:
            os.exitpoint()                      # 检查是否有退出信号
            with ScopedTiming("total",1):
                img = pl.get_frame()            # 获取当前帧数据
                res = my_ai.run(img)            # 推理当前帧
                my_ai.draw_result(pl, res)      # 绘制结果
                pl.show_image()                 # 显示结果
                gc.collect()                    # 垃圾回收
    except Exception as e:
        sys.print_exception(e)                  # 打印异常信息
    finally:
        my_ai.deinit()                          # 反初始化
        pl.destroy()                            # 销毁PipeLine实例
                 

    多个Ai2d实例时的伪代码如下:

from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
import sys

# 自定义AI任务类,继承自AIBase基类
class MyAIApp(AIBase):
    def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
        # 调用基类的构造函数
        super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)  
        # 模型文件路径
        self.kmodel_path = kmodel_path  
        # 模型输入分辨率
        self.model_input_size = model_input_size  
        # sensor给到AI的图像分辨率,并对宽度进行16的对齐
        self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]   
        # 显示分辨率,并对宽度进行16的对齐
        self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]] 
        # 是否开启调试模式
        self.debug_mode = debug_mode  
        # 实例化Ai2d,用于实现模型预处理
        self.ai2d_resize = Ai2d(debug_mode)  
        # 设置Ai2d的输入输出格式和类型
        self.ai2d_resize.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)  
        # 实例化Ai2d,用于实现模型预处理
        self.ai2d_resize = Ai2d(debug_mode)  
        # 设置Ai2d的输入输出格式和类型
        self.ai2d_resize.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)  
        # 实例化Ai2d,用于实现模型预处理
        self.ai2d_crop = Ai2d(debug_mode)  
        # 设置Ai2d的输入输出格式和类型
        self.ai2d_crop.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)

    # 配置预处理操作,这里使用了resize和crop,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
    def config_preprocess(self, input_image_size=None):
        with ScopedTiming("set preprocess config", self.debug_mode > 0): 
            # 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,可以通过设置input_image_size自行修改输入尺寸
            ai2d_input_size = input_image_size if input_image_size else self.rgb888p_size  
            # 配置resize预处理方法
            self.ai2d_resize.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel) 
            # 构建预处理流程
            self.ai2d_resize.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,640,640])  
            # 配置crop预处理方法
            self.ai2d_crop.crop(0,0,320,320)
            # 构建预处理流程
            self.ai2d_crop.build([1,3,640,640],[1,3,320,320])  
    
    # 假设该任务需要crop和resize预处理,顺序是先resize再crop,该顺序不符合ai2d的处理顺序,因此需要设置两个Ai2d实例分别处理       
    def preprocess(self,input_np):
        resize_tensor=self.ai2d_resize.run(input_np)
        resize_np=resize_tensor.to_numpy()
        crop_tensor=self.ai2d_crop.run(resize_np)
        return [crop_tensor]
        

    # 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
    def postprocess(self, results):
        with ScopedTiming("postprocess", self.debug_mode > 0):
           pass

    # 绘制结果到画面上,需要根据任务自己写
    def draw_result(self, pl, dets):
        with ScopedTiming("display_draw", self.debug_mode > 0):
            pass
        
    # 重写deinit,释放多个ai2d资源
    def deinit(self):
        with ScopedTiming("deinit",self.debug_mode > 0):
            del self.ai2d_resize
            del self.ai2d_crop
            super().deinit()

if __name__ == "__main__":
    # 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
    display_mode="hdmi"
    if display_mode=="hdmi":
        display_size=[1920,1080]
    else:
        display_size=[800,480]
    # 设置模型路径,这里要替换成当前任务模型
    kmodel_path = "example_test.kmodel"
    rgb888p_size = [1920, 1080]
    ###### 其它参数########
    ...
    ######################

    # 初始化PipeLine,用于图像处理流程
    pl = PipeLine(rgb888p_size=rgb888p_size, display_size=display_size, display_mode=display_mode)
    pl.create()  # 创建PipeLine实例
    # 初始化自定义AI任务实例
    my_ai = MyAIApp(kmodel_path, model_input_size=[320, 320],rgb888p_size=rgb888p_size, display_size=display_size, debug_mode=0)
    my_ai.config_preprocess()  # 配置预处理

    try:
        while True:
            os.exitpoint()                      # 检查是否有退出信号
            with ScopedTiming("total",1):
                img = pl.get_frame()            # 获取当前帧数据
                res = my_ai.run(img)            # 推理当前帧
                my_ai.draw_result(pl, res)      # 绘制结果
                pl.show_image()                 # 显示结果
                gc.collect()                    # 垃圾回收
    except Exception as e:
        sys.print_exception(e)                  # 打印异常信息
    finally:
        my_ai.deinit()                          # 反初始化
        pl.destroy()                            # 销毁PipeLine实例

3.2 自定义预处理任务

    该任务只有一个模型,只需要关注该模型的前处理、推理、后处理过程,此类任务的前处理不使用Ai2d实现,可以使用ulab.numpy自定义,后处理基于场景自定义。

    编写自定义任务类,主要关注任务类的preprocess、postprocess、以及该任务需要的其他方法如:draw_result等

    对于需要重写前处理(不使用提供的ai2d类,自己手动写预处理)的AI任务伪代码如下:

from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
import sys

# 自定义AI任务类,继承自AIBase基类
class MyAIApp(AIBase):
    def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
        # 调用基类的构造函数
        super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)  
        # 模型文件路径
        self.kmodel_path = kmodel_path  
        # 模型输入分辨率
        self.model_input_size = model_input_size  
        # sensor给到AI的图像分辨率,并对宽度进行16的对齐
        self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]   
        # 显示分辨率,并对宽度进行16的对齐
        self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]] 
        # 是否开启调试模式
        self.debug_mode = debug_mode  
        # 实例化Ai2d,用于实现模型预处理
        self.ai2d = Ai2d(debug_mode)  
        # 设置Ai2d的输入输出格式和类型
        self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)  
    
    # 对于不使用ai2d完成预处理的AI任务,使用封装的接口或者ulab.numpy实现预处理,需要在子类中重写该函数
    def preprocess(self,input_np):
        #############
        #注意自定义预处理过程
        #############
        return [tensor]

    # 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
    def postprocess(self, results):
        with ScopedTiming("postprocess", self.debug_mode > 0):
           pass
           
    # 绘制结果到画面上,需要根据任务自己写
    def draw_result(self, pl, dets):
        with ScopedTiming("display_draw", self.debug_mode > 0):
            pass

if __name__ == "__main__":
    # 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
    display_mode="hdmi"
    if display_mode=="hdmi":
        display_size=[1920,1080]
    else:
        display_size=[800,480]
    # 设置模型路径,这里要替换成当前任务模型
    kmodel_path = "example_test.kmodel"
    rgb888p_size = [1920, 1080]
    ###### 其它参数########
    ...
    ######################

    # 初始化PipeLine,用于图像处理流程
    pl = PipeLine(rgb888p_size=rgb888p_size, display_size=display_size, display_mode=display_mode)
    pl.create()  # 创建PipeLine实例
    # 初始化自定义AI任务实例
    my_ai = MyAIApp(kmodel_path, model_input_size=[320, 320],rgb888p_size=rgb888p_size, display_size=display_size, debug_mode=0)
    my_ai.config_preprocess()  # 配置预处理

    try:
        while True:
            os.exitpoint()                      # 检查是否有退出信号
            with ScopedTiming("total",1):
                img = pl.get_frame()            # 获取当前帧数据
                res = my_ai.run(img)            # 推理当前帧
                my_ai.draw_result(pl, res)      # 绘制结果
                pl.show_image()                 # 显示结果
                gc.collect()                    # 垃圾回收
    except Exception as e:
        sys.print_exception(e)                  # 打印异常信息
    finally:
        my_ai.deinit()                          # 反初始化
        pl.destroy()                            # 销毁PipeLine实例

3.3 无预处理任务

    该任务只有一个模型且不需要预处理,只需要关注该模型的推理和后处理过程,此类任务一般作为多模型任务的一部分,直接对前一个模型的输出做为输入推理,后处理基于需求自定义。

    编写自定义任务类,主要关注任务类的run(模型推理的整个过程,包括preprocess、inference、postprocess中的全部或某一些步骤)、postprocess、以及该任务需要的其他方法如:draw_results等

    对于不需要预处理(直接输入推理)的AI任务伪代码如下:

 from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
import sys

# 自定义AI任务类,继承自AIBase基类
class MyAIApp(AIBase):
    def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
        # 调用基类的构造函数
        super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)  
        # 模型文件路径
        self.kmodel_path = kmodel_path  
        # 模型输入分辨率
        self.model_input_size = model_input_size  
        # sensor给到AI的图像分辨率,并对宽度进行16的对齐
        self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]   
        # 显示分辨率,并对宽度进行16的对齐
        self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]] 
        # 是否开启调试模式
        self.debug_mode = debug_mode  

    # 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
    def postprocess(self, results):
        with ScopedTiming("postprocess", self.debug_mode > 0):
           pass
           
    # 对于用预处理的AI任务,需要在子类中重写该函数
    def run(self,inputs_np):
        # 先将ulab.numpy.ndarray列表转换成tensor列表
        tensors=[]
        for input_np in inputs_np:
            tensors.append(nn.from_numpy(input_np))
        # 调用AIBase内的inference函数进行模型推理
        results=self.inference(tensors)
        # 调用当前子类的postprocess方法进行自定义后处理
        outputs=self.postprocess(results)
        return outputs

    # 绘制结果到画面上,需要根据任务自己写
    def draw_result(self, pl, dets):
        with ScopedTiming("display_draw", self.debug_mode > 0):
            pass

if __name__ == "__main__":
    # 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
    display_mode="hdmi"
    if display_mode=="hdmi":
        display_size=[1920,1080]
    else:
        display_size=[800,480]
    # 设置模型路径,这里要替换成当前任务模型
    kmodel_path = "example_test.kmodel"
    rgb888p_size = [1920, 1080]
    ###### 其它参数########
    ...
    ######################

    # 初始化PipeLine,用于图像处理流程
    pl = PipeLine(rgb888p_size=rgb888p_size, display_size=display_size, display_mode=display_mode)
    pl.create()  # 创建PipeLine实例
    # 初始化自定义AI任务实例
    my_ai = MyAIApp(kmodel_path, model_input_size=[320, 320],rgb888p_size=rgb888p_size, display_size=display_size, debug_mode=0)
    my_ai.config_preprocess()  # 配置预处理

    try:
        while True:
            os.exitpoint()                      # 检查是否有退出信号
            with ScopedTiming("total",1):
                img = pl.get_frame()            # 获取当前帧数据
                res = my_ai.run(img)            # 推理当前帧
                my_ai.draw_result(pl, res)      # 绘制结果
                pl.show_image()                 # 显示结果
                gc.collect()                    # 垃圾回收
    except Exception as e:
        sys.print_exception(e)                  # 打印异常信息
    finally:
        my_ai.deinit()                          # 反初始化
        pl.destroy()                            # 销毁PipeLine实例

3.4 多模型任务

    该任务包含多个模型,可能是串联,也可能是其他组合方式。对于每个模型基本上属于前三种模型中的一种,最后通过一个完整的任务类将上述模型子任务统一起来。

    编写多个子模型任务类,不同子模型任务参照前三种任务定义。不同任务关注不同的方法。
    编写多模型任务类,将子模型任务类统一起来实现整个场景。

    以双模型串联推理为例,给出的伪代码如下:

from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
import sys

# 自定义AI任务类,继承自AIBase基类
class MyAIApp_1(AIBase):
    def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
        # 调用基类的构造函数
        super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)  
        # 模型文件路径
        self.kmodel_path = kmodel_path  
        # 模型输入分辨率
        self.model_input_size = model_input_size  
        # sensor给到AI的图像分辨率,并对宽度进行16的对齐
        self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]   
        # 显示分辨率,并对宽度进行16的对齐
        self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]] 
        # 是否开启调试模式
        self.debug_mode = debug_mode  
        # 实例化Ai2d,用于实现模型预处理
        self.ai2d = Ai2d(debug_mode)  
        # 设置Ai2d的输入输出格式和类型
        self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)  

    # 配置预处理操作,这里使用了resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
    def config_preprocess(self, input_image_size=None):
        with ScopedTiming("set preprocess config", self.debug_mode > 0): 
            # 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,可以通过设置input_image_size自行修改输入尺寸
            ai2d_input_size = input_image_size if input_image_size else self.rgb888p_size  
            # 配置resize预处理方法
            self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel) 
            # 构建预处理流程
            self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])  

    # 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
    def postprocess(self, results):
        with ScopedTiming("postprocess", self.debug_mode > 0):
           pass

            
# 自定义AI任务类,继承自AIBase基类
class MyAIApp_2(AIBase):
    def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
        # 调用基类的构造函数
        super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)  
        # 模型文件路径
        self.kmodel_path = kmodel_path  
        # 模型输入分辨率
        self.model_input_size = model_input_size  
        # sensor给到AI的图像分辨率,并对宽度进行16的对齐
        self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]   
        # 显示分辨率,并对宽度进行16的对齐
        self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]] 
        # 是否开启调试模式
        self.debug_mode = debug_mode  
        # 实例化Ai2d,用于实现模型预处理
        self.ai2d = Ai2d(debug_mode)  
        # 设置Ai2d的输入输出格式和类型
        self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)  

    # 配置预处理操作,这里使用了resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
    def config_preprocess(self, input_image_size=None):
        with ScopedTiming("set preprocess config", self.debug_mode > 0): 
            # 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,可以通过设置input_image_size自行修改输入尺寸
            ai2d_input_size = input_image_size if input_image_size else self.rgb888p_size  
            # 配置resize预处理方法
            self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel) 
            # 构建预处理流程
            self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])  

    # 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
    def postprocess(self, results):
        with ScopedTiming("postprocess", self.debug_mode > 0):
           pass
           
           
class MyApp:
    def __init__(kmodel1_path,kmodel2_path,kmodel1_input_size,kmodel2_input_size,rgb888p_size,display_size,debug_mode):
        # 创建两个模型推理的实例
        self.app_1=MyApp_1(kmodel1_path,kmodel1_input_size,rgb888p_size,display_size,debug_mode)
        self.app_2=MyApp_2(kmodel2_path,kmodel2_input_size,rgb888p_size,display_size,debug_mode)
        self.app_1.config_preprocess()
    
    # 编写run函数,具体代码根据AI任务的需求编写,此处只是给出一个示例
    def run(self,input_np):
        outputs_1=self.app_1.run(input_np)
        outputs_2=[]
        for out in outputs_1:
            self.app_2.config_preprocess(out)
            out_2=self.app_2.run(input_np)
            outputs_2.append(out_2)
        return outputs_1,outputs_2
            
    # 绘制
    def draw_result(self,pl,outputs_1,outputs_2):
        pass
        
    ######其他函数########
    # 省略
    ####################
        

if __name__ == "__main__":
    # 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
    display_mode="hdmi"
    if display_mode=="hdmi":
        display_size=[1920,1080]
    else:
        display_size=[800,480]
    rgb888p_size = [1920, 1080]
    # 设置模型路径,这里要替换成当前任务模型
    kmodel1_path = "test_kmodel1.kmodel"
    kmdoel1_input_size=[320,320]
    kmodel2_path = "test_kmodel2.kmodel"
    kmodel2_input_size=[48,48]
    
    ###### 其它参数########
    # 省略
    ######################

    # 初始化PipeLine,用于图像处理流程
    pl = PipeLine(rgb888p_size=rgb888p_size, display_size=display_size, display_mode=display_mode)
    pl.create()  # 创建PipeLine实例
    # 初始化自定义AI任务实例
    my_ai = MyApp(kmodel1_path,kmodel2_path, kmodel1_input_size,kmodel2_input_size,rgb888p_size=rgb888p_size, display_size=display_size, debug_mode=0)
    my_ai.config_preprocess()  # 配置预处理

    try:
        while True:
            os.exitpoint()                      # 检查是否有退出信号
            with ScopedTiming("total",1):
                img = pl.get_frame()            # 获取当前帧数据
                outputs_1,outputs_2 = my_ai.run(img)            # 推理当前帧
                my_ai.draw_result(pl, outputs_1,outputs_2)      # 绘制结果
                pl.show_image()                 # 显示结果
                gc.collect()                    # 垃圾回收
    except Exception as e:
        sys.print_exception(e)                  # 打印异常信息
    finally:
        my_ai.app_1.deinit()                    # 反初始化
        my_ai.app_2.deinit()
        pl.destroy()                            # 销毁PipeLine实例

 

本帖最后由 dfjs 于 2024-11-6 21:01 编辑

回复评论 (1)

任务只有一个模型且不需要预处理,只需要关注该模型的推理和后处理过程,技巧

点赞  2024-11-9 09:49
电子工程世界版权所有 京B2-20211791 京ICP备10001474号-1 京公网安备 11010802033920号
    写回复