0x00 Abstract

This series introduces the message queue library Kombu, which positions itself as an AMQP-compatible abstraction over message queues. This article looks at the concept of the Producer in Kombu.

0x01 Sample code

The following code is used to illustrate.

This example comes from liqiang.io/post/kombu-…

import sys

from kombu import Connection, Consumer, Exchange, Producer, Queue
from kombu.asynchronous import Hub


def main(arguments):
    hub = Hub()
    exchange = Exchange('asynt_exchange')
    queue = Queue('asynt_queue', exchange, 'asynt_routing_key')

    def send_message(conn):
        producer = Producer(conn)
        producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
        print('message sent')

    def on_message(message):
        print('received: {0!r}'.format(message.body))
        message.ack()
        # hub.stop()  # <-- exit after one message

    conn = Connection('redis://localhost:6379')
    conn.register_with_event_loop(hub)

    def p_message():
        print(' kombu ')

    # The consumer declares the queue and its binding before we publish.
    with Consumer(conn, [queue], on_message=on_message):
        send_message(conn)
        hub.timer.call_repeatedly(3, p_message)
        hub.run_forever()


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))

0x02 Background

Having covered the construction part and the Consumer part, we now come to the Producer part, which is this piece of the sample:

def send_message(conn):
    producer = Producer(conn)
    producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
    print('message sent')

As we know, the Transport needs to associate each Channel with file descriptor information. The Transport state at this point is shown below; the file descriptor information is still missing (fds is empty), which is something to watch for later:

transport = {Transport} <kombu.transport.redis.Transport object at 0x7f9056a26f98>
 Channel = {type} <class 'kombu.transport.redis.Channel'>
 Cycle = {type} <class 'kombu.utils.scheduling.FairCycle'>
 Management = {type} <class 'kombu.transport.virtual.base.Management'>
 channel_max = {int} 65535
 channels = {list: 2} [<kombu.transport.redis.Channel object at 0x7f9056a57278>, <kombu.transport.redis.Channel object at 0x7f9056b79cc0>]
 client = {Connection} <Connection: redis://localhost:6379// at 0x7f9056a26cc0>
 cycle = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7f9056a436a0>
  after_read = {set: 0} set()
  eventflags = {int} 25
  fds = {dict: 0} {}
  poller = {_poll} <kombu.utils.eventio._poll object at 0x7f9056583048>
 default_connection_params = {dict: 2} {'port': 6379, 'hostname': 'localhost'}
 default_port = {int} 6379
 driver_name = {str} 'redis'
 driver_type = {str} 'redis'
 implements = {Implements: 3} {'asynchronous': True, 'exchange_type': frozenset({'direct', 'topic', 'fanout'}), 'heartbeats': False}
 manager = {Management} <kombu.transport.virtual.base.Management object at 0x7f9056b79be0>
 polling_interval = {NoneType} None
 state = {BrokerState} <kombu.transport.virtual.base.BrokerState object at 0x7f9056a9ec50>

0x03 Construction

3.1 Definition

In Producer, the main member variables are:

  • _channel: the channel;
  • exchange: the exchange;

The example in this article does not pass an exchange to the Producer constructor, though, which seems a little odd; we need to keep reading.

class Producer:
    """Message Producer.

    Arguments:
        channel (kombu.Connection, ChannelT): Connection or channel.
        exchange (kombu.entity.Exchange, str): Optional default exchange.
        routing_key (str): Optional default routing key.
    """

    #: Default exchange
    exchange = None

    #: Default routing key.
    routing_key = ''

    #: Default serializer to use. Default is JSON.
    serializer = None

    #: Default compression method. Disabled by default.
    compression = None

    #: By default, if a default exchange is set,
    #: that exchange will be declared when publishing a message.
    auto_declare = True

    #: Basic return callback.
    on_return = None

    #: Set if channel argument was a Connection instance (using
    #: default_channel).
    __connection__ = None

3.2 __init__

The __init__ code is as follows.

def __init__(self, channel, exchange=None, routing_key=None,
             serializer=None, auto_declare=None, compression=None,
             on_return=None):
    self._channel = channel
    self.exchange = exchange
    self.routing_key = routing_key or self.routing_key
    self.serializer = serializer or self.serializer
    self.compression = compression or self.compression
    self.on_return = on_return or self.on_return
    self._channel_promise = None
    if self.exchange is None:
        # No exchange given: fall back to the unnamed default exchange.
        self.exchange = Exchange('')
    if auto_declare is not None:
        self.auto_declare = auto_declare

    if self._channel:
        self.revive(self._channel)

3.2.1 Converting to a channel

There is an important conversion here.

  • First, the Connection passed in as a parameter is assigned to self._channel.
  • Then the revive method converts it to a channel, so that self._channel ends up being a channel type.

The exchange is still not meaningful at this point, though: it is the unnamed default exchange, of type direct.

The code is as follows:

def revive(self, channel):
    """Revive the producer after connection loss."""
    if is_connection(channel):
        connection = channel
        self.__connection__ = connection
        # Defer channel creation until the channel is first needed.
        channel = ChannelPromise(lambda: connection.default_channel)
    if isinstance(channel, ChannelPromise):
        self._channel = channel
        self.exchange = self.exchange(channel)
    else:
        # Channel already concrete
        self._channel = channel
        if self.on_return:
            self._channel.events['basic_return'].add(self.on_return)
        self.exchange = self.exchange(channel)
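The deferred conversion in revive can be illustrated with a small stand-in. This is only a sketch of the idea of a channel promise, not Kombu's ChannelPromise itself: LazyChannel and FakeConnection are hypothetical names invented for the example, and the fake connection simply counts how many channels it hands out.

```python
class LazyChannel:
    """Hypothetical stand-in for a channel promise: resolves on first call."""

    def __init__(self, factory):
        self._factory = factory
        self._value = None
        self.resolved = False

    def __call__(self):
        # Create the channel only once, then keep returning the same object.
        if not self.resolved:
            self._value = self._factory()
            self.resolved = True
        return self._value


class FakeConnection:
    """Hypothetical connection that counts how many channels it creates."""

    def __init__(self):
        self.channels_created = 0

    @property
    def default_channel(self):
        self.channels_created += 1
        return f"channel-{self.channels_created}"


conn = FakeConnection()
promise = LazyChannel(lambda: conn.default_channel)
created_before = conn.channels_created  # still 0: nothing has been resolved yet
first = promise()
second = promise()
```

In this sketch the channel is not created when the producer-like object is built, only when the promise is first called, and later calls reuse the same channel.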

At this point, the producer variable is:

producer = {Producer} 
 auto_declare = {bool} True
 channel = {Channel} <kombu.transport.redis.Channel object at 0x7f9056a57278>
 compression = {NoneType} None
 connection = {Connection} <Connection: redis://localhost:6379// at 0x7f9056a26cc0>
 exchange = {Exchange} Exchange ''(direct)
 on_return = {NoneType} None
 routing_key = {str} ''
 serializer = {NoneType} None

The logic is shown as follows (each arrow is a reference from a member variable to another object):

Producer
    channel        ---> Channel
    exchange       ---> Exchange
    connection     ---> Connection

Channel
    client         ---> Redis<ConnectionPool<Connection<host=localhost,port=6379>>>
    pool
    connection     ---> Connection

Connection
    transport      ---> redis.Transport

redis.Transport
    _channels      ---> Channel
    cycle          ---> MultiChannelPoller

MultiChannelPoller
    _fd_to_chan
    _chan_to_sock
    poller         ---> _poll
    after_read

Hub
    poller         ---> _poll

_poll
    _poller        ---> poll

Queue
    _channel
    exchange       ---> Exchange

Exchange
    channel        ---> Channel


0x04 Sending

The message is sent via producer.publish.

def send_message(conn):
    producer = Producer(conn)
    producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
    print('message sent')

The exchange is passed in here as an argument. So the exchange that was missing when the Producer was constructed is supplied at this point.

The logic of publish is roughly as follows:

  • Call the channel's message assembly function prepare_message;
  • Call the channel's send function basic_publish;

Therefore, the message is ultimately sent through the channel.

def _publish(self, body, priority, content_type, content_encoding,
             headers, properties, routing_key, mandatory,
             immediate, exchange, declare):
    channel = self.channel
    # Step 1: let the channel assemble the message.
    message = channel.prepare_message(
        body, priority, content_type,
        content_encoding, headers, properties,
    )
    if declare:
        maybe_declare = self.maybe_declare
        [maybe_declare(entity) for entity in declare]

    # handle autogenerated queue names for reply_to
    reply_to = properties.get('reply_to')
    if isinstance(reply_to, Queue):
        properties['reply_to'] = reply_to.name
    # Step 2: let the channel send the message.
    return channel.basic_publish(
        message,
        exchange=exchange, routing_key=routing_key,
        mandatory=mandatory, immediate=immediate,
    )

4.1 Assembling the message in the channel

The channel's prepare_message function assembles the message, which mostly means attaching the various properties to it.

def prepare_message(self, body, priority=None, content_type=None,
                    content_encoding=None, headers=None, properties=None):
    """Prepare message data."""
    properties = properties or {}
    properties.setdefault('delivery_info', {})
    properties.setdefault('priority', priority or self.default_priority)

    return {'body': body,
            'content-encoding': content_encoding,
            'content-type': content_type,
            'headers': headers or {},
            'properties': properties or {}}

The message is as follows:

message = {dict: 5} 
 'body' = {str} 'aGVsbG8gd29ybGQ='
 'content-encoding' = {str} 'utf-8'
 'content-type' = {str} 'text/plain'
 'headers' = {dict: 0} {}
  __len__ = {int} 0
 'properties' = {dict: 5} 
  'delivery_mode' = {int} 2
  'delivery_info' = {dict: 2} {'exchange': 'asynt_exchange', 'routing_key': 'asynt_routing_key'}
  'priority' = {int} 0
  'body_encoding' = {str} 'base64'
  'delivery_tag' = {str} '1b03590e-501c-471f-a5f9-f4fdcbe3379a'
  __len__ = {int} 5
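The body value 'aGVsbG8gd29ybGQ=' in the dump is simply 'hello world' encoded as base64, as the body_encoding property indicates. The following sketch reproduces the shape of the message with the standard library only. encode_body is a hypothetical helper, and this standalone prepare_message mirrors the method shown above with the default priority assumed to be 0; it is not Kombu's own code path.

```python
import base64


def encode_body(text, charset="utf-8"):
    """Hypothetical helper: base64-encode a text body, as in the dump above."""
    return base64.b64encode(text.encode(charset)).decode("ascii")


def prepare_message(body, priority=None, content_type=None,
                    content_encoding=None, headers=None, properties=None,
                    default_priority=0):
    """Standalone copy of the assembly logic (default priority assumed 0)."""
    properties = dict(properties or {})
    properties.setdefault("delivery_info", {})
    properties.setdefault("priority", priority or default_priority)
    return {
        "body": body,
        "content-encoding": content_encoding,
        "content-type": content_type,
        "headers": headers or {},
        "properties": properties,
    }


message = prepare_message(
    encode_body("hello world"),
    content_type="text/plain",
    content_encoding="utf-8",
    properties={"body_encoding": "base64", "delivery_mode": 2},
)
print(message["body"])  # aGVsbG8gd29ybGQ=
```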

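4.2 Sending the message in the channel

The channel's basic_publish sends the message, using the exchange argument that was passed in.

basic_publish hands the message to the exchange type's deliver method or, for the anonymous exchange, calls the _put method directly: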

def basic_publish(self, message, exchange, routing_key, **kwargs):
    """Publish message."""
    self._inplace_augment_message(message, exchange, routing_key)
    if exchange:
        return self.typeof(exchange).deliver(
            message, exchange, routing_key, **kwargs
        )
    # anon exchange: routing_key is the destination queue
    return self._put(routing_key, message, **kwargs)

4.3 deliver in the exchange

The call self.typeof(exchange).deliver then takes us into the exchange type; in this article it is DirectExchange.

Note that self.channel._put is used here; channel is a member variable of the exchange type.

class DirectExchange(ExchangeType):
    """Direct exchange.

    The `direct` exchange routes based on exact routing keys.
    """

    type = 'direct'

    def lookup(self, table, exchange, routing_key, default):
        return {
            queue for rkey, _, queue in table
            if rkey == routing_key
        }

    def deliver(self, message, exchange, routing_key, **kwargs):
        _lookup = self.channel._lookup
        _put = self.channel._put
        # Put the message on every queue bound with this exact routing key.
        for queue in _lookup(exchange, routing_key):
            _put(queue, message, **kwargs)
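The exact-match routing of a direct exchange can be tried out on a plain list of bindings. This is a minimal sketch with no Kombu involved: direct_lookup copies the set comprehension from lookup above, while the table contents (including other_queue and audit_queue) and the put helper are made up for the illustration.

```python
def direct_lookup(table, routing_key):
    """Return the queues whose binding key equals routing_key exactly."""
    return {queue for rkey, _pattern, queue in table if rkey == routing_key}


# Bindings as (routing_key, pattern, queue) tuples; the second and third
# entries are hypothetical and exist only to show the filtering.
table = [
    ("asynt_routing_key", "", "asynt_queue"),
    ("other_key", "", "other_queue"),
    ("asynt_routing_key", "", "audit_queue"),
]

delivered = {}


def put(queue, message):
    """Hypothetical stand-in for the channel's _put."""
    delivered.setdefault(queue, []).append(message)


for queue in direct_lookup(table, "asynt_routing_key"):
    put(queue, "hello world")
```

Every queue bound with the same routing key receives its own copy of the message, and a routing key with no binding yields an empty set.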

4.4 Binding lookup

As we know, the Exchange's job is simply to convert the routing_key that was sent into the name of a queue, so that we know which queue to send to.

The _lookup method is therefore used to obtain the corresponding queue.

def _lookup(self, exchange, routing_key, default=None):
    """Find all queues matching `routing_key` for the given `exchange`.

    Returns:
        str: queue name -- must return the string `default`
            if no queues matched.
    """
    if default is None:
        default = self.deadletter_queue
    if not exchange:  # anon exchange
        return [routing_key or default]

    try:
        R = self.typeof(exchange).lookup(
            self.get_table(exchange),
            exchange, routing_key, default,
        )
    except KeyError:
        R = []

    if not R and default is not None:
        warnings.warn(UndeliverableWarning(UNDELIVERABLE_FMT.format(
            exchange=exchange, routing_key=routing_key)),
        )
        self._new_queue(default)
        R = [default]
    return R

The specific logic here is:

First, get_table is called on the channel. The exchange name here is asynt_exchange.

def get_table(self, exchange):
    key = self.keyprefix_queue % exchange
    with self.conn_or_acquire() as client:
        # The bindings of an exchange are stored as members of a Redis set.
        values = client.smembers(key)
        if not values:
            raise InconsistencyError(NO_ROUTE_ERROR.format(exchange, key))
        return [tuple(bytes_to_str(val).split(self.sep)) for val in values]

Looking at the contents of Redis, the set holds the following:

127.0.0.1:6379> smembers _kombu.binding.asynt_exchange
1) "asynt_routing_key\x06\x16\x06\x16asynt_queue"

Second, the corresponding binding is obtained:

{b'asynt_routing_key\x06\x16\x06\x16asynt_queue'}

So the exchange gives us the routing_key --> queue rules, and the routing_key then gives us the queue. We now know which queue the Consumer and the Producer exchange messages on.
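The binding value stored in Redis can be taken apart by hand to see how get_table turns it into a tuple. This sketch assumes the separator is the two-character string '\x06\x16' that is visible in the stored value; parse_binding is a hypothetical helper that mirrors the split in get_table rather than calling Kombu.

```python
SEP = "\x06\x16"  # separator visible in the stored binding (assumed to be self.sep)


def parse_binding(raw):
    """Split one set member into its (routing_key, pattern, queue) fields."""
    return tuple(raw.decode("utf-8").split(SEP))


raw = b"asynt_routing_key\x06\x16\x06\x16asynt_queue"
routing_key, pattern, queue = parse_binding(raw)
print(routing_key, repr(pattern), queue)
```

The middle field is empty here because two separators sit next to each other in the stored value; the first field is the routing key and the last is the queue name.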

The logic is as follows:

                                  +---------------------------------+
                                  |         exchange                |
                                  |                                 |
                 1 routing_key x  |                                 |
+----------+                      |                                 |     +------------+
| Producer |  +-----------------> |  routing_key x  --->  queue x   |     |  Consumer  |
+--------+-+                      |                                 |     +-----+------+
         |                        |  routing_key y  --->  queue y   |           ^
         |                        |                                 |           |
         |                        |  routing_key z  --->  queue z   |           |
         |                        |                                 |           |
         |                        +---------------------------------+           |
         |                                                                      |
         |                                  +-----------+                       |
         |        2 message                 |           |        3 message      |
         +------------------------------->  |  queue X  |  +--------------------+
                                            |           |
                                            +-----------+

4.5 _put in the channel

Processing continues in the channel's _put method, which ends up calling client.lpush.

The client is:

Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>

The code is:

def _put(self, queue, message, **kwargs):
    """Deliver message."""
    pri = self._get_message_priority(message, reverse=False)

    with self.conn_or_acquire() as client:
        # Push the serialized message onto the Redis list for this queue.
        client.lpush(self._q_for_pri(queue, pri), dumps(message))

How are queues distinguished in Redis?

Each queue has a string name, which is the key of the corresponding Redis list. Once we know which list the message belongs in, the next step is to lpush the message onto that list.

The following method does the conversion from queue name to list key.

def _q_for_pri(self, queue, pri):
    pri = self.priority(pri)
    if pri:
        return f"{queue}{self.sep}{pri}"
    return queue
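The mapping from a queue name and priority to a Redis list key can be sketched on its own. Everything outside the shape of _q_for_pri is an assumption made for the illustration: the separator is taken from the binding value seen earlier, PRIORITY_STEPS and the bucket function are hypothetical stand-ins for self.priority, and the clamping of out-of-range values is a choice made only for this example.

```python
from bisect import bisect

SEP = "\x06\x16"               # assumed separator, as seen in the binding value
PRIORITY_STEPS = [0, 3, 6, 9]  # assumed priority buckets for this sketch


def bucket(pri):
    """Hypothetical stand-in for self.priority: round down to a bucket."""
    pri = max(PRIORITY_STEPS[0], min(pri, PRIORITY_STEPS[-1]))
    return PRIORITY_STEPS[bisect(PRIORITY_STEPS, pri) - 1]


def list_key(queue, pri):
    """Same shape as _q_for_pri: bucket 0 uses the plain queue name."""
    pri = bucket(pri)
    if pri:
        return f"{queue}{SEP}{pri}"
    return queue


print(repr(list_key("asynt_queue", 0)))
print(repr(list_key("asynt_queue", 4)))
```

With priority 0, as in this article's message, the key is just the queue name, which is why the message shows up under asynt_queue in Redis.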

After the message is sent, Redis looks like this; the message has been added as an item of the list.

127.0.0.1:6379> lrange asynt_queue 0 -1
1) "{\"body\": \"aGVsbG8gd29ybGQ=\", \"content-encoding\": \"utf-8\", \"content-type\": \"text/plain\", \"headers\": {}, \"properties\": {\"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"asynt_exchange\", \"routing_key\": \"asynt_routing_key\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"df7af424-e1ab-4c08-84b5-1cd5c97ed25d\"}}"
127.0.0.1:6379> 
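The list item is plain JSON, so it can be decoded by hand to recover the original body. The sketch below rebuilds the stored string from the fields shown in the lrange output and decodes it with the standard library; decode_item is a hypothetical helper, not the path a Kombu Consumer actually takes.

```python
import base64
import json

# The item as stored in the Redis list, rebuilt from the fields shown above.
stored = json.dumps({
    "body": "aGVsbG8gd29ybGQ=",
    "content-encoding": "utf-8",
    "content-type": "text/plain",
    "headers": {},
    "properties": {
        "delivery_mode": 2,
        "delivery_info": {"exchange": "asynt_exchange",
                          "routing_key": "asynt_routing_key"},
        "priority": 0,
        "body_encoding": "base64",
        "delivery_tag": "df7af424-e1ab-4c08-84b5-1cd5c97ed25d",
    },
})


def decode_item(item):
    """Hypothetical helper: parse a stored item and decode its body."""
    message = json.loads(item)
    body = message["body"]
    if message["properties"].get("body_encoding") == "base64":
        body = base64.b64decode(body).decode(message["content-encoding"])
    return body, message["properties"]["delivery_info"]


body, delivery_info = decode_item(stored)
print(body)  # hello world
```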

0x05 Summary

To summarize the concepts so far:

  • Producer: the abstraction for sending messages;
  • Consumer: the abstraction for receiving messages. A consumer declares a queue, binds it to a given Exchange, and then receives messages from that queue;
  • Exchange: the MQ router. The sender publishes a message to the Exchange, and the Exchange distributes it to queues;
  • Queue: the queue abstraction. It stores the messages that are about to be consumed by the application; the Exchange distributes messages to the Queue and consumers receive them from the Queue;
  • Channel: similar to the concept in AMQP. It can be understood as one of several lightweight connections sharing a single Connection, which is the real Redis connection.

This gives us the following chain of logic:

  • The publish method accepts the exchange parameter and sends the message to that Exchange;
  • Producer calls the channel's message assembly function prepare_message, which adds the various properties to the message;
  • Producer calls the channel's send function basic_publish, using the exchange parameter that was passed in;
  • basic_publish calls the exchange type's deliver(message, exchange, routing_key) to send the message;
  • The Exchange holds the mapping from routing_key to queue, so the routing_key determines the queue;
  • deliver uses the _lookup method to get the queue that corresponds to the routing_key;
  • deliver uses the _put method of the Exchange's member variable channel to put the message on the queue;
  • The Channel obtains its own Redis connection pool, that is, the client is Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, and Redis operations are performed through it;
  • Each queue has a string name, which is the key of the corresponding Redis list;
  • Now that the list for the queue is known, the message is pushed onto that list with lpush;
  • The Consumer fetches the message from the Queue.
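The chain above can be walked end to end with a toy in-memory model. This is a sketch under stated assumptions, not Kombu or Redis: ToyBroker is a hypothetical class, a dict of sets stands in for the binding sets and a dict of lists for the Redis lists, and the consumer is assumed to pop from the opposite end of the list from where lpush inserts, which gives first-in, first-out order.

```python
import json
from collections import defaultdict


class ToyBroker:
    """Hypothetical in-memory stand-in for the Redis-backed channel."""

    def __init__(self):
        self.bindings = defaultdict(set)  # exchange -> {(routing_key, queue)}
        self.lists = defaultdict(list)    # list key -> serialized messages

    def bind(self, exchange, routing_key, queue):
        self.bindings[exchange].add((routing_key, queue))

    def publish(self, body, exchange, routing_key):
        # prepare_message: wrap the body together with its delivery info.
        message = {"body": body,
                   "properties": {"delivery_info": {
                       "exchange": exchange, "routing_key": routing_key}}}
        # _lookup: exact routing-key match, as in a direct exchange.
        queues = sorted(q for rkey, q in self.bindings[exchange]
                        if rkey == routing_key)
        # _put: push onto the head of each matching list (like lpush).
        for queue in queues:
            self.lists[queue].insert(0, json.dumps(message))
        return queues

    def consume(self, queue):
        # Assumed consumer side: pop from the tail, so the oldest comes first.
        items = self.lists[queue]
        return json.loads(items.pop()) if items else None


broker = ToyBroker()
broker.bind("asynt_exchange", "asynt_routing_key", "asynt_queue")
routed = broker.publish("hello world", "asynt_exchange", "asynt_routing_key")
broker.publish("second", "asynt_exchange", "asynt_routing_key")
first = broker.consume("asynt_queue")
second = broker.consume("asynt_queue")
```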

The dynamic flow is as follows:

       +------------+                        +------------+               +------------+      +-----------------------+
       |  producer  |                        |  channel   |               |  exchange  |      | Redis<ConnectionPool> |
       +---+--------+                        +----+-------+               +-------+----+      +----------+------------+
           |                                      |                               |                      |
           |                                      |                               |                      |
publish(' ', exchange, routing_key)                |                               |                      |
           |                                      |                               |                      |
           |   prepare_message                    |                               |                      |
           |                                      |                               |                      |
           | +----------------------------------> |                               |                      |
           |                                      |                               |                      |
           | basic_publish (exchange, routing_key)|                               |                      |
           |                                      |                               |                      |
           | +----------------------------------> |                               |                      |
           |                                      |                               |                      |
           |                                      | deliver(exchange, routing_key)|                      |
           |                                      |                               |                      |
           |                                      +-----------------------------> |                      |
           |                                      |                               |                      |
           |                                      |                               |                      |
           |                                      |                _lookup(exchange, routing_key)        |
           |                                      |                               |                      |
           |                                      |                               |                      |
           |                                      |    _put(queue, message)       |                      |
           |                                      v                               |                      |
           |                                      | <---------------------------+ |                      |
           |                                      |                               |                      |
           |                                _q_for_pri(queue, pri)                |                      |
           |                                      +                               |                      |
           v                                      |                               |                      |
           |                                      |     client.lpush              |                      |
           |                                      | +--------------------------------------------------> |
           |                                      |                               |                      |
           v                                      v                               v                      v


0xEE Personal information

★★★★★ Thoughts on life and technology ★★★★★

WeChat official account: Rosie's Thoughts

If you would like timely notifications of my articles, or want to see the technical material I recommend, please follow the account.
