Preface

Spark Streaming is mostly about applying operators and writing code. I didn't think there was much worth writing about, so I originally skipped it. Then…

…I ended up coming back to fill it in anyway 😂

1. Spark Streaming

1.1 Operation Process

  1. First of all, whether we submit Spark Core, Spark SQL or Spark Streaming, we will collectively refer to it as an “Application”.
  2. Also, Spark SQL and Spark Streaming both rely on Spark Core underneath, so to use them we must first initialize the Spark Core entry point: a StreamingContext depends on a SparkContext, and creating the SparkContext is where the Driver service gets initialized. So the structure looks like this:
  1. Once all of this is initialized, the workers have also been assigned their executors
  2. The Driver service then sends a Receiver object to an Executor; there is only one Receiver by default, but more can be created in code
  3. When a Receiver starts, it is essentially a Task: the Task receives data, wraps it into blocks, and writes the blocks to the Executor's memory.
  4. The Receiver tells the Driver about these blocks


The diagram is simple, but it conveys the information accurately.

  1. At a certain interval, the Driver organizes these blocks into an RDD. A Block is a partition (1 partition -> 1 task). The sketch below makes this setup concrete.
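To make this concrete, here is a minimal sketch (the host and ports are just placeholders) of how a StreamingContext is built on top of a SparkContext, and how creating several receiver-based input streams gives you more than one Receiver:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// The Spark Core entry point: creating the SparkContext is where the Driver service starts
val conf = new SparkConf().setMaster("local[4]").setAppName("receiverDemo")
val sc = new SparkContext(conf)

// The Spark Streaming entry point depends on the SparkContext
val ssc = new StreamingContext(sc, Seconds(2))

// Each receiver-based input DStream runs its own Receiver task on an executor;
// creating several and unioning them is how you end up with more than one Receiver
val stream1 = ssc.socketTextStream("localhost", 8888)
val stream2 = ssc.socketTextStream("localhost", 9999)
val merged = stream1.union(stream2)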

1.2 BlockInterval and BatchInterval

How often is a Block formed? By default, a new Block is cut every 200 milliseconds.

And how long does it take for these blocks to be merged into an RDD? That is the interval you set in your code:

// 2 is 2 seconds

val ssc = new StreamingContext(conf, Seconds(2))


The Driver then treats all the data received within those 2 seconds as one RDD.

These are the two important time parameters, blockInterval and batchInterval: the first defaults to 200 milliseconds, the second is controlled by us as users.
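To see both parameters in one place, here is a minimal sketch (shown with the default 200 ms block interval for illustration). With a 2-second batch and a 200 ms block interval, each batch RDD contains 2000 / 200 = 10 blocks, i.e. 10 partitions and therefore 10 tasks.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("intervalDemo")
  // blockInterval: how often received data is cut into a new block (default 200ms)
  .set("spark.streaming.blockInterval", "200ms")

// batchInterval: how often the accumulated blocks are packaged into one RDD
val ssc = new StreamingContext(conf, Seconds(2))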

We can also check these configuration parameters on the official website

Ctrl + F and search for blockInterval (the property is spark.streaming.blockInterval) or 200.

That's all we need to know for now, so let's write some code.

1.3 Getting started with WordCount program

As mentioned repeatedly in previous posts, for a real-time program we care about three things: data input, data processing, and data output.

1.3.1 POM file

<properties>

    <maven.compiler.source>1.8</maven.compiler.source>

    <maven.compiler.target>1.8</maven.compiler.target>

    <scala.version>2.11.8</scala.version>

    <spark.version>2.2.1</spark.version>

    <hadoop.version>2.7.5</hadoop.version>

    <encoding>UTF-8</encoding>

</properties>



<dependencies>

    <dependency>

        <groupId>org.scala-lang</groupId>

        <artifactId>scala-library</artifactId>

        <version>${scala.version}</version>

    </dependency>



    <dependency>

        <groupId>org.apache.spark</groupId>

        <artifactId>spark-streaming_2.11</artifactId>

        <version>${spark.version}</version>

    </dependency>



    <dependency>

        <groupId>org.apache.spark</groupId>

        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>

        <version>${spark.version}</version>

    </dependency>





</dependencies>







<build>

    <pluginManagement>

        <plugins>

            <plugin>

                <groupId>net.alchim31.maven</groupId>

                <artifactId>scala-maven-plugin</artifactId>

                <version>3.2.2</version>

            </plugin>

            <plugin>

                <groupId>org.apache.maven.plugins</groupId>

                <artifactId>maven-compiler-plugin</artifactId>

                <version>3.5.1</version>

            </plugin>

        </plugins>

    </pluginManagement>

    <plugins>

        <plugin>

            <groupId>net.alchim31.maven</groupId>

            <artifactId>scala-maven-plugin</artifactId>

            <executions>

                <execution>

                    <id>scala-compile-first</id>

                    <phase>process-resources</phase>

                    <goals>

                        <goal>add-source</goal>

                        <goal>compile</goal>

                    </goals>

                </execution>

                <execution>

                    <id>scala-test-compile</id>

                    <phase>process-test-resources</phase>

                    <goals>

                        <goal>testCompile</goal>

                    </goals>

                </execution>

            </executions>

        </plugin>



        <plugin>

            <groupId>org.apache.maven.plugins</groupId>

            <artifactId>maven-compiler-plugin</artifactId>

            <executions>

                <execution>

                    <phase>compile</phase>

                    <goals>

                        <goal>compile</goal>

                    </goals>

                </execution>

            </executions>

        </plugin>



        <plugin>

            <groupId>org.apache.maven.plugins</groupId>

            <artifactId>maven-shade-plugin</artifactId>

            <version>2.4.3</version>

            <executions>

                <execution>

                    <phase>package</phase>

                    <goals>

                        <goal>shade</goal>

                    </goals>

                    <configuration>

                        <filters>

                            <filter>

                                <artifact>*:*</artifact>

                                <excludes>

                                    <exclude>META-INF/*.SF</exclude>

                                    <exclude>META-INF/*.DSA</exclude>

                                    <exclude>META-INF/*.RSA</exclude>

                                </excludes>

                            </filter>

                        </filters>

                    </configuration>

                </execution>

            </executions>

        </plugin>

    </plugins>

</build>


1.3.2 Data input

We will start by creating the Spark Streaming entry

// 1. Data input

val conf = new SparkConf()

conf.setMaster("local[2]")

conf.setAppName("wordCount")


A quick explanation: the first line sets up the Spark Core entry point we just mentioned, nothing special there, and setAppName simply gives the application a name, call it whatever you like; there is nothing more to say about it.

1.3.3 A side note on setMaster(local[2])

In the second line we call setMaster("local[2]"). Note here that there are two ways to consume Kafka data: Receiver and Direct.

There are two JARs available on the official website: one integration is based on Kafka 0.8 and provides both Receiver and Direct modes, the other is based on Kafka 0.10 and provides only Direct.

The Receiver integration starts a dedicated receiver thread inside a Spark executor whose only job is to pull data; the Receiver does not process what it fetches, it just carries it. Consumption is based on Kafka's high-level API, and offsets are saved automatically in ZooKeeper without any manual maintenance.

At this point the thread pulling the data and the thread processing it do not communicate. If the processing thread dies, the pulling thread is unaware of it and keeps pulling; the data then piles up in executor memory and eventually causes problems.

The Direct approach no longer starts a separate thread to pull data, and the fetched data is no longer staged in executor memory first: the same tasks that fetch the data also process it directly.

Of course it has its own issue: consuming with Kafka's low-level API means you have to maintain the offsets yourself. With old Kafka versions they are still stored in ZooKeeper; with newer versions they are stored in Kafka's internal offsets topic by default, and for special needs you can also keep them in MySQL, HBase… those work too.
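As a rough illustration of the Direct approach with the 0.10 integration, here is a minimal sketch; the broker address, topic name, and group id are placeholder assumptions, it reuses the ssc created earlier, and offsets are committed back to Kafka after each batch:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",            // placeholder broker address
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "wordCountGroup",                      // placeholder consumer group
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)   // we commit offsets ourselves
)

// Direct stream: the same tasks that read from Kafka also process the data
val kafkaStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Array("wordCountTopic"), kafkaParams)
)

// After each batch, commit the processed offsets back to Kafka
kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}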

What about setMaster("local[1]")? With the Direct approach it works, although a single thread then both pulls and processes the data. With the Receiver approach it is a problem: local[1] starts only one thread, which the Receiver occupies, so your program throws no error yet never processes any data. You can also write local[*], which uses all available cores.

After that, we create the StreamingContext:

val ssc = new StreamingContext(conf, Seconds(2))


This gives us the Spark Streaming entry point and sets the batch interval to 2 seconds, so every 2 seconds of data forms one RDD.

1.3.4 Data acquisition and processing

This is simply fetching data from the socket

// 2. Data processing

val dataDStream = ssc.socketTextStream("localhost", 8888)

val result = dataDStream.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)


1.3.5 Data output

// 3. Data output

result.print()

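Putting the three parts together, here is a minimal end-to-end sketch (the object name is arbitrary); note the ssc.start() and ssc.awaitTermination() calls at the end, without which the streaming job never actually starts processing:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // 1. Data input
    val conf = new SparkConf().setMaster("local[2]").setAppName("wordCount")
    val ssc = new StreamingContext(conf, Seconds(2))

    // 2. Data processing
    val dataDStream = ssc.socketTextStream("localhost", 8888)
    val result = dataDStream.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)

    // 3. Data output
    result.print()

    // Start the computation and keep it running until stopped
    ssc.start()
    ssc.awaitTermination()
  }
}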

Once you work in a company, you will find that you mostly only write the data-processing part; the input configuration at the front and the downstream output get far less attention.

At this point we can run the code and send data to port 8888 (it doesn't have to be 8888, any free port will do) using the netcat tool mentioned in the Flink posts, for example nc -lk 8888.

But this is not a cumulative count: each batch is counted independently, and accumulating across batches requires some more advanced stateful operators; a sketch of one follows.
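For the curious, one such operator is updateStateByKey, which keeps running state across batches. A minimal sketch, assuming the ssc and result from the code above and an arbitrary checkpoint directory (in a real program this has to be set up before ssc.start()):

// Stateful operators require a checkpoint directory (this path is just an example)
ssc.checkpoint("/tmp/wordcount-checkpoint")

// Add each batch's counts to the running total kept in state
val totals = result.updateStateByKey[Int] { (newCounts: Seq[Int], state: Option[Int]) =>
  Some(newCounts.sum + state.getOrElse(0))
}
totals.print()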

Finally

From here on, posts on Flink, ES, and Spark will appear in no particular order. I now plan to relearn all of these components from scratch; interested friends can follow the public account: say your wish.