Welcome to my GitHub
All of Xinchen's originals (including supporting source code) are categorized and summarized here: github.com/zq2599/blog…
Overview of this article
- The tenth article in this Hive learning-notes series covered UDF development; a UDF suits one-in-one-out scenarios, such as converting a specified field of each record to uppercase.
- AVG and SUM are Hive built-ins for many-in-one-out (aggregation) scenarios; custom functions for this scenario are User Defined Aggregate Functions (UDAF). UDAF development is more involved than one-in-one-out, so this article walks through it.
- The UDAF developed in this article is named udf_fieldlength; in a GROUP BY query it computes, for each group, the total length of a specified field.
Preparation
- In some older tutorials and documentation, the key to UDAF development is to inherit UDAF.java;
- But open hive-exec 1.2.2 and you will find that the UDAF class has been annotated @Deprecated;
- After UDAF was deprecated, two alternatives are recommended: implement the GenericUDAFResolver2 interface, or inherit the AbstractGenericUDAFResolver class;
- Now a new question arises: which of these two alternatives should we use for our UDAF?
- Open the source of AbstractGenericUDAFResolver, shown below, and it becomes clear: the class itself is an implementation of the GenericUDAFResolver2 interface:
public abstract class AbstractGenericUDAFResolver implements GenericUDAFResolver2 {

    @SuppressWarnings("deprecation")
    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
            throws SemanticException {
        if (info.isAllColumns()) {
            throw new SemanticException(
                "The specified syntax for UDAF invocation is invalid.");
        }
        return getEvaluator(info.getParameters());
    }

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] info)
            throws SemanticException {
        throw new SemanticException(
            "This UDAF does not support the deprecated getEvaluator() method.");
    }
}
- Given the source, there is no need to agonize: inheriting the class or implementing the interface both work, so choose whichever you prefer. Here I inherit the AbstractGenericUDAFResolver class;
About the four stages of UDAF
- Before coding, understand the four stages of a UDAF, defined in the Mode enum of GenericUDAFEvaluator:
- COMPLETE: if the MapReduce job has only a map phase and no reduce phase, execution enters this stage;
- PARTIAL1: the map phase of a normal MapReduce job;
- PARTIAL2: the combiner phase of a normal MapReduce job;
- FINAL: the reduce phase of a normal MapReduce job.
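To make the stage flow concrete before looking at the Hive APIs, here is a minimal, Hive-free sketch (class and method names are illustrative only, not Hive's) of how a length-summing aggregation's data flows: each mapper accumulates per-row in PARTIAL1 (iterate) and emits a partial total (terminatePartial); the reducer combines the partials in FINAL (merge) and emits the group result (terminate):

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, Hive-independent sketch of the UDAF stage flow for a
 * length-summing aggregation. mapSide() stands in for PARTIAL1
 * (iterate + terminatePartial); reduceSide() stands in for FINAL
 * (merge + terminate).
 */
public class StageFlowSketch {

    // PARTIAL1: called per row (iterate), partial total returned at stage end (terminatePartial)
    static int mapSide(List<String> rows) {
        int buffer = 0;                        // the aggregation buffer
        for (String field : rows) {
            buffer += field.length();          // iterate()
        }
        return buffer;                         // terminatePartial()
    }

    // FINAL: combines the mappers' partial totals (merge), returns the group result (terminate)
    static int reduceSide(List<Integer> partials) {
        int buffer = 0;
        for (int partial : partials) {
            buffer += partial;                 // merge()
        }
        return buffer;                         // terminate()
    }

    public static void main(String[] args) {
        // two "mappers" each handle part of one group's rows
        int p1 = mapSide(Arrays.asList("guangzhou"));            // 9
        int p2 = mapSide(Arrays.asList("shenzhen"));             // 8
        System.out.println(reduceSide(Arrays.asList(p1, p2)));   // 17
    }
}
```

PARTIAL2 (the combiner) would simply apply reduceSide-style merging to a subset of partials before FINAL; COMPLETE collapses everything into the map side.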
Methods to be called at each stage
- When developing a UDAF you inherit the abstract class GenericUDAFEvaluator, which has multiple abstract methods; at each stage, one or more of these methods are called;
- The following figure makes it clear which methods are called at each stage:
- The next figure shows the three sequentially executed stages and the methods involved in each:
- Both figures come from kent7306's article "Hive UDAF development in detail", at blog.csdn.net/kent7306/ar…
- The two diagrams make the relationship between the abstract methods and the stages very clear. Now let's start coding.
Download the source code
- If you don't want to code along, the full source can be downloaded from GitHub; the addresses and link information are as follows:
Name | Link | Note |
---|---|---|
Project home page | Github.com/zq2599/blog… | The project's home page on GitHub |
Git repository address (HTTPS) | Github.com/zq2599/blog… | The project source repository, HTTPS protocol |
Git repository address (SSH) | [email protected]:zq2599/blog_demos.git | The project source repository, SSH protocol |
- This Git project contains multiple folders; the code for this article is in the hiveudf folder, as shown in the red box below:
A brief description of UDAF development steps
Developing UDAF consists of the following steps:
- Create a class FieldLengthAggregationBuffer to hold intermediate results; it inherits AbstractAggregationBuffer;
- Create a class FieldLengthUDAFEvaluator, inheriting GenericUDAFEvaluator, that implements the methods called in the four stages;
- Create a class FieldLength, inheriting AbstractGenericUDAFResolver, to register the UDAF with Hive; it instantiates FieldLengthUDAFEvaluator;
- Compile and build to get the jar;
- Add the jar in Hive;
- Register the function in Hive;
Now let's follow these steps.
Development
- Open the hiveudf project created earlier and create FieldLengthAggregationBuffer.java. This class caches intermediate results: each stage writes its result here and passes it on to the next stage; its value member stores the running total:
package com.bolingcavalry.hiveudf.udaf;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.util.JavaDataModel;

public class FieldLengthAggregationBuffer extends GenericUDAFEvaluator.AbstractAggregationBuffer {

    private Integer value = 0;

    public Integer getValue() {
        return value;
    }

    public void setValue(Integer value) {
        this.value = value;
    }

    // accumulate into the running total (each evaluator instance is used by a single task,
    // so no synchronization is needed here)
    public void add(int addValue) {
        value += addValue;
    }

    /**
     * Buffer size estimate: the buffer holds a single int (the running length total), so 4 bytes
     * @return estimated buffer size in bytes
     */
    @Override
    public int estimate() {
        return JavaDataModel.PRIMITIVES1;
    }
}
- Create FieldLengthUDAFEvaluator.java, which implements the entire UDAF logic. Key code is commented; read it together with the earlier figures. The core idea: iterate processes each field of the current group, merge combines the scattered partial results, and terminate produces the final result for the group:
package com.bolingcavalry.hiveudf.udaf;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;

/**
 * @Description: the actual processing class of the UDAF
 * @author: willzhao E-mail: [email protected]
 * @date: 2020/11/4
 */
public class FieldLengthUDAFEvaluator extends GenericUDAFEvaluator {

    PrimitiveObjectInspector inputOI;
    ObjectInspector outputOI;
    PrimitiveObjectInspector integerOI;

    /**
     * Each stage's input and output pass through object inspectors set up here,
     * so the other methods can use them directly
     * @param m current stage
     * @param parameters input object inspectors
     * @return output object inspector for this stage
     * @throws HiveException
     */
    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        super.init(m, parameters);
        if (Mode.PARTIAL1.equals(m) || Mode.COMPLETE.equals(m)) {
            // COMPLETE or PARTIAL1: the input is the table's raw data
            inputOI = (PrimitiveObjectInspector) parameters[0];
        } else {
            // PARTIAL2 and FINAL: the input is the previous stage's output
            // (the type returned by the previous stage's init)
            integerOI = (PrimitiveObjectInspector) parameters[0];
        }
        outputOI = ObjectInspectorFactory.getReflectionObjectInspector(
                Integer.class,
                ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
        // tell the next stage the type of this stage's output
        return outputOI;
    }

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        return new FieldLengthAggregationBuffer();
    }

    /**
     * Reset: clear the running total
     * @param agg
     * @throws HiveException
     */
    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
        ((FieldLengthAggregationBuffer) agg).setValue(0);
    }

    /**
     * Called once per row; accumulates the field length into agg
     * @param agg
     * @param parameters
     * @throws HiveException
     */
    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        if (null == parameters || parameters.length < 1) {
            return;
        }
        Object javaObj = inputOI.getPrimitiveJavaObject(parameters[0]);
        ((FieldLengthAggregationBuffer) agg).add(String.valueOf(javaObj).length());
    }

    /**
     * Returns the final result for the current group (group by)
     * @param agg
     * @return
     * @throws HiveException
     */
    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
        return ((FieldLengthAggregationBuffer) agg).getValue();
    }

    /**
     * Executed at the end of a partial stage (map or combiner); returns the partial aggregation result
     * @param agg
     * @return
     * @throws HiveException
     */
    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
        return terminate(agg);
    }

    /**
     * Merges a partial result's length into the buffer (combiner or reduce)
     * @param agg
     * @param partial
     * @throws HiveException
     */
    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        ((FieldLengthAggregationBuffer) agg)
                .add((Integer) integerOI.getPrimitiveJavaObject(partial));
    }
}
- Finally, create FieldLength.java; this class registers the UDAF with Hive, instantiates FieldLengthUDAFEvaluator, and inherits AbstractGenericUDAFResolver:
package com.bolingcavalry.hiveudf.udaf;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
public class FieldLength extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        return new FieldLengthUDAFEvaluator();
    }

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {
        return new FieldLengthUDAFEvaluator();
    }
}
At this point, the coding is complete, and deployment and experimentation follow;
Deployment and verification
This deployment registers a temporary function; to register a permanent function, see the previous article.
- Run mvn clean package -U in the directory containing pom.xml to compile and build;
- Get the file hiveudf-1.0-SNAPSHOT.jar from the target directory;
- Upload the jar to the Hive server, into the /home/hadoop/udf directory;
- Enter the Hive session and run the following command to add the JAR:
add jar /home/hadoop/udf/hiveudf-1.0-SNAPSHOT.jar;
- Run the following command to register:
create temporary function udf_fieldlength as 'com.bolingcavalry.hiveudf.udaf.FieldLength';
- The test data comes from the address table; its contents, via select * from address, are as follows:
hive> select * from address;
OK
1 guangdong guangzhou
2 guangdong shenzhen
3 shanxi xian
4 shanxi hanzhong
6 jiangshu nanjing
- Execute the following SQL:
select province, count(city), udf_fieldlength(city) from address group by province;
The results are as follows: guangdong's guangzhou and shenzhen total 17, jiangshu's nanjing totals 7, and shanxi's xian and hanzhong total 12, all matching expectations:
Total MapReduce CPU Time Spent: 2 seconds 730 msec
OK
guangdong 2 17
jiangshu 1 7
shanxi 2 12
Time taken: 28.484 seconds, Fetched: 3 row(s)
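The expected values are easy to check by hand, since the per-group total is just the sum of the cities' string lengths. A quick, Hive-independent sanity check (a plain-Java sketch mirroring what udf_fieldlength computes per group):

```java
import java.util.Arrays;
import java.util.List;

public class LengthCheck {
    // sum of string lengths, the same quantity udf_fieldlength reports per group
    static int totalLength(List<String> cities) {
        return cities.stream().mapToInt(String::length).sum();
    }

    public static void main(String[] args) {
        System.out.println(totalLength(Arrays.asList("guangzhou", "shenzhen"))); // 9 + 8 = 17
        System.out.println(totalLength(Arrays.asList("nanjing")));               // 7
        System.out.println(totalLength(Arrays.asList("xian", "hanzhong")));      // 4 + 8 = 12
    }
}
```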
At this point, the study and practice of UDAF are complete: we have mastered developing many-in-one-out functions. Because it involves multiple stages and logic invoked externally by the framework, UDAF development is slightly harder; the next article covers one-in-many-out development, which will be easier.
You are not fighting alone: Xinchen's originals accompany you all the way
- Java series
- Spring series
- Docker series
- Kubernetes series
- Database + middleware series
- The conversation series
Welcome to follow my WeChat official account: Programmer Xinchen
Search "Programmer Xinchen" on WeChat. I am Xinchen, looking forward to exploring the Java world with you…
Github.com/zq2599/blog…