Preface
Our company needs to analyze and monitor data from parking lots nationwide (payments, abnormal barrier lifts, entries and exits, vehicle types, license plates, etc.). Our colleagues on the Java side are responsible for deploying the Driver end, which uploads the underlying table data of each node to Kafka in real time for the data team to consume.
To ensure that no data is lost, in addition to the real-time feed, a full copy of yesterday's data is delivered in the early morning every day. The resulting data redundancy is handled by ClickHouse's ReplacingMergeTree() engine.
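For context, ReplacingMergeTree deduplicates rows that share the same ORDER BY key during background merges (OPTIMIZE TABLE ... FINAL can force a merge). A minimal DDL sketch run through clickhouse-client; the columns here are hypothetical, the real tables appear in the scripts below:

# Hypothetical DDL sketch: rows sharing the same ORDER BY key (record_id)
# are collapsed during background merges
clickhouse-client -h 10.196.17.27 --port=9999 --database="park" -u default --password 123456 --query="
CREATE TABLE IF NOT EXISTS park.in_vehicle_record (
    record_id  String,
    plate_no   String,
    enter_time DateTime
) ENGINE = ReplacingMergeTree()
ORDER BY record_id"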
Tools involved
- Alibaba Cloud OSS (the offline data is pulled from OSS)
- ossutil (no budget for a better option: the OSS plan our company purchased does not support mounting to Linux as a directory, so this tool is used to synchronize instead)
- jq (processes JSON data on Linux)
- ClickHouse (where the data lands)
Implementation
Install ossutil
Download links for each version and the installation guide: help.aliyun.com/document_de…
1. Download and install
[root@hadoop-prod-datanode1 ~]# wget http://gosspublic.alicdn.com/ossutil/1.7.7/ossutil64
[root@hadoop-prod-datanode1 ~]# chmod 755 ossutil64
2. Use interactive configuration to generate the configuration file
[root@hadoop-prod-datanode1 ~]# ./ossutil64 config
Set the Endpoint, AccessKey ID, AccessKey Secret, and STSToken parameters as prompted
The parameters are described as follows:
- Endpoint: the endpoint of the region where the Bucket is located. For the regional endpoints, see Access Domain Names and Data Centers. You can also prepend http:// or https:// to specify the protocol ossutil uses to access OSS; the default is HTTP. For example, to access a Bucket in Shenzhen over HTTPS, set this to https://oss-cn-shenzhen.aliyuncs.com.
- AccessKeyID, AccessKeySecret: the AccessKey of the account. For how to obtain an AccessKey with an Alibaba Cloud account or a RAM user, see Obtaining an AccessKey. For how to obtain it when accessing OSS with an STS temporary authorization account, see Using STS Temporary Access Credentials.
- STSToken: set this when accessing OSS with an STS temporary authorization account; otherwise leave it empty. For how an STSToken is generated, see the temporary access credentials documentation.
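If you prefer to skip the interactive prompts, ossutil can take the same values as command-line flags; a quick sketch (the endpoint and placeholder credentials are assumptions, substitute your own):

# Non-interactive configuration: -e endpoint, -i AccessKeyID, -k AccessKeySecret (placeholders are assumptions)
[root@hadoop-prod-datanode1 ~]# ./ossutil64 config -e oss-cn-shenzhen.aliyuncs.com -i <yourAccessKeyID> -k <yourAccessKeySecret>
# Sanity check: list the bucket used later in this article
[root@hadoop-prod-datanode1 ~]# ./ossutil64 ls oss://park-client/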
Install jq to parse JSON
The JSON files in OSS are JSON arrays wrapping JSON objects, a format ClickHouse cannot ingest directly, so the outer array has to be stripped first.
1. Download jq
[root@hadoop-prod-datanode1 ~]# yum install jq
2. Get a JSON file to test the functionality
[root@hadoop-prod-datanode1 ~]# jq -c '.[]' /data/park/park_data/xxx.json > /data/park/park_parse_data/xxx.json
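To see what the stripping does, here is a tiny self-contained example (the sample objects are made up):

# Hypothetical input: an array wrapping the row objects
[root@hadoop-prod-datanode1 ~]# echo '[{"id":1,"plate":"A123"},{"id":2,"plate":"B456"}]' > /tmp/demo.json
# jq -c '.[]' unwraps the array and prints one compact object per line,
# which is exactly the shape ClickHouse's JSONEachRow format expects
[root@hadoop-prod-datanode1 ~]# jq -c '.[]' /tmp/demo.json
{"id":1,"plate":"A123"}
{"id":2,"plate":"B456"}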
Write scripts to batch-load ClickHouse
The accepted input formats are documented on the ClickHouse website: clickhouse.com/docs/en/int…
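The core pattern is piping newline-delimited JSON into clickhouse-client with FORMAT JSONEachRow. A one-off sketch using the same connection details as the script below (park.demo_table is hypothetical and would need matching columns):

# Hypothetical table park.demo_table with matching columns is assumed
[root@hadoop-prod-datanode1 ~]# echo '{"id":1,"plate":"A123"}' | clickhouse-client -h 10.196.17.27 --port=9999 --database="park" -u default --password 123456 --query="INSERT INTO park.demo_table FORMAT JSONEachRow"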
1. Write the processing script
#!/bin/bash
#***************************************************
# Author:      CoolCool の XinXing
# Version:     1.0
# Date:        2021-07-08
# FileName:    park_extr_insert_clickhouse.sh
# Description: Load the full data into the warehouse
#***************************************************
# Get yesterday's date
date=$(date -d "1 day ago" +"%Y-%m-%d")
echo "---------------------------${date}---------------------------"
# Path of the raw data
path="/data/park/park_data/$date"
# Fuzzy-match the OSS file names to their ClickHouse tables
args=$1
parkTableJson="$args*.json"
# Choose which table to load based on the matched prefix
case $args in
'VechicleExit')
tabName='out_vehicle_record'
;;
'VechicleEnter')
tabName='in_vehicle_record'
;;
'TempStopPayment')
tabName='pay_record'
;;
'MonthlyRent')
tabName='token_record'
;;
esac
# Collect all matched JSON file paths into an array for traversal
array1=$(ls $path/$parkTableJson)
# Create the local folder for the parsed data, organized by date
mkdir -p /data/park/parse_park_data/$date
echo "-------------------${tabName}-------------------"
# Loop over the matched files
for value in ${array1[*]}
do
    # Derive the parsed-output path by swapping park_data for parse_park_data
    parsePath=${value/park_data/parse_park_data}
    jq -c '.[]' $value > $parsePath
    # Skip empty files: inserting empty input into ClickHouse raises an error
    if [ -s $parsePath ]; then
        # Log the record count for later traceability
        echo `wc -l $parsePath`
        # Insert the data into ClickHouse
        clickhouse-client -h 10.196.17.27 --port=9999 --database="park" -u default --password 123456 --query="insert into park.${tabName} FORMAT JSONEachRow" < $parsePath
    else
        echo "File contents are empty, skipping: $parsePath"
    fi
done
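After a run, it is worth checking the table counts against the wc -l figures printed by the script; a quick sketch with the same connection details:

# Compare with the record counts echoed in the log
[root@hadoop-prod-datanode1 ~]# clickhouse-client -h 10.196.17.27 --port=9999 --database="park" -u default --password 123456 --query="SELECT count() FROM park.out_vehicle_record"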
2. Pull the incremental data from OSS and invoke the script above
#!/bin/bash
#***************************************************
# Author:      CoolCool の XinXing
# Version:     1.0
# Date:        2021-07-08
# FileName:    park_extr_all.sh
# Description: Pull the full data and load the warehouse
#***************************************************
# Incrementally pull yesterday's full data from OSS (--update copies only new or changed files)
ossutil64 cp -r oss://park-client/file/ /data/park/park_data/ --update
sh /script/spark_job/bin/park/park_extr_insert_clickhouse.sh "VechicleExit"
sh /script/spark_job/bin/park/park_extr_insert_clickhouse.sh "VechicleEnter"
sh /script/spark_job/bin/park/park_extr_insert_clickhouse.sh "TempStopPayment"
sh /script/spark_job/bin/park/park_extr_insert_clickhouse.sh "MonthlyRent"
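Since the full data lands in OSS in the early morning, the wrapper can be driven by cron. A sketch of a crontab entry (the 03:00 run time and log path are assumptions, adjust to when the upstream finishes):

# Hypothetical schedule: run the daily pull + load at 03:00,
# appending stdout/stderr to a log for the record counts echoed above
0 3 * * * sh /script/spark_job/bin/park/park_extr_all.sh >> /data/park/park_extr_all.log 2>&1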
Afterword
I don't think this blog needs to spell everything out; after all, Baidu and Google have it all. The point is to share the approach and a little experience. You can build your own implementation by borrowing from these ideas rather than being limited to the tools in this article. All roads lead to Rome.