Similarly, we write a second interceptor, ProducerInterceptorAnalysis2. Both interceptors do the same job: they add a prefix to the record value before it is sent.
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    // Prepend this interceptor's prefix to the value before the record is sent
    String modifiedValue = "th-" + record.value();
    return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
            record.key(), modifiedValue, record.headers());
}

@Override
public void close() {
    System.out.println("[INFO] Interceptor B CLOSED");
}
Modify the last line of the configuration so that it lists both interceptors, separated by a comma:
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptorAnalysis.class.getName() + "," + ProducerInterceptorAnalysis2.class.getName());
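The value stored under ProducerConfig.INTERCEPTOR_CLASSES_CONFIG is a plain comma-separated string (the constant resolves to the key interceptor.classes), and the interceptors are expected to run in the order they are listed. Here is a minimal sketch that uses only java.util.Properties so it runs without the Kafka client on the classpath. The com.example package and the helper names withInterceptors and configuredOrder are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

class InterceptorConfigSketch {
    // Key behind ProducerConfig.INTERCEPTOR_CLASSES_CONFIG
    static final String INTERCEPTOR_CLASSES = "interceptor.classes";

    // Hypothetical helper: joins fully qualified class names with commas
    static Properties withInterceptors(String... classNames) {
        Properties properties = new Properties();
        properties.put(INTERCEPTOR_CLASSES, String.join(",", classNames));
        return properties;
    }

    // Hypothetical helper: splits the configured value back into an ordered list
    static List<String> configuredOrder(Properties properties) {
        return Arrays.asList(properties.getProperty(INTERCEPTOR_CLASSES).split("\\s*,\\s*"));
    }

    public static void main(String[] args) {
        // Package name is an assumption; the class names come from this article
        Properties properties = withInterceptors(
                "com.example.ProducerInterceptorAnalysis",
                "com.example.ProducerInterceptorAnalysis2");
        System.out.println(properties.getProperty(INTERCEPTOR_CLASSES));
        System.out.println(configuredOrder(properties));
    }
}
```

Swapping the two names in the list is all it takes to change which prefix is applied first.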
Run it and look at the output first. Obviously, the call order is A > B and the close order is B > A, which is consistent with Spring MVC. Now let's see how Kafka implements this. Set a breakpoint in org.apache.kafka.clients.producer.internals.ProducerInterceptors and step through onSend; the other methods work the same way, each walking the interceptor list with an iterator.
private final List<ProducerInterceptor<K, V>> interceptors;

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> interceptRecord = record;
    Iterator var3 = this.interceptors.iterator();
    while (var3.hasNext()) {
        ProducerInterceptor interceptor = (ProducerInterceptor) var3.next();
        try {
            // Each interceptor receives the record returned by the previous one
            interceptRecord = interceptor.onSend(interceptRecord);
        } catch (Exception var6) {
            // A failing interceptor is only logged; the loop moves on to the next one
            if (record != null) {
                log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", new Object[]{record.topic(), record.partition(), var6});
            } else {
                log.warn("Error executing interceptor onSend callback", var6);
            }
        }
    }
    return interceptRecord;
}
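To make the behaviour of that loop easier to see, here is a simplified stand-in that is not Kafka's code: interceptors are modelled as UnaryOperator<String> working on the record value only, and the log call is replaced with System.err. The "A-" prefix for interceptor A is an assumption, since only B's "th-" prefix appears above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

class InterceptorChainSketch {
    // Runs each interceptor in list order; one that throws is skipped and the chain continues
    static String onSend(List<UnaryOperator<String>> interceptors, String value) {
        String intercepted = value;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                intercepted = interceptor.apply(intercepted);
            } catch (Exception e) {
                // Stand-in for the log.warn branch: report and keep the last good value
                System.err.println("Error executing interceptor: " + e.getMessage());
            }
        }
        return intercepted;
    }

    public static void main(String[] args) {
        UnaryOperator<String> a = v -> "A-" + v;   // hypothetical prefix for interceptor A
        UnaryOperator<String> b = v -> "th-" + v;  // prefix used by interceptor B
        UnaryOperator<String> failing = v -> { throw new IllegalStateException("boom"); };

        System.out.println(onSend(Arrays.asList(a, b), "hello"));          // th-A-hello
        System.out.println(onSend(Arrays.asList(a, failing, b), "hello")); // th-A-hello
    }
}
```

Both calls print the same value: the failing interceptor contributes nothing, and B still receives what A returned. This matches the try/catch in the loop, where an exception never aborts the send.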
Also look at when the Producer invokes the interceptors:
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return this.send(record, (Callback) null);
}

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // this.interceptors is a ProducerInterceptors instance; its onSend() runs every interceptor's onSend() in a chain
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return this.doSend(interceptedRecord, callback);
}
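The ordering in send() can be traced with a small model. This is a sketch, not the KafkaProducer implementation: SendPathSketch, its events list, and the Runnable callback are all invented for illustration, and records are reduced to their String value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

class SendPathSketch {
    final List<String> events = new ArrayList<>();  // records the order of the steps
    private final List<UnaryOperator<String>> interceptors;

    SendPathSketch(List<UnaryOperator<String>> interceptors) {
        this.interceptors = interceptors;
    }

    // The single-argument overload delegates with a null callback
    String send(String value) {
        return send(value, null);
    }

    String send(String value, Runnable callback) {
        String intercepted = value;
        for (UnaryOperator<String> interceptor : interceptors) {
            intercepted = interceptor.apply(intercepted);
            events.add("onSend -> " + intercepted);
        }
        // Only the intercepted value reaches the actual send step
        return doSend(intercepted, callback);
    }

    // Stand-in for the real doSend
    private String doSend(String value, Runnable callback) {
        events.add("doSend -> " + value);
        if (callback != null) {
            callback.run();
        }
        return value;
    }

    public static void main(String[] args) {
        UnaryOperator<String> a = v -> "A-" + v;   // hypothetical prefix for interceptor A
        UnaryOperator<String> b = v -> "th-" + v;  // prefix used by interceptor B
        SendPathSketch producer = new SendPathSketch(Arrays.asList(a, b));
        producer.send("hello");
        producer.events.forEach(System.out::println);
    }
}
```

The trace lists both onSend steps before doSend, so whatever the last interceptor returns is what the rest of the send path works with.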