Celery is a simple, flexible, and reliable distributed system for processing large numbers of messages, and it provides a full set of tools for operating such a system. It is a task queue focused on real-time processing that also supports task scheduling.
A few weeks ago we read the Celery source together and looked at vine, one of Celery's tool libraries, which implements promises. This week we'll look at another one: py-amqp, the library responsible for transferring data over the AMQP protocol. It is implemented in pure Python (with optional Cython extensions), which makes it a good way to learn the details of the AMQP protocol and build a foundation for understanding Celery. This article covers the following sections:
- Py-amqp Project Overview
- Frame mechanism in detail
- AMQP protocol frame processing
- py-amqp usage
- py-amqp models
- Summary
- Tip
Py-amqp Project Overview
The version analyzed here is py-amqp 5.0.6. Its main source files are:
File | Description |
---|---|
abstract_channel.py | Abstract channel base class |
basic_message.py | Message implementation |
channel.py | Channel implementation |
connection.py | Connection implementation |
exceptions.py | Exceptions |
method_framing.py | Reading and writing method frames |
platform.py | Platform adaptation |
protocol.py | Protocol objects |
sasl.py | SASL authentication implementation |
serialization.py | Serialization and deserialization |
spec.py | Protocol spec definitions |
transport.py | Transport (socket communication) implementation |
util.py | Utilities |
*.pxd | Cython declarations used to accelerate amqp |
The project provides two main pieces of functionality:
- AMQP transport processing: serialization and deserialization of the byte stream, of frames, and of Messages
- Implementations of the three basic AMQP models: Connection, Channel, and Message
Before we begin, we need to take a quick look at the AMQP protocol:
The Advanced Message Queuing Protocol (AMQP) is an open application-layer protocol for message-oriented middleware. It is designed for message orientation, queuing, routing (both point-to-point and publish-subscribe), reliability, and security [1]. AMQP specifies the behavior of message producers and consumers so that messages can interoperate between different providers, much as SMTP, HTTP, and FTP make interoperable systems possible.
AMQP is a binary application-layer protocol designed to support a wide range of messaging applications. It provides message flow control, delivery guarantees (at-most-once, at-least-once, and exactly-once), and SASL- and TLS-based authentication and encryption.
This description is abstract, but it becomes clearer with the following figure, which shows how a message travels from producer to consumer:
The figure uses RabbitMQ, an open-source message broker originally built to implement AMQP, which is also Celery's default broker. If you are unfamiliar with AMQP, "AMQP 0-9-1 Model Explained" is strongly recommended. Here are excerpts from it about channels and messages:
Some applications need multiple connections to the broker. However, it is undesirable to keep many TCP connections open at the same time, because doing so consumes system resources and makes firewalls harder to configure. AMQP 0-9-1 connections are multiplexed with channels, which can be thought of as "lightweight connections that share a single TCP connection."
Every protocol operation performed by a client happens on a channel. Communication on one channel is completely separate from communication on another, so every protocol method also carries a channel ID (also called the channel number), an integer that both the broker and the client use to determine which channel the method belongs to. A channel exists only in the context of a connection, never on its own. When a connection is closed, all channels on it are closed too.
For applications that use multiple threads/processes for processing, it is common to open a new channel for each thread/process instead of sharing channels between them.
Messages in the AMQP 0-9-1 model have attributes. Some attributes are so common that the AMQP 0-9-1 specification defines them, so application developers do not have to think about the exact attribute names. Some examples are:
- Content type
- Content encoding
- Routing key
- Delivery mode (persistent or not)
- Message priority
- Message publishing timestamp
- Expiration period
- Publisher application ID
Some attributes are used by AMQP brokers, but most are open to interpretation by the applications that receive them. Some attributes are optional and are known as headers; they are similar to X-Headers in HTTP. Message attributes are set when a message is published.
Frame mechanism in detail
In an earlier article we saw that the Redis client and server communicate using RESP (REdis Serialization Protocol). I didn't go into enough detail then, so here I will try to introduce the common ways of building application-layer protocols on top of TCP, which is a byte stream.
TCP is a byte-stream-oriented transport-layer protocol. You can picture it like this:
+----------------------------------------------+
|                                              |
| ...00010001110001101110101111001111010110... |
|                                              |
+----------------------------------------------+
The data consists of 0s and 1s, and the ellipses at both ends indicate that there is much more of it, flowing from left (server) to right (client). Without additional conventions we can't extract any useful information from it: it's like a long text without punctuation, unreadable gibberish. There are generally three ways to solve this problem:
- Fixed-length information
- Use specific characters to separate information
- Use data headers to specify the length of information
Fixed-length information
With fixed-length messages, the stream looks like this:
+--------+--------+--------+--------+--------+
|        |        |        |        |        |
|00100110|10000111|00111011|11010110|00001111|
|        |        |        |        |        |
+--------+--------+--------+--------+--------+
If we agree that every message is 8 bits long, the data above yields 5 messages: 00100110, 10000111, and so on. The drawback of fixed length is obvious: a message longer than 8 bits has to be truncated, and a shorter one has to be padded.
Consider how 00100110 might have been produced: the actual data is the 6-bit value 100110, and two 0s are padded at the front to make 8 bits. Padding goes at the head because, if it were added at the tail, there would be no way to tell whether the trailing 0s are data or padding.
You can picture this as bamboo poles from a factory: every pole that comes out is cut to exactly the same length.
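A minimal sketch of fixed-length framing, using bit strings to mirror the figure above. The 8-bit width, the helper names `encode_fixed`/`decode_fixed`, and the choice to leave an incomplete tail unread are assumptions for illustration, not part of any real protocol.

```python
# A toy fixed-length framer: every message occupies exactly WIDTH bits.
WIDTH = 8  # assumed frame width, matching the 8-bit example above


def encode_fixed(payloads):
    """Left-pad each bit string to WIDTH bits and concatenate them."""
    out = []
    for bits in payloads:
        if len(bits) > WIDTH:
            raise ValueError(f"{bits!r} is longer than {WIDTH} bits")
        # Pad at the head so that trailing zeros remain unambiguous data bits.
        out.append(bits.rjust(WIDTH, "0"))
    return "".join(out)


def decode_fixed(stream):
    """Split a stream into WIDTH-bit frames; an incomplete tail is returned unread."""
    frames = [stream[i:i + WIDTH] for i in range(0, len(stream) - WIDTH + 1, WIDTH)]
    rest = stream[len(frames) * WIDTH:]
    return frames, rest


stream = encode_fixed(["100110", "10000111"])
print(stream)                # 0010011010000111
print(decode_fixed(stream))  # (['00100110', '10000111'], '')
```

The decoder never has to look inside a frame to find its end, which is what makes this approach simple; the price is the padding and truncation described above.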
Use specific characters to separate information
You can also separate messages in the stream with a specific delimiter, as shown below:
+----------------------------------------------+
|                                              |
| 01100110101010 101010010101 1000010110101101 |
|                                              |
+----------------------------------------------+
Here spaces separate the data, yielding three messages: 01100110101010, 101010010101, and 1000010110101101.
Note that a binary stream contains no literal spaces: a space character would itself be encoded as 0010 0000 (0x20). The spaces here are for illustration only.
Think of the delimiters as the nodes of a bamboo stalk: the section between two nodes is one message. Naturally grown bamboo sections have different lengths, and so do delimited messages.
The drawback of delimiters is efficiency: the reader must scan every byte to find the next delimiter, and the delimiter has to be chosen case by case so that it never appears inside the payload (or else be escaped).
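Delimiter framing over a byte stream can be sketched as below. It assumes `\r\n` as the delimiter (the line terminator RESP uses) and a hypothetical `DelimiterFramer` class; data arrives in arbitrary chunks, so an incomplete trailing message is buffered until its delimiter shows up. Both drawbacks above are visible here: `find()` has to scan the buffered bytes, and a payload containing `\r\n` would be split incorrectly.

```python
class DelimiterFramer:
    """Split a byte stream into messages separated by a delimiter."""

    def __init__(self, delim=b"\r\n"):
        self.delim = delim
        self.buffer = b""   # bytes received but not yet framed

    def feed(self, chunk):
        """Add a chunk of received bytes and return all complete messages."""
        self.buffer += chunk
        messages = []
        while True:
            # The buffered bytes must be scanned to locate the delimiter.
            idx = self.buffer.find(self.delim)
            if idx == -1:
                break  # no complete message yet; keep the rest buffered
            messages.append(self.buffer[:idx])
            self.buffer = self.buffer[idx + len(self.delim):]
        return messages


framer = DelimiterFramer()
print(framer.feed(b"hello\r\nwor"))  # [b'hello']
print(framer.feed(b"ld\r\n"))        # [b'world']
```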
Use data headers to specify the length of information
A length header is added in front of each message to describe its length, for example:
+----------------------------------------------+
|                                              |
| 10110110100111010110111110101100011100011100 |
|                                              |
+----------------------------------------------+
Here each header bit describes what follows it: 1 means one data bit follows, and 0 means no data bit follows. Decoding the first part of the stream this way yields 0110 1001, which is the ASCII lowercase letter i:
1011011010011101011  # stream
01101001             # decoded data
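The toy 1-bit header scheme can be decoded mechanically. This hypothetical helper walks the stream header by header and reproduces the 0110 1001 to "i" result above:

```python
def decode_bit_headers(stream):
    """Decode the toy scheme: header bit 1 -> one data bit follows, 0 -> none."""
    data = []
    i = 0
    while i < len(stream):
        header = stream[i]
        i += 1
        if header == "1":
            if i >= len(stream):
                raise ValueError("header 1 at end of stream has no data bit")
            data.append(stream[i])  # the single data bit announced by the header
            i += 1
    return "".join(data)


bits = decode_bit_headers("1011011010011101011")
print(bits)               # 01101001
print(chr(int(bits, 2)))  # i
```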
This simulation uses only 0s and 1s, which is rather wasteful. If the header can be a digit, it can state the length directly: each digit says how many data bits follow it. For example:
+----------------------------------------------+
|                                              |
| 30112101051111120010112113000210201211311111 |
|                                              |
+----------------------------------------------+

stream: 3011 210 10 511111 200 10 11 211 3000 210 201 211 3111 11
header: 3    2   1  5      2   1  1  2   3    2   2   2   3    1
data:   011  10  0  11111  00  0  1  11  000  10  01  11  111  1
Length headers can also be pictured as a string of lights whose bulbs come in different sizes: the size of each bulb stands for the length of the message that follows it.
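Real protocols use byte-sized length headers rather than single digits. A minimal sketch, assuming a 4-byte big-endian unsigned length (the same `>I` layout AMQP uses for its frame size field); `encode_frame` and `decode_frames` are hypothetical names. Unlike the delimiter approach, the reader jumps straight to the end of each payload, and the payload may contain any bytes.

```python
import struct

HEADER = struct.Struct(">I")  # assumed 4-byte big-endian unsigned length


def encode_frame(payload: bytes) -> bytes:
    """Prefix the payload with its length."""
    return HEADER.pack(len(payload)) + payload


def decode_frames(buffer: bytes):
    """Extract every complete frame; return (frames, leftover bytes)."""
    frames = []
    offset = 0
    while len(buffer) - offset >= HEADER.size:
        (length,) = HEADER.unpack_from(buffer, offset)
        end = offset + HEADER.size + length
        if end > len(buffer):
            break  # payload not fully received yet
        frames.append(buffer[offset + HEADER.size:end])
        offset = end
    return frames, buffer[offset:]


# Two complete frames followed by the first 5 bytes of a third one.
data = encode_frame(b"hello") + encode_frame(b"") + encode_frame(b"amqp")[:5]
print(decode_frames(data))  # ([b'hello', b''], b'\x00\x00\x00\x04a')
```

The leftover bytes would be kept and prepended to the next chunk read from the socket.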
A frame is the smallest unit of data exchanged over the network, and the three methods above are how frames are told apart in a stream. In practice, protocols almost always use method 3, or a combination of methods 2 and 3. HTTP and RESP, for example, combine delimiters with length headers, and AMQP can be viewed the same way: each frame carries its payload length in the header and ends with a fixed frame-end octet.
AMQP protocol frame processing
Stream processing
The transport (transport.py) creates the socket and reads and writes the binary stream over it. Reading looks like this:
def _read(self, n, initial=False, _errnos=(errno.EAGAIN, errno.EINTR)):
    """Read exactly n bytes from the socket."""
    # self._quick_recv = self.sock.recv
    recv = self._quick_recv
    rbuf = self._read_buffer
    try:
        while len(rbuf) < n:
            try:
                s = recv(n - len(rbuf))
            except OSError as exc:
                if exc.errno in _errnos:
                    if initial and self.raise_on_initial_eintr:
                        raise socket.timeout()
                    continue
                raise
            if not s:
                raise OSError('Server unexpectedly closed connection')
            rbuf += s
    except:  # noqa
        # keep the partially read data for the next call
        self._read_buffer = rbuf
        raise
    # return the first n bytes and keep the rest buffered
    result, self._read_buffer = rbuf[:n], rbuf[n:]
    return result
Writing looks like this:
def write(self, s):
    try:
        # self._write = self.sock.sendall
        self._write(s)
    except socket.timeout:
        raise
    except OSError as exc:
        if exc.errno not in _UNAVAIL:
            self.connected = False
        raise
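`recv()` may return fewer bytes than requested, which is why `_read` loops. Below is a simplified, self-contained sketch of the same idea that uses `socket.socketpair()` so no broker is needed. Unlike py-amqp's version it keeps no persistent read buffer across calls, so partially received data would be lost on a timeout; the function name `recv_exactly` is hypothetical.

```python
import socket


def recv_exactly(sock, n):
    """Keep calling recv() until exactly n bytes have arrived."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))  # may return fewer bytes than asked for
        if not chunk:
            raise OSError("peer closed the connection")
        buf += chunk
    return buf


a, b = socket.socketpair()
try:
    # The sender writes the data in two separate pieces.
    a.sendall(b"\x01\x00")
    a.sendall(b"\x00hello")
    print(recv_exactly(b, 3))  # b'\x01\x00\x00'
    print(recv_exactly(b, 5))  # b'hello'
finally:
    a.close()
    b.close()
```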
Frame processing
Reading and writing the byte stream is nothing special; the interesting part is parsing frames out of the stream that has been read. The transport reads an AMQP frame as follows:
def read_frame(self, unpack=unpack):
    """Parse AMQP frame.

    Frame has following format::

        0      1         3         7                  size+7      size+8
        +------+---------+---------+   +-------------+   +-----------+
        | type | channel |  size   |   |   payload   |   | frame-end |
        +------+---------+---------+   +-------------+   +-----------+
         octet    short     long        'size' octets        octet
    """
    read = self._read
    # buffer holding everything read for this frame
    read_frame_buffer = bytes()
    ...
    # the 7-byte frame header
    frame_header = read(7, True)
    read_frame_buffer += frame_header
    # parse frame_type, channel and size (big-endian, unsigned)
    frame_type, channel, size = unpack('>BHI', frame_header)
    ...  # read `size` bytes of payload
    # the 1-byte frame end
    frame_end = ord(read(1))
    ...
    if frame_end == 206:
        return frame_type, channel, payload
    ...
- An AMQP frame consists of a header, a payload (body), and a frame-end octet.
- The 7-byte frame header consists of a 1-byte frame type, a 2-byte channel ID, and a 4-byte payload length.
- The frame end is a single byte that must be 0xCE (206 in decimal).
- The fields are extracted from the binary data with struct's unpack: '>BHI' means big-endian unsigned char, unsigned short, and unsigned int.
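A round trip through the frame layout above, using only the standard `struct` module. `pack_frame` and `parse_frame` are hypothetical helpers that follow the `>BHI` header plus 0xCE frame-end format; handling of short reads is omitted. The example frame is a heartbeat (type 8, channel 0, empty payload).

```python
import io
import struct

FRAME_END = 0xCE  # 206, the mandatory frame-end octet


def pack_frame(frame_type, channel, payload):
    """Build a frame: 7-byte header (type, channel, size) + payload + 0xCE."""
    header = struct.pack(">BHI", frame_type, channel, len(payload))
    return header + payload + bytes([FRAME_END])


def parse_frame(stream):
    """Parse one frame from a file-like object, in the same order as read_frame."""
    header = stream.read(7)
    frame_type, channel, size = struct.unpack(">BHI", header)
    payload = stream.read(size)
    frame_end = stream.read(1)[0]
    if frame_end != FRAME_END:
        raise ValueError(f"bad frame-end octet: {frame_end:#x}")
    return frame_type, channel, payload


raw = pack_frame(8, 0, b"")
print(raw.hex())                     # 08000000000000ce
print(parse_frame(io.BytesIO(raw)))  # (8, 0, b'')
```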
Message processing
read_frame returns one frame at a time; method_framing.py then assembles the frames into messages the application can use:
# py-amqp/amqp/method_framing.py
def frame_handler(connection, callback,
                  unpack_from=unpack_from, content_methods=_CONTENT_METHODS):
    """Create closure that reads frames."""
    # frames are read through a closure; the default expected frame type is 1
    expected_types = defaultdict(lambda: 1)
    partial_messages = {}

    def on_frame(frame):
        # unpack the frame returned by read_frame
        frame_type, channel, buf = frame
        ...
        # type 8 (heartbeat) is accepted at any time
        if frame_type not in (expected_types[channel], 8):
            raise UnexpectedFrame(
                'Received frame {} while expecting type: {}'.format(
                    frame_type, expected_types[channel]),
            )
        elif frame_type == 1:
            # method frame: (class_id, method_id)
            method_sig = unpack_from('>HH', buf, 0)
            # content_methods = [spec.Basic.Return, spec.Basic.Deliver, spec.Basic.GetOk]
            if method_sig in content_methods:
                # Save what we've got so far and wait for the content-header
                partial_messages[channel] = Message(
                    frame_method=method_sig, frame_args=buf,
                )
                expected_types[channel] = 2
                return False
            ...
        elif frame_type == 2:
            # content header frame
            msg = partial_messages[channel]
            # attach the header (message properties)
            msg.inbound_header(buf)
            if not msg.ready:
                # wait for the content body
                expected_types[channel] = 3
                return False
            ...
        elif frame_type == 3:
            # content body frame
            msg = partial_messages[channel]
            # append to the body
            msg.inbound_body(buf)
            ...
            # reset the channel to wait for the next method frame
            expected_types[channel] = 1
            ...
            callback(channel, msg.frame_method, msg.frame_args, msg)
        ...
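- Three kinds of frames form one Message (a business message): the method frame (type 1), the content header frame (type 2), and the content body frame (type 3). Methods that carry no content are dispatched as soon as their method frame arrives, and type 8 (heartbeat) frames are accepted at any time.
- frame_method and frame_args come from the method frame
- The header frame carries the message properties, such as content_type and reply_to, similar to HTTP headers
- The body frame carries the content of the message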
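The per-channel state machine in `on_frame` can be sketched without py-amqp's `Message` class. This simplified, hypothetical version tracks the body size from the content header (class_id, weight, body_size, the `>HHQ` layout that frame_writer packs) and accepts several body frames until the body is complete. The signatures in `CONTENT_METHODS` are the AMQP 0-9-1 values for Basic.Return, Basic.Deliver, and Basic.GetOk; the method arguments are a placeholder.

```python
import struct
from collections import defaultdict

# (class_id, method_id) of Basic.Return, Basic.Deliver and Basic.GetOk:
# the methods that are followed by a content header and body.
CONTENT_METHODS = {(60, 50), (60, 60), (60, 71)}


def make_frame_handler(callback):
    """Return an on_frame(frame) closure that assembles frames into messages."""
    expected = defaultdict(lambda: 1)   # next expected frame type, per channel
    partial = {}                        # channel -> message being assembled

    def on_frame(frame):
        frame_type, channel, buf = frame
        if frame_type == 8:             # heartbeat: allowed at any time
            return False
        if frame_type != expected[channel]:
            raise ValueError(f"got frame type {frame_type}, expected {expected[channel]}")
        if frame_type == 1:
            method_sig = struct.unpack_from(">HH", buf, 0)
            if method_sig not in CONTENT_METHODS:
                callback(channel, method_sig, buf, None)  # no content follows
                return True
            partial[channel] = {"method": method_sig, "args": buf, "size": 0, "body": b""}
            expected[channel] = 2
            return False
        msg = partial[channel]
        if frame_type == 2:
            # content header payload starts with class_id, weight, body_size
            _, _, msg["size"] = struct.unpack_from(">HHQ", buf, 0)
        else:                           # frame_type == 3: a body chunk
            msg["body"] += buf
        if len(msg["body"]) < msg["size"]:
            expected[channel] = 3       # wait for (more) body frames
            return False
        expected[channel] = 1           # message complete: reset the channel
        del partial[channel]
        callback(channel, msg["method"], msg["args"], msg["body"])
        return True

    return on_frame


received = []
on_frame = make_frame_handler(
    lambda ch, sig, args, body: received.append((ch, sig, body)))
deliver = struct.pack(">HH", 60, 60) + b"<encoded args>"  # placeholder arguments
header = struct.pack(">HHQH", 60, 0, 11, 0)  # body_size=11, no property flags
for frame in [(1, 1, deliver), (2, 1, header), (3, 1, b"hello "), (3, 1, b"world")]:
    on_frame(frame)
print(received)  # [(1, (60, 60), b'hello world')]
```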
Writing frames is the reverse of reading, as follows:
# ch23-celery/py-amqp-5.0.6/amqp/method_framing.py
def frame_writer(connection, transport,
                 pack=pack, pack_into=pack_into, range=range, len=len,
                 bytes=bytes, str_to_bytes=str_to_bytes, text_t=str):
    """Create closure that writes frames."""
    # transport.write ends up calling self.sock.sendall
    write = transport.write

    buffer_store = Buffer(bytearray(connection.frame_max - 8))

    def write_frame(type_, channel, method_sig, args, content):
        ...
        buf = buffer_store.buf
        view = buffer_store.view
        ...
        # ## FAST: pack into buffer and single write
        frame = (b''.join([pack('>HH', *method_sig), args])
                 if type_ == 1 else b'')
        framelen = len(frame)
        # method frame: type, channel, size, payload, frame-end
        pack_into('>BHI%dsB' % framelen, buf, offset,
                  type_, channel, framelen, frame, 0xce)
        offset += 8 + framelen
        if body is not None:
            # content header frame: class_id, weight, body size, properties
            frame = b''.join([
                pack('>HHQ', method_sig[0], 0, len(body)),
                properties,
            ])
            framelen = len(frame)
            pack_into('>BHI%dsB' % framelen, buf, offset,
                      2, channel, framelen, frame, 0xce)
            offset += 8 + framelen

            bodylen = len(body)
            if bodylen > 0:
                # content body frame
                framelen = bodylen
                pack_into('>BHI%dsB' % framelen, buf, offset,
                          3, channel, framelen, body, 0xce)
                offset += 8 + framelen

        write(view[:offset])
        ...
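- When writing, the three frames are packed into a single buffer (buf) and written to the socket in one call. This is the fast path marked ## FAST, used when the whole message fits within frame_max; larger messages take a slower path (elided here) that writes frame by frame.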
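The fast path can be reproduced with `struct.pack_into` on a preallocated `bytearray`. This sketch assumes the message has no properties (property flags 0) and that the whole message fits in the buffer, and it hand-encodes the Basic.Publish (60, 40) arguments; `pack_message_frames`, `shortstr`, and the `frame_max` default are hypothetical.

```python
import struct


def pack_message_frames(channel, method_sig, args, body, frame_max=4096):
    """Pack method, content-header and body frames into one buffer (fast path)."""
    buf = bytearray(frame_max)   # preallocated, like buffer_store.buf
    offset = 0

    # 1) method frame: class_id, method_id, then the encoded arguments
    frame = struct.pack(">HH", *method_sig) + args
    struct.pack_into(">BHI%dsB" % len(frame), buf, offset,
                     1, channel, len(frame), frame, 0xCE)
    offset += 8 + len(frame)      # 7-byte header + payload + 1-byte frame-end

    # 2) content header frame: class_id, weight (always 0), body size,
    #    property flags (assumption: no properties, so the flags are 0)
    frame = struct.pack(">HHQH", method_sig[0], 0, len(body), 0)
    struct.pack_into(">BHI%dsB" % len(frame), buf, offset,
                     2, channel, len(frame), frame, 0xCE)
    offset += 8 + len(frame)

    # 3) content body frame: a single one, assuming the body fits
    if body:
        struct.pack_into(">BHI%dsB" % len(body), buf, offset,
                         3, channel, len(body), body, 0xCE)
        offset += 8 + len(body)

    return bytes(buf[:offset])    # everything a single write() would send


def shortstr(s):
    """AMQP short string: 1-byte length followed by the bytes."""
    data = s.encode()
    return bytes([len(data)]) + data


# Basic.Publish (60, 40) arguments: reserved short, exchange, routing key, flag bits
args = struct.pack(">H", 0) + shortstr("") + shortstr("test") + bytes([0])
out = pack_message_frames(1, (60, 40), args, b"Hello World")
print(len(out))  # 62
```

Preallocating the buffer avoids building three separate `bytes` objects and issuing three socket writes per message.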
Serialization and deserialization of the Message itself is covered later, in the models section.
py-amqp usage
With the transport details of the AMQP protocol in hand, let's approach py-amqp from the user's side. A producer sends a message like this:
import amqp

with amqp.Connection('broker.example.com') as c:
    ch = c.channel()
    ch.basic_publish(amqp.Message('Hello World'), routing_key='test')
- Create a connection inside a with block, which closes it automatically
- Create a channel from the connection
- Publish the message on the channel, supplying at least the message body and a routing key
And here is how a consumer receives messages:
import amqp

with amqp.Connection('broker.example.com') as c:
    ch = c.channel()

    def on_message(message):
        print('Received message (delivery tag: {}): {}'.format(message.delivery_tag, message.body))

    ch.basic_consume(queue='test', callback=on_message, no_ack=True)
    while True:
        c.drain_events()
- The connection is again created inside a with block
- A channel is again created from the connection
- The message callback is registered on the channel
- Consuming requires at least a queue name; with the default exchange, a message published with routing_key='test' is routed to the queue named test. no_ack controls whether explicit acknowledgements are required.
- drain_events keeps listening for events on the connection
As the examples show, both sending and receiving use Connection and Channel, with Message objects carrying the message body. The difference is that sending is a single basic_publish call, while receiving is more involved: you register a consumer with basic_consume and then keep listening for events with drain_events.
py-amqp models
Connection
The connection model consists of the AbstractChannel base class and the Connection class. Oddly, both Connection and Channel inherit from AbstractChannel. Personally I don't think this is a good design, even though it lets Channel and Connection share some operations.
+-----------------+
| AbstractChannel |
+-^-------------^-+
| |
+---+ |
| |
+-----+------+ +--+------+
| Connection | | Channel |
+------------+ +---------+
Connection constructor:
class Connection(AbstractChannel):
    def __init__(self, host='localhost:5672', userid='guest', password='guest',
                 login_method=None, login_response=None,
                 authentication=(),
                 virtual_host='/', locale='en_US', client_properties=None,
                 ssl=False, connect_timeout=None, channel_max=None,
                 frame_max=None, heartbeat=0, on_open=None, on_blocked=None,
                 on_unblocked=None, confirm_publish=False,
                 on_tune_ok=None, read_timeout=None, write_timeout=None,
                 socket_settings=None, frame_handler=frame_handler,
                 frame_writer=frame_writer, **kwargs):
        self._connection_id = uuid.uuid4().hex
        ...
        # frame handler and frame writer factories
        self.frame_handler_cls = frame_handler
        self.frame_writer_cls = frame_writer
        ...
        # channel_id -> Channel
        self.channels = {}
        # The connection object itself is treated as channel 0
        super().__init__(self, 0)
        ...
Connection's main job is managing data transfer, which is set up in the connect method:
def connect(self, callback=None):
    # Let the transport.py module setup the actual
    # socket connection to the broker.
    #
    if self.connected:
        return callback() if callback else None
    try:
        # create the transport (socket wrapper)
        self.transport = self.Transport(
            self.host, self.connect_timeout, self.ssl,
            self.read_timeout, self.write_timeout,
            socket_settings=self.socket_settings,
        )
        ...
        # closures for reading and writing frames
        self.on_inbound_frame = self.frame_handler_cls(
            self, self.on_inbound_method)
        self.frame_writer = self.frame_writer_cls(self, self.transport)
        ...
    except (OSError, SSLError):
        ...
Connection also handles connection-level protocol methods, such as those that maintain the connection state:
def _setup_listeners(self):
    self._callbacks.update({
        spec.Connection.Start: self._on_start,
        spec.Connection.OpenOk: self._on_open_ok,
        spec.Connection.Secure: self._on_secure,
        spec.Connection.Tune: self._on_tune,
        spec.Connection.Close: self._on_close,
        spec.Connection.Blocked: self._on_blocked,
        spec.Connection.Unblocked: self._on_unblocked,
        spec.Connection.CloseOk: self._on_close_ok,
    })

def _on_start(self, version_major, version_minor, server_properties,
              mechanisms, locales, argsig='FsSs'):
    ...
    # reply to the server with spec.Connection.StartOk
    self.send_method(
        spec.Connection.StartOk, argsig,
        (client_properties, authentication.mechanism,
         login_response, self.locale),
    )
    ...

def send_method(self, sig,
                format=None, args=None, content=None,
                wait=None, callback=None, returns_tuple=False):
    p = promise()
    conn = self.connection
    ...
    args = dumps(format, args) if format else ''
    try:
        # write a method frame (type 1) on this channel
        conn.frame_writer(1, self.channel_id, sig, args, content)
    except StopIteration:
        ...
    # TODO temp: callback should be after write_method ... ;)
    ...
    if wait:
        return self.wait(wait, returns_tuple=returns_tuple)
    return p
- When the client receives spec.Connection.Start from the server, it replies with spec.Connection.StartOk
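The `_callbacks` dictionary maps a method signature, a (class_id, method_id) tuple, to its handler. The hypothetical `ToyConnection` below shows that pattern without any networking: an incoming Start is answered with StartOk and an incoming Tune with TuneOk. The signature values come from the AMQP 0-9-1 spec; the handler arguments are simplified and do not match py-amqp's real method signatures.

```python
# Method signatures are (class_id, method_id) pairs from the AMQP 0-9-1 spec.
CONNECTION_START = (10, 10)
CONNECTION_START_OK = (10, 11)
CONNECTION_TUNE = (10, 30)
CONNECTION_TUNE_OK = (10, 31)


class ToyConnection:
    """Hypothetical, network-free model of the callback-registry pattern."""

    def __init__(self):
        self.sent = []  # stands in for method frames written to the socket
        self._callbacks = {
            CONNECTION_START: self._on_start,
            CONNECTION_TUNE: self._on_tune,
        }

    def send_method(self, sig, args):
        self.sent.append((sig, args))

    def dispatch_method(self, sig, *args):
        # Look up the handler registered for this method signature and call it.
        self._callbacks[sig](*args)

    def _on_start(self, mechanisms):
        # Pick an auth mechanism the server offered and answer with StartOk.
        mechanism = "PLAIN" if "PLAIN" in mechanisms else mechanisms[0]
        self.send_method(CONNECTION_START_OK, {"mechanism": mechanism})

    def _on_tune(self, channel_max, frame_max, heartbeat):
        # Accept the server's proposed limits as-is.
        self.send_method(CONNECTION_TUNE_OK, (channel_max, frame_max, heartbeat))


conn = ToyConnection()
conn.dispatch_method(CONNECTION_START, ["AMQPLAIN", "PLAIN"])
conn.dispatch_method(CONNECTION_TUNE, 2047, 131072, 60)
print(conn.sent)
```

Adding support for another method is then just one more entry in the dictionary, which is how both Connection and Channel extend `_callbacks` in their `_setup_listeners`.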
Creating a Channel from the connection:
Channel = Channel

def channel(self, channel_id=None, callback=None):
    """Create new channel.

    Fetch a Channel object identified by the numeric channel_id, or
    create that object if it doesn't already exist.
    """
    ...
    try:
        # return the existing channel
        return self.channels[channel_id]
    except KeyError:
        # otherwise create and open a new one
        channel = self.Channel(self, channel_id, on_open=callback)
        channel.open()
        return channel
Channel
A Channel is constructed as follows:
class Channel(AbstractChannel):
    def __init__(self, connection,
                 channel_id=None, auto_decode=True, on_open=None):
        ...
        # allocate a new channel ID
        channel_id = connection._get_free_channel_id()
        # register this channel under its own channel ID
        super().__init__(connection, channel_id)
        ...
        # message callbacks (consumer_tag -> callback)
        self.callbacks = {}
A channel also registers handlers for channel-level protocol methods, such as spec.Basic.Deliver:
def _setup_listeners(self):
    self._callbacks.update({
        spec.Channel.Close: self._on_close,
        spec.Channel.CloseOk: self._on_close_ok,
        spec.Channel.Flow: self._on_flow,
        spec.Channel.OpenOk: self._on_open_ok,
        spec.Basic.Cancel: self._on_basic_cancel,
        spec.Basic.CancelOk: self._on_basic_cancel_ok,
        spec.Basic.Deliver: self._on_basic_deliver,
        spec.Basic.Return: self._on_basic_return,
        spec.Basic.Ack: self._on_basic_ack,
        spec.Basic.Nack: self._on_basic_nack,
    })

def _on_basic_deliver(self, consumer_tag, delivery_tag, redelivered,
                      exchange, routing_key, msg):
    msg.channel = self
    # attach delivery metadata to the message
    msg.delivery_info = {
        'consumer_tag': consumer_tag,
        'delivery_tag': delivery_tag,
        'redelivered': redelivered,
        'exchange': exchange,
        'routing_key': routing_key,
    }
    try:
        # look up the callback registered for this consumer
        fun = self.callbacks[consumer_tag]
    except KeyError:
        ...
    else:
        fun(msg)
First, let's look at how a message is published:
def _basic_publish(self, msg, exchange='', routing_key='',
mandatory=False, immediate=False, timeout=None,
confirm_timeout=None,
argsig='Bssbb'):
...
try:
with self.connection.transport.having_timeout(timeout):
return self.send_method(
spec.Basic.Publish, argsig,
(0, exchange, routing_key, mandatory, immediate), msg
)
except socket.timeout:
...
basic_publish = _basic_publish
send_method was shown earlier, in the spec.Connection.StartOk example.
Consuming messages starts with listening on the connection:
def drain_events(self, timeout=None):
    # read until message is ready
    while not self.blocking_read(timeout):
        pass

def blocking_read(self, timeout=None):
    with self.transport.having_timeout(timeout):
        frame = self.transport.read_frame()
    return self.on_inbound_frame(frame)

def on_inbound_method(self, channel_id, method_sig, payload, content):
    # callback that on_inbound_frame (frame_handler) invokes for each complete method
    ...
    return self.channels[channel_id].dispatch_method(
        method_sig, payload, content,
    )
The channel's processing of the message is simple: it decodes the body and runs the listener registered for the method:
def dispatch_method(self, method_sig, payload, content):
...
content.body = content.body.decode(content.content_encoding)
...
amqp_method = self._METHODS[method_sig]
listeners = [self._callbacks[method_sig]]
one_shot = self._pending.pop(method_sig)
args = []
if amqp_method.args:
args, _ = loads(amqp_method.args, payload, 4)
if amqp_method.content:
args.append(content)
for listener in listeners:
listener(*args)
...
Message
Message inherits from GenericContent:
+----------------+
| GenericContent |
+-------+--------+
^
|
|
+----+----+
| Message |
+---------+
Both classes are relatively simple data structures:
class Message(GenericContent):
    # properties carried by the content header frame
    PROPERTIES = [
        ('content_type', 's'),
        ('content_encoding', 's'),
        ('application_headers', 'F'),
        ('delivery_mode', 'o'),
        ('priority', 'o'),
        ('correlation_id', 's'),
        ('reply_to', 's'),
        ('expiration', 's'),
        ('message_id', 's'),
        ('timestamp', 'L'),
        ('type', 's'),
        ('user_id', 's'),
        ('app_id', 's'),
        ('cluster_id', 's')
    ]

    def __init__(self, body='', children=None, channel=None, **properties):
        super().__init__(**properties)
        #: set by basic_consume/basic_get
        self.delivery_info = None
        self.body = body
        self.channel = channel


class GenericContent:
    """Abstract base class for AMQP content.

    Subclasses should override the PROPERTIES attribute.
    """

    CLASS_ID = None
    PROPERTIES = [('dummy', 's')]

    def __init__(self, frame_method=None, frame_args=None, **props):
        ...
        self.properties = props
        self._pending_chunks = []
        self.body_received = 0
        self.body_size = 0
        self.ready = False

    def __getattr__(self, name):
        # Look for additional properties in the 'properties'
        # dictionary, and if present - the 'delivery_info' dictionary.
        ...
        if name in self.properties:
            # properties are readable as attributes
            return self.properties[name]
        ...
The content header frame described above is deserialized into the Message's properties like this:
def decode_properties_basic(buf, offset):
"""Decode basic properties."""
properties = {}
flags, = unpack_from('>H', buf, offset)
offset += 2
if flags & 0x8000:
slen, = unpack_from('>B', buf, offset)
offset += 1
properties['content_type'] = pstr_t(buf[offset:offset + slen])
offset += slen
...
def _load_properties(self, class_id, buf, offset):
...
props, offset = PROPERTY_CLASSES[class_id](buf, offset)
self.properties = props
return offset
def inbound_header(self, buf, offset=0):
...
self._load_properties(class_id, buf, offset)
...
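The property flags word works like a bitmap: bit 15 (0x8000) means content_type is present, bit 14 content_encoding, and so on down the PROPERTIES list, and only the properties that are present follow in the buffer. Below is a minimal encoder/decoder for three of them (content_type, content_encoding, delivery_mode), assuming UTF-8 short strings; the function names are hypothetical and the other properties, including application_headers (0x2000), are skipped.

```python
import struct

# Property flag bits, from the highest bit down, in PROPERTIES order.
FLAG_CONTENT_TYPE = 0x8000
FLAG_CONTENT_ENCODING = 0x4000
FLAG_DELIVERY_MODE = 0x1000   # 0x2000 (application_headers) is skipped here


def encode_properties(content_type=None, content_encoding=None, delivery_mode=None):
    flags, parts = 0, []
    if content_type is not None:
        flags |= FLAG_CONTENT_TYPE
        data = content_type.encode()
        parts.append(struct.pack(">B", len(data)) + data)  # short string
    if content_encoding is not None:
        flags |= FLAG_CONTENT_ENCODING
        data = content_encoding.encode()
        parts.append(struct.pack(">B", len(data)) + data)
    if delivery_mode is not None:
        flags |= FLAG_DELIVERY_MODE
        parts.append(struct.pack(">B", delivery_mode))     # octet
    return struct.pack(">H", flags) + b"".join(parts)


def decode_properties(buf, offset=0):
    properties = {}
    flags, = struct.unpack_from(">H", buf, offset)
    offset += 2
    if flags & FLAG_CONTENT_TYPE:
        slen, = struct.unpack_from(">B", buf, offset)
        offset += 1
        properties["content_type"] = buf[offset:offset + slen].decode()
        offset += slen
    if flags & FLAG_CONTENT_ENCODING:
        slen, = struct.unpack_from(">B", buf, offset)
        offset += 1
        properties["content_encoding"] = buf[offset:offset + slen].decode()
        offset += slen
    if flags & FLAG_DELIVERY_MODE:
        properties["delivery_mode"] = buf[offset]
        offset += 1
    return properties, offset


raw = encode_properties(content_type="text/plain", delivery_mode=2)
print(raw)                     # b'\x90\x00\ntext/plain\x02'
print(decode_properties(raw))  # ({'content_type': 'text/plain', 'delivery_mode': 2}, 14)
```

Returning the new offset, as `decode_properties_basic` does, lets the caller keep parsing from where the properties end.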
Serialization, the reverse direction, is implemented mainly by _serialize_properties and is not covered here.
Summary
In this article we focused on the AMQP protocol. We looked at three ways to build application protocols on top of a TCP byte stream: fixed length, delimiters, and length headers. We saw how AMQP transmits a Message with frames: a method frame, a content header frame, and content body frames together carry one message. Finally, we looked at how py-amqp implements the three core AMQP concepts, Connection, Channel, and Message, and how they are used to publish and consume messages.
Tip
Channel IDs are handed out as incrementing, non-repeating numbers using the following trick:
>>> from array import array
>>> a=array('H', range(65535, 0, -1))
>>> a.pop()
1
>>> a.pop()
2
>>>
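The same trick can be wrapped into a small allocator. `ChannelIdPool` and its `release()` method are hypothetical, since the article does not show how py-amqp returns IDs. Channel 0 is never handed out because it is the connection itself, and storing all 65535 IDs in an `array('H')` takes only about 128 KB.

```python
from array import array


class ChannelIdPool:
    """Hypothetical allocator built on the array('H') trick shown above."""

    def __init__(self, channel_max=65535):
        # Stored in descending order so pop() (from the end) yields 1, 2, 3, ...
        self._avail = array("H", range(channel_max, 0, -1))
        self._used = set()

    def acquire(self):
        try:
            channel_id = self._avail.pop()
        except IndexError:
            raise RuntimeError("no free channel ids") from None
        self._used.add(channel_id)
        return channel_id

    def release(self, channel_id):
        # Appending to the end makes the released id the next one handed out.
        self._used.discard(channel_id)
        self._avail.append(channel_id)


pool = ChannelIdPool(channel_max=3)
print(pool.acquire(), pool.acquire())  # 1 2
pool.release(1)
print(pool.acquire())                  # 1
```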
A digression: until now I titled these articles "source code reading", mainly because I felt they weren't thorough enough. But "source code analysis" is more intuitive from a search point of view, and I feel the recent articles have improved a bit, so starting with this issue I'm shamelessly renaming the series "source code analysis".
References
- Cython: cython.org/#about
- AMQP 0-9-1 specification: www.rabbitmq.com/resources/s…
- Python struct module documentation: docs.python.org/zh-cn/3/lib…
- AMQP protocol study notes (Zhihu): zhuanlan.zhihu.com/p/147675691
- AMQP 0-9-1 Model Explained: www.rabbitmq.com/tutorials/a…