Kafka has shipped the Kafka Streams library since version 0.10, and we can use it to build real-time applications. This chapter shows how to integrate Kafka Streams with Spring Boot for stream processing.

The basic configuration and usage of Kafka in Spring Boot are covered in “Spring Boot Integrated Kafka” and are not repeated here.

Dependencies

To use Kafka Streams, in addition to Spring Kafka, we also need to add:

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-streams</artifactId>
</dependency>

Configuration

  • Configure in application.yml
spring:
  kafka:
    streams:
      application-id: test-kafka-stream # defaults to the Spring Boot application name
      bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} # overrides spring.kafka.bootstrap-servers
      # auto-startup: true
      properties:
        default:
          key:
            serde: org.apache.kafka.common.serialization.Serdes$StringSerde # key serde
          value:
            serde: org.springframework.kafka.support.serializer.JsonSerde # value serde
          timestamp:
            extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
        spring:
          json:
            trusted:
              packages: com.engrz.lab.* # packages allowed for JSON deserialization

Stream-processing settings live under spring.kafka.streams.*

For more configuration options, see: Spring Boot Integration Properties

  • Configure in Java code (alternative to application.yml configuration)
// applicationId and bootstrapServers are assumed to be fields of the
// enclosing configuration class (for example, injected with @Value).
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Default serdes: String keys, JSON values
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
    // Use wall-clock time instead of the timestamp embedded in each record
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    return new KafkaStreamsConfiguration(props);
}

Values are serialized with JsonSerde, so the trusted packages must be configured. Otherwise deserialization fails with a message that reads: "If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*)."
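
The YAML configuration sets spring.json.trusted.packages, but the Java configuration shown earlier does not. A minimal sketch of adding the same entry to a properties map follows. It uses only the JDK, and the class and method names (TrustedPackagesSketch, withTrustedPackages) are hypothetical helpers made up for illustration, not part of Spring Kafka.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring Kafka: returns a copy of the
// streams properties with the JsonSerde trusted-packages entry added.
class TrustedPackagesSketch {

    // Same key as in the YAML configuration
    // (spring.kafka.streams.properties.spring.json.trusted.packages).
    static final String TRUSTED_PACKAGES_KEY = "spring.json.trusted.packages";

    static Map<String, Object> withTrustedPackages(Map<String, Object> props, String packages) {
        Map<String, Object> copy = new HashMap<>(props); // leave the caller's map untouched
        copy.put(TRUSTED_PACKAGES_KEY, packages);
        return copy;
    }
}
```

Inside kStreamsConfigs() this amounts to one extra line, props.put("spring.json.trusted.packages", "com.engrz.lab.*"), before the KafkaStreamsConfiguration is created. Spring Kafka also exposes the key as the constant JsonDeserializer.TRUSTED_PACKAGES.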

Usage

  • Create a stream

Annotate the configuration class with @EnableKafkaStreams:

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfiguration {

    @Bean
    public KStream<String, Object> kStream(StreamsBuilder streamsBuilder) {
        KStream<String, Object> stream = streamsBuilder.stream("streamTopic");
        stream.map((k, v) -> new KeyValue<>(k, v)).to("myTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
        return stream;
    }
}

streamsBuilder.stream(...) can also subscribe to several topics at once. In this example, records received from streamTopic are written to myTopic.

Stream computation

This chapter covers only the Spring Boot integration. Kafka Streams computations themselves will be covered in the Kafka series. The following is an example of an application scenario:

  • Define an order model class
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;

import lombok.Data;

/**
 * @author Engr-Z
 * @since 2021/1/29
 */
@Data
public class OrderModel implements Serializable {

    /** User ID */
    private Integer userId;

    /** Order number */
    private String orderNo;

    /** Order time */
    private LocalDateTime orderTime;

    /** Order amount */
    private BigDecimal orderAmt;

    /** Order status */
    private String orderStatus;

}
  • Find orders whose amount is less than 1 and send them to orderTopic
@Bean
public KStream<String, OrderModel> kStream(StreamsBuilder streamsBuilder) {
    KStream<String, OrderModel> stream = streamsBuilder.stream("streamTopic");
    // Forward every record to tableTopic
    stream.map((k, v) -> new KeyValue<>(k, v)).to("tableTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
    // Keep only orders whose amount is strictly less than 1
    stream.filter((k, v) -> {
        BigDecimal orderAmt = v.getOrderAmt();
        return orderAmt.compareTo(BigDecimal.ONE) < 0;
    }).to("orderTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
    return stream;
}
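
The amount check inside the filter can be tried on its own, without a Kafka broker. The sketch below uses only the JDK. SmallOrderCheck and isBelowLimit are hypothetical names chosen for illustration, and treating a missing amount as "no match" is an assumption, not something the original code does.

```java
import java.math.BigDecimal;

// Hypothetical helper, not from the article: the amount check used by the
// filter, extracted so it can be exercised without a running topology.
class SmallOrderCheck {

    private static final BigDecimal LIMIT = BigDecimal.ONE;

    // True when the amount is present and strictly below the limit.
    // Assumption: a null amount is treated as "not a match" instead of throwing.
    static boolean isBelowLimit(BigDecimal orderAmt) {
        // compareTo ignores scale, so 1.00 and 1 compare as equal
        return orderAmt != null && orderAmt.compareTo(LIMIT) < 0;
    }

    public static void main(String[] args) {
        System.out.println(isBelowLimit(new BigDecimal("0.99"))); // true
        System.out.println(isBelowLimit(new BigDecimal("1.00"))); // false
        System.out.println(isBelowLimit(null));                   // false
    }
}
```

In the topology, the predicate could then be written as stream.filter((k, v) -> v != null && SmallOrderCheck.isBelowLimit(v.getOrderAmt())), which also skips records whose value is null.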

Real-time computation can solve many business problems, such as real-time data warehousing and real-time risk control.


Unless otherwise noted, all articles are by “Siege Lion · Zheng”. Link to this article: engr-z.com/169.html