This article introduces how to import data into ClickHouse using Flink, Spark, Kafka, MySQL, and Hive.

  • Import data using Flink
  • Import data using Spark
  • Import data from Kafka
  • Import data from MySQL
  • Import data from Hive

Import data using Flink

This section describes how to import data into ClickHouse using flink-jdbc. The Maven dependency is shown below; the ClickHouse JDBC driver (ru.yandex.clickhouse:clickhouse-jdbc, which provides the driver class used in the code) is also required, and its dependency is listed in the Spark section.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
    <version>1.10.1</version>
</dependency>

Example

This example uses Flink's Kafka connector to import data from Kafka into ClickHouse in real time.

public class FlinkSinkClickHouse {
    public static void main(String[] args) throws Exception {
        String url = "JDBC: clickhouse: / / 192.168.10.203:8123 / default";
        String user = "default";
        String passwd = "hOn0d9HT";
        String driver = "ru.yandex.clickhouse.ClickHouseDriver";
        int batchsize = 500; // Set the batch size to a smaller size so that you can see the data being written immediately

        // Create execution environment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        String kafkaSource11 = "" +
                "CREATE TABLE user_behavior ( " +
                " `user_id` BIGINT, -- user id\n" +
                " `item_id` BIGINT, -- item id\n" +
                " `cat_id` BIGINT, -- category id\n" +
                " `action` STRING, -- user action\n" +
                " `province` INT, -- user's province\n" +
                " `ts` BIGINT, -- timestamp of user action\n" +
                " `proctime` AS PROCTIME(), -- computes a processing-time column\n" +
                " `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- event time\n" +
                " WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- define the watermark on eventTime\n" +
                ") WITH ( 'connector' = 'kafka', -- use the kafka connector\n" +
                " 'topic' = 'user_behavior', -- kafka topic\n" +
                " 'scan.startup.mode' = 'earliest-offset', -- read from the earliest offset\n" +
                " 'properties.group.id' = 'group1', -- consumer group\n" +
                " 'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', -- kafka broker addresses\n" +
                " 'format' = 'json', -- the source data format is json\n" +
                " 'json.fail-on-missing-field' = 'true',\n" +
                " 'json.ignore-parse-errors' = 'false'" +
                ")";

        // Kafka Source
        tEnv.executeSql(kafkaSource11);
        String query = "SELECT user_id,item_id,cat_id,action,province,ts FROM user_behavior";
        Table table = tEnv.sqlQuery(query);

        String insertIntoCkSql = "INSERT INTO behavior_mergetree(user_id,item_id,cat_id,action,province,ts)\n" +
                "VALUES(?,?,?,?,?,?)";

        // Write data to ClickHouse Sink
        JDBCAppendTableSink sink = JDBCAppendTableSink
                .builder()
                .setDrivername(driver)
                .setDBUrl(url)
                .setUsername(user)
                .setPassword(passwd)
                .setQuery(insertIntoCkSql)
                .setBatchSize(batchsize)
                .setParameterTypes(Types.LONG, Types.LONG,Types.LONG, Types.STRING,Types.INT,Types.LONG)
                .build();

        String[] arr = {"user_id"."item_id"."cat_id"."action"."province"."ts"};
        TypeInformation[] type = {Types.LONG, Types.LONG,Types.LONG, Types.STRING,Types.INT,Types.LONG};

        tEnv.registerTableSink(
                "sink",
                arr,
                type,
                sink
        );

        tEnv.insertInto(table, "sink");

        tEnv.execute("Flink Table API to ClickHouse Example"); }}Copy the code

Note:

  • Because ClickHouse has high latency for single-row inserts, BatchSize should be set so that data is written in batches, which improves write performance.
  • In the JDBCAppendTableSink implementation, if the last batch contains fewer rows than BatchSize, the remaining data is not inserted.
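
The target table behavior_mergetree must already exist in ClickHouse; its DDL is not shown in the code above. A minimal sketch, assuming a MergeTree engine ordered by user_id (the engine and ordering key are assumptions; adjust them to your schema):

-- Hypothetical DDL for the ClickHouse table the Flink sink writes to
CREATE TABLE IF NOT EXISTS behavior_mergetree (
    user_id  UInt64 COMMENT 'user id',
    item_id  UInt64 COMMENT 'item id',
    cat_id   UInt64 COMMENT 'category id',
    action   String COMMENT 'user action',
    province UInt8  COMMENT 'province id',
    ts       UInt64 COMMENT 'timestamp of user action'
) ENGINE = MergeTree()
ORDER BY user_id;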

Import data using Spark

This section describes how to write data to ClickHouse from a Spark program. The Maven dependencies are:

<dependency>
      <groupId>ru.yandex.clickhouse</groupId>
      <artifactId>clickhouse-jdbc</artifactId>
      <version>0.2.4</version>
</dependency>
<!-- If you get the error "Caused by: java.lang.ClassNotFoundException: com.google.common.escape.Escapers", add the following dependency -->
<dependency>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
          <version>28.0-jre</version>
</dependency>

Example

import java.util.Properties

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}

object Spark2ClickHouseExample {

  val properties = new Properties()
  properties.put("driver"."ru.yandex.clickhouse.ClickHouseDriver")
  properties.put("user"."default")
  properties.put("password"."hOn0d9HT")
  properties.put("batchsize"."1000")
  properties.put("socket_timeout"."300000")
  properties.put("numPartitions"."8")
  properties.put("rewriteBatchedStatements"."true")

  case class Person(name: String, age: Long)

  private def runDatasetCreationExample(spark: SparkSession): Dataset[Person] = {
    import spark.implicits._
    // DataFrames to DataSet
    val path = "file:///e:/people.json"
    val peopleDS = spark.read.json(path)
    peopleDS.createOrReplaceTempView("people")
    val ds = spark.sql("SELECT name,age FROM people").as[Person]
    ds.show()
    ds
  }

  def main(args: Array[String]) {


    val url = "jdbc:clickhouse://kms-1:8123/default"
    val table = "people"

    val spark = SparkSession
      .builder()
      .appName("Spark Example")
      .master("local") // Set to run locally
      .getOrCreate()
    val ds = runDatasetCreationExample(spark)

    ds.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url, table, properties)
    spark.stop()
  }
}
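
Because ClickHouse requires an explicit table engine, the target people table is easiest to create in ClickHouse before the Spark job runs. A minimal sketch, assuming a MergeTree engine (the engine and ordering key are assumptions):

-- Hypothetical DDL for the ClickHouse table the Spark job appends to
CREATE TABLE IF NOT EXISTS default.people (
    name String,
    age  Int64
) ENGINE = MergeTree()
ORDER BY name;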

Import data from Kafka

This approach mainly uses ClickHouse's Kafka table engine.

Usage

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,... ',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = ' ',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_thread_per_consumer = 0]
  • kafka_broker_list: comma-separated list of broker addresses (e.g. localhost:9092).
  • kafka_topic_list: list of Kafka topics, separated by commas.
  • kafka_group_name: consumer group.
  • kafka_format: message format, for example JSONEachRow, JSON, CSV, etc.

Example

Create a user_behavior topic in Kafka and write data to it in the following format:

{"user_id":63401."item_id":6244."cat_id":143."action":"pv"."province":3."ts":1573445919}
{"user_id":9164."item_id":2817."cat_id":611."action":"fav"."province":28."ts":1573420486}
{"user_id":63401."item_id":6244."cat_id":143."action":"pv"."province":3."ts":1573445919}

Create a table in ClickHouse and select Kafka() as the table engine as follows:

 CREATE TABLE kafka_user_behavior (
    user_id UInt64 COMMENT 'user id',
    item_id UInt64 COMMENT 'commodity id',
    cat_id UInt16  COMMENT 'category id',
    action String  COMMENT 'behavior',
    province UInt8 COMMENT 'province id',
    ts UInt64      COMMENT 'Timestamp'
  ) ENGINE = Kafka()
    SETTINGS
    kafka_broker_list = 'cdh04:9092',
    kafka_topic_list = 'user_behavior',
    kafka_group_name = 'group1',
    kafka_format = 'JSONEachRow'
;
-- Query the data
cdh04 :) select * from kafka_user_behavior ;

-- Check the data again and find that the data is empty
cdh04 :) select count(*) from kafka_user_behavior;

SELECT count(*)
FROM kafka_user_behavior

┌─count()─┐
│       0 │
└─────────┘

Import Kafka data into ClickHouse through materialized views

Once a query has read the data, ClickHouse discards it from the Kafka engine table: the Kafka table engine is just a data pipeline. To retain the data, we access it through materialized views.

  • Start by creating a table for the Kafka table engine to read data from Kafka
  • Then create a table from a normal table engine, such as MergeTree, for the end user
  • Finally, a materialized view is created to synchronize the Kafka engine tables to the tables used by the end user in real time
Create a Kafka engine table
 CREATE TABLE kafka_user_behavior_src (
    user_id UInt64 COMMENT 'user id',
    item_id UInt64 COMMENT 'commodity id',
    cat_id UInt16  COMMENT 'category id',
    action String  COMMENT 'behavior',
    province UInt8 COMMENT 'province id',
    ts UInt64      COMMENT 'Timestamp'
  ) ENGINE = Kafka()
    SETTINGS
    kafka_broker_list = 'cdh04:9092',
    kafka_topic_list = 'user_behavior',
    kafka_group_name = 'group1',
    kafka_format = 'JSONEachRow'
;

Create a table for use by end users
 CREATE TABLE kafka_user_behavior (
    user_id UInt64 COMMENT 'user id',
    item_id UInt64 COMMENT 'commodity id',
    cat_id UInt16  COMMENT 'category id',
    action String  COMMENT 'behavior',
    province UInt8 COMMENT 'province id',
    ts UInt64      COMMENT 'Timestamp'
  ) ENGINE = MergeTree()
    ORDER BY user_id
;
Create materialized views and synchronize data
CREATE MATERIALIZED VIEW user_behavior_consumer TO kafka_user_behavior
    AS SELECT * FROM kafka_user_behavior_src ;
-- Query; data that has already been read is still returned on repeated queries
cdh04 :) select * from kafka_user_behavior;
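
To pause consumption from Kafka (for example, to change the target table or the transformation logic), the materialized view can be detached and re-attached later, following the standard ClickHouse DETACH/ATTACH pattern:

-- Stop receiving data from Kafka
DETACH TABLE user_behavior_consumer;
-- Resume consumption
ATTACH TABLE user_behavior_consumer;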

Note:

The Kafka engine table cannot be used directly as a result table; it is only used to consume data from Kafka and does not actually store the data.

Import data from MySQL

Just as with importing data from Kafka, ClickHouse provides a MySQL table engine that maps a table in MySQL into ClickHouse.

Data type mappings

The mapping between data types and ClickHouse types in MySQL is shown in the following table.

MySQL → ClickHouse
  • UNSIGNED TINYINT → UInt8
  • TINYINT → Int8
  • UNSIGNED SMALLINT → UInt16
  • SMALLINT → Int16
  • UNSIGNED INT, UNSIGNED MEDIUMINT → UInt32
  • INT, MEDIUMINT → Int32
  • UNSIGNED BIGINT → UInt64
  • BIGINT → Int64
  • FLOAT → Float32
  • DOUBLE → Float64
  • DATE → Date
  • DATETIME, TIMESTAMP → DateTime
  • BINARY → FixedString

Usage

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
    ...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);


Example

-- In ClickHouse, create a MySQL engine table that maps the users table of the clickhouse database in MySQL
CREATE TABLE mysql_users(
    id Int32,
    name String
) ENGINE = MySQL(
 '192.168.10.203:3306', 'clickhouse', 'users', 'root', '123qwe');
-- Query the data
cdh04 :) SELECT * FROM mysql_users;

SELECT *
FROM mysql_users

┌─id─┬─name──┐
│  1 │ tom   │
│  2 │ jack  │
│  3 │ lihua │
└────┴───────┘

-- Insert data into the users table in MySQL
INSERT INTO users VALUES(4, 'robin');

-- Query the ClickHouse mapping table again
cdh04 :) select * from mysql_users;

SELECT *
FROM mysql_users

┌─id─┬─name──┐
│  1 │ tom   │
│  2 │ jack  │
│  3 │ lihua │
│  4 │ robin │
└────┴───────┘

Note: UPDATE and DELETE operations are not supported for MySQL table engines.

-- Execute update
ALTER TABLE mysql_users UPDATE name = 'hanmeimei' WHERE id = 1;
-- Execute delete
ALTER TABLE mysql_users DELETE WHERE id = 1;
-- The following error is reported
DB::Exception: Mutations are not supported by storage MySQL.
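
In contrast, INSERT is supported by the MySQL table engine: rows written to the mapping table are passed through to the underlying MySQL table. A quick sketch against the mysql_users table above (the sample row is hypothetical):

-- Writing through the ClickHouse mapping table also inserts the row into MySQL
INSERT INTO mysql_users VALUES(5, 'lucy');
SELECT * FROM mysql_users WHERE id = 5;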

Import data from Hive

This section uses Waterdrop for the data import. Waterdrop is an easy-to-use, high-performance, real-time data processing product built on Spark that can handle large amounts of data. It provides a rich set of plugins that read data from Kafka, HDFS, and Kudu, perform various transformations, and write the results to ClickHouse, Elasticsearch, or Kafka.

We only need to write a Waterdrop Pipeline configuration file to complete the data import. The configuration file consists of four parts: Spark, Input, Filter, and Output.

Installing Waterdrop is as simple as downloading a ZIP file and unpacking it. To use Waterdrop, Spark is required.

  • In the config/ folder of the Waterdrop installation directory, create a configuration file: hive_table_batch.conf with the following contents. There are four parts: Spark, Input, Filter, and Output.

    • The Spark part contains Spark-related configuration, mainly the resources required by the Spark job.

    • The Input part defines the data source: pre_sql is the SQL used to read data from Hive, and table_name is the name under which the result is registered as a temporary table in Spark; it can be any identifier.

    • The Filter section configures a series of transformations, such as filtering fields.

    • The Output section is the ClickHouse connection configuration, used to write the processed, structured data into ClickHouse.

      Note that the Hive metastore service must be running.

spark {
  spark.app.name = "Waterdrop_Hive2ClickHouse"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.sql.catalogImplementation = "hive" // This configuration is required
}
input {
    hive {
        pre_sql = "select * from default.users"
        table_name = "hive_users"
    }
}
filter {}
output {
    clickhouse {
        host = "kms-1:8123"
        database = "default"
        table = "users"
        fields = ["id"."name"]
        username = "default"
        password = "hOn0d9HT"}}Copy the code
  • Run the task
[kms@kms-1 waterdrop-1.5.1]$ bin/start-waterdrop.sh --config config/hive_table_batch.conf --master yarn --deploy-mode cluster

This starts a Spark job that extracts the data; when it finishes, check the data in ClickHouse.
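
For example, a quick check from clickhouse-client (assuming the target table default.users exists with the id and name columns configured in the Output section):

-- Verify the rows written by the Waterdrop job
SELECT count(*) FROM default.users;
SELECT * FROM default.users LIMIT 10;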

Conclusion

This article described how to import data into ClickHouse using Flink, Spark, Kafka, MySQL, and Hive, and provided a detailed example for each method.

Follow the public account “Big Data Technology and Data Warehouse” and reply “information” to receive a big data resource package.