1. Introduction
When an HBase table grows to billions of rows or millions of columns, whether a query can return a large amount of data is constrained by network bandwidth, and even when the network allows it, client-side processing may not keep up. Coprocessors were introduced for exactly this situation. They allow you to push business computation into a coprocessor running on the RegionServer and return only the processed results to the client, which greatly reduces the amount of data that must be transferred and improves performance. In addition, coprocessors let users extend HBase with features it does not provide out of the box, such as permission checks, secondary indexes, and integrity constraints.
2. Types of Coprocessors
2.1 Observer coprocessor
1. Functions
Observer coprocessors are similar to triggers in a relational database: they are invoked by the server when certain events occur. They are usually used to implement the following features:
- Permission checks: before a Get or Put operation, you can check permissions in the preGet or prePut method (see the sketch after this list);
- Integrity constraints: HBase does not support the foreign keys of relational databases, but a trigger-style coprocessor can check the associated data when a row is inserted or deleted;
- Secondary indexes: a coprocessor can be used to maintain secondary indexes.
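To make the permission-check idea concrete, here is a minimal sketch (not part of this article's example) of a RegionObserver that rejects writes to a hypothetical secret:level column by throwing an exception in prePut. The class and column names are made up; only the pattern of aborting a request in a pre hook is the point.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.util.Bytes;

public class SimpleAclObserver extends BaseRegionObserver {

    private static final byte[] PROTECTED_FAMILY = Bytes.toBytes("secret");
    private static final byte[] PROTECTED_QUALIFIER = Bytes.toBytes("level");

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put,
                       WALEdit edit, Durability durability) throws IOException {
        // Runs before the Put reaches the Region; throwing an exception aborts the write
        if (put.has(PROTECTED_FAMILY, PROTECTED_QUALIFIER)) {
            throw new AccessDeniedException("Writes to 'secret:level' are not allowed");
        }
    }
}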
2. Types
There are currently four types of Observer coprocessors:
- RegionObserver: Allows you to observe events on a Region, such as Get and Put operations.
- RegionServerObserver: Allows you to observe events related to RegionServer operations, such as starting, stopping, or performing a merge, commit, or rollback.
- MasterObserver: Allows you to observe HBase master-related events, such as table creation, deletion, or schema modification.
- WALObserver: Allows you to observe events related to the write-ahead log (WAL).
3. Interfaces
Each of the four Observer coprocessor types has its own interface, all of which extend the Coprocessor interface; each interface defines all of the available hook methods for running custom code before and after the corresponding operation. Usually we do not implement these interfaces directly but instead extend their Base implementation classes (such as BaseRegionObserver), which provide empty implementations of every method, so a custom coprocessor only needs to override the methods it actually needs.
Taking RegionObserver as an example, its interface defines all of the available hook methods. Most of them come in pre/post pairs, as illustrated by the skeletal class below.
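The skeletal observer below overrides a few of those pairs; the method signatures match the HBase 1.x API (the 1.2.0 dependency used later in this article), and the bodies are intentionally left empty to show only the hook points.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

// Skeletal observer showing a few of the pre/post hook pairs defined by RegionObserver
public class HookPairsSketch extends BaseRegionObserver {

    @Override
    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
                         List<Cell> result) throws IOException {
        // runs before a Get is executed on the Region
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
                          List<Cell> result) throws IOException {
        // runs after the Get has been executed
    }

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put,
                       WALEdit edit, Durability durability) throws IOException {
        // runs before a Put is applied
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> c, Put put,
                        WALEdit edit, Durability durability) throws IOException {
        // runs after the Put has been applied
    }
}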
4. Execution Flow
- The client sends a PUT request
- The request is dispatched to the appropriate RegionServer and Region
- CoprocessorHost intercepts the request and then calls prePut() on each RegionObserver of the table
- If the request is not intercepted by prePut(), it continues on to the Region for processing
- The Region's result is again intercepted by the CoprocessorHost, which calls postPut()
- If the response is not intercepted by postPut(), the final result is returned to the client
If you are familiar with Spring, you can compare this mechanism to its AOP implementation. The official documentation puts it this way:
If you are familiar with Aspect Oriented Programming (AOP), you can think of a coprocessor as applying advice by intercepting a request and then running some custom code, before passing the request on to its final destination (or even changing the destination).
2.2 Endpoint coprocessor
The Endpoint coprocessor is similar to a stored procedure in a relational database. The client can call an Endpoint coprocessor to process data on the server side and return only the result.
Take aggregation as an example. Without a coprocessor, to find the maximum value in a table (a max aggregation), the user must scan the whole table and traverse the results on the client, which puts all of the processing pressure on the client. With a coprocessor, the user can deploy the max-computing code to the HBase server, and HBase uses multiple nodes of the underlying cluster to compute the maximum concurrently: the code runs on each RegionServer to compute the maximum of each Region, and only those per-Region maxima are returned. The client then only needs to compare the per-Region maxima to obtain the global maximum.
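As a concrete sketch of this pattern, HBase ships a built-in aggregation Endpoint (AggregateImplementation) together with a client helper, AggregationClient. The snippet below assumes that the endpoint has already been loaded on a table named myTable whose cf:score column stores 8-byte long values; the table, family, and column names are placeholders for illustration only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;

public class MaxWithEndpoint {

    public static void main(String[] args) throws Throwable {
        Configuration conf = HBaseConfiguration.create();
        // Requires the AggregateImplementation endpoint to be loaded on the table
        // (statically via hbase.coprocessor.region.classes or dynamically per table)
        AggregationClient aggregationClient = new AggregationClient(conf);
        try {
            Scan scan = new Scan();
            scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("score"));
            // Each Region computes its own maximum on the server side;
            // the client only merges the per-Region results
            Long max = aggregationClient.max(TableName.valueOf("myTable"),
                    new LongColumnInterpreter(), scan);
            System.out.println("max = " + max);
        } finally {
            aggregationClient.close();
        }
    }
}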
3. Coprocessor Loading Methods
To use a custom coprocessor, it must be loaded either statically (via the HBase configuration) or dynamically (via the HBase Shell or the Java API).
- A statically loaded coprocessor is called a System Coprocessor (a system-level coprocessor). It applies to all tables in HBase and requires a restart of the HBase service.
- A dynamically loaded coprocessor is called a Table Coprocessor. It applies only to a specific table and does not require restarting the HBase service.
The loading and unloading methods are described as follows.
4. Static Loading and Unloading
4.1 Static Loading
Static loading is divided into the following three steps:
- Define the coprocessor to load in hbase-site.xml:
<property>
<name>hbase.coprocessor.region.classes</name>
<value>org.myname.hbase.coprocessor.endpoint.SumEndPoint</value>
</property>
The value of the <name> tag must be one of the following:
- For RegionObserver and Endpoint coprocessors: hbase.coprocessor.region.classes
- For WALObserver coprocessors: hbase.coprocessor.wal.classes
- For MasterObserver coprocessors: hbase.coprocessor.master.classes
The <value> must be the fully qualified class name of the coprocessor implementation class. If more than one class is loaded, the class names must be separated by commas (see the example after these steps).
- Place the JAR (containing the coprocessor code and all of its dependencies) in the lib directory of the HBase installation.
- Restart HBase.
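For example, a configuration that loads two coprocessors at once would look like the following (the class names here are purely hypothetical; only the comma-separated value format is the point):

<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>com.example.CoprocessorA,com.example.CoprocessorB</value>
</property>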
4.2 Static Unloading
- Remove the coprocessor's <property> element and its child elements from hbase-site.xml.
- (Optional) Remove the coprocessor's JAR file from the classpath or the HBase lib directory.
- Restart HBase.
5. Dynamic Loading and Unloading
Dynamically loaded coprocessors do not require an HBase restart. However, they are loaded on a per-table basis and apply only to the specified table, and the table must be taken offline (disabled) before the coprocessor can be loaded. Dynamic loading is usually done in one of two ways: the HBase Shell or the Java API.
The following example is based on two premises:
- The coprocessor.jar contains the coprocessor implementation and all its dependencies.
- The JAR package is stored in HDFS at hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar.
5.1 Dynamic Loading with the HBase Shell
- Disable the table using the HBase Shell
hbase > disable 'tableName'
- Use the following command to load the coprocessor
hbase > alter 'tableName', METHOD => 'table_att', 'Coprocessor' => 'hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar|org.myname.hbase.Coprocessor.RegionObserverExample|1073741823|arg1=1,arg2=2'
The value of the Coprocessor attribute contains four parameters separated by the pipe (|) character, explained in order below:
- JAR package path: the path of the JAR package in HDFS. Note two points about the path:
  - Wildcards can be used, for example hdfs://<namenode>:<port>/user/<hadoop-user>/*.jar, to add all matching JAR packages.
  - A directory can be specified, for example hdfs://<namenode>:<port>/user/<hadoop-user>/. This adds all JAR packages in the directory but does not search its subdirectories.
- Class name: the fully qualified class name of the coprocessor.
- Priority: the priority of the coprocessor, following the natural ordering of numbers, i.e. the smaller the value, the higher the priority. It can be left empty, in which case a default priority is assigned. (The value 1073741823 used above corresponds to Coprocessor.PRIORITY_USER.)
- Optional arguments: arguments passed to the coprocessor.
- Enable the table
hbase > enable 'tableName'
- Verify that the coprocessor is loaded
hbase > describe 'tableName'
If the coprocessor appears in the TABLE_ATTRIBUTES attribute, it has been loaded successfully.
5.2 Dynamic Unloading with the HBase Shell
- Disable the table
hbase> disable 'tableName'
- Remove the table's coprocessor (coprocessor$1 refers to the first coprocessor attribute set on the table)
hbase> alter 'tableName', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
- Enable the table
hbase> enable 'tableName'
5.3 Dynamic Loading with the Java API
TableName tableName = TableName.valueOf("users");
String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
hTableDescriptor.setValue("COPROCESSOR$1", path + "|"
+ RegionObserverExample.class.getCanonicalName() + "|"
+ Coprocessor.PRIORITY_USER);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);
In HBase 0.96 and later, the addCoprocessor() method of HTableDescriptor provides a more convenient way to load a coprocessor.
TableName tableName = TableName.valueOf("users");
Path path = new Path("hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar");
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
hTableDescriptor.addCoprocessor(RegionObserverExample.class.getCanonicalName(), path,
Coprocessor.PRIORITY_USER, null);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);
5.4 Dynamic Unloading with the Java API
Unloading simply means redefining the table descriptor without setting the coprocessor attribute, which removes all coprocessors set on the table.
TableName tableName = TableName.valueOf("users");
String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);
6. Coprocessor Example
This example implements a coprocessor similar to the Redis APPEND command. By default, a put on an existing column performs an update in HBase; with this coprocessor, it performs an append instead.
#Redis append command example
redis> EXISTS mykey
(integer) 0
redis> APPEND mykey "Hello"
(integer) 5
redis> APPEND mykey " World"
(integer) 11
redis> GET mykey
"Hello World"
6.1 Creating a Test Table
# Create a magazine table with two column families: article and picture
hbase > create 'magazine','article','picture'
6.2 Coprocessor Programming
The complete code can be seen in this repository: hbase-observer-coprocessor
Create a new Maven project and import the following dependencies:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.0</version>
</dependency>
Extend BaseRegionObserver to implement our custom RegionObserver: whenever a put is performed on the article:content column, the newly written value is appended to the existing value instead of overwriting it. The code is as follows:
public class AppendRegionObserver extends BaseRegionObserver {

    private byte[] columnFamily = Bytes.toBytes("article");
    private byte[] qualifier = Bytes.toBytes("content");

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
                       WALEdit edit, Durability durability) throws IOException {
        if (put.has(columnFamily, qualifier)) {
            // Query the current row to get the existing value of the target column
            Result rs = e.getEnvironment().getRegion().get(new Get(put.getRow()));
            String oldValue = "";
            for (Cell cell : rs.rawCells()) {
                if (CellUtil.matchingColumn(cell, columnFamily, qualifier)) {
                    oldValue = Bytes.toString(CellUtil.cloneValue(cell));
                }
            }
            // Get the value that is about to be inserted for the target column
            List<Cell> cells = put.get(columnFamily, qualifier);
            String newValue = "";
            for (Cell cell : cells) {
                if (CellUtil.matchingColumn(cell, columnFamily, qualifier)) {
                    newValue = Bytes.toString(CellUtil.cloneValue(cell));
                }
            }
            // Append: rewrite the column with the old value followed by the new value
            put.addColumn(columnFamily, qualifier, Bytes.toBytes(oldValue + newValue));
        }
    }
}
6.3 Packaging the Project
Package the project with the Maven command below; the resulting file is named hbase-observer-coprocessor-1.0-SNAPSHOT.jar.
# mvn clean package
6.4 Uploading the JAR Package to HDFS
# Upload the JAR package to the /hbase directory on HDFS
hadoop fs -put /usr/app/hbase-observer-coprocessor-1.0-SNAPSHOT.jar /hbase
# Check whether the upload succeeded
hadoop fs -ls /hbase
6.5 Loading a Coprocessor
- Tables need to be disabled before loading the coprocessor
hbase > disable 'magazine'
- Load the coprocessor
hbase > alter 'magazine', METHOD => 'table_att', 'Coprocessor' => 'hdfs://hadoop001:8020/hbase/hbase-observer-coprocessor-1.0-SNAPSHOT.jar|com.heibaiying.AppendRegionObserver|1001|'
- Enable the table
hbase > enable 'magazine'
- Check whether the coprocessor is loaded successfully
hbase > desc 'magazine'
If the coprocessor appears in the TABLE_ATTRIBUTES attribute, the load succeeded.
6.6 Testing the Loading Result
Insert a set of test data:
hbase > put 'magazine', 'rowkey1','article:content','Hello'
hbase > get 'magazine','rowkey1','article:content'
hbase > put 'magazine', 'rowkey1','article:content','World'
hbase > get 'magazine','rowkey1','article:content'
You can see that the value of the specified column has been appended to rather than overwritten.
Insert a set of control data:
hbase > put 'magazine', 'rowkey1','article:author','zhangsan'
hbase > get 'magazine','rowkey1','article:author'
hbase > put 'magazine', 'rowkey1','article:author','lisi'
hbase > get 'magazine','rowkey1','article:author'
You can see that for other columns the normal update behavior is still applied.
6.7 Unloading the Coprocessor
- The table must be disabled before unloading the coprocessor
hbase > disable 'magazine'
- Unload the coprocessor
hbase > alter 'magazine', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
- Enable the table
hbase > enable 'magazine'
- Check whether the coprocessor was unloaded successfully
hbase > desc 'magazine'
6.8 Testing the Unloading Result
Run the following commands to test whether the unloading succeeded:
hbase > get 'magazine','rowkey1','article:content'
hbase > put 'magazine', 'rowkey1','article:content','Hello'
hbase > get 'magazine','rowkey1','article:content'
References
- Apache HBase Coprocessors
- Apache HBase Coprocessor Introduction
- Advanced HBase knowledge
For more articles in this big data series, see the GitHub open source project: Getting Started with Big Data.