I. Concepts and common message queues
- Messages are delivered through middleware (a message queue) rather than by applications calling each other directly
- Queuing: Applications communicate through queues
- Application scenarios (advantages): decoupling, asynchronous processing, and peak shaving
- Problems introduced:
- MQ server stability;
- Data consistency;
- Message loss;
- Message backlog;
- Repeated consumption;
- Transaction issues, etc.
1. Multithreading and message queues
- Multithreading: at the programming level; the resources consumed stay on the local server
- Message queues: at the architectural level; resource consumption is shifted onto the consumer's server
- There is no conflict between multithreading and the use of message queues:
- With message queues, messages are transmitted asynchronously, so the caller does not have to wait for the result before continuing;
- Multiple threads can act as producers and consumers of queues
2. Repeated consumption
- Cause: network issues (the MQ server never receives the consumer's acknowledgement, so it redelivers the message)
- How to handle it? (idempotency; a minimal sketch follows this list)
- Strong validation: transactions, pre-operation checks, database uniqueness constraints
- Weak validation: for cases where the occasional duplicate does not matter
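A minimal idempotency sketch in Java (the messageId parameter, the in-memory set standing in for a database unique constraint or Redis key, and handleBusinessLogic are all made up for illustration):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentConsumer {
    // Stand-in for a persistent store: in production this would be a database
    // table with a unique constraint on the message/business id, or a Redis key.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    public void onMessage(String messageId, String payload) {
        // Pre-operation check: add() returns false if the id was already recorded,
        // so a redelivered message is simply skipped.
        if (!processedIds.add(messageId)) {
            return;
        }
        handleBusinessLogic(payload);
    }

    private void handleBusinessLogic(String payload) {
        // In the strong-validation case this would run in the same transaction
        // as the statement that records messageId.
        System.out.println("processing: " + payload);
    }
}
```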
3. Message loss
- Causes:
- Automatic acknowledgement mode is used (the message is marked as consumed before processing actually succeeds)
- The transaction is never confirmed
- Disk persistence problems
- How to prevent it? (a RabbitMQ-flavoured sketch follows this list)
- ACK (manual acknowledgement) mechanism
- Message persistence
- Message compensation mechanism
- Cluster mirrored-queue mode
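A RabbitMQ-flavoured sketch of the ACK and persistence points, assuming the Java client (com.rabbitmq.client), a local broker and a hypothetical queue named task_queue:

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NoLossDemo {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {

            // Persistence: durable queue + persistent delivery mode
            channel.queueDeclare("task_queue", true, false, false, null);
            channel.confirmSelect(); // publisher confirms: broker acknowledges the publish
            channel.basicPublish("", "task_queue",
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    "hello".getBytes("UTF-8"));
            channel.waitForConfirmsOrDie(5_000); // throws if the broker never confirmed

            // ACK mechanism: disable auto-ack and acknowledge only after processing succeeds
            channel.basicConsume("task_queue", false,
                    (consumerTag, delivery) -> {
                        System.out.println("consumed: " + new String(delivery.getBody(), "UTF-8"));
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    },
                    consumerTag -> { /* consumer cancelled */ });

            Thread.sleep(2_000); // give the consumer a moment before the connection closes
        }
    }
}
```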
II. ActiveMQ
III. RabbitMQ
1. The concept
2. Important components
- ConnectionFactory: connection manager
- Channel: the channel over which messages are pushed
- Exchange: receives messages and distributes them to queues
- Queue: stores the messages coming from producers
- RoutingKey: tells the exchange how to route the producer's message
- BindingKey: binds an exchange to a queue
3. Data model
- 3.1 In RabbitMQ, messages are never delivered directly to a Queue (message queue); they always pass through the Exchange layer first;
- The Exchange then distributes the messages to the matching Queues.
- 3.2 In RabbitMQ, a Binding associates an Exchange with a Queue (message queue);
- A BindingKey is usually specified at binding time so RabbitMQ knows how to route messages to queues correctly.
- 3.3 The binding between Exchanges and Queues can be many-to-many.
Flow: Producer -> Exchange (matched via Binding) -> Queue -> Consumer (sketched below)
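A minimal sketch of this flow using the RabbitMQ Java client; the exchange, queue and key names (order_exchange, order_queue, order.create) are made up for illustration:

```java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitFlowDemo {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory(); // connection manager
        factory.setHost("localhost");
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {

            // Exchange: receives messages and distributes them to queues
            channel.exchangeDeclare("order_exchange", BuiltinExchangeType.DIRECT, true);
            // Queue: stores the messages coming from producers
            channel.queueDeclare("order_queue", true, false, false, null);
            // Binding: ties the exchange to the queue through a BindingKey
            channel.queueBind("order_queue", "order_exchange", "order.create");

            // The producer publishes with a RoutingKey; the exchange compares it with
            // the BindingKey and routes the message into order_queue
            channel.basicPublish("order_exchange", "order.create", null,
                    "new order".getBytes("UTF-8"));
        }
    }
}
```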
4. Exchange Types
- 4.1 fanout
- Messages sent to the exchange are routed to every queue bound to it, with no routing-key matching at all; this is the fastest type and is suitable for broadcasting.
- 4.2 direct
- Routes messages to the queues whose BindingKey exactly matches the message's RoutingKey
- 4.3 topic
- Pattern (fuzzy) matching routes messages to specific Queues: * matches exactly one word, # matches zero or more words (see the binding sketch after this list)
- 4.4 headers (not recommended)
- A headers exchange does not route on the routing key at all; instead it matches on the headers attributes of the published message
- RoutingKey: set by the producer when a message is published
- BindingKey: specifies which RoutingKeys are delivered to the bound Queue under the given Exchange
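A short sketch of topic-exchange binding patterns with the Java client (the exchange and queue names are hypothetical):

```java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class TopicExchangeDemo {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {

            channel.exchangeDeclare("logs", BuiltinExchangeType.TOPIC, true);
            channel.queueDeclare("kernel_queue", true, false, false, null);
            channel.queueDeclare("all_logs_queue", true, false, false, null);

            // '*' matches exactly one word: "kern.info" matches, "kern.disk.error" does not
            channel.queueBind("kernel_queue", "logs", "kern.*");
            // '#' matches zero or more words: every routing key matches
            channel.queueBind("all_logs_queue", "logs", "#");

            // This message is routed to both queues
            channel.basicPublish("logs", "kern.info", null, "boot ok".getBytes("UTF-8"));
        }
    }
}
```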
IV. Kafka
1. Concept and advantages
- Advantages:
- High performance: the design makes heavy use of batching and asynchronous processing and can handle on the order of tens of millions of messages per second
- Strong compatibility
2. Distributed streaming platform
3. Message model
- Publish and subscribe: Producer -> Topic -> Consumer
- Partitions in Kafka actually correspond to queues in message queues.
A Kafka Cluster consists of several Kafka Brokers; a Partition is part of a Topic.
- Four core APIs
- Producer API: lets applications publish messages to one or more Kafka topics (see the minimal sketch after this list)
- Consumer API: lets applications subscribe to one or more topics and process the stream of records produced to them
- Streams API: lets an application consume data from one or more topics and output the result to one or more other topics, effectively transforming input streams into output streams; it acts like a data relay station
- Connector API: lets you build and run reusable producers or consumers that connect topics to existing applications or data systems
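A minimal sketch of the Producer and Consumer APIs with the Kafka Java client; the broker address and group id are assumptions, and the topic name reuses sunhb-topic-1 from the commands later in this post:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaApiDemo {
    public static void main(String[] args) {
        // Producer API: publish records to a topic
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            producer.send(new ProducerRecord<>("sunhb-topic-1", "key1", "hello kafka"));
        }

        // Consumer API: subscribe to topics and process the record stream
        Properties c = new Properties();
        c.put("bootstrap.servers", "localhost:9092");
        c.put("group.id", "test-group");
        c.put("auto.offset.reset", "earliest");
        c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            consumer.subscribe(Collections.singletonList("sunhb-topic-1"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}
```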
4. Kafka configuration file
Kafka configuration file
- Configuration file 1: kafka-3/conf/server.properties
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
# Unique id of this broker, similar in spirit to ZooKeeper's myid
broker.id=3
############################# Socket Server Settings #############################
# listeners=PLAINTEXT://:9092
# The default Kafka port is 9092; this broker uses 9093
port=9093
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
# Number of broker threads used for network processing (receiving requests and sending responses)
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
# Number of broker threads used for disk I/O processing
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
# Data is not sent immediately; it is first held in the send buffer and flushed once the buffer reaches a certain size
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
# Kafka receive buffer size; data is written to disk once the buffer reaches a certain size
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
# Maximum size of a request fetching from or producing to Kafka; this value must not exceed the Java heap size
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/usr/mfj/testKafka/datas/data3
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# The default number of partitions per topic is 1
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion due to age
# Maximum retention time for messages: 168 hours (7 days)
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# Kafka messages are appended to segment files; when a segment exceeds this size, a new file is started
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
# Check every 300000 ms whether any log segments have exceeded the retention time (log.retention.hours=168) and delete those that have expired
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# Set the ZooKeeper connection addresses (host:port)
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
- Configuration file 2: kafka-3/config/zookeeper.properties
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
Zookeeper configuration file
- Configuration file: zookeeper-3/conf/zoo.cfg
# The number of milliseconds of each tick
# Heartbeat interval between ZooKeeper servers, and between clients and servers; a heartbeat is sent every tickTime milliseconds
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
# Maximum number of heartbeat intervals (tickTime) a Follower in the ZooKeeper cluster may take to connect and sync with the Leader during the initial connection phase
# If the Follower has not responded after initLimit heartbeats, the connection is considered failed; here the total is 10 * 2000 ms = 20 seconds
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
# Maximum time for a request/response exchange between the Leader and a Follower, measured in tickTime units; here 5 * 2000 ms = 10 seconds
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
# Snapshot log storage path
# If dataLogDir is not specified, transaction logs are stored under dataDir by default, which seriously hurts ZooKeeper performance; under high throughput a large number of transaction logs and snapshot logs are generated
dataDir=/usr/mfj/testZkeeper/datas/data3
dataLogDir=/usr/mfj/testZkeeper/logs/logs3
# the port at which the clients will connect
# Port on which ZooKeeper listens for client connection requests; changed from the default so each instance uses a different port
clientPort=2183
server.1=localhost:2881:3881
server.2=localhost:2882:3882
server.3=localhost:2883:3883
# server.1=192.168.7.100:12888:13888
# server.2=192.168.7.101:12888:13888
# server.3=192.168.7.107:12888:13888
#server.1: the 1 is the id of this server (any number that identifies it); the same id is written to the myid file under the snapshot (dataDir) directory
#192.168.7.107 is the server's IP address in the cluster; the first port is for communication between leader and followers (default 2888), the second port is for leader election (default 3888)
5. Kafka commands
- 1. Use the script in the installation package to start the single-node Zookeeper instance
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
- 2. Start the Kafka service (kafka-server-start.sh):
bin/kafka-server-start.sh config/server.properties
bin/kafka-server-start.sh -daemon config/server.properties # start in the background
- 3. Create a topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic sunhb-topic-1 --replication-factor 1 --partitions 1
- 4. Check the topic list:
bin/kafka-topics.sh --zookeeper localhost:2181 --list
- 5. Modify a topic's partition count
# Alter the topic with the --alter option
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic sunhb-topic-1 --partitions 10
- 6. Delete a topic
bin/kafka-topics.sh --delete --topic sunhb-topic-1 --zookeeper localhost:2181
- 7. Producer sends messages (kafka-console-producer.sh):
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sunhb-topic-1
- 8. Consumer consumes messages (kafka-console-consumer.sh):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sunhb-topic-1 --from-beginning
- 9. View topic descriptions (kafka-topics.sh):
bin/kafka-topics.sh --zookeeper localhost:2181 --describe
- 10. View a consumer group's consumption details (kafka-consumer-groups.sh):
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9091 --describe --group test-group
6. Kafka producer partitioner
- See the reference links at the end of this post
7. Kafka consumer partition assignment
- See the reference links at the end of this post
8. How does Kafka guarantee sequential consumption of messages?
- Kafka only guarantees ordering of messages within a single Partition (by offset); ordering across the Partitions of a Topic is not guaranteed
- How to guarantee?
- Method 1: One Topic corresponds to only one Partition
- Method 2: Specify the key/Partition when sending messages (recommended; see the sketch below)
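A short sketch of method 2 with the Kafka Java client, assuming an order id is used as the message key so that every event for the same order lands in the same partition and is consumed in order (the topic name orders is hypothetical):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class OrderedSendDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String orderId = "order-1001"; // same key -> same partition -> ordered by offset
            producer.send(new ProducerRecord<>("orders", orderId, "created"));
            producer.send(new ProducerRecord<>("orders", orderId, "paid"));
            producer.send(new ProducerRecord<>("orders", orderId, "shipped"));
            // A partition can also be named explicitly:
            // new ProducerRecord<>("orders", 0, orderId, "created")
        }
    }
}
```

Note that if retries are enabled, keeping max.in.flight.requests.per.connection at 1 also helps preserve ordering across retried sends.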
9. How does Kafka ensure that messages are not lost?
- Asynchronous send: producer.send(msg);
- Synchronous send: producer.send(msg).get(); - <the data will not be lost>
- Producer loses messages:
- Set retries to a reasonable value (commonly 3), or larger
- Set the retry interval. If the retry interval is too small, the retry effect is not obvious
- Broker loses messages:
- Set acks = all (the message is not successfully sent until all replicas have received it)
- Set replication.factor >= 3 (multiple copies)
- Set min.insync.replicas > 1 (a write succeeds only when at least 2 replicas have it)
- Set unclean.leader.election.enable = false
- When the leader replica fails, a new leader will not be elected from follower replicas that are insufficiently in sync with the old leader, which reduces the chance of message loss
- Since Kafka 0.11.0.0, the default value of unclean.leader.election.enable has changed from true to false (a producer-side configuration sketch follows this list)
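A hedged sketch of the producer-side settings above (broker/topic-level items such as replication.factor, min.insync.replicas and unclean.leader.election.enable are configured on the cluster or at topic creation, not in this client code; the broker address and topic name are assumptions):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class NoLossProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");            // "successful" only after all in-sync replicas have the record
        props.put("retries", 5);             // retry transient failures a few extra times
        props.put("retry.backoff.ms", 500);  // retry interval: too small and retries add little

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Synchronous send: producer.send(msg).get() blocks until the broker acknowledges,
            // so a failed write surfaces as an exception instead of disappearing silently.
            RecordMetadata meta = producer
                    .send(new ProducerRecord<>("sunhb-topic-1", "k", "important message"))
                    .get();
            System.out.println("written to partition " + meta.partition()
                    + " at offset " + meta.offset());
        }
    }
}
```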
V. RocketMQ
Reference links:
- For a detailed introduction to RabbitMQ, just read this!
- The web’s most accessible Kafka entry!
- Kafka producer partitioning details
- How do Kafka consumers allocate partitions