0 x00 the

In this series we introduce the message queue Kombu. Kombu is positioned as an AMQP compliant message queue abstraction. Through this article, you can understand the concept of mailbox in Kombu, and by the way, you can sort out the contents of previous articles again.

0x01 Sample code

Liqiang.io /post/celery…

The sample code is divided into two parts ˛

Node can be understood as broadcast Consumer. A Client can be regarded as the initiator of a broadcast.

1.1 the Node

import sys
import kombu
from kombu import pidbox

hostname = "localhost"
connection = kombu.Connection('redis://localhost:6379')
mailbox = pidbox.Mailbox("testMailbox".type="direct")
node = mailbox.Node(hostname, state={"a": "b"})
node.channel = connection.channel()

def callback(body, message) :
    print(body)
    print(message)

def main(arguments) :
    consumer = node.listen(callback=callback)
    try:
        while True:
            print('Consumer Waiting')
            connection.drain_events()
    finally:
        consumer.cancel()

if __name__ == '__main__':
    sys.exit(main(sys.argv[1:))Copy the code

1.2 the client

import sys
import kombu
from kombu import pidbox

def callback() :
    print("callback")

def main(arguments) :
    connection = kombu.Connection('redis://localhost:6379')
    mailbox = pidbox.Mailbox("testMailbox".type="direct")
    bound = mailbox(connection)
    bound._broadcast("print_msg", {'msg': 'Message for you'})

if __name__ == '__main__':
    sys.exit(main(sys.argv[1:))Copy the code

0x02 Core idea

The broadcast function is completed by the pubSub mechanism of Redis.

2.1 Redis PubSub

To support message multicast, Redis uses a separate module, PubSub.

Redis acts as a bridge between publishing and subscribing messages. In Redis, there is a concept of channel, that is, channel. The publisher publishes to a certain channel by specifying it. As long as a subscriber subscribes to the channel, the message will be sent to the subscriber.

2.2 an overview of the

In Kombu’s Mailbox implementation, there are two parts: Consumer and Producer.

When registering a listener in Kombu’s Channel class, the listener is actually registered to a key using the Redis driver’s PubSub function. The Consumer queue and callback functions are then linked to Redis via a Channel. Then the message can be read from Redis later.

psubscribe, client.py:3542
_subscribe, redis.py:664
_register_LISTEN, redis.py:322
get, redis.py:375
drain_events, base.py:960
drain_events, connection.py:318
main, node.py:24
Copy the code

0x03 Consumer

Let’s take a step by step look at the example code to complete the broadcast function.

3.1 to establish a Connection

After completing the following code, the system establishes a Connection.

connection = kombu.Connection('redis://localhost:6379')
Copy the code

As follows, I divide the problem domain into user domain and Kombu domain for understanding:

user scope                 +            kombu scope
                           |
                           |
+------------+             |            +--------------------------------------+
| connection | +----------------------> | Connection: redis://localhost:6379// |
+------------+             |            +--------------------------------------+
                           |
                           |
                           |
                           |
                           |
                           +
Copy the code

3.2 establish a mailbox

After completing the following code, the system establishes Connection and Mailbox.

connection = kombu.Connection('redis://localhost:6379')
mailbox = pidbox.Mailbox("testMailbox".type="fanout")
Copy the code

However, there is no connection between the two at this time, and some member variables of mailbox have no actual meaning. Examples are as follows:

mailbox = {Mailbox} <kombu.pidbox.Mailbox object at 0x7fea4b81df28>
 accept = {list: 1} ['json']
 clock = {LamportClock} 0
 connection = {NoneType} None
 exchange = {Exchange} Exchange testMailbox.pidbox(fanout)
 exchange_fmt = {str} '%s.pidbox'
 namespace = {str} 'testMailbox'
 node_cls = {type} <class 'kombu.pidbox.Node'>
 oid = {str} '9386a23b-ae96- 3c6c-b036-ae7646455ebb'
 producer_pool = {NoneType} None
 queue_expires = {NoneType} None
 queue_ttl = {NoneType} None
 reply_exchange = {Exchange} Exchange reply.testMailbox.pidbox(direct)
 reply_exchange_fmt = {str} 'reply%.s.pidbox'
 reply_queue = {Queue} <unbound Queue 9386a23b-ae96- 3c6c-b036-ae7646455ebb.reply.testMailbox.pidbox -> <unbound Exchange reply.testMailbox.pidbox(direct)> -> 9386a23b-ae96-3c6c-b036-ae7646455ebb>
 reply_queue_expires = {float} 10.0
 reply_queue_ttl = {NoneType} None
 serializer = {NoneType} None
 type = {str} 'fanout'
 unclaimed = {defaultdict: 0} defaultdict(<class 'collections.deque'>, {})
Copy the code

The logic is as follows:

3.3 establish a Node

After completing the following code, the system establishes Connection, Mailbox, and Node.

Node is a concept in mailbox, which can be understood as a specific mailbox.

connection = kombu.Connection('redis://localhost:6379')
mailbox = pidbox.Mailbox("testMailbox".type="fanout")
node = mailbox.Node(hostname, state={"a": "b"})

Copy the code

Node provides an example:

node = {Node} <kombu.pidbox.Node object at 0x7fea4b8bffd0>
 channel = {NoneType} None
 handlers = {dict: 0} {}
 hostname = {str} 'localhost'
 mailbox = {Mailbox} <kombu.pidbox.Mailbox object at 0x7fea4b81df28>
 state = {dict: 1} {'a': 'b'}

Copy the code

The logic is as follows:

3.4 set up the channel

After the following code, the channel is created.

connection = kombu.Connection('redis://localhost:6379')
mailbox = pidbox.Mailbox("testMailbox".type="fanout")
node = mailbox.Node(hostname, state={"a": "b"})
node.channel = connection.channel()
Copy the code

3.4.1 track link

The Connection between channel, Connection and Transport is explained as follows:

  • Connection: abstraction of MQ connections. A Connection corresponds to a Connection to MQ. Connection is the encapsulation of a Connection by AMQP;
  • Channel: Similar to the concept in AMQP, can be understood as multiple lightweight connections sharing a Connection; A Channel is an encapsulation of an AMQP operation on MQ;
  • Transport: Kombu supports flexible configuration of different message-oriented middleware as plug-ins. The use of the term Transport to refer to a concrete message-oriented middleware can be thought of as an abstraction of the broker:
    • Operations on MQ are bound to be connected, but instead of having channels directly send/receive requests using Connection, Kombu introduces a new abstract Transport, which is responsible for specific MQ operations, That is, all Channel operations fall on Transport. Introducing the abstract concept of transport makes it very easy to add transport to non-AMQP later;
    • Transport is a real MQ connection and an instance of a real connection to MQ(Redis/RabbitMQ), differentiating the implementation of the underlying message queue;
    • Build-in support for Kombu includes Redis, Beanstalk, Amazon SQS, CouchDB, MongoDB, ZeroMQ, ZooKeeper, SoftLayer MQ, and Pyro.

3.4.2 Channel

There are two channels listed in Transport:

self._avail_channels
self.channels
Copy the code

If _avail_Channels has content, fetch it directly, otherwise a new Channel is generated.

To actually connect, establish_connection is called in self._avail_channels.

def establish_connection(self) :
    # creates channel to verify connection.
    # this channel is then used as the next requested channel.
    # (returned by ``create_channel``).
    self._avail_channels.append(self.create_channel(self))
    return self     # for drain events
Copy the code

The key initialization code for Channel is as follows:

class Channel(virtual.Channel) :
    """Redis Channel."""

    def __init__(self, *args, **kwargs) :
        super().__init__(*args, **kwargs)

        self._queue_cycle = cycle_by_name(self.queue_order_strategy)()
        self.Client = self._get_client()

        self.active_fanout_queues = set()
        self.auto_delete_queues = set()
        self._fanout_to_queue = {}
        self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}

        self.connection.cycle.add(self)  # add to channel poller

            if register_after_fork is not None:
                register_after_fork(self, _after_fork_cleanup_channel)
Copy the code

Rule 3.4.3 MultiChannelPoller

The MultiChannelPoller is an execution engine.

  • Collect channel;
  • Establish fd to channel mapping;
  • Create channel to SOCKS mapping;
  • Using poll;
class MultiChannelPoller:

    def __init__(self) :
        # active channels
        self._channels = set(a)# file descriptor -> channel map.
        self._fd_to_chan = {}
        # channel -> socket map
        self._chan_to_sock = {}
        # poll implementation (epoll/kqueue/select)
        self.poller = poll()
        # one-shot callbacks called after reading from socket.
        self.after_read = set(a)def add(self, channel) :
        self._channels.add(channel)
Copy the code

The last example of a Channel variable is as follows:

self = {Channel} <kombu.transport.redis.Channel object at 0x7ffc6d9c5fd0>
 Client = {type} <class 'redis.client.Redis'>
 Message = {type} <class 'kombu.transport.virtual.base.Message'>
 QoS = {type} <class 'kombu.transport.redis.QoS'>
 active_fanout_queues = {set: 0} set()
 active_queues = {set: 0} set()
 async_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
 auto_delete_queues = {set: 0} set()
 body_encoding = {str} 'base64'
 channel_id = {int} 1
 client = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
 connection = {Transport} <kombu.transport.redis.Transport object at 0x7ffc6d9c5f60>
 cycle = {FairCycle} <FairCycle: 0/0 []>
 exchange_types = {dict: 3} {'direct': <kombu.transport.virtual.exchange.DirectExchange object at 0x7ffc6d9c5f98>, 'topic': <kombu.transport.virtual.exchange.TopicExchange object at 0x7ffc6d9c5d68>, 'fanout': <kombu.transport.virtual.exchange.FanoutExchange object at 0x7ffc6d9d2b70>}
 handlers = {dict: 2} {'BRPOP': <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7ffc6d9c5fd0> >,'LISTEN': <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7ffc6d9c5fd0>>}
 health_check_interval = {int} 25
 keyprefix_fanout = {str} '/ 0.
 keyprefix_queue = {str} '_kombu.binding.%s'
 pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
 priority_steps = {list: 4} [0.3.6.9]
 qos = {QoS} <kombu.transport.redis.QoS object at 0x7ffc6d9fbc88>
 queue_order_strategy = {str} 'round_robin'
 state = {BrokerState} <kombu.transport.virtual.base.BrokerState object at 0x7ffc6d969e10>
 subclient = {PubSub} <redis.client.PubSub object at 0x7ffc6d9fbd68>

Copy the code

Finally, an example of the Transport variable is as follows:

connection = {Transport} <kombu.transport.redis.Transport object at 0x7ffc6d9c5f60>
 Channel = {type} <class 'kombu.transport.redis.Channel'>
 Cycle = {type} <class 'kombu.utils.scheduling.FairCycle'>
 Management = {type} <class 'kombu.transport.virtual.base.Management'>
 channels = {list: 1} [<kombu.transport.redis.Channel object at 0x7ffc6da8c748>]
 client = {Connection} <Connection: redis://localhost:6379// at 0x7ffc6d5f0e80>
 connection_errors = {tuple: 8} (<class 'amqp.exceptions.ConnectionError'>, <class 'kombu.exceptions.InconsistencyError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>, <class 'redis.exceptions.ConnectionError'>, <class 'redis.exceptions.AuthenticationError'>, <class 'redis.exceptions.TimeoutError'>)
 cycle = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7ffc6d9d2198>
  after_read = {set: 0} set()
  eventflags = {int} 25
  fds = {dict: 0} {}
  poller = {_poll} <kombu.utils.eventio._poll object at 0x7ffc6d9d21d0>
 driver_name = {str} 'redis'
 driver_type = {str} 'redis'
 implements = {Implements: 3} {'asynchronous': True.'exchange_type': frozenset({'direct'.'fanout'.'topic'}), 'heartbeats': False}
 manager = {Management} <kombu.transport.virtual.base.Management object at 0x7ffc6da8c6d8>
 state = {BrokerState} <kombu.transport.virtual.base.BrokerState object at 0x7ffc6d969e10>

Copy the code

The logic is as follows:

3.5 establish a Consumer

The following code creates a Consumer and Queue. That said, the broadcast still needs to be done by Consumer, or by Consumer functionality.

def main(arguments) :
    consumer = node.listen(callback=callback)

Copy the code

The listen code is as follows:

def listen(self, channel=None, callback=None) :
    consumer = self.Consumer(channel=channel,
                             callbacks=[callback or self.handle_message],
                             on_decode_error=self.on_decode_error)
    consumer.consume()
    return consumer

Copy the code

The corresponding Queue variable is as follows:

queue = {Queue} <unbound Queue localhost.testMailbox.pidbox -> <unbound Exchange testMailbox.pidbox(fanout)> -> >
 ContentDisallowed = {type} <class 'kombu.exceptions.ContentDisallowed'>
 alias = {NoneType} None
 auto_delete = {bool} True
 binding_arguments = {NoneType} None
 bindings = {set: 0} set()
 can_cache_declaration = {bool} False
 channel = {str} 'line 178, in _getPyDictionary\n attr = getattr(var, n)\n File " consumer_arguments = {NoneType} None durable = {bool} False exchange = {Exchange} Exchange testMailbox.pidbox(fanout)Copy the code

The logic is as follows:

3.5.1 Binding Write Redis

The binding is written to Redis so that the binding can be used later for routing.

The specific stack is as follows:

sadd, client.py:2243
_queue_bind, redis.py:817
queue_bind, base.py:568
bind_to, entity.py:674
queue_bind, entity.py:662
_create_queue, entity.py:617
declare, entity.py:606
declare, messaging.py:417
revive, messaging.py:404
__init__, messaging.py:382
Consumer, pidbox.py:78
listen, pidbox.py:91
main, node.py:20
<module>, node.py:29

Copy the code

Logic diagram, where Redis appears.

3.5.2 configuration

Code to the kombu/transport/virtual/base. Py, work here is as follows:

  • Add the consumer queue to the Channel
  • Add the callback function to the Channel;
  • Add Consumer to the loop;

In this way, Comuser’s queue and callback functions are linked through channels.

The code is as follows:

    def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs) :
        """Consume from `queue`."""
        self._tag_to_queue[consumer_tag] = queue
        self._active_queues.append(queue)

        def _callback(raw_message) :
            message = self.Message(raw_message, channel=self)
            if not no_ack:
                self.qos.append(message, message.delivery_tag)
            return callback(message)

        self.connection._callbacks[queue] = _callback
        self._consumers.add(consumer_tag)

        self._reset_cycle()

Copy the code

The call stack looks like this:

basic_consume, base.py:635
basic_consume, redis.py:598
consume, entity.py:738
_basic_consume, messaging.py:594
consume, messaging.py:473
listen, pidbox.py:92
main, node.py:20
<module>, node.py:29
Copy the code

It’s still in Channel

self = {Channel} <kombu.transport.redis.Channel object at 0x7fc252239908>
 Client = {type} <class 'redis.client.Redis'>
 Message = {type} <class 'kombu.transport.virtual.base.Message'>
 active_fanout_queues = {set: 1} {'localhost.testMailbox.pidbox'}
 active_queues = {set: 0} set()
 async_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
 auto_delete_queues = {set: 1} {'localhost.testMailbox.pidbox'}
 body_encoding = {str} 'base64'
 channel_id = {int} 1
 client = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
 closed = {bool} False
 codecs = {dict: 1} {'base64': <kombu.transport.virtual.base.Base64 object at 0x7fc25218f5c0>}
 connection = {Transport} <kombu.transport.redis.Transport object at 0x7fc2522295f8>
 cycle = {FairCycle} <FairCycle: 0/1 ['localhost.testMailbox.pidbox']>
 deadletter_queue = {NoneType} None
 default_priority = {int} 0
 do_restore = {bool} True
 exchange_types = {dict: 3} {'direct': <kombu.transport.virtual.exchange.DirectExchange object at 0x7fc252239fd0>, 'topic': <kombu.transport.virtual.exchange.TopicExchange object at 0x7fc252239f60>, 'fanout': <kombu.transport.virtual.exchange.FanoutExchange object at 0x7fc252239f28>}
 handlers = {dict: 2} {'BRPOP': <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fc252239908> >,'LISTEN': <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7fc252239908>>}
 keyprefix_fanout = {str} '/ 0.
 keyprefix_queue = {str} '_kombu.binding.%s'
 pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
 priority_steps = {list: 4} [0.3.6.9]
 qos = {QoS} <kombu.transport.redis.QoS object at 0x7fc252264320>
 queue_order_strategy = {str} 'round_robin'
 state = {BrokerState} <kombu.transport.virtual.base.BrokerState object at 0x7fc25218f6a0>
 subclient = {PubSub} <redis.client.PubSub object at 0x7fc252264400>
Copy the code

The specific cycle is as follows:

def _reset_cycle(self) :
    self._cycle = FairCycle(
        self._get_and_deliver, self._active_queues, Empty)
Copy the code

FairCycle is defined as follows:

class FairCycle:
    """Cycle between resources. Consume from a set of resources, where each resource gets an equal chance to be consumed from. Arguments: fun (Callable): Callback to call. resources (Sequence[Any]): List of resources. predicate (type): Exception predicate. """

    def __init__(self, fun, resources, predicate=Exception) :
        self.fun = fun
        self.resources = resources
        self.predicate = predicate
        self.pos = 0

    def _next(self) :
        while 1:
            try:
                resource = self.resources[self.pos]
                self.pos += 1
                return resource
            except IndexError:
                self.pos = 0
                if not self.resources:
                    raise self.predicate()

    def get(self, callback, **kwargs) :
        """Get from next resource."""
        for tried in count(0) :# for infinity
            resource = self._next(a)try:
                return self.fun(resource, callback, **kwargs)
            except self.predicate:
                # reraise when retries exchausted.
                if tried >= len(self.resources) - 1:
                    raise
Copy the code

The callback function looks like this:

fun = {method} <bound method AbstractChannel._get_and_deliver of <kombu.transport.redis.Channel object at 0x7fc252239908>>
resources = {list: 1} ['localhost.testMailbox.pidbox']

Copy the code

The logic is as follows:

 user scope        +      Kombu                                        +-----------------------+    +-----------------------+                                   +          redis
                   |                                                   | Transport             |    | MultiChannelPoller    |                                   |
                   |                                                   |                       |    |                       |                                   |
                   |       +--------------------------------------+    |            cycle +-------> |           _channels +----+                                |
                   |       |                                      |    |                       |    +-----------------------+  |                                |
 +------------+    |       | Connection: redis://localhost:6379// | | channels +--------+ v | | Connection|-----------> | | | | | +-----------------+---+ | +------------+ | | | | _avail_channels+---------+ | Channel | <------------+ | | | connection+-------> | | | | | | | | | | +-----------------------+ | | _active_queues +------------------------+ | | +--------------------------------------+ | |  | | | | | +----->+ | +---------+-+ | | | +----------------------------+ | cycle +------> | FairCycle | | | | | Exchange  | | | | | | | | +------------------------------+ +--> | | <-----+ | | +-----------+ | | +---------+ | | Mailbox | | | testMailbox.pidbox(fanout) | | | handlers+-----+ | | | mailbox|--------------> | | | +----------------------------+ | +-+--+----------------+ | | | +---------+ | | | | | ^ ^ | | | | | exchange +---------------+ +---------------------------------+ | | | v | | | | | | Exchange | | | | +---------------+---------------+ | | | | reply_exchange +------------> | | | | | |'BRPOP': Channel._brpop_read |   |    |
                   |       |                              |        | reply.testMailbox.pidbox(direct)|   |     |  |    |                               |   |    |
                   |       |        reply_queue +-------------+    +-------------------+-------------+   |     |  |    |  'LISTEN': Channel._receive   |   |    |
                   |       |                              |   |                        ^                 |     |  |    |                               |   |    |
                   |       |                              |   |    +--------+          |                 |     |  |    +-------------------------------+   |    |
                   |       +-------------------------+----+   +--> | Queue  +----------+                 |     |  |                                        |    |
                   |                                 ^             +--------+                            |     |  |                                        |    |
                   |                                 |                                                   |     |  |                                        |    |  +----------------------------------------------------+
                   |       +---------------------+   |                                                   |     |  |                                        |    |  |      _kombu.binding.testMailbox.pidbox             |
 +-----+           |       |                     |   |                                                   |     |  |                                        |    |  |                                                    |
 |node | +---------------->+ Node      channel+-------------------------------------------------------------------+                                        |    |  |                                                    |
 +-----+           |       |                     |   |                                                   |     |                                           |    |  |   "\x06\x16\x06\x16localhost.testMailbox.pidbox"| | | mailbox +-----+ | | | | | | | | | +----------------------------------------------------+ | | + + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | | | ^ | | | | | | | | | | | | | +------------------------+ | +-----------------------------------------------------------------------------+ | | | +----------+ | | | | | Queue | | | | | | consumer | | | Consumer channel +-------+ | + | <-----+ | | +----------+ | | | | exchange | | | | | queues +---------------> | | | | | | | | | +------------------------+ +-----------+ | | callbacks |  | <localhost.testMailbox.pidbox -> Exchange testMailbox.pidbox(fanout)> | | | callback | | | + | | | | +------+----+ | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | ^ | | | | |  | | +--------------------------------------+ + |Copy the code

Mobile phones are as follows

3.5.3 Configuring load Balancing

Back in the Channel class, load balancing is finally configured to specify which Queue messages will be used next.

def basic_consume(self, queue, *args, **kwargs) :
    if queue in self._fanout_queues:
        exchange, _ = self._fanout_queues[queue]
        self.active_fanout_queues.add(queue)
        self._fanout_to_queue[exchange] = queue
    ret = super().basic_consume(queue, *args, **kwargs)

    # Update fair cycle between queues.
    #
    # We cycle between queues fairly to make sure that
    # each queue is equally likely to be consumed from,
    # so that a very busy queue will not block others.
    #
    # This works by using Redis's `BRPOP` command and
    # by rotating the most recently used queue to the
    # and of the list. See Kombu github issue #166 for
    # more discussion of this method.
    self._update_queue_cycle()
    return ret
  
  
def _update_queue_cycle(self) :
    self._queue_cycle.update(self.active_queues)
Copy the code

The stack is as follows:

update, scheduling.py:75
_update_queue_cycle, redis.py:1018
basic_consume, redis.py:610
consume, entity.py:738
_basic_consume, messaging.py:594
consume, messaging.py:473
listen, pidbox.py:92
main, node.py:20
<module>, node.py:29
Copy the code

The strategy is as follows:

class round_robin_cycle:
    """Iterator that cycles between items in round-robin."""

    def __init__(self, it=None) :
        self.items = it if it is not None else []

    def update(self, it) :
        """Update items from iterable."""
        self.items[:] = it

    def consume(self, n) :
        """Consume n items."""
        return self.items[:n]

    def rotate(self, last_used) :
        """Move most recently used item to end of list."""
        items = self.items
        try:
            items.append(items.pop(items.index(last_used)))
        except ValueError:
            pass
        return last_used
Copy the code

The logic is as follows:

 user scope        +      Kombu                                        +-----------------------+    +-----------------------+                                   +          redis
                   |                                                   | Transport             |    | MultiChannelPoller    |                                   |
                   |                                                   |                       |    |                       |                                   |
                   |       +--------------------------------------+    |            cycle +-------> |           _channels +----+                                |
                   |       |                                      |    |                       |    +-----------------------+  |                                |
 +------------+    |       | Connection: redis://localhost:6379// | | channels +--------+ v | | Connection|-----------> | | | | | +-----------------+---+ | +------------+ | | | | _avail_channels+---------+ | Channel | <------------+ | | | connection+-------> | | | | | | | | | | +-----------------------+ | | _active_queues +------------------------+ | | +--------------------------------------+ | |  | | | | | +----->+ cycle +------> +--------+--+ | | | +----------------------------+ | | | FairCycle | | | | | Exchange  | | | +-----------+ | | | +------------------------------+ +--> | | <-----+ | _queue_cycle+-----------+ | | +---------+  | | Mailbox | | | testMailbox.pidbox(fanout) | | | | | | | | mailbox|--------------> | | | +----------------------------+ | | handlers | v | | +---------+ | | | | | | + | round_robin_cycle| | | | exchange +---------------+ +---------------------------------+ | +-+--+----------------+ | | | | | | Exchange | | ^ ^ | | | | | reply_exchange +------------> | | | | | | | | | | | | reply.testMailbox.pidbox(direct)| | | | | | | | | reply_queue +-------------+ +-------------------+-------------+ | | | v | | | | | | ^ | | | +-----+-------------------------+ | | | | | | +--------+ | | | | |'BRPOP': Channel._brpop_read |   |    |
                   |       +-------------------------+----+   +--> | Queue  +----------+                 |     |  |    |                               |   |    |
                   |                                 ^             +--------+                            |     |  |    |  'LISTEN': Channel._receive   |   |    |
                   |                                 |                                                   |     |  |    |                               |   |    |  +----------------------------------------------------+
                   |       +---------------------+   |                                                   |     |  |    +-------------------------------+   |    |  |      _kombu.binding.testMailbox.pidbox             |
 +-----+           |       |                     |   |                                                   |     |  |                                        |    |  |                                                    |
 |node | +---------------->+ Node      channel+-------------------------------------------------------------------+                                        |    |  |                                                    |
 +-----+           |       |                     |   |                                                   |     |                                           |    |  |   "\x06\x16\x06\x16localhost.testMailbox.pidbox"| | | mailbox +-----+ | | | | | | | | | +----------------------------------------------------+ | | + + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | | | ^ | | | | | | | | | | | | | +------------------------+ | +-----------------------------------------------------------------------------+ | | | +----------+ | | | | | Queue | | | | | | consumer | | | Consumer channel +-------+ | + | <-----+ | | +----------+ | | | | exchange | | | | | queues +---------------> | | | | | | | | | +------------------------+ +-----------+ | | callbacks |  | <localhost.testMailbox.pidbox -> Exchange testMailbox.pidbox(fanout)> | | | callback | | | + | | | | +------+----+ | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | ^ | | | | |  | | +--------------------------------------+ + |Copy the code

The mobile phone is as follows:

3.6 consumption

3.2.1 Consumer Subject

The following code completes the consumption.

def main(arguments) :
    consumer = node.listen(callback=callback)
    try:
        while True:
            print('Consumer Waiting')
            connection.drain_events()
    finally:
        consumer.cancel()

Copy the code

The drain_events code is used to read the messages:

def drain_events(self, connection, timeout=None) :
    time_start = monotonic()
    get = self.cycle.get
    polling_interval = self.polling_interval
    if timeout and polling_interval and polling_interval > timeout:
        polling_interval = timeout
    while 1:
        try:
            get(self._deliver, timeout=timeout)
        except Empty:
            if timeout is not None and monotonic() - time_start >= timeout:
                raise socket.timeout()
            if polling_interval is not None:
                sleep(polling_interval)
        else:
            break

Copy the code

3.2.2 Service Logic

3.2.2.1 registered

The get method does the following (note that each purchase requires a get function, i.e., each purchase is registered and consumed….) :

  • Registration response mode;
  • Poll, which is a general operation, or BRPOP, or LISTEN;
  • Call handle_event to read REDis, specific consumption;
def get(self, callback, timeout=None) :
    self._in_protected_read = True
    try:
        for channel in self._channels:
            if channel.active_queues:           # BRPOP mode?
                if channel.qos.can_consume():
                    self._register_BRPOP(channel)
            if channel.active_fanout_queues:    # LISTEN mode?
                self._register_LISTEN(channel)

        events = self.poller.poll(timeout)
        if events:
            for fileno, event in events:
                ret = self.handle_event(fileno, event) # read redis specifically for consumption
                if ret:
                    return
        # - no new data, so try to restore messages.
        # - reset active redis commands.
        self.maybe_restore_messages()
        raise Empty()
    finally:
        self._in_protected_read = False
        while self.after_read:
            try:
                fun = self.after_read.pop()
            except KeyError:
                break
            else:
                fun()

Copy the code

Since pubsub is used here, we call channel._subscribe to register the subscription as follows:

def _register_LISTEN(self, channel) :
    """Enable LISTEN mode for channel."""
    if not self._client_registered(channel, channel.subclient, 'LISTEN'):
        channel._in_listen = False
        self._register(channel, channel.subclient, 'LISTEN')
    if not channel._in_listen:
        channel._subscribe()  # send SUBSCRIBE

Copy the code

Specific categories are as follows:

self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7fc2522297f0>

Copy the code

_register combines channel and socket fd information. If the corresponding socket FD has a poll, the corresponding channel will be called.

def _register(self, channel, client, type) :
    if (channel, client, type) in self._chan_to_sock:
        self._unregister(channel, client, type)
    if client.connection._sock is None:   # not connected yet.
        client.connection.connect()
    sock = client.connection._sock
    self._fd_to_chan[sock.fileno()] = (channel, type)
    self._chan_to_sock[(channel, client, type)] = sock
    self.poller.register(sock, self.eventflags)
Copy the code

The concrete _subscribe is to register with the concrete Redis.

In this way, for consumers, Redis also connected, Poll also connected, and the following can be consumed.

def _subscribe(self) :
    keys = [self._get_subscribe_topic(queue)
            for queue in self.active_fanout_queues]
    if not keys:
        return
    c = self.subclient
    if c.connection._sock is None:
        c.connection.connect()
    self._in_listen = c.connection
    c.psubscribe(keys)
Copy the code

The stack is as follows:

_subscribe, redis.py:663
_register_LISTEN, redis.py:322
get, redis.py:375
drain_events, base.py:960
drain_events, connection.py:318
main, node.py:24
<module>, node.py:29
Copy the code

The corresponding variables are as follows, where client is a Redis-driven PubSub object:

c = {PubSub} <redis.client.PubSub object at 0x7fc252264400>

keys = {list: 1} ['/0.testMailbox.pidbox']

self = {Channel} <kombu.transport.redis.Channel object at 0x7fc252239908>
Copy the code

The logic is as follows:

                                                                                                                                                                +
user scope         +      Kombu                                                                                                                                 |          redis
                   |                                                                               psubscribe                                                   |
                   |                                                                                                                                            |           +----------------------------+
        +--------------------->   drain_events   +--------------------------------------------------------------------------------------------------------------------->    |  '/0.testMailbox.pidbox'| | | | +----------------------------+ | | | | | +-----------------------+ +-----------------------+ | | | | Transport |  | MultiChannelPoller | | | | | | | | | | | +--------------------------------------+ | cycle +-------> | _channels +----+ | | | | | | | +-----------------------+ | | +------+-----+ | | Connection: redis://localhost:6379// | | channels +--------+ v | | Connection|-----------> | | | | | +-----------------+---+ | +------------+ | | | | _avail_channels+---------+ | Channel | <------------+ | | | connection+-------> | | | | | | | | | | +-----------------------+ | | _active_queues +------------------------+ | | +--------------------------------------+ | |  | | | | | +----->+ cycle +------> +--------+--+ | | | +----------------------------+ | | | FairCycle | | | | | Exchange  | | | +-----------+ | | | +------------------------------+ +--> | | <-----+ | _queue_cycle+-----------+ | | +---------+  | | Mailbox | | | testMailbox.pidbox(fanout) | | | | | | | | mailbox|--------------> | | | +----------------------------+ | | handlers | v | | +---------+ | | | | | | + | round_robin_cycle| | | | exchange +---------------+ +---------------------------------+ | +-+--+----------------+ | | | | | | Exchange | | ^ ^ | | | | | reply_exchange +------------> | | | | | | | | | | | | reply.testMailbox.pidbox(direct)| | | | | | | | | reply_queue +-------------+ +-------------------+-------------+ | | | v | | | | | | ^ | | | +-----+-------------------------+ | | | | | | +--------+ | | | | |'BRPOP': Channel._brpop_read |   |    |
                   |       +-------------------------+----+   +--> | Queue  +----------+                 |     |  |    |                               |   |    |
                   |                                 ^             +--------+                            |     |  |    |  'LISTEN': Channel._receive   |   |    |
                   |                                 |                                                   |     |  |    |                               |   |    |  +----------------------------------------------------+
                   |       +---------------------+   |                                                   |     |  |    +-------------------------------+   |    |  |      _kombu.binding.testMailbox.pidbox             |
 +-----+           |       |                     |   |                                                   |     |  |                                        |    |  |                                                    |
 |node | +---------------->+ Node      channel+-------------------------------------------------------------------+                                        |    |  |                                                    |
 +-----+           |       |                     |   |                                                   |     |                                           |    |  |   "\x06\x16\x06\x16localhost.testMailbox.pidbox"| | | mailbox +-----+ | | | | | | | | | +----------------------------------------------------+ | | + + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | | | | ^ | | | | | | | | | | | | | +------------------------+ | +-----------------------------------------------------------------------------+ | | | +----------+ | | | | | Queue | | | | | | consumer | | | Consumer channel +-------+ | + | <-----+ | | +----------+ | | | | exchange | | | | | queues +---------------> | | | | | | | | | +------------------------+ +-----------+ | | callbacks |  | <localhost.testMailbox.pidbox -> Exchange testMailbox.pidbox(fanout)> | | | callback | | | + | | | | +------+----+ | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | ^ | | | | |  | | +--------------------------------------+ + |Copy the code

Mobile phones are as follows

3.2.2.2 consumption

As mentioned in the previous section, HANDLE_Event specifically reads Redis for consumption.

When the message is received, the following call is called:

def _deliver(self, message, queue) :
    try:
        callback = self._callbacks[queue]
    except KeyError:
        self._reject_inbound_message(message)
    else:
        callback(message)
Copy the code

The stack is as follows:

_deliver, base.py:975
_receive_one, redis.py:721
_receive, redis.py:692
on_readable, redis.py:358
handle_event, redis.py:362
get, redis.py:380
drain_events, base.py:960
drain_events, connection.py:318
main, node.py:24
<module>, node.py:29
Copy the code

Here’s the variable, _callback in basic_consume:

self._callbacks = {dict: 1} 
 'localhost.testMailbox.pidbox' = {function} <function Channel.basic_consume.<locals>._callback at 0x7fc2522c1840>
Copy the code

Call ahead, process the information

def receive(self, body, message) :
    """Method called when a message is received. This dispatches to the registered :attr:`callbacks`. Arguments: body (Any): The decoded message body. message (~kombu.Message): The message instance. Raises: NotImplementedError: If no consumer callbacks have been registered. """
    callbacks = self.callbacks
    [callback(body, message) for callback in callbacks]
Copy the code

The stack is as follows:

receive, messaging.py:583
_receive_callback, messaging.py:620
_callback, base.py:630
_deliver, base.py:980
_receive_one, redis.py:721
_receive, redis.py:692
on_readable, redis.py:358
handle_event, redis.py:362
get, redis.py:380
drain_events, base.py:960
drain_events, connection.py:318
main, node.py:24
<module>, node.py:29
Copy the code

The variables are as follows:

body = {dict: 5} {'method': 'print_msg'.'arguments': {'msg': 'Message for you'}, 'destination': None.'pattern': None.'matcher': None}

message = {Message} <Message object at 0x7fc2522e20d8 with details {'state': 'RECEIVED'.'content_type': 'application/json'.'delivery_tag': '7dd6ad01-4162-42c3-b8db-bb40dc7dfda0'.'body_length': 119.'properties': {}, 'delivery_info': {'exchange': 'testMailbox.pidbox'.'rout self = {Consumer} 
       
       
         -> bound to chan:1>]>
       
      Copy the code

Finally, the user method is called:

callback, node.py:15
<listcomp>, messaging.py:586
receive, messaging.py:586
_receive_callback, messaging.py:620
_callback, base.py:630
_deliver, base.py:980
_receive_one, redis.py:721
_receive, redis.py:692
on_readable, redis.py:358
handle_event, redis.py:362
get, redis.py:380
drain_events, base.py:960
drain_events, connection.py:318
main, node.py:24
<module>, node.py:29
Copy the code

So, the analysis of the Mailbox consumer side is complete.

0x04 Producer

Producer is sending emails, where the logic is much simpler.

The code is as follows:

def main(arguments) :
    connection = kombu.Connection('redis://localhost:6379')
    mailbox = pidbox.Mailbox("testMailbox".type="fanout")
    bound = mailbox(connection)
    bound._broadcast("print_msg", {'msg': 'Message for you'})
Copy the code

4.1 the Mailbox

Now that it’s in the Mailbox, you can see that _publish is called.

def _broadcast(self, command, arguments=None, destination=None,
               reply=False, timeout=1, limit=None,
               callback=None, channel=None, serializer=None,
               pattern=None, matcher=None) :

    arguments = arguments or {}
    reply_ticket = reply and uuid() or None
    chan = channel or self.connection.default_channel

    # Set reply limit to number of destinations (if specified)
    if limit is None and destination:
        limit = destination and len(destination) or None

    serializer = serializer or self.serializer
    self._publish(command, arguments, destination=destination,
                  reply_ticket=reply_ticket,
                  channel=chan,
                  timeout=timeout,
                  serializer=serializer,
                  pattern=pattern,
                  matcher=matcher)

    if reply_ticket:
        return self._collect(reply_ticket, limit=limit,
                             timeout=timeout,
                             callback=callback,
                             channel=chan)
Copy the code

The variables are as follows:

arguments = {dict: 1} {'msg': 'Message for you'}
self = {Mailbox} <kombu.pidbox.Mailbox object at 0x7fccf19514e0>
Copy the code

Continue calling _publish. If a reply is required, set it accordingly. Otherwise, producer is directly called to publish.

def _publish(self, type, arguments, destination=None,
             reply_ticket=None, channel=None, timeout=None,
             serializer=None, producer=None, pattern=None, matcher=None) :
    message = {'method': type.'arguments': arguments,
               'destination': destination,
               'pattern': pattern,
               'matcher': matcher}
    chan = channel or self.connection.default_channel
    exchange = self.exchange
    if reply_ticket:
        maybe_declare(self.reply_queue(channel))
        message.update(ticket=reply_ticket,
                       reply_to={'exchange': self.reply_exchange.name,
                                 'routing_key': self.oid})
    serializer = serializer or self.serializer
    with self.producer_or_acquire(producer, chan) as producer:
        producer.publish(
            message, exchange=exchange.name, declare=[exchange],
            headers={'clock': self.clock.forward(),
                     'expires': time() + timeout if timeout else 0},
            serializer=serializer, retry=True.)Copy the code

The variables are as follows:

exchange = {Exchange} Exchange testMailbox.pidbox(fanout)
message = {dict: 5} {'method': 'print_msg'.'arguments': {'msg': 'Message for you'}, 'destination': None.'pattern': None.'matcher': None}
Copy the code

4.2 producer

Then we have producer. The producer then does the action.

def _publish(self, body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, immediate, exchange, declare) :
    channel = self.channel
    message = channel.prepare_message(
        body, priority, content_type,
        content_encoding, headers, properties,
    )

    # handle autogenerated queue names for reply_to
    reply_to = properties.get('reply_to')
    if isinstance(reply_to, Queue):
        properties['reply_to'] = reply_to.name
    return channel.basic_publish(
        message,
        exchange=exchange, routing_key=routing_key,
        mandatory=mandatory, immediate=immediate,
    )
Copy the code

4.3 the Channel

Continuing to Channel is the time to process redis.

def basic_publish(self, message, exchange, routing_key, **kwargs) :
    """Publish message."""
    self._inplace_augment_message(message, exchange, routing_key)
    if exchange:
        return self.typeof(exchange).deliver(  # here
            message, exchange, routing_key, **kwargs
        )
    # anon exchange: routing_key is the destination queue
    return self._put(routing_key, message, **kwargs)
Copy the code

4.4 FanoutExchange

Send directly using Exchange.

class FanoutExchange(ExchangeType) :
    type = 'fanout'

    def lookup(self, table, exchange, routing_key, default) :
        return {queue for _, _, queue in table}

    def deliver(self, message, exchange, routing_key, **kwargs) :
        if self.channel.supports_fanout:
            self.channel._put_fanout(
                exchange, message, routing_key, **kwargs)
Copy the code

4.5 the Channel

The process enters a Channel, at which point the Redis driver is called to send.

def _put_fanout(self, exchange, message, routing_key, **kwargs) :
    """Deliver fanout message."""
    with self.conn_or_acquire() as client:
        client.publish(
            self._get_publish_topic(exchange, routing_key),
            dumps(message),
        )
Copy the code

4.6 redis drive

Finally, the Redis driver sends.

def publish(self, channel, message) :
    """ Publish ``message`` on ``channel``. Returns the number of subscribers the message was delivered to. """
    return self.execute_command('PUBLISH', channel, message)
Copy the code

The key variables are as follows:

channel = {str} '/0.testMailbox.pidbox'
message = {str} '{"body": "eyJtZXRob2QiOiAicHJpbnRfbXNnIiwgImFyZ3VtZW50cyI6IHsibXNnIjogIk1lc3NhZ2UgZm9yIHlvdSJ9LCAiZGVzdGluYXRpb24iOiBudWxsLCAicGF 0dGVybiI6IG51bGwsICJtYXRjaGVyIjogbnVsbH0=", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"cloc self = {Redis} Redis
      
       >>
      Copy the code

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

If you want to get a timely news feed of personal articles, or want to see the technical information of personal recommendations, please pay attention.

0 XFF reference

Kombu source code analysis a

Kombu source code analysis two

Kombu source code analysis three

Kombu source code analysis four

Kombu source code analysis five