Brief introduction: Build a Lakehouse architecture on top of RocketMQ and Hudi with zero code, with RocketMQ Connector & RocketMQ Stream assisting ETL data analysis. This article offers a technical solution for rapidly building a Lakehouse and a low-maintenance approach to real-time computing.

Contents of this article

  • Background knowledge
  • Architecture evolution in the era of big data
  • RocketMQ Connector&Stream
  • Apache Hudi
  • Building Lakehouse practice

The title of this article contains three key words: Lakehouse, RocketMQ and Hudi. We will start with the overall Lakehouse architecture and then step by step analyze the reasons for the architecture, the characteristics of the architecture components, and the practical aspects of building the Lakehouse architecture.

Background knowledge

1. Lakehouse Architecture

Lakehouse was originally proposed by Databricks and requires the following architectural features:

(1) Transaction support

Many data pipelines within an enterprise read and write data concurrently. Support for ACID transactions ensures consistency when multiple parties read and write data at the same time;

(2) Schema enforcement and governance

Lakehouse should have a way to support schema enforcement and evolution, support DW schema paradigms such as star or snowflake models, be able to reason about data integrity, and have robust governance and auditing mechanisms;

(3) Openness

The storage formats used are open and standardized (such as Parquet) and provide APIs so that a variety of tools and engines, including machine learning and Python/R libraries, can access the data directly and efficiently;

(4) BI support

Lakehouse lets BI tools work directly on source data. This improves data freshness, reduces latency, and avoids the cost of operating two copies of the data in both the data lake and the data warehouse;

(5) Storage and computing separation

In practice, this means using separate clusters for storage and computation, so these systems can scale to support greater user concurrency and data volumes. Some modern data warehouses also have this property;

(6) Support a variety of data types from unstructured data to structured data

Lakehouse can be used to store, optimize, analyze, and access many data applications including image, video, audio, text, and semi-structured data.

(7) Support various workloads

These include data science, machine learning, and SQL and analytics. Multiple tools may be required to support these workloads, but they all rely on the same data repository underneath;

(8) End-to-end streaming

Real-time reporting is a standard requirement in many enterprises. Support for streaming eliminates the need to build a separate system dedicated to serving real-time data applications.

From the characteristics described above for the Lakehouse architecture, we can see that combinations of open source products can cover individual capabilities, but there does not seem to be a single out-of-the-box solution that covers them all. Next, let us first look at the mainstream data processing architectures of the big data era.

Architecture evolution in the era of big data

1. Open source products in the era of big data

There are many open source products in the era of big data, such as RocketMQ and Kafka in messaging; Flink, Spark, and Storm in computing; and HDFS, HBase, Redis, ElasticSearch, Hudi, DeltaLake, and others in storage.

Why are there so many open source products? In the era of big data, data volumes keep growing and every business has different needs, so a wide range of products has emerged for architects to choose from to support various scenarios. However, the sheer number of products also creates problems for architects: difficult technology selection, high trial-and-error cost, high learning cost, and complex architectures.

2. Current mainstream multi-layer architecture

The processing scenarios of big data include data analysis, BI, scientific computing, machine learning, and metrics monitoring. For different scenarios, the service team chooses different computing and storage engines based on the characteristics of the business. For example, for transaction metrics, binlog + CDC + RocketMQ + Flink + HBase + ELK might be used for BI and metrics visualization.

(1) Advantages of multi-layer architecture: support a wide range of business scenarios;

(2) Disadvantages of multi-layer architecture:

  • Long processing link and high latency;
  • More data copies, multiplying cost;
  • High cost of learning;

The main reason for the disadvantages of the multi-layer architecture is that the storage links and computing links are too long.

  • Do we really need so many solutions to support a wide range of business scenarios? Can the Lakehouse architecture unify the solution?
  • Can storage tiers in a multi-tier architecture be merged? Can Hudi products support multiple storage needs?
  • Can the computing layers of multi-tier architectures be merged? Can RocketMQ Stream fuse the messaging and computing layers?

The current mainstream multi-tier architecture

3. The emergence of the Lakehouse architecture

The Lakehouse architecture is an evolution of the multi-tier architecture: the storage layers are consolidated into one, and the computing side is compressed further by merging the message layer with the computing layer, with RocketMQ Stream acting as the compute engine. The resulting architecture is shown in the figure below. Here the message gateway is implemented by RocketMQ Connector, message computation is handled by RocketMQ Stream, and the intermediate state of the computation flows entirely inside RocketMQ. The computed results are written through the rocketmq-hudi-connector into Hudi, which supports multiple index types and provides a unified API for output to different downstream products.

Lakehouse architecture

Let’s take a look at the architecture’s characteristics.

(1) Advantages of Lakehouse architecture:

  • Shorter link, more suitable for real-time scenarios, high data freshness;
  • The cost is controllable, reducing the storage cost;
  • Low cost of learning, programmer friendly;
  • The operation and maintenance complexity is greatly reduced.

(2) Shortcomings of Lakehouse architecture

Message products and data lake products have high requirements on stability and ease of use. Meanwhile, message products need to support computing scenarios, and data lake products need to provide powerful indexing functions.

(3) Choice

In the Lakehouse architecture we chose RocketMQ, the messaging product, and Hudi, the data lake product.

At the same time, RocketMQ Stream can be used to integrate the computing layer into a RocketMQ cluster, which reduces the computing layer to a single layer, suitable for most small and medium-sized big data processing scenarios.

Let’s take a step-by-step look at RocketMQ and Hudi.

RocketMQ Connector & Stream

RocketMQ history chart

RocketMQ entered Apache incubation in 2017; RocketMQ 4.0, released in 2018, completed its cloud-native transformation; and RocketMQ 5.0, released in 2021, fully integrates messages, events, and streams.

1. The first choice for business messaging

RocketMQ, a "messaging product that lets you sleep soundly," has become the first choice in the business messaging space thanks to the following features:

(1) Financial-grade high reliability

Battle-tested by Alibaba's Double 11 traffic peaks;

(2) Minimalist architecture

As shown in the following figure, the RocketMQ architecture consists of two main parts: the metadata cluster (NameServer Cluster) and the compute-storage cluster (Broker Cluster).

RocketMQ architecture diagrams

NameServer nodes are stateless and can be easily expanded horizontally. Broker nodes work in active/standby mode to ensure high data reliability and flexible configuration.

Setup: a RocketMQ cluster can be brought up with just a few commands:

Using the release jars:

 nohup sh bin/mqnamesrv &
 nohup sh bin/mqbroker -n localhost:9876 &

On K8S:

kubectl apply -f example/rocketmq_cluster.yaml
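This assumes the Apache rocketmq-operator, which provides example/rocketmq_cluster.yaml, is already installed in the cluster. A quick check that the pods came up:

kubectl get pods -o wide    # the name server and broker pods should reach the Running state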

(3) Extremely low operation and maintenance costs

RocketMQ has low operating costs and ships with a good CLI tool, mqadmin, which supports a variety of commands, including cluster health checks and cluster traffic management. For example, the mqadmin clusterList command returns the status of every node in the cluster (production and consumption traffic, delay, queue length, disk usage, and so on), and the mqadmin updateBrokerConfig command sets the readable/writable status of a broker or topic in real time, so that a temporarily unavailable node can be removed dynamically and production/consumption traffic migrated away.
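As a rough illustration of the two commands mentioned above (the broker address and permission value below are placeholders, not from the original article):

# Print the status of every node in the cluster: production/consumption TPS, delay, queue depth, disk usage, etc.
sh bin/mqadmin clusterList -n localhost:9876

# Mark a broker read-only so traffic drains away before maintenance.
# brokerPermission follows RocketMQ's PermName constants: 6 = read+write, 4 = read-only, 2 = write-only.
sh bin/mqadmin updateBrokerConfig -b 127.0.0.1:10911 -k brokerPermission -v 4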

(4) Rich message types

RocketMQ supports normal messages, transactional messages, delayed messages, timed messages, sequential messages, and more. It can easily support big data and service scenarios.

(5) High throughput and low latency

In the active/standby synchronous replication mode, each Broker node can maximize disk utilization and reduce p99 latency to milliseconds.
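For reference, a minimal broker.conf sketch for the active/standby synchronous-replication mode described above (the property names are standard RocketMQ broker settings; the values are illustrative):

# broker.conf for the master node
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0                    # 0 = master, greater than 0 = slave
brokerRole=SYNC_MASTER        # a write is acknowledged only after the standby has replicated it
flushDiskType=ASYNC_FLUSH     # asynchronous flush keeps latency low; durability comes from synchronous replication
namesrvAddr=localhost:9876

The broker is started with this file via sh bin/mqbroker -c broker.conf; the standby node would use brokerId=1 and brokerRole=SLAVE.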

2. RocketMQ 5.0 Overview

RocketMQ 5.0 is a cloud-native hyper-converged platform for messaging, events, and streams, born in the cloud and built for the cloud, with the following features:

(1) Lightweight SDK

  • Fully support the cloud native communication standard gRPC protocol;
  • Stateless Pop consumption mode, multi-language friendly, easy to integrate;

(2) Minimalist architecture

  • No external dependence, reduce the burden of operation and maintenance;
  • Loose coupling between nodes, any service node can migrate at any time;

(3) Storage-compute separation that can be split or combined

  • The Broker is upgraded to a truly stateless service node with no storage binding;
  • Broker and Store nodes are deployed separately and scaled independently;
  • Multi-protocol standard support, no vendor lock-in;
  • Can be divided and combined, adapt to a variety of business scenarios, reduce the burden of operation and maintenance;

As shown in the figure below, the compute node (Broker) consists of abstract models, the corresponding protocol adapters, and consumption and governance capabilities. The storage cluster (Store) is divided into message storage (CommitLog, supporting multiple message types and storage modes) and index storage (Index, supporting multiple indexes). Where cloud storage can be fully leveraged, placing the CommitLog and Index on a cloud file system naturally separates storage from compute.

(4) Multi-mode storage support

  • Meet the demands of high availability in different basic scenarios;
  • Take advantage of on-cloud infrastructure to reduce costs;

(5) Cloud native infrastructure:

  • Cloud-native observability, standardized on OpenTelemetry;
  • Kubernetes one-click deployment and expansion delivery.

RocketMQ 5.0: 2021 milestones and future plans

3. RocketMQ Connector

A. Traditional data streams

(1) Disadvantages of traditional data flow

  • Producer and consumer code has to be implemented by each team, at a high cost;
  • Data synchronization tasks are not centrally managed.
  • Repeated development, uneven code quality;

(2) Solution: RocketMQ Connector

  • Build collaboratively and reuse data synchronization task code;
  • Unified management scheduling, improve resource utilization;

B. RocketMQ Connector data synchronization process

The RocketMQ Connector data stream differs from a traditional data stream in that the Source and Sink are managed in a unified way; it is also open source, with an active community.

4. RocketMQ Connector architecture

As shown in the figure above, RocketMQ Connector architecture mainly consists of Runtime and Worker, as well as ecological Source&Sink.

(1) Standard: OpenMessaging

(2) Ecology: supports ActiveMQ, Cassandra, ES, JDBC, JMS, MongoDB, Kafka, RabbitMQ, MySQL, Flume, HBase, Redis, and most other products in the big data field;

(3) Component: Manager manages scheduling in a unified manner. If there are multiple tasks, all tasks can be uniformly load balanced and evenly distributed to different workers. At the same time, workers can be horizontally expanded.

5. RocketMQ Stream

RocketMQ Stream is the product that compresses the computing layer into a single layer. It supports common operators such as window, join, and dimension-table lookups, and is compatible with Flink SQL and UDF/UDAF/UDTF.

Apache Hudi

Hudi is a streaming data lake platform that enables rapid updates to large volumes of data. It provides a built-in table format, a transactional storage layer, a series of table services, data services (out-of-the-box ingestion tools), and comprehensive operation, maintenance, and monitoring tools. Hudi can offload storage to Alibaba Cloud OSS or AWS S3.

Hudi features include:

  • Transactional write, MVCC/OCC concurrency control;
  • Native support for record level updates and deletions;
  • Query-oriented optimization: automatic small-file management, a design optimized for incremental pulls, automatic compaction, and clustering to optimize file layout;

Apache Hudi is a complete data lake platform. Its features are:

  • Each module is tightly integrated and self-managed;
  • Write using Spark, Flink, and Java.
  • Query using Spark, Flink, Hive, Presto, Trino, Impala, AWS Athena/Redshift, etc.
  • An out-of-the-box tool/service for data manipulation.

Apache Hudi is optimized for three scenarios:

1. Streaming stack

(1) Incremental processing;

(2) Fast and efficient;

(3) Row-oriented;

(4) Not optimized for scans;

2. Batch stack

(1) Batch processing;

(2) Inefficient;

(3) Scan-optimized, columnar storage format;

3. Incremental processing stack

(1) Incremental processing;

(2) Fast and efficient;

(3) Scan-optimized, columnar storage format.

Building Lakehouse practice

This part only covers the main process and the key configuration items; for the detailed local setup steps, see the appendix.

1. Preparation

RocketMQ version: 4.9.0

rocketmq-connect-hudi version: 0.0.1-SNAPSHOT

Hudi version: 0.8.0

2. Build the rocketmq-hudi-connector

(1) Download:

git clone github.com/apache/rock…

(2) Configure:

Configure the connector plugin path in /data/lakehouse/rocketmq-externals/rocketmq-connect/rocketmq-connect-runtime/target/distribution/conf/connect.conf
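For orientation, the relevant entries in connect.conf look roughly like this (the directory value is the example used later in the appendix):

# directory scanned by the runtime for connector plugin jars
pluginPaths=/Users/osgoo/Downloads/connector-plugins
# RocketMQ name server used by the runtime
namesrvAddr=localhost:9876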

(3) Compile:

cd rocketmq-externals/rocketmq-connect-hudi
mvn clean install -DskipTests -U

The resulting rocketmq-connect-hudi-0.0.1-SNAPSHOT-jar-with-dependencies.jar is the rocketmq-hudi-connector we need.
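After the build, copy the jar into the directory configured as pluginPaths in connect.conf, for example (paths are the illustrative values from the appendix):

mkdir -p /Users/osgoo/Downloads/connector-plugins
cp target/rocketmq-connect-hudi-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
   /Users/osgoo/Downloads/connector-plugins/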

3. Run

(1) Start or use an existing RocketMQ cluster and initialize the metadata topics (creation commands sketched below):

  • connector-cluster-topic (cluster information)
  • connector-config-topic (configuration information)
  • connector-offset-topic (sink consumption progress)
  • connector-position-topic (source data processing progress; to guarantee message ordering, each of these topics should be created with only one queue)
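The topics can be created with mqadmin, along the lines of the appendix commands; the -r 1 -w 1 flags are added here as an assumption, to enforce the single-queue requirement noted above:

# create the four Connector metadata topics, each with a single read/write queue
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t connector-cluster-topic -r 1 -w 1
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t connector-config-topic -r 1 -w 1
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t connector-offset-topic -r 1 -w 1
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t connector-position-topic -r 1 -w 1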

(2) Start the RocketMQ Connector runtime:

cd /data/lakehouse/rocketmq-externals/rocketmq-connect/rocketmq-connect-runtime
sh ./run_worker.sh    ## workers can be started on multiple hosts

(3) Configure and start the rocketmq-hudi-connector task

Send a request to the RocketMQ Connector runtime to create the task:

curl http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-sink-connector-name}?config='{"connector-class":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"topicc","tablePath":"file:///tmp/hudi_connector_test","tableName":"hudi_connector_test_table","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","source-rocketmq":"127.0.0.1:9876","src-cluster":"DefaultCluster","refresh-interval":"10000","schemaPath":"/data/lakehouse/config/user.avsc"}'

A successful start prints a log line like the following:

2021-09-06 16:23:14 INFO pool-2-thread-1 - Open HoodieJavaWriteClient successfully

(4) At this point, data produced to the source topic is automatically written into the corresponding Hudi table and can be queried through the Hudi API.
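A quick sanity check, assuming the local-file tablePath from the example configuration above:

ls -aR /tmp/hudi_connector_test/
# expect a .hoodie/ metadata directory (commit timeline) plus data files in Parquet format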

4. Configuration details

(1) RocketMQ Connector needs the RocketMQ cluster information and the connector plugin location, including: the Connect worker node ID (workerId), the port for receiving REST commands (httpPort), the RocketMQ cluster address (namesrvAddr), the local configuration storage directory (storePathRootDir), and the connector plugin directory (pluginPaths).

RocketMQ Connector configuration table

(2) The Hudi task needs the table path (tablePath) and table name (tableName), as well as the Avro schema file used by Hudi.

Hudi task configuration table
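The schemaPath entry points to an Avro schema describing the records to be written. A purely hypothetical user.avsc (the field names are illustrative, not taken from the original article) could be created like this:

# write a minimal Avro record schema to the path referenced by schemaPath
cat > /data/lakehouse/config/user.avsc <<'EOF'
{
  "type": "record",
  "name": "User",
  "namespace": "org.example",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
EOF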

Appendix: local machine setup

Components involved: RocketMQ, rocketmq-connect-runtime, rocketmq-connect-hudi, Hudi, HDFS, Avro, spark-shell.

0. Start HDFS

Download hadoop package

www.apache.org/dyn/closer….

cd /Users/osgoo/Documents/hadoop-2.10.1

vi core-site.xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <!-- hadoop1 -->
    <value>hdfs://localhost:9000</value>
  </property>
  <!-- Overrides the default configuration in core-default.xml -->
</configuration>

vi hdfs-site.xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

./bin/hdfs namenode -format
./sbin/start-dfs.sh
jps    # check that the NameNode and DataNode processes are running
lsof -i:9000
./bin/hdfs dfs -mkdir -p /Users/osgoo/Downloads

1. Start the RocketMQ cluster (see https://rocketmq.apache.org/docs/quick-start/) and create the Connector metadata topics:

sh mqadmin updateTopic -t connector-cluster-topic -n localhost:9876 -c DefaultCluster
sh mqadmin updateTopic -t connector-config-topic -n localhost:9876 -c DefaultCluster
sh mqadmin updateTopic -t connector-offset-topic -n localhost:9876 -c DefaultCluster
sh mqadmin updateTopic -t connector-position-topic -n localhost:9876 -c DefaultCluster

2. Create the data ingestion topic testhudi1:

sh mqadmin updateTopic -t testhudi1 -n localhost:9876 -c DefaultCluster

3. Build the rocketmq-connect-hudi jar:

cd rocketmq-connect-hudi
mvn clean install -DskipTests -U

4. Start the rocketmq-connector runtime. Configure connect.conf:

workerId=DEFAULT_WORKER_1
storePathRootDir=/Users/osgoo/Downloads/storeRoot
## Http port for user to access REST API
httpPort=8082
# Rocketmq namesrvAddr
namesrvAddr=localhost:9876
# Source or sink connector jar file dir, the default value is rocketmq-connect-sample
pluginPaths=/Users/osgoo/Downloads/connector-plugins

Copy rocketmq-hudi-connector.jar to pluginPaths=/Users/osgoo/Downloads/connector-plugins, then:

sh run_worker.sh

5. Configure ingestion into the lake via curl:

curl http://localhost:8082/connectors/rocketmq-connect-hudi?config='{"connector-class":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"testhudi1","tablePath":"hdfs://localhost:9000/Users/osgoo/Documents/base-path7","tableName":"t7","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","source-rocketmq":"127.0.0.1:9876","source-cluster":"DefaultCluster","refresh-interval":"10000","schemaPath":"/Users/osgoo/Downloads/user.avsc"}'

6. Send a message to testhudi1.

7. Read the table with Spark:

cd /Users/osgoo/Downloads/spark-3.1.2-bin-hadoop3.2/bin
./spark-shell \
  --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0,org.apache.spark:spark-avro_2.12:3.0.1 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "t7"
val basePath = "hdfs://localhost:9000/Users/osgoo/Documents/base-path7"
val tripsSnapshotDF = spark.read.format("hudi").load(basePath + "/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select * from hudi_trips_snapshot").show()


This article is original content from Alibaba Cloud and may not be reproduced without permission.