The book follows, last time just through the command line provided by Kafka to demonstrate the use of producer consumer, this article through the Java API way to run. The producer code is as follows:

public class Producer { private static String topic = "test007"; public static void main(String[] args) throws Exception { Producer p = new Producer(); p.producer(); } public void producer() throws ExecutionException, InterruptedException { Properties pro = new Properties(); pro.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092"); // Kafka is a persistent data MQ, stored as byte[], data needs to be converted to byte[], Kafka does not process data, producers and consumers need to convention encoding // Zero copy in Kafka uses sendFile system call, Realizing the rapid and consumption data pro. SetProperty (ProducerConfig. KEY_SERIALIZER_CLASS_CONFIG, StringSerializer. Class. GetName ()); pro.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(pro); /* topic: test007 partition */ for (int I = 0; /* topic: test007 partition */ i < 3; i++) { for (int j = 0; j < 3; j++) { ProducerRecord<String, String> record = new ProducerRecord<>(topic, "item-" + j, "value-" + i); Future<RecordMetadata> send = producer.send(record); RecordMetadata rm = send.get(); int partition = rm.partition(); long offset = rm.offset(); System.out.println("key:" + record.key() + ", value:" + record.value() + ", partition:" + partition + ", offset:" + offset); }}}}Copy the code

The code simulates three kinds of goods, each goods has three linear ids, the same goods will go to a partition (through K, V message form). The output is as follows:

key:item-1, value:value-0, partition:1, offset:8
key:item-2, value:value-0, partition:1, offset:9
key:item-0, value:value-1, partition:0, offset:8
key:item-1, value:value-1, partition:1, offset:10
key:item-2, value:value-1, partition:1, offset:11
key:item-0, value:value-2, partition:0, offset:9
key:item-1, value:value-2, partition:1, offset:12
key:item-2, value:value-2, partition:1, offset:13
Copy the code

All item-0 messages go to partition 0, and item-1 and item-2 messages go to Partition 1.

Consumer Code:

public class Consumer { private static String topic = "test007"; public static void main(String[] args) { Consumer c = new Consumer(); c.cousumer(); } public void cousumer() { Properties pro = new Properties(); pro.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092"); // Kafka is a persistent data MQ, stored as byte[], data needs to be converted to byte[], Kafka does not process data, producers and consumers need to convention encoding // Zero copy in Kafka uses sendFile system call, Realizing the rapid and consumption data pro. SetProperty (ConsumerConfig. KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer. Class. GetName ()); pro.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Set the consumption group pro.setProperty(consumerconfig. GROUP_ID_CONFIG, "TIGER_007"); /** * What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server  (e.g. because that data has been deleted): * <ul> * <li>earliest: automatically reset the offset to the earliest offset</li> * <li>latest: automatically reset the offset to the latest offset</li> * <li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li> * anything else: throw exception to the consumer. * </li></ul>"; * */ pro.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); SetProperty (consumerConfig. ENABLE_AUTO_COMMIT_CONFIG, "true"); //offset automatically submits pro.setProperty(consumerConfig. ENABLE_AUTO_COMMIT_CONFIG, "true"); //pro.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, ""); / / 5 seconds / / pro. SetProperty (ConsumerConfig MAX_POLL_RECORDS_CONFIG, ""); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(pro); consumer.subscribe(Arrays.asList(topic), New ConsumerRebalanceListener () {/ * * * consumer group and the corresponding partition rebalance *, for example At first only a consumer corresponding to two partition * at this time in the same consumer groups a new customer, The mapping between the consumer and the partition becomes one consumer corresponding to one partition */ @override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.println("---onPartitionsRevoked:"); Iterator<TopicPartition> iter = partitions.iterator(); while(iter.hasNext()){ System.out.println(iter.next().partition()); } } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { System.out.println("---onPartitionsAssigned:"); Iterator<TopicPartition> iter = partitions.iterator(); while (iter.hasNext()) { System.out.println(iter.next().partition()); }}}); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0)); if (! records.isEmpty()) { Iterator<ConsumerRecord<String, String>> iter = records.iterator(); System.out.println("================="+records.count()+"=================="); / / = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = / / // Automatically commit consumerConfig.enable_auto_COMMIT_config, "true" /*while (iter. HasNext ()) {ConsumerRecord<String, String> record = iter.next(); int partition = record.partition(); long offset = record.offset(); System.out.println("key: " + record.key() + " value:" + record.value() + " partition:" + partition + " offset:" + offset); } * / / / = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = / / Set<TopicPartition> partitions = records.partitions(); // Each poll takes data from multiple partitions // and the data in each partition is ordered /** * If the offset is manually submitted, that is to say, consumerConfig. ENABLE_AUTO_COMMIT_CONFIG. "False" * Can be processed in one of three ways * 1, commit synchronously by message progress * 2, commit synchronously by partition granularity * 3, commit synchronously by batch of the current poll * * Think: */ partition (TopicPartition) */ partition (TopicPartition) partitions) { List<ConsumerRecord<String, String>> pRecords = records.records(partition); // In a batch, poll data is retrieved by partition. Iterator<ConsumerRecord<String, String>> piter = prerecords. Iterator (); while(piter.hasNext()){ ConsumerRecord<String, String> next = piter.next(); int par = next.partition(); long offset = next.offset(); String key = next.key(); String value = next.value(); long timestamp = next.timestamp(); System.out.println("key: "+ key+" val: "+ value+ " partition: "+par + " offset: "+ offset+"time:: "+ timestamp); TopicPartition sp = new TopicPartition("msb-items", par); OffsetAndMetadata om = new OffsetAndMetadata(offset); HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>(); map.put(sp,om); Consumer.com mitSync(map) is submitted synchronously for each record-level update, corresponding to the first point. } long poff = prerecords. Get (prerecords.size() -1).offset(); Pom = new OffsetAndMetadata(POFF); HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>(); map.put(partition,pom); // This is the second type, offset consumer.commitSync(map); Consumer.com mitSync(); /** * Because you have partitioned ** you have got the partitioned data set ** you want to process the whole data first */} }}}}Copy the code

In the consumer code, if the automatic commit is set to “true” with consumerConfig. ENABLE_AUTO_COMMIT_CONFIG, this is easy and we don’t have to handle the offset update ourselves. The system will submit it automatically for us. The offset subscript auto-commit may not be applicable in some scenarios because kafka commits the data directly after it has been pulled, which makes it easy to lose data, especially if you need to control things.

In many cases, we need to successfully pull data from Kafka and process the data before committing. If the data is pulled and then written to mysql, then we need to manually commit kafka’s offset subscript.

If you manually submit offset, namely consumerConfig. ENABLE_AUTO_COMMIT_CONFIG, “false” can be processed in one of three ways

  1. Commit synchronously according to message progress
  2. Commit synchronously by partition granularity
  3. Commit synchronously by batch of the current poll

Consider: How to handle multiple threads?

  1. The above method 1,3 does not use multithreading
  2. The above 2 approach is most readily thought of as multi-threaded, with one thread per partition

Each update can get the offset of the corresponding partition.