Celery is a simple, flexible and reliable distributed system for processing large volumes of messages, and it provides a full set of tools to operate such a system. Celery is also a message-queue-based tool for real-time processing as well as task scheduling.
Celery is built on a small family of libraries: celery itself, kombu, vine and py-amqp. Earlier articles in this series covered two of them:
- Celery source parsing – vine's implementation of the Promise
- Celery source parsing – py-amqp's implementation of the AMQP protocol
In this article we continue with Celery's base library Kombu, a messaging library implemented in Python that carries the core message-processing flow in Celery. The article covers the following parts:
- AMQP concepts
- Kombu overview
- Kombu Usage Guide
- Producer & Consumer
- Exchange && Queue parsing
- Message parsing
- Connection parsing
- Matcher && serialization
- Summary
- Tips
AMQP concepts
Following the previous chapter, we continue to study the concepts related to AMQP. Understanding these basic concepts helps explain why Kombu is implemented the way it is. This time we use a short story to simulate the message-processing flow of Kombu.
Third-grader Xiao Ming likes his deskmate Xiao Hong, likes her ponytail and her smile, and often writes her little notes. A note handed over directly is a Message passed straight from a Producer to a Consumer. Sometimes Xiao Hong is not at her desk, so Xiao Ming puts the note in her drawer; the drawer acts as a Queue, temporarily storing the delivered messages. After the teacher discovers that they often pass notes in class, Xiao Ming and Xiao Hong are no longer deskmates. But Xiao Ming cannot forget Xiao Hong's smile (distance makes the heart grow fonder), so he asks Xiao Ma (Pony), who sits in front of him, to pass the notes along, writing "please give this to Xiao Hong" on the folded paper. Xiao Ma plays the role of the Exchange, and "please give this to Xiao Hong" is the message's routing key. Walk by the river often enough and your shoes will get wet: one day a note is confiscated, and the teacher makes Xiao Ming stand at the front and read it out to the whole class. Reading the note to everyone is a fanout (broadcast).
Childish stories are also part of real life; who hasn't passed a little note? Please pause and reminisce for a minute :). Business is an abstraction of life scenarios, and code is a higher abstraction still. Once you understand the business, the concepts in the code no longer feel strange.
Exchange and Queue are concepts implemented by the broker, yet Kombu models them on the Producer/Consumer client as well. Why? Couldn't message transmission be simplified so that one client only uses a Producer to send messages and the other only uses a Consumer to receive them? In plain AMQP, exchanges and queues are created on the broker and bound using an administrative tool, which limits flexibility. Kombu therefore includes Exchange and Queue models on the client side and uses them to manage the broker.
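To make the mapping concrete, here is a minimal sketch using the in-memory transport (names are illustrative): a direct exchange routes by key, while a fanout exchange would broadcast to every bound queue:

```python
from kombu import Connection, Exchange, Queue, Producer, Consumer

# direct exchange: the routing key decides which queue gets the note
note_exchange = Exchange('classroom', type='direct')
hong_queue = Queue('xiaohong', note_exchange, routing_key='to.xiaohong')

# fanout exchange: every bound queue would receive a copy (reading the note aloud)
broadcast = Exchange('teacher', type='fanout')

with Connection('memory://') as conn:
    producer = Producer(conn)
    producer.publish({'note': 'hi'},
                     exchange=note_exchange,
                     routing_key='to.xiaohong',
                     declare=[hong_queue])   # declares exchange, queue and binding

    with Consumer(conn, hong_queue,
                  callbacks=[lambda body, msg: (print(body), msg.ack())]):
        conn.drain_events(timeout=1)
```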
Kombu overview
Kombu is an important member of the plant family: celery, vine and kombu (kelp) are one happy family. We parse Kombu version 5.0.0, whose main modules are:
Module | Function
---|---
abstract.py | Abstract binding implementation: whether an object can be bound to a channel
compression.py | A summary of compression algorithms
connection.py | The broker connection
entity.py | Entity classes, including the implementations of the Exchange, binding and Queue objects
matcher.py | Matching strategies
message.py | The Message object, with ack, reject and other operations on a message
messaging.py | Message processing, including Producer and Consumer
mixins.py, pools.py, simple.py | Helpers that enhance functionality or wrap the core for ease of use
serialization.py | A summary of serialization algorithms
transport | Data-transfer implementations adapting the various storage backends, mainly memory, Redis, py-amqp (RabbitMQ), etc.
asynchronous | Asynchronous (event loop) implementation
At the bottom layer Kombu uses the AMQP protocol support provided by py-amqp, and on top of it implements the Producer, Consumer, Exchange, Queue and other models.
Kombu Usage Guide
As always, let's start with how Kombu is used. Here is an example of a producer sending a message:
```python
# kombu-5.0.0/examples/complete_send.py
from kombu import Connection, Producer, Exchange, Queue

exchange = Exchange('kombu_demo', type='direct')

with Connection('amqp://guest:guest@localhost:5672//') as connection:
    producer = Producer(connection)
    producer.publish({'hello': 'world'},
                     exchange=exchange,
                     routing_key='kombu_demo',
                     serializer='json',
                     compression='zlib')
```
The producer example includes the following steps:
- Create an exchange named kombu_demo
- Create a connection to the broker and use it as a context
- A connection is used to create a producer that sends messages
- Using the created producer, send a plain dict message to the created exchange with routing_key kombu_demo; the message is serialized as JSON and compressed with the zlib algorithm
The consumer example is a little more complicated:
```python
# kombu-5.0.0/examples/complete_receive.py
from pprint import pformat

from kombu import Connection, Exchange, Queue, Consumer, eventloop

exchange = Exchange('kombu_demo', type='direct')
queue = Queue('kombu_demo', exchange, routing_key='kombu_demo')


def pretty(obj):
    return pformat(obj, indent=4)


#: This is the callback applied when a message is received.
def handle_message(body, message):
    print(f'Received message: {body!r}')
    print('  properties:\n{}'.format(pretty(message.properties)))
    print('  delivery_info:\n{}'.format(pretty(message.delivery_info)))
    message.ack()


with Connection('amqp://guest:guest@localhost:5672//') as connection:
    with Consumer(connection, queue, callbacks=[handle_message]):
        for _ in eventloop(connection):
            pass
```
The consumer example consists of the following steps:
- Also create an exchange named kombu_demo
- Create a queue named kombu_demo, bind it to exchange, and set the routing_key for consumption
- Create the callback function, which receives the body and the message. The body is the pure business payload, while the message carries the delivery information and can be used to ack directly to the broker
- As with the producer, create a connection to the broker and use it as a context
- Create a consumer from the connection, bind it to the queue and register the callback function
- Continuously listen to the event loop on the connection
Let's go back to the AMQP diagram and compare it with the examples:
The producer in the example corresponds to the left half of the diagram, the consumer to the right half, with the broker in the middle. In the first article we used a Redis service as the broker. Another important point about the examples is that no channel is created explicitly; channels are created automatically. In a typical distributed deployment there are three processes: the producer process and the consumer process exchange messages through the broker process.
Producer & Consumer
Producer parsing
Producer constructor:
```python
class Producer:

    def __init__(self, channel, exchange=None, routing_key=None,
                 serializer=None, auto_declare=None, compression=None,
                 on_return=None):
        self._channel = channel
        self.exchange = exchange
        self.routing_key = routing_key or self.routing_key
        self.serializer = serializer or self.serializer
        self.compression = compression or self.compression
        self.on_return = on_return or self.on_return
        self._channel_promise = None
        if self.exchange is None:
            # default exchange
            self.exchange = Exchange('')
        ...
        if self._channel:
            self.revive(self._channel)

    def revive(self, channel):
        """Revive the producer after connection loss."""
        if is_connection(channel):
            connection = channel
            self.__connection__ = connection
            channel = ChannelPromise(lambda: connection.default_channel)
        if isinstance(channel, ChannelPromise):
            self._channel = channel
            self.exchange = self.exchange(channel)
        else:
            # Channel already concrete
            self._channel = channel
            if self.on_return:
                self._channel.events['basic_return'].add(self.on_return)
            self.exchange = self.exchange(channel)
```
In addition to setting its own attributes, the Producer also processes the channel. As mentioned earlier, a connection can be passed where a channel is expected: revive() handles the connection case first and resolves the default channel from the connection lazily. For an already concrete channel, the producer's exchange is bound to it right away. Note that self.exchange(channel) is equivalent to self.exchange.__call__(channel). Once created, a producer can publish messages:
```python
    def publish(self, body, routing_key=None, delivery_mode=None,
                mandatory=False, immediate=False, priority=0,
                content_type=None, content_encoding=None, serializer=None,
                headers=None, compression=None, exchange=None, retry=False,
                retry_policy=None, declare=None, expiration=None,
                timeout=None, **properties):
        # resolve the routing_key and exchange
        routing_key = self.routing_key if routing_key is None else routing_key
        exchange_name, properties['delivery_mode'] = self._delivery_details(
            exchange or self.exchange, delivery_mode,
        )
        # prepare the body: serialize and compress
        body, content_type, content_encoding = self._prepare(
            body, serializer, content_type, content_encoding,
            compression, headers)
        # wrap the body into a message
        message = channel.prepare_message(
            body, priority, content_type, content_encoding,
            headers, properties,
        )
        ...
        # hand the message to the channel for delivery
        return channel.basic_publish(
            message,
            exchange=exchange, routing_key=routing_key,
            mandatory=mandatory, immediate=immediate, timeout=timeout,
        )
```
Producer wraps the business operations around a channel: if a concrete channel was passed, it is used directly; otherwise the connection's default_channel is used. When sending, the Producer prepares the exchange and the message, and then publishes through the channel.
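A small sketch (in-memory transport for illustration) showing that a Producer accepts either a Connection or a concrete channel; with a connection, the channel is resolved lazily through a ChannelPromise:

```python
from kombu import Connection, Exchange, Producer

exchange = Exchange('kombu_demo', type='direct')

with Connection('memory://') as conn:
    # passing the connection: revive() wraps conn.default_channel in a promise
    lazy_producer = Producer(conn, exchange=exchange)

    # passing a concrete channel: the exchange is bound to it immediately
    eager_producer = Producer(conn.channel(), exchange=exchange)

    lazy_producer.publish({'hello': 'world'}, routing_key='kombu_demo',
                          declare=[exchange])
```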
Consumer parsing
Constructor and context for Consumer:
```python
class Consumer:

    def __init__(self, channel, queues=None, no_ack=None, auto_declare=None,
                 callbacks=None, on_decode_error=None, on_message=None,
                 accept=None, prefetch_count=None, tag_prefix=None):
        self.channel = channel
        self.queues = maybe_list(queues or [])
        self.no_ack = self.no_ack if no_ack is None else no_ack
        # callbacks applied to each decoded message
        self.callbacks = (self.callbacks or [] if callbacks is None
                          else callbacks)
        # raw-message handler; bypasses decoding when set
        self.on_message = on_message
        self.tag_prefix = tag_prefix
        self._active_tags = {}
        ...
        if self.channel:
            self.revive(self.channel)

    def revive(self, channel):
        """Revive consumer after connection loss."""
        self._active_tags.clear()
        channel = self.channel = maybe_channel(channel)
        # modify dict size while iterating over it is not allowed
        for qname, queue in list(self._queues.items()):
            # name may have changed after declare
            self._queues.pop(qname, None)
            # bind each queue to the channel
            self._queues[queue.name] = queue = queue(self.channel)
            ...

    def __enter__(self):
        self.consume()
        return self
```
A Consumer is similar to a Producer: after setting its attributes it also processes the channel, except that what gets bound to the channel is the Queue (for the Producer it is the Exchange), and it additionally provides a context manager. Messages are consumed inside the context:
```python
    def consume(self, no_ack=None):
        ...
        # consume messages from every queue
        for queue in self._queues:
            tag = self._add_tag(queue, consumer_tag)
            queue.consume(tag, self._receive_callback,
                          no_ack=no_ack, nowait=nowait)

    def _receive_callback(self, message):
        accept = self.accept
        on_m, channel, decoded = self.on_message, self.channel, None
        try:
            ...
            decoded = None if on_m else message.decode()
        except Exception as exc:
            if not self.on_decode_error:
                raise
            self.on_decode_error(message, exc)
        else:
            return on_m(message) if on_m else self.receive(decoded, message)

    def receive(self, body, message):
        """Method called when a message is received.

        This dispatches to the registered :attr:`callbacks`.

        Arguments:
            body (Any): The decoded message body.
            message (~kombu.Message): The message instance.

        Raises:
            NotImplementedError: If no consumer callbacks have been registered.
        """
        callbacks = self.callbacks
        ...
        [callback(body, message) for callback in callbacks]
```
A consumer can use multiple queues, and each queue consumes messages with either the user-supplied raw handler (on_message) or the default receive handler. Typically a callback receives the decoded body together with the message object. How messages are continuously consumed is covered in the Connection section.
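A minimal sketch of the two handler styles (broker URL and queue names assumed): callbacks receive the decoded body plus the message, while on_message receives the raw message and bypasses decoding:

```python
from kombu import Connection, Exchange, Queue, Consumer

exchange = Exchange('kombu_demo', type='direct')
queues = [Queue('kombu_demo', exchange, routing_key='kombu_demo'),
          Queue('kombu_demo_2', exchange, routing_key='kombu_demo_2')]


def on_body(body, message):          # decoded payload + message object
    print('decoded:', body)
    message.ack()


def on_raw(message):                 # raw message; decode it yourself
    print('raw payload:', message.payload)
    message.ack()


with Connection('amqp://guest:guest@localhost:5672//') as conn:
    # pass either callbacks=[...] or on_message=..., not usually both
    with Consumer(conn, queues, callbacks=[on_body]):
        conn.drain_events(timeout=5)
```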
Exchange && Queue parsing
Producers use exchanges, consumers use queues, and messages are delivered via exchanges and queues. Exchanges and queues have a common parent, MaybeChannelBound:
```
          +-------------------+
          | MaybeChannelBound |
          +---------^---------+
                    |
          +---------+---------+
          |                   |
    +-----+----+          +---+---+
    | Exchange |          | Queue |
    +----------+          +-------+
```
MaybeChannelBound specifies the binding behavior of a class to a channel:
```python
class MaybeChannelBound(Object):

    _channel = None
    _is_bound = False

    def __call__(self, channel):
        """`self(channel) -> self.bind(channel)`."""
        return self.bind(channel)
```
- Both _channel and _is_bound are defined as class attributes, serving as defaults shared by the class
- The __call__ magic method means that calling an instance, such as exchange(channel) or queue(channel), automatically performs the binding to the channel
This is verified by the following methods for binding to a channel and checking whether it is bound:
```python
    def maybe_bind(self, channel):
        """Bind instance to channel if not already bound."""
        if not self.is_bound and channel:
            self._channel = maybe_channel(channel)
            self.when_bound()
            self._is_bound = True
        return self

    @property
    def is_bound(self):
        """Flag set if the channel is bound."""
        return self._is_bound and self._channel is not None
```
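A small sketch (in-memory transport, illustrative names) of how calling an Exchange or Queue binds it to a channel and how declare() then creates it on the broker:

```python
from kombu import Connection, Exchange, Queue

exchange = Exchange('kombu_demo', type='direct')
queue = Queue('kombu_demo', exchange, routing_key='kombu_demo')

with Connection('memory://') as conn:
    channel = conn.default_channel
    bound_queue = queue(channel)   # __call__ -> bind(channel)
    print(bound_queue.is_bound)    # True
    bound_queue.declare()          # declares exchange, queue and binding on the broker
```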
Exchange object creation and binding to channel:
```python
class Exchange(MaybeChannelBound):

    def __init__(self, name='', type='', channel=None, **kwargs):
        super().__init__(**kwargs)
        self.name = name or self.name
        self.type = type or self.type
        self.maybe_bind(channel)
        ...
```
The created exchange object also needs to be declared; declaring is how the client asks the broker to create the exchange:
```python
    def declare(self, nowait=False, passive=None, channel=None):
        """Declare the exchange.

        Creates the exchange on the broker, unless passive is set
        in which case it will only assert that the exchange exists.

        Argument:
            nowait (bool): If set the server will not respond, and a
                response will not be waited for.  Default is :const:`False`.
        """
        if self._can_declare():
            # relies on the channel to do the actual work
            return (channel or self.channel).exchange_declare(
                exchange=self.name, type=self.type, durable=self.durable,
                auto_delete=self.auto_delete, arguments=self.arguments,
                nowait=nowait, passive=passive,
            )
```
The queue object also needs to be bound to a channel:
```python
class Queue(MaybeChannelBound):

    def __init__(self, name='', exchange=None, routing_key='',
                 channel=None, bindings=None, on_declared=None,
                 **kwargs):
        super().__init__(**kwargs)
        self.name = name or self.name
        self.maybe_bind(channel)
        ...
```
Then declare the queue, which consists of the following three steps:
```python
    def declare(self, nowait=False, channel=None):
        """Declare queue and exchange then binds queue to exchange."""
        if not self.no_declare:
            # - declare main binding.
            self._create_exchange(nowait=nowait, channel=channel)
            self._create_queue(nowait=nowait, channel=channel)
            self._create_bindings(nowait=nowait, channel=channel)
        return self.name

    def _create_exchange(self, nowait=False, channel=None):
        if self.exchange:
            # declare the exchange
            self.exchange.declare(nowait=nowait, channel=channel)

    def _create_queue(self, nowait=False, channel=None):
        # declare the queue
        self.queue_declare(nowait=nowait, passive=False, channel=channel)
        if self.exchange and self.exchange.name:
            # bind the queue to the exchange
            self.queue_bind(nowait=nowait, channel=channel)

    def _create_bindings(self, nowait=False, channel=None):
        for B in self.bindings:
            channel = channel or self.channel
            B.declare(channel)
            B.bind(self, nowait=nowait, channel=channel)
```
A queue declaration also tells the broker to create a queue:
```python
    def queue_declare(self, nowait=False, passive=False, channel=None):
        ...
        ret = channel.queue_declare(
            queue=self.name,
            passive=passive,
            durable=self.durable,
            exclusive=self.exclusive,
            auto_delete=self.auto_delete,
            arguments=queue_arguments,
            nowait=nowait,
        )
        ...
```
A queue has one more step than an exchange: binding to the exchange. The job of queue_bind is to have the broker create the association between the queue and the exchange.
```python
    def queue_bind(self, nowait=False, channel=None):
        """Create the queue binding on the server."""
        return (channel or self.channel).queue_bind(
            queue=self.name,
            exchange=self.exchange.name,
            routing_key=self.routing_key,
            arguments=self.binding_arguments,
            nowait=nowait,
        )
```
From the Exchange and Queue implementations we can see that the producer does not care about the consumer: it only creates and declares the exchange. The consumer, however, needs to know about the producer: after creating and declaring its queue it must bind the queue to the exchange. Because consumers and producers live in different processes, the consumer has to create the Exchange object locally as well, even though the producer already declared it on the broker.
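Kombu also allows one queue to be fed by several exchanges through binding objects; a small sketch with illustrative names, assuming the kombu.binding helper exported by the package:

```python
from kombu import Exchange, Queue, binding

logs = Exchange('logs', type='topic')
metrics = Exchange('metrics', type='topic')

# one queue fed by two exchanges, each with its own routing key
collector = Queue('collector', bindings=[
    binding(logs, routing_key='app.#'),
    binding(metrics, routing_key='cpu.*'),
])
```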
Message parsing
A Message object, besides being a pure data structure, also holds a reference to the channel; after all, a message can perform the ack action directly:
```python
class Message:

    def __init__(self, body=None, delivery_tag=None, content_type=None,
                 content_encoding=None, delivery_info=None, properties=None,
                 headers=None, postencode=None, accept=None, channel=None,
                 **kwargs):
        # the channel is the main source of the broker API (ack/reject go through it)
        self.channel = channel
        # delivery_tag identifies this message when responding to the broker
        self.delivery_tag = delivery_tag
        ...
        self.headers = headers or {}
        self.body = body
        ...
        self._state = 'RECEIVED'
```
The message itself also has four states:
- RECEIVED: the default state after the message arrives
- ACK: the ack response has been completed
- REJECTED: the message was rejected
- REQUEUED: the message was put back on the queue
The latter three, {'ACK', 'REJECTED', 'REQUEUED'}, count as states in which the message has already been responded to:
```python
    def ack(self, multiple=False):
        # acknowledge via the channel
        self.channel.basic_ack(self.delivery_tag, multiple=multiple)
        self._state = 'ACK'

    def reject(self, requeue=False):
        # reject via the channel
        self.channel.basic_reject(self.delivery_tag, requeue=requeue)
        self._state = 'REJECTED'

    def requeue(self):
        # reject with requeue=True puts the message back on the queue
        self.channel.basic_reject(self.delivery_tag, requeue=True)
        self._state = 'REQUEUED'
```
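In a consumer callback this typically looks like the following sketch; the business handler and the error type are illustrative, not part of Kombu:

```python
class TemporaryError(Exception):
    """Illustrative transient failure."""


def process(body):
    """Hypothetical business handler."""
    print('processing', body)


def handle_message(body, message):
    try:
        process(body)
    except TemporaryError:
        message.requeue()   # put the message back for a later retry
    except Exception:
        message.reject()    # give up on the message
    else:
        message.ack()       # confirm successful processing
```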
The body carried by the message is deserialized on demand through loads, according to its content type:
```python
from .serialization import loads

@property
def payload(self):
    return loads(self.body, self.content_type,
                 self.content_encoding, accept=self.accept)
```
Connection parsing
Connection manages the network connection between producers/consumers and the broker:
```python
class Connection:

    def __init__(self, hostname='localhost', userid=None,
                 password=None, virtual_host=None, port=None, insist=False,
                 ssl=False, transport=None, connect_timeout=5,
                 transport_options=None, login_method=None, uri_prefix=None,
                 heartbeat=0, failover_strategy='round-robin',
                 alternates=None, **kwargs):
        ...
        params = self._initial_params = {
            'hostname': hostname, 'userid': userid,
            'password': password, 'virtual_host': virtual_host,
            'port': port, 'insist': insist, 'ssl': ssl,
            'transport': transport, 'connect_timeout': connect_timeout,
            'login_method': login_method, 'heartbeat': heartbeat
        }
        ...
        self._init_params(**params)
        ...
```
_init_params configures the parameters for the various AMQP-capable brokers, such as Redis and RabbitMQ:
```python
    def _init_params(self, hostname, userid, password, virtual_host, port,
                     insist, ssl, transport, connect_timeout,
                     login_method, heartbeat):
        transport = transport or 'amqp'
        if transport == 'amqp' and supports_librabbitmq():
            transport = 'librabbitmq'
        if transport == 'rediss' and ssl_available and not ssl:
            logger.warning(
                'Secure redis scheme specified (rediss) with no ssl '
                'options, defaulting to insecure SSL behaviour.'
            )
            ssl = {'ssl_cert_reqs': CERT_NONE}
        self.hostname = hostname
        self.userid = userid
        self.password = password
        self.login_method = login_method
        self.virtual_host = virtual_host or self.virtual_host
        self.port = port or self.port
        self.connect_timeout = connect_timeout
        self.ssl = ssl
        # the transport class decides which broker backend is used
        self.transport_cls = transport
        self.heartbeat = heartbeat and float(heartbeat)
```
After the connection information is configured, the network connection itself still needs to be established. This happens automatically when the connection property or the default_channel property is accessed:
```python
    @property
    def connection(self):
        """The underlying connection object.

        Warning:
            This instance is transport specific, so do not
            depend on the interface of this object.
        """
        if not self._closed:
            if not self.connected:
                # establish the network connection on first access
                return self._ensure_connection(
                    max_retries=1, reraise_as_library_errors=False
                )
            return self._connection

    @property
    def default_channel(self):
        """Default channel.

        Created upon access and closed when the connection is closed.

        Note:
            Can be used for automatic channel handling when you only need one
            channel, and also it is the channel implicitly used if
            a connection is passed instead of a channel, to functions that
            require a channel.
        """
        # make sure we're still connected, and if not refresh.
        conn_opts = self._extract_failover_opts()
        # create the connection if needed
        self._ensure_connection(**conn_opts)
        if self._default_channel is None:
            self._default_channel = self.channel()
        return self._default_channel
```
After the connection is created, a channel can be created from it:
```python
    def channel(self):
        """Create and return a new channel."""
        self._debug('create channel')
        chan = self.transport.create_channel(self.connection)
        return chan

    def create_transport(self):
        return self.get_transport_cls()(client=self)

    def get_transport_cls(self):
        """Get the currently used transport class."""
        transport_cls = self.transport_cls
        if not transport_cls or isinstance(transport_cls, str):
            transport_cls = get_transport_cls(transport_cls)
        return transport_cls
```
The connection to the broker is created through the transport, which hides the details of adapting the different broker services; those details will be covered in the next chapter.
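A small sketch of how the URL scheme selects the transport class; the Connection object is lazy, so nothing actually connects until it is used:

```python
from kombu import Connection

# the scheme in the URL picks the transport implementation
for url in ('amqp://guest:guest@localhost:5672//',   # py-amqp -> RabbitMQ
            'redis://localhost:6379/0',              # Redis virtual transport
            'memory://'):                            # in-process, useful for tests
    conn = Connection(url)
    print(url, '->', conn.transport_cls)
```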
Matcher && serialization
The Matcher handles message pattern matching, and the serializers handle (de)serialization of message bodies. Both are implemented the same way, with a registry pattern combined with a strategy pattern.
Matcher’s registry:
```python
class MatcherRegistry:
    """Pattern matching function registry."""

    MatcherNotInstalled = MatcherNotInstalled
    matcher_pattern_first = ["pcre", ]

    def __init__(self):
        self._matchers = {}
        self._default_matcher = None


#: Global registry of matchers.
registry = MatcherRegistry()
```
Two strategies are registered: glob (shell-style fuzzy matching) and pcre (regular expressions):
```python
def register_glob():
    """Register glob into default registry."""
    # uses fnmatch for shell-style wildcard matching
    registry.register('glob', fnmatch)


def register_pcre():
    """Register pcre into default registry."""
    # uses re (regular expressions) for matching
    registry.register('pcre', rematch)


# Register the base matching methods.
register_glob()
register_pcre()
Matching a message dispatches to the registered pattern-matching strategy:
```python
    def match(self, data, pattern, matcher=None, matcher_kwargs=None):
        """Call the matcher."""
        if matcher and not self._matchers.get(matcher):
            raise self.MatcherNotInstalled(
                f'No matcher installed for {matcher}')
        # default to the glob strategy
        match_func = self._matchers[matcher or 'glob']
        # some matchers take the pattern first, others the data first
        if matcher in self.matcher_pattern_first:
            first_arg = bytes_to_str(pattern)
            second_arg = bytes_to_str(data)
        else:
            first_arg = bytes_to_str(data)
            second_arg = bytes_to_str(pattern)
        return match_func(first_arg, second_arg, **matcher_kwargs or {})
```
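A usage sketch, assuming the module-level registry object shown above is importable from kombu.matcher:

```python
from kombu.matcher import registry

# glob is the default strategy
print(registry.match('user.created', 'user.*'))                  # True

# pcre uses re, pattern-first; returns a match object or None
print(registry.match('user.created', r'^user\..+$', matcher='pcre'))
```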
The serializer registry:
```python
class SerializerRegistry:
    """The registry keeps track of serialization methods."""

    def __init__(self):
        self._encoders = {}
        self._decoders = {}
        self._default_encode = None
        self._default_content_type = None
        self._default_content_encoding = None
        # content types that have been explicitly disabled
        self._disabled_content_types = set()
        self.type_to_name = {}
        self.name_to_type = {}


#: Global registry of serializers/deserializers.
registry = SerializerRegistry()

dumps = registry.dumps
loads = registry.loads
register = registry.register
unregister = registry.unregister
```
Four serialization strategies are registered: json, yaml, pickle and msgpack:
```python
def register_json():
    """Register a encoder/decoder for JSON serialization."""
    from kombu.utils import json as _json

    registry.register('json', _json.dumps, _json.loads,
                      content_type='application/json',
                      content_encoding='utf-8')


def register_yaml():
    """Register a encoder/decoder for YAML serialization.

    It is slower than JSON, but allows for more data types
    to be serialized.  Useful if you need to send data such as dates
    """
    import yaml
    registry.register('yaml', yaml.safe_dump, yaml.safe_load,
                      content_type='application/x-yaml',
                      content_encoding='utf-8')


def register_pickle():
    """Register pickle serializer.

    The fastest serialization method, but restricts
    you to python clients.
    """
    def pickle_dumps(obj, dumper=pickle.dumps):
        return dumper(obj, protocol=pickle_protocol)

    registry.register('pickle', pickle_dumps, unpickle,
                      content_type='application/x-python-serialize',
                      content_encoding='binary')


def register_msgpack():
    """Register msgpack serializer.

    See Also:
        https://msgpack.org/.
    """
    pack = unpack = None
    import msgpack
    from msgpack import packb, unpackb

    def pack(s):
        return packb(s, use_bin_type=True)

    def unpack(s):
        return unpackb(s, raw=False)

    registry.register(
        'msgpack', pack, unpack,
        content_type='application/x-msgpack',
        content_encoding='binary',
    )


register_json()
register_pickle()
register_yaml()
register_msgpack()
```
Use of deserialization:
```python
# kombu-5.0.0/kombu/serialization.py:285
# export the strategy entry point
loads = registry.loads


# kombu-5.0.0/kombu/message.py:10
from .serialization import loads

class Message:

    def _decode(self):
        return loads(self.body, self.content_type,
                     self.content_encoding, accept=self.accept)
```
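A usage sketch of the dumps/loads pair exported by kombu.serialization; dumps returns the content type and encoding together with the payload, so that loads can pick the matching decoder:

```python
from kombu.serialization import dumps, loads

content_type, content_encoding, payload = dumps({'hello': 'world'},
                                                serializer='json')
print(content_type, content_encoding)    # application/json utf-8

data = loads(payload, content_type, content_encoding,
             accept=['application/json'])
print(data)                               # {'hello': 'world'}
```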
Summary
Kombu's Producer sends messages to the broker and its Consumer consumes them. When sending, an Exchange is used to route messages to the different target queues. When consuming, a Queue is needed, and that queue must be bound to an Exchange. Exchanges and Queues both transfer data through the underlying channel, which is why they must be bound to one. They also have to exist on the remote broker, which is why created exchanges and queues need to be declared. A message is serialized together with its delivery information, forwarded from the producer through the broker to the consumer, and the consumer uses the serialization convention recorded in the delivery information to deserialize it back into the business message.
Tips
Pickle can package functions
Pickle supports serialization of functions as well as plain data structures:
```
$ python3
Python 3.8.5 (v3.8.5:580fbb018f, Jul 20 2020, 12:11:27)
[Clang 6.0 (clang-600.0.57)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import pickle
>>>
>>> def hello(msg):
...     print("hello", msg)
...
>>> p = pickle.dumps(hello)
>>> p
b'\x80\x04\x95\x16\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x94\x8c\x05hello\x94\x93\x94.'
>>>
>>> q = pickle.loads(p)
>>>
>>> q("python")
hello python
>>>
```
The hello function above can be transferred by pickling and unpickling and then executed. Using Kombu, a function in the producer process can be sent to the consumer process for remote execution. Pickle supports a wide range of data types, including the following:
The following types can be pickled:
- None, True, and False
- integers, floating point numbers, complex numbers
- strings, bytes, bytearrays
- tuples, lists, sets, and dictionaries containing only picklable objects
- functions defined at the top level of a module (using def, not lambda)
- built-in functions defined at the top level of a module
- classes that are defined at the top level of a module
- instances of such classes whose __dict__ or the result of calling __getstate__() is picklable (see section Pickling Class Instances for details)
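As a sketch built on the earlier producer example (illustrative names; the consuming side must be able to unpickle the same function, and may need to allow the pickle content type via its accept argument, depending on configuration):

```python
from kombu import Connection, Producer, Exchange

exchange = Exchange('kombu_demo', type='direct')


def hello(msg):
    print('hello', msg)


with Connection('amqp://guest:guest@localhost:5672//') as connection:
    producer = Producer(connection)
    # serializer='pickle' lets the body be any picklable object, even a function
    producer.publish({'func': hello, 'args': ('python',)},
                     exchange=exchange,
                     routing_key='kombu_demo',
                     serializer='pickle')
```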
Simplification of configuration classes
Kombu's Object base class (in abstract.py) provides a way to quickly build objects:
```python
class Object:
    """Common base class.

    Supports automatic kwargs->attributes handling, and cloning.
    """

    attrs = ()

    def __init__(self, *args, **kwargs):
        # attrs is defined by each subclass
        for name, type_ in self.attrs:
            value = kwargs.get(name)
            if value is not None:
                setattr(self, name, (type_ or _any)(value))
            else:
                try:
                    getattr(self, name)
                except AttributeError:
                    setattr(self, name, None)
```
Queue is an example of this; take the max_length attribute:
```python
class Queue(MaybeChannelBound):

    attrs = (
        ...
        ('max_length', int),
        ...
    )

    def __init__(self, name='', exchange=None, routing_key='',
                 channel=None, bindings=None, on_declared=None,
                 **kwargs):
        self.name = name or self.name
        ...

    def queue_declare(self, nowait=False, passive=False, channel=None):
        ...
        queue_arguments = channel.prepare_queue_arguments(
            self.queue_arguments or {},
            expires=self.expires,
            message_ttl=self.message_ttl,
            max_length=self.max_length,
            max_length_bytes=self.max_length_bytes,
            max_priority=self.max_priority,
        )
        ...
```
The max_length attribute is never assigned in the Queue constructor, yet it can be used directly in queue_declare; compare this with the name attribute to feel the difference. The pattern helps simplify the definition of objects with many attributes, such as configuration-style classes.
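A minimal standalone sketch of the same idea (a simplified re-implementation, not Kombu's actual class), showing how the attrs tuple turns keyword arguments into typed attributes:

```python
def _any(v):
    return v


class Object:
    """Simplified version of the kwargs->attributes base class."""

    attrs = ()

    def __init__(self, **kwargs):
        for name, type_ in self.attrs:
            value = kwargs.get(name)
            if value is not None:
                setattr(self, name, (type_ or _any)(value))
            else:
                setattr(self, name, getattr(self, name, None))


class QueueConfig(Object):
    attrs = (
        ('name', str),
        ('max_length', int),
        ('durable', bool),
    )


q = QueueConfig(name='demo', max_length='100')   # '100' is coerced to int
print(q.name, q.max_length, q.durable)           # demo 100 None
```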
Use count to generate incrementing IDs
itertools.count provides an iterator that generates incrementing IDs:
```
>>> from itertools import count
>>>
>>> for i in count():
...     if i % 10 == 0:
...         print(i)
...     if i > 50:
...         break
...
0
10
20
30
40
50
```
Reference links
- Kombu source code: github.com/celery/komb…
- Talking to RabbitMQ with Python and Kombu: medium.com/python-pand…
- An article thoroughly explaining the AMQP protocol: jishuin.proginn.com/p/763bfbd2a…