Basic concepts of Spark
Spark is built around the Resilient Distributed Dataset (RDD). An RDD is a fault-tolerant collection of elements partitioned across the nodes of a cluster that can be operated on in parallel.
RDDs support two kinds of operations:
- Transformations: create a new dataset from an existing one, such as map
- Actions: perform a computation on a dataset and return a value, such as reduce
Some operations in Spark trigger a shuffle. The shuffle is Spark's mechanism for redistributing data so that it is regrouped differently across partitions.
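To make the distinction concrete, here is a minimal sketch against a local SparkContext (the app name is arbitrary): map is a transformation that lazily builds a new RDD, reduce is an action that triggers the computation and returns a value, and a key-based operation such as reduceByKey is one that causes a shuffle.
from pyspark import SparkContext

sc = SparkContext(appName="RDDBasics")

nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x)         # transformation: builds a new RDD, nothing runs yet
total = squared.reduce(lambda a, b: a + b)  # action: runs the job and returns a value
print(total)                                # 30

pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
# reduceByKey regroups data by key across partitions, which triggers a shuffle
print(pairs.reduceByKey(lambda a, b: a + b).collect())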
How data is processed
Spark Streaming processes data as follows: it receives the live input streams and divides the data into batches; the Spark engine then processes those batches and produces the final results, also in batches.
Spark Streaming provides a high-level abstraction called Discretized Stream, or DStream, which represents a continuous stream of data. Internally, a DStream is a continuous series of RDDs: each RDD in a DStream contains the data from a specific interval. Any operation applied on a DStream is translated into operations on the RDDs it contains.
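As a sketch of the "DStream is a series of RDDs" idea (the socket source on localhost:9999 is an assumption made for the example), the map below is applied to each batch's RDD, and foreachRDD exposes those RDDs directly:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="DStreamSketch")
ssc = StreamingContext(sc, 1)                    # 1-second batches

lines = ssc.socketTextStream("localhost", 9999)  # assumed text source
# The DStream operation is translated into an operation on every underlying RDD
lines.map(lambda s: s.upper()).foreachRDD(lambda rdd: print(rdd.take(3)))

ssc.start()
ssc.awaitTermination()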
About StreamingContext
- Once the context has been started, no new streaming computations can be set up or added to it
- Once the context has been stopped, it cannot be restarted
- Only one StreamingContext can be active in a JVM at a time (a minimal lifecycle sketch follows)
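A minimal lifecycle sketch, run in local mode with a trivial queueStream source chosen only for illustration (the app name is made up):
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="ContextLifecycle")
ssc = StreamingContext(sc, 1)                 # 1-second batch interval

# All processing must be defined before start()
ssc.queueStream([sc.parallelize([1, 2, 3])]).pprint()

ssc.start()                                   # the processing graph is fixed from here on
ssc.awaitTerminationOrTimeout(10)             # run for at most 10 seconds
ssc.stop(stopSparkContext=False)              # this context cannot be restarted;
                                              # the surviving SparkContext can back a new StreamingContext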
The window function
Spark Streaming supports windowed computations, which let you apply operations over a sliding window of data.
There are two parameters involved:
- Window length – the duration of the window
- Sliding interval – the interval at which the window operation is performed
Note that both values must be multiples of the batch interval set when the context is initialized. Context initialization can look like this:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
ctx = SparkContext()
streaming_ctx = StreamingContext(ctx, 1)
The batch interval is set to 1, meaning every second of incoming data is turned into one batch.
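As a sketch of a windowed operation, reusing streaming_ctx from the snippet above and assuming a hypothetical socket source on localhost:9999:
words = streaming_ctx.socketTextStream("localhost", 9999) \
    .flatMap(lambda line: line.split())

# Window length = 10 s, sliding interval = 2 s; both are multiples of the 1 s batch interval
windowed = words.window(10, 2)
windowed.countByValue().pprint()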
Practice
In this example, Spark Streaming consumes data sent by Kafka and counts how often each value (here, an IP address) appears.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set('spark.streaming.kafka.maxRatePerPartition', 10000)
conf.set('spark.driver.extraJavaOptions', "-Xss30M")
ctx = SparkContext(appName="PythonStreamingIDCount", conf=conf)
ctx.setLogLevel('FATAL')
streaming_ctx = StreamingContext(ctx, 1)     # batch interval: 1 second
streaming_ctx.checkpoint('./checkpoint')     # checkpointing is required by reduceByKeyAndWindow with an inverse function
broker, topic = "127.0.0.1:9092", ["new-spark"]
kvs = KafkaUtils.createDirectStream(streaming_ctx, topic, {"metadata.broker.list": broker})
lines = kvs.map(lambda items: items[1])      # keep only the message value
counts = lines.\
    map(lambda ip: (ip, 1)).\
    reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 10, 1)  # 10 s window, 1 s slide
counts.pprint(10)
streaming_ctx.start()
streaming_ctx.awaitTermination()
The code above uses the Direct Approach (there is also a Receiver-based Approach). With the Direct Approach, the parameter that limits the ingestion rate is spark.streaming.kafka.maxRatePerPartition; with the Receiver-based Approach, it is spark.streaming.receiver.maxRate. Both limit the consumption rate, but the parameter names differ between the two approaches, and if you use the wrong one you will find that the setting has no effect at all.
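As an illustrative fragment only (not part of the job above), this is how the two rate-limit settings pair with the two approaches:
from pyspark import SparkConf

conf = SparkConf()
# Direct Approach (KafkaUtils.createDirectStream): cap per Kafka partition, in records per second
conf.set("spark.streaming.kafka.maxRatePerPartition", 10000)
# Receiver-based Approach (KafkaUtils.createStream): cap per receiver, in records per second
conf.set("spark.streaming.receiver.maxRate", 10000)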
The command for submitting the code is as follows:
spark-submit --jars ~/Downloads/spark-streaming-kafka-0-8-assembly_2.11-2.4..jar --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC" main.py
Note that you need to download the JAR package yourself; it is available directly from the Maven repository.