This article describes best practices for coding against the Pulsar client and provides production-ready sample code that you can use in development to make adopting Pulsar more efficient. In production environments, the Pulsar address is usually obtained from a configuration center or through K8s domain name discovery; that is outside the scope of this article, so the examples use the constant PulsarConstant.SERVICE_HTTP_URL instead. The examples in this article have been uploaded to GitHub.
Client initialization and configuration
Initialize Client: demo level
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClient;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarClientInit {

    private static final DemoPulsarClientInit INSTANCE = new DemoPulsarClientInit();

    private PulsarClient pulsarClient;

    public static DemoPulsarClientInit getInstance() {
        return INSTANCE;
    }

    public void init() throws Exception {
        pulsarClient = PulsarClient.builder()
                .serviceUrl(PulsarConstant.SERVICE_HTTP_URL)
                .build();
    }

    public PulsarClient getPulsarClient() {
        return pulsarClient;
    }
}
The demo-level Pulsar client is initialized without any custom parameters and does not handle exceptions during initialization: if the client cannot be created, init() throws the exception directly.
Initialize Client: production level
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClient;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarClientInitRetry {

    private static final DemoPulsarClientInitRetry INSTANCE = new DemoPulsarClientInitRetry();

    private volatile PulsarClient pulsarClient;

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-cli-init"));

    public static DemoPulsarClientInitRetry getInstance() {
        return INSTANCE;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        try {
            pulsarClient = PulsarClient.builder()
                    .serviceUrl(PulsarConstant.SERVICE_HTTP_URL)
                    .build();
            log.info("pulsar client init success");
            // The client exists now, so the retry task is no longer needed.
            this.executorService.shutdown();
        } catch (Exception e) {
            log.error("init pulsar error, exception is ", e);
        }
    }

    public PulsarClient getPulsarClient() {
        return pulsarClient;
    }
}
In a real environment, we usually want to make sure that a failure to initialize the Pulsar client does not affect the startup of the microservice; that is, the microservice starts first and then keeps retrying to create the Pulsar client. The code above achieves this with a volatile field plus a scheduled task that keeps retrying the creation, and it shuts down the timer thread once the client has been created successfully.
Initialize Client: commercial level
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SizeUnit;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarClientInitUltimate {

    private static final DemoPulsarClientInitUltimate INSTANCE = new DemoPulsarClientInitUltimate();

    private volatile PulsarClient pulsarClient;

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-cli-init"));

    public static DemoPulsarClientInitUltimate getInstance() {
        return INSTANCE;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        try {
            pulsarClient = PulsarClient.builder()
                    .serviceUrl(PulsarConstant.SERVICE_HTTP_URL)
                    .ioThreads(4)
                    .listenerThreads(10)
                    .memoryLimit(64, SizeUnit.MEGA_BYTES)
                    .operationTimeout(5, TimeUnit.SECONDS)
                    .connectionTimeout(15, TimeUnit.SECONDS)
                    .build();
            log.info("pulsar client init success");
            this.executorService.shutdown();
        } catch (Exception e) {
            log.error("init pulsar error, exception is ", e);
        }
    }

    public PulsarClient getPulsarClient() {
        return pulsarClient;
    }
}
Five configuration parameters have been added to the commercial-level Pulsar client:
- ioThreads: the number of Netty I/O threads that handle network I/O. If the service traffic is heavy, increase this value.
- listenerThreads: the number of threads that invoke the consumer callbacks started in listener mode. It is recommended to configure this to be larger than the number of partitions the client consumes.
- memoryLimit: currently used to limit the maximum memory available to Pulsar producers. It is an effective guard against the Java program running out of memory (OOM) when messages back up in the producer because of network outages, Pulsar failures and similar scenarios.
- operationTimeout: the timeout for some metadata operations. Pulsar's default of 30s is conservative and can be adjusted according to your network conditions and processing performance.
- connectionTimeout: the timeout for establishing a connection to Pulsar. The configuration principle is the same as above.
Client advanced parameters (memory allocation related)
We can also control the Pulsar client's memory allocation by passing Java system properties. Here are a few important parameters:
- pulsar.allocator.pooled: if true, the off-heap memory pool is used; if false, memory is allocated on the heap without a pool. The efficient off-heap memory pool is used by default.
- pulsar.allocator.exit_on_oom: whether to shut down the JVM when an out-of-memory error occurs. The default value is false.
- pulsar.allocator.out_of_memory_policy: introduced in Github.com/apache/puls… and not officially released yet. It configures the behavior when off-heap memory is insufficient. The options are FallbackToHeap and ThrowException, and the default is FallbackToHeap. If you do not want the memory used for message serialization to affect heap memory allocation, configure ThrowException.
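As a minimal sketch of how these properties might be supplied, the class below sets the two released properties programmatically; passing them as -D flags on the JVM command line is equivalent. The class and method names are hypothetical. It also assumes the properties are read once when the client's allocator is first initialized, so they should be set before any Pulsar client class is used.

```java
// Hypothetical helper: sets the allocator-related system properties named above.
final class PulsarAllocatorProperties {

    private PulsarAllocatorProperties() {
    }

    /**
     * Call this at the very start of the program, before the Pulsar client is created
     * (assumption: the properties are only read when the allocator is initialized).
     */
    static void apply(boolean pooled, boolean exitOnOom) {
        System.setProperty("pulsar.allocator.pooled", Boolean.toString(pooled));
        System.setProperty("pulsar.allocator.exit_on_oom", Boolean.toString(exitOnOom));
    }
}
```

For example, PulsarAllocatorProperties.apply(true, false) restates the defaults described above explicitly.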
Producers
Important parameters for initializing a producer
maxPendingMessages
The size of the producer's send queue. Configure it appropriately for the actual number of topics to avoid OOM when the network is interrupted or Pulsar fails. It is advisable to choose one of the two to configure: this parameter or the client-side memoryLimit.
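To see why the queue size matters, a rough worst-case estimate can help. The sketch below is plain arithmetic, not a Pulsar API; the class name and the figures in the example are hypothetical.

```java
// Back-of-the-envelope estimate of memory held by full producer send queues.
final class PendingMemoryEstimate {

    private PendingMemoryEstimate() {
    }

    /** Worst-case bytes buffered if every producer's send queue fills up completely. */
    static long worstCaseBytes(int producers, int maxPendingMessages, int avgMessageBytes) {
        return (long) producers * maxPendingMessages * avgMessageBytes;
    }
}
```

With 3000 producers, a queue of 1000 messages each and 1 KiB messages, worstCaseBytes(3000, 1000, 1024) is 3,072,000,000 bytes, roughly 2.9 GiB, which is why a per-producer limit alone can be hard to size when there are many topics.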
messageRoutingMode
Message routing mode. The default is RoundRobinPartition. Choose according to business requirements; if ordering must be preserved, SinglePartition is the usual choice, so that messages with the same key are sent to the same partition.
autoUpdatePartitions
Automatically refreshes partition information. If the partition information of the topic does not change, this does not need to be enabled, which reduces load on the cluster.
Batch parameters
Batch sending is implemented with scheduled tasks. If a topic carries only a small number of messages, do not enable batching. In particular, a large number of scheduled tasks with short intervals can cause the CPU usage of the Netty threads to soar.
- enableBatching: whether to enable batch sending.
- batchingMaxMessages: the maximum number of messages sent in one batch.
- batchingMaxPublishDelay: the interval of the scheduled task that sends batches.
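A quick calculation can show when batching is not worth its timer overhead. The sketch below is plain arithmetic with hypothetical names and figures, and it assumes, as described above, one periodic flush task per batching producer.

```java
// Rough estimate of how full batches get and how often flush tasks fire.
final class BatchFillEstimate {

    private BatchFillEstimate() {
    }

    /** Expected number of messages collected during one publish-delay interval. */
    static double messagesPerFlush(double messagesPerSecond, long publishDelayMillis) {
        return messagesPerSecond * publishDelayMillis / 1000.0;
    }

    /** Total flush-task executions per second across all producers. */
    static long flushesPerSecond(int producers, long publishDelayMillis) {
        return producers * 1000L / publishDelayMillis;
    }
}
```

For a topic receiving 5 messages per second with a 10 ms publish delay, a flush collects 0.05 messages on average, so almost every batch holds a single message, while 3000 such producers would trigger 300,000 flush tasks per second.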
Static producer initialization
A static producer is one that is not started or shut down as the business changes. In this case, initialize the producer after the microservice has started and the client has been initialized. For example:
One thread per producer: suitable for scenarios with a small number of producers
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarStaticProducerInit {

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-producer-init"));

    private final String topic;

    private volatile Producer<byte[]> producer;

    public DemoPulsarStaticProducerInit(String topic) {
        this.topic = topic;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        try {
            final DemoPulsarClientInit instance = DemoPulsarClientInit.getInstance();
            producer = instance.getPulsarClient().newProducer().topic(topic).create();
            // Stop retrying once the producer exists; otherwise a new producer
            // would be created every 10 seconds.
            executorService.shutdown();
        } catch (Exception e) {
            log.error("init pulsar producer error, exception is ", e);
        }
    }

    public Producer<byte[]> getProducer() {
        return producer;
    }
}
One thread for multiple producers: suitable for scenarios with a large number of producers
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarStaticProducersInit {

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-producer-init"));

    // Initialized eagerly so that add() cannot hit a null list.
    private final CopyOnWriteArrayList<Producer<byte[]>> producers = new CopyOnWriteArrayList<>();

    // Index of the next topic that still needs a producer; only touched by the init thread.
    private int initIndex;

    private final List<String> topics;

    public DemoPulsarStaticProducersInit(List<String> topics) {
        this.topics = topics;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        for (; initIndex < topics.size(); initIndex++) {
            try {
                final DemoPulsarClientInit instance = DemoPulsarClientInit.getInstance();
                final Producer<byte[]> producer = instance.getPulsarClient().newProducer().topic(topics.get(initIndex)).create();
                producers.add(producer);
            } catch (Exception e) {
                log.error("init pulsar producer error, exception is ", e);
                // Stop here; the next scheduled run resumes from the same topic.
                return;
            }
        }
        // Every topic has a producer, so the retry task is no longer needed.
        executorService.shutdown();
    }

    public CopyOnWriteArrayList<Producer<byte[]>> getProducers() {
        return producers;
    }
}
Example of dynamically created and destroyed producers
In some businesses, producers may be started or destroyed dynamically according to the business, for example when receiving data from vehicles on the road and sending it to a specific topic. Keeping every producer resident in memory would consume a large amount of memory, so instead we can manage the producer life cycle in a way similar to an LRU cache.
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarDynamicProducerInit {

    /**
     * topic -- producer
     */
    private final AsyncLoadingCache<String, Producer<byte[]>> producerCache;

    public DemoPulsarDynamicProducerInit() {
        this.producerCache = Caffeine.newBuilder()
                // Evict producers that have not been used for 10 minutes.
                .expireAfterAccess(600, TimeUnit.SECONDS)
                .maximumSize(3000)
                // Close the producer when its cache entry is removed.
                .removalListener((RemovalListener<String, Producer<byte[]>>) (topic, value, cause) -> {
                    log.info("topic {} cache removed, because of {}", topic, cause);
                    try {
                        value.close();
                    } catch (Exception e) {
                        log.error("close failed, ", e);
                    }
                })
                .buildAsync(new AsyncCacheLoader<>() {
                    @Override
                    public CompletableFuture<Producer<byte[]>> asyncLoad(String topic, Executor executor) {
                        return acquireFuture(topic);
                    }

                    @Override
                    public CompletableFuture<Producer<byte[]>> asyncReload(String topic, Producer<byte[]> oldValue,
                                                                           Executor executor) {
                        return acquireFuture(topic);
                    }
                });
    }

    private CompletableFuture<Producer<byte[]>> acquireFuture(String topic) {
        CompletableFuture<Producer<byte[]>> future = new CompletableFuture<>();
        try {
            ProducerBuilder<byte[]> builder = DemoPulsarClientInit.getInstance().getPulsarClient().newProducer().enableBatching(true);
            final Producer<byte[]> producer = builder.topic(topic).create();
            future.complete(producer);
        } catch (Exception e) {
            log.error("create producer exception ", e);
            future.completeExceptionally(e);
        }
        return future;
    }
}
In this mode, the CompletableFuture makes it possible to handle the whole send flow elegantly in a streaming (chained) style.
When lost messages are acceptable
final CompletableFuture<Producer<byte[]>> cacheFuture = producerCache.get(topic);
cacheFuture.whenComplete((producer, e) -> {
    if (e != null) {
        log.error("create pulsar producer exception ", e);
        return;
    }
    try {
        producer.sendAsync(msg).whenComplete(((messageId, throwable) -> {
            if (throwable != null) {
                log.error("send producer msg error ", throwable);
                return;
            }
            log.info("topic {} send success, msg id is {}", topic, messageId);
        }));
    } catch (Exception ex) {
        log.error("send async failed ", ex);
    }
});
The code above handles both producer creation failures and send failures correctly in the callbacks. However, in a production environment Pulsar is not always available, and sends may fail because of VM faults, Pulsar service upgrades and so on. If you want to ensure that a message is sent successfully, you need to retry sending it.
When lost messages are tolerable only in extreme scenarios
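// Shared wheel timer that schedules the delayed retries
// (io.netty.util.Timer and io.netty.util.HashedWheelTimer).
private final Timer timer = new HashedWheelTimer();

// Retries allowed after the first attempt; pass this value as retryTimes on the first call.
// The constant name is introduced here for illustration.
private static final int MAX_RETRY_TIMES = 7;

/**
 * @param retryTimes number of retries still allowed for this message
 */
private void sendMsgWithRetry(String topic, byte[] msg, int retryTimes) {
    final CompletableFuture<Producer<byte[]>> cacheFuture = producerCache.get(topic);
    cacheFuture.whenComplete((producer, e) -> {
        if (e != null) {
            log.error("create pulsar producer exception ", e);
            return;
        }
        try {
            producer.sendAsync(msg).whenComplete(((messageId, throwable) -> {
                if (throwable == null) {
                    log.info("topic {} send success, msg id is {}", topic, messageId);
                    return;
                }
                if (retryTimes > 0) {
                    // Exponential backoff: 1s, 2s, 4s, ... as the remaining retries count down.
                    final long delaySeconds = 1L << (MAX_RETRY_TIMES - retryTimes);
                    timer.newTimeout(timeout -> DemoPulsarDynamicProducerInit.this.sendMsgWithRetry(topic, msg, retryTimes - 1), delaySeconds, TimeUnit.SECONDS);
                }
                log.error("send producer msg error ", throwable);
            }));
        } catch (Exception ex) {
            log.error("send async failed ", ex);
        }
    });
}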
With this approach, the Pulsar server is allowed to be unavailable for a period of time: with seven retries the backoff delays add up to 1+2+4+8+16+32+64=127s, so a failure of up to about 127s can be tolerated, which is sufficient for most production environments. Since a failure can in theory last longer than 127s, in extreme scenarios it is still necessary to return a failure to the upstream caller.
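The 127s figure can be checked with a few lines of arithmetic. The sketch below only computes the delay schedule; the class and method names are hypothetical and no Pulsar API is involved.

```java
import java.util.ArrayList;
import java.util.List;

// Computes an exponential backoff schedule: 1s, 2s, 4s, ...
final class BackoffSchedule {

    private BackoffSchedule() {
    }

    /** Delay in seconds before each retry, doubling every time. */
    static List<Long> delaysSeconds(int retries) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 0; attempt < retries; attempt++) {
            delays.add(1L << attempt);
        }
        return delays;
    }

    /** Total outage length that the whole schedule can ride out. */
    static long totalSeconds(int retries) {
        long total = 0;
        for (long delay : delaysSeconds(retries)) {
            total += delay;
        }
        return total;
    }
}
```

BackoffSchedule.delaysSeconds(7) yields 1, 2, 4, 8, 16, 32 and 64, and BackoffSchedule.totalSeconds(7) is 127. Adding one more retry roughly doubles the tolerated outage, at the cost of holding the message in memory for longer.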
Producer: strict ordering at partition level
The key to strict ordering on the producer side is to send only one message at a time and to send the next message only after the previous one has been confirmed as sent successfully. This can be implemented in either synchronous or asynchronous mode:
- Synchronous mode: send a message in a loop until it is sent successfully, and only then start sending the next message.
- Asynchronous mode: observe the future of the previous send, retry it on failure, and start sending the next message on success.
It is worth noting that different partitions can still be sent to in parallel in this mode, for example with an OrderedExecutor or with one thread per partition.
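One way to get per-partition ordering with cross-partition parallelism is to pin each partition to its own single-threaded executor. The following is a minimal JDK-only sketch under that assumption; the class name is hypothetical and it is not the OrderedExecutor implementation mentioned above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Runs tasks of the same partition in submission order, different partitions in parallel.
final class PartitionOrderedExecutor {

    private final ExecutorService[] workers;

    PartitionOrderedExecutor(int threads) {
        workers = new ExecutorService[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Tasks of one partition always land on the same single-threaded worker. */
    void execute(int partition, Runnable task) {
        workers[Math.floorMod(partition, workers.length)].execute(task);
    }

    /** Stops accepting tasks and waits for queued ones; returns false on timeout or interrupt. */
    boolean shutdownAndWait(long timeoutSeconds) {
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }
        try {
            for (ExecutorService worker : workers) {
                if (!worker.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Each task submitted for a partition would wrap one blocking send-until-success call, so a slow or failing partition delays only the partitions that share its worker thread.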
Example of synchronous mode:
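import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarProducerSyncStrictlyOrdered {

    Producer<byte[]> producer;

    public void sendMsg(byte[] msg) {
        // Keep sending the same message until it succeeds, so that a later
        // message can never overtake it.
        while (true) {
            try {
                final MessageId messageId = producer.send(msg);
                log.info("topic {} send success, msg id is {}", producer.getTopic(), messageId);
                break;
            } catch (Exception e) {
                log.error("exception is ", e);
            }
        }
    }
}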
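The asynchronous mode can be sketched by chaining each send onto the future of the previous one. The class below is a JDK-only illustration with hypothetical names; the sendAsync function stands in for a call such as producer.sendAsync(msg), and retries are immediate here, whereas real code would add a delay between attempts.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Sends messages strictly one after another without blocking the caller.
final class OrderedAsyncSender<M, R> {

    private final Function<M, CompletableFuture<R>> sendAsync;

    // Future of the most recently queued message; the next send waits for it.
    private CompletableFuture<R> tail = CompletableFuture.completedFuture(null);

    OrderedAsyncSender(Function<M, CompletableFuture<R>> sendAsync) {
        this.sendAsync = sendAsync;
    }

    /** Queues msg behind all earlier messages and returns the future of its successful send. */
    synchronized CompletableFuture<R> send(M msg) {
        CompletableFuture<R> result = tail
                .handle((previousResult, previousError) -> msg)
                .thenCompose(this::sendUntilSuccess);
        tail = result;
        return result;
    }

    /** Retries the same message until the send future completes normally. */
    private CompletableFuture<R> sendUntilSuccess(M msg) {
        return sendAsync.apply(msg)
                .thenApply(CompletableFuture::completedFuture)
                .exceptionally(error -> sendUntilSuccess(msg))
                .thenCompose(future -> future);
    }
}
```

Because a message only starts sending once the previous future has completed successfully, ordering holds even when individual sends fail and are retried.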
Consumers
Important parameters for initializing a consumer
receiverQueueSize
Note: when processing fails, messages accumulate in the consumer's receive buffer queue in memory. Configure this value appropriately to prevent OOM.
autoUpdatePartitions
Automatically refreshes partition information. If the partition information of the topic does not change, this does not need to be enabled, which reduces load on the cluster.
subscriptionType
The subscription type. Choose according to business requirements.
subscriptionInitialPosition
Where the subscription starts, Earliest or Latest, depending on business requirements.
messageListener
Consumes in listener mode: you only provide a callback function and do not have to pull messages actively by calling receive(). Unless there are special requirements, listener mode is generally recommended.
ackTimeout
If the server has pushed a message but the consumer does not acknowledge it in time, the message is pushed to the consumer again after ackTimeout. This is the redelivery mechanism.
Note that when relying on redelivery, you should use it only to retry recoverable errors. For example, if the code decodes the message and decoding fails, redelivery is not appropriate, because the client would keep retrying forever.
If in doubt, you can also configure a dead letter queue with the deadLetterPolicy described below to prevent messages from being retried forever.
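The distinction between recoverable and unrecoverable errors can be made explicit in code. The sketch below is JDK-only and every name in it is hypothetical. In a listener, ACK would map to consumer.acknowledgeAsync(msg), RETRY to consumer.negativeAcknowledge(msg), and GIVE_UP to recording the bad message somewhere and acknowledging it so that it is not redelivered.

```java
// Decides what to do with a message based on how its handler finished.
final class RedeliveryDecision {

    enum Action { ACK, RETRY, GIVE_UP }

    /** Hypothetical marker for errors that can never succeed on retry, such as a decode failure. */
    static final class UnrecoverableException extends RuntimeException {
        UnrecoverableException(String message) {
            super(message);
        }
    }

    private RedeliveryDecision() {
    }

    static Action decide(Runnable handler) {
        try {
            handler.run();
            return Action.ACK;
        } catch (UnrecoverableException e) {
            // Retrying would fail in exactly the same way, so do not ask for redelivery.
            return Action.GIVE_UP;
        } catch (RuntimeException e) {
            // Possibly transient (for example a downstream timeout): worth a redelivery.
            return Action.RETRY;
        }
    }
}
```

Keeping the decision in one place makes it harder for a decode failure to slip into the redelivery path by accident.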
negativeAckRedeliveryDelay
The delay before redelivery is triggered after the client calls negativeAcknowledge. The redelivery mechanism is the same as for ackTimeout.
Note that it is not advisable to use ackTimeout and negativeAckRedeliveryDelay at the same time. negativeAck is generally recommended, because it gives the user more flexible control. If ackTimeout is configured improperly and the processing time is unpredictable, messages may be retried unnecessarily.
deadLetterPolicy
Configures the maximum number of redeliveries and the dead letter topic.
Consumer initialization principle
A consumer can only work once it has been created successfully. Unlike a producer, which can return a failure to the upstream caller, a consumer must keep retrying until its creation succeeds. Note: consumers and topics can have a one-to-many relationship; one consumer can subscribe to multiple topics.
One thread per consumer: suitable for scenarios with a small number of consumers
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarConsumerInit {

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-consumer-init"));

    private final String topic;

    private volatile Consumer<byte[]> consumer;

    public DemoPulsarConsumerInit(String topic) {
        this.topic = topic;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        try {
            final DemoPulsarClientInit instance = DemoPulsarClientInit.getInstance();
            consumer = instance.getPulsarClient().newConsumer()
                    .topic(topic)
                    // A subscription name is required; this value is a placeholder.
                    .subscriptionName("demo-subscription")
                    .messageListener(new DemoMessageListener<>())
                    .subscribe();
            // Stop retrying once the consumer has subscribed successfully.
            executorService.shutdown();
        } catch (Exception e) {
            log.error("init pulsar consumer error, exception is ", e);
        }
    }

    public Consumer<byte[]> getConsumer() {
        return consumer;
    }
}
One thread for multiple consumers: suitable for scenarios with a large number of consumers
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoPulsarConsumersInit {

    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-consumer-init"));

    // Initialized eagerly so that add() cannot hit a null list.
    private final CopyOnWriteArrayList<Consumer<byte[]>> consumers = new CopyOnWriteArrayList<>();

    // Index of the next topic that still needs a consumer; only touched by the init thread.
    private int initIndex;

    private final List<String> topics;

    public DemoPulsarConsumersInit(List<String> topics) {
        this.topics = topics;
    }

    public void init() {
        executorService.scheduleWithFixedDelay(this::initWithRetry, 0, 10, TimeUnit.SECONDS);
    }

    private void initWithRetry() {
        for (; initIndex < topics.size(); initIndex++) {
            try {
                final DemoPulsarClientInit instance = DemoPulsarClientInit.getInstance();
                final Consumer<byte[]> consumer = instance.getPulsarClient().newConsumer()
                        .topic(topics.get(initIndex))
                        // A subscription name is required; this value is a placeholder.
                        .subscriptionName("demo-subscription")
                        .messageListener(new DemoMessageListener<>())
                        .subscribe();
                consumers.add(consumer);
            } catch (Exception e) {
                log.error("init pulsar consumer error, exception is ", e);
                // Stop here; the next scheduled run resumes from the same topic.
                return;
            }
        }
        // Every topic has a consumer, so the retry task is no longer needed.
        executorService.shutdown();
    }

    public CopyOnWriteArrayList<Consumer<byte[]>> getConsumers() {
        return consumers;
    }
}
Consumer: at-least-once semantics
Use manual ack mode and acknowledge a message only after it has been processed successfully. If processing fails, you can retry it yourself or retry through the negativeAck mechanism.
Example of synchronous mode
Note that if processing time varies widely between messages, synchronous processing may leave messages that could have been processed quickly waiting behind slow ones.
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoMessageListenerSyncAtLeastOnce<T> implements MessageListener<T> {

    @Override
    public void received(Consumer<T> consumer, Message<T> msg) {
        try {
            final boolean result = syncPayload(msg.getData());
            if (result) {
                consumer.acknowledgeAsync(msg);
            } else {
                consumer.negativeAcknowledge(msg);
            }
        } catch (Exception e) {
            // The business method may throw an exception
            log.error("exception is ", e);
            consumer.negativeAcknowledge(msg);
        }
    }

    /**
     * @param msg message body
     * @return true if processing succeeded (simulated here)
     */
    private boolean syncPayload(byte[] msg) {
        return System.currentTimeMillis() % 2 == 0;
    }
}
Example of asynchronous mode
With asynchronous processing, memory limits must be considered: because the listener is not blocked by business operations, it can consume from the broker very quickly, and the number of in-flight messages can become very large. In Shared or KeyShared mode, this can be limited with maxUnAckedMessage. In Failover mode, you can use the approach described below (when the consumer is busy, block message pulling and stop business processing) by checking the number of in-flight messages.
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoMessageListenerAsyncAtLeastOnce<T> implements MessageListener<T> {

    @Override
    public void received(Consumer<T> consumer, Message<T> msg) {
        try {
            asyncPayload(msg.getData(), new DemoSendCallback() {
                @Override
                public void callback(Exception e) {
                    if (e == null) {
                        consumer.acknowledgeAsync(msg);
                    } else {
                        log.error("exception is ", e);
                        consumer.negativeAcknowledge(msg);
                    }
                }
            });
        } catch (Exception e) {
            // The business method might throw an exception
            consumer.negativeAcknowledge(msg);
        }
    }

    /**
     * @param msg              message body
     * @param demoSendCallback callback invoked when asynchronous processing finishes
     */
    private void asyncPayload(byte[] msg, DemoSendCallback demoSendCallback) {
        if (System.currentTimeMillis() % 2 == 0) {
            demoSendCallback.callback(null);
        } else {
            demoSendCallback.callback(new Exception("exception"));
        }
    }
}
When the consumer is busy, block message pulling and stop business processing
When the consumer cannot keep up with processing, stop business processing by blocking in the listener method, so that too many messages do not accumulate in the microservice and cause OOM. This can be controlled with a RateLimiter or a Semaphore.
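A minimal sketch of the Semaphore variant follows. It is JDK-only, and the class name, the BiConsumer-based business interface and the done callback are assumptions made for illustration. In a real listener, onMessage would be called from received(...), and the done callback would run after the message has been acked or nacked.

```java
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;

// Caps the number of messages being processed asynchronously at the same time.
final class BoundedAsyncHandler<M> {

    private final int maxInFlight;
    private final Semaphore permits;

    // Business logic: receives the message and a callback to run exactly once when finished.
    private final BiConsumer<M, Runnable> asyncBusiness;

    BoundedAsyncHandler(int maxInFlight, BiConsumer<M, Runnable> asyncBusiness) {
        this.maxInFlight = maxInFlight;
        this.permits = new Semaphore(maxInFlight);
        this.asyncBusiness = asyncBusiness;
    }

    /** Blocks the calling (listener) thread while maxInFlight messages are still being processed. */
    void onMessage(M msg) {
        permits.acquireUninterruptibly();
        try {
            asyncBusiness.accept(msg, permits::release);
        } catch (RuntimeException e) {
            // Assumes the business code did not call the callback before throwing.
            permits.release();
            throw e;
        }
    }

    int inFlight() {
        return maxInFlight - permits.availablePermits();
    }
}
```

While the listener thread is blocked in onMessage, no further messages are handed to the business code, which keeps the number of messages held by the microservice bounded.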
Consumer: strict ordering at partition level
To preserve strict order at partition level on the consumer side, the messages of a single partition must be processed one at a time: once a message fails to be processed, no other message of that partition can be processed until that message has been retried successfully. The following is an example:
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;

/**
 * @author hezhangjian
 */
@Slf4j
public class DemoMessageListenerSyncAtLeastOnceStrictlyOrdered<T> implements MessageListener<T> {

    @Override
    public void received(Consumer<T> consumer, Message<T> msg) {
        // Do not return (and so do not accept the next message) until this one succeeds.
        retryUntilSuccess(msg.getData());
        consumer.acknowledgeAsync(msg);
    }

    private void retryUntilSuccess(byte[] msg) {
        while (true) {
            try {
                final boolean result = syncPayload(msg);
                if (result) {
                    break;
                }
            } catch (Exception e) {
                log.error("exception is ", e);
            }
        }
    }

    /**
     * @param msg message body
     * @return true if processing succeeded (simulated here)
     */
    private boolean syncPayload(byte[] msg) {
        return System.currentTimeMillis() % 2 == 0;
    }
}
Acknowledgments
Thanks to Peng Hui and Luo Tian for their review.
About the author
He Zhangjian is an Apache Pulsar contributor. He graduated from Xidian University and is a senior engineer at Huawei Cloud IoT, where Pulsar is widely used. For more information, please visit his Jianshu blog.
Related links
- Best Practices | Apache Pulsar's Journey at Huawei Cloud IoT