Preface

Fault tolerance of Spark Streaming

At this point we submit an application. Depending on the deploy mode and the type of cluster we are running on, one server is chosen as the Driver according to certain policies, and after the Driver initializes, the Executors are initialized.

The Driver then sends a Receiver to an Executor to receive data (you can also think of it as a long-running task). Every 200 milliseconds the Receiver packages the data it has received into a block, and each block is replicated to another Executor.

When a block is complete, the Executor reports it to the Driver so the Driver knows the block exists, and the Driver then generates small tasks for the Executors to process those blocks.
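As a minimal sketch of this setup (the socket source on localhost:9999 and the 5-second batch interval are arbitrary choices of mine, not values from the original text), the 200 ms block interval and the block replication mentioned above correspond to the following:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverSetupSketch {
  def main(args: Array[String]): Unit = {
    // spark.streaming.blockInterval controls how often received data is grouped
    // into blocks; 200ms is already the default, set here only to make it visible.
    val conf = new SparkConf()
      .setAppName("receiver-sketch") // the master is expected to come from spark-submit
      .set("spark.streaming.blockInterval", "200ms")

    val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batches

    // MEMORY_AND_DISK_SER_2 (the default for this receiver) keeps a second copy
    // of every block on another Executor, which is the replication mentioned above.
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

    lines.count().print() // a trivial job so the Driver generates tasks for each batch

    ssc.start()
    ssc.awaitTermination()
  }
}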

1.1 When an Executor dies

What happens when our Executor dies?

If the Executor that only holds the block replicas dies, it doesn't matter, because all the data is still on the Executor running the Receiver. But if the Executor running the Receiver dies, the Receiver stops working and the data it received is lost.

Did you think I would say that 😏? Spark Streaming has a very good fault-tolerance mechanism. If the Executor hosting the Receiver dies, the Driver automatically finds another Executor and recreates the Receiver there, and the data has already been backed up through block replication. So you don't need to worry too much about data loss.

The tasks that were assigned to the Executor that died will be reassigned for re-execution without human intervention.

1.2 When the Driver dies

If the Driver fails, all the Executors fail with it. In this case we need to use the checkpoint mechanism, which periodically writes the Driver's information to HDFS.

The idea is that when the Driver goes down, it should be able to recover automatically and resume computation from the last saved results, without human intervention.

1.2.1 Configuring automatic Driver Restart

Let's assume we're in cluster mode, because this kind of fault tolerance is only possible in cluster mode.

Standalone

Add the following two parameters to spark-submit:

--deploy-mode cluster



--supervise (this is the flag that makes the Driver restart automatically)


Yarn (most cases)

Add the following parameter to spark-submit:

--deploy-mode cluster


Also set yarn.resourcemanager.am.max-attempts in the YARN configuration; it controls how many times an application is allowed to fail at most. If it is set to 3 (we use 3 at our company 😶), YARN automatically restarts the application when it fails; once it has failed for the third time, it really is dead.
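As a small sketch (the application name is made up), the Spark-side counterpart of this setting, spark.yarn.maxAppAttempts, can also be set on the SparkConf; it must not exceed the YARN-side maximum:

import org.apache.spark.SparkConf

object DriverRestartConf {
  // spark.yarn.maxAppAttempts (Spark on YARN) should be no larger than
  // yarn.resourcemanager.am.max-attempts configured on the cluster.
  // The value 3 simply mirrors the example above.
  val conf: SparkConf = new SparkConf()
    .setAppName("driver-restart-sketch")
    .set("spark.yarn.maxAppAttempts", "3")
}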

Mesos

Marathon can restart Mesos applications

1.2.2 Setting the checkpoint directory on HDFS

Specify a checkpoint directory to store the Driver's critical information, otherwise recovery won't work. Generally an HDFS directory is used, because HDFS is highly available and has no single point of failure.

It's just one line of code, nothing special:

streamingContext.checkpoint(hdfsDirectory)


1.2.3 Code

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()


If there is checkpoint data in checkpointDirectory, the StreamingContext is restored from it; if there is none, a new one is created for a fresh Driver.

1.3 Executor Data Loss Problem

At this point we can recover the Driver, but because the Driver failed, the Executors died with it and the data held in their memory was lost, so we also need some fault tolerance for that data.

The WAL (write-ahead log) mechanism works like this: when, for example, Kafka sends data to the stream, the Receiver (with WAL enabled) first writes the data to HDFS, and only after the write completes does it tell Kafka that the data has been received successfully (much like setting Kafka's acks to -1). It is essentially the same idea as checkpointing, just under the name WAL.

1.3.1 Setting the checkpoint directory

streamingContext.checkpoint(hdfsDirectory)


I don't need to expand on this; it's the same as what I said above.

1.3.2 Enabling WAL Logs

It is disabled by default, so set it to true manually:

sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")


1.3.3 Reliable receiver

Don't be misled here: this is really about a reliable data source, and that reliable data source is Kafka. Only after the WAL has been written is Kafka told that the data has been consumed; data that has not yet been acknowledged to Kafka can simply be re-consumed from Kafka.

It sounds like a mouthful, but here it is: the Receiver needs to write data to HDFS, and if the application dies before the write completes, we fetch the data from Kafka again. Kafka does support this kind of replay and will resend the previously delivered data to the Receiver (as described in the Kafka series).

What is an unreliable data source? A socket, for example: it does not retain the data it has already sent, so whatever is lost is gone and cannot be retrieved from it again.

1.3.4 Canceling the extra replica

Use StorageLevel.MEMORY_AND_DISK_SER to store the received data. Storage levels with the _2 suffix (the default for receivers is a _2 level) are no longer necessary, because the WAL on HDFS already keeps multiple copies.
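Putting 1.3.1 through 1.3.4 together, here is a minimal sketch that assumes the old receiver-based spark-streaming-kafka-0-8 integration; the ZooKeeper quorum, group id, topic name and HDFS path are placeholders, not values from the original text:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object WalReceiverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("wal-receiver-sketch")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true") // 1.3.2: enable the WAL

    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("hdfs://namenode:8020/spark/checkpoint") // 1.3.1: checkpoint directory on HDFS

    // 1.3.3 + 1.3.4: a reliable (Kafka) receiver, stored without the _2 replica
    // because the WAL on HDFS already provides redundancy.
    val stream = KafkaUtils.createStream(
      ssc,
      "zk1:2181,zk2:2181",   // ZooKeeper quorum (placeholder)
      "wal-sketch-group",    // consumer group id (placeholder)
      Map("my-topic" -> 1),  // topic -> number of receiver threads
      StorageLevel.MEMORY_AND_DISK_SER
    )

    stream.map(_._2).count().print() // values only; a trivial job per batch

    ssc.start()
    ssc.awaitTermination()
  }
}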

1.4 Dealing with tasks that run slowly

We need to turn on the speculative execution mechanism:

spark.speculation=true


Once this mechanism is enabled, Spark periodically checks which running tasks need to be re-scheduled; the check interval is set as follows:

spark.speculation.interval=100ms


How do we decide whether a task needs to be re-scheduled? Two conditions need to be met; let's assume there are 10 tasks in total (a combined configuration sketch follows these two conditions):

  1. The number of successfully finished tasks is greater than 0.75 x 10.

    spark.speculation.quantile = 0.75

0.75 is the default value for this parameter.

  2. The duration of a running task is greater than 1.5 x the average duration of the successful tasks.

    spark.speculation.multiplier = 1.5

1.5 is the default value for this parameter.
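As a minimal sketch (the application name is made up), the four speculation-related settings can be grouped on one SparkConf; apart from turning speculation on, the values shown are just the defaults discussed above:

import org.apache.spark.SparkConf

object SpeculationConf {
  // spark.speculation is off by default; the remaining three values are the
  // defaults mentioned above, written out explicitly for clarity.
  val conf: SparkConf = new SparkConf()
    .setAppName("speculation-sketch")
    .set("spark.speculation", "true")
    .set("spark.speculation.interval", "100ms")
    .set("spark.speculation.quantile", "0.75")
    .set("spark.speculation.multiplier", "1.5")
}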

If both conditions are met, the running task is scheduled again on another Executor. If the task is slow because of data skew, follow the instructions in the article that walks you through all aspects of Spark Core tuning instead.

If speculative execution gets triggered by data skew, we can run into a very awkward situation: suppose a skewed task takes 10 seconds, and when it has only 1 or 2 seconds of processing left it satisfies the two conditions above, so the task is dispatched and executed again; the new copy is skewed as well, meets the conditions again, and so on, so the task never finishes. It becomes an endless loop.

Speculative execution can handle situations like full GC pauses or network jitter, but with data skew its presence can itself become a bottleneck.

1.5 Spark Streaming semantics

  1. At most once: a record is either processed once or not processed at all, so data may be lost.

  2. At least once: a record is processed once or possibly several times, so no data is lost but duplicates are possible.

  3. Exactly once: a record is processed exactly once; this is the most demanding guarantee.

Finally