Storm, Spark Streaming, and Flink, the troika of stream processing, each have their own strengths.

Storm offers low latency and still holds a significant market share; many companies continue to use it.

Spark Streaming benefits from the Spark ecosystem and its active community.

Flink, on the other hand, was designed with streaming in mind and has a convenient API.

However, all three commonly rely on Kafka to deliver messages, so in version 0.10.0.0 Kafka launched its own stream processing framework, Kafka Streams. Apache Kafka® describes itself as a distributed streaming platform.

Real-time streaming computing

Real-time streaming computing has developed rapidly in recent years, mainly because of the value of real-time data and its impact on data processing architecture. Its defining characteristics are unbounded data, consistency, and repeatable results: a streaming system is a type of data processing engine designed with infinite data sets in mind.

1. Unbounded data: An ever-growing, essentially infinite data set, often referred to as “streaming data.” Infinite streaming data sets are called unbounded data, whereas finite batch data sets are bounded data.

2. Unbounded data processing: A continuous mode of data processing applied to the unbounded data above. Batch processing (offline computing) can also be run repeatedly to process such data, but it runs into a performance bottleneck.

3. Low-latency, near-real-time results: By contrast, offline computing does not take latency into account.

Stream processing can replace batch systems once it solves two problems:

1. Correctness: This makes it equivalent to batch computation.

A streaming system needs to keep its computations correct as time passes. Spark Streaming addresses this with micro-batching, which provides consistent storage for both real-time and offline systems; future real-time computing systems should meet this requirement as well.

2. Tools for reasoning about time: These allow us to go beyond batch computation.

Good tools for reasoning about time are essential for processing unbounded, out-of-order data from different events.

Time is divided into event time (when an event actually occurred) and processing time (when the system observes it).
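The difference between the two notions of time shows up as soon as records arrive late. The following is a minimal plain-Java sketch, not Kafka Streams code: the `Event` type, the `countPerWindow` helper, the 10-second window size, and the sample timestamps are all assumptions made for illustration. It counts the same four events per tumbling window twice, once by event time and once by processing time.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToLongFunction;

class EventTimeWindows {

    /** Hypothetical record type carrying both timestamps, in milliseconds. */
    static final class Event {
        final String key;
        final long eventTimeMs;      // when the event happened at the source
        final long processingTimeMs; // when the processor observed it

        Event(String key, long eventTimeMs, long processingTimeMs) {
            this.key = key;
            this.eventTimeMs = eventTimeMs;
            this.processingTimeMs = processingTimeMs;
        }
    }

    /** Start of the tumbling window containing a non-negative timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Counts events per tumbling window, using the given clock to pick the timestamp. */
    static Map<Long, Integer> countPerWindow(List<Event> events, long windowSizeMs,
                                             ToLongFunction<Event> clock) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (Event e : events) {
            long start = windowStart(clock.applyAsLong(e), windowSizeMs);
            counts.merge(start, 1, Integer::sum);
        }
        return counts;
    }

    /** Sample data: event "d" happened early but was observed late. */
    static List<Event> sampleEvents() {
        return Arrays.asList(
                new Event("a", 1_000L, 12_000L),
                new Event("b", 9_000L, 12_500L),
                new Event("c", 11_000L, 13_000L),
                new Event("d", 4_000L, 21_000L));
    }

    public static void main(String[] args) {
        List<Event> events = sampleEvents();
        // Event time puts the late record "d" back into the window where it happened.
        System.out.println("event time:      "
                + countPerWindow(events, 10_000L, e -> e.eventTimeMs));
        // Processing time groups records by when they happened to arrive.
        System.out.println("processing time: "
                + countPerWindow(events, 10_000L, e -> e.processingTimeMs));
    }
}
```

With this sample data the event-time result is `{0=3, 10000=1}` and the processing-time result is `{10000=3, 20000=1}`: only the event-time grouping is independent of when the records arrived, which is why it gives repeatable results.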

There are many other concepts related to real-time streaming computing that are not covered here.

Introduction to Kafka Streams

Kafka Streams is billed as the easiest way to develop real-time applications. It is a Kafka client library that lets you do stream processing by writing simple Java and Scala code.

Advantages:

  • Elastic, highly scalable, and fault tolerant

  • Deploys to containers, VMs, bare metal, and the cloud

  • Equally suitable for small, medium, and large use cases

  • Fully integrated with Kafka security

  • Write standard Java and Scala applications

  • Develop on Mac, Linux, and Windows

  • Exactly-once semantics

Use cases:

The New York Times uses Apache Kafka and Kafka Streams to store and distribute published content in real time to the various applications and systems that make it available to readers.

Pinterest uses Apache Kafka and Kafka Streams at large scale to power the real-time predictive budgeting system of its advertising infrastructure. With Kafka Streams, predictions are more accurate than ever.

Zalando, Europe’s leading online fashion retailer, uses Kafka as an ESB (Enterprise Service Bus), which helps it move from a monolithic architecture to a microservice architecture. Using Kafka to process event streams enables its technical team to do near-real-time business intelligence.

Rabobank is one of the three largest banks in the Netherlands. Its digital nervous system, the Business Event Bus, is powered by Apache Kafka. It is used by a growing number of financial processes and services, one of which is Rabo Alerts. This service alerts customers in real time to financial events and is built using Kafka Streams.

LINE uses Apache Kafka as a central data hub through which its services communicate with each other. Hundreds of billions of messages are produced every day and used for various business logic, threat detection, search indexing, and data analysis. LINE also uses Kafka Streams to reliably transform and filter topics so that consumers can efficiently consume sub-topics, while the code stays easy to maintain thanks to a sophisticated yet minimal codebase.

Topology

Kafka Streams defines its computational logic through one or more topologies, where a topology is a graph of stream processors (nodes) connected by streams (edges).

There are two special types of processors in a topology:

  • Source processor: A source processor is a special type of stream processor that has no upstream processors. It produces an input stream for its topology from one or more Kafka topics by consuming records from these topics and forwarding them to its downstream processors.
  • Sink processor: A sink processor is a special type of stream processor that has no downstream processors. It sends any records received from its upstream processors to a specified Kafka topic.

In a normal processor node, you can also send data to a remote system. Therefore, the processed results can be streamed back to Kafka or written to an external system.
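To make the graph structure concrete, here is a minimal plain-Java model of a topology. It is only a sketch under stated assumptions and does not use the Kafka Streams API: the `Node` class, its `to` and `process` methods, and the use of in-memory lists as stand-ins for Kafka topics are all hypothetical. It wires a source processor, two ordinary processors, and a sink processor into a chain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

class MiniTopology {

    /** A stream processor: applies its logic to a record and forwards each result downstream. */
    static final class Node {
        final String name;
        final Function<String, List<String>> logic;
        final List<Node> downstream = new ArrayList<>();

        Node(String name, Function<String, List<String>> logic) {
            this.name = name;
            this.logic = logic;
        }

        /** Adds an edge (a stream) from this node to the child; returns the child for chaining. */
        Node to(Node child) {
            downstream.add(child);
            return child;
        }

        void process(String record) {
            for (String result : logic.apply(record)) {
                for (Node child : downstream) {
                    child.process(result);
                }
            }
        }
    }

    /** Runs source -> split -> filter -> sink over an input "topic" and returns the output "topic". */
    static List<String> run(List<String> inputTopic) {
        List<String> outputTopic = new ArrayList<>();

        // Source processor: has no upstream node; forwards each consumed record unchanged.
        Node source = new Node("source", r -> Collections.singletonList(r));
        // Ordinary processors: one record in, zero or more records out.
        Node split = new Node("split", r -> Arrays.asList(r.toLowerCase().split("\\W+")));
        Node filter = new Node("filter", w -> w.length() > 3
                ? Collections.singletonList(w) : Collections.<String>emptyList());
        // Sink processor: has no downstream node; writes each record to the output "topic".
        Node sink = new Node("sink", r -> {
            outputTopic.add(r);
            return Collections.<String>emptyList();
        });

        source.to(split).to(filter).to(sink);

        for (String record : inputTopic) {
            source.process(record);
        }
        return outputTopic;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("All streams lead to Kafka", "Hello Kafka Streams")));
    }
}
```

Running `main` prints `[streams, lead, kafka, hello, kafka, streams]`. In Kafka Streams itself the same shape is built either with the DSL or with the lower-level Processor API; the model above only illustrates how records flow along the edges from source to sink.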

Kafka Streams provides the most common data transformation operations, such as map, filter, join, and aggregations, all of which are easy to use.

Topics such as time, windows, aggregation, and out-of-order processing will be covered in detail in later posts. Here we give a simple introduction through a worked example.

Quick start

First, here are the Java and Scala versions of WordCount.

Java 8+:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
 
import java.util.Arrays;
import java.util.Properties;
 
public class WordCountApplication {
 
    public static void main(final String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }

}

Scala:

import java.util.Properties
import java.util.concurrent.TimeUnit
 
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
 
object WordCountApplication extends App {
  import Serdes._
 
  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
    p
  }
 
  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
  val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
    .groupBy((_, word) => word)
    .count()(Materialized.as("counts-store"))
  wordCounts.toStream.to("WordsWithCountsTopic")
 
  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()
 
  sys.ShutdownHookThread {
     streams.close(10, TimeUnit.SECONDS)
  }
}

If Kafka is already started, skip the first two steps.

1. Download

Download the 2.3.0 release and extract it. Note that the release is built for several Scala versions; here we use the recommended one (2.12):

> tar -xzf kafka_2.12-2.3.0.tgz
> cd kafka_2.12-2.3.0

2. Start the servers

Kafka uses ZooKeeper, so if you don’t already have a ZooKeeper server, start it first.

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

Start Kafka server:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

3. Create the topics and start the producer

We create an input topic named streams-plaintext-input and an output topic named streams-wordcount-output:

> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input".

> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".

To inspect the created topics:

> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
 
Topic:streams-plaintext-input   PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: streams-plaintext-input  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
Topic:streams-wordcount-output  PartitionCount:1    ReplicationFactor:1 Configs:cleanup.policy=compact
    Topic: streams-wordcount-output Partition: 0    Leader: 0   Replicas: 0 Isr: 0

4. Start WordCount

The following command launches the WordCount demo application:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

The demo application reads from the input topic streams-plaintext-input, runs the WordCount computation on each message it reads, and continuously writes its current results to the output topic streams-wordcount-output. Therefore, there is no STDOUT output apart from log entries, because the results are written back to Kafka.

Now we can start the console producer in a separate terminal and write some input data to this topic:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input

Then inspect the output of the WordCount demo application by reading its output topic with the console consumer in a separate terminal:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
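The consumer command sets value.deserializer to LongDeserializer because the counts are written to the output topic as 8-byte big-endian longs, not as text. The sketch below uses only java.nio to mimic that byte layout; the class and its `encode` and `decode` helpers are hypothetical names for illustration and are not part of Kafka.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class LongValueDecoding {

    /** Encodes a count as 8 big-endian bytes, the layout a long serializer writes. */
    static byte[] encode(long count) {
        return ByteBuffer.allocate(Long.BYTES).putLong(count).array();
    }

    /** Decodes 8 big-endian bytes back into a long. */
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] value = encode(2L);
        System.out.println("bytes on the wire: " + value.length);
        System.out.println("decoded as long:   " + decode(value));
        // Read as text, the same bytes are control characters, not the digit "2".
        String asText = new String(value, StandardCharsets.UTF_8);
        System.out.println("equals \"2\" as text: " + asText.equals("2"));
    }
}
```

This is why reading the output topic with the default string deserializer for values would show unreadable characters instead of counts.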

5. Process data

We put in some data on the producer side.

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka

Output:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 
all     1
streams 1
lead    1
to      1
kafka   1

Continue typing:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams

Output:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 
all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2

As the data is entered, the WordCount results are updated and output in real time.
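The output contains two records each for kafka and streams because the application emits an update record every time a count changes, rather than a single final total. The following plain-Java sketch reproduces that behavior under simplifying assumptions: an in-memory `HashMap` stands in for the state store, the class and method names are hypothetical, and record caching (which in a real deployment may merge several updates for the same key) is ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class WordCountChangelog {

    // Stand-in for the state store that holds the running count per word.
    private final Map<String, Long> counts = new HashMap<>();

    /** Processes one input line and returns the update records it emits, as "word count". */
    List<String> process(String line) {
        List<String> updates = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\W+")) {
            if (word.isEmpty()) {
                continue; // split can yield an empty first token
            }
            long updated = counts.merge(word, 1L, Long::sum);
            updates.add(word + " " + updated);
        }
        return updates;
    }

    public static void main(String[] args) {
        WordCountChangelog app = new WordCountChangelog();
        System.out.println(app.process("all streams lead to kafka"));
        System.out.println(app.process("hello kafka streams"));
    }
}
```

The first call returns `[all 1, streams 1, lead 1, to 1, kafka 1]` and the second returns `[hello 1, kafka 2, streams 2]`, matching the consumer output. Because the output topic was created with cleanup.policy=compact, older records for a key can eventually be discarded, leaving the latest count for each word.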

6. Stop the program

You can now stop the console consumer, the console producer, the WordCount application, the Kafka broker, and the ZooKeeper server, in that order, with Ctrl-C.


For more posts on real-time computing, Flink, Kafka, and related technologies, follow Real-Time Streaming Computing.