Flink and ClickHouse are the leaders in real-time computing and (near-real-time) OLAP, respectively. Both have become very popular open source projects in recent years, and many large companies use them together to build real-time platforms for all kinds of purposes. Their advantages need no repeating here; instead, this article briefly shares some hands-on experience from our team's real-time clickstream data warehouse.

Clickstream data and dimensional modeling

Clickstream data is the trail of events that users leave on the backend as they browse websites, apps, and other web frontends, and it is the foundation of traffic analysis and user behavior analysis. It is generally collected as access logs and tracking-event (buried-point) logs, and is characterized by high volume and rich dimensionality. Taking our mid-sized e-commerce platform as an example, it generates about 200 GB of raw logs, i.e. billions of log entries, every day, covering 100+ tracking events with 50+ dimensions.

According to Kimball’s dimensional modeling theory, a clickstream warehouse follows a typical star schema, as shown in the diagram below.

Clickstream warehouse layer design

The layered design of a real-time clickstream warehouse can still borrow from traditional data warehouses, with one caveat: the flatter, the better, since every extra layer adds transport delay. A schematic is shown below.

  • DIM layer: dimension layer; a mirror of the MySQL databases, storing all dimension data.
  • ODS layer: source-aligned layer; raw data flows from Flume directly into the corresponding Kafka topics.
  • DWD layer: detail layer. Flink performs the necessary ETL and real-time dimension joins on the Kafka data to produce standardized detail records, which are written back to Kafka for downstream and other business use. Flink also writes the detail data into ClickHouse tables for query and analysis, and into Hive tables for backup and data quality assurance (reconciliation, backfilling, etc.).
  • DWS layer: service layer. Some metrics are aggregated to Redis in real time by Flink to serve large-screen dashboards. More metrics are aggregated periodically through mechanisms such as ClickHouse materialized views (see the sketch below) to produce reports and page heat maps. Notably, some detail data is also exposed at this layer, so that experienced BI staff can run flexible ad-hoc queries such as funnel, retention, and user-path analyses; this is where ClickHouse far outshines other OLAP engines.
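
To make the materialized-view idea concrete, here is a minimal sketch that creates an hourly page-view rollup once through clickhouse-jdbc; the names (dwd.events, dws.page_pv_hourly, site_id, page_id, event_time) are illustrative placeholders, not our actual schema.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class CreatePvMaterializedView {
    public static void main(String[] args) throws SQLException {
        // SummingMergeTree collapses rows sharing a sort key only when parts merge,
        // so readers should still aggregate: SELECT ..., sum(pv) ... GROUP BY ...
        String ddl =
            "CREATE MATERIALIZED VIEW IF NOT EXISTS dws.page_pv_hourly " +
            "ENGINE = SummingMergeTree() " +
            "PARTITION BY toYYYYMMDD(hour) " +
            "ORDER BY (site_id, page_id, hour) AS " +
            "SELECT site_id, page_id, toStartOfHour(event_time) AS hour, count() AS pv " +
            "FROM dwd.events GROUP BY site_id, page_id, hour";
        try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://ch1:8123/default");
             Statement st = conn.createStatement()) {
            st.execute(ddl);
        }
    }
}
```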

Key points and caveats

Real-time dimension joins in Flink

Flink's asynchronous I/O mechanism makes it very convenient for streaming jobs to access external storage. In our scenario, there are three points to note (a sketch of the first two follows the list):

  • Use an asynchronous MySQL client, such as the Vert.x MySQL Client.
  • Add an in-memory cache (such as Guava Cache or Caffeine) inside the AsyncFunction, with a sensible eviction policy, to avoid hammering the MySQL database with requests.
  • Real-time dimension joins only suit slowly changing dimensions, such as geographic information and product/category information. Fast-changing dimensions (such as user information) should not be joined into the wide table. For those, we use the MySQL table engine to map the dimension tables directly into ClickHouse, whose heterogeneous-query support performs well enough for joins against small dimension tables. Going forward, we may use the MaterializedMySQL engine (not yet released at the time of writing) to mirror some dimension tables into ClickHouse via binlog.
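
As a concrete illustration of the first two points, here is a trimmed-down sketch of such an AsyncFunction, assuming Vert.x 4's reactive MySQL client and Caffeine; the Event POJO, the dim_product table, and the connection details are placeholders rather than our production code.

```java
import java.time.Duration;
import java.util.Collections;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.vertx.core.Vertx;
import io.vertx.mysqlclient.MySQLConnectOptions;
import io.vertx.mysqlclient.MySQLPool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Enriches each event with a product name; Event is an assumed POJO
// exposing getProductId() and setProductName(...).
public class ProductDimAsyncFunction extends RichAsyncFunction<Event, Event> {

    private transient MySQLPool pool;
    private transient Cache<Long, String> cache;

    @Override
    public void open(Configuration parameters) {
        pool = MySQLPool.pool(
                Vertx.vertx(),
                new MySQLConnectOptions().setHost("mysql-dim-host").setPort(3306)
                        .setDatabase("dim").setUser("reader").setPassword("***"),
                new PoolOptions().setMaxSize(16));
        cache = Caffeine.newBuilder()
                .maximumSize(100_000)
                // expiry keeps slowly changing dimensions fresh enough
                .expireAfterWrite(Duration.ofMinutes(10))
                .build();
    }

    @Override
    public void asyncInvoke(Event event, ResultFuture<Event> resultFuture) {
        String cached = cache.getIfPresent(event.getProductId());
        if (cached != null) {
            event.setProductName(cached);
            resultFuture.complete(Collections.singleton(event));
            return;
        }
        pool.preparedQuery("SELECT name FROM dim_product WHERE id = ?")
                .execute(Tuple.of(event.getProductId()), ar -> {
                    if (ar.succeeded() && ar.result().size() > 0) {
                        Row row = ar.result().iterator().next();
                        String name = row.getString("name");
                        cache.put(event.getProductId(), name);
                        event.setProductName(name);
                    }
                    // on a lookup miss or failure, pass the event through unenriched
                    resultFuture.complete(Collections.singleton(event));
                });
    }

    @Override
    public void close() {
        if (pool != null) pool.close();
    }
}
```

Wiring it into the job is then a single call, e.g. `AsyncDataStream.unorderedWait(events, new ProductDimAsyncFunction(), 500, TimeUnit.MILLISECONDS, 100)`.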

Flink-ClickHouse sink design

ClickHouse can be written to directly via JDBC (flink-connector-jdbc), but that is not flexible enough. Fortunately, the clickhouse-jdbc project provides BalancedClickhouseDataSource, a component that adapts JDBC access to a ClickHouse cluster, and we designed our Flink-ClickHouse sink on top of it. There are three main points (a condensed sketch follows the list):

  • Write to local tables rather than distributed tables; this is well-worn advice by now.
  • Control write frequency with two conditions, batch size and flush interval, to balance part-merge pressure against data freshness. We currently use a batch size of 10,000 rows and an interval of 15 seconds; a write is triggered as soon as either condition is met.
  • BalancedClickhouseDataSource load-balances across ClickHouse instances via random routing, but it only detects liveness with a periodic ping and blacklists dead instances; there is no failover. In other words, once a write has been attempted against a failing node, that data is lost. We therefore designed a retry mechanism with a configurable retry count and interval. If a batch still cannot be written after the retries are exhausted, it is dumped to a configured path and an alert is raised requesting a prompt check and backfill.
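
Below is a condensed sketch of such a sink, assuming clickhouse-jdbc's BalancedClickhouseDataSource, a hypothetical DetailRecord POJO, and a hypothetical dwd_events_local table; configuration plumbing and the dump/alert path are stubbed out.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;

public class ClickHouseBatchSink extends RichSinkFunction<DetailRecord> {

    private static final int BATCH_SIZE = 10_000;       // flush when either condition is met
    private static final long FLUSH_INTERVAL_MS = 15_000;
    private static final int MAX_RETRIES = 3;           // configurable in the real sink

    private transient BalancedClickhouseDataSource dataSource;
    private transient List<DetailRecord> buffer;
    private transient ScheduledExecutorService scheduler;

    @Override
    public void open(Configuration parameters) {
        // the URL lists every shard's endpoint; inserts go to local tables, not the distributed table
        dataSource = new BalancedClickhouseDataSource(
                "jdbc:clickhouse://ch1:8123,ch2:8123,ch3:8123/dw");
        dataSource.scheduleActualization(10, TimeUnit.SECONDS); // periodic ping that blacklists dead instances
        buffer = new ArrayList<>(BATCH_SIZE);
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::flushIfNonEmpty,
                FLUSH_INTERVAL_MS, FLUSH_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }

    @Override
    public synchronized void invoke(DetailRecord record, Context context) {
        buffer.add(record);
        if (buffer.size() >= BATCH_SIZE) {
            flush();
        }
    }

    private synchronized void flushIfNonEmpty() {
        if (!buffer.isEmpty()) flush();
    }

    private void flush() {
        List<DetailRecord> batch = new ArrayList<>(buffer);
        buffer.clear();
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            // each getConnection() routes randomly, so a retry usually lands on another instance
            try (ClickHouseConnection conn = dataSource.getConnection();
                 PreparedStatement ps = conn.prepareStatement(
                         "INSERT INTO dwd_events_local (event_time, user_id, event) VALUES (?, ?, ?)")) {
                for (DetailRecord r : batch) {
                    ps.setTimestamp(1, r.getEventTime());
                    ps.setLong(2, r.getUserId());
                    ps.setString(3, r.getEvent());
                    ps.addBatch();
                }
                ps.executeBatch();
                return;
            } catch (SQLException e) {
                sleepQuietly(1_000L * attempt); // back off before the next attempt
            }
        }
        dumpAndAlert(batch); // retries exhausted: spill to the configured path and raise an alert
    }

    private void dumpAndAlert(List<DetailRecord> batch) { /* file dump + alerting omitted */ }

    private void sleepQuietly(long millis) {
        try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    @Override
    public void close() {
        scheduler.shutdown();
        flushIfNonEmpty();
    }
}
```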

Currently we have only implemented a DataStream API-style Flink-ClickHouse sink. Given the trend toward SQL-based Flink jobs, we plan to implement a SQL-style ClickHouse sink as well, and will contribute it back to the community once it has matured. Beyond random routing, we also plan to add more flexible routing strategies, such as round-robin and sharding-key hash.

Also, ClickHouse does not support transactions, so there is no point building something like a 2PC sink to guarantee exactly-once semantics. If the Flink-to-ClickHouse link fails and the job restarts, it simply resumes from Kafka's latest offsets, and the lost data is backfilled from Hive.
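
In code this is just the source's start-position setting. Here is a sketch assuming the (pre-KafkaSource) FlinkKafkaConsumer connector, whose setStartFromLatest() takes effect when the job is started fresh rather than restored from a checkpoint; the topic and properties are placeholders.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class LatestOffsetSource {
    static FlinkKafkaConsumer<String> build() {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka:9092");
        kafkaProps.setProperty("group.id", "dwd-to-clickhouse");
        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<>("dwd_events", new SimpleStringSchema(), kafkaProps);
        source.setStartFromLatest(); // on a fresh restart, skip the gap and let Hive backfill it
        return source;
    }
}
```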

ClickHouse data rebalancing

When a ClickHouse cluster is expanded, resharding the data is a hassle, because there is no out-of-the-box tool like HDFS Balancer. A cruder approach is to raise the new shard's weight in the ClickHouse configuration so that more data is written to it until all nodes are roughly balanced, and then adjust the weight back. But this creates significant hot spots, and it only works when writing directly to distributed tables, so it is not desirable.

Therefore, we took a somewhat roundabout approach: rename the original table, create a new table with the original schema on all nodes, point live writes at the new table, migrate the historical data into it with clickhouse-copier, and finally drop the original table. Of course, the table being rebalanced cannot serve queries during the migration, so this is still not elegant. If anyone has a better solution, we would welcome the exchange.
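
For illustration only, the table-swap steps could be issued as DDL over JDBC roughly as follows; the table and cluster names are hypothetical, ON CLUSTER saves repeating each statement on every node, and clickhouse-copier performs the history migration between steps 2 and 3.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class RebalanceTableSwap {
    public static void main(String[] args) throws SQLException {
        try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://ch1:8123/dw");
             Statement st = conn.createStatement()) {
            // 1. move the original local table out of the way
            st.execute("RENAME TABLE dw.events_local TO dw.events_local_old ON CLUSTER dw_cluster");
            // 2. recreate an empty table with the same schema; live writes now target it,
            //    and clickhouse-copier migrates dw.events_local_old's history into it
            st.execute("CREATE TABLE dw.events_local ON CLUSTER dw_cluster AS dw.events_local_old");
            // 3. once the copy is verified, drop the renamed original
            st.execute("DROP TABLE dw.events_local_old ON CLUSTER dw_cluster");
        }
    }
}
```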

The End

I’ve covered configuration, tuning, latency monitoring, and permission management for components like Flink and ClickHouse in a previous blog post:

www.jianshu.com/p/bedead165…