1. Custom serialization scheme (Protobuf)

Real-world applications transfer complex objects and demand high serialization throughput, so a custom serialization scheme is often needed. Protobuf is used as the example here.

Goal: produce to and consume from the same Kafka topic, using Protobuf for serialization and deserialization, and verify that the data is parsed correctly.

  1. Generate the Java classes from the Protobuf definition

    syntax = "proto3";
    option java_package = "com.itcast.flink.connectors.kafka.proto";
    option java_outer_classname = "AccessLogProto";
    
    // Message structure definition
    message AccessLog {
    
        string ip = 1;
    
        string time = 2;
    
        string type = 3;
    
        string api = 4;
    
        string num = 5;
    }
    

    The Java files are generated with a batch script:

    @echo off
    for %%i in (proto/*.proto) do (
      d:/TestCode/protoc.exe --proto_path=./proto --java_out=../java ./proto/%%i
      echo generate %%i to java file successfully!
    )

    Note that the path must be configured correctly.
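
    Once the Java classes are generated, the Protobuf API can be tried out directly. Below is a minimal sketch (package and class names follow the .proto definition above; the field values are just sample data) that builds a message, serializes it, and parses it back:

    import com.itcast.flink.connectors.kafka.proto.AccessLogProto;

    public class AccessLogProtoDemo {

        public static void main(String[] args) throws Exception {
            // Build a message with the generated builder API
            AccessLogProto.AccessLog protoLog = AccessLogProto.AccessLog.newBuilder()
                    .setIp("10.10.20.1")
                    .setTime("1601649380422")
                    .setType("GET")
                    .setApi("getAccount")
                    .setNum("1")
                    .build();

            // Serialize to bytes and parse back to confirm the generated code works
            byte[] bytes = protoLog.toByteArray();
            AccessLogProto.AccessLog parsed = AccessLogProto.AccessLog.parseFrom(bytes);
            System.out.println(parsed);
        }
    }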

  2. Custom serialization implementation

    Add POM dependencies:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>5.1.8.RELEASE</version>
        </dependency>
    </dependencies>
    
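    The AccessLog POJO below relies on Lombok's @Data annotation for getters, setters, and toString, so the Lombok dependency is also needed on the classpath (the version here is only illustrative):

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>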

    The AccessLog transfer object:

    @Data
    public class AccessLog implements Serializable {
    
        private String ip;
    
        private String time;
    
        private String type;
    
        private String api;
    
        private Integer num;
    }

    CustomSerialSchema:

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.springframework.beans.BeanUtils;

    import com.itcast.flink.connectors.kafka.proto.AccessLogProto;

    import static org.apache.flink.util.Preconditions.checkNotNull;

    /**
     * Custom serialization/deserialization schema based on Protobuf.
     */
    public class CustomSerialSchema implements DeserializationSchema<AccessLog>, SerializationSchema<AccessLog> {
    
    	private static final long serialVersionUID = 1L;
    
    	private transient Charset charset;
    
    	public CustomSerialSchema() {
    		this(StandardCharsets.UTF_8);
    	}
    
    	public CustomSerialSchema(Charset charset) {
    		this.charset = checkNotNull(charset);
    	}
    
    	public Charset getCharset() {
    		return charset;
    	}
      
    	/**
    	 * Deserialization: Protobuf bytes -> AccessLog object.
    	 * @param message the raw message bytes read from Kafka
    	 * @return the parsed AccessLog
    	 */
    	@Override
    	public AccessLog deserialize(byte[] message) {
            AccessLog accessLog = null;
            try {
                AccessLogProto.AccessLog accessLogProto = AccessLogProto.AccessLog.parseFrom(message);
                accessLog = new AccessLog();
                BeanUtils.copyProperties(accessLogProto, accessLog);
                return accessLog;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return accessLog;
    	}
    
    	@Override
    	public boolean isEndOfStream(AccessLog nextElement) {
    		return false;
    	}
    
    	/**
    	 * Serialization: AccessLog object -> Protobuf bytes.
    	 * @param element the AccessLog to serialize
    	 * @return the Protobuf-encoded byte array
    	 */
    	@Override
    	public byte[] serialize(AccessLog element) {
            AccessLogProto.AccessLog.Builder builder = AccessLogProto.AccessLog.newBuilder();
            BeanUtils.copyProperties(element, builder);
    		return builder.build().toByteArray();
    	}
    
    	/**
    	 * Defines the produced message type.
    	 * @return the type information for AccessLog
    	 */
    	@Override
    	public TypeInformation<AccessLog> getProducedType() {
    		return TypeInformation.of(AccessLog.class);
    	}
    }
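
    A quick way to sanity-check the schema outside of Flink is a simple round trip, sketched below using the classes defined above (what ends up in each field also depends on how BeanUtils matches properties against the generated Protobuf builder):

    public class CustomSerialSchemaTest {

        public static void main(String[] args) {
            CustomSerialSchema schema = new CustomSerialSchema();

            AccessLog input = new AccessLog();
            input.setIp("10.10.20.1");
            input.setTime("1601649380422");
            input.setType("GET");
            input.setApi("getAccount");

            // Serialize to Protobuf bytes, then deserialize back and inspect the result
            byte[] bytes = schema.serialize(input);
            AccessLog output = schema.deserialize(bytes);
            System.out.println(output);
        }
    }
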
  3. Implement the Kafka message producer with Flink

    public class KafkaSinkApplication {
    
        public static void main(String[] args) throws Exception {
    
            // 1. Create a running environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 2. Read Socket data source
            DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n");
            // 3. Transform the incoming lines into AccessLog objects
            SingleOutputStreamOperator<AccessLog> outputStream = socketStr.map(new MapFunction<String, AccessLog>() {
                @Override
                public AccessLog map(String value) throws Exception {
                    System.out.println(value);
                    // Parse data based on delimiters
                    String[] arrValue = value.split("\t");
                    // Assemble data into objects
                    AccessLog log = new AccessLog();
                    log.setNum(1);
                    for (int i = 0; i < arrValue.length; i++) {
                        if (i == 0) {
                            log.setIp(arrValue[i]);
                        } else if (i == 1) {
                            log.setTime(arrValue[i]);
                        } else if (i == 2) {
                            log.setType(arrValue[i]);
                        } else if (i == 3) {
                            log.setApi(arrValue[i]);
                        }
                    }
                    return log;
                }
            });

            // 4. Kafka producer configuration
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers"."10.10.20.132:9092");
            FlinkKafkaProducer<AccessLog> kafkaProducer = new FlinkKafkaProducer<>(
                    "10.10.20.132:9092",          // broker list
                    "flink-serial",               // target topic
                    new CustomSerialSchema()      // custom serialization schema
            );
    
            // 5. Add the Kafka sink
            outputStream.addSink(kafkaProducer);
    
            socketStr.print().setParallelism(1);
    
            // 6. Execute the job
            env.execute("job");
        }
    }

    Open a Kafka console consumer to verify that the producer works. Since the payload is Protobuf binary, only fragments of it are human-readable:

    [root@flink1 kafka_2.12-1.1.1]# bin/kafka-console-consumer.sh --bootstrap-server 10.10.20.132:9092 --topic flink-serial
    1601649380422GET"
    getAccount
    1601649381422POSTaddOrder
    1601649382422POST"
  4. Implement the Kafka message consumer with Flink

    public class KafkaSourceApplication {
    
        public static void main(String[] args) throws Exception {
    
            // 1. Create a running environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 2. Set the kafka service connection information
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers"."10.10.20.132:9092");
            properties.setProperty("group.id"."fink_group");
    
            // 3. Create the Kafka consumer
            FlinkKafkaConsumer<AccessLog> kafkaConsumer = new FlinkKafkaConsumer<>(
                    "flink-serial",             // source topic
                    new CustomSerialSchema(),   // custom deserialization schema
                    properties);
    
            // 4. Read Kafka data source
            DataStreamSource<AccessLog> kafkaStream = env.addSource(kafkaConsumer);
    
            kafkaStream.print().setParallelism(1);
    
            // 5. Execute tasks
            env.execute("job"); }}Copy the code

    Send a few records through the Flink Kafka producer job above and verify that this consumer job prints the deserialized AccessLog objects.


This article was created and shared by Mirson. For further discussion, please join QQ group 19310171 or visit www.softart.cn