Problem description

For a large website, the number of visitors can be in the billions. Let’s do some simple calculations here to get a feel for what a billion means. What data do we typically record for a single user visit?

  • 1. User ID
  • 2. User access time
  • 3. Duration of the user’s stay
  • 4. Operations performed by users
  • 5. The rest of the user’s data (such as IP, etc.)

Let’s just look at the user ID, for example 10011802330414. An ID like this is essentially a long, but when we store a lot of data we usually store it as text. So 500 million user IDs written to disk take up about five gigabytes. At that size it is not really big data, but for one case study it is more than enough.

We will work with a dataset of 500 million IDs, which, as mentioned above, is about 5 gigabytes in size (without compression). I won’t upload such a dataset to GitHub; instead, we provide a way to generate the 500 million IDs at run time.

Of course, you can still run the project in local mode to solve this problem, but you need enough disk and memory: about 8 GB of disk space (because in addition to the data itself, Spark also generates some temporary data) and about 5 GB of memory (for reduceByKey). To really demonstrate Spark’s features, our example will run on a Spark cluster; if you prefer local mode, the only change is the master URL, as sketched below.
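A minimal sketch of the local-mode configuration (local[*] simply uses all local cores; this is not the configuration used in the cluster runs later in this chapter):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("local[*]")       // run everything in-process instead of on a cluster
  .setAppName("ActiveVisitor")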

I will cover how to build a cluster in a later chapter, and there are plenty of cluster-building tutorials on the web, some of which are excellent and detailed. We won’t cover cluster setup in this section, but we can still start our case.

Problem analysis

So now we have 500 million records (which are not actually stored as text, but generated at run time). Figuring out who accessed the site most often among 500 million records doesn’t seem too hard, but we really want to use this example to understand Spark’s real strengths.

The 500 million IDs can first be loaded into an RDD and mapped to (id, 1) pairs, then reduceByKey can be applied to count the occurrences of each ID, and finally we look for the ID with the most occurrences. The idea is simple, so there isn’t a lot of code; a minimal sketch of the pipeline is shown below.
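The sketch assumes an RDD of user IDs already exists; the names ids and countIds are illustrative only, not part of the program we run later.

import org.apache.spark.rdd.RDD

// Sketch only: count how often each ID occurs.
def countIds(ids: RDD[Long]): RDD[(Long, Int)] = {
  ids
    .map(id => (id, 1))   // mark each ID with a count of 1
    .reduceByKey(_ + _)   // sum the marks per ID -> (id, occurrences)
}
// The pair with the largest count in the result is the most active visitor.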

Implementation

Scala implementation

The first is the ID generation method:

RandomId.class

import scala.Serializable;

public class RandomId implements Serializable {

    // One step of the Mersenne Twister recurrence
    private static final long twist(long u, long v) {
        return (((u & 0x80000000L) | (v & 0x7fffffffL)) >> 1) ^ ((v & 1) == 1 ? 0x9908b0dfL : 0);
    }
    // Internal state of a Mersenne Twister-style generator
    private long[] state = new long[624];
    private int left = 1;

    public RandomId() {
        // state[0] keeps its default value of 0; the remaining words are derived from it
        for (int j = 1; j < 624; j++) {
            state[j] = (1812433253L * (state[j - 1] ^ (state[j - 1] >> 30)) + j);
            state[j] &= 0xffffffffL; // keep only the lower 32 bits
        }
    }
    // Regenerate the whole state array (one "twist" pass of the generator)
    public void next_state() {
        int p = 0;
        left = 624;
        for (int j = 228; --j > 0; p++)
            state[p] = state[p + 397] ^ twist(state[p], state[p + 1]);

        for (int j = 397; --j > 0; p++)
            state[p] = state[p - 227] ^ twist(state[p], state[p + 1]);

        state[p] = state[p - 227] ^ twist(state[p], state[0]);
    }

    // Return the next pseudo-random ID
    public long next() {
        if (--left == 0) next_state();
        return state[624 - left];
    }
}
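As a quick sanity check, the generator can be driven from a small Scala program (a throwaway snippet; it assumes RandomId has been compiled onto the same classpath):

object RandomIdCheck {
  def main(args: Array[String]): Unit = {
    val id = new RandomId()
    // print a handful of generated IDs just to eyeball the output
    for (_ <- 1 to 5) println(id.next())
  }
}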

Then we use it to generate the 500 million IDs:

import org.apache.spark.{SparkConf, SparkContext}

object ActiveVisitor {


  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")

    val sc = new SparkContext(conf)

    val list = 1 until 10000 // 9,999 batches of ~50,000 IDs each: about 500 million in total

    val id = new RandomId()

    var max = 0

    var maxId = 0L

    val lastNum = sc.parallelize(list).flatMap(num => {
      var list2 = List(id.next())
      for (i <- 1 to 50000){
        list2 = id.next() :: list2
      }
      println(num +"%")
      list2
    }).map((_,1)).reduceByKey(_+_).foreach(x => {
      if (x._2 > max){
        max = x._2
        maxId = x._1
        println(x)
      }
    })
  }

}

Processing 500 million pieces of data

import org.apache.spark.{SparkConf, SparkContext}

object ActiveVisitor {


  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")

    val sc = new SparkContext(conf)

    val list = 1 until 10000

    val id = new RandomId()

    var max = 0

    var maxId = 0L

    val lastNum = sc.parallelize(list).flatMap(num => {
      var list2 = List(id.next())
      for (i <- 1 to 50000) {
        list2 = id.next() :: list2
      }
      // Record the percentage of generated IDs
      println(num / 1000.0 + "%")
      // Return the generated list; each loop contains 50 thousand IDs,
      // about 500 million IDs in total
      list2
    })
      // Mark each ID, producing (id, 1)
      .map((_, 1))
      // Count the occurrences of each ID, producing (id, count)
      .reduceByKey(_ + _)
      // Traverse the counted data
      .foreach(x => {
        // Keep the maximum count seen so far in max
        if (x._2 > max) {
          max = x._2
          maxId = x._1
          // If x is larger than the previously recorded value, output the ID and its count;
          // the last value printed is the ID with the most occurrences
          println(x)
        }
      })
  }

}

Run results

Submit the program to Spark for execution and observe the logs.
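For reference, a submission could look something like this (the jar name and path are placeholders for whatever your build produces; adjust them to your environment):

spark-submit --master spark://master:7077 --class ActiveVisitor /path/to/active-visitor.jar

The generation progress then shows up in the logs as percentages: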

1%
5000%
2%
5001%
3%
5002%
4%
5003%
5%
5004%
6%
5005%
7%
5006%
8%
5007%
9%
5008%
10%
5009%
11%
5010%
12%
5011%
5012%
13%
5013%
14%
15%
5014%
...

Let’s look at another section of the log

5634%
5635%
5636%
5637%
5638%
5639%
5640%
5641%
5642%
5643%
5644%
5645%
2019-03-05 11:52:14 INFO  ExternalSorter:54 - Thread 63 spilling in-memory map of 1007.3 MB to disk (2 times so far)
647%
648%
649%
650%
651%
652%
653%
654%
655%
656%

Note the line ‘spilling in-memory map of 1007.3 MB to disk’: Spark is spilling 1007.3 MB of the in-memory map to disk. The reason is that Spark spills a large amount of intermediate data to disk during processing and reads it back when the data is needed again. For programs that must respond in real time, frequent and massive reads and writes to disk would never be acceptable; however, spilling to disk is a very common operation when dealing with big data.
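If you want fewer spills, one simple knob is to give the executors more memory when building the configuration; a sketch only (the 6g value is an illustration, pick what your machines can afford):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("spark://master:7077")
  .setAppName("ActiveVisitor")
  // more executor memory lets the shuffle map grow larger before spilling to disk
  .set("spark.executor.memory", "6g")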

In fact, in the complete log we can see that a significant portion of the log lines were generated by these spills: about 49 of them in total in my run.

As shown in the figure:

In total there were 49 spills, each of roughly 1 gigabyte, which also supports our claim that 500 million IDs occupy about 5 gigabytes of space. In fact, when I stored those 500 million IDs on disk, they took up about five gigabytes.

The results

Finally, we can see the results in the log.

The whole process took about 47 minutes, which could be much shorter on a larger cluster; we are currently using only four nodes.

You may notice repeated result lines in the log. This is not surprising: foreach runs in parallel on the executors, each of which tracks its own running maximum, so duplicate outputs are quite normal. Of course, we can also use concurrency mechanisms to deal with this phenomenon, and this is something we will continue to refine in subsequent cases.
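One possible refinement (a sketch only, not what we run in this chapter) is to bring the counted pairs back to the driver with reduce, so the maximum is computed and printed exactly once; counted below stands for the RDD produced by reduceByKey above:

// Sketch: pick the (id, count) pair with the largest count on the driver.
val top = counted.reduce((a, b) => if (a._2 >= b._2) a else b)
println("most frequent id: " + top._1 + ", occurrences: " + top._2)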

From the results, we find that among 500 million IDs the most frequent ID appears only 8 times, which shows that in such a large dataset many IDs may appear only once or twice. That is why I ended up using the foreach method to find the maximum instead of the following approach:

import org.apache.spark.{SparkConf, SparkContext}

object ActiveVisitor {


  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")

    val sc = new SparkContext(conf)

    val list = 1 until 10000

    val id = new RandomId()

    val lastNum = sc.parallelize(list).flatMap(num => {
      var list2 = List(id.next())
      for (i <- 1 to 50000) {
        list2 = id.next() :: list2
      }
      // Record the percentage of generated IDs
      println(num / 1000.0 + "%")
      // Return the generated list; each loop contains 50 thousand IDs,
      // about 500 million IDs in total
      list2
    })
      // Mark each ID, producing (id, 1)
      .map((_, 1))
      // Count the occurrences of each ID, producing (id, count)
      .reduceByKey(_ + _)
      // Swap to (count, id) so we can sort by the count
      .map(x => (x._2, x._1))
      // Sort the data in reverse (descending) order
      .sortByKey(false)

    // The first element is the count and ID of the most frequent visitor
    println(lastNum.first())
  }

}

In this method, we sort the reduceByKey results and output the first element of the sorted data, that is, the ID with the largest count. This seems to match our requirement more directly, but in fact it takes more resources to get the same answer. As we said, many of these IDs appear only once or twice, yet they all still take part in the sort. Keep in mind that, since many IDs appear only once, the dataset being sorted is likely to contain hundreds of millions of entries.

Based on what we know about sorting algorithms, sorting such a large dataset is bound to cost a lot of resources. Therefore, we can tolerate some redundant output, as it does not affect our ability to get the correct result.

At this point, out of 500 million IDs, we have found the one that appears most often. If you are interested, you can try the same approach on 5 billion IDs to find the most frequent one there. To do that, though, you will need about 50 gigabytes of disk space, and even with the program above, processing 5 billion IDs will still take a long time.