A Major Compaction occurs when the LevelDB database submerges the ordered key-value pairs of the high-level file and multiple ordered key-value pairs of the low-level file. The input of the disk multipath merge sort algorithm is an ordered key-value pair from multiple disk files, which is sorted in memory and then output to one or more new disk files.

Multipath merge sort is also a common algorithm in the field of big data, which is often used for sorting massive data. When particularly large amounts of data, the data cannot be memory for a single machine, it needs to be split multiple sets respectively by different machine memory ordering process (map), and then to multiplex merge algorithm will be ordered by data from a number of different machines (reduce) process, it is current multiplex merge sort, why say streaming sorting, Because the data source comes from a network socket.

The advantage of merge sort is that its memory consumption is very low. Its memory consumption is proportional to the number of input files, and has nothing to do with the total amount of data. The total amount of data only linearly proportional to the time of sorting.

Now let’s personally implement the disk merge algorithm, why disk, because its input comes from disk files.

Algorithm ideas

We need to maintain an ordered array in memory. The current smallest element of each input file is placed as an element in the array. Arrays remain sorted by element size.

The logic of the loop always starts with the smallest element, fetching the next element in its file and comparing it with the current element in the array. According to the comparison results for different processing, here we use binary search algorithm for fast comparison. Notice that the elements in each input file are in order.

  1. If the fetched element is equal to the smallest element in the current array, then the element can be printed directly. Then we go on to the next cycle. It is impossible to fetch an element smaller than the smallest element in the current array, because the input file itself is also ordered.

  1. Otherwise, you need to insert the element into the specified position in the current array, keeping the array in order. The current smallest element in the array is then printed and removed. Let’s go through the next cycle.

Binary search

Need special attention is Java built-in binary search algorithm in the use of more sophisticated.

public class Collections {...public static <T> int binarySearch(List<T> list, T key) {...if (found) {
      return index;
    } else {
      return -(insertIndex+1); }}... }Copy the code

If the key can be found in the list, return the corresponding location. If it doesn’t, it returns a negative number, not just a simple negative number of -1, which specifies the insertion position, which means that if you insert the key there, the array will remain in order.

For example, if binarySearch returns index=-1, then insertIndex is -(index+1), which is 0, at the beginning of the array. If index=-size-1 is returned, then insertIndex is size, the end of the array. Other negative numbers are inserted into the middle of the array.

Input file class

A MergeSource object is created for each input file, which provides hasNext() and next() methods for determining and getting the next element. Note that the input file is ordered, and the next element is the smallest element in the current input file. The hasNext() method reads the next line and caches it in the cachedLine variable, and calls the next() method to convert the cachedLine variable to an integer and return it.

class MergeSource implements Closeable {
	private BufferedReader reader;
	private String cachedLine;
	private String filename;

	public MergeSource(String filename) {
		this.filename = filename;
		try {
            FileReader fr = new FileReader(filename);
			this.reader = new BufferedReader(fr);
		} catch (FileNotFoundException e) {
		}
	}

	public boolean hasNext(a) {
		String line;
		try {
			line = this.reader.readLine();
			if (line == null || line.isEmpty()) {
				return false;
			}
			this.cachedLine = line.trim();
			return true;
		} catch (IOException e) {
		}
		return false;
	}

	public int next(a) {
		if (this.cachedLine == null) {
			if(! hasNext()) {throw new IllegalStateException("no content"); }}int num = Integer.parseInt(this.cachedLine);
		this.cachedLine = null;
		return num;
	}

	@Override
	public void close(a) throws IOException {
		this.reader.close(); }}Copy the code

Memory ordered array element class

Get the array ready before sorting, place the smallest element of each input file into the array, and sort.

class Bin implements Comparable<Bin> {
	int num;
	MergeSource source;

	Bin(MergeSource source, int num) {
		this.source = source;
		this.num = num;
	}

	@Override
	public int compareTo(Bin o) {
		return this.num - o.num; }}List<Bin> prepare(a) {
  	List<Bin> bins = new ArrayList<>();
	for (MergeSource source : sources) {
		Bin newBin = new Bin(source, source.next());
		bins.add(newBin);
	}
    Collections.sort(bins);
    return bins;
}
Copy the code

Output file class

Flush () before closing the output file to avoid losing the cached contents of the PrintWriter.

class MergeOut implements Closeable {
	private PrintWriter writer;

	public MergeOut(String filename) {
		try {
            FileOutputStream out = new FileOutputStream(filename);
			this.writer = new PrintWriter(out);
		} catch (FileNotFoundException e) {
		}
	}

	public void write(Bin bin) {
		writer.println(bin.num);
	}

	@Override
	public void close(a) throws IOException { writer.flush(); writer.close(); }}Copy the code

Prepare the contents of the input file

Let’s generate a series of input files, each containing a bunch of random integers. A total of N files are generated, each with an integer number between minEntries and minEntries. Returns a list of file names for all input files.

List<String> generateFiles(int n, int minEntries, int maxEntries) {
	List<String> files = new ArrayList<>();
	for (int i = 0; i < n; i++) {
		String filename = "input-" + i + ".txt";
		PrintWriter writer;
		try {
			writer = new PrintWriter(new FileOutputStream(filename));
            ThreadLocalRandom rand = ThreadLocalRandom.current();
			int entries = rand.nextInt(minEntries, maxEntries);
			List<Integer> nums = new ArrayList<>();
			for (int k = 0; k < entries; k++) {
				int num = rand.nextInt(10000000);
				nums.add(num);
			}
			Collections.sort(nums);
			for (int num : nums) {
				writer.println(num);
			}
			writer.flush();
			writer.close();
		} catch (FileNotFoundException e) {
		}
		files.add(filename);
	}
	return files;
}
Copy the code

Sorting algorithm

Everything is ready except the east wind. With all the above classes in place, the sorting algorithm is simple and requires very little code. It is easy to understand the following algorithm by comparing the above algorithm ideas.

public void sort(a) {
	List<Bin> bins = prepare();
	while (true) {
        // Take the smallest element in the array
		MergeSource current = bins.get(0).source;
		if (current.hasNext()) {
            // Retrieves the next element from the input file
			Bin newBin = new Bin(current, current.next());
            // Binary lookup, that is, comparing with an existing element in an array
			int index = Collections.binarySearch(bins, newBin);
			if (index == 0) {
                // The algorithm is set to 1
				this.out.write(newBin);
			} else {
                // Algorithm idea 2
				if (index < 0) {
					index = -(index+1); } bins.add(index, newBin); Bin minBin = bins.remove(0);
				this.out.write(minBin); }}else {
            // The end of the file is encountered
			Bin minBin = bins.remove(0);
			this.out.write(minBin);
			if (bins.isEmpty()) {
				break; }}}}Copy the code

All the code

The following code can be copied and pasted directly into the IDE.

package leetcode;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class DiskMergeSort implements Closeable {

	public static List<String> generateFiles(int n, int minEntries, int maxEntries) {
		List<String> files = new ArrayList<>();
		for (int i = 0; i < n; i++) {
			String filename = "input-" + i + ".txt";
			PrintWriter writer;
			try {
				writer = new PrintWriter(new FileOutputStream(filename));
				int entries = ThreadLocalRandom.current().nextInt(minEntries, maxEntries);
				List<Integer> nums = new ArrayList<>();
				for (int k = 0; k < entries; k++) {
					int num = ThreadLocalRandom.current().nextInt(10000000);
					nums.add(num);
				}
				Collections.sort(nums);
				for (int num : nums) {
					writer.println(num);
				}
				writer.close();
			} catch (FileNotFoundException e) {
			}
			files.add(filename);
		}
		return files;
	}

	private List<MergeSource> sources;
	private MergeOut out;

	public DiskMergeSort(List<String> files, String outFilename) {
		this.sources = new ArrayList<>();
		for (String filename : files) {
			this.sources.add(new MergeSource(filename));
		}
		this.out = new MergeOut(outFilename);
	}

	static class MergeOut implements Closeable {
		private PrintWriter writer;

		public MergeOut(String filename) {
			try {
				this.writer = new PrintWriter(new FileOutputStream(filename));
			} catch (FileNotFoundException e) {
			}
		}

		public void write(Bin bin) {
			writer.println(bin.num);
		}

		@Override
		public void close(a) throws IOException { writer.flush(); writer.close(); }}static class MergeSource implements Closeable {
		private BufferedReader reader;
		private String cachedLine;

		public MergeSource(String filename) {
			try {
				FileReader fr = new FileReader(filename);
				this.reader = new BufferedReader(fr);
			} catch (FileNotFoundException e) {
			}
		}

		public boolean hasNext(a) {
			String line;
			try {
				line = this.reader.readLine();
				if (line == null || line.isEmpty()) {
					return false;
				}
				this.cachedLine = line.trim();
				return true;
			} catch (IOException e) {
			}
			return false;
		}

		public int next(a) {
			if (this.cachedLine == null) {
				if(! hasNext()) {throw new IllegalStateException("no content"); }}int num = Integer.parseInt(this.cachedLine);
			this.cachedLine = null;
			return num;
		}

		@Override
		public void close(a) throws IOException {
			this.reader.close(); }}static class Bin implements Comparable<Bin> {
		int num;
		MergeSource source;

		Bin(MergeSource source, int num) {
			this.source = source;
			this.num = num;
		}

		@Override
		public int compareTo(Bin o) {
			return this.num - o.num; }}public List<Bin> prepare(a) {
		List<Bin> bins = new ArrayList<>();
		for (MergeSource source : sources) {
			Bin newBin = new Bin(source, source.next());
			bins.add(newBin);
		}
		Collections.sort(bins);
		return bins;
	}

	public void sort(a) {
		List<Bin> bins = prepare();
		while (true) {
			MergeSource current = bins.get(0).source;
			if (current.hasNext()) {
				Bin newBin = new Bin(current, current.next());
				int index = Collections.binarySearch(bins, newBin);
				if (index == 0 || index == -1) {
					this.out.write(newBin);
					if (index == -1) {
						throw new IllegalStateException("impossible"); }}else {
					if (index < 0) {
						index = -index - 1;
					}
					bins.add(index, newBin);
					Bin minBin = bins.remove(0);
					this.out.write(minBin); }}else {
				Bin minBin = bins.remove(0);
				this.out.write(minBin);
				if (bins.isEmpty()) {
					break; }}}}@Override
	public void close(a) throws IOException {
		for (MergeSource source : sources) {
			source.close();
		}
		this.out.close();
	}

	public static void main(String[] args) throws IOException {
		List<String> inputs = DiskMergeSort.generateFiles(100.10000.20000);
		// It takes time to run the algorithm multiple times
		for (int i = 0; i < 20; i++) {
			DiskMergeSort sorter = new DiskMergeSort(inputs, "output.txt");
			long start = System.currentTimeMillis();
			sorter.sort();
			long duration = System.currentTimeMillis() - start;
			System.out.printf("%dms\n", duration); sorter.close(); }}}Copy the code

This algorithm also has a minor flaw, that is, if the number of input files is very large, the array in memory will be very large, and the operation of inserting and deleting the array will be very time-consuming. In this case, you can consider using TreeSet instead of array. Readers can try it by themselves.