client

DefaultMQProducer

new

  • this.namespace = namespace;

  • this.producerGroup = producerGroup;

  • defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);

    • this.defaultMQProducer = defaultMQProducer;

    • this.rpcHook = rpcHook;

    • Build a thread pool that sends messages asynchronously

      • this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue(50000);

      • this.defaultAsyncSenderExecutor = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1000 * 60, TimeUnit.MILLISECONDS, this.asyncSenderThreadPoolQueue, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0);

          @Override
          public Thread newThread(Runnable r) {
              return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
          }
        Copy the code

        });

start()

  • defaultMQProducerImpl.start();

    • Check the configuration

      • Check the production group and throw an exception if it does not match

        • Inspection is not empty
        • Must by % | a – zA – Z0-9 _ –
        • Not for DEFAULT_PRODUCER
      • Check production group, check non – empty

      • Check the production group. The value cannot be DEFAULT_PRODUCER

    • If the instance name is DEFAULT, change the instance name to PID

    • Gets or creates a client factory

      • Build clientid = IP + PID from the base client configuration

      • Using Clientid as the key, factoryTable determines whether the customer factory exists. If the customer factory does not exist, a new one will be generated

        • New client factory

          • Assignment: Client common configuration class

          • Assignment: Client factory index

          • New Netty client configuration class

          • Netty client configuration class setting, the client callback thread pool number of threads

          • Netty client configuration class setting, whether to use TLS

          • New Client remote processor ClientRemotingProcessor

          • New client API implementer MQClientAPIImpl

            • ClientConfig assignment

            • TopAddressing assignment

            • new NettyRemotingClient

              • Build oneway semaphore Asynchronous semaphore default is 65535 permit, fair
              • NettyClientConfig assignment
              • ChannelEventListener is set to null
              • PublicExecutor assignment = a fixed thread pool Size of nettyClientConfig. GetClientCallbackExecutorThreads (the default is four, The prefix is NettyClientPublicExecutor_ thread name
              • The eventLoopGroupWorker assignment builds a single-threaded NioEventLoopGroup with the thread name prefixed with NettyClientSelector
              • Build sslContext if TLS is used
            • ClientRemotingProcessor assignment

            • RemotingClient registers an RPC hook

            • Register clientRemotingProcessor with remotingClient and all requests are processed by the Processor

              • Maintain processorTable in remotingClient, HashMap

                > Where thread pools use publicExecutor
          • NettyRemotingClient namesrvAddrList assignment

          • Assignment: client ID

          • The new MQ management implementation class MQAdminImpl

          • New PullMessageService service PullMessageService

          • New balances service RebalanceService

          • New a DefaultMQProducer with CLIENT_INNER_PRODUCER as the production group

          • New A message status manager ConsumerStatsManager

    • Register producers with the client factory

    • Maintain topicPublishInfoTable

    • Start the client factory

      • System variables -> Environment variables -> jmenv.taobao.net to get the address of nameserver

      • Start the client API implementer mQClientAPIImpl

        • Start the NettyRemotingClient

          • Build a DefaultEventExecutorGroup, number of threads is for nettyClientConfig getClientWorkerThreads (), the default is 4, thread name prefix for NettyClientWorkerThread_

          • Officially start the NIO client

            • Set the selector thread group eventLoopGroupWorker

            • Setting TCP Parameters

              • .option(ChannelOption.TCP_NODELAY, true)
              • .option(ChannelOption.SO_KEEPALIVE, false)
              • Option (ChannelOption CONNECT_TIMEOUT_MILLIS, nettyClientConfig. GetConnectTimeoutMillis ()) the default 3 s
              • Option (ChannelOption SO_SNDBUF, nettyClientConfig. GetClientSocketSndBufSize ()) 65535 by default
              • Option (ChannelOption SO_RCVBUF, nettyClientConfig getClientSocketRcvBufSize ()) 65535 by default
            • Set the ChannelInitializer

              • If SSL is present, the channel handler is joining SSL, In the woker thread processing pipeline. AddFirst (defaultEventExecutorGroup, “sslHandler”, sslContext newHandler (ch. Alloc ()));

              • Add the coding processor NettyEncoder

                • Serialize RemotingCommand to ByteBuf on netty

                  • RemotingCommand encapsulates a RocketMQ packet
                • If this fails, the connection is closed

              • Add the decoder processor NettyDecoder

                • NettyDecoder inherited from netty LengthFieldBasedFrameDecoder

                • NettyDecoder calls the superclass LengthFieldBasedFrameDecoder constructor

                  • The maxFrameLength helper represents the maximum length of a frame. The default is 16777216
                  • LengthFieldOffset =0, where lengthFieldOffset indicates the starting position of the message length field
                  • LengthFieldLength =4 specifies the length of the length field
                  • LengthAdjustment =0 indicates that lengthAdjustment is added to the length of the message, which is the length of the message
                  • Netty is used in the length field after reading the length field +lengthAdjustment bytes
                  • Initialbytesttestis =4, indicating that Netty removes the first four bytes from the frames that are obtained
                • Call the decoding method of the parent to fetch the frame to (ByteBuf) super.decode(CTX, in)

                • Gets the ByteBuffer that gets Java from ByteBuf and decodes it into a RemotingCommand object

              • Add IdleStateHandler to the IdleStateHandler

                • If more than 120 s does not read and write operations, a IdleStateEvent events, by NettyConnectManageHandler closes the connection
              • Join NettyConnectManageHandler connection management processor

                • Responsible for connection management
                • Closing the connection function
              • Add client processor NettyClientHandler

                • RemotingCommand processes read incoming packets

                  • Determine whether to request or respond based on the first bit of RemotingCommand

                    • Process the request if it is a request

                      • According to the request code from processorTable, fetch the handler that processes the request and the thread pool used to process the request. In the client, all requests are handled by the ClientRemotingProcessor, which is the publicExecutor thread pool

                      • If the request code does not have a corresponding processor, a response message is constructed to indicate that the request type is not supported

                      • Call the processor’s rejectRequest method and, if true, build a response indicating that the system is busy and write it back asynchronously

                      • Build a runnable submission to the thread pool for execution

                        • The content of the runnable

                          • Execute the hook method doBeforeRequest method
                          • Call the processor’s processRequest to process the request
                          • Execute the hook method doAfterResponse
                          • If it is not oneway’s request, the response message put back by the processor is written back to the peer end
                          • If the processor throws an exception during processing, a response message is constructed, indicating the system exception, and the name of the exception class and the stack information of the first layer are attached, and asynchronously written back
                      • If the thread pool uses a reject policy, then if the request is not oneway, a response message is created to indicate that the system is busy and asynchronously written back

                    • If it is a response, the response is processed

                      • Get Opaque from the response RemotingCommand. Opaque is unique in a process and identifies a request in a process

                      • Get ResponseFuture from responseTable with Opaque

                      • If the ResponseFuture is empty, the ResponseFuture log is printed and a response message was received, but there is currently no request corresponding to it

                      • If ResponseFuture is not empty

                        • The response message into the responseFuture. SetResponseCommand (CMD);

                        • ResponseTable deletes the responseFuture

                        • Processing depends on whether the call is synchronous or asynchronous

                          • If the call is asynchronous then ResponseFuture will have a callback and that callback is called

                            • If the nettyRemotingClient’s callbackExecutor is empty, the callback is made in the publicExecutor

                            • And it releases semaphores

                              • Makes the window for asynchronous requests larger
                          • If the call is synchronous, there is no callback

                            • Countdown countdownlatch allows the synchronized waiting thread to pass
                            • ResponseFuture semaphore release
          • Start the scheduled scanResponseTable task to timeout asynchronous calls once per second

            • Iterate over the asynchronous call in responseTable

            • If you have timed out

              • Release the semaphore and delete it in responseTable
              • Implement the callback
          • If channelEventListener is not null, nettyEventExecutor is started

      • Start scheduled tasks

        • 2 minutes fetchNameServerAddr is executed once

        • Perform a updateTopicRouteInfoFromNameServer 30 s

          • Get all consumer topics from consumerTable
          • Get all producer topics from producerTable
          • Iterate through all the topic call updateTopicRouteInfoFromNameServer (topic)
        • 30s Perform a heartbeat with the broker

          • Clear offline brokers

            • Use reentrant lock control to ensure that the following operations are serial
            • BrokerAddrTable traversal: ConcurrentMap<String/* Broker Name/, HashMap<Long/ brokerId /, String/Address */>> delete from topic routing table (TopicRouteTable)
          • Send heartbeat to all brokers

            • Serial control with reentrant lockHeartbeat

            • Send heartbeat to all brokers

              • Preparing heartbeat Data

                • Clientid assignment

                • Build the Set

                  • Traverse producerTable to obtain the production group
                • Build the Set

                  • Traverse consumerTable

                    • Consumer groups
                    • Consumer type, pull or push
                    • Message mode, cluster or broadcast
                    • ConsumeFromWhere
                    • Subscription data Set
                    • UnitMode
              • Traverse brokerAddrTable call mQClientAPIImpl. SendHearbeat to send all the broker heartbeat

              • Maintain brokerVersionTable, which holds the version number of each broker

              • If there are only producers, only heartbeat is sent to the primary broker

            • uploadFilterClassSource

        • 5s Perform the persistence of offset once

          • PersistAllConsumerOffset is called to the client factory’s persistAllConsumerOffset to persist all offsets

            • Iterate over the consumerTable and call the persistConsumerOffset for all consumers

              • Check whether the consumer is activated

              • Gets the message queue currently being subscribed to from the rebalanceImpl

              • PersistAll of the OffsetStore object persists the offset of all message queues that are being subscribed to

                • Broadcasting mode

                  • Write the ConcurrentMap

                    offsetTable object into a JSON string to a local file
                    ,>
                • Cluster pattern

                  • Call updateConsumeOffsetToBroker will save offset to the broker

                    • Call mQClientFactory findBrokerAddressInAdmin, looking for a broker, master-slave is ok
                    • Call getMQClientAPIImpl (). UpdateConsumerOffsetOneway
                  • Maintain offsetTable to remove message queues that are not currently subscribed to

        • Thread pool adjustments are performed once a minute

      • Start the pullMessageService to pullMessageService

        • New a thread, and start, run in the thread, pullMessageService run method

          • As long as there is no stop, do loop

            • From pullRequestQueue take PullRequest

            • Pull the message

              • Get the corresponding consumer from the consumer group in the pullRequest

              • Invoke the pull message method on the corresponding consumer (core logic)

                • Get the processing queue from the pullRequest, or return if the processing queue is dropped

                • Set a pull timestamp for processQueue to mark the last time it was pulled

                • Determine whether the consumption is started

                • Determine whether the consumer is pause

                • For flow control

                  • If the number of cached messages exceeds the threshold, the pullRequest will be delayed for 50 ms before being added to the pullRequestQueue

                  • If the size of the cached message is larger than the threshold, the pullRequest will be delayed for 50 ms before being added to the pullRequestQueue

                  • If it’s not sequential mode

                    • If the cache the span of the offset of message than consumeConcurrentlyMaxSpan, default is 2000, will the pullRequest 50 milliseconds delay in pullRequestQueue, return
                  • The sequential pattern has a special treatment

                    • Pq is locked

                      • This is not the first time a pullRequest has been locked

                        • PullRequest. SetNextOffset for offset in the store
                        • The set is locked for the first time
                    • If the PQ is not locked, delay 3s to lock the PQ during pullMessage and Rebance execution

                • If rebalanceImpl getSubscriptionInner () there is no corresponding SubscriptionData, will delay the pullRequest 50 milliseconds in pullRequestQueue, return

                • Build the pull message callback inner class

                • Call pullAPIWrapper pullKernelImpl

                  • Call findBrokerAddressInSubscribe get a broker address, can be a Lord can also from

                  • If the SQL filtering mode is used and the version of the broker is lower than V4_1_0_SNAPSHOT, the exception is exceeded

                  • The offset is not committed if the message is pulled from the broker

                  • If it is hasClassFilterFlag, do a special treatment

                  • Call mQClientFactory. GetMQClientAPIImpl (.) pullMessage pull for message

                    • Build RemotingCommand for pull messages

                    • pullMessageAsync

                      • Async callback when results are returned

                        • To obtain the response

                          • If not empty

                            • Call MQClientAPIImpl. This. ProcessPullResponse will response parsed into pullResult, after the callback pullCallback. OnSuccess

                              • Call DefaultMQPushConsumerImpl. This. PullAPIWrapper processPullResult, to deal with accordingly

                                • Maintain pullFromWhichNodeTable

                                • Filtering messages on the client

                                  • According to the tag to filter
                                  • Filter hook For filtering
                              • Yes found information

                                • Maintain the incPullRT for ConsumerStatsManager

                                • Assign the nextOffset of the pullRequest to the NextBeginOffset returned by the broker

                                • If the pullRequest is empty, the pullRequest will be queued immediately

                                • If the pulled message is not empty

                                  • Maintenance ConsumerStatsManager (). IncPullTPS

                                  • Drop the pull back message into the msgTreeMap of processQueue

                                    • If the Sink is not set, it is set
                                  • Call consumeMessageService. SubmitConsumeRequest for message consumption

                                    • Order ConsumeMessageOrderlyService consumption

                                      • If the current processQueue is not empty and is not consuming, build ComsumeRequest and submit it to the thread pool

                                        • The run method content of ConsumeRequest

                                          • Return if processQueue has been dropped

                                          • Gets the synchronized lock corresponding to this message queue

                                          • If it is broadcast mode or the PQ is locked and there is no lock timeout, messages are continually fetched from the PQ for consumption until the pQ is empty

                                            • Returns if PQ droped

                                            • If the pq is in cluster mode and is not locked, return after 10ms to try locking and consuming

                                            • If the cluster mode is used and the pq lock times out, the lock and consumption attempt is returned after 10ms

                                            • If the consumption times out (60s), the lock and consumption attempt will be returned after 10ms

                                            • Get getConsumeMessageBatchMaxSize, the default is 1

                                            • From the pq getConsumeMessageBatchMaxSize a message

                                              • In the relevant information is transferred to consumingMsgOrderlyTreeMap msgTreeMap
                                            • If the fetched message is not empty, it is consumed

                                              • Construct ConsumeMessageContext, execute hook method, executeHookBefore

                                              • Get reentrant lock Pq.getlockConsume

                                              • The callback messageListener consumes a messageListener

                                              • Unlock reentrant lock pq.getLockConsume

                                              • Build ConsumeReturnType

                                                • If the listener returns status=null

                                                  • If there is abnormal, it is ConsumeReturnType. EXCEPTION
                                                  • Otherwise for ConsumeReturnType RETURNNULL
                                                • ConsumeReturnType.TIME_OUT if the consumption times out

                                                • If ConsumeOrderlyStatus SUSPEND_CURRENT_QUEUE_A_MOMENT = = status, it is ConsumeReturnType. FAILED

                                                • If ConsumeOrderlyStatus SUCCESS = = status is ConsumeReturnType. SUCCESS

                                              • Assign ConsumeReturnType and status to ConsumeMessageContext

                                              • Execute hook method, executeHookAfter

                                              • Maintain ConsumeMessageOrderlyService. This. GetConsumerStatsManager ()

                  .incCONSUMert – Process consumption results

                  - if the listener will ConsumeOrderlyContext set to automatically submit - the status to SUCCESS - the pq submission - empty consumingMsgOrderlyTreeMap - maintain msgCount and msgSize - Return the current offset - maintain getConsumerStatsManager(). IncConsumeOKTPS -status to SUSPEND_CURRENT_QUEUE_A_MOMENT - Maintain getConsumerStatsManager().incConSumeFailedtps - Sequential mode If the maximum number of retries is not set, the client will retry indefinitely. Otherwise, if the maximum number of retries is exceeded, the server will retry. If the server fails to retry, In the client try - if on the server successfully execute pq commit - if the client is try - the message from consumingMsgOrderlyTreeMap transferred to msgTreeMap - again After waiting for the context getSuspendCurrentQueueTimeMillis, submit ComsumeRequest again, No further consumption - If listener sets ConsumeOrderlyContext to not commit automatically - SUCCESS - Maintain getConsumerStatsManager().incConSumeoktps - SUSPEND_CURRENT_QUEUE_A_MOMENT - maintaining this. GetConsumerStatsManager (.) incConsumeFailedTPS - sequential pattern if not set maximum retries, try will be infinite in the client, Otherwise, if the maximum number of retries is exceeded, the server tries again. If the server retries fail, In the client try - if the client is - the message from consumingMsgOrderlyTreeMap again transferred to msgTreeMap - after waiting for the context. GetSuspendCurrentQueueTimeMillis, Submit ComsumeRequest again, consume the message again, exit the runnable - Maintain offsetStore's offsetTable - consume on success or failure but continue on server retry, otherwise not consume - If the message retrieved is empty, Pq Consuming = false; Return - otherwise - determine whether pQ, dropped, if yes return - delay 100 ms, Try locking and consumption (thread name prefix ConsumeMessageScheduledThread_) - lock queue delay tasks run method content - - if ConsumeMessageOrderlyService does not stop the lock, GetRebalanceImpl ().lock(MQ) - Get the address of the primary broker - lock the consumption queue on the primary broker - set PQ locked and lastLockTimestamp - processQueue.setLocked(true); - processQueue.setLastLockTimestamp(System.currentTimeMillis()); - lock success - 10 ms delay in submit ConsumeRequest - lock failure - delay submit ConsumeRequest - ConsumeMessageConcurrentlyService - 3 s Use consumeBatchSize to separate the message array, Build the consumer request object ComsumeRequest - submit the ComsumeRequest to the thread pool (consumeExecutor) for execution - return if processQueue has been dropped - set the topic of the message to retry topic - ConsumeMessageContext: ConsumeMessageContext: ConsumeMessageContext: ConsumeMessageContext: ConsumeMessageContext: ConsumeMessageContext: ConsumeMessageContext Set ConsumeStartTimeStamp property - Call back consumeMessage for listener - build ConsumeReturnType enumeration - EXCEPTION if listener is called abnormally - RETURNNULL if the listener returns null - TIME_OUT if the consumption time exceeds getConsumeTimeout, Default 15 minutes - FAILED if RECONSUME_LATER is returned - SUCCESS if CONSUME_SUCCESS is returned - Save the consumption result to ConsumeMessageContext Call executeHookAfter of ConsumeMessageHook - Maintain ConsumerStatsManager()Copy the code

          IncConsumeRT – If the processQueue is not dropped, process the consumption result

          RECONSUME_LATER; ackIndex = -1; CONSUME_SUCCESS; From ConsumeConcurrentlyContext take out this value, if the user is not set the value, Its initial value is integer.max_value, which indicates that all messages were successfully consumed - statistics are maintained based on consumption results - The consumer successfully maintains getConsumerStatsManager().incConSumeoktps and getConsumerStatsManager().IncConSumeFailedtps - Consumer failure maintenance getConsumerStatsManager().incConSumeFailedtps - Processing of consumer failure messages - broadcast mode - printing only logs, No more retries - cluster mode - send failed consuming messages to the server - collect failed sending messages, consume them + 1, delay 5s, before throwing them to consumeExecutor to consume them - remove them from processQueue, But not included in the message retried by the client - offset information in the offsetStore that maintains the consumer - offset indicates the offset from which the message should currently be pulled from the broker, - Delay the pullRequest and put the PullInterval() into the queue. - No matching message or no new message. - Update the NextoffSet of the pullRequest = PullResult getNextBeginOffset - will pullRequest input queue - if ProcessQueue msgTreeMap was not used to store consumption of news is empty, Will offset offsetstore object updates to pullResult getNextBeginOffset - the offset error - will processQueue drop shows that current consumption after the queue - consumers are not performed - after 10 s Persisting the offset of the queue to NextBeginOffset in the pullResult - deleting the corresponding message queue in processQueueTable - then it will cause exception processing in balancing - not empty - delay 3s before the pullRequest is put into the queueCopy the code
      • Start in balancing service, rebalanceService

        • RebalanceService: new a thread and start it to run rebalanceService’s run method

          • As long as there is no stop, do loop

            • Waiting for 20 s

            • Execute the doRebalance method in the client factory

              • Walk through the consumerTable and call doRebalance on all consumers

                • If pause is not setting, call the rebalanceImpl. DoRebalance

                  • Iterate through subscriptionInner, get all topics subscribed by the consumer, rebalanceByTopic, rebalanceByTopic

                    • rebalanceByTopic

                      • Allocates the message queue

                        • Broadcasting mode

                          • No allocation is required. TopicSubscribeInfoTable All queues that correspond to this topic are all queues that are assigned
                        • Cluster pattern

                          • All queues corresponding to this topic from topicSubscribeInfoTable
                          • Call findConsumerIdList in the client factory to get all the consumer ids for this topic, the consumer group
                          • Calls the message queue allocation strategy (AllocateMessageQueueStrategy) that is allocated to the message queue
                      • Update the process queue table to updateProcessQueueTable

                        • Deletes message queues that are no longer subscribed or that have pulled out of time

                          • Get the OffsetStore object to save the current offset

                          • Maintain offsetTable in the OffsetStore object and delete the corresponding message queue

                          • Cluster mode and sequential mode has a special treatment

                            • If pq.getlockConsume () can be obtained within a second, return true and the message queue can be deleted

                              • Go to the broker oneWay to unlock it

                                • If the current PQ has a message, the delay is 20, resolving the lock
                                • Resolve the lock immediately if there is no message
                            • If not, return false and the message queue cannot be deleted

                          • Deletes the message queue from processQueueTable

                        • Process message queues for new subscriptions

                          • There is a special treatment for sequential patterns

                            • Lock mq

                              • Server locking
                              • processQueue.setLocked(true);

              processQueue.setLastLockTimestamp(System.currentTimeMillis());

              - If the lock fails, the message queue will be skipped. - Calculate offset for the newly subscribed message queue. And maintain offsetTable in offsetStore - get the ConsumeFromWhere enumeration configured in the consumer - get the offsetStore object configured in the consumer - calculate offsets based on the configured ConsumeFromWhere - Calloffsetstore. readOffset(mq, Readoffsettype. READ_FROM_STORE) - broadcast mode - obtain offset from file - Maintain offsetTable in offsetStore: ConcurrentMap<MessageQueue, AtomicLong> - cluster mode - get the MessageQueue offset from the Broker - maintain offsetTable in offsetStore: ConcurrentMap<MessageQueue, AtomicLong> - if there is no offset - if it is a retry queue, Offset = 0 - otherwise call mQClientFactory. GetMQAdminImpl () maxOffset (mq) - CONSUME_FROM_FIRST_OFFSET computing - CONSUME_FROM_TIMESTAMP - add new subscribed message queue - build pullRequest for new message queue - iterate through the pullRequest, Execute pull messages in sequence - put the pullRequest into the pullRequestQueue of the client factory's PullMessageService - if there are changes to the message queue allocated after balancing and the previously allocated message queue, otherwise maintain the various messages and send heartbeatCopy the code
      • The start production group is DefaultMQProducer of CLIENT_INNER_PRODUCER, and this call does not start the client factory

  • Processing traceDispatcher

shutdown

  • this.defaultMQProducerImpl.shutdown();

    • DefaultMQProducerImpl is processed only if the status of DefaultMQProducerImpl is RUNNING for ServiceState

      • Unregister in the MQ client factory

        • this.producerTable.remove(group);
        • Inform all brokers that the producer is unregistered
      • Close the thread pool that sends messages asynchronously

      • Shut down the MQ client factory

        • Whether the client factory can be closed after determining whether the current consumer is closed

          • ! This. ConsumerTable. IsEmpty () is not closed
          • ! This. AdminExtTable. IsEmpty () is not closed
          • This.producertable.size () > 1 do not close
        • Shut down the MQ client factory

          • The client factory is closed only when the state is RUNNING

            • Close the default producers, but not to close the client factory this. DefaultMQProducer. GetDefaultMQProducerImpl (). Shutdown (false);

            • this.serviceState = ServiceState.SHUTDOWN_ALREADY;

            • Disable the pullMessageService to pullMessageService

              • Close the close interface for the parent ServiceThread

                • Start set to false
                • Stop setting true
                • HasNotified set true
                • countdownlatch countdown
                • Interrupts the pullMessageService thread
                • If it is not a daemon thread, the current thread waits for the pullMessageService thread to finish
              • Example Shut down the scheduled task executor

            • Example Close the scheduled task thread pool

            • Close the rebalancing of the service, enclosing rebalanceService. Shutdown ();

            • this.mQClientAPIImpl.shutdown();

              • Close all channels
              • Empty channelTables
              • Close the eventLoopGroupWorker
              • nettyEventExecutor ! = null, nettyEventExecutor is disabled
              • Close the defaultEventExecutorGroup
              • Close the publicExecutor
            • UDP socket If the UDP socket is not empty, disable the UDP socket

            • Where to remove the factory from the factory manager, MQClientManager. GetInstance (). RemoveClientFactory (enclosing clientId);

      • this.serviceState = ServiceState.SHUTDOWN_ALREADY;

  • If tracing messages, traceDispatcher.shutdown();

SendResult send(

    Message msg)
Copy the code
  • The message validation

    • Message non-null validation

    • Topic check

      • Topic non-null test
      • Topic A-ZA-Z0-9_ – Must be these characters
      • Topic Length The length of the verification cannot exceed 255
      • Topic cannot be TBW102
    • Body non-null validation

    • Bod size test, not more than 4m

  • defaultMQProducerImpl.send(msg)

    • send(Message msg, long timeout)

      • sendDefaultImpl(

      Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout)

      - Verify that the sender implementation has been started - find topic publishing info TopicPublishInfo: TryToFindTopicPublishInfo - see if there is a corresponding TopicPublishInfo topicPublishInfoTable, have and effective returns - otherwise, from ns topic routing updates, By calling the client factory updateTopicRouteInfoFromNameServer (topic) - with a reentrant lock control, ensure the operation is serial - get TopicRouteData, By calling the mQClientAPIImpl. GetTopicRouteInfoFromNameServer - returns true if there is changed - maintenance topicRouteTable: ConcurrentMap<String/* Topic */, TopicRouteData> - Maintain brokerAddrTable: ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> - Convert TopicRouteData to TopicPublishInfo - Maintain topicPublishInfoTable for each DefaultMQProducerImpl: ConcurrentMap<String/* topic */, TopicPublishInfo> - Converts TopicRouteData to Set<MessageQueue> - Maintain every DefaultMQPushConsumerImpl or DefaultMQPullConsumerImpl enclosing rebalanceImpl. TopicSubscribeInfoTable: ConcurrentMap<String/* topic */, Set < MessageQueue > > - didn't change - if you return false on asynchronous access to effective TopicPublishInfo returns - otherwise call updateTopicRouteInfoFromNameServer (topic, true, This.defaultmqproducer) - Get TopicPublishInfo from TBW102 - select a message queue - send the message sendKernelImpl - Find the address of the primary broker - if not in the cache, Call the tryToFindTopicPublishInfo () to obtain a VIP channels have a special treatment - if the message is not MessageBatch, Uniid-id = IP + PID + class loader hashcode + timestamp + an increasing value - if namespace is not null, InstanceId = namespace - compressed message - If MessageBatch, Without compression - if the message than his. DefaultMQProducer. GetCompressMsgBodyOverHowmuch compression (), Default 4k - Deflater compressed with java.util.zip.Deflater - set compression flag if compressed - TRAN_MSG property if true, transaction message Set transaction flag bit - if there is CheckForbiddenHook, then check the CheckForbiddenHook list, execute checkForbidden - If there is a send message hook, check the hook, Execute sendMessageBefore - build SendMessageRequestHeader - set other attributes - set message except body attributes into SendMessageRequestHeader - if sending a retry message - SetReconsumeTimes - setMaxReconsumeTimes - Sends messages based on communication mode - Sends messages in asynchronous mode - Determines timeout - mQClientFactory.getMQClientAPIImpl().sendMessage(Copy the code

      final String addr, final String brokerName, final Message msg, final SendMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final MQClientInstance instance, final int retryTimesWhenSendFailed, final SendMessageContext context, final DefaultMQProducerImpl producer

    )

      					- 构建RemotingCommand
    
      						- 如果是sendSmartMsg (默认为true)或者 是批量消息,则用SendMessageRequestHeaderV2替换与原来的SendMessageRequestHeader,来构建RemotingCommand
      						- 否则用SendMessageRequestHeader来构建RemotingCommand
      						- 将message的body赋值到RemotingCommand里面
    
      					- 根据通信模式进行消费的发送
    
      						- oneway调用
    
      							- this.remotingClient.invokeOneway
    
      								- 获取或者创建渠道
      								- 执行钩子doBeforeRequest
      								- invokeOnewayImpl
    
      									- 将请求的报文的flag的oneway位置位
      									- 在超时时间里获取oneway信号量
      									- 调用异步写,并加入监听器,写操作完成的时候会回调channel.writeAndFlush(request).addListener
    
      										- 监听器的内容
    
      											- 释放信号量
      											- 如果写不成功,打印日志
    
      									- 如果在超时内没有获取到信号量,则抛出异常
    
      										- 超时间为=0
    
      											- 则说明超时时间设的太短了,抛出RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast")
    
      										- 超时间大于0
    
      											- 则抛出RemotingTimeoutException并且附上目前oneway信号量的信息
    
      							- 返回null
    
      						- 异步调用
    
      							- 超时判断
      							- 调用sendMessageAsync
    
      								- new一个InvokeCallback,然后remotingClient 调用invokeAsync
    
      									- 获取或者创建连接
      									- 获取不到有效连接则关闭连接,抛出异常
      									- 调用钩子
      									- 如果超时则抛出异常
      									- invokeAsyncImpl
    
      										- 在超时间内获取异步调用的信号量,默认是65535,也就是同一个时间最多允许65535个异步调用
      										- 如果在超时内没有获取到信号量,则抛出异常
    
      											- 超时间为=0
    
      												- 则说明超时时间设的太短了,抛出RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast")
    
      											- 超时间大于0
    
      												- 则抛出RemotingTimeoutException并且附上目前异步信号量的信息
    
      										- 获取到信号量执行异步调用
    
      											- 超时处理
    
      												- 如果超时则,释放信号量,并抛出异常RemotingTimeoutException("invokeAsyncImpl call timeout");
    
      											- 构建ResponseFuture
    
      												- channel
      												- opaque
      												- timeoutMillis
      												- invokeCallback
      												- SemaphoreReleaseOnlyOnce
    
      											- put进responseTable
    
      												- this.responseTable.put(opaque, responseFuture);
    
      											- 调用异步写,并加入监听器,写操作完成的时候会回调channel.writeAndFlush(request).addListener
    
      												- 监听器的内容
    
      													- 如果写成功则设置发送请求成功responseFuture.setSendRequestOK(true);
      													- 失败,进行失败处理
    
      														- 从responseTable移除
      														- responseFuture 设置发送请求失败responseFuture.setSendRequestOK(false);
      														- 设置响应报文为空
      														- 执行回调
    
      															- 在回调线程池中进行回调,callbackExecutor为null,则用publicExecutor
    
      														- 释放信号量
    
      								- InvokeCallback的内容
    
      									- 如果sendCallback为null,且response不为null
    
      										- 从响应报文解析出SendResult,  MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
      										- 将SendResult设置进SendMessageContext中
      										- 回调钩子方法sendMessageAfter
      										- 维护发送者的失败策略
    
      									- 如果sendCallback不为null,且response不为null
    
      										- 从响应报文解析出SendResult,  MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
      										- 将SendResult设置进SendMessageContext中
      										- 回调钩子方法sendMessageAfter
      										- 回调sendCallback的onSuceess方法,sendCallback.onSuccess(sendResult);
      										- 维护发送者的失败策略
    
      									- 如果response=null
    
      										- 维护发送者的失败策略
      										- 进行失败原因分析
    
      											- 构建异常
    
      												- 如果写请求失败,new MQClientException("send request failed", responseFuture.getCause());
      												- 如果写成功,但是超时没有返回结果,new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
                          responseFuture.getCause());
      												- 否则new MQClientException("unknow reseaon", responseFuture.getCause());
    
      											- 将异常设置进SendMessageContext
      											- 调用钩子sendMessageAfter
      											- 回调sendCallback.onException方法sendCallback.onException(e);
    
      							- 返回空
    
      						- 同步调用
    
      							- 超时处理
      							- this.remotingClient.invokeSync
    
      								- 获取或者创建渠道channel,就是连接
    
      									- 如果地址为null,表明是要获取或创建ns的链接
    
      										- 从namesrvAddrChoosed原子引用中获取ns地址
      										- 如果ns地址不为null,则从channelTables中看是否存在连接,存在且ok,则返回
      										- 串行创建和ns的链接
    
      											- 如果ns地址不为null,则从channelTables中看是否存在连接,存在且ok,则返回
      											- 遍历namesrvAddrList创建和ns的链接
    
      												- 从namesrvAddrList选择一个ns作为namesrvAddrChoosed
      												- 创建连接
    
      									- 否则则获取或创建和broker的连接
    
      										- 看ConcurrentMap<String /* addr */, ChannelWrapper> channelTables中是否存在连接
    
      											- 所以在一个java进程中到一个broker中只有一个连接
    
      										- 如果存在,且连接是active则返回连接
      										- 否则创建连接,返回null表示创建连接失败
    
      											- channelTables如果中存在该连接,且连接时ok的,则关掉该连接,且删除该连接
      											- 串行创建连接
    
      												- 获取lockChannelTables,可重入锁,并做3秒的超时
      												- 抢到锁
    
      													- 再次检查channelTables是否存在该连接
      													- 不存在则创建
    
      														- ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
      														- cw = new ChannelWrapper(channelFuture);
      														- this.channelTables.put(addr, cw);
    
      													- 存在,且ok则关闭连接->删除->创建
      													- 存在,但还未完成3握手,则不创建
    
      												- 在超时间内没有抢到锁 打印日志
      												- 如果cw为空,则返回为null
      												- cw不为空,则等待连接完成
    
      													- 在连接超时的时间内等待连接完成,this.nettyClientConfig.getConnectTimeoutMillis())
      													- 如果在超时内连接完成
    
      														- 连接成功,返回channel
      														- 连接失败,打印日志返回null
    
      													- 超时时间内没有完成连接,返回null
    
      								- 如果连接不active则关闭连接,并抛出异常
      								- 否则进行同步调用
    
      									- 执行rpc钩子的doBeforeRequest方法
      									- 超时处理
      									- 同步调用invokeSyncImpl,netty所有的io操作都是异步的,所以这个地方构建一个ResponseFuture,用一个CountDownLatch 做一个等待,如果写失败,则在回调线程里面释放countDownLatch,否则等收到写响应报文后释放countDownLatch
    
      										- 构建ResponseFuture
      										- 用opaque为key, ResponseFuture 为value,put进NettyRemotingAbstract.responseTable里面
      										- 异步写RemotingCommand,channel.writeAndFlush(request)
      										- add listenser用于监听写操作完成
    
      											- 如果写操作成功则responseFuture.setSendRequestOK(true);返回,主线程继续等待,等待响应报文的来临
      											- 如果写失败
    
      												- esponseFuture.setSendRequestOK(false)
      												- responseTable删除该responseFuture
      												- responseFuture设置异常原因responseFuture.setCause(f.cause());
      												- responseFuture设置response=null,同时释放countdownlatch,主线程返回
    
      										- 用countdownlatch 做超时等待
      										- 从countdownlatch 做超时等待的超时中返回
    
      											- countdownlatch超时到
      											- 超时时间未到, 写失败
      											- 超时未到,写成功,并读取到响应报文
    
      										- 如果响应报文为空则超出异常
    
      											- 如果写成功则抛出RemotingTimeoutException, 表明是broker出现问题
      											- 如果写失败则抛出RemotingSendRequestException,发送请求异常
    
      										- 不为空则返回响应报文
      										- finally中 this.responseTable.remove(opaque);
    
      									- 调用rpc钩子的doAfterResponse
      									- 返回response
    
      							- 将响应报文RemotingCommand转化为SendResult
    
      								- 如果响应码,不是ResponseCode.FLUSH_DISK_TIMEOUT,ResponseCode.FLUSH_SLAVE_TIMEOUT,ResponseCode.SLAVE_NOT_AVAILABLE,ResponseCode.SUCCESS中的一种,则抛出异常MQBrokerException
      								- 否则组装SendResult对象
    
      									- 根据响应码构建SendStatus
      									- 从响应报文的extFields中解析出SendMessageResponseHeader对象
      									- SendStatus赋值
      									- msgId=uniqMsgId
      									- offsetMsgId=服务端返回的offsetMsgId
      									- queueOffset赋值
      									- messageQueue赋值
      									- transactionId赋值
      									- regionId赋值
      									- traceon赋值
    
      								- 返回SendResult对象
    
      			- 同步和oneway方式的发送
    
      				- 超时判断
      				- mQClientFactory.getMQClientAPIImpl().sendMessage(
      final String addr,
      final String brokerName,
      final Message msg,
      final SendMessageRequestHeader requestHeader,
      final long timeoutMillis,
      final CommunicationMode communicationMode,
      final SendMessageContext context,
      final DefaultMQProducerImpl producer
    Copy the code

    )

    - If there is a hook to send a message, traverse the hook and execute sendMessageAfter - Retry - Whether to switch to another broker - timeout - exception handlingCopy the code

Void send(Message MSG,SendCallback,SendCallback)

  • Call sendDefaultImpl in the asynchronous sending thread pool

Void sendOneway(Message MSG)

  • msg.setTopic(withNamespace(msg.getTopic()));

  • this.defaultMQProducerImpl.sendOneway(msg);

    • this.sendDefaultImpl

Batch sending messages SendResult (

    Collection<Message> msgs)
Copy the code
  • Build MessageBatch

    • Generate MessageBatch

      • check

        • There can be no delayed messages
        • Could not have retry message
        • Topic needs to be the same
        • The isWaitStoreMsgOK property needs to be the same
      • new MessageBatch

      • messageBatch.setTopic(first.getTopic());

      • messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());

    • Validators. CheckMessage (message, this);

    • Iterate over all messages and set the UNIQ_KEY property

    • By encoding all the messages and storing them in the message body field, a single message becomes a normal message, and the batch of messages is passed just like a message

  • Synchronous Message sending

Sending transaction messages

  • If the transaction listener is not set, an exception is thrown

  • Set a namespace for topic

  • Sending transaction messages

    • Check the message

    • Set transaction attribute true

    • Set production group properties

    • Synchronous message sending

    • Processing sending results

      • Performing local transactions

        • If it is successfully sent

          • Set properties __transactionId__
          • Set the property transactionId to the unikey of the message
          • The callback transaction listener executes the local transaction and gets the local transaction status
        • If the result is FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, or SLAVE_NOT_AVAILABLE, The local transaction status to roll back the message LocalTransactionState. ROLLBACK_MESSAGE

        • Other state of affairs for the unknown LocalTransactionState. UNKNOW

      • Sends the local transaction execution result

        • Decodes MessageId from offsetMsgId or MsgId

        • Seek out the primary broker

        • Build EndTransactionRequestHeader

          • Setting transaction ID

          • Set commitLogOffset, requestHeader setCommitLogOffset (id. GetOffset ());

          • Set setCommitOrRollback based on the local transaction status

            • COMMIT_MESSAGE

              • requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            • ROLLBACK_MESSAGE

              • requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            • UNKNOW

              • requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
          • requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());

          • requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());

          • requestHeader.setMsgId(sendResult.getMsgId());

        • Call remotingClient. InvokeOneway will EndTransactionRequestHeader sent out

Requests sent and processed by the client

client

  • Unregister, requestcode.unregister_client
  • Local end of transaction request, requestcode.end_TRANSACTION
  • Send a message, requestcode. SEND_BATCH_MESSAGE, requestcode. SEND_MESSAGE_V2, requestcode. SEND_MESSAGE
  • Requestcode. GET_ROUTEINTO_BY_TOPIC Sends topic routing information to NS to obtain routing information returned by NS
  • Requestcode.get_consumer_list_by_group sends a consumer group to a broker to obtain the machines in the consumer group
  • Query the consumer group’s consumption offset for a queue, requestcode. QUERY_CONSUMER_OFFSET
  • Get the maximum offset for a queue in topic, requestcode.get_max_offset
  • Pull the message, requestCode.pull_message
  • Queue locking in sequential mode requestCode.lock_batCH_MQ
  • Send the heartbeat request code.heart_beat

server

  • If the request code is CHECK_TRANSACTION_STATE, then checkTransactionState is called

    • The child theme Decoding the CheckTransactionStateRequestHeader1 from the response message

    • Decoded MessageExt from the response message

    • Send the UNIQ_KEY attribute from messageExt, and assign it to setTransactionId if it is not an empty string

    • Fetch the PROPERTY_PRODUCER_GROUP property from messageExt, which is the production group

    • The producer that pulls the response from the client factory with the production group

    • The producer is then called checkTransactionState

      • Build a runnable to submit to the checkExecutor thread pool for execution

        • The content of the runnable

          • Gets the transaction listener

          • Callback the transaction listener’s checkLocalTransaction method to get the transaction status

          • Process transaction state

            • Build EndTransactionRequestHeader

              • thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
              • thisHeader.setProducerGroup(producerGroup);
              • thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
              • thisHeader.setFromTransactionCheck(true);
              • thisHeader.setMsgId(uniqueKey);
              • thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
              • Set setCommitOrRollback based on the transaction state
            • If the check has thrown a record abnormal information remark = “checkLocalTransactionState Exception:” + RemotingHelper. ExceptionSimpleDesc (Exception);

            • this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,3000);

              • Build RemotingCommand
              • this.remotingClient.invokeOneway(addr, request, timeoutMillis);
    • Returns null

  • Notifies the consumer of the id change with the RequestCode requestcode.notify_consumer_ids_changed

    • To balance this with immediate effect. RebalanceService. Wakeup ();
    • return null
  • Reset the consumer’s offset, requestcode.reset_consumer_client_offset

    • Topic, group, offsetTable (Map<MessageQueue, Long>)

    • Get the corresponding consumer from consumerTable

    • Suspend the consumer and set Pause to true

    • MessageQueue, whose offset is to be reset, stops consuming first

      • ProcessQueue.setDropped(true);
      • ProcessQueue.clear();
    • Sleep 10 s

    • Resets the offset of the relevant consumption queue

    • Persist these offsets

    • Delete these offsets

    • Delete messageQueue and processQueue

    • pause = false

    • In the balance

    • return null

  • Get the client consumption offset, requestcode. GET_CONSUMER_STATUS_FROM_CLIENT

    • Get ConcurrentMap

      offsetTable
      ,>
    • Encapsulate offsetTable into RemotingCommand
    • return RemotingCommand
  • Get the consumer run information, requestcode.get_consumer_running_info

    • Decoded from RemotingCommand GetConsumerRunningInfoRequestHeader, access to consumer groups

    • ConsumerRunningInfo retrieves the running information of the corresponding consumer according to the consumer group

      • Get the consumer MQConsumerInner from consumerTable

      • Get the consumer run information ConsumerRunningInfo from the consumer MQConsumerInner

        • The pull handle

        • Push handle

          • new ConsumerRunningInfo

          • ConsumerRunningInfo Sets whether to consume sequentially

          • ConsumerRunningInfo sets the number of core threads in the consuming thread pool

          • ConsumerRunningInfo gets the consumer’s startup time

          • ConsumerRunningInfo sets up the consumer’s subscription information

          • ConsumerRunningInfo sets the queue consumption, the current offset

          • The ConsumerRunningInfo sets consumption information for each topic of the consumer

            • Get this information from the statistics module ConsumerStatsManager
      • Obtain the NS address list

      • ConsumerRunningInfo Sets the NS address list

      • ConsumerRunningInfo sets the consumption type pull or push

      • ConsumerRunningInfo Gets the client version

    • If ConsumerRunningInfo is not null, it determines whether the JStack information is required, and if so, it sets the jStack on the response message

      • Map<Thread, StackTraceElement[]> Map = thread.getallStacktraces ();

      • Convert the thread stack information to a string and set it to ConsumerRunningInfo

        • Iterate through all the thread stack information of StackTraceElement [], StackTraceElement. The toString (), all the thread’s stack information synthesis of a string
    • If null, responsecode.system_error is returned with an error message

  • Consume a message directly, requestcode. CONSUME_MESSAGE_DIRECTLY

    • Decoding the ConsumeMessageDirectlyResultRequestHeader and MessageExt from the response message

    • News consumption

      • Get the response consumption from the consumerTable

      • consumer.getConsumeMessageService().consumeMessageDirectly

        • Concurrent consumption

          • ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult()
          • result.setOrder(false);
          • result.setAutoCommit(true);
          • The listener is called to consume the message
          • Set consumption results
          • Set consumption Time
        • Order consumption

          • ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult()
          • result.setOrder(true);
          • Call the Listener for consumption
          • Set consumption results
          • Set whether to commit automatically. Sequential consumption can be set in the listener. The default value is auto commit
          • Set consumption time
    • If the consumption result is null, responsecode.system_error is returned

    • If the consumption result is not null, return responsecode.success and write the consumption result away

DefaultMQPushConsumer

subscribe(String topic, String subExpression)

  • defaultMQPushConsumerImpl.subscribe

    • Build the SubscriptionData class SubscriptionData

      • The subscription expression is set to * if it is empty
      • If the subscribe expression is not empty, use | | will subscribe expression is separated, to add the tag to the Set of tag tagsSet hashcode join the Set
    • RebalanceImpl ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner

    • Call the client factory to send heartbeats to all brokers

registerMessageListener

  • DefaultMQPushConsumer assignment
  • DefaultMQPushConsumerImpl assignment

start()

  • defaultMQPushConsumerImpl.start()

    • Check the configuration

      • Consumer group check

        • The consumer group cannot be empty
        • Consumer advocate must meet ^ [% | a – zA – Z0 – _ – 9] + $
        • The length cannot exceed 255
        • The consumer group cannot be DEFAULT_CONSUMER
      • Message patterns cannot be empty

      • ConsumeFromWhere cannot be empty

      • The timestamp cannot be empty

      • AllocateMessageQueueStrategy cannot be empty

      • Subscription information Subscription cannot be empty

      • MessageListener cannot be empty

      • messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently

      • consumeThreadMin Out of range [1, 1000]

      • consumeThreadMax Out of range [1, 1000]

      • consumeThreadMin can’t be larger than consumeThreadMax

      • consumeConcurrentlyMaxSpan Out of range [1, 65535]

      • pullThresholdForQueue Out of range [1, 65535]

      • pullThresholdForTopic Out of range [1, 6553500]

      • pullThresholdSizeForQueue Out of range [1, 1024]

      • pullThresholdSizeForTopic Out of range [1, 102400]

      • pullInterval Out of range [0, 65535]

      • consumeMessageBatchMaxSize Out of range [1, 1024]

      • pullBatchSize Out of range [1, 1024]

    • Assign subscription information

      • DefaultMQPushConsumer Map
        Subscription is not empty Copy the data to the rebalanceImpl’s subscriptionInner

      • If cluster mode is used

        • Tectonic retryTopic

          • RETRY % % + consumer groups
        • Tectonic SubscriptionData

        • Maintain it in a subscriptionInner of rebalanceImpl

    • In cluster mode, if the instance name is DEFAULT, change the instance name to PID

    • Gets or creates a client factory

    • Give rebalanceImpl assignment

      • Consumer groups
      • Message mode, broadcast or cluster
      • Message queue allocation policy
      • Client factory
    • Build PullAPIWrapper

    • Build offsetStore

      • If cluster mode is used

        • RemoteBrokerOffsetStore
      • In broadcast mode

        • LocalFileOffsetStore
    • offsetStore.load

      • Cluster pattern

        • The method body is empty
      • Broadcasting mode

        • Read ConcurrentMap

          offsetTable from local file and load it into OffsetStore
          ,>
    • Build ConsumeMessageService

      • If it is MessageListenerOrderly

        • Build ConsumeMessageOrderlyService
      • If it is MessageListenerConcurrently

        • Build ConsumeMessageConcurrentlyService
    • Start the ConsumeMessageService

    • Register consumers in the client factory

    • Start the client factory

    • Update subscription messages when subscription information changes

    • checkClientInBroker

    • Send heartbeats to all brokers

    • Immediately in balance

  • TraceDispatcher processing