0x00 Abstract

In this series we introduce the message queue library Kombu. Kombu is positioned as an AMQP-compliant message queue abstraction. This article looks at how Kombu starts up and how its basic framework is put together.

Because an earlier overview article covered some of the same ground, a few concept explanations appear both in that overview and in the articles that follow.

0x01 Example

The following code is used to illustrate.

This example comes from liqiang.io/post/kombu-…

import sys

from kombu import Connection, Consumer, Exchange, Producer, Queue
from kombu.asynchronous import Hub


def main(arguments):
    hub = Hub()
    exchange = Exchange('asynt_exchange')
    queue = Queue('asynt_queue', exchange, 'asynt_routing_key')

    def send_message(conn):
        producer = Producer(conn)
        producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
        print('message sent')

    def on_message(message):
        print('received: {0!r}'.format(message.body))
        message.ack()
        # hub.stop()  # <-- exit after one message

    conn = Connection('redis://localhost:6379')
    conn.register_with_event_loop(hub)

    def p_message():
        print(' kombu ')

    with Consumer(conn, [queue], on_message=on_message):
        send_message(conn)
        hub.timer.call_repeatedly(3, p_message)
        hub.run_forever()


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))

0x02 Startup

Let’s take a look at what Kombu does and get a sense of what’s going on inside Kombu.

This article focuses on how Connection, Channel, and Hub are linked together.

2.1 Hub

At the beginning of the program, we set up the Hub.

The Hub drives the message loop. The loop has not been started yet, so at this point the Hub is just an idle instance.

hub = Hub()

Its definition is as follows:

class Hub:
    """Event loop object.

    Arguments:
        timer (kombu.asynchronous.Timer): Specify custom timer instance.
    """

    def __init__(self, timer=None):
        self.timer = timer if timer is not None else Timer()

        self.readers = {}
        self.writers = {}
        self.on_tick = set()
        self.on_close = set()
        self._ready = set()

        self._running = False
        self._loop = None

        self.consolidate = set()
        self.consolidate_callback = None

        self.propagate_errors = ()
        self._create_poller()

No loop has been created at this point, so the important step here is creating the poller. The call stack is as follows:

_get_poller, eventio.py:321
poll, eventio.py:328
_create_poller, hub.py:113
__init__, hub.py:96
main, testUb.py:22
<module>, testUb.py:55

In eventio.py we can see that Kombu can choose among several kernel I/O multiplexing mechanisms (select, epoll, kqueue, poll) for event handling:

def _get_poller():
    if detect_environment() != 'default':
        # greenlet
        return _select
    elif epoll:
        # Py2.6 + Linux
        return _epoll
    elif kqueue and 'netbsd' in sys.platform:
        return _kqueue
    elif xpoll:
        return _poll
    else:
        return _select

On the machine used for this walkthrough, _poll is selected.

+------------------+
| Hub              |
|                  |
|                  |            +-------------+
|      poller +---------------> | _poll       |
|                  |            |             |         +-------+
|                  |            |    _poller+---------> |  poll |
+------------------+            |             |         +-------+
                                +-------------+
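The idea of probing for the best available mechanism can be tried with the standard library alone. The sketch below is not Kombu's code: `pick_poller_name` is a hypothetical helper, and its order is simplified (Kombu's `_get_poller` also checks for greenlet environments and only uses kqueue on NetBSD).

```python
import select


def pick_poller_name():
    """Return the name of the first polling mechanism this platform offers.

    Hypothetical helper for illustration only; the probing order is a
    simplification of what Kombu's _get_poller() does.
    """
    for name in ('epoll', 'kqueue', 'poll'):
        if hasattr(select, name):
            return name
    return 'select'


chosen = pick_poller_name()
print('this platform would use:', chosen)
```

Running it on different operating systems gives a feel for why the same program can end up with different pollers.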

2.2 Exchange and Queue

Next, the Exchange and the Queue are created.

  • Exchange: the message router. The sender publishes messages to the Exchange, which dispatches them to Queues.
  • Queue: a message queue that stores messages until applications consume them. Exchanges distribute messages to Queues, and consumers receive messages from Queues.

Since there are no concrete messages at this point, we can’t explore the Exchange mechanism.

exchange = Exchange('asynt')
queue = Queue('asynt', exchange, 'asynt')

The Exchange is now associated with the Queue. The illustration is as follows:

+------------------+
| Hub              |
|                  |
|                  |            +-------------+
|      poller +---------------> | _poll       |
|                  |            |             |         +-------+
|                  |            |    _poller+---------> |  poll |
+------------------+            |             |         +-------+
                                +-------------+


+----------------+         +-------------------+
| Exchange       |         | Queue             |
|                |         |                   |
|                |         |                   |
|     channel    | <------------+ exchange     |
|                |         |                   |
|                |         |                   |
+----------------+         +-------------------+
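To make the Exchange-to-Queue relationship concrete before any real message flows, here is a toy model of a direct exchange. It is a sketch under simplifying assumptions, not Kombu's implementation: `DirectExchangeSketch` is a hypothetical class, and a `deque` stands in for the queue.

```python
from collections import defaultdict, deque


class DirectExchangeSketch:
    """Toy direct exchange: routing key -> list of bound queues."""

    def __init__(self):
        self._bindings = defaultdict(list)

    def bind(self, queue, routing_key):
        self._bindings[routing_key].append(queue)

    def publish(self, body, routing_key):
        # Put the message into every queue bound with this exact key.
        delivered = 0
        for queue in self._bindings.get(routing_key, []):
            queue.append(body)
            delivered += 1
        return delivered


exchange = DirectExchangeSketch()
queue = deque()                      # stands in for 'asynt_queue'
exchange.bind(queue, 'asynt_routing_key')

hits = exchange.publish('hello world', 'asynt_routing_key')
misses = exchange.publish('lost', 'unknown_key')
```

The producer only ever names the exchange and a routing key; which queues receive the message is decided by the bindings.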

2.3 Connection

The third step is to create a Connection.

A Connection is Kombu's abstraction of a connection to the message broker: one Connection object corresponds to one broker connection. Here it wraps the 'redis://localhost:6379' connection.

conn = Connection('redis://localhost:6379')

2.3.1 Definition

As the class docstring says, a Connection is a connection to the broker. The code shows that Connection is closer to a logical concept: the concrete work is delegated to other classes.

Messages are never sent directly to queues, and Producers may not even know that queues exist. How does the Producer send messages to the Consumer? This needs to be processed and delivered by Message Broker.

In AMQP, AMQP Server is responsible for Message Broker function. From this perspective, producers and consumers of AMQP are both AMQP clients.

In the Kombu architecture, transport abstracts all brokers to provide a consistent solution for different brokers. With Kombu, developers have the flexibility to choose or change brokers based on their needs.

The main member variables of Connection are listed below; none of them has been assigned yet:

  • _connection: the underlying connection object.
  • _transport: the abstraction of the broker mentioned above.
  • cycle: an iterator that yields the next broker URL to try after a connection failure.
  • failover_strategy: the strategy for selecting another host when the connection fails.
  • heartbeat: used to implement the heartbeat.

The code is as follows:

class Connection:
    """A connection to the broker"""

    port = None
    virtual_host = '/'
    connect_timeout = 5

    _connection = None
    _default_channel = None
    _transport = None
    uri_prefix = None

    #: The cache of declared entities is per connection,
    #: in case the server loses data.
    declared_entities = None

    #: Iterator returning the next broker URL to try in the event
    #: of connection failure (initialized by :attr:`failover_strategy`).
    cycle = None

    #: Additional transport specific options,
    #: passed on to the transport instance.
    transport_options = None

    #: Strategy used to select new hosts when reconnecting after connection
    #: failure. One of "round-robin", "shuffle" or any custom iterator
    #: constantly yielding new URLs to try.
    failover_strategy = 'round-robin'

    #: Heartbeat value, currently only supported by the py-amqp transport.
    heartbeat = None

    resolve_aliases = resolve_aliases
    failover_strategies = failover_strategies

    hostname = userid = password = ssl = login_method = None
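The 'round-robin' failover strategy is easy to picture once you know it is backed by `itertools.cycle` (the debugger dump later in this article shows that mapping). A minimal sketch, assuming two made-up broker URLs:

```python
from itertools import cycle, islice

# Hypothetical broker URLs, used only for this illustration.
broker_urls = ['redis://host-a:6379', 'redis://host-b:6379']

# Round-robin: an endless iterator that walks the list and starts over.
next_broker = cycle(broker_urls)

# Pretend three connection attempts were needed.
attempts = list(islice(next_broker, 3))
```

Each reconnect simply asks the iterator for the next URL, so the hosts are tried in turn and the list never runs out.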

2.3.2 init and transport

The main task within Connection is to establish transport.

The Stack is as follows:

Transport, redis.py:1039
<module>, redis.py:1031
import_module, __init__.py:126
symbol_by_name, imports.py:56
resolve_transport, __init__.py:70
get_transport_cls, __init__.py:85
__init__, connection.py:183
main, testUb.py:40
<module>, testUb.py:55

2.4 Transport

Transport is the real connection to the message queue (Redis, RabbitMQ, and so on). It is the entity that stores and sends messages, and it is what distinguishes whether the underlying queue is implemented with AMQP, Redis, or something else.

Transport takes care of the details, but much of the work is handed over to loop and MultiChannelPoller.

2.4.1 Definition

Its main member variables are:

  • driver_type and driver_name: the driver type and name of this transport;
  • Channel: the corresponding Channel class;
  • cycle: a MultiChannelPoller, described below.

The definition is as follows:

class Transport(virtual.Transport):
    """Redis Transport."""

    Channel = Channel

    polling_interval = None  # disable sleep between unsuccessful polls.
    default_port = DEFAULT_PORT
    driver_type = 'redis'
    driver_name = 'redis'

    implements = virtual.Transport.implements.extend(
        asynchronous=True,
        exchange_type=frozenset(['direct', 'topic', 'fanout'])
    )

    def __init__(self, *args, **kwargs):
        if redis is None:
            raise ImportError('Missing redis library (pip install redis)')
        super().__init__(*args, **kwargs)

        # Get redis-py exceptions.
        self.connection_errors, self.channel_errors = self._get_errors()
        # All channels share the same poller.
        self.cycle = MultiChannelPoller()

2.4.2 Handover Operation

Transport takes care of the details, but much of the work is handed over to loop and MultiChannelPoller, as you can see from the code below.

def register_with_event_loop(self, connection, loop):
    cycle = self.cycle
    cycle.on_poll_init(loop.poller)
    cycle_poll_start = cycle.on_poll_start
    add_reader = loop.add_reader
    on_readable = self.on_readable

    def _on_disconnect(connection):
        if connection._sock:
            loop.remove(connection._sock)
    cycle._on_connection_disconnect = _on_disconnect

    def on_poll_start():
        cycle_poll_start()
        [add_reader(fd, on_readable, fd) for fd in cycle.fds]

    loop.on_tick.add(on_poll_start)
    loop.call_repeatedly(10, cycle.maybe_restore_messages)

    health_check_interval = connection.client.transport_options.get(
        'health_check_interval',
        DEFAULT_HEALTH_CHECK_INTERVAL
    )

    loop.call_repeatedly(
        health_check_interval,
        cycle.maybe_check_subclient_health
    )

The key is the MultiChannelPoller. A Connection has a Transport, a Transport has a MultiChannelPoller, poll operations are done by the MultiChannelPoller, redis operations are done by the channel.

2.4.3 MultiChannelPoller

MultiChannelPoller is the execution engine. It provides the following functions:

  • collects the channels;
  • maintains the fd-to-channel mapping;
  • maintains the channel-to-socket mapping;
  • runs the poll.

class MultiChannelPoller:
    """Async I/O poller for Redis transport."""

    eventflags = READ | ERR

    def __init__(self):
        # active channels
        self._channels = set()
        # file descriptor -> channel map.
        self._fd_to_chan = {}
        # channel -> socket map
        self._chan_to_sock = {}
        # poll implementation (epoll/kqueue/select)
        self.poller = poll()
        # one-shot callbacks called after reading from socket.
        self.after_read = set()
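The fd-to-channel bookkeeping can be demonstrated without Redis. The following is a rough sketch, not Kombu's class: `PollerSketch` is hypothetical, the standard `selectors` module stands in for Kombu's poller, and a local socket pair plays the role of the connection to Redis.

```python
import selectors
import socket


class PollerSketch:
    """Toy stand-in for MultiChannelPoller: maps ready fds back to channels."""

    def __init__(self):
        self._fd_to_chan = {}                       # file descriptor -> channel
        self.poller = selectors.DefaultSelector()   # best selector available

    def add(self, sock, channel):
        self._fd_to_chan[sock.fileno()] = channel
        self.poller.register(sock, selectors.EVENT_READ)

    def ready_channels(self, timeout=1.0):
        events = self.poller.select(timeout)
        return [self._fd_to_chan[key.fd] for key, _mask in events]

    def close(self):
        self.poller.close()


# A socket pair plays the role of the connection to Redis.
ours, theirs = socket.socketpair()
poller = PollerSketch()
poller.add(ours, 'channel-1')

idle = poller.ready_channels(timeout=0)   # nothing has been sent yet
theirs.send(b'message')
ready = poller.ready_channels()           # our end is now readable

poller.close()
ours.close()
theirs.close()
```

The kernel only reports file descriptors, so a mapping like `_fd_to_chan` is what lets the poller turn "fd N is readable" into "this channel has data".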

2.4.4 Obtaining the Transport

Transport classes are registered in advance under aliases and are looked up by name when needed.

TRANSPORT_ALIASES = {
    'amqp': 'kombu.transport.pyamqp:Transport',
    'amqps': 'kombu.transport.pyamqp:SSLTransport',
    'pyamqp': 'kombu.transport.pyamqp:Transport',
    'librabbitmq': 'kombu.transport.librabbitmq:Transport',
    'memory': 'kombu.transport.memory:Transport',
    'redis': 'kombu.transport.redis:Transport',
    'pyro': 'kombu.transport.pyro:Transport'
}

_transport_cache = {}


def resolve_transport(transport=None):
    """Get transport by name."""
    if isinstance(transport, str):
        try:
            transport = TRANSPORT_ALIASES[transport]
        except KeyError:
            if '.' not in transport and ':' not in transport:
                from kombu.utils.text import fmatch_best
                alt = fmatch_best(transport, TRANSPORT_ALIASES)
        else:
            if callable(transport):
                transport = transport()
        return symbol_by_name(transport)
    return transport


def get_transport_cls(transport=None):
    """Get transport class by name."""
    if transport not in _transport_cache:
        _transport_cache[transport] = resolve_transport(transport)
    return _transport_cache[transport]

At this point the Connection looks like this in the debugger. Note that some of its member variables do not hold meaningful values yet:

conn = {Connection} <Connection: redis://localhost:6379// at 0x7faa910cbd68>
 alt = {list: 0} []
 connect_timeout = {int} 5
 connection = {Transport} <kombu.transport.redis.Transport object at 0x7faa91277710>
 cycle = {NoneType} None
 declared_entities = {set: 0} set()
 default_channel = {Channel} <kombu.transport.redis.Channel object at 0x7faa912700b8>
 failover_strategies = {dict: 2} {'round-robin': <class 'itertools.cycle'>,'shuffle': <function shufflecycle at 0x7faa9109a0d0>}
 failover_strategy = {type} <class 'itertools.cycle'>
 heartbeat = {int} 0
 host = {str} 'localhost:6379'
 hostname = {str} 'localhost'
 manager = {Management}
 port = {int} 6379
 recoverable_channel_errors = {tuple: 0} ()
 resolve_aliases = {dict: 2} {'pyamqp': 'amqp', 'librabbitmq': 'amqp'}
 transport = {Transport}
 transport_cls = {str} 'redis'
 uri_prefix = {NoneType} None
 userid = {NoneType} None
 virtual_host = {str} '/'

At this point the basic Kombu objects exist, but they are not yet logically connected to one another.

The picture so far is shown below. Note that the three groups are not connected:

+-------------------+       +---------------------+       +--------------------+
| Connection        |       | redis.Transport     |       | MultiChannelPoller |
|                   |       |                     |       |                    |
|                   |       |                     |       |     _channels      |
|                   |       |        cycle +------------> |     _fd_to_chan    |
|     transport +---------> |                     |       |     _chan_to_sock  |
|                   |       |                     |       |     poller         |
+-------------------+       +---------------------+       |     after_read     |
                                                          |                    |
                                                          +--------------------+
+------------------+
| Hub              |
|                  |
|                  |            +-------------+
|      poller +---------------> | _poll       |
|                  |            |             |         +-------+
|                  |            |    _poller+---------> |  poll |
+------------------+            |             |         +-------+
                                +-------------+
+----------------+         +-------------------+
| Exchange       |         | Queue             |
|                |         |                   |
|                |         |                   |
|     channel    | <------------+ exchange     |
|                |         |                   |
|                |         |                   |
+----------------+         +-------------------+

0x03 Registering the Connection with the Hub

As mentioned above, the basic framework is in place, but the modules are not yet connected to each other. Let’s look at how that happens.

The sample code now reaches:

conn.register_with_event_loop(hub)

This is a registration that associates the hub with a Connection. It then calls:

def register_with_event_loop(self, loop):
    self.transport.register_with_event_loop(self.connection, loop)

This in turn calls into the transport object, <kombu.transport.redis.Transport object at 0x7fd23e962dd8>.

The specific code is as follows:

def register_with_event_loop(self, connection, loop):
    cycle = self.cycle
    cycle.on_poll_init(loop.poller)  # loop is the hub
    cycle_poll_start = cycle.on_poll_start
    add_reader = loop.add_reader
    on_readable = self.on_readable

    def _on_disconnect(connection):
        if connection._sock:
            loop.remove(connection._sock)
    cycle._on_connection_disconnect = _on_disconnect

    def on_poll_start():
        cycle_poll_start()
        [add_reader(fd, on_readable, fd) for fd in cycle.fds]

    loop.on_tick.add(on_poll_start)
    loop.call_repeatedly(10, cycle.maybe_restore_messages)

    health_check_interval = connection.client.transport_options.get(
        'health_check_interval',
        DEFAULT_HEALTH_CHECK_INTERVAL
    )

    loop.call_repeatedly(
        health_check_interval,
        cycle.maybe_check_subclient_health
    )
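The essence of this registration is that the transport hooks a function into the hub's `on_tick` set, and that function re-adds its file descriptors as readers on every loop iteration. A stripped-down sketch of that handshake, assuming hypothetical `MiniHub` and `MiniTransport` classes and made-up descriptor numbers:

```python
class MiniHub:
    """Toy event loop with the two hooks the transport uses."""

    def __init__(self):
        self.readers = {}      # fd -> (callback, args)
        self.on_tick = set()   # callables run at the start of every iteration

    def add_reader(self, fd, callback, *args):
        self.readers[fd] = (callback, args)

    def run_once(self, ready_fds):
        for tick in list(self.on_tick):
            tick()
        for fd in ready_fds:
            callback, args = self.readers[fd]
            callback(*args)


class MiniTransport:
    def __init__(self, fds):
        self.fds = fds         # fds the poller is interested in
        self.seen = []

    def on_readable(self, fd):
        self.seen.append(fd)

    def register_with_event_loop(self, loop):
        def on_poll_start():
            # Re-register every fd on each tick, as in the excerpt above.
            for fd in self.fds:
                loop.add_reader(fd, self.on_readable, fd)
        loop.on_tick.add(on_poll_start)


hub = MiniHub()
transport = MiniTransport(fds=[7, 8])   # made-up descriptor numbers
transport.register_with_event_loop(hub)
hub.run_once(ready_fds=[8])             # pretend the kernel reported fd 8
```

Nothing is read at registration time; the transport only tells the hub what to call later, which is why the hub can be created before any connection exists.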

3.1 Setting up the Channel

Registration begins by creating a Channel. Accessing self.connection triggers the actual connect, and the Channel is created as part of that step.

@property
def connection(self):
    """The underlying connection object"""
    if not self._closed:
        if not self.connected:
            return self._ensure_connection(
                max_retries=1, reraise_as_library_errors=False
            )
        return self._connection

The Channel itself is created in base.py, which holds the Transport base class. The call stack is as follows:

create_channel, base.py:920
establish_connection, base.py:938
_establish_connection, connection.py:801
_connection_factory, connection.py:866
retry_over_time, functional.py:325
_ensure_connection, connection.py:439
connection, connection.py:859
register_with_event_loop, connection.py:266
main, testUb.py:41
<module>, testUb.py:55

3.2 Channel

Channel: similar to the AMQP concept of the same name. Channels can be understood as multiple lightweight connections that share one Connection; the Channel is where the real connection lives.

Think of it as an encapsulation of Redis operations and connections. Each Channel can open connections to Redis and operate on Redis through them. Each connection has a socket, and each socket has a file descriptor that can be polled.

To make the rest easier to follow, here is the overall communication flow in advance:

redis (port=6379)
    |  one socket per connection: socket <--> port --> fd
    |
    |  1  the fds are handed to the kernel for polling
    v
Linux Kernel
    |  2  event
    v
Hub (readers)
    |  3
    v
Transport.on_readable
    |  4  cycle: on_readable(fileno)
    v
MultiChannelPoller (_fd_to_chan)
    |  5  channel.handlers (one table per channel):
    |       'BRPOP'  = Channel._brpop_read
    |       'LISTEN' = Channel._receive
    v
Channel
    |  6  parse_response (the Channel reads from redis)
    |  7  _callback, looked up through Transport._callbacks[queue]
    |  8  _receive_callback
    v
Consumer
    |  9  on_message
    v
User Function

The details follow.

3.2.1 Definition

The main members of Channel are:

  • async_pool: the Redis asynchronous connection pool;
  • pool: the Redis connection pool;
  • channel_id: the Channel ID;
  • Client: the StrictRedis driver;
  • connection: the corresponding Transport;
  • cycle: a FairCycle instance, shown in the debugger as <FairCycle: 0/0 []>;
  • queue_order_strategy: the strategy for choosing which queue to read next;
  • state: the BrokerState;
  • subclient: the client used for PubSub;
  • keyprefix_queue = '{p}_kombu.binding.%s'.format(p=KEY_PREFIX): the key used for bindings.

For example, Client is obtained from _get_client:

def _get_client(self):
    if redis.VERSION < (3, 2, 0):
        raise VersionMismatch(
            'Redis transport requires redis-py versions 3.2.0 or later. '
            'You have {0.__version__}'.format(redis))
    return redis.StrictRedis

The simplified version is defined as follows:

class Channel(virtual.Channel):
    """Redis Channel."""

    QoS = QoS

    _client = None
    _subclient = None
    keyprefix_queue = '{p}_kombu.binding.%s'.format(p=KEY_PREFIX)
    keyprefix_fanout = '/{db}.'
    sep = '\x06\x16'
    _fanout_queues = {}
    unacked_key = '{p}unacked'.format(p=KEY_PREFIX)
    unacked_index_key = '{p}unacked_index'.format(p=KEY_PREFIX)
    unacked_mutex_key = '{p}unacked_mutex'.format(p=KEY_PREFIX)
    unacked_mutex_expire = 300  # 5 minutes
    unacked_restore_limit = None
    visibility_timeout = 3600   # 1 hour
    max_connections = 10
    queue_order_strategy = 'round_robin'

    _async_pool = None
    _pool = None

    from_transport_options = (
        virtual.Channel.from_transport_options +
        ('sep',
         'ack_emulation',
         'unacked_key',
         'max_connections',
         'health_check_interval',
         'retry_on_timeout',
         'priority_steps')  # <-- do not add comma here!
    )

    connection_class = redis.Connection if redis else None

3.2.2 Base class

The base class is defined as follows:

class Channel(AbstractChannel, base.StdChannel):
    """Virtual channel.

    Arguments:
        connection (ConnectionT): The transport instance this channel
            is part of.
    """

    #: message class used.
    Message = Message

    #: QoS class used.
    QoS = QoS

    #: flag to restore unacked messages when channel
    #: goes out of scope.
    do_restore = True

    #: mapping of exchange types and corresponding classes.
    exchange_types = dict(STANDARD_EXCHANGE_TYPES)

    #: flag set if the channel supports fanout exchanges.
    supports_fanout = False

    #: Binary <-> ASCII codecs.
    codecs = {'base64': Base64()}

    #: Default body encoding.
    #: NOTE: ``transport_options['body_encoding']`` will override this value.
    body_encoding = 'base64'

    #: counter used to generate delivery tags for this channel.
    _delivery_tags = count(1)

    #: Optional queue where messages with no route is delivered.
    #: Set by ``transport_options['deadletter_queue']``.
    deadletter_queue = None

    # List of options to transfer from :attr:`transport_options`.
    from_transport_options = ('body_encoding', 'deadletter_queue')

    # Priority defaults
    default_priority = 0
    min_priority = 0
    max_priority = 9

The resulting instance looks like this in the debugger:

self = {Channel} <kombu.transport.redis.Channel object at 0x7fe61aa88cc0>
 Client = {type} <class 'redis.client.Redis'>
 Message = {type} <class 'kombu.transport.virtual.base.Message'>
 QoS = {type} <class 'kombu.transport.redis.QoS'>
 active_fanout_queues = {set: 0} set()
 active_queues = {set: 0} set()
 async_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
 auto_delete_queues = {set: 0} set()
 channel_id = {int} 1
 client = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
 codecs = {dict: 1} {'base64': <kombu.transport.virtual.base.Base64 object at 0x7fe61a987668>}
 connection = {Transport} <kombu.transport.redis.Transport object at 0x7fe61aa399b0>
 connection_class = {type} <class 'redis.connection.Connection'>
 cycle = {FairCycle} <FairCycle: 0/0 []>
 deadletter_queue = {NoneType} None
 exchange_types = {dict: 3} {'direct': <kombu.transport.virtual.exchange.DirectExchange object at 0x7fe61aa53588>, 'topic': <kombu.transport.virtual.exchange.TopicExchange object at 0x7fe61aa53550>, 
 handlers = {dict: 2} {'BRPOP': <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fe61aa88cc0> >,'LISTEN': <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7fe61aa88cc0>>}
 pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
 qos = {QoS} <kombu.transport.redis.QoS object at 0x7fe61aa88e48>
 queue_order_strategy = {str} 'round_robin'
 state = {BrokerState} <kombu.transport.virtual.base.BrokerState object at 0x7fe61a987748>
 subclient = {PubSub} <redis.client.PubSub object at 0x7fe61aa39cc0>

3.2.3 Redis message callback function

One of the member variables above deserves a closer look:

 handlers = {dict: 2} 
  {
    'BRPOP': <bound method Channel._brpop_read of <kombu.transport.redis.Channel object at 0x7fe61aa88cc0> >,'LISTEN': <bound method Channel._receive of <kombu.transport.redis.Channel object at 0x7fe61aa88cc0>>
  }

These are the callbacks invoked when Redis has a message, that is:

  • when BRPOP has a message, Channel._brpop_read is called;
  • when LISTEN has a message, Channel._receive is called.
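The handler table is a plain dictionary dispatch. The sketch below illustrates the idea with hypothetical names (`ChannelSketch`, `on_readable`) and made-up descriptor numbers; it assumes the poller stores a (channel, command) pair per file descriptor, which is a simplification for illustration rather than a copy of Kombu's code.

```python
class ChannelSketch:
    """Toy channel exposing the same handler table as the Redis Channel."""

    def __init__(self, name):
        self.name = name
        self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}

    def _brpop_read(self):
        return '{}: read a queue message'.format(self.name)

    def _receive(self):
        return '{}: read a fanout message'.format(self.name)


def on_readable(fd_to_chan, fileno):
    """Look up (channel, command) for the fd and call the matching handler."""
    chan, kind = fd_to_chan[fileno]
    return chan.handlers[kind]()


chan = ChannelSketch('channel-1')
fd_to_chan = {10: (chan, 'BRPOP'), 11: (chan, 'LISTEN')}   # made-up fds

result_brpop = on_readable(fd_to_chan, 10)
result_listen = on_readable(fd_to_chan, 11)
```

Keeping the table on the channel means the poller never needs to know what a command does, only which bound method to call.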

3.2.4 Key members directly related to Redis

The members that are directly related to Redis are defined in redis/client.py.

The main ones are listed below; Channel uses them to carry out the actual Redis operations:

  • async_pool: the Redis asynchronous connection pool;
  • pool: the Redis connection pool;
  • Client: the StrictRedis driver;
  • subclient: the client used for PubSub.

The corresponding types are as follows:

channel = {Channel} <kombu.transport.redis.Channel object at 0x7fabeea23b00>
 Client = {type} <class 'redis.client.Redis'>
 async_pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
 client = {Redis} Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>
 connection = {Transport} <kombu.transport.redis.Transport object at 0x7fabeea23940>
 connection_class = {type} <class 'redis.connection.Connection'>
 connection_class_ssl = {type} <class 'redis.connection.SSLConnection'>
 pool = {ConnectionPool} ConnectionPool<Connection<host=localhost,port=6379,db=0>>
 subclient = {PubSub} <redis.client.PubSub object at 0x7fabeea97198>

The specific code is as follows:

def _create_client(self, asynchronous=False):
    if asynchronous:
        return self.Client(connection_pool=self.async_pool)
    return self.Client(connection_pool=self.pool)

def _get_pool(self, asynchronous=False):
    params = self._connparams(asynchronous=asynchronous)
    self.keyprefix_fanout = self.keyprefix_fanout.format(db=params['db'])
    return redis.ConnectionPool(**params)

def _get_client(self):
    if redis.VERSION < (3, 2, 0):
        raise VersionMismatch(
            'Redis transport requires redis-py versions 3.2.0 or later. '
            'You have {0.__version__}'.format(redis))
    return redis.StrictRedis

@property
def pool(self):
    if self._pool is None:
        self._pool = self._get_pool()
    return self._pool

@property
def async_pool(self):
    if self._async_pool is None:
        self._async_pool = self._get_pool(asynchronous=True)
    return self._async_pool

@cached_property
def client(self):
    """Client used to publish messages, BRPOP etc."""
    return self._create_client(asynchronous=True)

@cached_property
def subclient(self):
    """Pub/Sub connection used to consume fanout queues."""
    client = self._create_client(asynchronous=True)
    return client.pubsub()
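Both lazy-initialisation styles used above (a property guarding a private attribute, and a cached property) can be seen side by side in a small sketch. It uses `functools.cached_property` from the standard library as a stand-in; the `cached_property` in the excerpt is Kombu's own helper, and `ChannelSketch` with its fake pool is hypothetical.

```python
from functools import cached_property


class ChannelSketch:
    created = 0   # counts how many fake pools were built

    def __init__(self):
        self._pool = None

    def _get_pool(self):
        type(self).created += 1
        return {'host': 'localhost', 'port': 6379}   # stands in for a pool

    @property
    def pool(self):
        # Manual lazy initialisation, in the style of Channel.pool.
        if self._pool is None:
            self._pool = self._get_pool()
        return self._pool

    @cached_property
    def client(self):
        # Built on first access, then stored on the instance.
        return ('client', id(self.pool))


chan = ChannelSketch()
same_pool = chan.pool is chan.pool
same_client = chan.client is chan.client
```

Either way, no pool or client exists until something actually asks for it, which is why the freshly created Channel is cheap.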

With the Channel added, the picture becomes:

+-----------------+
| Channel         |
|                 |      +-----------------------------------------------------------+
|    client +----------> |  Redis<ConnectionPool<Connection<host=localhost,port=6379> |
|                 |      +-----------------------------------------------------------+
|                 |
|                 |      +---------------------------------------------------+-+
|    pool  +---------->  |ConnectionPool<Connection<host=localhost,port=6379> |
|                 |      +---------------------------------------------------+-+
|                 |
|                 |
|                 |
|   connection    |
|                 |
+-----------------+

+-------------------+       +---------------------+       +--------------------+
| Connection        |       | redis.Transport     |       | MultiChannelPoller |
|                   |       |                     |       |                    |
|                   |       |                     |       |     _channels      |
|                   |       |        cycle +------------> |     _fd_to_chan    |
|     transport +---------> |                     |       |     _chan_to_sock  |
|                   |       |                     |       |     poller         |
+-------------------+       +---------------------+       |     after_read     |
                                                          |                    |
+------------------+                                      +--------------------+
| Hub              |
|                  |
|                  |            +-------------+
|      poller +---------------> | _poll       |
|                  |            |             |         +-------+
|                  |            |    _poller+---------> |  poll |
+------------------+            |             |         +-------+
                                +-------------+
+----------------+         +-------------------+
| Exchange       |         | Queue             |
|                |         |                   |
|                |         |                   |
|     channel    | <------------+ exchange     |
|                |         |                   |
|                |         |                   |
+----------------+         +-------------------+

3.3 Relationship between Channel and Connection

The basic principle is clear by now, but the concrete link between the two still needs to be analyzed.

3.3.1 Obtaining channel from Connection

In the definition of Connection, a channel is obtained by transport:

def channel(self) :
    """Create and return a new channel."""
    self._debug('create channel')
    chan = self.transport.create_channel(self.connection)
    return chan

3.3.2 Transport creates the Channel

Transport defines:

def create_channel(self, connection):
    try:
        return self._avail_channels.pop()
    except IndexError:
        channel = self.Channel(connection)
        self.channels.append(channel)
        return channel

Transport keeps two lists of channels:

self._avail_channels
self.channels

If _avail_channels is not empty, a channel is taken from it directly; otherwise a new Channel is created and recorded in channels.

When the connection is actually established, establish_connection is called. It creates a Channel and appends it to self._avail_channels.

def establish_connection(self):
    # creates channel to verify connection.
    # this channel is then used as the next requested channel.
    # (returned by ``create_channel``).
    self._avail_channels.append(self.create_channel(self))
    return self     # for drain events

The stack looks like this:

__init__, redis.py:557
create_channel, base.py:921
establish_connection, base.py:939
_establish_connection, connection.py:801
_connection_factory, connection.py:866
retry_over_time, functional.py:313
_ensure_connection, connection.py:439
connection, connection.py:859
channel, connection.py:283
<module>, node.py:11
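
The interplay between channel(), create_channel() and establish_connection() can be modelled in a few lines. The sketch below is a simplified stand-in and not kombu's implementation: ToyConnection, ToyTransport and ToyChannel are hypothetical names, and retries, debugging hooks and the real Redis client are left out. It only shows that the channel created while connecting is parked in _avail_channels and handed out on the first request.

```python
class ToyChannel:
    """Hypothetical stand-in for a transport channel."""

    def __init__(self, connection):
        self.connection = connection


class ToyTransport:
    """Simplified model of the two channel lists described above."""

    Channel = ToyChannel

    def __init__(self):
        self.channels = []          # every channel created so far
        self._avail_channels = []   # created but not yet handed out

    def create_channel(self, connection):
        try:
            # Reuse a channel that was prepared earlier, if any.
            return self._avail_channels.pop()
        except IndexError:
            channel = self.Channel(connection)
            self.channels.append(channel)
            return channel

    def establish_connection(self):
        # "Connecting" creates one channel up front and parks it for reuse.
        self._avail_channels.append(self.create_channel(self))
        return self


class ToyConnection:
    """Entry point that delegates channel creation to its transport."""

    def __init__(self):
        self.transport = ToyTransport()
        self._connection = None

    @property
    def connection(self):
        # Connect lazily on first use.
        if self._connection is None:
            self._connection = self.transport.establish_connection()
        return self._connection

    def channel(self):
        return self.transport.create_channel(self.connection)


conn = ToyConnection()
first = conn.channel()    # returns the channel parked by establish_connection
second = conn.channel()   # nothing is parked any more, so a new one is created
```

Note that in this model, as in the code above, the object passed to the Channel as its connection is the Transport itself, because establish_connection calls create_channel(self).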

3.3.3 Establishing the link

The Channel's __init__ contains:

def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)

    if not self.ack_emulation:  # disable visibility timeout
        self.QoS = virtual.QoS

    self._queue_cycle = cycle_by_name(self.queue_order_strategy)()
    self.Client = self._get_client()
    self.ResponseError = self._get_response_error()
    self.active_fanout_queues = set()
    self.auto_delete_queues = set()
    self._fanout_to_queue = {}
    self.handlers = {'BRPOP': self._brpop_read, 'LISTEN': self._receive}
 
    ......

    self.connection.cycle.add(self)  # add to channel poller.

    if register_after_fork is not None:
        register_after_fork(self, _after_fork_cleanup_channel)

The key line is:

self.connection.cycle.add(self)  # add to channel poller.

This registers the Channel with the poller (cycle) held by Transport, so that Transport can use the Channel to interact with the real Redis.

The stack is as follows:

add, redis.py:277
__init__, redis.py:531
create_channel, base.py:920
establish_connection, base.py:938
_establish_connection, connection.py:801
_connection_factory, connection.py:866
retry_over_time, functional.py:325
_ensure_connection, connection.py:439
connection, connection.py:859
register_with_event_loop, connection.py:266
main, testUb.py:41
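
The registration step can be reduced to a small sketch. The classes below are hypothetical stand-ins that keep only the attributes needed to show the link (cycle on the transport, _channels on the poller); they are not kombu's classes:

```python
class ToyPoller:
    """Illustrative stand-in for the transport's channel poller."""

    def __init__(self):
        self._channels = set()

    def add(self, channel):
        self._channels.add(channel)


class ToyTransport:
    def __init__(self):
        self.cycle = ToyPoller()   # one poller shared by all channels


class ToyChannel:
    def __init__(self, connection):
        self.connection = connection
        # Same idea as the last step of __init__ above:
        # the channel announces itself to the transport's poller.
        self.connection.cycle.add(self)


transport = ToyTransport()
a = ToyChannel(transport)
b = ToyChannel(transport)
registered = transport.cycle._channels
```

After construction, every channel is reachable from the transport through cycle._channels, which is the arrow from _channels back to the Channel in the figure.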

With the link in place, the picture becomes:

+-----------------+
| Channel         |
|                 |      +-----------------------------------------------------------+
|    client  +---------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
|                 |      +-----------------------------------------------------------+
|                 |
|                 |      +---------------------------------------------------+-+
|    pool  +---------->  |ConnectionPool<Connection<host=localhost,port=6379> |
|                 |      +---------------------------------------------------+-+
|                 |
|                 |   <------------------------------------------------------------+
|                 |                                                                |
|    connection +---------------+                                                  |
|                 |             |                                                  |
+-----------------+             |                                                  |
                                v                                                  |
+-------------------+       +---+-----------------+       +--------------------+   |
| Connection        |       | redis.Transport     |       | MultiChannelPoller |   |
|                   |       |                     |       |                    |   |
|                   |       |                     |       |     _channels +--------+
|                   |       |        cycle +------------> |     _fd_to_chan    |
|     transport +---------> |                     |       |     _chan_to_sock  |
|                   |       |                     |       |     poller         |
+-------------------+       +---------------------+       |     after_read     |
                                                          |                    |
+------------------+                                      +--------------------+
| Hub              |
|                  |
|                  |            +-------------+
|      poller +---------------> | _poll       |
|                  |            |             |         +-------+
|                  |            |    _poller+---------> |  poll |
+------------------+            |             |         +-------+
                                +-------------+
+----------------+         +-------------------+
| Exchange       |         | Queue             |
|                |         |                   |
|                |         |                   |
|     channel    | <------------+ exchange     |
|                |         |                   |
|                |         |                   |
+----------------+         +-------------------+

3.4 Linking Transport to the Hub

on_poll_init is where kombu.transport.redis.Transport gets associated with the Hub.

The assignment self.poller = poller hands the Hub's poll object to the MultiChannelPoller owned by Transport. This lets Transport take advantage of the Hub's polling.

def on_poll_init(self, poller):
    self.poller = poller
    for channel in self._channels:
        return channel.qos.restore_visible(
            num=channel.unacked_restore_limit,
        )

The variables are as follows:

poller = {_poll} <kombu.utils.eventio._poll object at 0x7fb9bcd87240>
self = {MultiChannelPoller} <kombu.transport.redis.MultiChannelPoller object at 0x7fb9bcdd6a90>
 after_read = {set: 0} set()
 eventflags = {int} 25
 fds = {dict: 0} {}
 poller = {_poll} <kombu.utils.eventio._poll object at 0x7fb9bcd87240>
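
The effect of on_poll_init can be sketched with toy objects. Everything below is a hypothetical simplification: ToyPoll stands in for the poll wrapper, the register_with_event_loop signature is reduced to a single hub argument, and the restore_visible step from the real method is omitted. The point is only that the poller object ends up shared:

```python
class ToyPoll:
    """Hypothetical stand-in for the poll wrapper owned by the Hub."""

    def __init__(self):
        self.registered = {}

    def register(self, fd, events):
        self.registered[fd] = events


class ToyHub:
    def __init__(self):
        self.poller = ToyPoll()


class ToyMultiChannelPoller:
    def __init__(self):
        self.poller = ToyPoll()   # private poll object until a hub is attached
        self._channels = set()

    def on_poll_init(self, poller):
        # Adopt the hub's poll object instead of keeping a private one.
        self.poller = poller


class ToyTransport:
    def __init__(self):
        self.cycle = ToyMultiChannelPoller()

    def register_with_event_loop(self, hub):
        self.cycle.on_poll_init(hub.poller)


hub = ToyHub()
transport = ToyTransport()
transport.register_with_event_loop(hub)

# Anything the transport registers is now visible to the hub's loop.
transport.cycle.poller.register(7, "READ")
```

Because both sides hold the same object, a file descriptor registered through the transport's poller is one the hub's loop will also see, which is what the new arrow from poller to _poll in the figure expresses.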

So we end up with the following:

+-----------------+
| Channel         |
|                 |      +-----------------------------------------------------------+
|    client  +---------> | Redis<ConnectionPool<Connection<host=localhost,port=6379> |
|                 |      +-----------------------------------------------------------+
|                 |
|                 |      +---------------------------------------------------+-+
|    pool  +---------->  |ConnectionPool<Connection<host=localhost,port=6379> |
|                 |      +---------------------------------------------------+-+
|                 |
|                 |   <------------------------------------------------------------+
|                 |                                                                |
|    connection +---------------+                                                  |
|                 |             |                                                  |
+-----------------+             |                                                  |
                                v                                                  |
+-------------------+       +---+-----------------+       +--------------------+   |
| Connection        |       | redis.Transport     |       | MultiChannelPoller |   |
|                   |       |                     |       |                    |   |
|                   |       |                     |       |     _channels +--------+
|                   |       |        cycle +------------> |     _fd_to_chan    |
|     transport +---------> |                     |       |     _chan_to_sock  |
|                   |       |                     |  +<----+    poller         |
+-------------------+       +---------------------+  |    |     after_read     |
                                                     |    |                    |
+------------------+                    +------------+    +--------------------+
| Hub              |                    |
|                  |                    v
|                  |            +-------+-----+
|      poller +---------------> | _poll       |
|                  |            |             |         +-------+
|                  |            |    _poller+---------> |  poll |
+------------------+            |             |         +-------+
                                +-------------+
+----------------+         +-------------------+
| Exchange       |         | Queue             |
|                |         |                   |
|                |         |                   |
|     channel    | <------------+ exchange     |
|                |         |                   |
|                |         |                   |
+----------------+         +-------------------+

0x04 Summary

As the figure above shows, the three basic modules (Connection, Channel, and Hub) are now linked together.

As you can see,

  • So far, Transport sits at the center: it associates the Channel, which represents the real Redis, with the poll object in the Hub. How this link is actually used is not yet clear.
  • The user works through Connection as the API entry point, and Connection gives access to Transport.

Now that the basic architecture is in place, the next step is to look at how Consumer works.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

If you would like timely updates on my articles, or want to see the technical material I recommend, please follow the account.
