1. Custom serialization scheme (Protobuf)
In real application scenarios, transfer objects can be complex and transmission performance requirements high, so a custom serialization scheme is needed. Protobuf is used as the example here.
Goal: produce to and consume from the same Kafka topic, using Protobuf for serialization and deserialization, and verify that the data is parsed correctly.
- Generating the Java files with Protobuf

Protobuf message definition:
```protobuf
syntax = "proto3";

option java_package = "com.itcast.flink.connectors.kafka.proto";
option java_outer_classname = "AccessLogProto";

// Message structure definition
message AccessLog {
    string ip = 1;
    string time = 2;
    string type = 3;
    string api = 4;
    string num = 5;
}
```
The Java files are generated with a batch script:
```bat
@echo off
for %%i in (proto/*.proto) do (
    d:/TestCode/protoc.exe --proto_path=./proto --java_out=../java ./proto/%%i
    echo generate %%i to java file successfully!
)
```
Note that the paths in the script must match your local directory layout.
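Once generation succeeds, the generated class can be sanity-checked with a quick encode/decode round trip. A minimal sketch, assuming the `AccessLogProto` class generated from the definition above is on the classpath (the class name `ProtoRoundTripCheck` and the sample values are only for illustration):

```java
import com.itcast.flink.connectors.kafka.proto.AccessLogProto;

public class ProtoRoundTripCheck {

    public static void main(String[] args) throws Exception {
        // Build a sample message with the generated builder API
        AccessLogProto.AccessLog original = AccessLogProto.AccessLog.newBuilder()
                .setIp("10.10.20.101")
                .setTime("1601649380422")
                .setType("GET")
                .setApi("getAccount")
                .setNum("1")
                .build();

        // Encode to bytes and parse back
        byte[] bytes = original.toByteArray();
        AccessLogProto.AccessLog parsed = AccessLogProto.AccessLog.parseFrom(bytes);

        // Should print the same field values as the original message
        System.out.println(parsed);
    }
}
```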
- Custom serialization implementation
Add POM dependencies:
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.11.2</version>
    </dependency>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-beans</artifactId>
        <version>5.1.8.RELEASE</version>
    </dependency>
</dependencies>
```
The AccessLog transfer object:
```java
@Data
public class AccessLog implements Serializable {

    private String ip;
    private String time;
    private String type;
    private String api;
    private Integer num;
}
```
CustomSerialSchema:
```java
/**
 * Custom serialization implementation (Protobuf)
 */
public class CustomSerialSchema implements DeserializationSchema<AccessLog>, SerializationSchema<AccessLog> {

    private static final long serialVersionUID = 1L;

    private transient Charset charset;

    public CustomSerialSchema() {
        this(StandardCharsets.UTF_8);
    }

    public CustomSerialSchema(Charset charset) {
        this.charset = checkNotNull(charset);
    }

    public Charset getCharset() {
        return charset;
    }

    /**
     * Deserialization: Protobuf bytes -> AccessLog
     * @param message the raw Kafka record value
     * @return the parsed AccessLog, or null if parsing fails
     */
    @Override
    public AccessLog deserialize(byte[] message) {
        AccessLog accessLog = null;
        try {
            AccessLogProto.AccessLog accessLogProto = AccessLogProto.AccessLog.parseFrom(message);
            accessLog = new AccessLog();
            BeanUtils.copyProperties(accessLogProto, accessLog);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return accessLog;
    }

    @Override
    public boolean isEndOfStream(AccessLog nextElement) {
        return false;
    }

    /**
     * Serialization: AccessLog -> Protobuf bytes
     * @param element the AccessLog to serialize
     * @return the Protobuf-encoded byte array
     */
    @Override
    public byte[] serialize(AccessLog element) {
        AccessLogProto.AccessLog.Builder builder = AccessLogProto.AccessLog.newBuilder();
        BeanUtils.copyProperties(element, builder);
        return builder.build().toByteArray();
    }

    /**
     * Declares the produced message type
     * @return TypeInformation for AccessLog
     */
    @Override
    public TypeInformation<AccessLog> getProducedType() {
        return TypeInformation.of(AccessLog.class);
    }
}
```
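Before wiring the schema into Kafka, it can be exercised on its own. A minimal sketch, assuming the `AccessLog` POJO and `CustomSerialSchema` above (the class name `SchemaRoundTripTest` and the sample values are only for illustration):

```java
public class SchemaRoundTripTest {

    public static void main(String[] args) throws Exception {
        CustomSerialSchema schema = new CustomSerialSchema();

        // Populate a sample AccessLog
        AccessLog log = new AccessLog();
        log.setIp("10.10.20.101");
        log.setTime("1601649380422");
        log.setType("GET");
        log.setApi("getAccount");
        log.setNum(1);

        // Serialize to Protobuf bytes, then deserialize back
        byte[] bytes = schema.serialize(log);
        AccessLog restored = schema.deserialize(bytes);

        // Fields copied by BeanUtils (matching name and compatible type) should survive the round trip
        System.out.println(restored);
    }
}
```

Note that Spring's `BeanUtils.copyProperties` only copies properties whose names match and whose types are assignable; any other fields are silently skipped.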
- Implementing the Kafka message producer with Flink
```java
public class KafkaSinkApplication {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read from the socket data source
        DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n");

        // 3. Transform the stream data
        SingleOutputStreamOperator<AccessLog> outputStream = socketStr.map(new MapFunction<String, AccessLog>() {
            @Override
            public AccessLog map(String value) throws Exception {
                System.out.println(value);
                // Split the line on the tab delimiter
                String[] arrValue = value.split("\t");
                // Assemble the fields into an AccessLog object
                AccessLog log = new AccessLog();
                log.setNum(1);
                for (int i = 0; i < arrValue.length; i++) {
                    if (i == 0) {
                        log.setIp(arrValue[i]);
                    } else if (i == 1) {
                        log.setTime(arrValue[i]);
                    } else if (i == 2) {
                        log.setType(arrValue[i]);
                    } else if (i == 3) {
                        log.setApi(arrValue[i]);
                    }
                }
                return log;
            }
        });

        // 4. Create the Kafka producer with the custom Protobuf schema
        FlinkKafkaProducer<AccessLog> kafkaProducer = new FlinkKafkaProducer<>(
                "10.10.20.132:9092",       // broker list
                "flink-serial",            // target topic
                new CustomSerialSchema()   // custom serialization
        );

        // 5. Add the Kafka sink
        outputStream.addSink(kafkaProducer);
        socketStr.print().setParallelism(1);

        // 6. Execute the job
        env.execute("job");
    }
}
```
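For a quick local run without a socket source, the same pipeline can be fed from a fixed collection. A minimal sketch under the same broker address, topic, and schema as above (the class name `KafkaSinkLocalTest` and the sample lines are only for illustration; the lines are tab-separated as the map function expects):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaSinkLocalTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Fixed test input instead of a socket, tab-separated as expected by the mapping logic
        DataStreamSource<String> lines = env.fromElements(
                "10.10.20.101\t1601649380422\tGET\tgetAccount",
                "10.10.20.102\t1601649381422\tPOST\taddOrder"
        );

        // Reuse the same parsing logic and the custom Protobuf schema
        lines.map(new MapFunction<String, AccessLog>() {
            @Override
            public AccessLog map(String value) {
                String[] arr = value.split("\t");
                AccessLog log = new AccessLog();
                log.setIp(arr[0]);
                log.setTime(arr[1]);
                log.setType(arr[2]);
                log.setApi(arr[3]);
                log.setNum(1);
                return log;
            }
        }).addSink(new FlinkKafkaProducer<>(
                "10.10.20.132:9092",      // broker list (same as above)
                "flink-serial",           // target topic
                new CustomSerialSchema()
        ));

        env.execute("kafka-sink-local-test");
    }
}
```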
Open a Kafka console consumer to verify that the producer is publishing messages:
```
[root@flink1 kafka_2.12-1.1.1]# bin/kafka-console-consumer.sh --bootstrap-server 10.10.20.132:9092 --topic flink-serial
1601649380422GET" getAccount
1601649381422POSTaddOrder
1601649382422POST"
```
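The payload is Protobuf binary, so the console consumer can only show it partially. To actually inspect the records, a small standalone consumer that parses each value with the generated class can be used; a sketch, assuming the same broker address and topic as above (the class name `ProtoConsoleCheck` and the group id are illustrative):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.itcast.flink.connectors.kafka.proto.AccessLogProto;

public class ProtoConsoleCheck {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "10.10.20.132:9092");
        props.setProperty("group.id", "proto_console_check");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("flink-serial"));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    try {
                        // Parse the raw bytes with the generated Protobuf class
                        System.out.println(AccessLogProto.AccessLog.parseFrom(record.value()));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
```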
- Implementing the Kafka message consumer with Flink
```java
public class KafkaSourceApplication {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Set the Kafka connection properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
        properties.setProperty("group.id", "fink_group");

        // 3. Create the Kafka consumer with the custom deserialization schema
        FlinkKafkaConsumer<AccessLog> kafkaConsumer = new FlinkKafkaConsumer<>(
                "flink-serial",           // source topic
                new CustomSerialSchema(), // custom deserialization
                properties
        );

        // 4. Read from the Kafka data source
        DataStreamSource<AccessLog> socketStr = env.addSource(kafkaConsumer);
        socketStr.print().setParallelism(1);

        // 5. Execute the job
        env.execute("job");
    }
}
```
Verify the consumer by sending messages through the Flink Kafka producer above: the consumer job should print the parsed AccessLog objects.