In our work, we need Kafka Connect to move JSON data from Kafka to S3 and store it as Parquet. This post records the demo setup.

Earlier versions of the Kafka Connect S3 sink do not support Parquet output. We used Confluent 5.5.5.

Avro serialization

When using the Parquet format in the sink, we are officially required to use AvroConverter; JsonSchemaConverter causes runtime errors.

When using Avro, we need to define a schema for the data. In fact, both the Avro and Parquet file formats embed the schema in the file itself. Confluent also provides the Schema Registry, a service for defining and managing schemas.

Schema registry

The Schema Registry exposes a REST API for defining schemas. When a producer writes or a consumer reads data, the Schema Registry is used to register and verify the data schema. When the Schema Registry has started correctly, it is reachable on its default port 8081. Multiple Schema Registry instances can be started and they coordinate with each other. In Kafka, a topic named "_schemas" is created, which stores and manages the schema information. Here are some of the simplest REST API calls:

# register a schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"topic_name\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"}]}"}' \
  http://localhost:8081/subjects/topic_name/versions

# query schemas
curl -X GET http://localhost:8081/subjects/topic_name/versions
curl -X GET http://localhost:8081/subjects
curl -X GET http://localhost:8081/subjects/topic_name/versions/3
curl -X GET http://localhost:8081/subjects/topic_name/versions/latest
curl -X GET http://localhost:8081/schemas/ids/266

# delete a schema
curl -X DELETE http://localhost:8081/subjects/topic_name/versions/1
curl -X DELETE http://localhost:8081/subjects/topic_name

# check whether the provided schema is compatible with the latest version
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"topic_name\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"}]}"}' \
  http://localhost:8081/compatibility/subjects/topic_name/versions/latest
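The same operations can also be done from code with the schema-registry client that ships alongside the Confluent serializers. Below is a minimal sketch, assuming the Avro-based register/getLatestSchemaMetadata methods of CachedSchemaRegistryClient in the 5.5.x client library; note that the Avro serializer registers value schemas under the subject "<topic>-value" by default:

// Requires io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
// and io.confluent.kafka.schemaregistry.client.SchemaMetadata on the classpath.
public static void main(String[] args) throws Exception {
    // second argument is the client-side cache capacity for schema ids
    CachedSchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);
    Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"topic_name\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}");
    // the Avro serializer uses the "<topic>-value" subject for value schemas
    int id = client.register("topic_name-value", schema);
    System.out.println("registered schema id = " + id);
    SchemaMetadata latest = client.getLatestSchemaMetadata("topic_name-value");
    System.out.println("latest version = " + latest.getVersion() + ", id = " + latest.getId());
}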

Kafka Code

POJO

public class User implements Serializable {
    private static final long serialVersionUID = -8441975211075128605L;
    private String name;
    private Integer age;
    // all-args constructor, getters and setters omitted
}

Producer

public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
    Producer<String, Object> producer = null;
    try {
        producer = new KafkaProducer<>(properties);
        String topicName = "topic_name";
        String userSchema = "{\"type\":\"record\",\"name\":\"topic_name\","
                + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}";
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);
        User u1 = new User("zs", 1000000000);
        // serialize the POJO with Avro reflection, then read it back as a GenericRecord
        ReflectDatumWriter<User> reflectDatumWriter = new ReflectDatumWriter<>(schema);
        GenericDatumReader<Object> genericRecordReader = new GenericDatumReader<>(schema);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        reflectDatumWriter.write(u1, EncoderFactory.get().directBinaryEncoder(bytes, null));
        GenericRecord avroRecord = (GenericRecord) genericRecordReader.read(null,
                DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null));

        producer.send(new ProducerRecord<String, Object>(topicName, avroRecord),
                new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            System.out.println(e.getMessage());
                        }
                    }
                });
    } catch (Exception e) {
        System.out.println(e.getMessage());
    } finally {
        if (producer != null) {
            producer.close();
        }
    }
}
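For a flat record like this one, the reflect-write / generic-read round trip can also be replaced by Avro's GenericRecordBuilder. A minimal sketch that reuses the schema, topicName, and producer variables from the example above:

// build the GenericRecord directly instead of serializing the POJO first
GenericRecord avroRecord = new GenericRecordBuilder(schema)
        .set("name", "zs")
        .set("age", 1000000000)
        .build();
producer.send(new ProducerRecord<String, Object>(topicName, avroRecord));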

Consumer

public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafka:9092");
    props.put("group.id", "group-id");
    props.put("key.deserializer", StringDeserializer.class);
    props.put("value.deserializer", KafkaAvroDeserializer.class);
    props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
    // KafkaAvroDeserializer returns GenericRecord unless specific.avro.reader is enabled
    final Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
    String topic = "topic_name";
    consumer.subscribe(Arrays.asList(topic));
    try {
        while (true) {
            ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
            for (ConsumerRecord<String, GenericRecord> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s \n",
                        record.offset(), record.key(), record.value());
            }
        }
    } finally {
        consumer.close();
    }
}

Kafka Connect config

{
  "name": "connector-name",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "errors.log.include.messages": "true",
    "s3.region": "region",
    "topics.dir": "folder",
    "flush.size": "300",
    "tasks.max": "1",
    "timezone": "UTC",
    "s3.part.size": "5242880",
    "enhanced.avro.schema.support": "true",
    "rotate.interval.ms": "6000",
    "locale": "US",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "s3.part.retries": "18",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "errors.log.enable": "true",
    "s3.bucket.name": "bucket",
    "partition.duration.ms": "3600000",
    "topics": "topics",
    "batch.max.rows": "100",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "value.converter.schemas.enable": "true",
    "name": "connector-name",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "rotate.schedule.interval.ms": "6000",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "schema.registry.url": "http://schema-registry:8081",
    "path.format": "'log_year'=YYYY/'log_month'=MM/'log_day'=dd/'log_hour'=HH"
  }
}

More complex data structures:

Schema registry

{"schema": "{\"type\":\"record\",\"name\":\"name_1\",\"fields\":[{\"name\":\"name\",\"type\":[\"string\",\"null\"],\"default\" : null},{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"nickName\",\"type\":[\"null\",\"string\"],\"default\" : null},{\"name\":\"jsonData\",\"type\":{\"type\":\"record\",\"name\":\"jsonData\",\"fields\":[{\"name\":\"jsonK1\",\"type \":\"string\"},{\"name\":\"jsonK2\",\"type\":\"int\"}]}},{\"name\":\"arrayStringData\",\"type\":{\"type\": \"array\", \"items\": \"string\"}},{\"name\":\"arrayJsonData\",\"type\":{\"type\": \"array\", \"items\": {\"type\":\"record\",\"name\":\"arrayJsonData\",\"fields\":[{\"name\":\"arrayKsonK1\",\"type\":\"string\"},{\"name\":\"a rrayKsonK2\",\"type\":\"int\"}]}}}]}"}Copy the code

POJO

public class UserArrayJsonItemData implements Serializable {
    private static final long serialVersionUID = -3621940426286065831L;
    private String arrayKsonK1;
    private int arrayKsonK2;
    // all-args constructor, getters and setters omitted
}
public class UserJsonData implements Serializable {
    private static final long serialVersionUID = 5226807403457803391L;
    private String jsonK1;
    private int jsonK2;
    // all-args constructor, getters and setters omitted
}
public class User2 implements Serializable {
    private static final long serialVersionUID = 5976712933804910638L;
    private String name;
    private int age;
    private String nickName;
    private UserJsonData jsonData;
    private List<String> arrayStringData;
    private List<UserArrayJsonItemData> arrayJsonData;
    // all-args constructor, getters and setters omitted
}

Producer

public static void main(String[] args) {

    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    properties.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
    Producer<String, Object> producer = null;
    try {
        producer = new KafkaProducer<>(properties);
        String topicName = "topic_name";
        String userSchema = "{\"type\":\"record\",\"name\":\"topic_name\",\"fields\":["
                + "{\"name\":\"name\",\"type\":\"string\"},"
                + "{\"name\":\"age\",\"type\":\"int\"},"
                + "{\"name\":\"nickName\",\"type\":[\"null\",\"string\"],\"default\":null},"
                + "{\"name\":\"jsonData\",\"type\":{\"type\":\"record\",\"name\":\"jsonData\",\"fields\":["
                + "{\"name\":\"jsonK1\",\"type\":\"string\"},{\"name\":\"jsonK2\",\"type\":\"int\"}]}},"
                + "{\"name\":\"arrayStringData\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},"
                + "{\"name\":\"arrayJsonData\",\"type\":{\"type\":\"array\",\"items\":"
                + "{\"type\":\"record\",\"name\":\"arrayJsonData\",\"fields\":["
                + "{\"name\":\"arrayKsonK1\",\"type\":\"string\"},{\"name\":\"arrayKsonK2\",\"type\":\"int\"}]}}}]}";
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);
        User2 u1 = new User2("zs", 1000000000, null,
                new UserJsonData("jk-1", 50),
                Arrays.asList("array-1", "array-2"),
                Arrays.asList(new UserArrayJsonItemData("array-json-item-1", 53),
                        new UserArrayJsonItemData("array-json-item-2", 63)));
        // same reflect-write / generic-read round trip as in the simple example
        ReflectDatumWriter<User2> reflectDatumWriter = new ReflectDatumWriter<>(schema);
        GenericDatumReader<Object> genericRecordReader = new GenericDatumReader<>(schema);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        reflectDatumWriter.write(u1, EncoderFactory.get().directBinaryEncoder(bytes, null));
        GenericRecord avroRecord = (GenericRecord) genericRecordReader.read(null,
                DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null));
        producer.send(new ProducerRecord<String, Object>(topicName, avroRecord),
                new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            System.out.println(e.getMessage());
                        } else {
                            System.out.println("sent to partition " + recordMetadata.partition()
                                    + ", offset " + recordMetadata.offset());
                        }
                    }
                });
    } catch (Exception e) {
        System.out.println(e.getMessage());
    } finally {
        if (producer != null) {
            producer.close();
        }
    }
}

Consumer

public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafka:9092");
    props.put("group.id", "group-id");
    props.put("key.deserializer", StringDeserializer.class);
    props.put("value.deserializer", KafkaAvroDeserializer.class);
    props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
    // KafkaAvroDeserializer returns GenericRecord unless specific.avro.reader is enabled
    final Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
    String topic1 = "topic_name";
    consumer.subscribe(Arrays.asList(topic1));
    try {
        while (true) {
            ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
            for (ConsumerRecord<String, GenericRecord> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s \n",
                        record.offset(), record.key(), record.value());
            }
        }
    } finally {
        consumer.close();
    }
}