Asynchronous tasks in my previous business processes were handled with celery + redis. It just so happens that a big shot recently asked in my public account about using RabbitMQ, so I want to introduce it into my latest business. Many people are probably already familiar with RabbitMQ.
The main purpose of this article is to prepare for integrating RabbitMQ into our FastAPI later!
1: Message queue and message middleware concepts
First of all, those of you already using a message queue (MQ) on the backend probably know what it is for and what the main scenarios are, but it may still be a very new concept for beginners. I am just going to explain what I know about message queues from my own perspective.
A message queue is essentially a protocol-based way of handling asynchronous communication. It enables communication between processes, or between different threads of the same process, and can be understood as a way of passing messages between upstream and downstream components.
Message queue = message + queue.
The message here is usually an abstract wrapper around the task we need to process; such messages are pushed into a queue and wait there in line until someone takes them out for consumption.
So what are the benefits of using message queues?
- Asynchronous processing: faster responses and better web throughput.
- Decoupling of application and business: a dispatched message can be handled by consumers with different implementations. Since message transmission is not a direct call relationship but relies on the middleware, the intrusion into the system is weak and the coupling is low.
- Flow control: effective peak shaving and valley filling, avoiding the high system load caused by traffic spikes.
- Message communication: both point-to-point communication and broadcast-style (publish/subscribe) communication. (The Python microservice framework Nameko relies on RabbitMQ.)
What is message-oriented middleware?
Message-oriented middleware is the basic software for sending and receiving messages in a distributed system. It can also be called a message queue: it uses an efficient and reliable messaging mechanism for platform-independent data exchange and integrates distributed systems on top of that data communication. By providing the messaging and message-queue model, inter-process communication can be extended to a distributed environment.
Message queues are therefore sometimes referred to as message-oriented middleware.
If there is a difference between message-oriented middleware and message queues, it is this:
- A message queue describes how data is processed.
- Message-oriented middleware usually refers to a component that the whole data-processing pipeline depends on.
- Message-oriented middleware provides the environment for receiving and storing data and distributing it to consumers.
The flow diagram of messages in a message queue
3: Mainstream message middleware in the industry
Several message middleware products are popular in the industry right now:
- ActiveMQ (somewhat unloved these days)
- RabbitMQ (the main character of this issue)
- RocketMQ (will practice it when I get the chance)
- Kafka (the big-data heavyweight)
- ZeroMQ (usable from Python, but it only provides queues and does not persist messages)
- Redis (not exactly message middleware, though often pressed into service as one)
4: RabbitMQ, the main character
The main point is that the client supports Python! Before learning about this message queue, we should first take a look at the AMQP communication protocol. As mentioned earlier, this protocol is primarily an asynchronous messaging protocol.
4.1 The AMQP protocol
AMQP (Advanced Message Queuing Protocol) is an application-layer standard that provides unified messaging services; it is an open application-layer protocol designed for message-oriented middleware.
Clients and message middleware based on this protocol can exchange messages regardless of which client/middleware product is used and which language it is developed in. Erlang implementations include RabbitMQ, among others.
Conceptual model diagram of AMQP protocol:
The diagram above comes from the Internet ~ if it infringes any rights, contact me and I will delete it!
As can be seen from the model diagram above:
Our publisher publishes a message, which enters a virtual host inside AMQP and finds the exchange it needs to reach; the exchange's bindings map it onto our queue; the message then lives only in that queue, waiting for a consumer to take it off over a channel!
From this we can see that AMQP message routing usually consists of three parts (a minimal pika sketch follows this list):
- Queue: the carrier of messages
- Exchange: defines the distribution policy
- Binding: defines the matching rules and acts as the middleman between queues and exchanges.
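To make the three parts concrete, here is a minimal pika sketch; the names demo_exchange, demo_queue, and demo_key are illustrative, and it assumes a local RabbitMQ with the default account:
```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='demo_exchange', exchange_type='direct')  # the exchange: distribution policy
channel.queue_declare(queue='demo_queue')                                   # the queue: carrier of messages
channel.queue_bind(queue='demo_queue',
                   exchange='demo_exchange',
                   routing_key='demo_key')                                  # the binding: matching rule

connection.close()
```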
Several other important roles are involved in AMQP:
If you have looked at queues in threads and processes before, you will often have seen a producer/consumer example used to illustrate queues. Our AMQP is also inseparable from these concepts:
- Message: a message is the body of a transmission and usually consists of two parts:
  - Payload: the concrete piece of data to be transmitted. It can be anything, such as a JSON string, binary data, or a custom data protocol.
  - Label: describes the payload and is used by Rabbit to determine who should get the message delivered.
- Publisher (producer): creates the message and sets its label (the tag attached at publish time). The message is wrapped up and sent to the RabbitMQ server, where the AMQP implementation interprets the label (an exchange name and an optional topic tag); after a routing match, Rabbit delivers the message according to the label to the subscribing consumers.
- Consumer: connects to our message middleware and subscribes to a queue; if there are messages in the queue, the RabbitMQ server sends them to the consumer, who then consumes them.
Let's illustrate with our celery setup:
RabbitMQ plays the role of the Broker here, acting as the conduit for consumption.
As can be seen from the figure above, the whole business flow is as follows:
- The application dispatches a task message, submitting the task to our Broker (producer)
- The Broker message middleware takes over storage of the task message and waits for someone to fetch the task for consumption (message middleware)
- The consumer gets the task message from the Broker and consumes it (consumer)
- After processing the task, the consumer automatically (or manually, if so configured) notifies the Broker of the task's status and by default records the task's result
- The Broker does not immediately remove the message from the queue; it does so only once it has received the acknowledgement (ACK) of the task message from the consumer
The next step is to start working with the RabbitMQ middleware itself. I will not expand on the advantages and disadvantages of the various middleware here; my exposure so far is limited and I don't want to embarrass myself ~ haha
A few more vocabulary concepts for RabbitMQ:
- Producer
For producing messages, which are published to the middleware; the corresponding command is basic.publish
- Broker
The application that receives and distributes messages; the RabbitMQ server itself is such an application!
- Consumer
For consuming and processing messages; the corresponding AMQP commands are basic.consume and basic.get
- Connection
A TCP connection between a client and our middleware
- Channel
To reduce the overhead of TCP connections between clients and our RabbitMQ, channels are a further encapsulation inside RabbitMQ on top of a Connection.
- Virtual Host
One of the basic components of AMQP, similar to a database name in a database server
- Exchange
Before entering a queue, a message must pass through an exchange. The exchange matches the routing key against its lookup table according to the message-distribution rules and distributes the message to the matching queue(s).
- Queue
The storage carrier of messages; messages can only be stored in queues, waiting for consumers to consume them.
1: Queue storage is limited only by the server's memory and disk; a queue is essentially a large message buffer. 2: Multiple producers can send messages to the same queue. 3: Multiple consumers can receive messages from the same queue.
- Binding (of exchange and queue)
Binding information is stored in the exchange's lookup table and is used to match messages and distribute them to the different queues.
4.2 Recording my RabbitMQ learning from zero
These are purely personal notes; if there is a slip of the pen ~ please correct me, big guys! Thank you so much! Some of the material comes from the Internet, with notes added afterwards.
4.2.1 Quick RabbitMQ setup with Docker:
Step 1: Run the image and map the relevant ports
```
docker run -d --name rabbitmq --publish 5671:5671 --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 rabbitmq
```
Step 2: Enter the container and enable the management plugin (optional)
```
docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_management
```
Step 3: Add and delete users:
```
rabbitmqctl add_user <username> <password>           # add a user
rabbitmqctl set_user_tags <username> administrator   # assign the user a permission tag
rabbitmqctl delete_user guest                        # delete the default guest administrator
```
4.2.2 Environment setup on Linux
Install RabbitMQ and Erlang locally on CentOS 7.
PS: The downloaded installation package is in .rpm format ~
Update the system's base packages:
```
yum -y update
```
4.2.2.1 Linux Basic Environment Preparations
The following installation steps are well suited to RabbitMQ beginners (adapted from a Java-oriented tutorial)!
- If you have previously installed the Erlang language, remove it first to avoid version conflicts
```
[root@localhost ~]# yum remove erlang
Loaded plugins: fastestmirror
No Match for argument: erlang
No Packages marked for removal
[root@localhost ~]#
```
- Install the dependent C/C++ build environment
```
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC unixODBC-devel httpd python-simplejson
```
- Download the installation packages for Erlang and RabbitMQ
```
# download Erlang
wget http://www.erlang.org/download/otp_src_20.1.tar.gz
# download RabbitMQ
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.0/rabbitmq-server-generic-unix-3.7.0.tar.xz
```
4.2.2.2 Install Erlang
Step 1: Unpack the Erlang installation package
```
tar -xvf otp_src_20.1.tar.gz
```
Step 2: Go into the unpacked folder
```
cd otp_src_20.1
```
Step 3: Specify the installation directory and configure the build (here --without-javac skips the Java compiler support)
```
./configure --prefix=/usr/local/erlang --enable-smp-support --enable-threads --enable-sctp --enable-kernel-poll --enable-hipe --with-ssl --without-javac
```
Step 4: Compile and install
```
make && make install
```
Step 5: Configure the Erlang environment variables
```
vi /etc/profile
# add this line to the end of the file:
export PATH=$PATH:/usr/local/erlang/bin
```
Step 6: Reload the profile file
```
source /etc/profile
```
4.2.2.3 Install RabbitMQ
Step 1: Extract the RabbitMQ installation package
PS: The downloaded package is an .xz file, so decompress it twice: first from .xz to .tar, then the tar itself
First unpack the .xz into a .tar
```
xz -d rabbitmq-server-generic-unix-3.7.0.tar.xz
```
Then unpack the tar file
```
tar -xvf rabbitmq-server-generic-unix-3.7.0.tar
```
Step 2: Go into the sbin directory of the unpacked RabbitMQ
```
cd rabbitmq_server-3.7.0/sbin
```
Step 3: Start rabbitmq-server:
```
./rabbitmq-server -detached   # start the server in the background
```
Step 4: Check the process:
```
ps aux | grep rabbit
# ps a: show all processes under the current terminal, including other users'
# ps u: display process status in a user-oriented format
# ps x: display all processes, with or without a controlling terminal
```
Step 5: Start the management UI — enable the RabbitMQ management plugin (in the sbin directory):
```
./rabbitmq-plugins enable rabbitmq_management
```
Step 6: On Aliyun (or any cloud host), remember to open the port numbers:
```
yum install net-tools
```
View and permit the ports
```
netstat -tlnp
firewall-cmd --add-port=15672/tcp --permanent
firewall-cmd --add-port=5672/tcp --permanent
firewall-cmd --reload   # apply the permanent rules
```
Or just turn off the firewall:
```
# CentOS 7
systemctl stop firewalld      # stop the firewall
systemctl disable firewalld   # disable it at boot
systemctl status firewalld    # check its status
```
4.2.3 RabbitMQ setup on Windows:
Download RabbitMQ from the official site:
The latest RabbitMQ version at the time of writing is 3.8.17
Also download Erlang
Download addresses:
```
https://www.rabbitmq.com/download.html
https://www.rabbitmq.com/install-windows.html
https://www.erlang.org/downloads
https://www.rabbitmq.com/which-erlang.html   (check the matching Erlang version here)
```
4.2.3.1 Install Erlang
Step 1: Run the .exe installer
Step 2: Configure the environment variables (note: the computer name must not contain Chinese characters)
Create a new ERLANG_HOME variable in the environment variables; its value is the directory where we installed Erlang!
Then add the configured variable to PATH:
Append the new entry:
```
%ERLANG_HOME%\bin
```
Verify the installation result:
4.2.3.2 Install RabbitMQ
Install from the command line, then enable the plugin for the admin UI
Install the plugin:
```
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.17\sbin>rabbitmq-plugins.bat enable rabbitmq_management
Enabling plugins on node rabbit@DESKTOP-16CKEN1:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@DESKTOP-16CKEN1...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

started 3 plugins
```
Then restart the service.
You can also start it directly:
Open the sbin directory and double-click rabbitmq-server.bat
The reverse operation is to disable the UI:
```
# enable the web UI (run in RabbitMQ Server\rabbitmq_server-3.8.17\sbin):
rabbitmq-plugins enable rabbitmq_management
# disable the web UI:
rabbitmq-plugins disable rabbitmq_management
```
Start the management UI:
Access address: http://localhost:15672/ — default account: guest / guest
PS: Don't keep this default account in a production environment! Even if you do use it, at least change the password!!! Otherwise you are easily going to get yelled at!
4.2.3.3 RabbitMQ management
User management includes adding users, deleting users, viewing the user list, and changing user passwords.
1: User information:
2: User permission tag types:
If you want to assign multiple roles, separate them with commas in the management UI and with spaces on the command line.
- administrator: can log in to the management console, view all information, and operate on users and policies; can monitor everything it can see, manage users, vhosts, and permissions, close other users' connections, and manage the policies and parameters of all vhosts
- monitoring: can access the management plugin and view all connections and channels as well as node-related information; mainly has permission to monitor and view node information such as process count, memory usage, disk usage, and queue consumption
- policymaker: can log in to the management console and manage the policies and parameters of the vhosts it has access to, but is not allowed to view node information
- management: an ordinary administrator, who can only log in to the management console (with the management plugin enabled); cannot see node information or manage policies
- impersonator: I don't yet know its specific permissions
- none: cannot log in to the management console; usually just a plain producer or consumer
3: Understanding virtual hosts:
Similar to database numbers in Redis; each vhost is essentially a mini RabbitMQ server with its own connections, exchanges, queues, bindings, and privileges.
Authorization management for vhosts (a few common commands are sketched below):
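A hedged sketch of the usual rabbitmqctl commands for vhost authorization; the vhost name my_vhost and the user someuser are illustrative:
```
rabbitmqctl add_vhost my_vhost                                   # create a vhost
rabbitmqctl list_vhosts                                          # list all vhosts
rabbitmqctl set_permissions -p my_vhost someuser ".*" ".*" ".*"  # configure / write / read permission regexps
rabbitmqctl list_permissions -p my_vhost                         # see who may access the vhost
rabbitmqctl delete_vhost my_vhost                                # delete a vhost
```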
Types of patterns for topics:
5: Pika and RabbitMQ quick meals
Preliminary notes:
The following sample code is based on pika 1.2, the latest version currently in use!
The following sample code is based on pika 1.2, the latest version currently in use!
The following sample code is based on pika 1.2, the latest version currently in use!
The differences between client versions are quite large! Many functions and parameters of the old versions barely exist in the new one!
5.1 Simple Python client practice with the six message patterns
5.1.2 Client installation
We can learn from the message-pattern examples provided on the official website:
Install the client first:
```
python -m pip install pika --upgrade
```
5.1.3 A first look at several key methods
First, we need to understand the parameters of the key functions used to work with MQ:
Step 1: Create the login credentials used when connecting
- pika.PlainCredentials(username, password, erase_on_connect)
Parameters:
- username: MQ login account
- password: MQ login password
- erase_on_connect=False: whether to erase the credentials once connected
Step 2: Build the connection parameters for MQ
- pika.ConnectionParameters(host, port, virtual_host, credentials)
Parameters:
- host: server IP
- port: server port
- virtual_host: name of the vhost to connect to (analogous to a database name); defaults to '/'
- credentials: the credentials object from step 1
Step 3: Open a blocking connection to MQ
- pika.BlockingConnection(parameters)
Parameters:
- parameters: the connection parameters (host/port/virtual host/account/password and other credential information)
Step 4: Create a channel
- connection.channel(channel_number)
Parameters:
- channel_number: the channel number; defaults to None (auto-assigned)
Step 5: Declare the queue
- channel.queue_declare(callback, queue, passive, durable, exclusive, auto_delete, nowait, arguments)
Parameters:
- callback: invoked on Queue.DeclareOk; must be None when nowait=True
- queue='': the queue name
- passive=False: only check whether the queue exists
- durable=False: whether the queue survives a RabbitMQ restart
- exclusive=False: allow access only from the current connection
- auto_delete=False: automatically delete the queue when consumers cancel or disconnect
- nowait=False: do not wait for the Queue.DeclareOk response
- arguments=None: custom key/value pairs for the queue
Step 6: Declare the exchange
- channel.exchange_declare(callback, exchange, exchange_type, passive, durable, auto_delete, internal, nowait, arguments)
Parameters:
- callback=None: invoked on Exchange.DeclareOk; must be None when nowait=True
- exchange: a non-empty exchange name, consisting of letters, digits, hyphens (-), underscores (_), and periods (.)
- exchange_type='direct': the exchange type
- passive=False: perform the declaration, or only check whether the exchange exists
- durable=False: whether the exchange survives a RabbitMQ restart, i.e. is not lost
- auto_delete=False: automatically delete the exchange when no queue remains bound to it
- internal=False: the exchange can only be published to by other exchanges
- nowait=False: do not wait for the Exchange.DeclareOk response
- arguments=None: custom key/value pairs for the exchange; defaults to empty
Step 7: Bind a queue to an exchange with a routing key
- channel.queue_bind(callback, queue, exchange, routing_key, nowait, arguments)
Parameters:
- callback: invoked on Queue.BindOk; must be None when nowait=True
- queue: name of the queue to bind
- exchange: the source exchange to bind to
- routing_key=None: the binding's routing key
- nowait=False: do not wait for the Queue.BindOk response
- arguments=None: custom key/value pairs for this binding
Step 8: Publish a message to a RabbitMQ exchange
- channel.basic_publish(exchange, routing_key, body, properties, mandatory, immediate)
Parameters:
- exchange: the target exchange to publish to
- routing_key: the routing key bound on the exchange
- body: the message body to carry
- properties=None: message properties, such as the content type (text/binary, etc.)
- mandatory=False: when True, if the message cannot be routed RabbitMQ returns it to the producer via Basic.Return; when False, an unroutable message is discarded
- immediate=False: the immediacy flag
Step 9: Subscribe to a queue and start consuming
- channel.basic_consume(consumer_callback, queue, no_ack, exclusive, consumer_tag, arguments)
Parameters:
- consumer_callback: called for each delivered message, with the arguments channel, method, properties, body
- queue='': the queue to consume from
- no_ack=False: whether to consume without acknowledgements
- exclusive=False: do not allow other consumers on this queue
- consumer_tag=None: specify your own consumer tag
- arguments=None: custom key/value pairs for the consumer
Step 10: Acknowledge a message
- channel.basic_ack()
Parameters:
- delivery_tag=0: the server-assigned delivery identifier
- multiple=False: batch acknowledgement; usually False
Step 11: Cancel consumption. This does not affect messages already delivered, but no new messages will be sent to the consumer
- channel.basic_cancel(callback, consumer_tag, nowait)
Parameters:
- callback=None: invoked on the Basic.CancelOk response; must be None when nowait=True and callable when nowait=False
- consumer_tag='': the consumer tag
- nowait=False: do not expect a Basic.CancelOk response
Step 12: Process I/O events and dispatch the basic_consume callbacks until all consumers are cancelled; this starts the consumer loop
- channel.start_consuming()
This method takes no parameters; it blocks and keeps dispatching messages to the registered callbacks (call channel.stop_consuming() to break out).
Step 13: Reject a single message
- channel.basic_reject(delivery_tag, requeue=True)
Parameters:
- delivery_tag: the delivery tag
- requeue=True: whether to put the message back on the queue
Example: channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
Step 14: Reject one or more messages
- channel.basic_nack(delivery_tag=None, multiple=False, requeue=True)
Parameters:
- delivery_tag=None: the delivery tag
- multiple=False: whether to reject in batch, i.e. several messages at once
- requeue=True: whether to put the message(s) back on the queue
Example: channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)  # puts the current message back into the queue
Step 15: Delete a declared exchange
- channel.exchange_delete(callback=None, exchange=None, if_unused=False, nowait=False)
Parameters:
- callback=None: the deletion callback; must be None when nowait=True
- exchange=None: the name of the exchange
- if_unused=False: whether to delete the exchange only if it is unused
- nowait=False: whether to perform the deletion without waiting for a response
Step 16: Set the properties of an outgoing message
- pika.BasicProperties()
Parameters:
content_type=None, content_encoding=None, headers=None, delivery_mode=None, priority=None, correlation_id=None, reply_to=None, expiration=None, message_id=None, timestamp=None, type=None, user_id=None, app_id=None, cluster_id=None
Usage:
```python
msg_props = pika.BasicProperties()
msg_props.content_type = 'text/plain'
```
Step 17: The callback function
- callback(channel, method, properties, body)
- method: contains consumer_tag, delivery_tag, exchange, redelivered, routing_key
- properties: the properties passed in via basic_publish, carrying extra attributes of the message
- body: the message sent by basic_publish
Step 18: Prefetch count for messages
- channel.basic_qos(prefetch_size=0, prefetch_count=0, global_qos=False)
Parameters:
- prefetch_size=0: size limit for a single message; zero means no limit (the usual setting)
- prefetch_count: a fixed value telling RabbitMQ not to push more than N messages to a consumer at the same time; once N messages are unacknowledged, the consumer gets no more until it acks
- global_qos: whether the settings above apply to the whole channel (True) or to the consumer level (False)
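To tie the steps together, here is a minimal end-to-end sketch; the queue name and message are illustrative, and it assumes a local RabbitMQ with the default guest account:
```python
import pika

# --- publisher side ---
credentials = pika.PlainCredentials('guest', 'guest')           # step 1: credentials
params = pika.ConnectionParameters(host='localhost',
                                   credentials=credentials)     # step 2: connection parameters
connection = pika.BlockingConnection(params)                    # step 3: blocking connection
channel = connection.channel()                                  # step 4: channel

channel.queue_declare(queue='demo')                             # step 5: declare the queue
channel.basic_publish(exchange='',                              # step 8: publish via the default exchange
                      routing_key='demo',
                      body=b'hello')
connection.close()

# --- consumer side ---
def on_message(ch, method, properties, body):
    print('got:', body)
    ch.basic_ack(delivery_tag=method.delivery_tag)              # step 10: manual ack

connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue='demo')
channel.basic_qos(prefetch_count=1)                             # step 18: at most one unacked message
channel.basic_consume(queue='demo', on_message_callback=on_message)  # step 9: subscribe
channel.start_consuming()                                       # step 12: block and dispatch
```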
5.1.4 Pattern 1: Send and receive one by one — the producer/consumer model
Message model:
Alternative connection style: an AMQP URL via URLParameters:
```python
def __init__(self, conn_str='amqp://user:pwd@host:port/%2F'):
    self.exchange_type = "direct"
    self.connection_string = conn_str
    self.connection = pika.BlockingConnection(pika.URLParameters(self.connection_string))
```
Sender side, send.py:
```python
import pika

# create the user login credentials
credentials = pika.PlainCredentials("guest", "guest")
# create the connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
# create the channel object
channel = connection.channel()

# Before sending we need to make sure the receiving queue exists; if we send a
# message to a location that doesn't exist, RabbitMQ will just drop it, and may throw:
#   pika.exceptions.ChannelClosedByBroker: (404, "NOT_FOUND - no queue 'hello' in vhost '/'")
# Create (declare) the queue named 'hello' -- publishing with exchange='' uses
# the default exchange identified by the empty string.
channel.queue_declare(queue='hello', passive=True, exclusive=True)
# queue: the queue name
# passive: if True, only check that the queue exists and raise if it doesn't;
#          if False (the default), the queue is declared when it doesn't exist
# durable: if False, all queued messages are lost when Rabbit restarts
# exclusive: whether the queue is exclusive to the current connection; in exclusive
#            mode the queue is restricted to this link and is removed once it drops
# auto_delete: whether to delete the queue once the link is broken
# arguments: other custom arguments
# (note: passive=True / exclusive=True are what trigger problems 1 and 2 below;
#  use plain channel.queue_declare(queue='hello') for the simple case)

# start publishing on the default exchange (the empty string); routing_key='hello'
# specifies exactly which queue the message goes to
import time
for i in range(1, 1000):
    time.sleep(1)
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='hello world {}'.format(i).encode('utf-8'))
    print('message sent')

connection.close()
```
After running the example above, look at the management backend and you will see the new messages waiting to be consumed
Exceptions you may encounter:
Problem 1:
```
pika.exceptions.ChannelClosedByBroker: (404, "NOT_FOUND - no queue 'hello' in vhost '/'")
```
Normally we need to start the consumer first so the queue gets created. Starting the producer first should also be fine when it declares the queue itself, but here (declaring with passive=True) it raises this exception!
Problem 2:
```
pika.exceptions.ChannelClosedByBroker: (405, "RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'hello' in vhost '/'")
```
This is related to the queue properties we set (the exclusive/passive flags being False vs True).
A few notes on status values:
Then launch our consumer receive.py:
```python
#!/usr/bin/env python
import pika, sys, os

def main():
    # create the user login credentials
    credentials = pika.PlainCredentials("guest", "guest")
    # create the connection
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost', credentials=credentials))
    # create the channel object
    channel = connection.channel()
    # create (declare) the queue -- again using the default empty-string exchange
    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print('ch', ch)
        print('properties', properties)
        print(" [x] Received %r" % body.decode('utf-8'))

    # subscribe to the queue; auto_ack=True acknowledges messages automatically
    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    # block here and run the message's callback whenever one arrives
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
```
Once started, it begins consuming the messages!
The consumer example above uses automatic ACK confirmation. But if an exception occurs during our consumer-side processing, this automatic ACK mechanism becomes a problem, so we can switch to manual ACK mode. The modified code is as follows:
```python
def callback(ch, method, properties, body):
    print('ch', ch)
    print('properties', properties)
    print(" [x] Received %r" % body.decode('utf-8'))
    # manually acknowledge only after the business logic succeeded
    ch.basic_ack(delivery_tag=method.delivery_tag)

# subscribe to the queue; auto_ack controls automatic acknowledgement
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=False)
...
```
Now we can observe the specific consumption:
Main points to note:
PS: RabbitMQ does not allow you to redeclare an existing queue with different parameters
Note the following points when creating a queue with the default parameters:
1: Whether the consumer enables automatic acknowledgement depends on your business. With auto_ack=True, a message counts as acknowledged the moment the consumer receives it; with auto_ack=False, the consumer must explicitly send the acknowledgement. If your processing involves business logic that may raise exceptions, manual acknowledgement is recommended: receiving a message does not mean the business logic processed it successfully, and our RabbitMQ end deletes the message from the queue once it receives the confirmation. (This corrects an earlier mistake of mine.)
2: There is another case: what if you receive a message but do not want to consume it? A basic_reject/basic_nack sketch follows these notes.
3: The examples above do not deal with topics such as connection management, error handling, connection recovery, concurrency, or metrics collection, so be careful where you use them
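For note 2 above: one way to refuse a message instead of consuming it is basic_reject (or basic_nack) — a minimal sketch, where the condition is purely illustrative:
```python
def callback(ch, method, properties, body):
    # illustrative condition: refuse anything we don't recognize
    if not body.startswith(b'expected-prefix'):
        # requeue=True puts the message back on the queue (beware: a lone consumer
        # will just receive it again); requeue=False discards it, or routes it to
        # a dead-letter exchange if one is configured (see section 5.1.15)
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        return
    # ... normal business processing ...
    ch.basic_ack(delivery_tag=method.delivery_tag)
```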
Also worth considering:
- How reliable is message delivery? How can I confirm that a message actually reached the broker, and roll back when delivery fails?
- Batch ACKs on the consumer side
You can dig deeper here: www.rabbitmq.com/confirms.ht…
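That confirms page covers publisher confirms; with pika's BlockingConnection the pattern looks roughly like this (a sketch, not production-grade error handling):
```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.confirm_delivery()  # put the channel into publisher-confirm mode

try:
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=b'payload',
                          mandatory=True)  # raise if the message cannot be routed
    print('message confirmed by the broker')
except pika.exceptions.UnroutableError:
    print('message was returned: it could not be routed to any queue')
```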
5.1.5 Pattern 2: Work queues
In this message pattern, our messages can be subscribed to by multiple consumers for consumption! There are two ways the messages can be shared among them:
- One is the even-distribution (round-robin) mode, in which a consumer gets a message and must wait until every consumer has completed its assigned task before getting the next one. For example: a task assigned to A may take 10 seconds while one assigned to B takes only 5 seconds. If B finishes its message before A, it does not immediately consume a new one; only after A's 10-second task completes does A get a new 10-second task, and then B gets one too.
- One is the fair-dispatch mode (specify the number of prefetched messages per consumer), in which each consumer takes on new work according to its capacity — whoever is able takes on more tasks to consume, with no waiting!
Consumption model diagram:
Example of the round-robin (even-distribution) pattern:
```python
#!/usr/bin/env python
import pika
import sys

# create the user login credentials
credentials = pika.PlainCredentials("guest", "guest")
# create the connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
# create the channel
channel = connection.channel()

# create our queue, named task_queue; messages in this queue are persistent
channel.queue_declare(queue='task_queue', durable=True)

# the message to send comes from the command line, or a default
message = " ".join(sys.argv[1:]) or "Hello World!"

# publish to our broker -- note there is no confirmation that delivery succeeded!!
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(delivery_mode=2))  # make the message persistent
print(" [x] Sent %r" % message)
```
Then define several other consumers; start them and observe how the work is shared:
```python
#!/usr/bin/env python
import pika
import time

# create the user login credentials
credentials = pika.PlainCredentials("guest", "guest")
# create the connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
# create the channel
channel = connection.channel()

# a queue cannot be changed from persistent to normal, or vice versa --
# otherwise an error is raised! So the queue type must be settled when it is
# first created!
channel.queue_declare(queue='task_queue', durable=True)

print(' [*] Round-robin mode processing. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(1)  # simulated processing time for this worker
    print(" [x] Done")
    # manually acknowledge the message
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='task_queue', on_message_callback=callback)
# block here, waiting for messages; run the callback whenever one arrives
channel.start_consuming()
```
Another consumer example; the main difference is its processing time:
```python
time.sleep(2)
```
To observe the effect, first start our two consumers, then start the producer:
The consumers get the messages alternately, odd and even, even though their processing capacities differ, so the faster one still ends up waiting! To optimize this model, you can add:
```python
# tell RabbitMQ not to give a consumer a new message until it has acked the previous one
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
# the consumer blocks here waiting for messages; once a message is in the queue,
# the callback is executed
channel.start_consuming()
```
Now we can observe our consumption; to make observation easier, delete the queue first:
The specific situation after the modification:
If R1's consuming task takes 3 seconds per message and R2's takes 1 second, the consumption looks like this:
R1:
R2:
5.1.6 Pattern 3: Publish/subscribe — sending a message to many consumers at once
In the previous two examples we used the default exchange (exchange as an empty string) and never specified an exchange type.
The default path of a message is through the empty-string exchange straight to the queue we created
A complete messaging model looks like this:
- Producers can only send messages to an exchange
- The exchange receives messages from producers on one side and pushes them into queues on the other.
About the exchange: it must know exactly how to process the messages it receives. Should a message go to one particular queue? To multiple queues? Or should it be discarded?
The mode discussed here works like a broadcast: one of my messages can be published to all subscribers at the same time, and every subscriber of this channel gets the chance to receive the distributed message.
But in this broadcast mode the queue names are randomly generated, and only when a consumer starts is the corresponding queue created; when the consumer disconnects, the queue is deleted too!
Broadcast mode model (messages are only broadcast, never stored; miss one and it's gone!):
Under this model there are several broadcast variants, corresponding to the exchange types:
- direct: the direct exchange type, which can also be understood as a multicast/routing mode
- topic: the topic exchange, which can be understood as rule-matching broadcast
- headers: the exchange type that matches on header keys
- fanout: pure broadcast mode.
Example of fanout pure broadcast mode:
Broadcast sender send.py:
```python
import pika

# create the user login credentials
credentials = pika.PlainCredentials("guest", "guest")
# create the connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
# create the channel
channel = connection.channel()

# declare a fanout exchange (no queue is declared on the sender side)
channel.exchange_declare(exchange="ceshilog", exchange_type="fanout")

import time
for i in range(1, 1000):
    time.sleep(1)
    channel.basic_publish(exchange="ceshilog",
                          routing_key="",
                          body='fanout broadcast - hello Xiao Zhong! {}'.format(i).encode('utf-8'),
                          properties=pika.BasicProperties(delivery_mode=2))
    print("message sent")

connection.close()
```
The example above keeps broadcasting data once started; whoever subscribes can receive it!
Message subscriber r1.py:
```python
import pika

# establish a connection to RabbitMQ
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange="ceshilog", exchange_type="fanout")

# queue="" lets Rabbit assign a random queue name;
# exclusive=True removes the queue once the consumer disconnects
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
print("random queue name:", queue_name)

# bind the queue to the exchange
channel.queue_bind(exchange="ceshilog", queue=queue_name)

def callback(ch, method, properties, body):
    print("received broadcast message: %r" % body.decode("utf8"))

channel.basic_consume(queue_name, callback, auto_ack=True)
# block here, waiting to receive messages
channel.start_consuming()
```
The subscriber above can be started more than once.
Now we start three of them and observe: three consumers receive the broadcast through their corresponding queues:
The broadcast messages on all three terminals are consistent:
Looking at our channels, there are four: one for send and three for receive:
Connection status:
Channel status:
Exchange status:
If a consumer disconnects at this point, its queue is deleted too, leaving only two ~:
In this messaging mode, the relationship between our exchange and a queue is called a binding! The exchange needs to be told exactly which queues it is bound to, i.e. which queues messages should be sent to for processing! Note, though, that this binding does not depend on routing_key, meaning routing_key can simply be ''.
PS: You can list all bindings using the rabbitmqctl list_bindings command
You can even view current binding information:
Before starting consumers:
After starting consumers:
```
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.17\sbin>rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name  source_kind  destination_name                destination_kind  routing_key                     arguments
             exchange     amq.gen-gnZvx4FayXOzjI2dNs8oJw  queue             amq.gen-gnZvx4FayXOzjI2dNs8oJw  []
             exchange     amq.gen-VjNQR5GnPRZxMxjwBYobCw  queue             amq.gen-VjNQR5GnPRZxMxjwBYobCw  []
             exchange     task_queue                      queue             task_queue                      []
             exchange     amq.gen-KM14-0FDVBey833APzCqyQ  queue             amq.gen-KM14-0FDVBey833APzCqyQ  []
logs         exchange     amq.gen-KM14-0FDVBey833APzCqyQ  queue             amq.gen-KM14-0FDVBey833APzCqyQ  []
logs         exchange     amq.gen-VjNQR5GnPRZxMxjwBYobCw  queue             amq.gen-VjNQR5GnPRZxMxjwBYobCw  []
```
5.1.7 Direct multicast routing mode
Direct multicast — routing mode: a pattern for selectively receiving messages based on the exchange's routing_key
In this mode the exchange is declared as a direct exchange, but this exchange type can still take on fanout-like characteristics through multiple bindings; that is, by matching multiple routing_keys, you can bind several routing_keys to the same queue!
In multicast mode, each of our consumers only matches the messages for its own rules.
For example, the sender send.py:
```python
import pika
import sys

# create the user login credentials
credentials = pika.PlainCredentials("guest", "guest")
# create the connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
# create the channel
channel = connection.channel()

# declare the exchange and specify the direct (multicast-like) type
channel.exchange_declare(exchange="direct_ceshilogs", exchange_type="direct")

# the importance level, defaulting to info -- taken from the command line;
# multiple values are separated by spaces, e.g.: info error
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'

import time
for i in range(1, 1000):
    time.sleep(1)
    channel.basic_publish(exchange="direct_ceshilogs",
                          routing_key=severity,
                          body='direct mode - hello Xiao Zhong! {}'.format(i).encode('utf-8'),
                          properties=pika.BasicProperties(delivery_mode=2))
    print("message sent")

connection.close()
```
r1.py:
```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
import sys

# create the user login credentials
credentials = pika.PlainCredentials("guest", "guest")
# create the connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
# create the channel
channel = connection.channel()

channel.exchange_declare(exchange="direct_ceshilogs", exchange_type="direct")

# queue="" lets Rabbit assign a random queue name;
# exclusive=True removes the queue once the consumer disconnects
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
print("random queue name:", queue_name)

# the routing keys to listen to come from the command line
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# bind the queue once per routing key
for severity in severities:
    print(severity)
    channel.queue_bind(exchange='direct_ceshilogs',
                       queue=queue_name,
                       routing_key=severity)

def callback(ch, method, properties, body):
    print("received message with routing key %r: %r"
          % (method.routing_key, body.decode("utf8")))

channel.basic_consume(queue_name, callback, auto_ack=True)
# block here, waiting to receive messages
channel.start_consuming()
```
We receive our routing_key rules from the command line, so start it from the command line (or just hard-code them):
Start a consumer that only listens to routing_key=info:
```
(fastapi_xxx) D:\code\python\local_python\fastapi_xx_msg\rabbitmqtest\test4>python r1.py info
random queue name: amq.gen-Ruv_66fZoKBvDqgYyN52SQ
info
```
Start another consumer listening to both routing_key=info and error:
```
(fastapi_xxx) D:\code\python\local_python\fastapi_xx_msg\rabbitmqtest\test4>python r1.py info error
random queue name: amq.gen-5a7YDQXA4AMk-D6giXjGNw
info
error
```
The queue names after startup are shown in the figure below:
When the producer starts, it sends only info messages by default; observe our output:
Both consumers process the info messages at the same time, similar to our pure broadcast mode!
Now stop our sender and restart it so it sends only error messages:
Only the consumer bound to both info and error receives the error messages!
Here too, as in pure broadcast mode, if a client exits, its corresponding queue disappears ~
5.1.8 Topic — rule broadcast, fuzzy-matching routing mode
Topic rule broadcast, the fuzzy-matching routing mode: the message pattern in which the exchange's routing_key carries wildcards
This mode works mostly like the previous multicast mode, except for the routing_key: the routing_key becomes a '.'-delimited string, where '.' splits it into several words, each representing a condition;
The following is a diagram of the rule-broadcast model:
For example, with the bindings in the diagram:
error has its own dedicated queue, so that queue only receives messages of this type
Another queue receives info, error, and warning messages alike
Complete example, producer side send.py
```python
import pika
import sys

# create the user login credentials
credentials = pika.PlainCredentials("guest", "guest")
# create the connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

# declare the exchange and specify the topic type
channel.exchange_declare(exchange='topic_ceshilogs', exchange_type='topic')

# the routing key to send with comes from the command line, with a default
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'xiaozhong.info.*'

import time
for i in range(1, 1000):
    time.sleep(1)
    channel.basic_publish(exchange="topic_ceshilogs",
                          routing_key=routing_key,
                          body='topic mode - hello Xiao Zhong! {}'.format(i).encode('utf-8'),
                          properties=pika.BasicProperties(delivery_mode=2))
    print("message sent")

connection.close()
```
Then our corresponding consumer side:
```python
import pika
import sys

# create the user login credentials
credentials = pika.PlainCredentials("guest", "guest")
# create the connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='topic_ceshilogs', exchange_type='topic')

# create a random, exclusive queue
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
print("random queue name:", queue_name)

# the binding rules come from the command line
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
print("binding rules are:", binding_keys)

# bind the queue once per rule
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_ceshilogs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Topic broadcast mode started! To exit press CTRL+C')

def callback(ch, method, properties, body):
    print("topic broadcast received message %r (routing key %r)"
          % (body.decode('utf8'), method.routing_key))

channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)
# start the listening loop
channel.start_consuming()
```
- Start our first consumer:
```
python r.py "#"
```
Its output is:
```
(fastapi_5g_msg) D:\code\python\local_python\fastapi_5g_msg\rabbitmqtest\test5>python r.py "#"
random queue name: amq.gen-cuvwaAk8jMFYY2eppAP1TQ
binding rules are: ['#']
 [*] Topic broadcast mode started! To exit press CTRL+C
```
- Start our consumer #2:
```
(fastapi_5g_msg) D:\code\python\local_python\fastapi_5g_msg\rabbitmqtest\test5>python r.py "xiaozhong.info.*"
random queue name: amq.gen-Ehx5OGWTBNY1Z-7RJUCCCW
binding rules are: ['xiaozhong.info.*']
 [*] Fuzzy matching - topic broadcast mode started! To exit press CTRL+C
```
Personally I feel this fuzzy-matching style may see relatively few scenarios! I'll study it further when I get the chance! Both of the consumers above can consume our produced messages at the same time!
The first one above is bound with '#', which matches zero or more words, so it receives every message sent to this exchange!
The second is bound with 'xiaozhong.info.*', where '*' matches exactly one word; anything similar that matches this pattern is also received!
If you define the binding rule as ['xiaozhong.#'], you will also receive xiaozhong.xxx.xxx and everything else under xiaozhong!
5.1.9 RPC call message mode
The RPC call message pattern
We'll come back to this after I've sorted out some notes on using RPC from Python! Parked here for the moment!
5.1.10 Sending JSON data
If you want to send a specific data type, you can define your own content_type:
```python
channel.queue_declare(queue=qname, auto_delete=False, durable=True)
prop = pika.BasicProperties(
    content_type='application/json',
    content_encoding='utf-8',
    headers={'key': 'value'},
    delivery_mode=1,
)
channel.basic_publish(
    exchange='',
    routing_key=qname,
    properties=prop,
    body='{"message": "hello"}'   # the body must be valid JSON to match the content_type
)
```
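On the consuming side, a sketch of how the content_type set above could drive decoding (the callback wiring is as in the earlier examples):
```python
import json

def callback(ch, method, properties, body):
    # decide how to decode based on the content_type set by the producer
    if properties.content_type == 'application/json':
        payload = json.loads(body)
    else:
        payload = body.decode('utf-8')
    print(payload, properties.headers)
```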
5.1.11 Heartbeat detection
RabbitMQ's heartbeat detection is used to check that the communication with the broker is still alive, similar to the heartbeat packets we usually use with sockets. The principle is to check whether data is still flowing normally on the corresponding socket connection: if no data has flowed for a fixed period, we send a heartbeat packet to probe the peer; if no reply to the heartbeat arrives within a certain time, the heartbeat is judged to have timed out, the peer is considered to have crashed abnormally, and the TCP connection is eventually closed.
About setting the heartbeat packet detection interval:
- Modify the rabbitmq.config server configuration
- Configure the heartbeat parameter on the client
PS: If heartbeat is 0, heartbeat detection is disabled
```python
class ConnectionParameters(Parameters):
    """Connection parameters object that is passed into the connection adapter
    upon construction.
    """
    # Protect against accidental assignment of an invalid attribute
    __slots__ = ()

    class _DEFAULT(object):
        """Designates default parameter value; internal use"""

    def __init__(  # pylint: disable=R0913,R0914
            self,
            host=_DEFAULT,
            port=_DEFAULT,
            virtual_host=_DEFAULT,
            credentials=_DEFAULT,
            channel_max=_DEFAULT,
            frame_max=_DEFAULT,
            heartbeat=_DEFAULT,
            ssl_options=_DEFAULT,
            connection_attempts=_DEFAULT,
            retry_delay=_DEFAULT,
            socket_timeout=_DEFAULT,
            stack_timeout=_DEFAULT,
            locale=_DEFAULT,
            blocked_connection_timeout=_DEFAULT,
            client_properties=_DEFAULT,
            tcp_options=_DEFAULT,
            **kwargs):
        """Create a new ConnectionParameters instance. See `Parameters` for
        default values.
        """
```
Specific configuration:
```python
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host=hosts,
        credentials=credential,
        heartbeat=0,   # 0 disables heartbeat detection
    ))
```
5.1.12 Other operations on queues and exchanges
```python
import pika

credentials = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='127.0.0.1', credentials=credentials))
channel = connection.channel()

channel.exchange_delete("ex1")  # delete the exchange
channel.queue_delete("aaa")     # delete the queue
```
5.1.13 About the manual ACK mechanism
```python
# 0.x versions: no_ack=True means no manual acknowledgement (i.e. auto ack)
channel.basic_consume(consumer_callback=ConsumerCallback,
                      queue=MQQueueNode2Center,
                      no_ack=False)
# 1.x versions:
channel.basic_consume(queue=MQQueueNode2Center,
                      on_message_callback=ConsumerCallback,
                      auto_ack=False)
```
5.1.14 Time To Live (TTL) of a message
A TTL is a message's time to live, i.e. its maximum lifetime, usually in milliseconds
RabbitMQ can set a TTL on messages or on queues:
- Per message: a specific expiration time can be given when the message is sent; each message's expiration can differ.
- Per queue: RabbitMQ supports queue-level expiration, counted from the moment a message enters the queue; once the queue's TTL is exceeded, the message becomes a dead letter and is automatically cleared (if no dead-letter queue is configured).
- Mixed doubles: if both methods are used together, the effective expiration time is the smaller of the two.
- If no TTL is set, the message never expires; if the TTL is set to 0, the message is discarded immediately unless it can be delivered directly to a consumer at that moment.
When and how to configure the TTL:
Define a message TTL for queues using a policy, configured on the command line:
```
# rabbitmqctl
rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues
# rabbitmqctl (Windows)
rabbitmqctl set_policy TTL ".*" "{""message-ttl"":60000}" --apply-to queues
```
The settings above apply a 60-second TTL to all queues!
It can also be set through an HTTP API request:
```
curl -i -u guest:guest -H "content-type:application/json" -XPUT
  -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}'
  http://localhost:15672/api/queues/{vhost}/{queuename}
```
In Python code: the x-message-ttl argument of queue_declare controls how long a message published to the queue can live before being discarded; if a message sits in the queue longer than the configured TTL value, we say the message is "dead".
How to set a TTL on individual messages:
```python
import pika
import sys

# create the user login credentials
credentials = pika.PlainCredentials("guest", "guest")
# create the connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
# create the channel through the connection
channel = connection.channel()

# create our queue, named task_queue; messages in this queue are persistent!
# a queue cannot be changed from persistent to normal, or vice versa -- otherwise
# an error is raised! So the queue type must be settled when it is first created!
channel.queue_declare(queue='task_queue', durable=True)

# start publishing messages to our broker
import time
for i in range(1, 100):
    time.sleep(1)
    properties = pika.BasicProperties(delivery_mode=2)
    # per-message expiration, as a string, in milliseconds
    properties.expiration = '2000'
    body = 'message with a 2s TTL {}'.format(i).encode('utf-8')
    print(body)
    channel.basic_publish(
        exchange='',               # default exchange
        routing_key='task_queue',  # matches the queue name
        body=body,
        properties=properties)
    # delivery_mode=2 marks the message persistent; 1 means non-persistent.
    # Notes on persistence -- it is not a 100% reachability guarantee:
    # 1: reachability also depends on network connectivity and jitter!
    # 2: full persistence requires both the queue and the message to be durable:
    #    durable=True + properties=pika.BasicProperties(delivery_mode=2)

connection.close()
```
The code above is the producer side; the most critical settings are:
```python
properties = pika.BasicProperties(delivery_mode=2)
# per-message expiration
properties.expiration = '2000'
```
With each message's expiration set to 2 seconds, watch our queue's information: even while sending continuously, the content waiting to be consumed stays small — that is because expired messages are discarded!
Setting the expiration time for a message alone does not conflict with the persistence feature of the queue.
However, setting an expiration time for the queue will conflict with the queue persistence feature!!
How to set a TTL for all messages in a queue:
Problems encountered:
```
pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'task_queue' in vhost '/': received the value '2000' of type 'longstr' but current is none")
```
The reason:
We had set the queue to be durable but then also set an expiration time on it, hence the conflict!!
After modification, this still appears:
```
pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'task_queue' in vhost '/': received 'false' but current is 'true'")
```
This is because the queue we created originally already carries these attributes, and a queue's attributes cannot be changed dynamically! The best way out, if you want to keep using this queue name, is to delete the queue — or just use a new one! One more point: remember to keep durable consistent across all the queue_declare calls!!!
A new problem after deleting the queue:
```
pika.exceptions.ChannelClosedByBroker: (406, 'PRECONDITION_FAILED - invalid arg \'x-message-ttl\' for queue \'task_queue\' in vhost \'/\': "expected integer, got longstr"')
```
The reason:
x-message-ttl cannot be set to a string type!!
Complete sample code for setting a TTL on the queue:
```python
#!/usr/bin/env python
import pika
import sys

# create the user login credentials
credentials = pika.PlainCredentials("guest", "guest")
# create the connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
# create the channel through the connection
channel = connection.channel()

arguments = {}
# queue-level message TTL, in ms, as an integer! e.g. 60000 = 60s
arguments['x-message-ttl'] = 2000
# note: durable=True and x-message-ttl cannot coexist here (see the errors above)
channel.queue_declare(queue='task_queue',
                      durable=False,
                      arguments=arguments,
                      auto_delete=False)

# start sending messages to our broker!!
import time
for i in range(1, 100):
    time.sleep(1)
    properties = pika.BasicProperties(delivery_mode=2)
    # per-message expiration (string, in ms)
    properties.expiration = '2000'
    body = 'message {}'.format(i).encode('utf-8')
    print(body)
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=body,
        properties=properties)

connection.close()
What if the two are combined? For example, I set the queue expiration time to 1 second and the message expiration time to 2 seconds — a mixed-doubles setting. When both methods are used together, the effective expiration time is the smaller of the two values. You can see this in the management UI — no more code needed!
5.1.15 Dead letter messages and dead letter queues
5.1.15.1 Dead letter message and dead letter queue definitions
The official documentation on dead letters: www.rabbitmq.com/ttl.html#pe…
DLX: Dead Letter Exchange
A message becomes a dead-letter message when any of the following conditions holds:
- 1: The message is rejected by the consumer (channel.basic_nack or channel.basic_reject) with the requeue property set to False
- 2: The message's TTL in the queue expires
- 3: The queue has reached its maximum length, so the message cannot be added to MQ
- Note: the TTL of a message in one queue has no effect on the TTL of the same message in other queues
RabbitMQ then checks whether a dead-letter exchange is configured for the queue: if one is configured, the dead-letter message is forwarded to it (the DLX); if not, the message is discarded!
5.1.15.2 Configuring a dead letter queue
Official documentation: www.rabbitmq.com/dlx.html
- A dead-letter exchange can be configured for every queue that needs dead-letter handling
- Each queue can have its own dead-letter queue; messages entering it pass through the dead-letter exchange
- A dead-letter exchange is just a normal exchange that happens to be dedicated to handling dead letters
- When a queue is created with a dead-letter exchange attached, messages that die in that queue (for any of the reasons above) are re-published to the attached exchange, which then routes them onward.
Specific illustration:
To specify a DLX using a policy, add the key "dead-letter-exchange" to the policy definition. For example:
rabbitmqctl:

rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues

rabbitmqctl (Windows):

rabbitmqctl set_policy DLX ".*" "{""dead-letter-exchange"":""my-dlx""}" --apply-to queues
The policy above applies the DLX "my-dlx" to all queues. It is only an example; in practice, different queues may use different dead-letter settings (or none at all).
Dead-letter behaviour can also be configured with queue arguments:

x-dead-letter-exchange: re-publish dead letters to the specified exchange
x-dead-letter-routing-key: route dead letters with the specified routing key
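The same two arguments expressed as a pika queue declaration — a minimal sketch, with 'my-dlx' and 'dead-letters' as placeholder names:

# Declare a queue whose dead letters go to the exchange 'my-dlx',
# re-routed with the routing key 'dead-letters'
arguments = {
    'x-dead-letter-exchange': 'my-dlx',
    'x-dead-letter-routing-key': 'dead-letters',  # optional override
}
channel.queue_declare(queue='task_queue', durable=True, arguments=arguments)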
PS: when a dead-letter exchange is specified, the user needs — in addition to configure permission on the declared queue — read permission on the queue and write permission on the dead-letter exchange. Permissions are validated at queue declaration time.
A complete, simple example:
1: Set the message expiration time to 2s; after 2s the message becomes a dead letter
2: Dead-letter messages are transferred to a queue bound to our dead-letter exchange
#!/usr/bin/env python
import time

import pika

# Create user login credentials
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

# ======== Create the dead-letter exchange and queue for messages that
# cannot be processed normally ========
channel.exchange_declare(exchange='xz-dead-letter-exchange',
                         exchange_type='fanout', durable=True)
channel.queue_declare(queue='xz-dead-letter-queue', durable=True)
# Bind the dead-letter queue to the dead-letter exchange
channel.queue_bind(queue='xz-dead-letter-queue',
                   exchange='xz-dead-letter-exchange',
                   routing_key='xz-dead-letter-queue')
# =========

# Declare our business queue named task_queue; its messages are persistent.
# Remember: queue attributes cannot be changed after creation!
arguments = {}
# arguments['x-message-ttl'] = 2000  # optional per-queue TTL (milliseconds)
arguments['x-dead-letter-exchange'] = 'xz-dead-letter-exchange'
channel.queue_declare(queue='task_queue', durable=True,
                      arguments=arguments, auto_delete=False)

# Start sending messages to our broker
for i in range(1, 100):
    time.sleep(1)
    properties = pika.BasicProperties(delivery_mode=2)  # persistent message
    properties.expiration = '2000'  # becomes a dead letter after 2 seconds
    body = 'Hello, Xiaozhong! {}'.format(i).encode('utf-8')
    print(body.decode('utf-8'))
    channel.basic_publish(
        exchange='',               # default exchange
        routing_key='task_queue',
        body=body,
        properties=properties)

connection.close()
After running the producer code above, observe the output: eight messages were sent,
Hello, Xiaozhong! 1
Hello, Xiaozhong! 2
Hello, Xiaozhong! 3
Hello, Xiaozhong! 4
Hello, Xiaozhong! 5
Hello, Xiaozhong! 6
Hello, Xiaozhong! 7
Hello, Xiaozhong! 8
Since none of the eight messages were consumed, they all ended up in the dead-letter queue:
Note the following about the dead letter queue:
Dead-lettered messages are removed from the original queue immediately after being published to the DLX target queue. This ensures there is no chance of excessive message buildup exhausting broker resources, but it also means messages can be lost if the target queue cannot accept them.
5.1.15.3 Consumption of dead letters in the dead letter queue
When our dead-letter consumers consume dead-letter messages, the following points deserve attention:
Dead-lettering adds an "x-death" array to the headers field of the message's properties, containing the following fields:
<BasicProperties(['delivery_mode=2', "headers={'x-death': [{'count': 1L, 'reason': 'expired', 'queue': 'task_queue', 'time': datetime.datetime(2021, 6, 22, 8, 40, 1), 'exchange': '', 'routing-keys': ['task_queue'], 'original-expiration': '2000'}], 'x-first-death-exchange': '', 'x-first-death-queue': 'task_queue', 'x-first-death-reason': 'expired'}"])>
Where the 'x-death' content is:
{'x-death': [{'count': 1L, 'reason': 'expired', 'queue': 'task_queue', 'time': datetime.datetime(2021, 6, 22, 8, 40, 1), 'exchange': '', 'routing-keys': ['task_queue'], 'original-expiration': '2000'}], 'x-first-death-exchange': '', 'x-first-death-queue': 'task_queue', 'x-first-death-reason': 'expired'}
The specific meaning of each field is:
- queue: the name of the queue the message was in before it was dead-lettered
- reason: why the message was dead-lettered; "expired" means its TTL ran out and it became a dead letter!
- count: how many times this message has been dead-lettered in this queue
- time: the date and time the message was dead-lettered
- exchange: the exchange the message was published to (PS: if the message has been dead-lettered several times, this will be a dead-letter exchange)
- routing-keys: the routing keys the message was originally published with
- original-expiration: the original per-message expiration (TTL) of the message. PS: the expiration property is removed from the dead-lettered message, to prevent it from expiring again in any queue it is routed to.
- x-first-death-exchange: the exchange the message passed through when it was first dead-lettered
- x-first-death-queue: the queue the message was in when it was first dead-lettered
- x-first-death-reason: why the message was first dead-lettered; "expired" means its TTL ran out!
Possible values for the dead-letter reason include:

expired: the message's TTL expired
maxlen: the queue's maximum length was exceeded
rejected: the message was rejected with requeue=False
5.1.16 Delay queue
RabbitMQ does not support delay queues directly, but by combining dead-letter queues with expiration times we can build one. A message published with a fixed TTL enters the dead-letter queue when it expires, where dead-letter consumers pick it up — giving exactly a delayed-processing effect!
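A minimal sketch of the idea, reusing the xz-dead-letter-exchange setup from the example above (the 5-second delay and the delay_queue name are assumptions for illustration):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# The "waiting room": nobody consumes delay_queue directly. Every message
# expires after 5 seconds and is dead-lettered to the exchange that the
# real consumers listen on.
arguments = {
    'x-message-ttl': 5000,  # the desired delay, in milliseconds
    'x-dead-letter-exchange': 'xz-dead-letter-exchange',
}
channel.queue_declare(queue='delay_queue', durable=True, arguments=arguments)

# Producers publish here; consumers consume xz-dead-letter-queue, so each
# message is processed roughly 5 seconds after it was published.
channel.basic_publish(exchange='', routing_key='delay_queue',
                      body=b'delayed hello')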
6 Message reliability
How do we ensure that our messages are correctly routed to MQ?
Message reliability really matters in some business scenarios. For business that depends strongly on messages, we must make sure the message is correctly delivered to MQ, often also that it is correctly consumed, avoiding duplicate consumption; sometimes we even have to make message consumption idempotent.
PS: idempotent message consumption means that consuming the same message several times yields the same result as consuming it once!
Therefore, the problem of message reliability can be analyzed in two ways:
6.1: Reliability of message delivery at the production end
The earlier examples did not focus on delivery reliability: they simply sent messages without caring whether they actually arrived at MQ correctly. This part of the notes looks at exactly those issues.
First, a message must pass through the following steps to be delivered to MQ correctly:
Message delivery path: message → exchange → message queue → persistent storage
Each step of this path affects the reliability of message handling. How do we ensure messages are not lost, and how do we detect and monitor delivery failures? These are the questions to consider.
So the first thing to figure out: where can a message be lost?

1: An exception occurs while publishing the message to the exchange → message lost
2: The message enters the exchange correctly, but an exception occurs on the way into the queue (or the routing_key does not match) → message lost
3: The message enters the queue correctly with persistence enabled, but persistence itself goes wrong → message lost
For each of these, we need to:

1: Make sure delivery to MQ is acknowledged — success or failure, there should be a receipt notification
2: Make sure the message is routed to the correct queue; a routing mismatch must not pass silently — on error, the producer should receive a return notification
3: Make sure the message is stored correctly once it is in the queue
There are actually two kinds of solutions provided by the official website:
Use the transaction mechanism
Transactions guarantee message accuracy but sacrifice a great deal of performance, so when performance matters we can adopt the more efficient alternative: confirm mode.
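For comparison, a minimal sketch of the transaction mechanism with pika (shown only to contrast with confirm mode; the queue name is reused from the earlier examples):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

channel.tx_select()  # put the channel into transaction mode
try:
    channel.basic_publish(exchange='', routing_key='task_queue', body=b'hello')
    channel.tx_commit()    # the publish only takes effect on commit
except Exception:
    channel.tx_rollback()  # discard the uncommitted publish

Every tx_commit is a synchronous round trip to the broker, which is exactly why this mode is so much slower than confirms.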
Use confirm mode, as described in the pika documentation
Step 1 is to enable the confirmation mode:
channel.confirm_delivery()
Step 2: Set mandatory flags and handle exceptions to check whether the message has been delivered when publishing:
The first is to set the mandatory flag:
mandatory=True
We then wrap the publish in a try/except to capture delivery failures:
channel.confirm_delivery()  # enable publisher confirms on this channel

# Set the mandatory flag and catch exceptions to check whether the message
# was delivered. The latest pika documentation handles this with exception
# capture. body and properties are as defined in the earlier producer code:
try:
    channel.basic_publish(
        exchange='',                           # default exchange
        routing_key='task_queuexxxxxxx-xxxx',  # a key that matches no queue
        body=body,
        properties=properties,
        mandatory=True)  # return unroutable messages instead of dropping them
    print('Message was published')
except pika.exceptions.UnroutableError as e:
    print('Message was returned', e)
except pika.exceptions.NackError:
    print('Message was NackError')
except pika.exceptions.StreamLostError:
    print('Lost the connection to the broker! Did MQ suddenly stop running?')
except pika.exceptions.ConnectionClosedByBroker:
    print('The connection was suddenly closed by the broker!')
Next, close the connection from the management UI:
Clicking close on the first connection triggers pika.exceptions.ConnectionClosedByBroker; in this case your client can only restart or try to create a new connection!
Another error example: routing to an incorrect key.
A complete example of the error that occurs when a published message is routed to a non-existent or incorrect routing_key:
#!/usr/bin/env python
import time

import pika

# Create user login credentials
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

# Declare the queue; its attributes cannot be changed after creation!
arguments = {}
# arguments['x-message-ttl'] = 2000  # optional per-queue TTL (milliseconds)
channel.queue_declare(queue='task_queue', durable=False,
                      arguments=arguments, auto_delete=False)

# Start sending messages to our broker!
for i in range(1, 100):
    time.sleep(1)
    properties = pika.BasicProperties(delivery_mode=1)
    properties.expiration = '2000'
    body = 'Hello, Xiaozhong! {}'.format(i).encode('utf-8')
    print(body)

    # Enable confirm mode so the broker acknowledges (or rejects) publishes.
    # Calling it on every iteration is redundant and logs
    # "confirmation was already enabled", as seen in the log section below.
    channel.confirm_delivery()
    # Set the mandatory flag and catch exceptions to check delivery:
    try:
        channel.basic_publish(
            exchange='',                           # default exchange
            routing_key='task_queuexxxxxxx-xxxx',  # deliberately wrong key
            body=body,
            properties=properties,
            mandatory=True)  # return unroutable messages
        print('Message was published')
    except pika.exceptions.UnroutableError as e:
        print('Message was returned', e)
    except pika.exceptions.NackError:
        print('Message was NackError')
    except pika.exceptions.StreamLostError:
        print('Lost the connection to the broker! Did MQ suddenly stop running?')

connection.close()
Running this publisher, we duly receive the error information!
But the returned error message, taken at face value, is hard to interpret: the concrete reason for the failure is not obvious. Digging into the pika source, we find:
What it actually raises carries a _puback_return — a ReturnedMessage:

from pika.adapters.blocking_connection import ReturnedMessage
class ReturnedMessage(object):
"""Represents a message returned via Basic.Return in publish-acknowledgments
mode
"""
__slots__ = ('method', 'properties', 'body')
def __init__(self, method, properties, body):
"""
:param spec.Basic.Return method:
:param spec.BasicProperties properties: message properties
:param bytes body: message body; empty string if no body
"""
self.method = method
self.properties = properties
self.body = body
So we can unpack the details from the exception like this:
except pika.exceptions.UnroutableError as e:
    # pika stores the return as:
    #   self._puback_return = ReturnedMessage(method, properties, body)
    # and e.messages is a list of ReturnedMessage objects
    print("current type", type(e.messages))
    print('Message was returned::::', e.messages[0].method)
    print('Message was returned::::', e.messages[0].properties)
Then look at our printout:
Hello, Xiaozhong! 5
WARNING:pika.adapters.blocking_connection:Published message was returned: _delivery_confirmation=True; channel=1; method=<Basic.Return(['exchange=', 'reply_code=312', 'reply_text=NO_ROUTE', 'routing_key=task_queue2222'])>; properties=<BasicProperties(['delivery_mode=1'])>; body_size=20; body_prefix=b'\xe5\xb0\x8f\xe9\x92\x9f\xe5\x90\x8c\xe5\xad\xa6\xe4\xbd\xa0\xe5\xa5\xbd! 5'
<BasicProperties(['delivery_mode=1'])>
Current type <class 'list'>
Message was returned:::: <Basic.Return(['exchange=', 'reply_code=312', 'reply_text=NO_ROUTE', 'routing_key=task_queue2222'])>
Message was returned:::: <BasicProperties(['delivery_mode=1'])>
reply_text=NO_ROUTE here means the routing key could not be matched to any queue.
PS: inside the exception handler we can confirm whether publication succeeded, but this approach is still blocking. The coroutine-based aio-pika library should address the blocking nature of these confirmed publishes — check it out when time allows!
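As a rough sketch of that direction (based on my reading of aio-pika, so treat the parameter names as assumptions rather than gospel):

import asyncio

import aio_pika

async def main():
    # connect_robust also reconnects automatically on connection failures
    connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    async with connection:
        # publisher_confirms=True enables confirm mode on the channel;
        # the publish coroutine completes once the broker confirms
        channel = await connection.channel(publisher_confirms=True)
        await channel.default_exchange.publish(
            aio_pika.Message(body=b"hello"),
            routing_key="task_queue")

asyncio.run(main())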
6.2: Confirmation notification of message consumption at the consumer end
For consumer-side acknowledgment: for less important messages, we can enable the automatic acknowledgment mechanism. For messages that matter, though, it is best to enable manual ack mode to confirm that consumption completed.
This manual ack pattern already appeared in an earlier example:
import time

import pika

# Create user login credentials
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

# Declare the queue; durable must match how task_queue was created!
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Multiple workers assignment mode. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(0.1)
    print(" [x] Done")
    # Manually acknowledge that this message has been consumed
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='task_queue',
                      on_message_callback=callback,
                      auto_ack=False)  # manual ack mode
# Block here waiting for messages; the callback runs for each message
channel.start_consuming()
The main areas are shown below:
6.3: The situation of Unacked message consumption at the consumer end
If manual ack mode is enabled and the consumer processes a message but never sends the ack receipt, the message is not deleted from the queue; its state becomes Unacked. When that consumer disconnects, the message is put back into the queue (its state changes from Unacked back to Ready) and is delivered to another consumer if one exists.
The following is an example:
import time

import pika

# Create user login credentials
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

# Declare the queue; durable must match how task_queue was created!
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Multiple workers assignment mode. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(0.1)
    print(" [x] Done")
    # Deliberately never acknowledge, so the message stays Unacked:
    # ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)  # prefetch only one message
# auto_ack defaults to False, i.e. manual acknowledgment mode
channel.basic_consume(queue='task_queue', on_message_callback=callback)
# Block here waiting for messages; the callback runs for each message
channel.start_consuming()
In the code above I enabled manual acknowledgment mode, prefetch only one message, and never respond with an ack:
The message's state changes to Unacked and stays there.
Disconnect the consumer: after a moment, the message goes from Unacked back to Ready in its old queue!
6.4 About Persistence of MQ
MQ persistence can be divided into:
- Persistence of the exchange

channel.exchange_declare(exchange='topic_ceshilogs',
                         exchange_type='topic', durable=True)
- Persistence of queues
# Declare the durable queue task_queue; remember the queue type cannot be
# changed after creation!
channel.queue_declare(queue='task_queue', durable=True)
- Persistence of messages

# delivery_mode=2 marks the message persistent; 1 means transient
properties = pika.BasicProperties(delivery_mode=2)
properties.expiration = '2000'
If all of these are enabled, MQ performance will certainly drop — there is simply more work to do!
6.5 Reconnection Mechanism for client Disconnection
Disconnection here mainly concerns the sending process, where exceptions may occur!
A reconnection based on the retry library:
import pika
from retry import retry

@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume():
    connection = pika.BlockingConnection()
    channel = connection.channel()
    channel.basic_consume('test', on_message_callback)
    try:
        channel.start_consuming()
    # Don't recover connections closed by server
    except pika.exceptions.ConnectionClosedByBroker:
        pass

consume()
7 Message consumption idempotency
Idempotence simply means that one request or many repeated requests for the same operation produce consistent results: the data changes caused by one request and by repeated requests are the same — there must be no divergent processing logic!
7.1 Configuring automatic message retry
Message retries are triggered for a variety of reasons:
- Network jitter during message transmission between the MQ broker and the consumer
- An exception in the consumer's message-processing logic
- Repeated delivery of scheduled messages
- The network dropping briefly during ack confirmation
The scenario here is mainly consumer-side processing exceptions: how do we retry consumption of the message? It seems every client except the Python one implements a retry mechanism — sadly, with pika you have to build it yourself!
At this point a third-party retry library can handle the retries!
Such as:
pip install tenacity
For the use of tenacity, please refer to its official documentation.
I won’t go into too much here!
Note that retrying only makes sense for the right kinds of exceptions! Retrying a validation error, or a bug that prevents your program from ever succeeding, is pointless. Reasonable retries mainly target errors caused by transient network jitter, where a retry has a chance of recovering!
A complete retry example (for reference only):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import time

import pika
from tenacity import (RetryError, before_log, retry,
                      retry_if_exception_type, stop_after_attempt,
                      stop_after_delay, wait_fixed)

logger = logging.getLogger(__name__)

# Create user login credentials
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

# Declare the queue; durable must match how task_queue was created!
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Multiple workers assignment mode. To exit press CTRL+C')

def callback(ch, method, properties, body):
    # The ZeroDivisionError below is only for demonstration!
    # wait_fixed(10): wait 10s between attempts
    # stop_after_delay(5) | stop_after_attempt(2): give up after 5 seconds
    # or after 2 attempts, whichever comes first
    @retry(wait=wait_fixed(10),
           stop=(stop_after_delay(5) | stop_after_attempt(2)),
           retry=retry_if_exception_type(ZeroDivisionError),
           before=before_log(logger, logging.DEBUG))
    def yewuluoji():
        print(" [x] Received %r" % body.decode())
        time.sleep(0.1)
        print(" [x] Done")
        5675 / 0  # deliberately raise ZeroDivisionError

    try:
        yewuluoji()
    except RetryError:
        print("Still failing after the retries! Rejecting the message!")
        # multiple (bool): if True, the delivery tag is treated as
        # "up to and including", so several messages can be nacked in one
        # call; if False, the tag refers to a single message.
        # requeue (bool): if True, the server tries to requeue the message
        # so consumption is retried; if False, the message is discarded
        # (or dead-lettered if a DLX is configured).
        ch.basic_nack(delivery_tag=method.delivery_tag,
                      multiple=False, requeue=False)
    else:
        print("Consumed successfully!")
        ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
# Block here waiting for messages; the callback runs for each message
channel.start_consuming()
PS: retrying on ZeroDivisionError is purely for demonstration — it makes no real sense! Test demo only!
One more note about negative acknowledgment (basic_nack) parameters:

multiple (bool): if True, the delivery tag is treated as "up to and including", so multiple messages can be acknowledged with a single call; if False, the tag refers to a single message. If multiple is set and the delivery tag is zero, all outstanding messages are acknowledged.
requeue (bool): if True, the server tries to requeue the message so consumption can be retried; if False, the message is discarded or dead-lettered.

ch.basic_nack(delivery_tag=method.delivery_tag, multiple=False, requeue=False)
- requeue=False: the message is discarded!
- requeue=True: the message is redelivered and retried repeatedly until the consumer disconnects, at which point it changes from Unacked back to Ready!
7.2 Message idempotency: preventing duplicate consumption
Since a message redelivery mechanism exists, a message may be consumed more than once — and some business scenarios, such as money transfers, cannot tolerate that! So how do we guarantee message idempotence?
The usual mechanism is to check, before consuming, whether this message's globally unique ID has already been consumed — an implementation based on a globally unique message ID or similar marker.
Based on the database and the business logic, this can be implemented in two ways:
- If the business operation is an insert, a unique primary-key constraint on the table can guarantee that only one row exists for a given key
- If the business operation is an update or similar, the database's optimistic locking mechanism can be used.
It can also be implemented with Redis atomic operations: when the consumer receives a message, it executes a SETNX command keyed on the message ID (or another globally unique ID). Success means the message has not been handled yet and can be consumed; failure means it was consumed before, so skip it! The expiry of this Redis lock depends on your business.
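A minimal sketch of the Redis approach with the redis-py client, assuming each message carries a globally unique msg_id (the key prefix and the 24-hour expiry are arbitrary choices):

import redis

r = redis.Redis(host='localhost', port=6379)

def consume_once(msg_id, handle):
    # SET key 1 NX EX ttl succeeds only if the key does not exist yet,
    # which makes "mark as consumed" an atomic check-and-set
    if r.set('mq:consumed:' + msg_id, 1, nx=True, ex=24 * 3600):
        handle()  # first time we see this message: consume it
    else:
        print('message %s was already consumed, skipping' % msg_id)

In practice you would also delete the key (or shorten its TTL) when handle() raises, so that a failed consumption can still be retried.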
Extension notes:
Optimistic locking assumes that, in general, data will not conflict, so conflicts are only detected at the moment the update is submitted. If a conflict is found, an error is returned and the user decides how to handle it. (The probability of conflict is considered low — hence "optimistic"!)
Pessimistic locking is the opposite: whoever wants to modify my data must first acquire the lock; while it is held, nobody else can take it, and only after my change completes is the lock released for the next acquirer. Pessimistic locking locks the data directly to prevent concurrency — it assumes the conflict will actually happen and protects itself up front! (It relies on database locks: lower efficiency, but a higher safety factor.)
A few things to add about optimistic locking:
Optimistic locking takes no real lock, so it is efficient.
Scheme 1: record a data version. Each modification carries a version number: if the submitted version matches the row's current version, the update proceeds and the version is incremented (+1); otherwise the operation fails. The version number grows with every successful operation. [An incremental scheme; several variants exist]
Under high concurrency, this version-number approach has a downside: many concurrent updaters will fail and have to retry!
Scheme 2: use atomic operations when updating
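A minimal sketch of scheme 1, using sqlite3 purely for illustration (the table and column names are made up):

import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE account (id INTEGER PRIMARY KEY,'
             ' balance INTEGER, version INTEGER)')
conn.execute('INSERT INTO account VALUES (1, 100, 0)')

def withdraw(amount):
    balance, version = conn.execute(
        'SELECT balance, version FROM account WHERE id = 1').fetchone()
    # The UPDATE only succeeds if nobody changed the row since we read it
    cur = conn.execute(
        'UPDATE account SET balance = ?, version = version + 1 '
        'WHERE id = 1 AND version = ?',
        (balance - amount, version))
    return cur.rowcount == 1  # False: a concurrent update won, so retry

print(withdraw(30))  # True on the first attempt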
8 Summary of RabbitMQ queue types
RabbitMQ queues can be classified along different dimensions:
- Exclusive queues
- Ordinary queues
- Priority queues
- Delay queues (dead-letter queues)
- Lazy queues
- Publish/subscribe queues
8.1 RabbitMQ priority queue
A priority queue assigns each message a priority number that determines the order of consumption. The scenario: when business demands it — for example urgent notification messages — certain messages need to be consumed first!
Creating a priority queue through the backend management UI:
You can view other optional parameters on the management page:
Creating the priority queue and setting message priority in code — see the sketch below:
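A sketch of the declaration (the maximum priority of 10 is an assumption; RabbitMQ recommends keeping it small):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# x-max-priority turns the queue into a priority queue (levels 0-10 here)
channel.queue_declare(queue='priority_queue', durable=True,
                      arguments={'x-max-priority': 10})

# Each message carries its own priority; higher values are consumed first
for i, prio in enumerate([1, 9, 5]):
    channel.basic_publish(
        exchange='',
        routing_key='priority_queue',
        body='debug test message {}'.format(i).encode('utf-8'),
        properties=pika.BasicProperties(priority=prio))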
Start the producer first and publish 20 debug test messages:
Then start our client to view the consumption result:
The higher the priority, the earlier the message is consumed ~
8.2 RabbitMQ lazy queue
The difference between an ordinary queue and a lazy queue:
- Ordinary queue messages are stored in MQ's memory and occupy MQ memory
- Lazy queue messages are stored on disk, occupying disk space instead; memory usage stays relatively small. (Being written to disk does not mean they cannot be lost: without persistence they still disappear when MQ restarts, so a lazy queue is not the same as a durable queue — of course, lazy + persistent can be combined!)
What the lazy queue really controls is where messages are stored: in memory or on disk.
A lazy queue writes each message to disk as it arrives; when a consumer needs the message, MQ reads it from disk back into memory and dispatches it to the consumer. That round trip takes time.
Application scenario for lazy queues:
- When messages back up in MQ because of a consumer-side failure, storing them on disk avoids MQ memory overflow.
Creating a lazy queue through the backend management UI:
- Lazy mode (x-queue-mode=lazy): messages go to disk first rather than memory, and are loaded into memory only when consumers start consuming
Creating the lazy queue in code:
arguments['x-queue-mode'] = 'lazy'
channel.queue_declare(queue='task_queue', durable=True,arguments=arguments,auto_delete=False)
First, send messages to a lazy queue without message persistence:
Then restart MQ:
rabbitmqctl stop         # stop RabbitMQ
rabbitmq-server restart  # restart RabbitMQ
rabbitmq-server start    # start RabbitMQ
Observe: the messages are gone!
Next, enable lazy mode plus queue persistence, then restart MQ:
Oops!! Forgot to persist the messages as well!! The queue itself survives, but the messages are gone! Retest with message persistence enabled too!
Observe after the restart (lazy + queue persistence + message persistence):
Choosing between lazy and ordinary queues:
- Lazy queues reduce the memory footprint of messages in MQ and avoid the page-out operations (shuffling data between memory and disk) caused by memory pressure
- Ordinary queues serve messages straight from memory, so they process more efficiently than lazy queues
- If maximum throughput is not required, lazy queues can cut MQ's memory usage
8.3 RabbitMQ delay queue (an extension of the dead-letter queue)
A delay queue is built exactly as described in sections 5.1.15 and 5.1.16: combine a dead-letter exchange with message expiration times, and expired messages arrive at the dead-letter queue after a fixed delay, where consumers process them.
One addition, tying in the lazy queue above: if your dead-letter queue also accumulates a large volume of messages, you can declare it as a lazy queue as well. That reduces memory usage while still achieving delayed consumption — a scheme worth considering!
9 Enabling log output
During development and debugging, if you need to see pika's built-in log information, enable the logging configuration (it only needs to be set before startup):
import logging

logging.basicConfig(level=logging.INFO)
Once enabled, detailed connection workflow information can be seen:
INFO:pika.adapters.utils.connection_workflow:Pika version 1.2.0 connecting to ('::1', 5672, 0, 0)
INFO:pika.adapters.utils.io_services_utils:Socket connected: <socket.socket fd=500, family=AddressFamily.AF_INET6, type=SocketKind.SOCK_STREAM, proto=6, laddr=('::1', 64257, 0, 0), raddr=('::1', 5672, 0, 0)>
INFO:pika.adapters.utils.connection_workflow:Streaming transport linked up: (<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x000001ED519C1080>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x000001ED519C1080> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>).
INFO:pika.adapters.utils.connection_workflow:AMQPConnector - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x000001ED519C1080> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>
INFO:pika.adapters.utils.connection_workflow:AMQPConnectionWorkflow - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x000001ED519C1080> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>
INFO:pika.adapters.blocking_connection:Connection workflow succeeded: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x000001ED519C1080> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>
INFO:pika.adapters.blocking_connection:Created channel=1
WARNING:pika.adapters.blocking_connection:Published message was returned: _delivery_confirmation=True; channel=1; method=<Basic.Return(['exchange=', 'reply_code=312', 'reply_text=NO_ROUTE', 'routing_key=task_queue222'])>; properties=<BasicProperties(['delivery_mode=1'])>; body_size=20; body_prefix=b'\xe5\xb0\x8f\xe9\x92\x9f\xe5\x90\x8c\xe5\xad\xa6\xe4\xbd\xa0\xe5\xa5\xbd! 1'
There are also log messages when errors occur:
Hello, Xiaozhong! 10
<BasicProperties(['delivery_mode=1'])>
Message was returned 1 unroutable message(s) returned
ERROR:pika.adapters.blocking_connection:confirm_delivery: confirmation was already enabled on channel=1
WARNING:pika.adapters.blocking_connection:Published message was returned: _delivery_confirmation=True; channel=1; method=<Basic.Return(['exchange=', 'reply_code=312', 'reply_text=NO_ROUTE', 'routing_key=task_queue222'])>; properties=<BasicProperties(['delivery_mode=1'])>; body_size=21; body_prefix=b'\xe5\xb0\x8f\xe9\x92\x9f\xe5\x90\x8c\xe5\xad\xa6\xe4\xbd\xa0\xe5\xa5\xbd! 10'
10 How can message accumulation be avoided?
Message accumulation can happen for several reasons:
- Messages are produced much faster than they are consumed
- Consumers lack the capacity to keep up
- Consumers cannot consume messages normally at all
- Consumers are constrained and cannot consume fast enough
- The consumer has simply crashed
- The consumer has hit a performance bottleneck
Ways to speed up message consumption:
- 1) Let multiple consumers consume simultaneously
- 2) Process message handling asynchronously, for example with a thread pool (see the sketch after this list)
- 3) For non-essential, droppable messages, set a TTL and let them flow into a dead-letter queue for consumption
- 4) Shunt messages into another queue for processing
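For point 2), a minimal sketch of handing the work to a thread pool while keeping acks on the connection's thread via pika's add_callback_threadsafe (the queue name and pool size are assumptions):

import functools
import time
from concurrent.futures import ThreadPoolExecutor

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
pool = ThreadPoolExecutor(max_workers=4)

def work(body, delivery_tag):
    time.sleep(1)  # the slow business logic runs off the connection thread
    print('processed %r' % body)
    # pika channels are not thread-safe: schedule the ack back onto the
    # connection's own thread
    connection.add_callback_threadsafe(
        functools.partial(channel.basic_ack, delivery_tag=delivery_tag))

def callback(ch, method, properties, body):
    pool.submit(work, body, method.delivery_tag)

channel.basic_qos(prefetch_count=4)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()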
Conclusion:
Most of the code above comes from the simple cases provided on the official website, combined with my own practice into simple notes! If there are clerical errors, criticism and correction are welcome! Thank you!
At the end
Simple notes! For reference only!
END
Jianshu: www.jianshu.com/u/d6960089b…
Juejin: juejin.cn/user/296393…
Public account: WeChat search 【children to a pot of wolfberry wine tea】
【Original】【Learning and exchanging together is welcome】QQ: 308711822