Background

This project addresses online queries of the check and opinion historical data tables (historical data refers to the complete intermediate process and result data generated as business transactions occur). The original implementation provided storage and query services on Oracle. As the data volume grew, performance problems appeared in both writing and reading. Since historical data is only used for query reference and does not affect the actual business flow, the requirement is moved to the downstream Hadoop distributed data processing platform. The specific requirements are:

  1. Data volume: the check table currently holds 50 million+ rows (about 11 GB) and the opinion table 300 million+ rows (about 100 GB). Each table grows by roughly 500,000 rows per day, insert only, no updates.
  2. The primary key of the check table is id (an Oracle global ID) and the query key is check_id; one check_id corresponds to multiple records, so the list of matching records must be returned. The primary key of the opinion table is also id, and the query keys are bussiness_no and buss_type. A single query returns fewer than 50 records, the query frequency is about 100 queries per day, and the required response time is 2 s.

Technology selection

Given the data volume and the query requirements, a distributed platform that can store large amounts of data and serve real-time queries points naturally to HBase. After a preliminary survey and an evaluation against the requirements, HBase was selected as the main storage component. The work divides into writing to HBase and reading from HBase.

For reading from HBase, the scheme is settled: design the RowKey according to the requirements and read the data with the rich APIs HBase provides (such as Get and Scan), which is enough to meet the performance target.

There are several ways to write to HBase:

  1. Call the native HBase API from Java, e.g. Table.put(List<Put>).
  2. Run a MapReduce job with TableOutputFormat as the output format.
  3. Bulk load: generate HFiles in HBase's internal data format, copy them to the proper location, and notify the RegionServer; in other words, the bulk data is materialized directly as HFiles. The HFiles can be generated with either MapReduce or Spark.

This article uses Spark + bulk load to write data into HBase. Compared with the other two methods, this approach has the following advantages:

  1. Bulk load does not write the WAL and does not trigger flushes or splits.
  2. Inserting data through a large number of Put calls causes frequent GC, which not only hurts performance but can even affect the stability of HBase nodes; bulk load avoids this.
  3. There is no large volume of interface calls draining performance.
  4. You can take advantage of Spark’s powerful computing capability.

The illustration is as follows:

Design

Environment

```
Hadoop 2.5-2.7
HBase 0.98.6
Spark 2.0.0-2.1.1
Sqoop 1.4.6
```

Table design

This section focuses on the HBase table design, of which the RowKey is the most important part. To illustrate, let’s first look at the data format, using the check table as an example (the opinion table is handled similarly).

Check table (the original table has 18 fields; 5 are selected here for ease of description)

As shown in the preceding figure, the primary key is id, a random 32-character string of letters and digits. The business query field check_id is a variable-length string of letters and digits (no more than 32 characters), and the same check_id may correspond to multiple records; the other fields are business fields. As is well known, HBase only supports lookups by RowKey, and RowKeys must be unique, so RowKey design is primarily about how the data will be accessed. Initially, we considered two design approaches.

  1. Split the data into two tables. One table uses id as the RowKey, with the columns of the check table as its columns. The other is an index table whose RowKey is check_id and whose columns each hold an id. A query first looks up the index table by check_id to get the list of ids, then fetches each row from the data table by id; both lookups are HBase Get operations.
  2. Treat the requirement as a range query rather than a point query. Put the check_id at the front of the RowKey, followed by the id, then set startRow and stopRow on a Scan to find the list of matching records.

The advantage of the first approach is a simple table structure and an easy RowKey design. Its disadvantage is that every original row has to be written to two tables, and before writing to the index table we must check whether the RowKey already exists: if it does, a column is added; otherwise a new row is created. Even with batched Lists, this requires reading the table at least twice. The second approach makes the RowKey design more complex, but both writes and reads are done in a single pass. On balance, we adopted the second approach.

RowKey design

Hotspotting

Rows in HBase are sorted lexicographically by RowKey. Hotspots typically occur when a large number of client requests hit one or a few nodes in the cluster. By default, a newly created table has only one region, which splits into more regions as the data grows; these regions can then be distributed across multiple RegionServers to balance the load. Because a large amount of stock data must be loaded, the HBase load should be spread evenly across the RegionServers from the start, i.e. the table should be pre-split. Common techniques to prevent hotspots include salting, hashing, and reversing the incremental part of the key (such as a timestamp).

RowKey design

Step 1: determine the number of pre-split regions and create the HBase table

How the number is determined varies with the business scenario and data characteristics; in my view, factors such as data volume and cluster size should be weighed together. For example, the check table is about 11 GB, the test cluster has 10 machines, and hbase.hregion.max.filesize = 3 GB (a region that exceeds this size is split in two). Therefore, at initialization a region should hold about 1-2 GB (so that it is not split right away), which gives 11 GB / 2 GB ≈ 6 regions. However, to make full use of the cluster resources, the check table in this article is split into 10 partitions. If the data volume were 100 GB and still growing, the number of regions should be increased to about 100 GB / 2 GB = 50. The check table is created as follows:

```
create 'tinawang:check',
{ NAME => 'f', COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'FAST_DIFF', BLOOMFILTER => 'ROW' },
{ SPLITS => ['1','2','3','4','5','6','7','8','9'] }
```

The column family name is ‘f’; the shorter, the better.

COMPRESSION => ‘SNAPPY’. HBase supports LZO, GZIP, and SNAPPY compression. GZIP has a high compression ratio but is CPU intensive; the other two are roughly comparable, with Snappy slightly ahead on CPU consumption. Balancing I/O and CPU, Snappy is chosen.

DATA_BLOCK_ENCODING => ‘FAST_DIFF’. The RowKeys in this case are similar to one another, and the following command can be used to check that the key is longer than the value, in which case data block encoding pays off.

```
./hbase org.apache.hadoop.hbase.io.hfile.HFile -m -f /apps/hbase/data/data/tinawang/check/a661f0f95598662a53b3d8b1ae469fdf/f/a5fefc880f87492d908672e1634f2eed_SeqId_2_
```

Step 2: RowKey composition

Salt

To distribute data evenly across the regions, and in combination with the pre-split, we take the hashCode of the check_id in the check table modulo numRegions as the prefix; note that the result must be padded to a fixed width.

```java
StringUtils.leftPad(Integer.toString(Math.abs(check_id.hashCode() % numRegion)), 1, '0')
```

Note: if the data volume reaches hundreds of gigabytes, numRegions will naturally have two digits, and the salt will be two digits as well.

Hash

Because check_id itself is a variable-length string, to spread the data evenly and make RowKey lookup and comparison easier, we hash check_id with MD5 to a fixed 32-character hex string (this matches the 32-character prefixes in the examples below).

```java
MD5Hash.getMD5AsHex(Bytes.toBytes(check_id))
```

Uniqueness

The salt + hash above forms the prefix of the RowKey, and the primary key id of the check table is appended to guarantee the uniqueness of the RowKey, e.g. for check_id = A208849559.

To improve readability, you can also insert a custom separator between the parts, such as ‘+’ or ‘|’.

This design guarantees that every query for a given check_id always computes the same salt + hash prefix and therefore falls into the same region. The columns of the check table in HBase are stored in the same way as the columns of the check table in the Oracle source.
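Putting the three parts together, here is a minimal sketch of RowKey generation under these rules (the class name, method name, separator placement, and numRegions parameter are illustrative, not taken from the project code):

```java
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;

public class RowKeyUtil {

    // RowKey = salt (region prefix) + '+' + MD5 hash of check_id (32 hex chars) + '+' + id
    public static String buildRowKey(String checkId, String id, int numRegions) {
        String salt = StringUtils.leftPad(
                Integer.toString(Math.abs(checkId.hashCode() % numRegions)), 1, '0');
        String hash = MD5Hash.getMD5AsHex(Bytes.toBytes(checkId));
        return salt + "+" + hash + "+" + id;
    }
}
```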

Web query design

The RowKey design and the query are tightly coupled: the query pattern determines the RowKey design, and in turn the query simply follows the RowKey design by setting the startRow and stopRow of a Scan. For example, to query check_id = A208849559, the salt + hash computation from the RowKey design yields the prefix, and the scan range is:

```
startRow = 7+7c9498b4a83974da56b252122b9752bf
stopRow  = 7+7c9498b4a83974da56b252122b9752bg
```

Key parts of the implementation

Writing to HBase with Spark

Step 0: preparation

Since the data comes from an upstream business system, the stock data is pulled into HDFS with Sqoop, and the incremental data arrives daily as files on an FTP site. Because some business fields contain newline characters, and Sqoop 1.4.6 currently only supports single-byte delimiters, this article uses ‘0x01’ as the column separator and ‘0x10’ as the row separator.

Step 1: read the HDFS text files with Spark

The default line separator of sparkContext.textFile() is "\n", but 0x10 is used here, so the delimiter must be set in the Hadoop Configuration. To apply that configuration we call newAPIHadoopFile to read the HDFS files, which returns a JavaPairRDD<LongWritable, Text>; LongWritable and Text are Hadoop’s long and String types, respectively (all Hadoop data types are similar to their Java counterparts, but specially optimized for network serialization). The data we need is in the value of the pair RDD, i.e. the Text. To simplify subsequent processing, the JavaPairRDD can be converted to a JavaRDD<String>.
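A minimal sketch of this step, assuming the record delimiter is passed through the standard textinputformat.record.delimiter property (the class and method names here are illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HdfsReader {

    // Read a 0x10-delimited text file and return one String per record.
    public static JavaRDD<String> readRecords(JavaSparkContext sc, String inputPath) {
        Configuration hadoopConf = new Configuration();
        // use 0x10 as the record (row) delimiter instead of the default "\n"
        hadoopConf.set("textinputformat.record.delimiter", "\u0010");

        return sc.newAPIHadoopFile(inputPath, TextInputFormat.class,
                        LongWritable.class, Text.class, hadoopConf)
                 // the data we need is in the value (Text) of each (offset, record) pair
                 .map(pair -> pair._2().toString());
    }
}
```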

Step 2: transform and sort the RDD

① Convert the JavaRDD<String> into a JavaPairRDD<Tuple2, String>, where the key is (RowKey, col) and the value is the field value. HBase stores data sorted by RowKey, and when data is bulk loaded into multiple pre-split regions, the data within each Spark partition must also be ordered by RowKey, column family (CF), and column name. Since there is only one column family here, the RowKey and the column name are combined into a Tuple2 as the key. Note that one database row of n fields is split into n rows of the pair RDD.

② Perform a secondary sort of the JavaPairRDD<Tuple2, String> by RowKey and column. Without this sort, the following exception is thrown:

```
java.io.IOException: Added a key not lexically larger than previous key
```

③ Organize the data into the JavaPairRDD<ImmutableBytesWritable, KeyValue> (hfileRDD) format required by HFile. A sketch of these three steps follows.
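This sketch of ①-③ rests on a few assumptions: a buildRowKey() helper like the one sketched in the RowKey design section, an illustrative COLUMN_NAMES array and field positions for the selected columns, and a serializable comparator that orders keys by RowKey and then by column name; none of these names come from the project code.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

public class HFileRddBuilder implements Serializable {

    // illustrative column list; the real job uses the selected fields of the check table
    private static final String[] COLUMN_NAMES = {"check_id", "operation", "opinion", "status", "update_time"};
    private static final byte[] CF = Bytes.toBytes("f");

    // orders keys by RowKey first, then by column name, so KeyValues come out lexically sorted
    public static class KeyComparator implements Comparator<Tuple2<String, String>>, Serializable {
        @Override
        public int compare(Tuple2<String, String> a, Tuple2<String, String> b) {
            int c = a._1().compareTo(b._1());
            return c != 0 ? c : a._2().compareTo(b._2());
        }
    }

    public JavaPairRDD<ImmutableBytesWritable, KeyValue> build(JavaRDD<String> lines, int numRegions) {
        // ① one source record of n fields becomes n ((rowkey, col), value) pairs
        JavaPairRDD<Tuple2<String, String>, String> cells = lines.flatMapToPair(line -> {
            String[] fields = line.split("\u0001", -1);   // 0x01 column separator
            String rowKey = RowKeyUtil.buildRowKey(fields[1], fields[0], numRegions);
            List<Tuple2<Tuple2<String, String>, String>> out = new ArrayList<>();
            for (int i = 0; i < COLUMN_NAMES.length; i++) {
                out.add(new Tuple2<>(new Tuple2<>(rowKey, COLUMN_NAMES[i]), fields[i]));
            }
            return out.iterator();
        });

        // ② secondary sort by (RowKey, col); ③ convert to the (ImmutableBytesWritable, KeyValue) pairs HFile expects
        return cells.sortByKey(new KeyComparator())
                .mapToPair(t -> {
                    byte[] row = Bytes.toBytes(t._1()._1());
                    KeyValue kv = new KeyValue(row, CF,
                            Bytes.toBytes(t._1()._2()), Bytes.toBytes(t._2()));
                    return new Tuple2<>(new ImmutableBytesWritable(row), kv);
                });
    }
}
```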

Step 3: create HFiles and bulk load them into HBase

① Generate the HFiles with saveAsNewAPIHadoopFile:

```java
hfileRdd.saveAsNewAPIHadoopFile(hfilePath, ImmutableBytesWritable.class,
        KeyValue.class, HFileOutputFormat2.class, config);
```

② Bulk load the HFiles into HBase:

```java
final Job job = Job.getInstance();
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
HFileOutputFormat2.configureIncrementalLoad(job, htable);
LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(config);
bulkLoader.doBulkLoad(new Path(hfilePath), htable);
```

Note: if Kerberos is enabled in the cluster, the bulk load step must be placed inside the ugi.doAs() method and executed after authentication, as follows:

```java
UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(keyUser, keytabPath);
UserGroupInformation.setLoginUser(ugi);
```
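A minimal sketch of that wrapping, reusing the config, hfilePath, and htable variables from the snippets above:

```java
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

// run the bulk load under the authenticated user's credentials
ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
    LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(config);
    bulkLoader.doBulkLoad(new Path(hfilePath), htable);
    return null;
});
```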

Access port 60010 of the HBase cluster WebUI to view the region distribution.

Reading from HBase

The web service that reads HBase data is built on the Spring Boot framework.
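For example, a minimal controller sketch (the class names, the service, and the URL path are illustrative, not the project's actual code):

```java
import java.io.IOException;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/check")
public class CheckQueryController {

    @Autowired
    private CheckQueryService checkQueryService;   // illustrative service wrapping the HBase scan

    // GET /check/{checkId} returns the list of matching check records
    @GetMapping("/{checkId}")
    public List<CheckRecord> query(@PathVariable String checkId) throws IOException {
        return checkQueryService.queryByCheckId(checkId);
    }
}
```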

Use a connection pool

Creating a connection is a heavyweight operation. In the HBase project, connections are pooled so that the ZooKeeper connection, the meta information cache, and the connections to the RegionServers and the master are shared.

```java
HConnection connection = HConnectionManager.createConnection(config);
HTableInterface table = connection.getTable("table1");
try {
    // Use the table as needed, for a single operation and a single thread
} finally {
    table.close();
}
```

You can also override the default thread pool as follows:

```java
HConnection createConnection(org.apache.hadoop.conf.Configuration conf, ExecutorService pool);
```
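For example, a minimal sketch that shares one application-wide connection backed by a custom pool (the pool size is illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;

// one shared connection for the whole application, backed by a custom thread pool
ExecutorService pool = Executors.newFixedThreadPool(20);
HConnection connection = HConnectionManager.createConnection(config, pool);
```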

Query processing

Step 1: determine the RowKey prefix from the query criteria

HBase reads and writes both follow the RowKey design described above. Here we take the same approach and convert the query criteria passed in by the web caller into the corresponding RowKey prefix. For example, a check query with check_id = A208849559 produces the prefix 7+7c9498b4a83974da56b252122b9752bf.

Step 2: determine the scan range

The query results for A208849559 are exactly the rows whose RowKey begins with the prefix 7+7c9498b4a83974da56b252122b9752bf.

```java
scan.setStartRow(Bytes.toBytes(rowkey_pre));   // 7+7c9498b4a83974da56b252122b9752bf
byte[] stopRow = Bytes.toBytes(rowkey_pre);
stopRow[stopRow.length - 1]++;
scan.setStopRow(stopRow);                      // 7+7c9498b4a83974da56b252122b9752bg
```

Step 3: assemble the query results into return objects

Iterate over the ResultScanner, map each row to a table entity, and return the entities as a List.
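A sketch of steps 2 and 3 combined, assuming an illustrative CheckRecord entity and a scanByPrefix helper that the queryByCheckId method from the controller sketch could delegate to after computing the prefix (only the check_id column is shown; the other columns are mapped the same way):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckQueryService {

    private static final byte[] CF = Bytes.toBytes("f");

    // Scan every row whose RowKey starts with the salt+hash prefix and map it to an entity.
    public List<CheckRecord> scanByPrefix(HTableInterface table, String rowkeyPre) throws IOException {
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes(rowkeyPre));
        byte[] stopRow = Bytes.toBytes(rowkeyPre);
        stopRow[stopRow.length - 1]++;               // increment the last byte => exclusive upper bound
        scan.setStopRow(stopRow);

        List<CheckRecord> results = new ArrayList<>();
        try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                CheckRecord record = new CheckRecord();   // illustrative entity
                record.setCheckId(Bytes.toString(
                        result.getValue(CF, Bytes.toBytes("check_id"))));
                // ... map the remaining columns the same way
                results.add(record);
            }
        }
        return results;
    }
}
```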

Test

1,000 check_ids were randomly sampled from the original data for a simulated load test. Three consecutive rounds of 2,000 requests each (200 concurrent threads, 10 iterations) gave an average response time of 51 ms and an error rate of 0.

As shown in the figure above, after several rounds of testing, the number of requests handled by each region was roughly equal, which matches the load-balancing goal of the initial design.

Pitfalls

1. Kerberos authentication

If security authentication is enabled on the cluster, both Spark job submission and HBase access require Kerberos authentication.

This article uses YARN cluster mode. If the job is submitted like an ordinary job, the following error may be reported:

```
ERROR StartApp: job failure,
java.lang.NullPointerException
	at com.tinawang.spark.hbase.utils.HbaseKerberos.<init>(HbaseKerberos.java:18)
	at com.tinawang.spark.hbase.job.SparkWriteHbaseJob.run(SparkWriteHbaseJob.java:60)
```

HbaseKerberos.java:18 contains the following code:

```java
this.keytabPath = (Thread.currentThread().getContextClassLoader()
        .getResource(prop.getProperty("hbase.keytab"))).getPath();
```

The reason is that the executor needs to re-authenticate when it connects to HBase, and the tina.keytab uploaded via --keytab is not visible to the HBase authentication code. Therefore, the keytab file used for HBase authentication must be uploaded separately via --files, as shown below:

```
--keytab /path/tina.keytab \
--principal [email protected] \
--files "/path/tina.keytab.hbase"
```

tina.keytab.hbase is a renamed copy of tina.keytab, because Spark does not allow the same file to be uploaded twice.

2. Serialization

```
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
	at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
	at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
	...
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
Serialization stack:
	- object not serializable (class: org.apache.spark.api.java.JavaSparkContext, value: org.apache.spark.api.java.JavaSparkContext@24a16d8c)
	- field (class: com.tinawang.spark.hbase.processor.SparkReadFileRDD, name: sc, type: class org.apache.spark.api.java.JavaSparkContext)
	...
```

Solution 1:

If sc is a member variable of a class and is referenced inside a method, mark it transient so that it is not serialized:

```java
private transient JavaSparkContext sc;
```

Solution 2:

Pass sc as a method parameter instead, and make the classes that contain RDD operations implement Serializable. The code in this article takes the second approach; see the source code for details. A minimal illustration follows.
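This sketch only illustrates the pattern; the class and method names are illustrative, not the project's actual code:

```java
import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class FileRddProcessor implements Serializable {

    // sc is passed in as a method parameter rather than stored in a field,
    // so the closure no longer drags the non-serializable JavaSparkContext along
    public JavaRDD<String> readLines(JavaSparkContext sc, String path) {
        return sc.textFile(path)
                 .filter(line -> !line.isEmpty());   // example RDD operation inside a Serializable class
    }
}
```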

3. Batch request testing

```
Exception in thread "http-nio-8091-Acceptor-0" java.lang.NoClassDefFoundError: org/apache/tomcat/util/ExceptionUtils
```

or

```
Exception in thread "http-nio-8091-exec-34" java.lang.NoClassDefFoundError: ch/qos/logback/classic/spi/ThrowableProxy
```

Checking the issue and the troubleshooting write-up below, the likely cause is that the number of open files exceeds the limit.

Github.com/spring-proj…

Mp.weixin.qq.com/s/34GVlaYDO…

Run ulimit -a to check; by default the maximum number of open files per user is 1024.

Modify the limit in the system file /etc/security/limits.conf by adding the following lines, which solves the problem:

```
* soft nofile 65536
* hard nofile 65536
```

About the author

Ting Wang is a big data development engineer at China Minsheng Bank, specializing in Spark large-scale data processing and HBase system design.

References

hbase.apache.org/book.html#p…
www.opencore.com/blog/2016/1…
hbasefly.com/2016/03/23/…
github.com/spring-proj…
mp.weixin.qq.com/s/34GVlaYDO…