This series of posts summarizes and shares examples drawn from real production environments and provides practical guidance on Spark business applications. Stay tuned for the rest of the series. Copyright: this Spark business application series belongs to the author (Qin Kaixin).
- Qin Kaixin Technology Community – Big Data Business Practice Series: complete catalogue
- Kafka Business Environment in Action – Kafka production environment planning
- Kafka Business Environment in Action – Kafka producer and consumer throughput testing
- Kafka Business Environment in Action – Kafka Producer parameter settings and tuning suggestions
- Kafka Business Environment in Action – Kafka cluster Broker parameter settings and tuning guidelines
- Kafka Business Environment in Action – Kafka Producer synchronous and asynchronous message sending, with transaction and idempotence case applications
- Kafka Business Environment in Action – Kafka Consumer multiple consumption modes, official case applications
1 "I'm Safe" – the Producer Is Thread-Safe
Why say that? Because there is no need to worry: Kafka's Producer is thread-safe, so users can quite comfortably share a single instance across multiple threads.
The official documentation in fact notes that sharing a single producer instance across threads will generally be faster than creating one producer per thread.
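As a minimal sketch of that usage (the broker address localhost:9092, the topic my-topic, and the thread count are illustrative assumptions, not from the original post), one KafkaProducer can be handed to several sender threads:

```java
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SharedProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // One producer instance, shared by several sender threads (KafkaProducer is thread-safe).
        Producer<String, String> producer = new KafkaProducer<>(props);
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int t = 0; t < 3; t++) {
            final int threadId = t;
            pool.submit(() ->
                    producer.send(new ProducerRecord<>("my-topic", "thread-" + threadId, "hello")));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        producer.close();
    }
}
```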
2 How the Producer Sends a Message
- Step 1: Wrap the message in a ProducerRecord.
- Step 2: The partitioner (Partitioner) routes the record to a partition of the topic. If no key is specified, messages are spread evenly across all partitions (see the sketch after this list).
- Step 3: Once the partition is determined, the record is sent to that partition's leader, and the replica synchronization mechanism then takes over.
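To make the routing step concrete, here is a small illustrative sketch (the topic name my-topic, the keys, and the explicit partition are assumptions, not from the original post) of how the presence or absence of a key, or an explicit partition, affects where a record goes under the default partitioner:

```java
import org.apache.kafka.clients.producer.ProducerRecord;

// With a key: the default partitioner hashes the key, so records sharing a key
// always land on the same partition of "my-topic".
ProducerRecord<String, String> keyed =
        new ProducerRecord<>("my-topic", "user-42", "payload");

// Without a key: the default partitioner spreads such records across the
// topic's partitions.
ProducerRecord<String, String> unkeyed =
        new ProducerRecord<>("my-topic", "payload");

// With an explicit partition (0 here): the partitioner is bypassed entirely.
ProducerRecord<String, String> pinned =
        new ProducerRecord<>("my-topic", 0, "user-42", "payload");
```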
3 Official Producer Examples
3.1 Fire-and-Forget, Official Case (send it and don't care about the result)
- After sending, the result is ignored.
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
```
3.2 Asynchronous Callback, Official Case (non-blocking)
- The Java producer's send method returns a Java Future object that the user can retrieve later; this is the basis of the callback mechanism.
- Fully non-blocking usage can make use of the Callback parameter to provide a callback that will be invoked when the request is complete.
- RecordMetadata and Exception will not both be null: Exception is null when the message is sent successfully, and metadata is null when the send fails.
```java
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>("the-topic", key, value);
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        } else {
            System.out.println("The offset of the record we just sent is: " + metadata.offset());
        }
    }
});
```
3.3 Synchronous Send, Official Case (blocking)
- producer.send(record) returns a Future object; calling future.get() blocks until the result is returned (or the send fails). A fuller sketch with error handling follows the one-line snippet below.
```java
producer.send(record).get();
```
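For completeness, a slightly fuller sketch of the same blocking pattern, with the checked exceptions that get() can throw handled explicitly (the producer instance and topic name are assumed from the earlier examples):

```java
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

ProducerRecord<String, String> record =
        new ProducerRecord<>("my-topic", "key", "value");
try {
    // get() blocks the calling thread until the broker acknowledges the send
    // (or the send fails).
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("Sent to partition " + metadata.partition()
            + " at offset " + metadata.offset());
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
} catch (ExecutionException e) {
    // The underlying send failed; the cause carries the actual Kafka exception.
    e.getCause().printStackTrace();
}
```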
3.4 Transactional Send, Official Case (atomicity and idempotence)
- From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer. The idempotent producer strengthens Kafka's delivery semantics from at least once to exactly once delivery. In particular, producer retries will no longer introduce duplicates. The transactional producer allows an application to send messages to multiple partitions (and topics!) atomically.
- To enable idempotence, the enable.idempotence configuration must be set to true. If set, the retries config will default to Integer.MAX_VALUE and the acks config will default to all. There are no API changes for the idempotent producer, so existing applications will not need to be modified to take advantage of this feature.
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

producer.initTransactions();
try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    producer.abortTransaction();
}
producer.close();
```
- As is hinted at in the example, there can be only one open transaction per producer. All messages sent between the beginTransaction() and commitTransaction() calls will be part of a single transaction. When the transactional.id is specified, all messages sent by the producer must be part of a transaction.
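As a minimal sketch of the idempotence-only mode described above (broker address and topic name are illustrative assumptions), enabling it is a single configuration switch, with no change to the send API:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Turn on idempotence; per the quoted documentation, retries then defaults to
// Integer.MAX_VALUE and acks to "all".
props.put("enable.idempotence", "true");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
producer.close();
```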
3.5 Retriable Exceptions (subclasses of RetriableException)
- LeaderNotAvailableException: the leader replica of a partition is unavailable. This is usually a transient result of a leader election; retrying a few times recovers.
- NotControllerException: the Controller, which elects partition leader replicas and manages partition metadata, is currently unavailable, possibly also because an election is in progress; retrying usually recovers.
- NetworkException: caused by a transient network fault. (A small configuration sketch for letting the producer retry these failures automatically follows this list.)
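A configuration sketch for automatic retries (the values are illustrative, not recommendations): with `retries` set, the producer resends on retriable failures itself before they ever reach the send callback.

```java
import java.util.Properties;

// Illustrative values only: let the producer retry transient (retriable)
// failures on its own.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("retries", 3);              // number of automatic resend attempts
props.put("retry.backoff.ms", 300);   // wait between attempts
```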
3.6 Non-Retriable Exceptions
- SerializationException: serialization failed.
- RecordTooLargeException: the message is too large.
3.7 Handling the Two Kinds of Exceptions Differently
```java
producer.send(myRecord, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e == null) {
            System.out.println("The offset of the record we just sent is: " + metadata.offset());
        } else {
            if (e instanceof RetriableException) {
                // Handle retriable exceptions ...
            } else {
                // Handle non-retriable exceptions ...
            }
        }
    }
});
```
3.8 Graceful Shutdown of the Producer
- producer.close(): finishes processing pending messages first, then exits gracefully.
- close(timeout): waits up to the timeout, then forcibly shuts the producer down. (A short sketch of both calls follows this list.)
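A minimal sketch of the two shutdown calls (the timeout value is illustrative; older client versions expose close(long, TimeUnit) rather than close(Duration)):

```java
// Graceful shutdown: block until buffered records are flushed, then release resources.
producer.close();

// Bounded shutdown: wait at most 10 seconds, then force the producer closed.
producer.close(java.time.Duration.ofSeconds(10));
```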
4 Summary
To prove that technology is just a thin sheet of paper, I will keep tearing Kafka open.
Qin Kaixin in Shenzhen 2018