Task description

Recently, we needed to write a Python microservice that transmits images to a hardware device over MQTT. The service is built on the Flask framework, and the overall process is as follows:

The protocol is as follows:
  • Image data must be split across multiple messages for transmission; each message carries 1400 bytes of data.
  • Data transfer message: Web server ———> BASE:


  • Feedback message: BASE ———> Web server:


  • If the Web server does not receive an MQTT “feedback message” within 5 seconds of sending a “data transfer message”, or if the received feedback reports that the packet is incomplete, the Web server resends the “data transfer message”.

Program flow chart

Based on the protocol above, we obtain the following flow chart:

The code is as follows:

# encoding:utf-8
from flask import Flask, jsonify
from flask_restful import Api, Resource, reqparse
from PIL import Image
from io import BytesIO
import requests
import os, logging, time
import paho.mqtt.client as mqtt
import struct
from flask_cors import *

# Log configuration information.
# ``levelname`` must be lower-case: LogRecord has no ``LevelName`` attribute,
# so the misspelled field made every log call fail while formatting.
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s (runing by %(funcName)s)'
logging.basicConfig(
    level=logging.INFO,
    format=LOG_FORMAT,
)


class Mqtt(object):
    """Transfer one image to the BASE device over MQTT, one packet at a time.

    The image is cut into ``CHUNK_SIZE``-byte chunks. Every outgoing message
    is a little-endian header (``format_to_base``: uint32 header, int8
    function, int16 package number, int16 payload length) followed by the
    payload. After each message the sender waits for a feedback message
    (``format_from_base``: uint32 header, int8 function, int16 package
    number, int8 status) before sending the next one, and retransmits the
    last message when no feedback arrives within ``RETRY_TIMEOUT`` seconds.

    Args:
        img_data: Raw bytes of the image file.
        size: Number of bytes of ``img_data`` to transfer.
    """

    # Payload bytes carried by each data-transfer message.
    CHUNK_SIZE = 1400
    # Seconds to wait for feedback before the last message is retransmitted.
    RETRY_TIMEOUT = 5

    # Subscribe and send topics
    topic_from_base = 'mqttTestSub'
    topic_to_base = 'mqttTestPub'

    # Magic numbers that open every message in each direction
    header_to_base = 0xffffeeee
    header_from_base = 0xeeeeffff

    # Function identification
    function_begin = 0x01
    function_doing = 0x02
    function_finished = 0x03

    # The complete and incomplete state of the package
    whole_package = 0x01
    bad_package = 0x00

    # The format of the header information, little endian mode
    format_to_base = "<Lbhh"
    format_from_base = "<Lbhb"

    def __init__(self, img_data, size):
        # Broker address (redacted in the published listing). The port may be
        # given as an int or a numeric string; it is converted on connect.
        self.MQTTHOST = '* * * * * * *'
        self.MQTTPORT = "* * * * * *"

        self.client_id = time.strftime('%Y%m%d%H%M%S')
        self.client = mqtt.Client(self.client_id)

        # Register both callbacks before connecting so that feedback arriving
        # right after the first packet is not dropped.
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

        # image size
        self.size = size

        # Set to 1 once the BASE confirms the transfer; ends the control loop
        self.finished = None
        # Set when the transfer is abandoned because of a protocol error
        self.aborted = False

        # Index of the next chunk to send
        self.index = 0

        # Divide the received image data into lists by size
        self.image_data_list = [
            img_data[x:x + self.CHUNK_SIZE]
            for x in range(0, self.size, self.CHUNK_SIZE)
        ]

        # Time of the most recent publication, used to detect timeouts
        self.pub_time = 0

        # True until the BASE has acknowledged the first (begin) packet
        self.first = True

        # True once the closing (finished) packet has been sent
        self.last = False

        self.begin_data = 'image.jpg; ' + str(self.size)

    # link MQTT server function
    def on_mqtt_connect(self):
        """Connect to the broker and start the background network loop."""
        # paho requires an integer port; int() also accepts a numeric string.
        self.client.connect(self.MQTTHOST, int(self.MQTTPORT), 60)
        self.client.loop_start()

    # The callback function after the link is complete
    def on_connect(self, client, userdata, flags, rc):
        """Subscribe to the feedback topic once the broker accepts the link."""
        logging.info("+++ Connected with result code {} +++".format(str(rc)))
        self.client.subscribe(self.topic_from_base)

    # subscribe function
    def subscribe(self):
        """Subscribe to the feedback topic with QoS 1 and attach the handler."""
        self.client.subscribe(self.topic_from_base, 1)
        # message arrival handler
        self.client.on_message = self.on_message

    def _abort(self):
        """Abandon the transfer: flag it for control() and drop the link."""
        self.aborted = True
        self.client.disconnect()

    def _resend_after_bad_feedback(self, function, package_number):
        """Retransmit the packet that the BASE reported as incomplete."""
        if function == self.function_begin:
            # NOTE(review): assumes the feedback echoes the function id of the
            # packet it answers, so this refers to the begin packet - confirm
            # against the BASE firmware.
            self.publish_begin()
            logging.info("+++ re_data_to_base is finished+++")
            return

        resend_index = package_number - 1
        if 0 <= resend_index < len(self.image_data_list):
            self.publish(resend_index, self.function_doing)
            logging.info("+++ re_data_to_base is finished+++")
        else:
            # A negative index would silently send the wrong chunk and a too
            # large one would raise inside the network thread.
            logging.info("+++ package_number {} from base is out of range +++"
                         .format(package_number))
            self._abort()

    # The callback function after the message is received
    def on_message(self, client, userdata, msg):
        """Handle one feedback message and send whatever comes next.

        Runs on the paho network thread. A payload that cannot be decoded, a
        wrong header, an unknown function id or an unknown status aborts the
        transfer.
        """
        # Unpack the received package to get a tuple
        try:
            base_tuple = struct.unpack(self.format_from_base, msg.payload)
        except struct.error:
            logging.info("+++ feedback payload has unexpected length {} +++"
                         .format(len(msg.payload)))
            self._abort()
            return
        header, function, package_number, status = base_tuple

        logging.info("+++ imageData's letgth is {}, base_tupe is {} +++".format(self.size, base_tuple))
        logging.info("+++ package_number is {}, package_status_from_base is {} +++"
                     .format(package_number, status))

        # Check whether the header receiving the message is correct
        if header != self.header_from_base:
            logging.info("+++ header_from_base is illegal +++")
            self._abort()
            return
        logging.info("+++ function_from_base is {} +++".format(function))

        # The BASE confirmed the closing packet: the transfer is complete
        if function == self.function_finished:
            logging.info("+++ finish work +++")
            self.finished = 1
            self.client.disconnect()
            return

        if function not in (self.function_begin, self.function_doing):
            logging.info("+++ function_identifier is illegal +++")
            self._abort()
            return

        # The closing packet is already out but not yet confirmed: repeat it
        if self.last:
            self.publish('finished', self.function_finished)
            logging.info("+++ finished_data_to_base is finished+++")
            return

        if status == self.whole_package:
            # index counts the chunks already sent, so the closing packet is
            # due only once it has moved past the final chunk.
            if self.index >= len(self.image_data_list):
                self.publish('finished', self.function_finished)
                self.last = True
                logging.info("+++ finished_data_to_base is finished+++")
            else:
                self.publish(self.index, self.function_doing)
                logging.info("+++ data_to_base is finished+++")
                self.index += 1
            # Cleared last so the timeout path in control() never sees
            # first == False together with index == 0.
            self.first = False
        elif status == self.bad_package:
            self._resend_after_bad_feedback(function, package_number)
        else:
            logging.info("+++ package_status_from_base is not 0 or 1 +++")
            self._abort()

    # Data sending function
    def publish(self, index, fuc):
        """Publish one chunk, or the closing packet when index is 'finished'.

        Args:
            index: Chunk index in ``image_data_list``, or the string
                ``'finished'`` for the closing packet.
            fuc: Function identifier to put in the header.
        """
        if index == 'finished':
            # The closing packet declares length 0 and carries one filler byte
            length = 0
            package_number = 0
            data = b' '
        else:
            data = self.image_data_list[index]
            length = len(data)
            package_number = index

        # Package data header information
        buffer = struct.pack(
            self.format_to_base,
            self.header_to_base,
            fuc,
            package_number,
            length
        )

        # MQTT send
        self.client.publish(self.topic_to_base, buffer + data)
        self.pub_time = time.time()

    # Send the first packet function
    def publish_begin(self):
        """Publish the begin packet carrying the file name and image size."""
        payload = self.begin_data.encode('utf-8')
        buffer = struct.pack(
            self.format_to_base,
            self.header_to_base,
            self.function_begin,
            0,
            len(payload),
        )
        self.client.publish(self.topic_to_base, buffer + payload)
        # Restart the timeout clock; otherwise a retransmitted begin packet
        # would be sent again on every pass of the control loop.
        self.pub_time = time.time()

    # control function
    def control(self):
        """Run the whole transfer and block until it finishes or is aborted.

        Retransmission has no upper bound: if the BASE never answers, this
        call does not return.

        Returns:
            True when the BASE confirmed the transfer, False when it was
            aborted because of a protocol error.
        """
        begin_time = time.time()
        try:
            self.on_mqtt_connect()
            self.subscribe()
            self.publish_begin()
            while True:
                time.sleep(1)
                if self.finished == 1:
                    logging.info('+++ all works is finished+++')
                    break
                if self.aborted:
                    logging.info('+++ transfer aborted +++')
                    break

                # Retransmit when no feedback arrived within the timeout
                if time.time() - self.pub_time > self.RETRY_TIMEOUT:
                    if self.first:
                        self.publish_begin()
                        logging.info('+++ this is timeout first_data +++')
                    elif self.last:
                        self.publish('finished', self.function_finished)
                        logging.info('+++ this is timeout last_data +++')
                    else:
                        self.publish(self.index - 1, self.function_doing)
                        logging.info('+++ this is timeout middle_data +++')
        finally:
            # Stop the background network thread started by loop_start()
            self.client.loop_stop()

        print(str(time.time()-begin_time) + 'begin_time - end_time')
        return self.finished == 1

# Flask application, REST API wrapper and CORS policy (credentials allowed).
app = Flask(__name__)
api = Api(app)
CORS(app, supports_credentials=True)

# accept parameters: a single ``url`` value read from the query string
parser = reqparse.RequestParser()
parser.add_argument('url', help='mqttImage url', location='args', type=str)


class GetImage(Resource):
    """REST resource that downloads an image and forwards it over MQTT."""

    # Seconds allowed for downloading the source image.
    DOWNLOAD_TIMEOUT = 10
    # Pixel modes the JPEG writer accepts; anything else is converted to RGB.
    JPEG_MODES = ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr')

    def get(self):
        """Fetch the image named by the ``url`` query argument and send it.

        Returns:
            ``{"imageData": 1}`` once the MQTT transfer has returned, or a
            ``{"message": ...}`` body with status 400 when ``url`` is missing
            or the image cannot be downloaded or decoded.
        """
        args = parser.parse_args()
        url = args.get('url')
        if not url:
            return {"message": "missing required query parameter: url"}, 400

        # NOTE(security): the URL comes straight from the caller, so this
        # endpoint can be used to make the server fetch arbitrary addresses.
        # Restrict the allowed hosts before exposing it publicly.
        try:
            response = requests.get(url, timeout=self.DOWNLOAD_TIMEOUT)
            response.raise_for_status()
            image = Image.open(BytesIO(response.content))
            if image.mode not in self.JPEG_MODES:
                image = image.convert('RGB')
            # Re-encode as JPEG in memory. A fixed on-disk "image.jpg" would
            # be shared by concurrent requests and left behind on errors.
            encoded = BytesIO()
            image.save(encoded, format='JPEG')
        except (requests.RequestException, OSError) as exc:
            logging.info('*** cannot load image from {}: {} ***'.format(url, exc))
            return {"message": "cannot load image from url"}, 400

        image_data = encoded.getvalue()

        # Perform MQTT transfer
        transfer = Mqtt(image_data, len(image_data))
        transfer.control()

        logging.info('*** the result of control is {} ***'.format(1))
        return jsonify({
            "imageData": 1
        })


# Expose the image transfer resource at /image.
api.add_resource(GetImage, '/image')

if __name__ == '__main__':
    # The interactive debugger allows arbitrary code execution, so it must not
    # be on by default while the server listens on every interface. Set
    # FLASK_DEBUG=1 to enable it for local development.
    app.run(debug=os.environ.get('FLASK_DEBUG') == '1', host='0.0.0.0')

Copy the code