I. Problem description

The business side reported a large number of message-production failures in production. I logged in and checked the data reported to Cat and found that this topic had 5 producers, 4 of which had large numbers of failures spread evenly across every time period rather than concentrated at a single point in time. The remaining producer was producing normally.

The following figure shows failure reporting statistics:

II. Root cause & solution

1. Some concepts first

Before getting to the cause, I want to introduce a few concepts that make it easier to understand. Skip them if you already know the mechanisms they describe.

1.1 NamespaceBundle and ownership

In Pulsar, a topic (or a single partition, for a partitioned topic) can only be served by one broker at a time, and that broker is said to have ownership of the topic. In practice, though, ownership is not assigned per topic but at the namespace-bundle level.

Specifically, it is determined by the NamespaceBundles mechanism. The relationship between NamespaceBundles and topics is similar to consistent hashing: the NamespaceBundles of a namespace form a pre-split hash ring (by default split into four segments, each segment being one bundle). Each broker is responsible for a portion of the ring, and the topics that hash into a bundle are served by the broker that owns that bundle.
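To make the mapping concrete, here is a minimal, self-contained sketch of the idea; it is not Pulsar's actual implementation (CRC32 stands in for Pulsar's hash function and the boundary math is simplified to equal-sized segments):

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical illustration: a namespace pre-split into 4 bundles over the 32-bit
// hash space; a topic belongs to the bundle whose range contains the hash of its name.
public class BundleSketch {
    static final long FULL_RANGE = 0x100000000L;   // 2^32
    static final int NUM_BUNDLES = 4;              // Pulsar's default bundle count per namespace

    static int bundleFor(String topic) {
        CRC32 crc = new CRC32();                   // stand-in for Pulsar's actual hash function
        crc.update(topic.getBytes(StandardCharsets.UTF_8));
        long hash = crc.getValue();                // value in [0, 2^32)
        return (int) (hash / (FULL_RANGE / NUM_BUNDLES)); // index of the pre-split segment
    }

    public static void main(String[] args) {
        System.out.println(bundleFor("persistent://my-tenant/my-ns/my-topic")); // prints 0..3
    }
}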

When a client connects to a broker, it first triggers a lookup, which determines which broker is responsible for the topic, and then establishes a connection with that broker.

Pulsar provides an unload admin API to unload a NamespaceBundle from a broker. After the unload, other brokers trigger a tryAcquiringOwnership action, and whichever broker acquires the NamespaceBundle becomes its new owner.
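For reference, here is a minimal sketch of triggering an unload by hand with the Java admin client; the admin URL and the bundle range string are illustrative assumptions, not values from this incident:

import org.apache.pulsar.client.admin.PulsarAdmin;

// Manually unload one namespace bundle from its current owner.
public class UnloadBundleExample {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")   // assumed admin endpoint
                .build()) {
            // The bundle range string ("0x00000000_0x40000000") is just an example
            admin.namespaces().unloadNamespaceBundle("my-tenant/my-namespace", "0x00000000_0x40000000");
        }
    }
}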

Pulsar also provides a load-shedding function: if it is enabled (loadBalancerSheddingEnabled=true, the default), the broker leader dynamically unloads NamespaceBundles at runtime based on load.

1.2 Shed Load Balance

As described above, the broker leader dynamically unloads NamespaceBundles at runtime based on the load of all brokers. This function is called Shed Load Balance.

Currently, Pulsar 2.5.0 collects 5 metrics: CPU, heap memory, off-heap (direct) memory, and inbound and outbound network traffic. By default, if the usage of any of these metrics exceeds the configured threshold (85 by default, set via the LoadBalancerBrokerOverloadedThresholdPercentage configuration), unloading is triggered.

The process is simple: select the NamespaceBundles that need to be unloaded and call the unload API.
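As a rough illustration of the trigger condition described above (a simplified sketch, not the actual OverloadShedder code):

// A broker is considered overloaded when the max usage across the five metrics
// (CPU, heap, direct memory, bandwidth in/out) exceeds the configured threshold.
public class OverloadCheckSketch {
    static boolean isOverloaded(double cpu, double heap, double directMemory,
                                double bandwidthIn, double bandwidthOut,
                                int thresholdPercentage /* default 85 */) {
        double maxUsage = Math.max(cpu, Math.max(heap,
                Math.max(directMemory, Math.max(bandwidthIn, bandwidthOut))));
        return maxUsage > thresholdPercentage / 100.0;
    }

    public static void main(String[] args) {
        // e.g. heap at 87% pushes the broker over the default 85% threshold
        System.out.println(isOverloaded(0.40, 0.87, 0.30, 0.10, 0.12, 85)); // true
    }
}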

I won't go into the specific logic here; interested readers can see the docs: pulsar.apache.org/docs/en/adm…

For the source code, see org.apache.pulsar.broker.loadbalance.impl.OverloadShedder and org.apache.pulsar.broker.loadbalance.LoadSheddingTask.

1.3 Relationship between Producer and Connection

The connection is the TCP connection established between the client and broker.

In Pulsar, a producer is created for a single topic; the client establishes a connection to the broker and sends a NewProducer command over it. In other words, a client needs to initialize as many producers as there are topics it sends messages to, but the connection can be shared: by default, all producers and consumers of a client share one connection. The catch is that when the client sends NewProducer to the broker, it needs a unique identifier for that producer, both to prevent duplicate creation and to tell which producer subsequent commands between client and server belong to. This identifier is the producerId (the topic name cannot be used for this). The producerId must be unique on the connection and is currently generated on the client side with an AtomicLong.
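A minimal sketch of that client-side idea (not Pulsar's actual client code): one AtomicLong per connection hands out ids, so every producer sharing the connection sends a unique producerId in its NewProducer command.

import java.util.concurrent.atomic.AtomicLong;

public class ProducerIdSketch {
    // One generator per connection; ids are unique within that connection
    private final AtomicLong producerIdGenerator = new AtomicLong();

    long newProducerId() {
        return producerIdGenerator.getAndIncrement();
    }

    public static void main(String[] args) {
        ProducerIdSketch cnx = new ProducerIdSketch();
        System.out.println(cnx.newProducerId()); // 0 -> first producer on this connection
        System.out.println(cnx.newProducerId()); // 1 -> second producer on the same connection
    }
}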

2. The cause

When a namespace unload happens, the client attempts to re-create its producer and sends a NewProducer command. When ServerCnx processes the NewProducer command, it checks whether a producerFuture (a CompletableFuture) already exists for that producerId. If one exists but has not completed, the broker returns an error to the client saying the producer is already present on the connection. On the client side, however, the producer has not actually been created, so it keeps retrying NewProducer. This is exactly the problem we ran into: the broker logs were full of "Producer is already present on the connection".

log.warn("[{}][{}] Producer with id {} is already present on the connection", remoteAddress,
topicName, producerId);
ctx.writeAndFlush(Commands.newError(requestId, error,
"Producer is already present on the connection"));

Later I found that during NewProducer the server performs a checkTopicNsOwnership check to prevent the same topic from being owned by multiple brokers. When a namespace unload is in progress, this check fails and throws a RuntimeException. However, ServerCnx#handleProducer does not handle this exception on the server side, so the producerFuture never completes, normally or exceptionally. (Another odd thing is that this RuntimeException left no stack trace in the output, which made it hard to locate the real cause later.)

If load shedding or a manual unload is triggered again and ownership of the NamespaceBundle moves back to the original broker, the NewProducer flow finds the dangling producerFuture and the producer can never be created successfully. The client just retries over and over, and messages cannot be sent.

The pre-check logic of NewProducer:

CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId, producerFuture);

// A producerFuture already exists for this producerId
if (existingProducerFuture != null) {
    if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
        // The producer was already created successfully, return success directly
        // ... omitted
    } else {
        // A producer with this id is already being created
        ServerError error = null;
        if (!existingProducerFuture.isDone()) {
            // Producer creation has not completed yet;
            // a dangling producerFuture always falls into this branch
            error = ServerError.ServiceNotReady;
        } else {
            error = getErrorCode(existingProducerFuture);
            // remove producer with producerId as it's already completed with exception
            producers.remove(producerId);
        }
        log.warn("[{}][{}] Producer with id {} is already present on the connection", remoteAddress,
                topicName, producerId);
        ctx.writeAndFlush(Commands.newError(requestId, error,
                "Producer is already present on the connection"));
        return null;
    }
}

The code that throws the exception:

try {
    // The call below throws a RuntimeException, which leaves producerFuture dangling
    topic.addProducer(producer);
    // ... omitted
} catch (BrokerServiceException ise) {
    log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName,
            ise.getMessage());
    ctx.writeAndFlush(Commands.newError(requestId,
            BrokerServiceException.getClientErrorCode(ise), ise.getMessage()));
    producerFuture.completeExceptionally(ise);
}

3. Solution

Once the cause is known, the fix is easy: change catch (BrokerServiceException ise) to catch Exception, or add a catch block for RuntimeException.
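A sketch of what that fix looks like in the snippet above; this is the idea, not the exact upstream patch, and mapping unexpected errors to ServiceNotReady is my own assumption here:

try {
    topic.addProducer(producer);
    // ... omitted
} catch (Exception e) { // previously only BrokerServiceException was caught here
    log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName, e.getMessage());
    ServerError error = (e instanceof BrokerServiceException)
            ? BrokerServiceException.getClientErrorCode((BrokerServiceException) e)
            : ServerError.ServiceNotReady; // assumed mapping for unexpected errors
    ctx.writeAndFlush(Commands.newError(requestId, error, e.getMessage()));
    producerFuture.completeExceptionally(e); // the future is no longer left dangling
}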

4. Reproduction steps

This problem is a little hard to reproduce; I tried many times locally without success. Briefly, the reproduction steps are:

a. A shed load balance (or manual unload) triggers a namespace bundle unload on the broker.

b. After the producer has been disconnected but before removeTopic runs (this is important), a reconnect request from the producer reaches the broker, and its getOrCreateTopic executes before step a's removeTopic.

c. Step b's request then hits the ownership check after ownership has already been cleared, throws a RuntimeException, and leaves a dangling producerFuture.

d. When the namespace bundle later returns to the same broker, producer creation fails with the error. (The connection must stay the same: if the client reconnects, ServerCnx is rebuilt as well.)

III. Investigation process

This problem is very simple once the cause is explained thoroughly. However, because the logs were incomplete and I couldn't reproduce it locally, the investigation went astray a few times and took some detours; the real cause was only located on the second day. Here is a quick rundown of the process.

1. Restoring the application quickly is the most important thing

Immediately after receiving the report from the business side, I checked Grafana and Prometheus: the add_entry success rate was 100%, pulsar_msg_backlog was within the safety threshold, and the topic's backlog policy was configured to fail consumers rather than producers.

Logging into Cat to check the distribution of failures, I found five producers, four of which were failing, with failures in every time period. I silently ruled out a network problem, but to confirm it I asked the business side: three of the machines are in Yizhuang and the other two are in a different data center, so a network issue was basically ruled out. (The Pulsar cluster is deployed in the Yizhuang data center.)

I checked whether Cat showed any exception details and everything looked OK. The business side confirmed that no exception was logged; only the failure event was reported. OK, next step.

When I logged into the Pulsar-Manager console to check this topic, I found only three producers connected, while Cat clearly showed five nodes. To double-check, I logged into the server and looked at the topic with pulsar-admin topics stats: again, only three producers were connected. So where were the other two?

I quickly identified the IPs of the two machines that were not connected and told the business colleague. At this critical moment there was nothing for it but to pull out the big hammer (restart the business applications, not Pulsar). After the restart, monitoring showed both producers connected again, and the publish rate and latency were back to normal; no more failure events appeared in Cat. The next step was to figure out why those two machines had not been able to connect.

I grepped the broker logs for the IPs of those two machines and found that both nodes kept disconnecting and reconnecting, roughly once every 30-40 minutes, together with the warning "Producer is already present on the connection". Grepping the IPs of the other three machines showed the same pattern. Such regular behaviour left two possibilities: 1) the data-center network was unstable, or 2) broker load balancing was being triggered too frequently. I immediately rejected the unstable-network theory (if the data-center network were that unstable, the group chat would have blown up long ago), which left broker load balancing as the only suspect.

Grepping the broker log indeed turned up namespace bundle shed-unload entries.

I was fairly sure it was broker load balancing, but to confirm it I silently picked up my phone; all I had to do now was wait. If broker load balancing was triggered again later, the problem would definitely reappear.

After about 20 minutes of waiting, a namespace bundle shed-unload appeared in the log, and topics stats showed that ownership had changed and some producers failed to connect. While waiting I had also looked at the JVM monitoring: heap usage had been high for the past few hours, averaging over 80%, which was presumably what kept triggering load balancing.

After syncing with my manager, I restarted the Pulsar brokers, turning off shed load balancing and increasing the JVM heap memory.

Afterwards, alarms were configured on producer failures and the publish metrics were continuously monitored; the service had recovered. The business side, though, still had the thankless job of repairing the historical data. Sorry, and thanks!

2. Locating the cause

2.1 Local simulation

In a local K8s cluster I started 2 brokers and 5 producers, plus a job calling unload every 3 minutes, but the problem would not reproduce. My guess is that latency in the local environment is so low that producer creation never overlaps with the unload window.

2.2 ShedLoadBalance

Since it was confirmed above that the trigger was the ShedLoadBalance function, I first checked whether there was a bug in that function.

Looking through the code and test cases, the functionality is simple: compute the NamespaceBundles that need to be unloaded and then call the unload API provided by the broker.

// Get the broker -> bundles that need to be unloaded
final Multimap<String, String> bundlesToUnload = strategy.findBundlesForUnloading(loadData, conf);

bundlesToUnload.asMap().forEach((broker, bundles) -> {
    bundles.forEach(bundle -> {
        final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
        final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
        // Namespace anti-affinity check
        if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
            return;
        }
        log.info("[Overload shedder] Unloading bundle: {} from broker {}", bundle, broker);
        try {
            // Unload the namespace bundle through the admin client
            pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);
            loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
        } catch (PulsarServerException | PulsarAdminException e) {
            log.warn("Error when trying to perform load shedding on {} for broker {}", bundle, broker, e);
        }
    });
});

The trickier part is the unload itself. Roughly, the process is as follows (a rough sketch of the sequence follows the list):

1. Clear the NamespaceBundle ownership from the local cache

2. Disconnect replicators, producers, and consumers

3. Close the managedLedger and caches

4. Update ZK
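To make the ordering concrete, here is a purely illustrative sketch of that sequence; every method name below is a hypothetical placeholder, not a Pulsar API:

import java.util.concurrent.CompletableFuture;

public class UnloadSequenceSketch {
    CompletableFuture<Void> unloadBundle(String bundle) {
        return clearOwnershipCache(bundle)                      // 1. drop the bundle from the local ownership cache
                .thenCompose(v -> disconnectClients(bundle))    // 2. disconnect replicators, producers, consumers
                .thenCompose(v -> closeManagedLedger(bundle))   // 3. close the managedLedger and caches
                .thenCompose(v -> updateZkOwnership(bundle));   // 4. update the ownership data in ZooKeeper
    }

    // Placeholders standing in for the real steps
    CompletableFuture<Void> clearOwnershipCache(String bundle) { return CompletableFuture.completedFuture(null); }
    CompletableFuture<Void> disconnectClients(String bundle)   { return CompletableFuture.completedFuture(null); }
    CompletableFuture<Void> closeManagedLedger(String bundle)  { return CompletableFuture.completedFuture(null); }
    CompletableFuture<Void> updateZkOwnership(String bundle)   { return CompletableFuture.completedFuture(null); }
}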

2.3 Under what circumstances is "Producer is already present on the connection" returned

From the logs, the producers that could not connect were all failing with "Producer with id XXX is already present on the connection", so the question became: under what circumstances does the broker return this error?

In ServerCnx#handleProducer there is the following code:

CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId,
producerFuture);
if (existingProducerFuture != null) {
    if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
        // The producer was already created, return success directly
        Producer producer = existingProducerFuture.getNow(null);
        log.info("[{}] Producer with the same id {} is already created: {}", remoteAddress,
                producerId, producer);
        ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName(),
                producer.getSchemaVersion()));
        return null;
    } else {
        // A producer with this id is already being created
        ServerError error = null;
        if (!existingProducerFuture.isDone()) {
            // Producer creation has not completed yet
            error = ServerError.ServiceNotReady;
        } else {
            error = getErrorCode(existingProducerFuture);
            // remove producer with producerId as it's already completed with exception
            producers.remove(producerId);
        }
        log.warn("[{}][{}] Producer with id {} is already present on the connection", remoteAddress,
                topicName, producerId);
        ctx.writeAndFlush(Commands.newError(requestId, error,
                "Producer is already present on the connection"));
        return null;
    }
}

So the producers map already contains a producerFuture for this producerId, and that future has never completed, either normally or exceptionally.
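A tiny self-contained demo of this "dangling future" symptom (generic Java, not Pulsar code): once an incomplete future sits in the map under an id, every later request with the same id is rejected.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class DanglingFutureDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<Long, CompletableFuture<String>> producers = new ConcurrentHashMap<>();

        CompletableFuture<String> first = new CompletableFuture<>();
        producers.putIfAbsent(1L, first);
        // ... creation logic throws an unchecked exception here and never completes `first`

        CompletableFuture<String> retry = new CompletableFuture<>();
        CompletableFuture<String> existing = producers.putIfAbsent(1L, retry);
        if (existing != null && !existing.isDone()) {
            System.out.println("Producer with id 1 is already present on the connection");
        }
    }
}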

Then why would the producerFuture never complete? I briefly looked at the creation logic that follows and found no obvious problem, so I went back to the logs to look for clues.

2.4 Scan mode on

Since multiple machines could not connect, and to different brokers, I pulled the logs from both brokers to look for what the producers that could not connect had in common.

After correlating each shed unload with the "Producer is already present on the connection" logs, I noticed something strange.

Right after the unload logged the producer disconnect, a reconnect request from that producer reached the broker and a "Creating producer" entry followed, which should not happen that quickly after an unload. Grepping that endpoint further showed that what came next was "Producer is already present on the connection".

Here’s a partial log I captured:

Finding this immediate reconnection misled me for a while: I went into the client code to check why the client reconnects right after the server sends the disconnect command. Reading the source, a background task automatically initiates reconnection with a backoff; this is by design, to cover abnormal network disconnects.

So the client reconnecting was normal; what I should have been asking was why the server produced a dangling future.

A note to self for later: locate the real cause in your own system first. Even if the trigger really is an abnormal external request, the problem only surfaces because your own system is not robust enough!

2.5 Verifying ServerCnx's producer-creation logic

Once I was on the right track, I located the code from the "Creating producer" log line, which is actually printed after the producerFuture has been put into the map.

service.getOrCreateTopic(topicName.toString())

At the code above I fell into a misunderstanding: because the log showed this was triggered only after the connection had been disconnected, I naturally assumed the topic had already been removed from the topics cache and would have to be rebuilt, so execution should enter the loadOrCreatePersistentTopic() method.

Continuing into this method, the first thing it does is check ownership:

If this broker no longer owns the topic's bundle, checkTopicNsOwnership logs a warning (log.warn) and throws a RuntimeException; on this path the caller wraps the call in a try/catch for RuntimeException and logs a warning itself, so the exception is handled properly here.

But when execution reaches this point, the unload may not yet have removed this topic from the topics cache, so loadOrCreatePersistentTopic is never even triggered. (I went wrong mainly because I had seen the producer disconnect earlier in the log and took it for granted that the topic had already been unloaded. Tears!) To be sure of when the topic cache entry is removed, I had to go through PersistentTopic's close method: the cache entry is only removed after the producers, subscriptions/consumers, and the managedLedger have all been closed.

2.6 The real reason emerges

Having confirmed the topic-cache behaviour, I kept stepping through the code, looking for the problem in the logic that follows. There was a lot of it, so I have cut the irrelevant parts and kept the core code. Here lies the core problem:

topic.addProducer acquires the read lock and then calls checkTopicNsOwnership, which throws a RuntimeException when this broker no longer owns the topic's namespace bundle (as is the case mid-unload).
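A simplified sketch consistent with those fragments (reconstructed, not a verbatim copy of the Pulsar sources):

public void addProducer(Producer producer) throws BrokerServiceException {
    lock.readLock().lock();
    try {
        // Throws a RuntimeException if this broker no longer owns the topic's bundle,
        // which is exactly the situation while an unload is in progress
        brokerService.checkTopicNsOwnership(getName());
        // ... register the producer on the topic, omitted
    } finally {
        lock.readLock().unlock();
    }
}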

The upstream caller only catches BrokerServiceException, so the RuntimeException escapes, the creation flow aborts, and the producerFuture is left dangling. Every subsequent attempt to create a producer with the same producerId on this connection will fail, until the connection is rebuilt or the application is restarted!

IV. Some takeaways

Once found, the cause turned out to be a very simple bug that is easy to fix, but the way this bug came about is worth some reflection.

1. Why use a RuntimeException?

Look at the signature of the checkTopicNsOwnership method: it declares that it throws a RuntimeException!

Personally I think this should be a checked exception: a checked exception forces every caller to handle it or declare it onward. The loadOrCreatePersistentTopic path remembers to catch the RuntimeException, but other callers, like the addProducer path above, simply forget, and the compiler never complains.
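An illustration of that point with a hypothetical checked variant of the exception: the compiler then forces every caller to handle it or declare it, so "forgot to catch it" cannot even compile.

public class CheckedOwnershipSketch {
    // Hypothetical checked counterpart of the ownership failure
    static class OwnershipLostException extends Exception {
        OwnershipLostException(String msg) { super(msg); }
    }

    static void checkTopicNsOwnership(String topic) throws OwnershipLostException {
        boolean owned = false; // pretend the bundle has just been unloaded
        if (!owned) {
            throw new OwnershipLostException("Namespace bundle for " + topic + " not served by this broker");
        }
    }

    public static void main(String[] args) {
        try {
            checkTopicNsOwnership("persistent://tenant/ns/my-topic");
        } catch (OwnershipLostException e) {
            // Without this catch (or a throws clause) the code would not compile
            System.out.println("handled: " + e.getMessage());
        }
    }
}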

2. Code robustness comes first, everything else comes second

These are just my personal views, for reference only; other views are welcome and can be discussed together.