Introduction | This article describes how to design and implement a real-time analytics system on Tencent Cloud using its big data components. Along the way, it contrasts cloud services such as CKafka, Flink (Oceanus), and MySQL with their self-managed counterparts to illustrate the advantages of the cloud. Taking the gift-rewarding scenario in video live streaming as an example, it shows how convenient development becomes under fully and semi-managed services, and gives readers an initial understanding of how a live-streaming analytics system is designed.
I. Solution description
(1) Overview
This solution combines Tencent Cloud CKafka, Oceanus, private network VPC, BI, and other services to perform real-time visual analysis of digital operations in the live video industry. The metrics analyzed include the regional distribution of viewers, member counts at each level, gift totals per module, and the number of online viewers.
(2) Architecture and advantages
Based on the live-streaming scenario above, the following architecture was designed:
Product list:
- Oceanus (stream computing)
- VPC (private network)
- CKafka (message queue)
- TencentDB for MySQL (cloud database)
- EMR (HBase component)
- Business Intelligence analysis service (BI)
II. Preparation
Purchase and create the appropriate big data components.
(1) Create a VPC
A private network (VPC) is a logically isolated network space that you can customize on Tencent Cloud. When creating MySQL, EMR, and the other services below, you must select the same VPC so that their networks are connected; otherwise, you would need a peering connection or VPN to reach them.
(page address: console.cloud.tencent.com/vpc/vpc?rid…
(2) Create the Oceanus cluster
The Oceanus stream computing service is compatible with native Flink jobs. On the Cluster Management > Create Cluster page of the Oceanus console, create a cluster: select the region, availability zone, VPC, logging, and storage options, and set the initial password.
(3) Create the CKafka message queue
CKafka (Cloud Kafka) is a message queue service built on the open-source Apache Kafka engine, providing high throughput and high scalability. It is fully compatible with the interfaces of Apache Kafka 0.9, 0.10, 1.1, 2.4, and 2.8, and offers strong performance, scalability, business security, and operations support, so you can enjoy low cost and rich functionality while avoiding tedious maintenance work.
(page address: cloud.tencent.com/product/cka…
- Create the CKafka cluster. Note: for the private network and subnet, select the network and subnet created earlier.
- Create a topic.
- Simulate sending data to the topic:
- Kafka client: log in to a CVM in the same subnet, start the Kafka client, and send simulated data. See:
(cloud.tencent.com/document/pr…
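As a quick connectivity check before running the scripts below, the stock console producer that ships with Kafka can send a test message from the CVM, for example: `bin/kafka-console-producer.sh --broker-list 172.28.28.13:9092 --topic live_streaming_log`.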
- Send using a script
Script 1: Java; refer to the official documentation:
(cloud.tencent.com/document/pr…
Script 2: a Python script that generates simulated data:
```python
#!/usr/bin/python3
import json
import random
import time

from kafka import KafkaProducer

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
PROVINCES = ["Beijing", "Guangdong", "Shandong", "Jiangsu", "Henan", "Shanghai",
             "Hebei", "Zhejiang", "Hong Kong", "Shaanxi", "Hunan", "Chongqing",
             "Fujian", "Tianjin", "Yunnan", "Sichuan", "Guangxi", "Anhui",
             "Hainan", "Jiangxi", "Hubei", "Shanxi", "Liaoning", "Taiwan",
             "Heilongjiang", "Inner Mongolia", "Macao", "Guizhou", "Gansu",
             "Qinghai", "Xinjiang", "Tibet", "Jilin"]
broker_lists = ['172.28.28.13:9092']
topic_live_gift_total = 'live_gift_total'
topic_live_streaming_log = 'live_streaming_log'

producer = KafkaProducer(bootstrap_servers=broker_lists,
                         value_serializer=lambda m: json.dumps(m).encode('ascii'))

# Offsets for shifting the generated timestamps (0 = now).
pre_day_count = 0
pre_hour_count = 0
hour_unit = 3600
day_unit = 3600 * 24


def generate_data_live_gift_total():
    # Construct the event timestamps.
    update_time = time.time() - day_unit * pre_day_count
    update_time_str = time.strftime(TIME_FORMAT, time.localtime(update_time))
    create_time = update_time - hour_unit * pre_hour_count
    create_time_str = time.strftime(TIME_FORMAT, time.localtime(create_time))

    results = []
    for _ in range(0, 10):
        user_id = random.randint(2000, 4000)
        random_gift_type = random.randint(1, 10)
        random_gift_total = random.randint(1, 100)
        msg_kv = {"user_id": user_id,
                  "gift_type": random_gift_type,
                  "gift_total_amount": random_gift_total,
                  "ip": "123.0.0." + str(user_id % 255),  # simulated client IP
                  "room_id": 20210813,
                  "create_time": create_time_str,
                  "update_time": update_time_str}
        results.append(msg_kv)
    return results


def generate_live_streaming_log():
    # Construct the event timestamps.
    update_time = time.time() - day_unit * pre_day_count
    leave_time_str = time.strftime(TIME_FORMAT, time.localtime(update_time))
    create_time = update_time - hour_unit * pre_hour_count
    create_time_str = time.strftime(TIME_FORMAT, time.localtime(create_time))

    results = []
    for _ in range(0, 10):
        user_id = random.randint(2000, 4000)
        province_name = PROVINCES[random.randint(0, len(PROVINCES) - 1)]
        msg_kv = {"user_id": user_id,
                  "ip": "123.0.0." + str(user_id % 255),  # simulated client IP
                  "room_id": 20210813,
                  "arrive_time": create_time_str,
                  "create_time": create_time_str,
                  "leave_time": leave_time_str,
                  "region": 1122,
                  "grade": user_id % 5 + 1,
                  "province": province_name}
        results.append(msg_kv)
    return results


def send_data(topic, msgs):
    # Produce asynchronously, one record per second, then flush the batch.
    count = 0
    for msg in msgs:
        time.sleep(1)
        count += 1
        producer.send(topic, msg)
        print("send %d data... \n %s" % (count, msg))
    producer.flush()


if __name__ == '__main__':
    while True:
        time.sleep(60)
        msg_live_stream_logs = generate_live_streaming_log()
        send_data(topic_live_streaming_log, msg_live_stream_logs)
        msg_topic_live_gift_totals = generate_data_live_gift_total()
        send_data(topic_live_gift_total, msg_topic_live_gift_totals)
```
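To run the script (a sketch, assuming Python 3 on the CVM), install the client library with `pip3 install kafka-python`, save the script as, say, `producer.py`, and start it with `python3 producer.py`; it then sends a batch of simulated records to both topics every minute.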
(4) Create an EMR cluster
EMR is a cloud-hosted elastic open-source Hadoop service that supports big data frameworks such as Spark, HBase, Presto, Flink, and Druid. This example uses the HBase component.
(page address console.cloud.tencent.com/emr)
- Install HBase components in the EMR cluster.
- In a production environment, choose server configurations based on actual requirements; this example uses low-spec servers. Select the VPC created earlier to ensure that all service components are in the same network.
- Locate the HBase Master node.
- Click Log In to access the server.
- Create the HBase table:
```
[root@172 ~]# hbase shell
hbase(main):001:0> create 'dim_hbase', 'cf'
```
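For the dimension join in the third Flink job below to return data, this table must hold one row per live room, with the room ID as the rowkey and the module ID in `cf:module_id`. For example, a hypothetical mapping from the sample room to module 101 could be written in the HBase shell as `put 'dim_hbase', '20210813', 'cf:module_id', '101'`.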
(5) Create the cloud database MySQL
Cloud database MySQL (TencentDB for MySQL) is a high-performance distributed data storage service developed by Tencent Cloud based on the open source database MySQL. It enables users to set up, operate and expand relational databases more easily in the cloud.
(page address: console.cloud.tencent.com/cdb)
Note: when purchasing the instance, select the VPC created earlier.
After the MySQL service is created, you need to change the binlog_row_image parameter to FULL (the default is MINIMAL), as shown in the figure.
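To verify the change took effect, you can run a standard MySQL query from any client:

```sql
SHOW VARIABLES LIKE 'binlog_row_image';  -- should now return FULL
```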
After modifying the parameter, log in to MySQL and create the database and tables required by this example.
- Log in to the MySQL cloud database
- Create the database
Open an SQL window or the visual page and create the database and tables:
```sql
CREATE DATABASE livedb;  -- create the database
```
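The Flink jobs in Section III write into three tables in `livedb` that must exist beforehand; the article does not show their DDL. The following is a minimal sketch inferred from the sink definitions later in this article (column names match the sinks; the types and lengths are assumptions):

```sql
USE livedb;

-- Sink of job 1: raw viewer log.
CREATE TABLE live_streaming_log (
    user_id     BIGINT,
    ip          VARCHAR(32),
    room_id     BIGINT,
    arrive_time TIMESTAMP,
    leave_time  TIMESTAMP,
    create_time TIMESTAMP,
    region_code INT,
    grade       INT,
    province    VARCHAR(32),
    PRIMARY KEY (user_id, ip, room_id, arrive_time)
);

-- Sink of job 2: running total per gift type.
CREATE TABLE live_gift_total (
    gift_type         VARCHAR(32),
    gift_total_amount BIGINT,
    PRIMARY KEY (gift_type)
);

-- Sink of job 3: running total per module.
CREATE TABLE module_gift_total (
    module_id                BIGINT,
    module_gift_total_amount BIGINT,
    PRIMARY KEY (module_id)
);
```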
(6) Create the Business Intelligence analysis service
Business Intelligence (BI) is a new-generation agile, self-service BI platform that supports self-service data preparation, exploratory analysis, and enterprise-level governance. In just a few minutes, you can complete data analysis, business data exploration, report building, and other data visualization work in the cloud. The convenient drag-and-drop interaction lets you quickly understand the correlations, trends, and logic behind the data without relying on IT staff or worrying about trial-and-error costs.
(page address: cloud.tencent.com/product/bi)
- Purchase the Business Intelligence analysis service.
- Resources must be purchased with the primary account; purchase according to the number of sub-accounts created.
- A sub-user submits an access application:
- After the primary account approves it, the sub-user is granted permissions to add data sources, create datasets, and view reports.
- Add a MySQL data source
This example uses the public network connection mode; for more connection modes, see:
(cloud.tencent.com/document/pr…
- Open the purchased MySQL instance and enable public network access:
- Add the SaaS BI address (119.29.66.144:3306) to the MySQL security group.
Note: open MySQL port 3306, not the mapped public port.
- Create a MySQL account and configure permissions, as sketched below.
Create the account and set its password; note that the host is set to %:
Then set the account permissions:
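A minimal sketch of these two steps in SQL (the account name `bi_user` and the read-only grant are illustrative; grant whatever your reports need):

```sql
-- Create the account; host '%' allows connections from any address.
CREATE USER 'bi_user'@'%' IDENTIFIED BY '<password>';
-- Grant read access to the reporting database.
GRANT SELECT ON livedb.* TO 'bi_user'@'%';
```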
Open the Business Intelligence analysis service and connect to the MySQL database: click Add Data Source -> MySQL, then click Test Connection.
III. Implementation of the solution
Next, a case study shows how to use Oceanus for real-time processing and visual analysis of live-streaming operational data.
(1) Solution
Business goals
Only the following three statistical metrics are covered here:
- Distribution of viewers watching live streams across the whole site;
- Gift totals per module;
- Overall gift statistics.

Source data format
Messages are stored in CKafka as JSON. A sample record from the live_gift_total topic:

```json
{
    "user_id": 3994,
    "gift_type": 3,
    "gift_total_amount": 28,
    "room_id": 20210813,
    "ip": "123.0.0.105",
    "create_time": "2021-08-16 09:46:51",
    "update_time": "2021-08-16 09:46:51"
}
```

Viewer events arrive on the live_streaming_log topic in the same JSON form. Module mapping table: live_module_roomid (HBase dimension table), which maps each room_id to a module_id.
- Oceanus SQL job development
Job 1: distribution of viewers across the whole site (the MySQL table must be created in advance).
- Define the source:
```sql
CREATE TABLE `live_streaming_log_source` (
    `user_id`     BIGINT,
    `ip`          VARCHAR,
    `room_id`     BIGINT,
    `arrive_time` TIMESTAMP,
    `leave_time`  TIMESTAMP,
    `create_time` TIMESTAMP,
    `region_code` INT,
    `grade`       INT,
    `province`    VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'live_streaming_log',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '172.28.28.13:9092',
    'properties.group.id' = 'joylyu-consumer-2',
    'format' = 'json',
    'json.ignore-parse-errors' = 'false',
    'json.fail-on-missing-field' = 'false'
);
```
- Define the sink:
```sql
CREATE TABLE `live_streaming_log_sink` (
    `user_id`     BIGINT,
    `ip`          VARCHAR,
    `room_id`     BIGINT,
    `arrive_time` TIMESTAMP,
    `leave_time`  TIMESTAMP,
    `create_time` TIMESTAMP,
    `region_code` INT,
    `grade`       INT,
    `province`    VARCHAR,
    PRIMARY KEY (`user_id`, `ip`, `room_id`, `arrive_time`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://172.28.28.227:3306/livedb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
    'table-name' = 'live_streaming_log',
    'username' = 'root',
    'password' = 'xxxxx',
    'sink.buffer-flush.max-rows' = '5000',
    'sink.buffer-flush.interval' = '2s',
    'sink.max-retries' = '3'
);
```
- Business logic:
```sql
INSERT INTO `live_streaming_log_sink`
SELECT * FROM `live_streaming_log_source`;
```
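Once this job is running, the geographic-distribution chart in BI essentially reduces to an aggregation over the sink table. An illustrative query (the BI tool generates its own SQL internally):

```sql
SELECT province, COUNT(DISTINCT user_id) AS viewer_count
FROM live_streaming_log
GROUP BY province;
```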
Job 2: overall gift totals (the MySQL table must be created in advance).
- Define the source:
```sql
CREATE TABLE `live_gift_total_source` (
    `user_id`           VARCHAR,
    `gift_type`         VARCHAR,
    `gift_total_amount` BIGINT,
    `ip`                VARCHAR,
    `create_time`       VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'live_gift_total',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '172.28.28.13:9092',
    'properties.group.id' = 'joylyu-consumer-1',
    'format' = 'json',
    'json.ignore-parse-errors' = 'false',
    'json.fail-on-missing-field' = 'false'
);
```
- Define the sink:
```sql
CREATE TABLE `live_gift_total_sink` (
    `gift_type`         VARCHAR,
    `gift_total_amount` BIGINT,
    PRIMARY KEY (`gift_type`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://172.28.28.227:3306/livedb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
    'table-name' = 'live_gift_total',
    'username' = 'root',
    'password' = 'xxxxx',
    'sink.buffer-flush.max-rows' = '5000',
    'sink.buffer-flush.interval' = '2s',
    'sink.max-retries' = '3'
);
```
- Business logic:
```sql
INSERT INTO `live_gift_total_sink`
SELECT `gift_type`,
       SUM(`gift_total_amount`) AS `gift_total_amount_all`
FROM `live_gift_total_source`
GROUP BY `gift_type`;
```
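Because this is a continuous aggregation, the job emits an updated total for each `gift_type` as new records arrive; since the sink declares a primary key, the JDBC connector writes these as upserts, so the MySQL row for each gift type always holds the latest running total.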
Job 3: gift totals per module (the MySQL table must be created in advance).
- Define the source:
```sql
CREATE TABLE `live_gift_total_source` (
    `user_id`           VARCHAR,
    `gift_type`         VARCHAR,
    `gift_total_amount` BIGINT,
    `ip`                VARCHAR,
    `create_time`       VARCHAR,
    `room_id`           BIGINT,  -- lookup key for the dimension join below
    `proc_time` AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'live_gift_total',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '172.28.28.13:9092',
    'properties.group.id' = 'joylyu-consumer-1',
    'format' = 'json',
    'json.ignore-parse-errors' = 'false',
    'json.fail-on-missing-field' = 'false'
);
```
- Define the HBase dimension table:
```sql
CREATE TABLE `dim_hbase` (
    `rowkey` STRING,
    `cf` ROW<`module_id` STRING>,
    PRIMARY KEY (`rowkey`) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'dim_hbase',
    'zookeeper.quorum' = '<your HBase cluster ZooKeeper address>'
);
```
- Define the sink:
```sql
CREATE TABLE `module_gift_total_sink` (
    `module_id`                BIGINT,
    `module_gift_total_amount` BIGINT,
    PRIMARY KEY (`module_id`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://172.28.28.227:3306/livedb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
    'table-name' = 'module_gift_total',
    'username' = 'root',
    'password' = 'xxxxx',
    'sink.buffer-flush.max-rows' = '5000',
    'sink.buffer-flush.interval' = '2s',
    'sink.max-retries' = '3'
);
```
- Business logic:
```sql
INSERT INTO `module_gift_total_sink`
SELECT CAST(`b`.`cf`.`module_id` AS BIGINT) AS `module_id`,
       SUM(`a`.`gift_total_amount`) AS `module_gift_total_amount`
FROM `live_gift_total_source` AS `a`
LEFT JOIN `dim_hbase` FOR SYSTEM_TIME AS OF `a`.`proc_time` AS `b`
ON CAST(`a`.`room_id` AS STRING) = `b`.`rowkey`
GROUP BY `b`.`cf`.`module_id`;
```
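Note that `FOR SYSTEM_TIME AS OF a.proc_time` makes this a processing-time lookup join: each incoming gift record queries the HBase dimension table as it is processed, so room-to-module mappings added to `dim_hbase` later are picked up without restarting the job.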
(2) Real-time dashboard visualization
- Add the data source: open the Business Intelligence analysis interface, click Add Data Source -> MySQL, connect to the MySQL database as above, and click Save.
- Create a dataset: click Create Dataset -> SQL Dataset (other dataset types can be selected based on the actual business scenario), add a dataset from the data source above, and click Save.
- Build the report: click Make Report -> New Report (any template will do) and drag components into the canvas to compose the report. Then set up real-time refresh: click Report Settings -> Advanced in the upper left, check Get Real-Time Data, and set the refresh interval to 3 s (choose a value appropriate to the actual workload) so the report refreshes from the MySQL data source every 3 seconds. Click Save when done. For details, see:
(cloud.tencent.com/document/pr…
- View the report: click View Report and select the saved report to see it update dynamically. Note: this report is for demonstration only; for reference, see:
(cloud.tencent.com/document/pr…
As shown in the figure below, the dashboard contains six charts in total.
Chart 1: geographic distribution of users, i.e., the regional distribution of viewers watching live streams nationwide;
Chart 2: number of members per level, i.e., the total number of members of each grade;
Chart 3: totals per gift type, i.e., the cumulative amount received for each type of gift;
Chart 4: gift total over the last 6 hours, i.e., the total amount of gifts received in the last 6 hours;
Chart 5: top 10 gifters, i.e., the 10 users who sent the most gifts;
Chart 6: online viewers, i.e., the number of users in the live room at each time of day.
IV. Summary
This solution collects data with the Tencent Cloud CKafka component, performs real-time processing such as dimension table joins in Oceanus, a stream computing service compatible with open-source Flink, and stores the processed results in MySQL and other databases. Finally, the Business Intelligence (BI) component reads the continuously updated MySQL data to draw a real-time dashboard that refreshes automatically. To keep the walkthrough simple and easy to follow, the database and table design has been simplified; the focus is on demonstrating the end-to-end solution with Tencent Cloud products. Given the author's limited experience, corrections and feedback are welcome.
About the author
Spiderwu is a senior engineer at Tencent CSIG.