Kafka has shipped Kafka Streams, its stream-processing library, since version 0.10, and we can use it to build real-time applications. This chapter shows how to integrate Kafka Streams with Spring Boot for stream processing.
The basic configuration and usage of Kafka with Spring Boot are covered in “Spring Boot Integrated Kafka” and are not repeated here.
Dependencies
To process data with Kafka Streams, in addition to Spring Kafka we also need to add the following dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
Configuration
- Configure in application.yml
spring:
  kafka:
    streams:
      application-id: test-kafka-stream # defaults to the Spring Boot application name
      bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} # overrides spring.kafka.bootstrap-servers
      # auto-startup: true
      properties:
        default:
          key:
            serde: org.apache.kafka.common.serialization.Serdes$StringSerde # serde for keys
          value:
            serde: org.springframework.kafka.support.serializer.JsonSerde # serde for values
          timestamp:
            extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
        spring:
          json:
            trusted:
              packages: com.engrz.lab.* # packages trusted for JSON deserialization
Stream-processing settings live under spring.kafka.streams.*
For more options, see the Spring Boot Integration Properties reference.
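The bootstrap-servers value above uses placeholders of the form ${NAME:default}: if NAME cannot be resolved, the text after the colon is used. The following is only a minimal sketch of that fallback rule in plain Java, not Spring's actual resolver (which also consults system properties and other property sources). The class name PlaceholderSketch and its resolve method are hypothetical names made up for this illustration.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: illustrates the ${NAME:default} fallback rule only.
class PlaceholderSketch {

    // Matches ${NAME} or ${NAME:default}; group 1 is the name, group 2 the optional default.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    static String resolve(String template, Map<String, String> source) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String name = matcher.group(1);
            String value = source.get(name);
            if (value == null) {
                value = matcher.group(2); // fall back to the default, if one was given
            }
            if (value == null) {
                throw new IllegalArgumentException("Unresolved placeholder: " + name);
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String template = "${KAFKA_HOST:localhost}:${KAFKA_PORT:9092}";
        // The result depends on the environment variables of the current process.
        System.out.println(resolve(template, System.getenv()));
    }
}
```

With neither variable set, the template resolves to localhost:9092; setting only KAFKA_HOST changes the host and keeps the default port.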
- Configure in Java code (alternative to application.yml configuration)
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    // applicationId and bootstrapServers are assumed to be fields of the enclosing configuration class
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    return new KafkaStreamsConfiguration(props);
}
Values are serialized with JsonSerde. Trusted packages must be configured; otherwise Spring rejects the payload with the message: “If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).”
Usage
- Create a stream
Annotate the configuration class with @EnableKafkaStreams
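@Configuration
@EnableKafkaStreams
// Note: this class shares its simple name with org.springframework.kafka.config.KafkaStreamsConfiguration
public class KafkaStreamsConfiguration {
    @Bean
    public KStream<String, Object> kStream(StreamsBuilder streamsBuilder) {
        // Read records from streamTopic
        KStream<String, Object> stream = streamsBuilder.stream("streamTopic");
        // Forward each record unchanged to myTopic, with String keys and JSON values
        stream.map((k, v) -> new KeyValue<>(k, v)).to("myTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
        return stream;
    }
}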
streamsBuilder.stream() can also subscribe to several topics at once. In this example, the records received from streamTopic are written to myTopic.
Stream computation
This chapter covers only the Spring Boot integration; Kafka Streams computations themselves will be covered in the Kafka series. The following is an example application scenario:
- Define an order model class
/**
 * @author Engr-Z
 * @since 2021/1/29
 */
@Data
public class OrderModel implements Serializable {
    /** User ID */
    private Integer userId;
    /** Order number */
    private String orderNo;
    /** Order time */
    private LocalDateTime orderTime;
    /** Order amount */
    private BigDecimal orderAmt;
    /** Order status */
    private String orderStatus;
}
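@Data is a Lombok annotation, so the model above assumes Lombok is on the classpath (it is not among the dependencies listed earlier). It generates the getters, setters, equals, hashCode and toString that the later code relies on, for example getOrderAmt(). As a rough, hand-written sketch of what that amounts to, here is a hypothetical PlainOrderModel reduced to two of the fields; the class name and the exact toString format are illustrative assumptions, not Lombok's literal output.

```java
import java.math.BigDecimal;
import java.util.Objects;

// Hypothetical hand-written counterpart of OrderModel, abbreviated to two fields.
class PlainOrderModel {
    private String orderNo;
    private BigDecimal orderAmt;

    public String getOrderNo() { return orderNo; }
    public void setOrderNo(String orderNo) { this.orderNo = orderNo; }

    public BigDecimal getOrderAmt() { return orderAmt; }
    public void setOrderAmt(BigDecimal orderAmt) { this.orderAmt = orderAmt; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PlainOrderModel)) return false;
        PlainOrderModel other = (PlainOrderModel) o;
        return Objects.equals(orderNo, other.orderNo)
                && Objects.equals(orderAmt, other.orderAmt);
    }

    @Override
    public int hashCode() {
        return Objects.hash(orderNo, orderAmt);
    }

    @Override
    public String toString() {
        return "PlainOrderModel(orderNo=" + orderNo + ", orderAmt=" + orderAmt + ")";
    }

    public static void main(String[] args) {
        PlainOrderModel order = new PlainOrderModel();
        order.setOrderNo("A-1001");
        order.setOrderAmt(new BigDecimal("0.50"));
        System.out.println(order); // prints PlainOrderModel(orderNo=A-1001, orderAmt=0.50)
    }
}
```

The default no-argument constructor and the accessor pairs are also what a JSON mapper typically needs in order to read and write such a class.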
- Find orders with an amount of less than $1 and send them to orderTopic
@Bean
public KStream<String, OrderModel> kStream(StreamsBuilder streamsBuilder) {
    KStream<String, OrderModel> stream = streamsBuilder.stream("streamTopic");
    // Forward every record unchanged to tableTopic
    stream.map((k, v) -> new KeyValue<>(k, v)).to("tableTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
    // Keep only orders whose amount is strictly less than 1 and send them to orderTopic
    stream.filter((k, v) -> {
        BigDecimal orderAmt = v.getOrderAmt();
        return orderAmt.compareTo(BigDecimal.ONE) < 0;
    }).to("orderTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
    return stream;
}
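The filter compares amounts with BigDecimal.compareTo rather than equals, because equals also compares scale (1.00 and 1 are not equal under equals). It also assumes that neither the record value nor the amount is null. The sketch below isolates the predicate in plain Java and adds a null guard; SmallOrderPredicate and isBelowThreshold are hypothetical names, and a java.util list stands in for the Kafka stream so the logic can be run without a broker.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper isolating the "amount below 1" rule used in the filter.
class SmallOrderPredicate {

    private static final BigDecimal THRESHOLD = BigDecimal.ONE;

    /** Returns true when the amount is present and strictly below the threshold. */
    static boolean isBelowThreshold(BigDecimal orderAmt) {
        return orderAmt != null && orderAmt.compareTo(THRESHOLD) < 0;
    }

    public static void main(String[] args) {
        // A plain list stands in for the stream of order amounts.
        List<BigDecimal> amounts = Arrays.asList(
                new BigDecimal("0.99"),
                new BigDecimal("1.00"),
                null,
                new BigDecimal("25.50"));

        List<BigDecimal> small = amounts.stream()
                .filter(SmallOrderPredicate::isBelowThreshold)
                .collect(Collectors.toList());
        System.out.println(small); // prints [0.99]

        // equals is scale-sensitive, compareTo is not.
        System.out.println(new BigDecimal("1.00").equals(BigDecimal.ONE));    // prints false
        System.out.println(new BigDecimal("1.00").compareTo(BigDecimal.ONE)); // prints 0
    }
}
```

Inside the topology, the same check could be written as stream.filter((k, v) -> v != null && SmallOrderPredicate.isBelowThreshold(v.getOrderAmt())).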
Real-time computing lets us solve many business problems, such as real-time data warehousing and real-time risk control.
Unless otherwise noted, all articles are original works by “Siege Lion · Zheng”. Link to this article: engr-z.com/169.html