1. Spark Streaming Overview
1.1 Spark Streaming
- Spark Streaming is a scalable, high-throughput, fault-tolerant stream computing framework.
- Throughput: the amount of data processed successfully per unit of time.
Spark Core and Spark SQL are used for offline batch processing: the data sits in a fixed location, and a script typically runs once a day to process it and save the results. Such tasks usually run on a T+1 schedule and have low real-time requirements.
However, enterprises also have many real-time processing needs. For example, JD and Alibaba build real-time dashboards to display orders during the Double 11 shopping festival. Such scenarios demand high timeliness and can tolerate delays of only a minute or even a few seconds.
Real-time computing framework comparison
Storm
- Stream computing framework
- Processes data record by record
- Also supports a micro-batch mode (Trident)
Spark
- Batch computing framework
- Processes data in units of RDDs
- Supports micro-batch streaming (Spark Streaming)
Comparison:
- Throughput: Spark Streaming is better than Storm
- Latency: Spark Streaming is worse than Storm
1.2 Components of Spark Streaming
- StreamingContext
  - Once a StreamingContext has been started (by calling start()), no new DStream operators can be created or added to it
  - Once a StreamingContext has been stopped, it cannot be restarted (start() cannot be called again after stop())
  - Only one StreamingContext can be active in a JVM (Java Virtual Machine) at a time; a StreamingContext is created from a SparkContext
  - Calling stop() on a StreamingContext also stops the SparkContext. To stop only the StreamingContext, set stop()'s optional stopSparkContext argument to False
  - A SparkContext can be reused to create multiple StreamingContext objects (without closing the SparkContext), as long as the previous StreamingContext is stopped before the next one is created (see the sketch after this list)
- DStream (discretized stream)
  - Represents a continuous stream of data
  - Internally, a DStream consists of a series of consecutive RDDs
  - Each RDD in a DStream contains the data of a specific time interval
  - Any operation on a DStream is translated into operations on its underlying RDDs
- Data sources
  - Basic sources
    - TCP/IP socket
    - File system
  - Advanced sources
    - Kafka
    - Flume
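The StreamingContext rules above can be sketched in code. This is a minimal illustration rather than part of the original example: the local[2] master and the batch intervals are arbitrary values, and the stop() call uses PySpark's stopSparkContext argument to keep the SparkContext alive.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", appName="ContextLifecycle")

# First StreamingContext, with a 1-second batch interval
ssc1 = StreamingContext(sc, 1)
# ... define DStreams, call ssc1.start(), process data ...

# Stop only the StreamingContext; the SparkContext stays alive
ssc1.stop(stopSparkContext=False)

# The same SparkContext can now back a second StreamingContext
# (only one StreamingContext may be active at a time)
ssc2 = StreamingContext(sc, 5)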
2. Spark Streaming Programming
2.1 Spark Streaming coding procedure
- Create a StreamingContext
- Create a DStream (data object) from the StreamingContext
- Apply transformations to the DStream
- Output the results
- Start and stop the computation
2.2 Implementing Word Count with Spark Streaming
Requirement: monitor data on a network port and count the occurrences of different words in real time.
- Install netcat: sudo yum install -y nc
- Start a listener: nc -lk 9999 -v
import os
# Configure the path to the Python interpreter used by the Spark driver and the PySpark runtime
PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python"
JAVA_HOME = '/root/bigdata/jdk'
SPARK_HOME = "/root/bigdata/spark"
# When multiple versions exist, not specifying them is likely to cause an error
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME'] = JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    sc = SparkContext("local[2]", appName="NetworkWordCount")
    # The second parameter specifies the batch interval (in seconds)
    ssc = StreamingContext(sc, 1)
    # Listen to the data on the given IP and port
    lines = ssc.socketTextStream('localhost', 9999)
    # Split each line into words by spaces
    words = lines.flatMap(lambda line: line.split(" "))
    # Convert each word to the form (word, 1)
    pairs = words.map(lambda word: (word, 1))
    # Count the words
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)
    # Print the results; this output operation triggers the preceding transformations
    wordCounts.pprint()
    # Start the StreamingContext
    ssc.start()
    # Wait for the computation to finish
    ssc.awaitTermination()
You can view the job's visualization in the Spark web UI on port 4040.
3. Stateful Operations in Spark Streaming
Spark Streaming provides two kinds of stateful operations:
- updateStateByKey
- Window operations
To use stateful transformations, checkpointing must be enabled.
- Checkpointing is Spark Streaming's fault-tolerance mechanism
- It checkpoints enough information to a fault-tolerant storage system, such as HDFS, so that the job can recover quickly from failures
3.1 UpdateStateByKey
Spark Streaming performs near-real-time processing by packaging the data received in each interval into an RDD; by default this processing is stateless.
Stateless: the data of each time interval is processed independently, with no relation to other intervals.
Requirement: you want to accumulate results continuously over a long period (e.g. one day), that is, across many small intervals.
(Offline batch processing with RDDs or Spark SQL is generally used for periods of a day or longer.)
Without updateStateByKey, we would have to compute each interval's result, write it to MySQL, and then do the aggregation in MySQL.
Spark Streaming provides a state-keeping mechanism for exactly this purpose: updateStateByKey.
Steps:
- First, define a state, which can be of any data type
- Second, define the state update function: a function that computes the new state from the previous state and the new values
- For each batch, Spark applies the state update function to every existing key, whether or not the batch contains new data for that key. If the update function returns None, the state for that key is removed
- The state update function is also applied to every new key
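As a minimal sketch of the removal rule above (returning None drops a key's state), the update function below discards the state of any key that received no new values in the current batch. This "drop when idle" policy is an illustrative assumption and is not used in the case that follows, which keeps state indefinitely.

def update_with_expiry(new_values, last_sum):
    # Called for every key that has new data or existing state
    if not new_values:
        # No new values for this key in this batch: return None to remove its state
        return None
    # Otherwise accumulate the new values onto the previous running sum
    return sum(new_values) + (last_sum or 0)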
Example: word count
Case: updateStateByKey
Requirement: listen to data on a network port, count the occurrences of each word in every batch, and keep accumulating the counts across batches.
Code:
import os
# Configure the path to the Python interpreter used by the Spark driver and the PySpark runtime
PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python"
JAVA_HOME = '/root/bigdata/jdk'
SPARK_HOME = "/root/bigdata/spark"
# When multiple versions exist, not specifying them is likely to cause an error
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME'] = JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME

from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession

# Create the SparkContext through a SparkSession
spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 3)
# Enable checkpointing (required by updateStateByKey)
ssc.checkpoint("checkpoint")

# Define the state update function
def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

lines = ssc.socketTextStream("localhost", 9999)
# Split each line into words by spaces, then apply updateStateByKey
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .updateStateByKey(updateFunc=updateFunc)

counts.pprint()
ssc.start()
ssc.awaitTermination()
3.2 Window Operations
- Window length L: the span of data included in each computation
- Slide interval G: controls how often the window operation is performed
Every G seconds, the most recent L seconds of data are processed.
Operational details
- Window operations are driven by the window length and the slide interval
- The window length controls how many previous batches of data are included in each computation
- The slide interval, which defaults to the batch interval, determines how often results are computed
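As a sketch of how the two parameters interact, the basic window() transformation below collects the most recent 6 seconds of data every 3 seconds. Both values are arbitrary choices for illustration (they must be multiples of the batch interval), and ssc is assumed to be an existing StreamingContext.

# Assume ssc is a StreamingContext with a 1-second batch interval
lines = ssc.socketTextStream("localhost", 9999)

# Window length of 6 seconds, slide interval of 3 seconds
windowed = lines.window(6, 3)
# Count how many records are currently inside the window
windowed.count().pprint()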
Related function
- Incremental computation
- invAddFunc
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
- func: the forward operation, which folds the data of new batches into the window, similar in spirit to updateStateByKey
- invFunc: the inverse operation, which subtracts the data of batches that have slid out of the window
For example, with hot search terms, a term that was hot in the previous window may no longer be hot in the current one, so the counts of the batches that have left the window must be subtracted out.
Typical case: sliding statistics of hot search terms. Every 10 seconds, count how often each search term appeared in the last 60 seconds and print the top three terms with their counts.
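A hedged sketch of this typical case, assuming each line received on the socket is a single search term. The port, batch interval, and helper function are illustrative assumptions, and checkpointing must be enabled because an inverse function is used.

# Assume ssc is a StreamingContext with a 1-second batch interval and checkpointing enabled
terms = ssc.socketTextStream("localhost", 9999)
pairs = terms.map(lambda term: (term.strip(), 1))

# Window of the last 60 seconds, recomputed every 10 seconds, with an inverse function
windowed_counts = pairs.reduceByKeyAndWindow(
    lambda x, y: x + y,   # add counts for batches entering the window
    lambda x, y: x - y,   # subtract counts for batches leaving the window
    60, 10)

# For each window, sort by count and print the top three search terms
def print_top3(rdd):
    for term, count in rdd.sortBy(lambda kv: kv[1], ascending=False).take(3):
        print(term, count)

windowed_counts.foreachRDD(print_top3)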
Case
Requirement: monitor data on a network port and, every 3 seconds, count the words received in the previous 6 seconds.
import os
# Configure the path to the Python interpreter used by the Spark driver and the PySpark runtime
PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python"
JAVA_HOME = '/root/bigdata/jdk'
SPARK_HOME = "/root/bigdata/spark"
# When multiple versions exist, not specifying them is likely to cause an error
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME'] = JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession

def get_countryname(line):
    country_name = line.strip()
    if country_name == 'usa':
        output = 'USA'
    elif country_name == 'ind':
        output = 'India'
    elif country_name == 'aus':
        output = 'Australia'
    else:
        output = 'Unknown'
    return (output, 1)

if __name__ == "__main__":
    # Define the base time unit for processing
    batch_interval = 1  # base time unit (in seconds)
    # Define the window length
    window_length = 6 * batch_interval
    # Define the sliding interval
    frequency = 3 * batch_interval
    # Get the StreamingContext
    spark = SparkSession.builder.master("local[2]").getOrCreate()
    sc = spark.sparkContext
    ssc = StreamingContext(sc, batch_interval)
    # Checkpointing is required because an inverse reduce function is used
    ssc.checkpoint("checkpoint")

    lines = ssc.socketTextStream('localhost', 9999)
    addFunc = lambda x, y: x + y
    invAddFunc = lambda x, y: x - y
    # Call reduceByKeyAndWindow to apply the window operation
    window_counts = lines.map(get_countryname) \
        .reduceByKeyAndWindow(addFunc, invAddFunc, window_length, frequency)
    # Output the results
    window_counts.pprint()

    ssc.start()
    ssc.awaitTermination()