I. Application scenarios
A large software system is made up of many components, modules, or subsystems. How should these modules communicate? This is quite different from traditional IPC: many traditional IPC mechanisms assume everything runs on a single machine. It is true that different modules can be deployed on different machines by using sockets directly, but a number of issues then need to be addressed, such as:
How do the sender and receiver maintain the connection, and if one party disconnects, how is data kept from being lost in the interim?
How can the coupling between sender and receiver be reduced?
How can higher-priority receivers get data first?
How can load be balanced effectively across receivers?
How can data be sent only to the relevant receivers? That is, when receivers subscribe to different data, how is filtering done efficiently?
How can the communication module scale out, even to a cluster?
How can the receiver be guaranteed to get complete and correct data?
The AMQP protocol addresses these issues, and RabbitMQ implements AMQP.
II. System architecture
1.RabbitMQ Server
Also called the Broker Server. RabbitMQ isn’t a food truck, it’s a delivery service: its role is to maintain a path from Producer to Consumer, ensuring that data is transmitted in the specified way. This guarantee is not 100%, but it is sufficient for common applications. For commercial systems, another layer of data-consistency checking can be added on top to fully ensure consistency.
2.Client P
Also known as the Producer, the sender of data. It creates messages and publishes (sends) them to a Broker Server (RabbitMQ). A Message has two parts: the payload and the label. The payload is the data being transmitted. The label is the name of an exchange, or a tag describing the payload; it is the label that RabbitMQ uses to decide which Consumer(s) to send the Message to. AMQP only describes the label; RabbitMQ determines the rules for how the label is used.
3.Client C
Also called the Consumer, the receiver of data. Consumers attach to a Broker Server (RabbitMQ) and subscribe to a queue. Think of a queue as a named mailbox: when a Message arrives in the mailbox, RabbitMQ sends it to one of the subscribers, i.e. a Consumer. The same Message may also be sent to many Consumers. By the time it arrives, the Message contains only the payload; the label has been removed. So the Consumer does not know who produced the message, and the protocol itself does not carry that information. Of course, if the Producer includes its own identity in the payload, that’s a different story. Three other concepts need to be clarified for data to travel correctly from Producer to Consumer: exchanges, queues, and bindings.
Exchanges are where producers publish their messages.
Queues are where the messages end up and are received by consumers.
Bindings are how the messages get routed from the exchange to particular queues.
A few other concepts that are not highlighted in the diagram above are Connection and Channel.
4.Connection
It’s a TCP connection. Both producers and consumers connect to the RabbitMQ Server over TCP. As we will see later, the program starts by establishing this TCP connection.
5.Channel
A virtual connection, built on top of the TCP connection described above. All data flows within channels. In other words, a program normally first establishes a TCP connection, and then, as a second step, establishes a Channel.
So why use Channel instead of using TCP connections directly?
Establishing and closing TCP connections is costly for the OS. Doing so frequently has a significant impact on system performance, and the number of TCP connections is also limited, which in turn limits the system’s ability to handle high concurrency. Creating a Channel inside an existing TCP connection does not incur these costs. A Producer or Consumer can publish or receive on multiple channels simultaneously. Experiments show that on the order of 10K messages can be published per second. This figure varies with hardware and packet size, but it is enough for ordinary consumers or producers; if it isn’t, you should be thinking about how to split your design.
6. Related Definitions:
Broker: the message queue server entity
Exchange: message exchange; it specifies the rules by which messages are routed to queues
Queue: the message queue carrier; each message is put into one or more queues
Binding: binds an Exchange to a Queue according to routing rules
Routing Key: the key the Exchange uses to decide where to deliver a message
VHost: virtual host; multiple vhosts can be set up within a broker to separate the permissions of different users
Producer: a program that sends messages
Consumer: a program that receives messages
Channel: message channel; multiple channels can be established within each client connection, and each Channel represents one session task
Exchange, Queue, and RoutingKey determine a unique route from Exchange to Queue.
III. Basic concepts
ConnectionFactory, Connection, and Channel are the basic objects in the RabbitMQ API. A Connection is the socket connection to RabbitMQ and encapsulates some of the socket protocol logic; a ConnectionFactory is the factory that produces Connections.
Channel is the most important interface we use with RabbitMQ. Most of our business operations are done in Channel, including defining queues, defining exchanges, binding queues to exchanges, and publishing messages.
1.Queue
A Queue is RabbitMQ’s internal object for storing messages, as shown below.
Messages in RabbitMQ can only be stored in a Queue. The producer (P in the figure below) produces messages and delivers them to a Queue; the consumer (C in the figure below) takes messages from the Queue and consumes them.
Multiple consumers can subscribe to the same Queue, and the messages in the Queue are evenly distributed to multiple consumers for processing, rather than each consumer receiving all the messages and processing them.
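This dispatch behavior can be illustrated with a tiny self-contained simulation (plain Python, not RabbitMQ code): messages are handed to the subscribed consumers in turn, not copied to each of them.

```python
from itertools import cycle
from collections import defaultdict

def dispatch_round_robin(messages, consumers):
    """Simulate RabbitMQ's default behavior: messages in a queue are
    spread across the subscribed consumers in turn, not broadcast."""
    received = defaultdict(list)
    ring = cycle(consumers)
    for msg in messages:
        received[next(ring)].append(msg)
    return dict(received)

out = dispatch_round_robin(["m1", "m2", "m3", "m4"], ["C1", "C2"])
print(out)  # {'C1': ['m1', 'm3'], 'C2': ['m2', 'm4']}
```

Each consumer sees only its share of the messages, which is exactly why this pattern is useful for work queues.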
2.Message acknowledgment
In practice, messages can be lost if a consumer receives a message from the Queue but crashes (or otherwise fails) before processing it. To avoid this, we can require the consumer to send an acknowledgment to RabbitMQ after consuming a Message; RabbitMQ then waits for that acknowledgment before removing the Message from the Queue.
If RabbitMQ does not receive an acknowledgment and detects that the consumer’s connection is down, it sends the message to another consumer (if there is one) for processing. There is no timeout: no matter how long a consumer takes to process a message, the message will not be sent to another consumer unless the consumer’s RabbitMQ connection is disconnected.
Another problem arises if a developer forgets to send the acknowledgment after processing the business logic. This causes a serious bug: the Queue accumulates more and more unacknowledged messages, and after the consumer restarts, those messages are redelivered and the business logic is executed again.
Publishing a message does not involve an ACK.
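The acknowledgment lifecycle can be made concrete with a minimal in-memory sketch (a toy model, not RabbitMQ’s implementation): a delivered message stays in the queue until it is acked, and the unacked messages of a dead consumer become deliverable again.

```python
class AckQueue:
    """Toy model of a queue with message acknowledgments."""

    def __init__(self):
        self.ready = []      # messages waiting to be delivered
        self.unacked = {}    # delivery_tag -> message, awaiting ACK
        self._tag = 0

    def publish(self, msg):
        self.ready.append(msg)

    def deliver(self):
        """Hand the next message to a consumer; it is NOT removed yet."""
        msg = self.ready.pop(0)
        self._tag += 1
        self.unacked[self._tag] = msg
        return self._tag, msg

    def ack(self, tag):
        """Only an ACK actually removes the message for good."""
        del self.unacked[tag]

    def requeue_unacked(self):
        """Consumer connection lost: its unacked messages go back."""
        self.ready = list(self.unacked.values()) + self.ready
        self.unacked.clear()

q = AckQueue()
q.publish("job-1")
tag, msg = q.deliver()
# consumer crashes before acking -> the message is requeued
q.requeue_unacked()
print(q.ready)  # ['job-1']
```

A consumer that forgets to ack corresponds to `deliver()` being called without a matching `ack()`: the `unacked` map grows without bound, which is the bug described above.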
3.Message durability
If we want messages to survive even a restart of the RabbitMQ service, we can make both queues and messages durable, which guarantees that RabbitMQ will not lose messages in most cases. This still does not close every small window of loss (for example, the RabbitMQ server losing power after receiving a producer’s message but before persisting it); closing that window requires transactions. Since this is only a brief introduction to RabbitMQ, transactions will not be covered here.
4.Prefetch count
Earlier we said that when multiple consumers subscribe to the same Queue, its messages are spread across them. If messages take different amounts of time to process, some consumers may stay busy while others finish quickly and sit idle. We can set prefetchCount to limit how many messages the Queue sends to each consumer at once. For example, with prefetchCount=1, the Queue sends each consumer at most one message at a time, and only sends the next one after the consumer has processed (and acknowledged) it.
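The effect of prefetchCount=1 can be sketched with a small simulation (illustrative only, not pika code): each message goes to whichever consumer frees up first, instead of strict round-robin.

```python
import heapq

def dispatch_fair(durations, n_workers):
    """Simulate prefetch_count=1: the queue hands each message to the
    worker that becomes free first (tracked as a busy-until time)."""
    heap = [(0.0, w) for w in range(n_workers)]  # (free_at, worker id)
    heapq.heapify(heap)
    counts = [0] * n_workers
    for d in durations:
        free_at, w = heapq.heappop(heap)
        counts[w] += 1
        heapq.heappush(heap, (free_at + d, w))
    return counts

# one slow (4s) message followed by five fast (1s) ones, two workers:
# worker 0 is stuck with the slow job while worker 1 drains the rest
print(dispatch_fair([4, 1, 1, 1, 1, 1], 2))  # [2, 4]
```

With plain round-robin the two workers would get three messages each, and the fast worker would sit idle while the slow one fell behind.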
5.Exchange
We spoke in the previous section as if producers publish messages to queues, but this in fact never happens in RabbitMQ. In reality, the producer sends the message to an Exchange (X in the figure below), which routes the message to one or more queues, or drops it.
What logic does Exchange use to route messages to the Queue? This is covered in the Binding section.
There are four Types of exchanges in RabbitMQ, each of which has a different routing policy, as described in the Exchange Types section.
6.Routing Key
When a producer sends a message to an Exchange, it usually specifies a Routing Key that determines how the message is routed. The Routing Key only takes effect in combination with the Exchange Type and Binding Key.
With the Exchange Type and Binding key fixed (which is usually fixed in normal use), our producer can determine where the message will go by specifying the Routing key when sending a message to the Exchange.
The RabbitMQ Routing Key length is limited to 255 bytes.
7.Binding
A Binding binds exchanges to queues so RabbitMQ knows how to route messages to the Queue correctly.
8.Binding key
When an Exchange and a Queue are bound, a Binding Key is usually specified. When a producer sends a message to an Exchange, it typically specifies a Routing Key; when the Binding Key matches the Routing Key, the message is routed to the corresponding Queue. This will be illustrated with practical examples in the Exchange Types section.
The same Binding Key can be used to bind multiple queues to the same Exchange.
The Binding Key does not apply in all cases; it depends on the Exchange Type. For example, a fanout exchange ignores the Binding Key and routes messages to all queues bound to it.
9.Exchange Types
The main exchange types in RabbitMQ are fanout, direct, topic, and headers. The AMQP specification also mentions the system type and custom exchange types.
10.fanout
The routing rule of a fanout exchange is very simple: it routes every message sent to the Exchange to all queues bound to it.
In the figure above, all messages sent by the producer (P) to Exchange (X) are routed to the two queues in the figure and are eventually consumed by the two consumers (C1 and C2).
11.direct
Routing rules for a direct exchange are also simple: it routes a message to the queues whose Binding Key exactly matches the message’s Routing Key.
For example, if we send a message to the Exchange with routingKey=”error”, the message will be routed to Queue1 (amq.gen-S9b… is a queue name automatically generated by RabbitMQ) and Queue2 (amq.gen-Agl…). If we send a message with routingKey=”info” or routingKey=”warning”, it will only be routed to Queue2. Messages with any other Routing Key will not be routed to either queue.
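The example above can be captured in a few lines of plain Python (a simulation of the routing rule, not pika code; the queue names are just placeholders):

```python
def route_direct(bindings, routing_key):
    """Direct-exchange rule: deliver to every queue whose binding key
    exactly equals the message's routing key."""
    return [queue for binding_key, queue in bindings
            if binding_key == routing_key]

bindings = [("error", "Queue1"), ("error", "Queue2"),
            ("info", "Queue2"), ("warning", "Queue2")]

print(route_direct(bindings, "error"))  # ['Queue1', 'Queue2']
print(route_direct(bindings, "info"))   # ['Queue2']
print(route_direct(bindings, "debug"))  # [] -- the message is dropped
```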
12.topic
As mentioned above, a direct exchange requires the Binding Key and Routing Key to match exactly, but such strict matching often cannot meet real business requirements. The topic exchange extends the matching rules: like the direct exchange, it routes messages to queues by matching Binding Keys against Routing Keys, but the matching rules differ:
The Routing Key is a string of words delimited by periods (.); we call each period-separated segment a “word”, e.g. “stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”. The Binding Key takes the same period-delimited form.
The Binding Key may contain two special characters for fuzzy matching: “*” matches exactly one word, and “#” matches zero or more words.
For example, suppose (as in the figure) Q1 is bound with Binding Key “*.orange.*”, and Q2 with “*.*.rabbit” and “lazy.#”. Then a message with routingKey=”quick.orange.rabbit” is routed to both Q1 and Q2; routingKey=”quick.orange.fox” is routed only to Q1; routingKey=”lazy.brown.fox” only to Q2; and routingKey=”lazy.pink.rabbit” is delivered to Q2 exactly once, even though it matches both of Q2’s Binding Keys. Messages with routingKey=”quick.brown.fox”, ”orange”, or ”quick.orange.male.rabbit” match no Binding Key and are discarded.
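The “*” / “#” rules can be expressed as a short recursive matcher (an illustrative sketch, not RabbitMQ’s actual algorithm); the binding keys tested below (“*.orange.*”, “*.*.rabbit”, “lazy.#”) are the ones assumed for Q1 and Q2 above:

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches binding_key:
    '*' matches exactly one word, '#' matches zero or more words."""
    def match(pattern, words):
        if not pattern:
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == "#":
            # '#' may absorb any number of words, including none
            return any(match(rest, words[i:]) for i in range(len(words) + 1))
        if not words:
            return False
        if head == "*" or head == words[0]:
            return match(rest, words[1:])
        return False
    return match(binding_key.split("."), routing_key.split("."))

print(topic_matches("*.orange.*", "quick.orange.rabbit"))       # True
print(topic_matches("lazy.#", "lazy.pink.rabbit"))              # True
print(topic_matches("*.*.rabbit", "quick.orange.male.rabbit"))  # False
```

Note that “#” also matches zero words, so a binding key of “lazy.#” matches a routing key of just “lazy”.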
13.headers
A headers exchange does not route messages by matching a Routing Key against Binding Keys; instead, it matches on the headers attribute of the message.
A set of key-value pairs is specified when the Queue is bound to the Exchange. When a message is sent to the Exchange, RabbitMQ takes the message’s headers (also key-value pairs) and checks whether they fully match the pairs specified at binding time; if they do, the message is routed to the Queue, otherwise it is not.
I have not used this type of Exchange in practice (though it looks useful), so I won’t cover it in more depth.
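Even so, the matching rule just described is simple to sketch in plain Python (a simulation; the `x-match` binding argument is the real AMQP name, everything else here is illustrative):

```python
def headers_match(binding_args, message_headers):
    """Headers-exchange rule: compare the message's headers against the
    key-value pairs given at binding time. With x-match="all" (the
    behavior described above) every pair must match; with
    x-match="any", a single matching pair is enough."""
    mode = binding_args.get("x-match", "all")
    pairs = [(k, v) for k, v in binding_args.items() if k != "x-match"]
    hits = [message_headers.get(k) == v for k, v in pairs]
    return any(hits) if mode == "any" else all(hits)

binding = {"x-match": "all", "format": "pdf", "type": "report"}
print(headers_match(binding, {"format": "pdf", "type": "report"}))  # True
print(headers_match(binding, {"format": "pdf", "type": "log"}))     # False
```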
14.RPC
MQ is inherently asynchronous: in the previous examples, a producer (P) that sends a message to RabbitMQ never learns whether a consumer (C) processed it successfully or failed, or even whether any consumer processed it at all.
In real application scenarios, however, we sometimes need synchronous processing: we must wait for the server to finish handling our message before proceeding to the next step. This amounts to a Remote Procedure Call (RPC), which RabbitMQ also supports.
15.How RabbitMQ implements RPC
When a client sends a request (Message), it sets two properties: replyTo (a queue name, telling the server to send its reply notification to that queue when processing is complete) and correlationId (an ID for the request; the server must echo this property in its reply so that the client can tell which request succeeded or failed). After receiving the request, the server processes it and publishes a reply message carrying the correlationId to the queue named by replyTo. The client, which has subscribed to the replyTo queue, receives the reply, uses the correlationId to determine which request it answers, and continues its business logic based on the result.
IV. Detail clarification
1. Use ACK to confirm Message delivery
By default, a message is removed from the Queue once it has been correctly received by a Consumer. The same Message can also be sent to multiple Consumers.
If a Queue has no subscribers, data that arrives is cached, not discarded; as soon as a Consumer subscribes, the data is sent to it immediately. Once the Consumer has correctly received the data, it is deleted from the Queue.
So what counts as correctly received? An ACK. Every Message should be acknowledged, either explicitly in application code or automatically. If data goes unacknowledged, the RabbitMQ Server sends it to the next Consumer.
If the application has a bug and forgets to ACK, the RabbitMQ Server stops sending it data, because the Server assumes that this Consumer’s processing capacity is exhausted. The ACK mechanism can also be used for throttling: sending the ACK only after the Consumer has processed the data, or even after an additional delay, effectively balances load across Consumers.
As a real example, we might batch data, say merging 4 seconds’ worth and then sleeping 4 seconds before fetching more. In a monitoring system especially, we don’t need every state change transmitted in real time; some delay is acceptable. This reduces I/O, and the end user doesn’t notice.
2.Reject a message
There are two options. The first, Reject, makes the RabbitMQ Server send the Message to the next Consumer. The second deletes the Message from the Queue immediately.
3.Creating a queue
Both the Consumer and the Producer can create a queue through queue.declare. However, a Consumer cannot declare a queue on a channel that already has an active subscription. Private queues can also be created, which only the creating application itself can use, and queues can be deleted automatically: a queue marked auto-delete is deleted when its last Consumer unsubscribes. What happens if you create a queue that already exists? Nothing. And “nothing” really means nothing: if the second declaration differs from the first, the operation still succeeds, but the queue’s attributes are not modified.
So who should be responsible for creating the queue, the Consumer or the Producer?
If the queue doesn’t exist, the Consumer naturally gets no messages, and any Message the Producer publishes is discarded. So, to prevent data loss, both Consumer and Producer should try to create the queue; either way, the declare call is harmless.
Queues also handle load balancing well: with multiple consumers, RabbitMQ distributes messages round-robin, spreading them evenly across the consumers.
4.Exchanges
As the architecture diagram shows, a Message published by the Producer first enters an Exchange. RabbitMQ then uses the “routing key” to determine which queue(s) to place the Message in; queues are bound to the exchange with this same routing key.
There are three types of Exchanges: direct, fanout, and topic. Each implements a different routing algorithm.
Direct exchange: if the routing key matches, the Message is delivered to the corresponding queue. When a queue is created, it is automatically bound to the default exchange with the queue’s name as the routing key.
Fanout exchange: broadcasts to all bound queues.
Topic exchange: performs pattern matching on routing keys, so one message can be delivered to every queue whose binding pattern it matches.
5.Virtual hosts
Each virtual host is essentially an independent RabbitMQ Server with its own queues, exchanges, binding rules, and so on. This lets a single RabbitMQ installation serve multiple different applications in isolation.
V. Installation and operation
1. Installation on macOS

RabbitMQ is easy to install with Homebrew:

```shell
brew install rabbitmq
```
2. Run the RabbitMQ
cd
to/ usr/local/Cellar/rabbitmq / 3.7.9 / sbin
Next, the implementation ofrabbitmq-server
Configure environment variables so that rabbitMQ can be started in any directory
vi ~/.bash_profile
Copy the code
Add:
PATH = $PATH: / usr/local/Cellar/rabbitmq / 3.7.9 / sbinCopy the code
The last source ~ /. Following
3. Log in to the management panel
http://localhost:15672/
The default username and password are both guest.
VI. Using Python to call RabbitMQ
1. Install pika

```shell
pip install pika
```
2. Sending messages
Send “Hello World” to a queue:
```python
import pika

# establish a connection
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
# create a channel
channel = connection.channel()
# create the queue
channel.queue_declare(queue="hello")
# publish via the default exchange, identified by the empty string ("")
channel.basic_publish(exchange="", routing_key="hello", body="hello MQ5")
connection.close()
```
3. Receive messages
```python
import pika

# establish a connection
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
# create a channel
channel = connection.channel()
# declare the queue (idempotent)
channel.queue_declare(queue="hello")

def callback(ch, method, properties, body):
    print(f"receive from MQ is:{body}")

# subscribe (no_ack=True disables acknowledgments)
channel.basic_consume(callback, queue="hello", no_ack=True)
# start listening
channel.start_consuming()
```
4. Demonstration
First run the send code, then run the receive code; the consumer receives the message that was sent, as shown.
5. Queue polling distribution and persistence
Each message is delivered to a single consumer (a work queue):
```python
import pika
import sys

message = " ".join(sys.argv[1:]) or "Hello world!"

# establish a connection
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
# create a channel
channel = connection.channel()

# durable=True makes the queue itself survive a broker restart
channel.queue_declare(queue="task_queue", durable=True)

# delivery_mode=2 marks the message as persistent
channel.basic_publish(exchange="",
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(delivery_mode=2))
connection.close()
```
```python
# receive messages
import pika
import time

# establish a connection
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# durable=True must match the producer's declaration
channel.queue_declare(queue="task_queue", durable=True)

def callback(ch, method, properties, body):
    print("preparing...")
    time.sleep(body.count(b"."))
    print(f"receive from MQ is:{body}")
    # manually acknowledge the message
    ch.basic_ack(delivery_tag=method.delivery_tag)

# RabbitMQ polls (round-robin) by default. prefetch_count=1 tells it not
# to give a worker a new message until the previous one has been
# processed and acknowledged, so messages go to the next free worker.
channel.basic_qos(prefetch_count=1)
# no_ack=False enables manual acknowledgments
channel.basic_consume(callback, queue="task_queue", no_ack=False)
# start listening
channel.start_consuming()
```
6. Publish/subscribe: Subscribe to all messages
Broadcasting messages to multiple consumers, each of which subscribes to all messages:
```python
# coding: utf-8
import pika
import sys

message = " ".join(sys.argv[1:]) or "Hello world!"

# establish a connection
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# exchange types: direct, topic, headers and fanout (a fanout exchange
# broadcasts every message it receives to all queues it knows).
# Create a fanout exchange named "logs".
channel.exchange_declare(exchange="logs", exchange_type="fanout")

# routing_key is ignored by fanout exchanges
channel.basic_publish(exchange="logs",
                      routing_key="",
                      body=message,
                      properties=pika.BasicProperties(delivery_mode=2))
connection.close()
```
```python
# receive messages
import pika
import time

# establish a connection
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.exchange_declare(exchange="logs", exchange_type="fanout")

# Declare a temporary queue: calling queue_declare without a queue name
# makes RabbitMQ generate a fresh, empty queue; exclusive=True removes
# the queue once this consumer disconnects.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# the relationship between an exchange and a queue is called a binding
channel.queue_bind(exchange="logs", queue=queue_name)

def callback(ch, method, properties, body):
    print("preparing...")
    time.sleep(body.count(b"."))
    print(f"receive from MQ is:{body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=queue_name, no_ack=False)
channel.start_consuming()
```
7. Publish/subscribe: Subscribe to specific messages
Subscribe only to specific messages via routing_key
```python
# coding: utf-8
import pika
import sys

# the log severity is the first argument and is used as the routing key
severity = sys.argv[1] if len(sys.argv) > 2 else "info"
print(f"severity is {severity}")
message = " ".join(sys.argv[2:]) or "Hello world!"
print(f"the message is {message}")

# establish a connection
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# A fanout exchange gives us no flexibility -- it just broadcasts.
# Create a direct exchange named "direct_logs" instead.
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")

channel.basic_publish(exchange="direct_logs",
                      routing_key=severity,
                      body=message,
                      properties=pika.BasicProperties(delivery_mode=2))
connection.close()
```
```python
# receive messages
import pika
import sys
import time

severities = sys.argv[1:]
if not severities:
    sys.stderr.write(f"Usage: {sys.argv[0]} [info] [warning] [error]\n")
    sys.exit(1)

# establish a connection
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# With a direct exchange, a message goes to the queues whose binding key
# exactly matches the message's routing key.
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# bind the temporary queue once per requested severity
for severity in severities:
    print(f"severity is {severity}...")
    channel.queue_bind(exchange="direct_logs", queue=queue_name,
                       routing_key=severity)

def callback(ch, method, properties, body):
    print("preparing...")
    print(f"routing_key is {method.routing_key}")
    time.sleep(body.count(b"."))
    print(f"receive from MQ is:{body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=queue_name, no_ack=False)
channel.start_consuming()
```
To demonstrate: one window sends messages, while two other windows receive error and info messages respectively and print them:
8. Publish/subscribe: Subscribe to multi-conditional messages
A message sent to a topic exchange cannot have an arbitrary single-word routing_key; it must be a list of words delimited by dots (“.”), and the binding key must take the same form. The logic behind topic exchanges is similar to direct: a message sent with a particular routing key is delivered to all queues whose binding key matches. But binding keys have two important special cases: “*” (star) matches exactly one word, and “#” (hash) matches zero or more words.
```python
# coding: utf-8
import pika
import sys

# the routing key must be a dot-separated list of words
routing_key = sys.argv[1] if len(sys.argv) > 2 else "anonymous.info"
print(f"routing_key is {routing_key}")
message = " ".join(sys.argv[2:]) or "Hello world!"
print(f"the message is {message}")

# establish a connection
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# create a topic exchange named "topic_logs"
channel.exchange_declare(exchange="topic_logs", exchange_type="topic")

channel.basic_publish(exchange="topic_logs",
                      routing_key=routing_key,
                      body=message,
                      properties=pika.BasicProperties(delivery_mode=2))
connection.close()
```
```python
# receive messages
import pika
import sys
import time

# establish a connection
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# binding keys for a topic exchange are dot-separated word lists too
channel.exchange_declare(exchange="topic_logs", exchange_type="topic")

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write(f"Usage: {sys.argv[0]} [binding_key]...\n")
    sys.exit(1)

for binding_key in binding_keys:
    print(f"binding_key is {binding_key}...")
    channel.queue_bind(exchange="topic_logs", queue=queue_name,
                       routing_key=binding_key)

def callback(ch, method, properties, body):
    print("preparing...")
    print(f"routing_key is {method.routing_key}")
    time.sleep(body.count(b"."))
    print(f"receive from MQ is:{body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=queue_name, no_ack=False)
channel.start_consuming()
```
Demonstration:
9.RPC
Message properties
The AMQP 0-9-1 protocol predefines a set of 14 message properties. Most are rarely used, with the following exceptions:
delivery_mode: marks a message as persistent (value 2) or transient (any other value). You may remember this property from the durability section.
content_type: describes the MIME type of the encoding. For the common JSON format, it is good practice to set this property to application/json.
reply_to: usually used to name a callback queue.
correlation_id: useful for correlating RPC responses with requests.
The server
```python
# server
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue="rpc_queue")

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)
    # reply to the queue named in reply_to, echoing the correlation_id
    ch.basic_publish(exchange="",
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue="rpc_queue")
print(" [x] Awaiting RPC requests")
channel.start_consuming()
```
The client
```python
# client
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host="localhost"))
        self.channel = self.connection.channel()
        # declare an exclusive callback queue for replies
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        # only accept the reply that matches our correlation id
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange="",
            routing_key="rpc_queue",
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        # block until the reply arrives
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
```
From: juejin.cn/post/684490…