I. Version description

Spark provides two integration libraries for different versions of Kafka: spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10. The main differences are as follows:

|  | spark-streaming-kafka-0-8 | spark-streaming-kafka-0-10 |
| --- | --- | --- |
| Kafka version | 0.8.2.1 or higher | 0.10.0 or higher |
| API status | Deprecated (as of Spark 2.3.0, Kafka 0.8 support is deprecated) | Stable |
| Language support | Scala, Java, Python | Scala, Java |
| Receiver DStream | Yes | No |
| Direct DStream | Yes | Yes |
| SSL / TLS support | No | Yes |
| Offset Commit API | No | Yes |
| Dynamic topic subscription | No | Yes |

The Kafka version used in this article is kafka_2.12-2.2.0, so the second integration, spark-streaming-kafka-0-10, is used.

II. Project Dependencies

The project is built with Maven and mainly depends on the following:

<properties>
    <scala.version>2.12</scala.version>
    <spark.version>2.4.3</spark.version>
</properties>

<dependencies>
    <!-- Spark Streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark Streaming Kafka integration -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
        <version>2.4.3</version>
    </dependency>
</dependencies>

III. Integrating Kafka

Create an input stream by calling the createDirectStream method of the KafkaUtils object. The complete code is as follows:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/** Spark Streaming integration with Kafka */
object KafkaDirectStream {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[2]")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, Object](
      /*
       * Specifies the list of broker addresses. The list does not need to contain all brokers:
       * the client will look up the other brokers from the ones given.
       * However, it is recommended to provide at least two brokers for fault tolerance.
       */
      "bootstrap.servers" -> "hadoop001:9092",
      /* Deserializer for keys */
      "key.deserializer" -> classOf[StringDeserializer],
      /* Deserializer for values */
      "value.deserializer" -> classOf[StringDeserializer],
      /* The group ID of the consumer */
      "group.id" -> "spark-streaming-group",
      /*
       * latest: when there is no valid offset, the consumer starts reading from the latest
       *         records (records produced after the consumer started)
       * earliest: when there is no valid offset, the consumer reads the partition's records
       *           from the beginning
       */
      "auto.offset.reset" -> "latest",
      /* Whether to commit offsets automatically */
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    
    /* You can subscribe to multiple topics */
    val topics = Array("spark-streaming-topic")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      /* Position policy */
      PreferConsistent,
      /* Subscribe to the topic */
      Subscribe[String, String](topics, kafkaParams)
    )

    /* Prints the input stream */
    stream.map(record => (record.key, record.value)).print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

3.1 ConsumerRecord

Each record in the input stream obtained here is an instance of ConsumerRecord, which contains all the available information for the record. Its source code is as follows:

public class ConsumerRecord<K, V> {
    
    public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;
    
    /* Topic name */
    private final String topic;
    /* Partition number */
    private final int partition;
    /* Offset */
    private final long offset;
    /* Timestamp */
    private final long timestamp;
    /* What the timestamp represents */
    private final TimestampType timestampType;
    /* Size of the serialized key */
    private final int serializedKeySize;
    /* Size of the serialized value */
    private final int serializedValueSize;
    /* Headers */
    private final Headers headers;
    /* Key */
    private final K key;
    /* Value */
    private final V value;
    ...
}
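
These fields can be read directly from the records in the stream. The following is a minimal sketch, assuming the stream created in the example above, that prints the metadata and payload of each record:

stream.foreachRDD { rdd =>
  rdd.foreach { record =>
    /* topic, partition, offset, key and value are all exposed by ConsumerRecord */
    println(s"topic=${record.topic}, partition=${record.partition}, " +
      s"offset=${record.offset}, key=${record.key}, value=${record.value}")
  }
}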

3.2 Consumer Properties

In the sample code, kafkaParams encapsulates the Kafka consumer properties. These have nothing to do with Spark Streaming; they are defined by the Kafka native API. The server address, key deserializer, and value deserializer are mandatory; the other configurations are optional. The most common of the remaining configuration items are as follows:

1. fetch.min.bytes

The minimum number of bytes the consumer fetches from the server. If the amount of available data is less than this value, the broker waits until enough data is available before returning it to the consumer.

2. fetch.max.wait.ms

The maximum amount of time the broker waits before returning data to the consumer.

3. max.partition.fetch.bytes

The maximum number of bytes the server returns to the consumer per partition.

4. session.timeout.ms

The amount of time a consumer can be out of contact with the server before it is considered dead.

5. auto.offset.reset

This property specifies what the consumer should do if it reads a partition without an offset or if the offset is invalid:

  • latest (default): when there is no valid offset, the consumer starts reading from the latest records (records produced after the consumer started);
  • earliest: when there is no valid offset, the consumer reads the partition's records from the beginning.

6. enable.auto.commit

Whether to commit offsets automatically. The default value is true. To avoid duplicate data and data loss, you can set it to false and commit offsets manually.

7. client.id

The client ID, which the server uses to identify the source of the message.

8. max.poll.records

The number of records that can be returned by a single call to the poll() method.

9. receive.buffer.bytes and send.buffer.bytes

These two parameters specify the size of the buffer for receiving and sending packets on the TCP socket. -1 indicates the default value of the operating system.
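
For illustration only, a few of these optional settings could be merged into the kafkaParams map from the example above; the values below are arbitrary and only meant to show the shape of the configuration:

val tunedKafkaParams = kafkaParams ++ Map[String, Object](
  /* Wait until at least 1 KB of data is available ... */
  "fetch.min.bytes" -> (1024: java.lang.Integer),
  /* ... but never wait longer than 500 ms before responding */
  "fetch.max.wait.ms" -> (500: java.lang.Integer),
  /* Consider the consumer dead after 10 seconds without contact */
  "session.timeout.ms" -> (10000: java.lang.Integer),
  /* Return at most 500 records per poll() call */
  "max.poll.records" -> (500: java.lang.Integer),
  /* Identify this client in broker logs and metrics */
  "client.id" -> "spark-streaming-client"
)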

3.3 Location Policy

Spark Streaming provides the following location policies to specify how Kafka topic partitions are assigned to Spark executors:

  • PreferConsistent: distributes partitions evenly across all available executors;

  • PreferBrokers: use this when the Spark executors run on the same machines as the Kafka brokers; each partition is then preferentially scheduled on the executor running on that partition's leader broker;

  • PreferFixed: explicitly maps topic partitions to specific hosts; it is created with the following constructors:

@Experimental
def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy =
  new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))

@Experimental
def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
  new PreferFixed(hostMap)
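
For illustration only, PreferFixed could be used like this with the streamingContext, topics, and kafkaParams from the earlier example; the partition-to-host mapping below is hypothetical:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferFixed

/* Hypothetical mapping: pin partition 0 of the topic to the host hadoop001 */
val hostMap = Map(new TopicPartition("spark-streaming-topic", 0) -> "hadoop001")

val fixedStream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  /* Partitions in hostMap are scheduled on executors running on the given hosts */
  PreferFixed(hostMap),
  Subscribe[String, String](topics, kafkaParams)
)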

3.4 Subscription Mode

Spark Streaming provides two topic subscription modes, Subscribe and SubscribePattern. The latter can use a regular expression to match the names of the topics to subscribe to. Their constructors are as follows:

/**
 * @param topics collection of topics to subscribe to
 * @param kafkaParams Kafka consumer parameters
 * @param offsets (optional) offsets to begin at on initial startup. If absent, either the saved
 *                offsets or the auto.offset.reset setting will be used.
 */
def Subscribe[K, V](
    topics: ju.Collection[jl.String],
    kafkaParams: ju.Map[String, Object],
    offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = { ... }

/**
 * @param pattern regular expression matching the topics to subscribe to
 * @param kafkaParams Kafka consumer parameters
 * @param offsets (optional) offsets to begin at on initial startup. If absent, either the saved
 *                offsets or the auto.offset.reset setting will be used.
 */
def SubscribePattern[K, V](
    pattern: ju.regex.Pattern,
    kafkaParams: collection.Map[String, Object],
    offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { ... }
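
As a sketch, SubscribePattern could replace Subscribe in the earlier example to consume every topic whose name matches a regular expression; the pattern below is only an illustration:

import java.util.regex.Pattern
import org.apache.spark.streaming.kafka010.ConsumerStrategies.SubscribePattern

val patternStream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  /* Subscribe to every topic whose name starts with "spark-streaming-" */
  SubscribePattern[String, String](Pattern.compile("spark-streaming-.*"), kafkaParams)
)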

In the example code, we do not specify the third parameter, offsets, so the program falls back to the configured auto.offset.reset value, latest, meaning that when there is no valid offset the consumer starts reading from the latest records produced after it started.

3.5 Committing Offsets

In the sample code, we set enable.auto.commit to true, which means offsets are committed automatically. In some cases you may need more reliability, for example committing offsets only after the business logic has been fully processed, which requires manual commits. To commit manually, use Kafka's native API:

  • commitSync: for synchronous commits;
  • commitAsync: for asynchronous commits.

For details, see: [Kafka Consumer Details](github.com/heibaiying/… Md)
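
In the 0-10 integration, the stream itself also exposes offset commits through the CanCommitOffsets interface. A minimal sketch, assuming enable.auto.commit is set to false and using the stream created earlier:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  /* Capture the offset ranges of this batch before any transformation */
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  /* ... process the batch here ... */

  /* Commit the offsets back to Kafka asynchronously once processing is done */
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}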

IV. Starting the Test

4.1 Creating a Topic

1. Start Kafka

Kafka depends on ZooKeeper, which must be started beforehand. You can start a standalone ZooKeeper installation, or use the ZooKeeper that ships with Kafka:

#Zookeeper startup command
bin/zkServer.sh start

#Built-in ZooKeeper startup commands
bin/zookeeper-server-start.sh config/zookeeper.properties

Start a single-node Kafka instance for testing:

# bin/kafka-server-start.sh config/server.properties

2. Create a topic

#Create a test topic
bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                    --replication-factor 1 \
                    --partitions 1  \
                    --topic spark-streaming-topic

#View all topics
 bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

3. Create a producer

Create a Kafka producer to send test data:

bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic spark-streaming-topic

4.2 Local Mode Test

Here I start the Spark Streaming program directly in local mode. After startup, use the producer to send data and view the results in the console.

As you can see from the console output, the data stream is received successfully. kafka-console-producer.sh sends data without a key by default, so the key is null. The output also shows the groupId specified in the program and the clientId automatically assigned to the consumer.
