Preface
Message queues typically offer two consumption modes on the consumer side: push and pull. RocketMQ implements both, through the classes DefaultMQPushConsumer and DefaultMQPullConsumer. Each mode has its own strengths and weaknesses:
Push mode: the server pushes messages to the client as soon as it has data. The client and server keep a long-lived connection, so delivery is close to real time and the client's receive-and-process logic stays simple. The drawback is that the server does not know how fast the client can process messages. If the client cannot keep up, messages pile up, the server's workload grows, and server performance suffers.
Pull mode: the client actively pulls data from the server, so the initiative lies with the client. It pulls data, processes it, pulls again, and repeats. The hard part is choosing the pull interval. If the interval is too short, many requests come back without data and waste connections. If it is too long, data is not delivered promptly. RocketMQ uses long polling, which combines the advantages of push and pull. Below is a brief introduction to long polling, followed by an analysis of how RocketMQ implements it internally.
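The interval trade-off can be made concrete with a small model. This is a hypothetical sketch, not RocketMQ code. Messages arrive at fixed times, the client polls at a fixed interval, and each poll takes everything that has arrived so far. The model counts empty polls and the worst delivery delay. The class and method names are illustrative assumptions.

```java
import java.util.List;

/** Hypothetical model of fixed-interval (short) polling; not RocketMQ code. */
final class ShortPollingModel {
    static final class Stats {
        final int emptyPolls;     // polls that returned no data
        final long maxDelayMs;    // worst time a message waited before being pulled

        Stats(int emptyPolls, long maxDelayMs) {
            this.emptyPolls = emptyPolls;
            this.maxDelayMs = maxDelayMs;
        }
    }

    /**
     * @param arrivalsMs sorted arrival times of messages, in ms
     * @param intervalMs time between two polls; the first poll happens at intervalMs
     * @param horizonMs  simulate polls up to and including this time
     */
    static Stats simulate(List<Long> arrivalsMs, long intervalMs, long horizonMs) {
        int next = 0;             // first message not yet delivered
        int emptyPolls = 0;
        long maxDelayMs = 0;
        for (long t = intervalMs; t <= horizonMs; t += intervalMs) {
            int delivered = 0;
            // a poll at time t returns every message that has arrived by then
            while (next < arrivalsMs.size() && arrivalsMs.get(next) <= t) {
                maxDelayMs = Math.max(maxDelayMs, t - arrivalsMs.get(next));
                next++;
                delivered++;
            }
            if (delivered == 0) {
                emptyPolls++;
            }
        }
        return new Stats(emptyPolls, maxDelayMs);
    }

    public static void main(String[] args) {
        List<Long> arrivals = List.of(1000L, 4000L, 4500L);
        for (long interval : new long[] {100, 3000}) {
            Stats s = simulate(arrivals, interval, 6000);
            System.out.println("interval=" + interval + "ms emptyPolls=" + s.emptyPolls
                    + " maxDelay=" + s.maxDelayMs + "ms");
        }
    }
}
```

With these inputs, polling every 100 ms comes back empty 57 times out of 60 but never delays a message. Polling every 3000 ms never comes back empty, but a message can wait up to 2000 ms.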
Long polling
Through cooperation between client and server, long polling keeps the initiative on the client side while still delivering data in near real time. Long polling is still polling at heart, but it improves on ordinary polling. When there is no data, the server does not return immediately. Instead, it holds the request until data arrives or a timeout expires, and then the cycle starts again. Let's look at how to implement a simple long poll.
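The core idea fits in a few lines of Java. The sketch below is a hypothetical in-process stand-in for the server, not part of the Netty example. `poll(timeoutMs)` returns at once if data is queued. Otherwise it holds the caller until data is published or the timeout expires, using `BlockingQueue.poll(long, TimeUnit)`. In this version, each waiting request occupies one parked thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-process long-poll "server". */
final class LongPollServer {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Called by producers when new data is available. */
    void publish(String data) {
        queue.add(data);
    }

    /**
     * Returns data immediately if some is queued; otherwise holds the caller until
     * data is published or timeoutMs elapses, in which case it returns null.
     */
    String poll(long timeoutMs) {
        try {
            return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt for the caller
            return null;
        }
    }
}
```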
1. Implementation steps
1.1 The client sends polling requests
The client should have a loop that continuously sends requests for messages to the server.
1.2 The server processes data
After receiving a client request, the server first checks whether data is available. If it is, the server returns the data directly. If not, the server keeps the connection open and waits. Once data arrives, the server notifies the held request, which then returns the data to the client.
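Step 1.2 does not require one blocked thread per request. A hypothetical alternative parks each request that finds no data as a pending `CompletableFuture`. This is closer in spirit to the RocketMQ broker described later. `publish` completes the oldest held request. `completeOnTimeout` (Java 9+) answers the request with `null` if nothing arrives in time. The class and method names are illustrative assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical hold-and-notify server: held requests are futures, not blocked threads. */
final class HoldingServer {
    private final Deque<String> data = new ArrayDeque<>();
    private final Deque<CompletableFuture<String>> pending = new ArrayDeque<>();

    /** Answers at once if data exists; otherwise holds the request for up to timeoutMs. */
    synchronized CompletableFuture<String> handle(long timeoutMs) {
        String d = data.poll();
        if (d != null) {
            return CompletableFuture.completedFuture(d);
        }
        CompletableFuture<String> held = new CompletableFuture<>();
        held.completeOnTimeout(null, timeoutMs, TimeUnit.MILLISECONDS);   // timeout answer: null
        pending.add(held);
        return held;
    }

    /** New data arrived: wake the oldest held request, or store the data if none is waiting. */
    void publish(String d) {
        while (true) {
            CompletableFuture<String> waiter;
            synchronized (this) {
                waiter = pending.poll();
                if (waiter == null) {
                    data.add(d);
                    return;
                }
            }
            if (waiter.complete(d)) {   // false if that request already timed out
                return;
            }
        }
    }
}
```

Timed-out futures stay in the pending list until a later `publish` skips them. A production version would remove them eagerly.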
1.3 The client receives data
Normally the client receives data either immediately or after waiting for a while. If no data arrives in time, the request times out. In both cases, once the data has been received or the request has timed out, the connection is closed and the client issues a new long-poll request.
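Steps 1.1 and 1.3 together form the client loop. This is a minimal sketch that assumes a hypothetical `LongPollSource` whose `poll` returns `null` on timeout. The client keeps issuing requests and treats a timeout as a signal to ask again. It stops once it has the number of messages it wants. A request budget keeps the example finite.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical: anything that can answer one long-poll request (null means timed out). */
interface LongPollSource {
    String poll(long timeoutMs);
}

final class LongPollClient {
    int timeouts = 0;

    /** Re-issues long-poll requests until `wanted` messages arrive or maxRequests are used. */
    List<String> receive(LongPollSource source, int wanted, long timeoutMs, int maxRequests) {
        List<String> received = new ArrayList<>();
        for (int i = 0; i < maxRequests && received.size() < wanted; i++) {
            String data = source.poll(timeoutMs);   // held by the server until data or timeout
            if (data == null) {
                timeouts++;                          // timed out: just start the next request
            } else {
                received.add(data);                  // got data: process it, then poll again
            }
        }
        return received;
    }
}
```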
2. Implementation examples
The following example uses Netty to build a simple HTTP server, HttpURLConnection on the client side to send requests, and a BlockingQueue to store data.
Server code
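import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

public class Server {
    public static void start(final int port) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap.channel(NioServerSocketChannel.class).group(boss, worker)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("http-decoder", new HttpServerCodec());
                            ch.pipeline().addLast(new HttpServerHandler());
                        }
                    });
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("server start ok port is " + port);
            // DataCenter.start() loops forever, so run it on its own thread;
            // otherwise closeFuture().sync() below would never be reached
            new Thread(DataCenter::start, "data-center").start();
            future.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        start(8080);
    }
}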
Netty ships with HTTP codecs, so HTTP support can be used directly. The server listens on port 8080 and starts the data center service at the same time. The data center code is as follows:
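import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class DataCenter {
    private static final Random random = new Random();
    private static final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private static final AtomicInteger num = new AtomicInteger();

    // Produces one message every 0-4 seconds, forever
    public static void start() {
        while (true) {
            try {
                Thread.sleep(random.nextInt(5) * 1000);
                String data = "hello world" + num.incrementAndGet();
                queue.put(data);
                System.out.println("store data:" + data);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Blocks until data is available
    public static String getData() throws InterruptedException {
        return queue.take();
    }
}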
To simulate a server that has no data and must wait, data is inserted into a BlockingQueue at random intervals. getData() retrieves it with take(), which blocks until data is available. getData() is used by HttpServerHandler, which is also very simple:
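import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.ReferenceCountUtil;
import java.nio.charset.StandardCharsets;

public class HttpServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            if (msg instanceof HttpRequest) {
                FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                // Waits until DataCenter has data; this blocks the Netty I/O thread,
                // which is acceptable only in a demo like this one
                httpResponse.content().writeBytes(DataCenter.getData().getBytes(StandardCharsets.UTF_8));
                httpResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
                httpResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, httpResponse.content().readableBytes());
                ctx.writeAndFlush(httpResponse);
            }
        } finally {
            ReferenceCountUtil.release(msg);   // messages from the codec must be released by the last handler
        }
    }
}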
After receiving a client request, the handler fetches a message from the data center. If there is no data yet, it waits until data arrives, then returns the message to the client in a FullHttpResponse. The client uses HttpURLConnection to connect to the server:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Date;

public class Client {
    public static void main(String[] args) {
        while (true) {   // issue a new long-poll request as soon as the previous one ends
            HttpURLConnection connection = null;
            try {
                URL url = new URL("http://localhost:8080");
                connection = (HttpURLConnection) url.openConnection();
                connection.setReadTimeout(10000);   // stop waiting for data after 10 s
                connection.setConnectTimeout(3000);
                connection.setRequestMethod("GET");
                connection.connect();
                if (200 == connection.getResponseCode()) {
                    try (BufferedReader reader = new BufferedReader(
                            new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                        StringBuilder result = new StringBuilder();
                        String line;
                        while ((line = reader.readLine()) != null) {
                            result.append(line);
                        }
                        System.out.println("Time: " + new Date() + " result = " + result);
                    }
                }
            } catch (IOException e) {
                // includes SocketTimeoutException when no data arrives within the read timeout
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    connection.disconnect();
                }
            }
        }
    }
}
This is a simple simulation of long polling. Next, let's look at how RocketMQ implements it.
RocketMQ long polling
RocketMQ's consumer side provides two consumption modes, DefaultMQPushConsumer and DefaultMQPullConsumer. Despite its name, DefaultMQPushConsumer is implemented with long polling, so this article focuses on it.
1. PullMessageService
As the name suggests, this is the client-side service that pulls data from the server. Its core method is run():
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            // Blocks until a PullRequest is available, then pulls for it
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
PullRequest can be regarded as the set of parameters needed to pull data. Part of the class:
public class PullRequest {
    private String consumerGroup;
    private MessageQueue messageQueue;
    private ProcessQueue processQueue;
    private long nextOffset;
    private boolean lockedFirst = false;
    // ... omitted ...
}
Each MessageQueue is wrapped in its own PullRequest, because data is pulled from the individual queues under each Broker. A PullRequest also holds a ProcessQueue. Every MessageQueue has a corresponding ProcessQueue, which keeps a snapshot of the processing state of that queue's messages. nextOffset identifies where the next read starts. Further down in pullMessage, the client builds the request header and sends it to the server:
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
    brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
    brokerAddr,
    requestHeader,
    timeoutMillis,
    communicationMode,
    pullCallback);
return pullResult;
suspendTimeoutMillis sets the maximum time the Broker may hold (block) the request. The default is 15 seconds. The Broker holds the request only when there is no message to return. If messages are available, it returns immediately.
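The client-side flow in this section can be condensed into a toy model. The sketch below is hypothetical and greatly simplified. A `String` stands in for `MessageQueue`, and a `Puller` function stands in for the broker call. There is one request object per queue. A single thread takes a request from the blocking queue, pulls, advances `nextOffset`, and puts the request back. The method name `executePullRequestImmediately` is borrowed loosely from PullMessageService. In RocketMQ the pull is asynchronous and the re-queueing happens in the callback; here it is done inline for brevity.

```java
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical, simplified model of the client-side pull loop. */
final class MiniPullMessageService {
    /** One request per message queue; nextOffset is where its next pull starts. */
    static final class PullReq {
        final String messageQueue;   // stands in for RocketMQ's MessageQueue
        long nextOffset;

        PullReq(String messageQueue, long nextOffset) {
            this.messageQueue = messageQueue;
            this.nextOffset = nextOffset;
        }
    }

    /** Hypothetical broker call: pulls from offset and returns the offset to continue from. */
    interface Puller {
        long pull(String messageQueue, long offset);
    }

    private final LinkedBlockingQueue<PullReq> pullRequestQueue = new LinkedBlockingQueue<>();
    private final Puller puller;

    MiniPullMessageService(Puller puller) {
        this.puller = puller;
    }

    void executePullRequestImmediately(PullReq req) {
        pullRequestQueue.add(req);
    }

    /** One iteration of the run() loop; returns false if interrupted while waiting. */
    boolean runOnce() {
        PullReq req;
        try {
            req = pullRequestQueue.take();            // blocks until some queue needs pulling
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        req.nextOffset = puller.pull(req.messageQueue, req.nextOffset);
        executePullRequestImmediately(req);           // re-queue for the next round
        return true;
    }
}
```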
2. PullMessageProcessor
As the name suggests, this is the server-side service that processes pull requests. Let's focus on its processRequest method, which handles the different lookup results:
switch (response.getCode()) {
    case ResponseCode.SUCCESS:
        // ... omitted ...
        break;
    case ResponseCode.PULL_NOT_FOUND:
        if (brokerAllowSuspend && hasSuspendFlag) {
            long pollingTimeMills = suspendTimeoutMillisLong;
            if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
            }
            String topic = requestHeader.getTopic();
            long offset = requestHeader.getQueueOffset();
            int queueId = requestHeader.getQueueId();
            PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                this.brokerController.getMessageStore().now(), offset, subscriptionData);
            this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
            // null response: nothing is written back to the client for now
            response = null;
            break;
        }
        // cannot suspend: fall through and answer the client immediately
    case ResponseCode.PULL_RETRY_IMMEDIATELY:
        break;
    case ResponseCode.PULL_OFFSET_MOVED:
        // ... omitted ...
        break;
    default:
        assert false;
}
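The PULL_NOT_FOUND branch above comes down to one decision: whether to hold the request, and for how long. Below is a hypothetical, simplified model of that decision. In the test, 15000 ms is the default suspend time mentioned earlier, and 1000 ms is only an illustrative short-polling value. In the RocketMQ source, executeRequestWhenWakeup re-runs processRequest with brokerAllowSuspend set to false. That is why a woken request gets an answer instead of being held again.

```java
/** Hypothetical, simplified model of the PULL_NOT_FOUND branch; not RocketMQ code. */
final class PullNotFoundDecision {
    /** Returns how long the broker holds the request, or -1 if it must answer right away. */
    static long holdMillis(boolean brokerAllowSuspend, boolean hasSuspendFlag,
                           boolean longPollingEnable, long suspendTimeoutMillis,
                           long shortPollingTimeMills) {
        if (!(brokerAllowSuspend && hasSuspendFlag)) {
            return -1;   // falls through to PULL_RETRY_IMMEDIATELY: the response is sent now
        }
        // long polling: hold up to the client's suspend timeout; otherwise use short polling
        return longPollingEnable ? suspendTimeoutMillis : shortPollingTimeMills;
    }
}
```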
When the result is ResponseCode.PULL_NOT_FOUND and the request may be suspended, the broker calls PullRequestHoldService. As the name indicates, this service holds the request instead of returning it immediately. response is set to null, so nothing is sent back to the client for now. Let's focus on PullRequestHoldService:
@Override
public void run() {
    log.info("{} service started", this.getServiceName());
    while (!this.isStopped()) {
        try {
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }
            long beginLockTimestamp = this.systemClock.now();
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    log.info("{} service end", this.getServiceName());
}
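PullRequestHoldService inherits waitForRunning from RocketMQ's ServiceThread base class. It waits for the given interval but can be woken early. The sketch below is a hypothetical, self-contained equivalent built on a plain monitor, not RocketMQ's implementation. A wakeup that arrives before the wait is remembered rather than lost. runLoop mirrors the shape of the run() method above.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical "wait for the interval unless signalled" helper. */
final class PeriodicChecker {
    private final Object lock = new Object();
    private boolean notified = false;   // a wakeup() that has not been consumed yet

    /** Waits up to intervalMs; returns true if woken by wakeup(), false on timeout. */
    boolean waitForRunning(long intervalMs) {
        synchronized (lock) {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs);
            while (!notified) {
                long remainingNs = deadline - System.nanoTime();
                if (remainingNs <= 0) {
                    return false;
                }
                try {
                    TimeUnit.NANOSECONDS.timedWait(lock, remainingNs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            notified = false;   // consume the signal
            return true;
        }
    }

    void wakeup() {
        synchronized (lock) {
            notified = true;
            lock.notifyAll();
        }
    }

    /** Wait for the interval (or a wakeup), then run the check, until stopped. */
    void runLoop(Runnable check, long intervalMs, BooleanSupplier stopped) {
        while (!stopped.getAsBoolean()) {
            waitForRunning(intervalMs);
            check.run();   // checkHoldRequest() in the RocketMQ code above
        }
    }
}
```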
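When long polling is enabled, this service wakes up every 5 seconds (otherwise every shortPollingTimeMills) and calls checkHoldRequest. Before looking at the check, here is suspendPullRequest, the method called in the ResponseCode.PULL_NOT_FOUND branch: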
private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
    new ConcurrentHashMap<String, ManyPullRequest>(1024);

public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (null == mpr) {
        mpr = new ManyPullRequest();
        ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
        if (prev != null) {
            mpr = prev;
        }
    }
    mpr.addPullRequest(pullRequest);
}
The PullRequest to be held is stored in a ConcurrentHashMap keyed by topic@queueId, where it waits to be checked. The check itself is done in checkHoldRequest:
private void checkHoldRequest() {
    for (String key : this.pullRequestTable.keySet()) {
        String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
        if (2 == kArray.length) {
            String topic = kArray[0];
            int queueId = Integer.parseInt(kArray[1]);
            final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
            try {
                this.notifyMessageArriving(topic, queueId, offset);
            } catch (Throwable e) {
                log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
            }
        }
    }
}
For each held topic and queue, this method obtains the current maximum offset of the MessageQueue and passes it to notifyMessageArriving. notifyMessageArriving compares that offset with each request's offset to decide whether new messages have arrived. Here is notifyMessageArriving:
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (mpr != null) {
        List<PullRequest> requestList = mpr.cloneListAndClear();
        if (requestList != null) {
            List<PullRequest> replayList = new ArrayList<PullRequest>();
            for (PullRequest request : requestList) {
                long newestOffset = maxOffset;
                if (newestOffset <= request.getPullFromThisOffset()) {
                    newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
                }
                // Decision 1: a new (matching) message has arrived
                if (newestOffset > request.getPullFromThisOffset()) {
                    if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) {
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }
                }
                // Decision 2: the request has been held for its maximum time
                if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                    try {
                        this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                            request.getRequestCommand());
                    } catch (Throwable e) {
                        log.error("execute request when wakeup failed.", e);
                    }
                    continue;
                }
                // Neither: keep holding the request
                replayList.add(request);
            }
            if (!replayList.isEmpty()) {
                mpr.addPullRequest(replayList);
            }
        }
    }
}
The method makes two key decisions. First, it compares the request's offset with maxOffset to see whether new messages have arrived. If they have, the request is woken up and the messages are returned to the client. Second, it compares the current time with the request's suspend deadline to see whether the maximum hold time has been exceeded. In that case the request is also answered. Requests that meet neither condition are put back to be held. This method is called not only by PullRequestHoldService but also when DefaultMessageStore stores a new message. In other words, RocketMQ combines two mechanisms: active checking and passive notification.
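Both decisions and both triggers fit in one small class. The sketch below is hypothetical and simplified. `HeldRequest`, `HoldService` and their methods are illustrative names, and message filtering and the re-read of the max offset are left out. The clock is injected so the behaviour is deterministic. A `Consumer` stands in for executeRequestWhenWakeup. notifyMessageArriving is the passive path, called with the new max offset after a message is stored. checkHoldRequest is the active path that a periodic loop would run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.ToLongFunction;

/** Hypothetical stand-in for the broker-side PullRequest. */
final class HeldRequest {
    final long pullFromOffset;     // offset the client asked for
    final long suspendTimestamp;   // when the broker started holding it
    final long timeoutMillis;      // maximum hold time

    HeldRequest(long pullFromOffset, long suspendTimestamp, long timeoutMillis) {
        this.pullFromOffset = pullFromOffset;
        this.suspendTimestamp = suspendTimestamp;
        this.timeoutMillis = timeoutMillis;
    }
}

/** Hypothetical, simplified hold service; message filtering is left out. */
final class HoldService {
    private final ConcurrentHashMap<String, List<HeldRequest>> table = new ConcurrentHashMap<>();
    private final LongSupplier clock;          // injected so the example is deterministic
    private final Consumer<HeldRequest> wake;  // stands in for executeRequestWhenWakeup

    HoldService(LongSupplier clock, Consumer<HeldRequest> wake) {
        this.clock = clock;
        this.wake = wake;
    }

    static String key(String topic, int queueId) {
        return topic + "@" + queueId;
    }

    void suspend(String topic, int queueId, HeldRequest req) {
        List<HeldRequest> list = table.computeIfAbsent(key(topic, queueId), k -> new ArrayList<>());
        synchronized (list) {
            list.add(req);
        }
    }

    /** Passive path: called with the new max offset right after a message is stored. */
    void notifyMessageArriving(String topic, int queueId, long maxOffset) {
        List<HeldRequest> list = table.get(key(topic, queueId));
        if (list == null) {
            return;
        }
        List<HeldRequest> snapshot;
        synchronized (list) {              // like cloneListAndClear()
            snapshot = new ArrayList<>(list);
            list.clear();
        }
        long now = clock.getAsLong();
        List<HeldRequest> replay = new ArrayList<>();
        for (HeldRequest r : snapshot) {
            if (maxOffset > r.pullFromOffset) {
                wake.accept(r);            // decision 1: new messages have arrived
            } else if (now >= r.suspendTimestamp + r.timeoutMillis) {
                wake.accept(r);            // decision 2: held long enough, answer anyway
            } else {
                replay.add(r);             // neither: keep holding
            }
        }
        if (!replay.isEmpty()) {
            synchronized (list) {
                list.addAll(replay);
            }
        }
    }

    /** Active path: what a periodic checkHoldRequest() does for every held key. */
    void checkHoldRequest(ToLongFunction<String> maxOffsetOfKey) {
        for (String k : table.keySet()) {
            int at = k.lastIndexOf('@');
            notifyMessageArriving(k.substring(0, at), Integer.parseInt(k.substring(at + 1)),
                    maxOffsetOfKey.applyAsLong(k));
        }
    }

    int heldCount(String topic, int queueId) {
        List<HeldRequest> list = table.get(key(topic, queueId));
        if (list == null) {
            return 0;
        }
        synchronized (list) {
            return list.size();
        }
    }
}
```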
3. PullCallback callback
After the Broker responds, the client-side PullCallback is invoked. Once the result has been processed, the key step is to put the pullRequest back into PullMessageService, where it waits for the next poll.
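The following minimal sketch shows that re-queueing under simplified, hypothetical types. A successful response, with or without new messages, moves `nextOffset` forward and re-queues the request immediately. A failed pull keeps the offset and re-queues the request after a delay, so the client does not hammer the Broker. The class name, the method signatures and the delay value are assumptions made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of how a pull callback puts its request back in the queue. */
final class RequeueingCallback {
    static final class Req {
        long nextOffset;

        Req(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    private final BlockingQueue<Req> pullRequestQueue;   // the queue the pull service takes from
    private final ScheduledExecutorService scheduler;
    private final long retryDelayMs;                      // hypothetical back-off after a failure

    RequeueingCallback(BlockingQueue<Req> pullRequestQueue, ScheduledExecutorService scheduler,
                       long retryDelayMs) {
        this.pullRequestQueue = pullRequestQueue;
        this.scheduler = scheduler;
        this.retryDelayMs = retryDelayMs;
    }

    /** The broker answered (with or without messages): move the offset on, poll again at once. */
    void onSuccess(Req req, long nextBeginOffset) {
        req.nextOffset = nextBeginOffset;
        pullRequestQueue.add(req);
    }

    /** The pull failed: keep the offset and re-queue after a pause. */
    void onException(Req req) {
        scheduler.schedule(() -> {
            pullRequestQueue.add(req);
        }, retryDelayMs, TimeUnit.MILLISECONDS);
    }
}
```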
Conclusion
This article first introduced the two ways of consuming messages and the pros and cons of each. It then introduced long polling and simulated it locally. Finally, it examined how RocketMQ implements long polling.
Sample code address
Github Gitee