Preface
When using Spark Streaming, we often need to write to a database to count values or update state, persisting the results of our computations.
In a recent real-time consumer-processing task built on Spark Streaming, I needed to write the computed results to HBase and MySQL. This article therefore summarizes how Spark writes to HBase and MySQL, and records some of the pits I stepped into along the way.
Spark Streaming persistence design patterns
DStream output operations
- print(): prints the first ten elements of every batch of the DStream on the driver node; it is often used for development and debugging (see the sketch after this list).
- saveAsTextFiles(prefix, [suffix]): saves the DStream's contents as text files. The file name of each batch interval is generated from prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
- saveAsObjectFiles(prefix, [suffix]): saves the DStream's contents as SequenceFiles of serialized Java objects. The file name of each batch interval is generated from prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
- saveAsHadoopFiles(prefix, [suffix]): saves the DStream's contents as Hadoop files. The file name of each batch interval is generated from prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
- foreachRDD(func): the most general output operation. It applies func to every RDD generated from the data stream; func typically saves the data of each RDD to an external system, such as a file or, over a network connection, a database. Note that func runs in the driver process of the streaming application and usually contains an RDD action to force computation of the stream's RDDs.
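For reference, a minimal sketch of the two simplest output operations; the lines DStream and the output prefix here are hypothetical:
// lines is assumed to be an existing DStream[String]
lines.print()                                     // prints the first ten elements of each batch on the driver
lines.saveAsTextFiles("hdfs:///tmp/lines", "txt") // writes a lines-TIME_IN_MS.txt directory per batch interval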
The foreachRDD design pattern
dstream.foreachRDD offers great flexibility to the developer, but many common pitfalls have to be avoided when using it. Our usual process for saving data to an external system is: establish a remote connection -> send the data to the remote system over that connection -> close the connection. For this process, the code that comes to mind first is:
dstream.foreachRDD { rdd =>
  val connection = createNewConnection() // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}
In the previous article, "Spark Triage — Preliminary Test", we sorted out the roles of the Spark worker and driver. In cluster mode, the connection in the code above would have to be serialized on the driver and shipped to the workers. But a connection cannot be passed between machines; that is, it cannot be serialized, and attempting to do so causes serialization errors (connection object not serializable). To avoid such errors, we create the connection on the worker instead, as in the following code:
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}
Does that seem to solve the problem? If you think about it, though, we are now establishing and closing a connection for every single record in every RDD, which causes unnecessarily high load and reduces the throughput of the entire system.
A better way is therefore rdd.foreachPartition, that is, to establish a single connection for each partition of the RDD (note: the records of one partition are processed on the same worker). The code is as follows:
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
This reduces the load of repeatedly establishing connections. When connecting to a database we usually also use a connection pool; introducing one, the code is optimized as follows:
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  }
}
By holding a static connection pool object, we can reuse connections and further cut the cost of establishing them, reducing the load even more. Note that, as with database connection pools in general, the pool should create connections lazily on demand and reclaim idle connections after a timeout.
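As an illustration only, a minimal sketch of what the static, lazily initialized ConnectionPool referenced above might look like for a JDBC target (the JDBC URL is a placeholder, and connection.send in the example above stands for whatever write call the target system offers); a production pool, such as the c3p0 pool used later in this article, would also handle sizing, validation and idle-connection eviction:
import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedQueue

// Minimal sketch of a static, lazily initialized connection pool (illustration only).
object ConnectionPool {
  private val url = "jdbc:mysql://127.0.0.1:3306/test?user=root&password=" // hypothetical JDBC URL
  private val pool = new ConcurrentLinkedQueue[Connection]()

  def getConnection(): Connection = {
    val conn = pool.poll()
    if (conn == null) DriverManager.getConnection(url) // create lazily, only when no idle connection exists
    else conn
  }

  def returnConnection(conn: Connection): Unit = {
    pool.offer(conn) // hand the connection back for future reuse
  }
}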
Also of note:
- If more than one foreachRDD is used in a Spark Streaming application, they are executed one after another, in program order.
- A DStream's output operations are executed lazily, so if we do not put any RDD action inside foreachRDD, the system simply receives the data and discards it (see the sketch below).
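For example, a minimal sketch that uses an action to force each batch to be computed before persisting it (the persistence step is left as a comment):
dstream.foreachRDD { rdd =>
  val count = rdd.count() // count() is an action, so the batch RDD is actually computed
  if (count > 0) {
    // persist the non-empty batch to the external system here
  }
}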
Spark access to HBase
Above we described the basic design pattern for writing a Spark Streaming DStream to an external system. Here we look at how to write a DStream to an HBase cluster.
HBase connection class
Scala connects to HBase through ZooKeeper, so the ZooKeeper quorum information has to be provided in the configuration, as follows:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.client.ConnectionFactory

object HbaseUtil extends Serializable {
  private val conf = HBaseConfiguration.create()
  private val para = Conf.hbaseConfig // Conf is our own configuration class
  conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, para.get("port").getOrElse("2181"))
  conf.set(HConstants.ZOOKEEPER_QUORUM, para.get("quorum").getOrElse("127-0-0-1")) // hosts
  private val connection = ConnectionFactory.createConnection(conf)

  def getHbaseConn: Connection = connection
}
According to information found online, we do not use a connection pool here because of the particularities of HBase connections: an HBase Connection is heavyweight and thread-safe and is meant to be shared, so a single connection per JVM is enough.
HBase output operations
Taking the put operation as an example, here is how the design pattern above is applied to an HBase output operation:
dstream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreachPartition(partitionRecords => {
      val connection = HbaseUtil.getHbaseConn // obtain the shared HBase connection
      partitionRecords.foreach(data => {
        val tableName = TableName.valueOf("tableName")
        val t = connection.getTable(tableName)
        try {
          val put = new Put(Bytes.toBytes(_rowKey_)) // row key
          // column family, qualifier, value
          put.addColumn(_column_.getBytes, _qualifier_.getBytes, _value_.getBytes)
          Try(t.put(put)).getOrElse(t.close())
          // do some log (shown on the worker)
        } catch {
          case e: Exception =>
            // log error
            e.printStackTrace()
        } finally {
          t.close()
        }
      })
    })
    // do some log (shown on the driver)
  }
})
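If, as in this example, all records of a batch go to the same table, a small variant of the same pattern can obtain the Table once per partition instead of once per record. A sketch follows; the table name, column family, qualifier, and the way the row key and value are derived from the record are hypothetical:
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

dstream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreachPartition(partitionRecords => {
      val connection = HbaseUtil.getHbaseConn // shared connection, not closed here
      val table = connection.getTable(TableName.valueOf("tableName")) // hypothetical table
      try {
        partitionRecords.foreach(data => {
          val put = new Put(Bytes.toBytes(data.toString)) // row key derived from the record
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(data.toString))
          table.put(put)
        })
      } finally {
        table.close() // the Table is lightweight and per-partition; the Connection stays open
      }
    })
  }
})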
For more detailed HBase operations, see "Operating HBase on Spark (1.0.0 new API)".
Pitfall notes
Note the following issues when configuring HConstants.ZOOKEEPER_QUORUM for the HBase connection:
- HBase connections cannot be addressed directly by IP, so hostnames must be configured. In the code section above the quorum defaults to the hostname 127-0-0-1 (any name will do), which then needs an entry in the hosts file:
127.0.0.1 127-0-0-1
- On a single node it is enough to configure the hosts entry for the HBase machine where ZooKeeper runs, but when switching to an HBase cluster a weird bug appears.
Symptom: saving the DStream to HBase inside foreachRDD simply hangs, with no error message at all (yes, it just gets stuck and makes no progress).
Analysis: the HBase cluster has several machines, but the hosts entry of only one HBase machine was configured, so when the Spark cluster accessed HBase it kept trying to resolve hostnames it could not find.
Solution: configure the hostnames and IP addresses of all HBase nodes in the hosts file of every worker; this resolves the problem (an illustrative hosts file follows).
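For illustration only, assuming a hypothetical three-node HBase cluster, the hosts file on every Spark worker would then contain entries such as the following (the hostnames and IP addresses are made up; use the hostnames actually reported by the HBase/ZooKeeper configuration):
192.168.0.101 hbase-node-1
192.168.0.102 hbase-node-2
192.168.0.103 hbase-node-3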
Spark access to MySQL
Similar to accessing HBase, we need a serializable class to establish the MySQL connection; here we use the c3p0 connection pool for MySQL.
MySQL connection class
import java.sql.Connection
import java.util.Properties
import com.mchange.v2.c3p0.ComboPooledDataSource

class MysqlPool extends Serializable {
  private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
  private val conf = Conf.mysqlConfig
  try {
    cpds.setJdbcUrl(conf.get("url").getOrElse("jdbc:mysql://127.0.0.1:3306/test_bee?useUnicode=true&characterEncoding=UTF-8"))
    cpds.setDriverClass("com.mysql.jdbc.Driver")
    cpds.setUser(conf.get("username").getOrElse("root"))
    cpds.setPassword(conf.get("password").getOrElse(""))
    cpds.setMaxPoolSize(200)
    cpds.setMinPoolSize(20)
    cpds.setAcquireIncrement(5)
    cpds.setMaxStatements(180)
  } catch {
    case e: Exception => e.printStackTrace()
  }

  def getConnection: Connection = {
    try {
      cpds.getConnection()
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        null
    }
  }
}

object MysqlManager {
  var mysqlManager: MysqlPool = _

  def getMysqlManager: MysqlPool = {
    synchronized {
      if (mysqlManager == null) {
        mysqlManager = new MysqlPool
      }
    }
    mysqlManager
  }
}
We use c3p0 to maintain the MySQL connection pool, and each time data needs to be written we take a connection from the pool.
MySQL output operations
Using the same foreachRDD design pattern as before, the code that writes a DStream to MySQL is as follows:
dstream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreachPartition(partitionRecords => {
      // get a connection from the connection pool
      val conn = MysqlManager.getMysqlManager.getConnection
      val statement = conn.createStatement
      try {
        conn.setAutoCommit(false)
        partitionRecords.foreach(record => {
          val sql = "insert into table..." // the SQL statement to execute
          statement.addBatch(sql)
        })
        statement.executeBatch
        conn.commit()
      } catch {
        case e: Exception =>
          // do some log
      } finally {
        statement.close()
        conn.close()
      }
    })
  }
})
It is worth noting:
- When writing to MySQL we do not commit once per record; instead we submit the whole partition as a batch via addBatch/executeBatch, which is why conn.setAutoCommit(false) is called. This further improves the efficiency of the MySQL writes (a concrete sketch follows this list).
- Updating indexed columns in MySQL makes the updates noticeably slower; try to avoid it, and if it cannot be avoided, just grit your teeth and go for it (T^T).
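As a concrete illustration of the batching note above, here is a sketch that uses a PreparedStatement for a hypothetical result(word, count) table, assuming the DStream carries (String, Int) records:
dstream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreachPartition(partitionRecords => {
      val conn = MysqlManager.getMysqlManager.getConnection
      // parameterized statement; the table and columns are hypothetical
      val statement = conn.prepareStatement("insert into result (word, count) values (?, ?)")
      try {
        conn.setAutoCommit(false) // commit the whole partition as one batch
        partitionRecords.foreach { case (word, count) =>
          statement.setString(1, word)
          statement.setInt(2, count)
          statement.addBatch()
        }
        statement.executeBatch()
        conn.commit()
      } catch {
        case e: Exception => e.printStackTrace() // do some log
      } finally {
        statement.close()
        conn.close() // with c3p0, close() returns the connection to the pool
      }
    })
  }
})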
Deployment
The Maven dependencies for the jars that Spark needs in order to connect to MySQL and HBase are as follows:
<!-- HBase -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-common</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
  <version>1.0.0</version>
</dependency>
<!-- MySQL -->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.31</version>
</dependency>
<dependency>
  <groupId>c3p0</groupId>
  <artifactId>c3p0</artifactId>
  <version>0.9.1.2</version>
</dependency>
References:
- Spark Streaming Programming Guide
- HBase Introduction
- Operating HBase on Spark (1.0.0 new API)
- Spark Development Quick Start
- Kafka -> Spark Streaming -> MySQL (Scala) real-time data processing example
- Spark Streaming uses the c3p0 connection pool to operate the MySQL database