Basic concepts of Spark

Spark is built around the Resilient Distributed Dataset (RDD). An RDD is a fault-tolerant collection of elements partitioned across the nodes of a cluster so that it can be operated on in parallel.

RDDs support two kinds of operations:

  1. transformations

    Create a new dataset from an existing one, such as map

  2. actions

    Run a computation on a dataset and return a value to the driver, such as reduce (see the sketch after this list)
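
As a minimal sketch (assuming a local SparkContext and a small in-memory dataset), the difference looks like this: map lazily describes a new RDD, while reduce actually runs the computation and returns a value to the driver.

from pyspark import SparkContext

sc = SparkContext(appName="TransformationVsAction")
numbers = sc.parallelize([1, 2, 3, 4])          # build an RDD from a local list
squared = numbers.map(lambda x: x * x)          # transformation: returns a new RDD, nothing runs yet
total = squared.reduce(lambda a, b: a + b)      # action: triggers the job and returns 30 to the driver
print(total)
sc.stop()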

Some operations in Spark trigger a shuffle. Shuffle is the mechanism Spark uses to redistribute data across partitions so that records that belong together can be regrouped.
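
As a small illustration (a sketch with a made-up key-value dataset), reduceByKey needs all records with the same key to land in the same partition, so it triggers a shuffle:

from pyspark import SparkContext

sc = SparkContext(appName="ShuffleExample")
pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
counts = pairs.reduceByKey(lambda a, b: a + b)  # records are redistributed by key (shuffle)
print(counts.collect())                         # e.g. [('a', 2), ('b', 1)]
sc.stop()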

How data is processed

Spark Streaming receives live input data streams and divides the data into batches. The Spark engine then processes these batches and produces the final results, also as a stream of batches.

Spark Streaming provides a high-level abstraction for data called a Discretized Stream, or DStream, which represents a stream of data. Internally, a DStream is a continuous series of RDDs. Each RDD in a DStream contains the data from a specific interval, and any operation on a DStream is translated into operations on the RDDs it contains.
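
As a brief sketch (assuming a 1-second batch interval and a hypothetical socket text source on localhost:9999), an operation such as flatMap on a DStream is applied to each RDD that makes up the stream, one per batch:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

ctx = SparkContext(appName="DStreamExample")
streaming_ctx = StreamingContext(ctx, 1)                    # 1-second batches
lines = streaming_ctx.socketTextStream("localhost", 9999)   # hypothetical text source
words = lines.flatMap(lambda line: line.split(" "))         # applied to each batch's RDD
words.pprint()
streaming_ctx.start()
streaming_ctx.awaitTermination()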

About StreamingContext

  • Once the context has been started, no new streaming computations can be set up or added to it
  • Once the context has been stopped, it cannot be restarted
  • Only one StreamingContext can be active in a JVM at a time

The window function

Spark Streaming supports windowed operations, so you can apply transformations over a sliding window of data.

There are two parameters involved:

  1. Window Length – The duration of the window
  2. Sliding Interval – The interval at which the window operation is performed

Note that both of these durations must be multiples of the batch interval set when the context is initialized. Context initialization can look like this:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
ctx = SparkContext()
streaming_ctx = StreamingContext(ctx, 1)  # batch interval of 1 second

The batch interval is set to 1, which means each second of incoming data is grouped into one batch.
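
As a hedged sketch (assuming the same hypothetical socket source as above), a word count over a 30-second window that slides every 10 seconds, both multiples of the 1-second batch interval, could look like this:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

ctx = SparkContext(appName="WindowExample")
streaming_ctx = StreamingContext(ctx, 1)                    # batch interval: 1 second
lines = streaming_ctx.socketTextStream("localhost", 9999)   # hypothetical text source
words = lines.flatMap(lambda line: line.split(" "))
# Window length 30s, sliding interval 10s; both are multiples of the 1-second batch interval.
windowed_counts = words.map(lambda w: (w, 1)) \
                       .reduceByKeyAndWindow(lambda a, b: a + b, None, 30, 10)
windowed_counts.pprint()
streaming_ctx.start()
streaming_ctx.awaitTermination()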

Practice

Spark Streaming consumes data sent by Kafka and counts how often each item appears.

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set('spark.streaming.kafka.maxRatePerPartition', 10000)
conf.set('spark.driver.extraJavaOptions', "-Xss30M")
ctx = SparkContext(appName="PythonStreamingIDCount", conf=conf)
ctx.setLogLevel('FATAL')
ctx.setCheckpointDir('./checkpoint')
streaming_ctx = StreamingContext(ctx, 1)   # batch interval: 1 second
streaming_ctx.checkpoint('./checkpoint')   # required by reduceByKeyAndWindow with an inverse function
broker, topic = "127.0.0.1:9092", ["new-spark"]
kvs = KafkaUtils.createDirectStream(streaming_ctx, topic, {"metadata.broker.list": broker})
lines = kvs.map(lambda items: items[1])    # keep only the message value
counts = lines.map(lambda ip: (ip, 1)) \
              .reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 10, 1)
counts.pprint(10)
streaming_ctx.start()
streaming_ctx.awaitTermination()

This example uses the Direct Approach (there is also a Receiver-based Approach). With the Direct Approach, the parameter that limits the ingestion rate is spark.streaming.kafka.maxRatePerPartition; with the Receiver-based Approach, the corresponding parameter is spark.streaming.receiver.maxRate. Both limit the consumption rate, but each applies only to its own approach, so if you use the wrong one you will find the setting has no effect at all.

The command for submitting the job is as follows:

spark-submit --jars ~/Downloads/spark-streaming-kafka-0-8-assembly_2.11-2.4..jar --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC" main.py

Note that you need to download the JAR package yourself, which can be downloaded directly from Maven.