Required parameters
bootstrap.servers
The address list of the brokers, in host:port form. You do not need to list every broker, because the client discovers the rest of the cluster from whichever broker it connects to. Listing more than one address is still recommended, so that the producer can bootstrap even if one of the listed brokers is down.
key.serializer
The serializer class used for the message key.
value.serializer
The serializer class used for the message value.
Sample code
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
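The sample lists a single broker. As a minimal sketch of the multi-broker case, the snippet below builds the same three settings with several addresses joined by commas. It uses the plain string keys ("bootstrap.servers", "key.serializer", "value.serializer") so that it runs without the Kafka client on the classpath. The class name MultiBrokerBootstrap and the broker host names are placeholders invented for illustration.

```java
import java.util.Properties;

class MultiBrokerBootstrap {
    // Builds producer properties that list several brokers.
    // The addresses passed in are placeholders, not real servers.
    static Properties producerProperties(String... brokers) {
        Properties properties = new Properties();
        // "bootstrap.servers" is the string behind ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
        // multiple host:port pairs are separated by commas.
        properties.put("bootstrap.servers", String.join(",", brokers));
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    public static void main(String[] args) {
        Properties p = producerProperties("broker1:9092", "broker2:9092", "broker3:9092");
        System.out.println(p.getProperty("bootstrap.servers"));
    }
}
```

Two or three addresses are usually enough, since the list is only used for the initial connection.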
Producer interceptor
An interceptor processes the message before it is sent: it runs ahead of both the serializer and the partitioner.
Implement the org.apache.kafka.clients.producer.ProducerInterceptor interface to write a custom interceptor.
Methods defined by the interface
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
Called before the message is sent, so the record can be inspected or modified.
void onAcknowledgement(RecordMetadata metadata, Exception exception)
Called when the message has been acknowledged or when sending it fails. It runs before the user's Callback.
void close()
Called when the producer is closed.
Kafka allows you to configure a chain of interceptors, with multiple interceptors separated by commas.
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TestProducerInterceptor.class.getName() + "," + TestProducerInterceptor2.class.getName());
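To illustrate how such a chain behaves, here is a small stand-alone sketch. It does not use the Kafka types: a UnaryOperator<String> stands in for onSend, and the class InterceptorChainSketch is hypothetical. It models two things the Kafka client is understood to do: interceptors run in the order they are configured, each receiving the record returned by the previous one, and an exception thrown by one interceptor is swallowed so the remaining interceptors still run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

class InterceptorChainSketch {
    // Applies each interceptor in list order; the output of one is the input of the next.
    static String runChain(List<UnaryOperator<String>> interceptors, String record) {
        String current = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                // A failing interceptor is skipped; the record from the
                // previous step is passed on unchanged.
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> chain = new ArrayList<>();
        chain.add(value -> "prefix-" + value);   // first configured, runs first
        chain.add(value -> value + "-suffix");   // sees the output of the first
        System.out.println(runChain(chain, "message"));
    }
}
```

Because the order matters, swapping the two class names in the configuration string changes what the second interceptor sees.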
Serialization
Serialization takes place before partitioning. Implement the org.apache.kafka.common.serialization.Serializer interface to write a custom serializer. The built-in org.apache.kafka.common.serialization.StringSerializer is used as the example here.
Methods defined by the interface
void configure(Map<String, ?> configs, boolean isKey)
Configures the serializer. StringSerializer uses it to set the character encoding.
byte[] serialize(String topic, String data)
Defines how the data is converted to bytes.
void close()
Called when the producer is closed.
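The three methods can be sketched without the Kafka dependency as follows. The class EncodingAwareSerializerSketch is a hypothetical stand-in that mirrors the method signatures listed above. The property names it reads ("key.serializer.encoding", "value.serializer.encoding", with "serializer.encoding" as a fallback) and the UTF-8 default follow what StringSerializer is understood to do, so treat them as an assumption and check them against your client version.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;

class EncodingAwareSerializerSketch {
    // UTF-8 is used when no encoding is configured.
    private Charset encoding = StandardCharsets.UTF_8;

    // Picks the encoding from the key- or value-specific property,
    // falling back to the shared "serializer.encoding" property.
    void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object value = configs.get(propertyName);
        if (value == null) {
            value = configs.get("serializer.encoding");
        }
        if (value instanceof String) {
            encoding = Charset.forName((String) value);
        }
    }

    // Converts the string to bytes; a null payload stays null.
    byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(encoding);
    }

    // Nothing to release in this sketch.
    void close() {
    }

    public static void main(String[] args) {
        EncodingAwareSerializerSketch serializer = new EncodingAwareSerializerSketch();
        serializer.configure(Map.of("value.serializer.encoding", "UTF-16LE"), false);
        System.out.println(serializer.serialize("demo-topic", "A").length);
        serializer.close();
    }
}
```

A real custom serializer would implement Serializer<T> and be registered through key.serializer or value.serializer.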
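Partitioner
Implement the org.apache.kafka.clients.producer.Partitioner interface to write a custom partitioner.
When a message has a key, Kafka hashes the key (MurmurHash2), so messages with the same key go to the same partition as long as the number of partitions does not change. If no key is specified, the default partitioner spreads messages across partitions instead: older clients rotate through the partitions round-robin, and clients from Kafka 2.4 onward stick to one partition per batch before moving on.
Methods defined by the interface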
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
Decides which partition the message is sent to. See DefaultPartitioner for a reference implementation.
void close()
Called when the producer is closed.
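The key-hashing rule described in this section can be sketched in plain Java. The class KeyHashPartitionSketch is hypothetical. Its murmur2 method is written to follow the 32-bit MurmurHash2 variant of the Kafka Java client (seed 0x9747b28c), and the result is made non-negative before taking the remainder. Treat it as an illustration and compare it against the client's own utility code before relying on exact partition numbers.

```java
import java.nio.charset.StandardCharsets;

class KeyHashPartitionSketch {
    // 32-bit MurmurHash2 over the serialized key bytes.
    static int murmur2(byte[] data) {
        int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;

        // Mix four bytes at a time, little-endian.
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Fold in the remaining one to three bytes (cases fall through on purpose).
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
            default:
                break;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit, then maps the hash onto the available partitions.
    static int partitionForKey(byte[] keyBytes, int numPartitions) {
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(partitionForKey(key, 6));
    }
}
```

The same key bytes always produce the same partition for a fixed partition count. Adding partitions to a topic changes the remainder and therefore the mapping.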
Similarities and differences compared with RocketMQ
- Like Kafka, RocketMQ allows a producer to send a message to a specified "partition".
- RocketMQ has no concept of a serializer. Message content is serialized by RocketMQ itself.
- As far as I can tell from my own usage, RocketMQ does not offer anything like the interceptor concept. It does provide hooks for processing a message before and after it is sent, for example:
DefaultMQProducer producer = new DefaultMQProducer("default");
producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() {
    @Override
    public String hookName() {
        return "exampleHook"; // placeholder name for this example
    }
    @Override
    public void sendMessageBefore(SendMessageContext context) {
        // runs before the message is sent
    }
    @Override
    public void sendMessageAfter(SendMessageContext context) {
        // runs after the message is sent
    }
});