Preface

When using Spark Streaming, we often need to write to a database, whether to update counters or change stored values, in order to persist the results produced by the computation.

In a recent real-time consumer processing task built on Spark Streaming, I needed to write the computed results to HBase and MySQL. This article therefore summarizes how to operate HBase and MySQL from Spark, and records some of the pitfalls I ran into.

Spark Streaming persistence design patterns

DStream output operations

  • print(): prints the first 10 elements of every batch of data in the DStream on the driver node. It is often used for development and debugging.

  • saveAsTextFiles(prefix, [suffix]): saves the DStream's contents as text files. The file name for each batch interval is generated from prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

  • saveAsObjectFiles(prefix, [suffix]): saves the DStream's contents as SequenceFiles of serialized Java objects. The file name for each batch interval is generated from prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

  • saveAsHadoopFiles(prefix, [suffix]): saves the DStream's contents as Hadoop files. The file name for each batch interval is generated from prefix and suffix: "prefix-TIME_IN_MS[.suffix]".

  • foreachRDD(func): the most general output operation; it applies func to every RDD generated from the data stream. func typically pushes the data in each RDD to an external system, such as saving it to files or writing it to a database over the network. Note that func is executed in the driver process running the application, and it usually contains an RDD action to force the computation of the streaming RDDs. (A minimal sketch of the first two operations follows this list; foreachRDD is discussed in detail below.)
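
As a quick illustration, here is a minimal sketch of the print and saveAsTextFiles operations on a socket text stream; the host, port, output path and batch interval are assumptions, not taken from the original task:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal sketch: host/port, output path and batch interval are assumptions.
object OutputOpsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("OutputOpsDemo")
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = ssc.socketTextStream("localhost", 9999)

    lines.print()                                      // first 10 elements of each batch, printed on the driver
    lines.saveAsTextFiles("hdfs:///tmp/lines", "txt")  // one output directory per batch: lines-<TIME_IN_MS>.txt

    ssc.start()
    ssc.awaitTermination()
  }
}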

Design patterns for using foreachRDD

dstream.foreachRDD gives developers great flexibility, but there are common pitfalls to avoid when using it. The usual process for saving data to an external system is: establish a remote connection -> send the data to the remote system over the connection -> close the connection. Following that process, we might naturally write code like this:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

In the earlier article "Spark Triage — Preliminary Test" we sorted out the roles of the Spark driver and workers. We know that in cluster mode, the connection in the code above would have to be serialized on the driver and shipped to the workers. A connection, however, cannot be passed between machines, that is, it cannot be serialized, and this leads to a serialization error (connection object not serializable). To avoid this error, we create the connection on the worker instead, as in the following code:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

That seems to solve the problem. But think about it: we now open and close a connection for every single record in every RDD, which creates unnecessary load and reduces the throughput of the whole system.

A better approach is therefore rdd.foreachPartition: establish one connection per partition of the RDD (note: the records within a partition are processed on the same worker). The code is as follows:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

In this way we reduce the load caused by repeatedly establishing connections. When connecting to a database we normally go one step further and use a connection pool; introducing one, the code is optimized as follows:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

By holding a static connection pool object we can reuse connections, further amortizing the cost of establishing them and reducing the load. Note that, just like any database connection pool, the pool should be initialized lazily on demand and should release idle connections after a timeout.
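
The ConnectionPool object in the snippet above is not a Spark API; it stands for whatever pooling helper you supply (and connection.send is likewise a placeholder). A minimal sketch of such a helper, assuming a plain JDBC java.sql.Connection and a made-up MySQL URL, might look like this:

import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedQueue

// A minimal, hypothetical ConnectionPool helper. Being a Scala object, it is
// initialized lazily inside each executor JVM on first use instead of being
// serialized and shipped from the driver.
object ConnectionPool {
  private val jdbcUrl = "jdbc:mysql://127.0.0.1:3306/test"  // placeholder URL and credentials
  private val pool = new ConcurrentLinkedQueue[Connection]()

  def getConnection(): Connection = {
    val conn = pool.poll()  // null if the queue is empty
    if (conn != null) conn else DriverManager.getConnection(jdbcUrl, "root", "")
  }

  def returnConnection(conn: Connection): Unit = {
    pool.offer(conn)  // hand the connection back for reuse
  }
}

A real pool would also cap its size and evict idle connections, which is why a mature pool such as c3p0 (used for MySQL later in this article) is preferable in practice.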

Also of note:

  • If more than one foreachRDD is used in a Spark Streaming application, they are executed one after another, in program order.

  • DStream output operations are executed lazily, so if we do not put any RDD action inside foreachRDD, the system simply receives the data and discards it (see the sketch below).
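
For example, in the sketch below (parse and writeToStore are hypothetical functions), the first foreachRDD never triggers a job because map is lazy and no action forces it, while the second does real work for every batch:

// No RDD action inside: the map stays lazy, so the batch data is received and discarded.
dstream.foreachRDD { rdd =>
  rdd.map(parse)
}

// An RDD action (here foreachPartition) forces each batch to be computed and written out.
dstream.foreachRDD { rdd =>
  rdd.map(parse).foreachPartition { records =>
    records.foreach(writeToStore)
  }
}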

Accessing HBase from Spark

Above we covered the basic design pattern for writing a Spark Streaming DStream to an external system. Here we look at how to write a DStream to an HBase cluster.

HBase generic connection class

When connecting to HBase from Scala, the client locates the cluster through ZooKeeper, so the ZooKeeper information must be provided in the configuration, as follows:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.client.ConnectionFactory

object HbaseUtil extends Serializable {
  private val conf = HBaseConfiguration.create()
  private val para = Conf.hbaseConfig  // Conf is our own configuration class
  conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, para.get("port").getOrElse("2181"))
  conf.set(HConstants.ZOOKEEPER_QUORUM, para.get("quorum").getOrElse("127-0-0-1"))  // hosts
  private val connection = ConnectionFactory.createConnection(conf)

  def getHbaseConn: Connection = connection
}

According to what I found online, we do not use a connection pool here because of how HBase connections work: a single Connection is heavyweight and thread-safe, so it is meant to be shared rather than pooled.

HBase output operations

Taking the put operation as an example, here is how the design pattern above is applied to HBase output:

dstream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreachPartition(partitionRecords => {
      val connection = HbaseUtil.getHbaseConn  // obtain an HBase connection
      partitionRecords.foreach(data => {
        val tableName = TableName.valueOf("tableName")
        val t = connection.getTable(tableName)
        try {
          val put = new Put(Bytes.toBytes(_rowKey_))  // row key
          // column family, qualifier, value
          put.addColumn(_column_.getBytes, _qualifier_.getBytes, _value_.getBytes)
          Try(t.put(put)).getOrElse(t.close())
          // do some log (shows on the worker)
        } catch {
          case e: Exception =>
            // log error
            e.printStackTrace()
        } finally {
          t.close()
        }
      })
    })
    // do some log (shows on the driver)
  }
})
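
The code above opens a Table and issues one Put per record. A variation worth trying (a sketch, not from the original task; the column family, qualifier and the rowKey/value fields are assumptions) is to open the table once per partition and batch the Puts, which cuts down on RPC round trips:

import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

dstream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    rdd.foreachPartition { partitionRecords =>
      val connection = HbaseUtil.getHbaseConn
      val table = connection.getTable(TableName.valueOf("tableName"))
      try {
        // Build one Put per record, then write the whole partition in a single call.
        val puts = partitionRecords.map { data =>
          val put = new Put(Bytes.toBytes(data.rowKey))  // rowKey is an assumed field of the record
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(data.value))  // assumed family/qualifier/value
          put
        }.toList
        table.put(puts.asJava)
      } catch {
        case e: Exception => e.printStackTrace()  // or proper logging
      } finally {
        table.close()
      }
    }
  }
}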

For more details on HBase operations, see "Operating HBase on Spark (1.0.0 new API)" in the references.

Pitfall notes

Pay attention to the following issues when configuring HConstants.ZOOKEEPER_QUORUM for the HBase connection:

  • Because an HBase connection cannot be addressed directly by IP, you usually need to configure hosts. In the code above I used 127-0-0-1 (an arbitrary hostname), which has to be mapped in the hosts file:
127.0.0.1 127-0-0-1
  • On a single machine, you only need to configure the hosts entry for the ZooKeeper node that HBase uses. When switching to an HBase cluster, however, a strange bug appeared.

Problem description: when saving the DStream to HBase inside foreachRDD, the job gets stuck with no error message at all (yes, it simply hangs and makes no progress).

Problem analysis: the HBase cluster has multiple machines, but only one of them had its hosts entry configured, so when the Spark cluster accessed HBase it kept trying to resolve hostnames it could not find.

Solution: configure the IP addresses of all HBase nodes in the hosts file on every worker. Problem solved; an example is sketched below.
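
For example, for a three-node HBase cluster, the /etc/hosts on every Spark worker might contain entries like the following (the IPs and hostnames here are made up):

192.168.1.101 hbase-node1
192.168.1.102 hbase-node2
192.168.1.103 hbase-node3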

Accessing MySQL from Spark

Similar to accessing HBase, we need a serializable class to establish MySQL connections; here we use the c3p0 connection pool for MySQL.

MySQL generic connection class

import java.sql.Connection
import java.util.Properties

import com.mchange.v2.c3p0.ComboPooledDataSource

class MysqlPool extends Serializable {
  private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
  private val conf = Conf.mysqlConfig
  try {
    cpds.setJdbcUrl(conf.get("url").getOrElse("jdbc:mysql://127.0.0.1:3306/test_bee?useUnicode=true&characterEncoding=UTF-8"))
    cpds.setDriverClass("com.mysql.jdbc.Driver")
    cpds.setUser(conf.get("username").getOrElse("root"))
    cpds.setPassword(conf.get("password").getOrElse(""))
    cpds.setMaxPoolSize(200)
    cpds.setMinPoolSize(20)
    cpds.setAcquireIncrement(5)
    cpds.setMaxStatements(180)
  } catch {
    case e: Exception => e.printStackTrace()
  }

  def getConnection: Connection = {
    try {
      cpds.getConnection()
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        null
    }
  }
}

object MysqlManager {
  var mysqlManager: MysqlPool = _

  def getMysqlManager: MysqlPool = {
    synchronized {
      if (mysqlManager == null) {
        mysqlManager = new MysqlPool
      }
    }
    mysqlManager
  }
}

We use c3p0 to build the MySQL connection pool, then take a connection from the pool each time we need to write data.

MySQL output operations

Using the same foreachRDD design pattern as before, the code for writing a DStream to MySQL is as follows:

dstream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreachPartition(partitionRecords => {
      // get a connection from the pool
      val conn = MysqlManager.getMysqlManager.getConnection
      val statement = conn.createStatement()
      try {
        conn.setAutoCommit(false)
        partitionRecords.foreach(record => {
          val sql = "insert into table ..."  // the SQL to execute
          statement.addBatch(sql)
        })
        statement.executeBatch()
        conn.commit()
      } catch {
        case e: Exception =>
          // do some log
      } finally {
        statement.close()
        conn.close()
      }
    })
  }
})
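
Building the SQL string by hand, as above, is fragile (quoting, escaping, injection). Below is a variant sketch using a PreparedStatement with batching; the table t, its columns k/v and the record's key/value fields are assumptions:

dstream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    rdd.foreachPartition { partitionRecords =>
      val conn = MysqlManager.getMysqlManager.getConnection
      // Parameterized statement: values are bound instead of concatenated into the SQL string.
      val statement = conn.prepareStatement("insert into t (k, v) values (?, ?)")
      try {
        conn.setAutoCommit(false)
        partitionRecords.foreach { record =>
          statement.setString(1, record.key)    // assumed field
          statement.setString(2, record.value)  // assumed field
          statement.addBatch()
        }
        statement.executeBatch()
        conn.commit()
      } catch {
        case e: Exception => e.printStackTrace()  // or proper logging
      } finally {
        statement.close()
        conn.close()
      }
    }
  }
}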

It is worth noting:

  • We do not commit to MySQL once per record; instead we add the statements to a batch and commit once per partition, which is why conn.setAutoCommit(false) is set. Batching the commit noticeably improves MySQL write throughput.

  • Updating indexed columns in MySQL is comparatively slow; avoid it where possible, and if it really cannot be avoided, just accept the cost (T^T)

Deployment

Finally, here is the Maven configuration for the JAR dependencies Spark needs in order to connect to MySQL and HBase:

<!-- Hbase -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-common</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
  <version>1.0.0</version>
</dependency>

<!-- Mysql -->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.31</version>
</dependency>
<dependency>
  <groupId>c3p0</groupId>
  <artifactId>c3p0</artifactId>
  <version>0.9.1.2</version>
</dependency>

References:

  1. Spark Streaming Programming Guide
  2. Introduction to HBase
  3. Operating HBase on Spark (1.0.0 new API)
  4. Spark Development Quick Start
  5. Kafka -> Spark Streaming -> MySQL (Scala) real-time data processing example
  6. Using the c3p0 connection pool to operate MySQL from Spark Streaming