Abstract: In the sorting and reducing phases, the reduce side join process generates huge network I/O traffic. It is in this phase that the values of the same key are clustered together.

This article is shared from the Huawei Cloud community post "MapReduce Example: Reduce Side Joins in Hadoop MapReduce" by DonglianLin.

In this blog post, I will use a MapReduce example to explain how to perform a reduce side join in Hadoop MapReduce. Here, I assume that you are already familiar with the MapReduce framework and know how to write a basic MapReduce program. The topics discussed in this blog are:

  • What is a join?

  • Joins in MapReduce

  • What is a reduce side join?

  • MapReduce example of a reduce side join

  • Conclusion

What is a join?

The JOIN operation is used to join two or more database tables based on foreign keys. Typically, companies maintain separate tables in their databases for customer and transaction records. Moreover, these companies often need to combine data from these separate tables to generate analysis reports. Therefore, they perform join operations on these individual tables using common columns (foreign keys), such as customer IDs, to generate composite tables. They then analyze the composite table to obtain the required analysis report.
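As a rough illustration of the idea (this is not part of the MapReduce example that follows), the minimal Java sketch below joins two hypothetical in-memory "tables" on a common customer-ID key; all names and amounts are made up for illustration only.

import java.util.HashMap;
import java.util.Map;

public class JoinSketch {
    public static void main(String[] args) {
        // Hypothetical "customer" table: customer ID -> customer name
        Map<String, String> customerNames = new HashMap<>();
        customerNames.put("4000001", "Kristina");
        customerNames.put("4000002", "Paige");

        // Hypothetical "transaction" table, already aggregated: customer ID -> total spent
        Map<String, Double> totalSpent = new HashMap<>();
        totalSpent.put("4000001", 651.05);
        totalSpent.put("4000002", 706.97);

        // The join: combine the records that share the same foreign key (customer ID)
        for (Map.Entry<String, String> e : customerNames.entrySet()) {
            Double amount = totalSpent.get(e.getKey());
            if (amount != null) {
                System.out.println(e.getValue() + " spent " + amount);
            }
        }
    }
}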

Joins in MapReduce

Just like an SQL join, we can join different datasets in MapReduce. There are two types of join operations in MapReduce:

  • Map Side Join: As the name implies, the Join operation takes place in the Map phase itself. Therefore, in a Map Side Join, the Mapper performs the join and the input to each map must be partitioned and sorted by key.

  • Reduce Side Join: As the name implies, in a reduce side join the reducer is responsible for performing the join operation. Because the sort and shuffle phase sends values with the same key to the same reducer, the data is organized for us by default, which makes it relatively simpler and easier to implement than a Map Side Join.

Now, let’s take a closer look at reduce Side Join.

What is a reduce side join?

As mentioned above, a reduce side join performs the join operation in the reducer stage. Basically, a reduce side join proceeds in the following way:

  • The mapper reads the input data that is to be combined, based on the common column or join key.

  • The mapper processes the input and adds labels to the input to distinguish between inputs that belong to different sources or data sets or databases.

  • The mapper outputs an intermediate key-value pair where the key is just the join key.

  • After the sort and shuffle phase, each key and its list of values is handed to a reducer.

  • The reducer now joins the values present in the list for each key to produce the final aggregated output.

MapReduce example of a reduce side join

Suppose I have two separate datasets for a sports complex:

  • Cust_details: This contains the customer details.

  • Transaction_details: Contains the customer’s transaction history.

Using these two datasets, I want to find out the lifetime value of each customer. To do so, I will need the following:

  • The name of the person and how often the person visited.

  • The total amount of money he/she has spent on equipment.

The figure above shows the schemas of the two datasets on which we will perform the reduce side join operation. You can download the entire project, which contains the source code and input files for this MapReduce example.

Keep the following in mind when importing the above reduce side join MapReduce sample project into Eclipse:

  • The input files are located in the project's input_files directory. Load them into your HDFS (example commands follow this list).

  • Don't forget to add the Hadoop reference JARs (found in the reduce side join project's lib directory) to the build path, according to your system or VM.
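For example, assuming the HDFS input directories used by the run command later in this post, loading the files could look roughly like this (the local file names under input_files are illustrative):

hdfs dfs -mkdir -p /sample/input/cust_details /sample/input/transaction_details
hdfs dfs -put input_files/cust_details /sample/input/cust_details
hdfs dfs -put input_files/transaction_details /sample/input/transaction_details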

Now, let's look at what happens inside the map and reduce phases of this reduce side join MapReduce example:

1. Map stage:

I'll set up a separate mapper for each of the two datasets, i.e., one mapper for the cust_details input and the other for the transaction_details input.

Cust_details mapper:

public static class CustsMapper extends Mapper<Object, Text, Text, Text> {
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String record = value.toString();
        String[] parts = record.split(",");
        // Key: customer ID (first field); value: the customer name tagged with "cust"
        context.write(new Text(parts[0]), new Text("cust " + parts[1]));
    }
}

I'll read the input one tuple (one line) at a time; each cust_details record is a comma-separated line whose first field is the customer ID and whose second field is the customer name.

I then tokenize each word in the tuple and fetch the customer ID along with the name.

The customer ID will be the key of the key-value pair that my mapper will eventually generate.

I'll also add the tag "cust" to indicate that the input tuple is of type cust_details.

Therefore, my cust_details mapper will generate the following intermediate key-value pairs:

Key-value pair: [customer ID, customer name]

For example: [4000001, cust Kristina], [4000002, cust Paige], and so on.

Mapper for Transaction_details:

public static class TxnsMapper extends Mapper<Object, Text, Text, Text> {
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String record = value.toString();
        String[] parts = record.split(",");
        // Key: customer ID (third field); value: the transaction amount tagged with "tnxn"
        context.write(new Text(parts[2]), new Text("tnxn " + parts[3]));
    }
}

Just like the cust_details mapper, I'll follow similar steps here. However, there are a few differences:

  • I’m going to get the dollar value instead of the name.

  • In this case, we will use "tnxn" as the tag.

Therefore, the customer ID (the third comma-separated field of a transaction_details record) will be the key of the key-value pair that this mapper eventually generates.

Finally, the output of the Transaction_Details mapper will take the following format:

Key-value pair: [customer ID, tnxn amount]

Examples: [4000001, tnxn 40.33], [4000002, tnxn 198.44], and so on.

2. Sorting and shuffling phase

The sort and shuffle phase generates a list of values (an ArrayList) for each key. In other words, it puts together all the values corresponding to each unique key of the intermediate key-value pairs. The output of the sort and shuffle phase will be of the following format:

Key-value list:

  • {cust ID1 — [(cust name1), (tnxn amount1), (tnxn amount2), (tnxn amount3), …]}

  • {cust ID2 — [(cust name2), (tnxn amount1), (tnxn amount2), (tnxn amount3), …]}

Example:

  • {4000001 — [(cust Kristina), (tnxn 40.33), (tnxn 47.05), …]}

  • {4000002 — [(cust Paige), (tnxn 198.44), (tnxn 5.58), …]}
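If it helps to see the grouping outside of Hadoop, here is a minimal stand-alone Java sketch (plain collections, nothing Hadoop-specific) of what the sorting and shuffling phase does for us; the intermediate pairs are the hypothetical mapper outputs from the examples above.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShuffleSketch {
    public static void main(String[] args) {
        // Hypothetical intermediate (key, tagged value) pairs emitted by both mappers
        String[][] mapperOutput = {
            {"4000001", "cust Kristina"}, {"4000002", "cust Paige"},
            {"4000001", "tnxn 40.33"}, {"4000001", "tnxn 47.05"},
            {"4000002", "tnxn 198.44"}, {"4000002", "tnxn 5.58"}
        };

        // Group every tagged value under its join key, as sorting and shuffling does
        Map<String, List<String>> grouped = new HashMap<>();
        for (String[] pair : mapperOutput) {
            grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
        }

        // e.g. {4000001=[cust Kristina, tnxn 40.33, tnxn 47.05], 4000002=[cust Paige, tnxn 198.44, tnxn 5.58]}
        System.out.println(grouped);
    }
}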

The framework now calls the reduce() method (reduce(Text key, Iterable<Text> values, Context context)) for each unique join key (cust ID) and its corresponding list of values. The reducer then performs the join operation on the values present in that list to finally compute the desired output. Therefore, the number of times reduce() is called will be equal to the number of unique customer IDs.

Now let's see how the reducer performs the join operation in this MapReduce example.

3. Reducer stage

If you recall, the main goal of performing this reduce side join operation was to find out how many times a particular customer visited the sports complex and the total amount that customer spent on different sports. Therefore, my final output should be in the following format:

Key-value pair: [customer name] (key) – [total amount, frequency of visits] (value)

Reducer code:

public static class ReduceJoinReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String name = "";
        Double total = 0.0;
        int count = 0;
        for (Text t : values) {
            String[] parts = t.toString().split(" ");
            if (parts[0].equals("tnxn")) {
                // Transaction record: count the visit and accumulate the amount
                count++;
                total += Float.parseFloat(parts[1]);
            } else if (parts[0].equals("cust")) {
                // Customer record: remember the name for the output key
                name = parts[1];
            }
        }
        String str = String.format("%d %f", count, total);
        context.write(new Text(name), new Text(str));
    }
}

Therefore, the following steps will be taken in each reducer to achieve the desired output:

In each reduce() call, I have a key and a list of values, where the key is just the customer ID. The list of values will contain inputs from both datasets: the amount from transaction_details and the name from cust_details.

I will now iterate over the values that exist in the reducer list of values.

I then split each value in the list and check, from its tag, whether the value comes from transaction_details or cust_details.

If the type is transaction_details, I do the following steps:

  • I increment a counter to calculate the frequency of this person's visits.

  • I accumulate the transaction amount into a running total to calculate the total amount spent by this person.

On the other hand, if the value is of type cust_details, I store it in a string variable. Later, I use this name as the key of my output key-value pair.

Finally, I’ll write output key/value pairs in the output folder of my HDFS.

Therefore, my reducer will produce the following final output:

Kristina, 651.05 8

Paige, 706.97 6

…

Also, the whole process we did above is called _Reduce Side Join_ in MapReduce.

The source code:

The source code for the above reduce side join MapReduce example is as follows:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReduceJoin {

    // Mapper for cust_details: emits (customer ID, "cust <name>")
    public static class CustsMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String record = value.toString();
            String[] parts = record.split(",");
            context.write(new Text(parts[0]), new Text("cust " + parts[1]));
        }
    }

    // Mapper for transaction_details: emits (customer ID, "tnxn <amount>")
    public static class TxnsMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String record = value.toString();
            String[] parts = record.split(",");
            context.write(new Text(parts[2]), new Text("tnxn " + parts[3]));
        }
    }

    // Reducer: joins the tagged values for each customer ID
    public static class ReduceJoinReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String name = "";
            Double total = 0.0;
            int count = 0;
            for (Text t : values) {
                String[] parts = t.toString().split(" ");
                if (parts[0].equals("tnxn")) {
                    count++;
                    total += Float.parseFloat(parts[1]);
                } else if (parts[0].equals("cust")) {
                    name = parts[1];
                }
            }
            String str = String.format("%d %f", count, total);
            context.write(new Text(name), new Text(str));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Reduce-side join");
        job.setJarByClass(ReduceJoin.class);
        job.setReducerClass(ReduceJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Each input path gets its own mapper class
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CustsMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, TxnsMapper.class);
        Path outputPath = new Path(args[2]);
        FileOutputFormat.setOutputPath(job, outputPath);
        // Delete the output directory if it already exists so the job can be rerun
        outputPath.getFileSystem(conf).delete(outputPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
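If you prefer the command line to Eclipse, compiling and packaging the class could look roughly like this; it assumes the listing above is saved as ReduceJoin.java and that the hadoop command is on your PATH:

mkdir -p classes
javac -classpath "$(hadoop classpath)" -d classes ReduceJoin.java
jar cf reducejoin.jar -C classes .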

Run this program

Finally, the command to run the above reduce side join MapReduce example is as follows:

hadoop jar reducejoin.jar ReduceJoin /sample/input/cust_details /sample/input/transaction_details /sample/output
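Once the job completes, you can inspect the result directly in HDFS; with the default single reducer, the output normally lands in a part-r-00000 file under the output directory given above:

hdfs dfs -cat /sample/output/part-r-00000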

Conclusion

In the sorting and reducing phases, the reduce side join process generates a large amount of network I/O traffic, since this is where the values of the same key are clustered together. Therefore, if you have a large number of different datasets with millions of values, you are likely to run into an OutOfMemory exception, that is, your RAM fills up and overflows. In my opinion, the advantages of using a reduce side join are as follows:

  • It is easy to implement because we leverage the built-in sort and shuffle algorithm of the MapReduce framework, which combines the values of the same key and sends them to the same reducer.

  • In Reduce Side Join, your input does not need to follow any strict format, so you can perform join operations on unstructured data as well.

In general, people prefer Apache Hive, which is part of the Hadoop ecosystem, to perform join operations. So, if you come from an SQL background, you don't need to worry about writing MapReduce Java code to perform a join operation; you can use Hive as an alternative.
