1. Project description

There is little information online about calling the ArcSoft SDK from Python, and almost none for ArcSoft SDK 4.0. I pieced this together from scattered sources and my own study, and after some hard work I finally got it running.

First, let's talk about what we are building. I have an RTSP video stream, and I want to use ArcSoft to capture every face that appears under the camera, together with the background image (the full frame), and store both locally. At the same time, the whole face recognition process can be watched in real time in a local window. The captures are deduplicated by FaceID, keeping only the highest-quality set for each face. This greatly reduces the number of repeated face pictures that are saved.

Let's get started!

2. Environment preparation

  • Development Environment: PyCharm 2018.3.5 (Professional Edition)
  • Python version: Python 3.7.3
  • OpenCV: Python library used for video frame processing, image processing, display, etc.
  • ArcSoft SDK: Windows x64 C++ V4.0

As the list shows, very little needs to be prepared, and the Python side is relatively easy as long as you are familiar with the language.

3. Interface and type mapping

This section maps the types and interfaces of ArcSoft SDK 4.0 to Python. For our task, only the following interfaces are needed:

  1. ASFOnlineActivation [online activation]
  2. ASFInitEngine [engine initialization]
  3. ASFDetectFaces [face detection]
  4. ASFProcess [optional]
  5. ASFGetAge [optional]
  6. ASFGetGender [optional]
  7. ASFImageQualityDetect [quality detection]

Then, based on the interfaces above, map the types, constants, and so on that they depend on.

3.1 Type Mapping

Type mapping is done first, because interface mapping depends on these types.

File: asf_struct.py

from ctypes import *

# Face rectangle
class MRECT(Structure):
    _fields_ = [
        (u'left', c_int32),
        (u'top', c_int32),
        (u'right', c_int32),
        (u'bottom', c_int32)]

# Face data info
class ASFFaceDataInfo(Structure):
    _fields_ = [('data', c_void_p), ('dataSize', c_int32)]

# Single face info
class ASFSingleFaceInfo(Structure):
    _fields_ = [
        ('faceRect', MRECT),
        ('faceOrient', c_int32),
        ('faceDataInfo', ASFFaceDataInfo)]

# Multi-face info
class ASFMultiFaceInfo(Structure):
    _fields_ = [
        (u'faceRect', POINTER(MRECT)),
        (u'faceOrient', POINTER(c_int32)),
        (u'faceNum', c_int32),
        (u'faceID', POINTER(c_int32)),
        (u'wearGlasses', POINTER(c_float)),
        (u'leftEyeClosed', POINTER(c_int32)),
        (u'rightEyeClosed', POINTER(c_int32)),
        (u'faceShelter', POINTER(c_int32)),
        (u'faceDataInfoList', POINTER(ASFFaceDataInfo))]

# Age info (class name assumed by analogy with ASFGenderInfo)
class ASFAgeInfo(Structure):
    _fields_ = [(u'ageArray', c_void_p), (u'num', c_int32)]

# Gender info
class ASFGenderInfo(Structure):
    _fields_ = [(u'genderArray', c_void_p), (u'num', c_int32)]

That is all, and it is quite simple. If you want to use other capabilities later, just add the corresponding types yourself.
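To make the pointer-typed fields more concrete, here is a minimal sketch of how a structure shaped like ASFMultiFaceInfo can be read from Python. No SDK is involved: the arrays are filled by hand to stand in for what the engine would write, and DemoMultiFaceInfo is a hypothetical, trimmed-down type that declares only three of the fields.

```python
from ctypes import Structure, POINTER, c_int32, cast

class MRECT(Structure):
    _fields_ = [("left", c_int32), ("top", c_int32),
                ("right", c_int32), ("bottom", c_int32)]

class DemoMultiFaceInfo(Structure):
    # Hypothetical subset of ASFMultiFaceInfo, for illustration only
    _fields_ = [("faceRect", POINTER(MRECT)),
                ("faceNum", c_int32),
                ("faceID", POINTER(c_int32))]

# Stand-in for the buffers the native engine would own and fill
rects = (MRECT * 2)(MRECT(10, 20, 110, 140), MRECT(200, 50, 280, 150))
ids = (c_int32 * 2)(7, 8)

info = DemoMultiFaceInfo()
info.faceRect = cast(rects, POINTER(MRECT))
info.faceNum = 2
info.faceID = cast(ids, POINTER(c_int32))

def read_faces(multi):
    """Copy each face into plain Python values; pointers are indexed like arrays."""
    faces = []
    for i in range(multi.faceNum):
        r = multi.faceRect[i]
        faces.append((multi.faceID[i], r.left, r.top, r.right, r.bottom))
    return faces

faces = read_faces(info)
print(faces)
```

Copying the values out right away is a deliberate choice in this sketch: the memory behind the pointers belongs to whoever filled the structure, so plain Python tuples are safer to keep around.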

3.2 Constant Definition

These are the ArcSoft SDK constants that are used later.

File: asf_common.py

from ctypes import *
from enum import Enum

face_dll = CDLL("libarcsoft_face.dll")
face_engine_dll = CDLL("libarcsoft_face_engine.dll")

# ==================== Constant definitions ====================
ASF_DETECT_MODE_VIDEO = 0x00000000  # Video stream detection mode
ASF_DETECT_MODE_IMAGE = 0xFFFFFFFF  # Image detection mode

ASF_NONE = 0x00000000             # No attribute
ASF_FACE_DETECT = 0x00000001      # Tracking or detection, depending on the detect mode
ASF_FACERECOGNITION = 0x00000004  # Face feature
ASF_AGE = 0x00000008              # Age
ASF_GENDER = 0x00000010           # Gender
ASF_FACE3DANGLE = 0x00000020      # 3D angle
ASF_FACELANDMARK = 0x00000040     # Forehead area detection
ASF_LIVENESS = 0x00000080         # RGB liveness
ASF_IMAGEQUALITY = 0x00000200     # Image quality detection
ASF_IR_LIVENESS = 0x00000400      # IR liveness
ASF_FACESHELTER = 0x00000800      # Face occlusion
ASF_MASKDETECT = 0x00001000       # Mask detection
ASF_UPDATE_FACEDATA = 0x00002000  # Face information

ASVL_PAF_RGB24_B8G8R8 = 0x201     # Image format

# Face angle detection priority (enumeration).
# The trailing commas make each value a one-element tuple,
# which is why asf_main.py reads .value[0].
class ArcSoftFaceOrientPriority(Enum):
    ASF_OP_0_ONLY = 0x1,        # upright (0°)
    ASF_OP_90_ONLY = 0x2,       # 90° counterclockwise relative to 0°
    ASF_OP_270_ONLY = 0x3,
    ASF_OP_0_HIGHER_EXT = 0x5,  # all angles
# ==============================================================

If you have called DLLs from Python before, none of this needs further explanation.
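Two details of the constants are easy to miss, so here is a small self-contained sketch. It shows how capability flags are combined and tested with bitwise operators, and why an Enum member written with a trailing comma has to be read with .value[0]. The classes WithComma and WithoutComma are hypothetical names used only for this comparison.

```python
from enum import Enum

ASF_FACE_DETECT = 0x00000001
ASF_AGE = 0x00000008
ASF_IMAGEQUALITY = 0x00000200

# Combine capabilities with bitwise OR, test them with bitwise AND
mask = ASF_FACE_DETECT | ASF_IMAGEQUALITY
has_quality = bool(mask & ASF_IMAGEQUALITY)
has_age = bool(mask & ASF_AGE)

class WithComma(Enum):
    ASF_OP_0_ONLY = 0x1,   # trailing comma: the value is the tuple (1,)

class WithoutComma(Enum):
    ASF_OP_0_ONLY = 0x1    # no comma: the value is the int 1

print(hex(mask), has_quality, has_age)
print(WithComma.ASF_OP_0_ONLY.value, WithoutComma.ASF_OP_0_ONLY.value)
```

Either Enum style works as long as the caller is consistent: with the comma, pass .value[0] to the native function; without it, pass .value.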

3.3 Interface Mapping

With all the types and constants defined, we can now map the interfaces we need to Python.

File: asf_func.py

from asf_struct import *
from ctypes import *
import asf_common

# ==================== API interface mapping ====================
# Note: several argtypes tuples were cut off in the published listing.
# Their tails are completed from the calls in asf_main.py and, where
# marked as assumed, from the SDK's C declarations.

# Online activation API
online_activate = asf_common.face_engine_dll.ASFOnlineActivation
online_activate.restype = c_int32
online_activate.argtypes = (c_char_p, c_char_p, c_char_p)

# Engine initialization API
init_engine = asf_common.face_engine_dll.ASFInitEngine
init_engine.restype = c_int32
init_engine.argtypes = (c_long, c_int32, c_int32, c_int32, POINTER(c_void_p))

# Face detection API (trailing c_int32 = detect model, assumed)
detect_face = asf_common.face_engine_dll.ASFDetectFaces
detect_face.restype = c_int32
detect_face.argtypes = (c_void_p, c_int32, c_int32, c_int32, POINTER(c_ubyte), POINTER(ASFMultiFaceInfo), c_int32)

# Face attribute processing API (trailing c_int32 = combined mask, assumed)
process = asf_common.face_engine_dll.ASFProcess
process.restype = c_int32
process.argtypes = (c_void_p, c_int32, c_int32, c_int32, POINTER(c_ubyte), POINTER(ASFMultiFaceInfo), c_int32)

# Age API
get_age = asf_common.face_engine_dll.ASFGetAge
get_age.restype = c_int32
get_age.argtypes = (c_void_p, POINTER(ASFAgeInfo))

# Gender API
get_gender = asf_common.face_engine_dll.ASFGetGender
get_gender.restype = c_int32
get_gender.argtypes = (c_void_p, POINTER(ASFGenderInfo))

# Face quality detection API
image_quality_detect = asf_common.face_engine_dll.ASFImageQualityDetect
image_quality_detect.restype = c_int32
image_quality_detect.argtypes = (c_void_p, c_int32, c_int32, c_int32, POINTER(c_ubyte), POINTER(ASFSingleFaceInfo), c_int32, POINTER(c_float), c_int32)
# ===============================================================
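The pattern used for every interface is the same: declare the return type and argument types, then pass output parameters with byref. The sketch below shows that pattern without needing the ArcSoft DLL. It wraps an ordinary Python function in a ctypes prototype so it behaves like a native function with an output handle; _fake_init and the handle value 0x1234 are made up for illustration and have nothing to do with the real ASFInitEngine.

```python
from ctypes import CFUNCTYPE, POINTER, byref, c_int32, c_void_p

# Prototype: int32 f(int32 detect_mode, void** engine_out)
InitProto = CFUNCTYPE(c_int32, c_int32, POINTER(c_void_p))

def _fake_init(detect_mode, engine_out):
    # Hypothetical stand-in for a native function: writes a handle, returns 0
    engine_out[0] = 0x1234
    return 0

fake_init_engine = InitProto(_fake_init)

engine = c_void_p()                       # will receive the handle
ret = fake_init_engine(0, byref(engine))  # byref passes the address of `engine`
print(ret, hex(engine.value))
```

With a real DLL the only difference is where the callable comes from: it is looked up as an attribute of the loaded library, and restype and argtypes are assigned to it afterwards.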

4. Process design

Based on our task requirements, three processing threads are designed as follows:

Video frame grabbing thread: reads the RTSP stream frame by frame in a loop and puts each frame into the queue. To protect local disk I/O performance, frames are held only in an in-memory queue.

Face detection thread: repeatedly takes one frame from the queue, runs face detection and quality detection on it, draws the face boxes and labels for display, and writes the frame information of every face with a high quality score (> 0.5) into the face list container.

Deduplication thread: repeatedly scans the face list container. For every FaceID that has not been updated for more than 2 seconds, it sorts that FaceID's list by quality score, takes the highest-scoring entry, and saves the big picture (full frame) and the small picture (face crop) to disk. The small picture is cropped using the coordinates of the face rectangle.
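The deduplication rule described above can be sketched on its own, without threads or the SDK. This is only an illustration under assumed names: flush_stale, the two dictionaries, and the sample data are hypothetical, with each capture reduced to a (quality_score, frame_label) tuple.

```python
STALE_SECONDS = 2.0  # the 2-second window described above

def flush_stale(captures, last_seen, now, stale_after=STALE_SECONDS):
    """Return {face_id: best_capture} for ids idle longer than stale_after,
    and remove those ids from both dictionaries."""
    best = {}
    for face_id in list(last_seen):          # copy the keys: we delete while looping
        if now - last_seen[face_id] > stale_after:
            entries = captures.pop(face_id, [])
            del last_seen[face_id]
            if entries:
                best[face_id] = max(entries, key=lambda e: e[0])  # highest score
    return best

# Hypothetical data: (quality_score, frame_label) per FaceID
captures = {1: [(0.62, "f10"), (0.91, "f14"), (0.77, "f18")], 2: [(0.58, "f20")]}
last_seen = {1: 100.0, 2: 102.5}

result = flush_stale(captures, last_seen, now=103.0)
print(result)
```

Passing the current time in as an argument, rather than calling time.time() inside the function, keeps the rule easy to test; a real loop would call it with time.time() on each pass.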


Flow charts for each thread, to help you understand:

5. Main process code implementation

With the design above, the structure of the code is essentially settled, so let's write it. (Don't worry about not understanding it; the comments are detailed.)

File: asf_main.py

from queue import Queue
import threading
import asf_func
import asf_struct
from ctypes import *
import asf_common
import cv2
import time
import uuid
import os

lock = threading.Lock()
frames_q_size = 50
frames_q = Queue(maxsize=frames_q_size)
rtsp_url="rtsp://admin:xxxxxx@x.x.x.x:554/h264/ch1/sub/av_stream"
zp_list = {}
zp_time_list = {}
storage_face_threshold = 0.5
app_id = b"xxxxxxxxxxxxxx"
sdk_key = b"xxxxxxxxxxxxxx"
active_key = b"xxxx-xxxx-xxxx-xxxx"


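# Create the output directory for the saved pictures if it does not exist
if not os.path.exists('IMAGE'):
    os.makedirs('IMAGE')

ret = asf_func.online_activate(app_id, sdk_key, active_key)
# 0 means success; 90114 is treated here as "already activated"
if ret == 0 or ret == 90114:
    print("Activation succeeded:", ret)
else:
    print("Activation failed:", ret)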

video_mask = asf_common.ASF_FACE_DETECT | asf_common.ASF_IMAGEQUALITY

video_engine = c_void_p()

video_ret = asf_func.init_engine(asf_common.ASF_DETECT_MODE_VIDEO, asf_common.ArcSoftFaceOrientPriority.ASF_OP_0_ONLY.value[0], 3, video_mask, byref(video_engine))

if video_ret == 0:
    print("Video mode engine initialized successfully")
else:
    print("Video mode engine initialization failed:", video_ret)

def get_uuid():
    return str(uuid.uuid1()).replace("-","")

def storage_best_zq(face_id):
    global zp_list
    face_list = zp_list[face_id]
    if len(face_list) == 0:
        return
    # Sort ascending by quality score, so the best entry is new_list[-1]
    new_list = sorted(face_list, key=lambda e: e[0])

    #file_uid = get_uuid()
    file_uid = str(int(time.time()))

    # Build the file names for the big and small pictures, format: faceid_uid_big/small.jpg
    big_file_path = "IMAGE/%s_"%(new_list[-1][6])+str(file_uid)+"_big.jpg"
    small_file_path = "IMAGE/%s_"%(new_list[-1][6]) + str(file_uid) + "_small.jpg"

    cv2.imwrite(big_file_path, new_list[-1][1])

    left = new_list[-1][2]
    top = new_list[-1][3]
    right = new_list[-1][4]
    bottom = new_list[-1][5]

    small_img = new_list[-1][1][top:bottom,left:right]

    cv2.imwrite(small_file_path, small_img)

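def gen_zq():
    global zp_list
    global zp_time_list
    while True:
        ct = time.time()
        face_id_list = list(zp_time_list.keys())
        for k in face_id_list:
            # No update for this FaceID for more than 2 seconds:
            # save its best capture, then drop it from both containers
            if ct - zp_time_list[k]>2:
                storage_best_zq(k)
                del zp_list[k]
                del zp_time_list[k]
        # Short pause so the loop does not spin at full CPU (interval is an assumption)
        time.sleep(0.05)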

def grab_frame(rtsp_url, q):
    cap = cv2.VideoCapture(rtsp_url)
    num = 0

    while (True):
        cap_res = cap.read()
        num += 1
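        # todo: num keeps growing; consider handling overflow
        if num % 2 == 0:
            if q.full():
                print("frames queue is full, will remove one.")
                q.get()
            q.put(cap_res)
            # Removing one frame before inserting ensures that the bounded queue always accepts the new frame; otherwise OpenCV's internal buffer backs up, causing various errors and latency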

def detect_face(q):
    global zp_list,zp_time_list,storage_face_threshold
    while True:
        q_value = q.get()
        old_img = q_value[1]

        #sp = old_img.shape
        #img = cv2.resize(old_img, (sp[1] // 4 * 4, sp[0]))  # 4-byte alignment

        img = old_img
        image_bytes = bytes(img)
        image_ubytes = cast(image_bytes, POINTER(c_ubyte))

        detect_faces = asf_struct.ASFMultiFaceInfo()

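        # Width and height come from the frame shape; the trailing 1 is an assumed
        # detect model value (RGB) and must match asf_func.detect_face.argtypes
        ret = asf_func.detect_face(video_engine, img.shape[1], img.shape[0], asf_common.ASVL_PAF_RGB24_B8G8R8, image_ubytes, byref(detect_faces), 1)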

        t3 = time.time()
        if ret !=0:
            print("Face detection failed: %s" % (ret))
        if detect_faces.faceNum > 0:

            for i in range(detect_faces.faceNum):

                single_face_info = asf_struct.ASFSingleFaceInfo()

                single_face_info.faceRect.left = detect_faces.faceRect[i].left
                single_face_info.faceRect.top = detect_faces.faceRect[i].top
                single_face_info.faceRect.right = detect_faces.faceRect[i].right
                single_face_info.faceRect.bottom = detect_faces.faceRect[i].bottom
                single_face_info.faceOrient = detect_faces.faceOrient[i]
                single_face_info.faceDataInfo = detect_faces.faceDataInfoList[i]

                confidenceLevel = c_float()

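                # isMask = 0 (no mask) and detect model = 1 (RGB) are assumed values
                ret = asf_func.image_quality_detect(video_engine, img.shape[1], img.shape[0], asf_common.ASVL_PAF_RGB24_B8G8R8, image_ubytes, byref(single_face_info), 0, byref(confidenceLevel), 1)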

                if detect_faces.faceID[i] not in zp_list:
                    zp_list[detect_faces.faceID[i]] = []

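                if confidenceLevel.value > storage_face_threshold:
                    print("Found a high-quality face: %s" % (confidenceLevel.value))
                    # Tuple layout matches the indices used in storage_best_zq:
                    # (score, frame, left, top, right, bottom, faceID)
                    zp_list[detect_faces.faceID[i]].append((confidenceLevel.value, old_img,
                        detect_faces.faceRect[i].left, detect_faces.faceRect[i].top,
                        detect_faces.faceRect[i].right, detect_faces.faceRect[i].bottom,
                        detect_faces.faceID[i]))
                zp_time_list[detect_faces.faceID[i]] = time.time()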

                cv2.rectangle(img, (detect_faces.faceRect[i].left, detect_faces.faceRect[i].top), (detect_faces.faceRect[i].right, detect_faces.faceRect[i].bottom), (0, 0, 255), 1)
                cv2.putText(img, str("qa:%.2f"%(confidenceLevel.value)), (detect_faces.faceRect[i].left, detect_faces.faceRect[i].top-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
                cv2.putText(img, "id:"+str(detect_faces.faceID[i]),(detect_faces.faceRect[i].left, detect_faces.faceRect[i].top - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5,(0, 0, 255), 1)
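                # The lines above draw the box and text labels from the detection result and the confidence score; qa is the quality score, id is the faceid
        cv2.imshow("RealTimeDisplay", img)
        cv2.waitKey(1)  # imshow only refreshes the window when waitKey is called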

thread_grab_frame = threading.Thread(target=grab_frame, args=(rtsp_url, frames_q))
thread_detect_face = threading.Thread(target=detect_face, args=(frames_q,))
thread_gen_zq = threading.Thread(target=gen_zq, args=())

# Start the three worker threads
thread_grab_frame.start()
thread_detect_face.start()
thread_gen_zq.start()

The code inside the loops is a bit long and timing-dependent, so it is not split into separate functions; the logic itself is not very complicated.

There are still a few small flaws, which I will fix gradually when I have time.

thread_grab_frame = threading.Thread(target=grab_frame, args=(rtsp_url, frames_q))

thread_detect_face = threading.Thread(target=detect_face, args=(frames_q,))

thread_gen_zq = threading.Thread(target=gen_zq, args=())

You can read through the code starting from each of the three thread entry functions above.


To ensure that the queue of video frames does not grow without bound, a queue with limited capacity is used. When a frame is inserted and the queue is full, the oldest frame is discarded and the latest frame is inserted.
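The drop-oldest behaviour can be isolated in a small helper. This is a sketch under assumptions, not the code from asf_main.py: put_latest is a hypothetical name, and it uses the non-blocking queue calls so the producer never stalls even if a consumer empties the queue between the two steps.

```python
from queue import Queue, Empty, Full

def put_latest(q, item):
    """Insert item; if the queue is full, drop the oldest entry first."""
    while True:
        try:
            q.put_nowait(item)
            return
        except Full:
            try:
                q.get_nowait()      # discard the oldest frame
            except Empty:
                pass                # a consumer emptied the queue in the meantime

q = Queue(maxsize=3)
for frame_no in range(1, 6):        # frames 1..5 into a 3-slot queue
    put_latest(q, frame_no)

remaining = [q.get_nowait() for _ in range(q.qsize())]
print(remaining)
```

Only the three newest frames survive, which is the property that keeps display latency bounded when detection is slower than the camera.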

The overall structure of the code files:

6. Run tests

A few minor problems and pitfalls came up along the way; they have all been solved.

Real-time display effect:

Run log screenshot:

The entries above are mostly quality scores for different frames of the same FaceID. Without deduplication, a lot of duplicate pictures would be saved to disk.

Storage effect after running:

A large amount of face data was captured, so the pictures are blurred here for privacy.

Operating performance:

This design does almost all of its processing in memory and touches the disk only for the final save. Here is the CPU and memory usage while running.

The resource usage is quite satisfactory.

My machine has an 8th-generation i5 CPU. The video stream uses a constant bit rate of 16384 Kbps, 25 frames per second, 1920*1080 resolution, and H.264 encoding. With a lower resolution, CPU usage will be lower.
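To put those numbers in perspective, here is a rough back-of-the-envelope calculation. It assumes decoded frames are 8-bit, 3-channel images at the full 1920*1080 resolution, which is how OpenCV normally delivers them; the actual figures depend on the stream that is really opened.

```python
WIDTH, HEIGHT, CHANNELS = 1920, 1080, 3   # BGR, one byte per channel (assumed)
FPS = 25
QUEUE_SIZE = 50                            # frames_q_size in asf_main.py

frame_bytes = WIDTH * HEIGHT * CHANNELS    # one decoded frame
queue_bytes = frame_bytes * QUEUE_SIZE     # worst case: the queue is full
processed_fps = FPS / 2                    # grab_frame keeps every second frame
throughput = frame_bytes * processed_fps   # bytes per second fed to detection

# Halving both dimensions quarters the data per frame
half_res_bytes = (WIDTH // 2) * (HEIGHT // 2) * CHANNELS
ratio = frame_bytes // half_res_bytes

print(frame_bytes)                     # bytes per frame
print(round(queue_bytes / 1e6, 1))     # MB held by a full queue
print(round(throughput / 1e6, 2))      # MB per second into detection
print(ratio)
```

Under these assumptions a full queue holds roughly 311 MB of raw frames, which is worth keeping in mind when choosing frames_q_size, and it also shows why a lower resolution reduces the load so noticeably.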

Finally, I just want to say: ArcSoft is awesome!

Project code

Reference: gitee.com/codetracer/…

The DLLs are not uploaded; please put the SDK's DLL files in the root directory of the project.

To learn more about face recognition products, please visit the ArcSoft Visual Open Platform.