The data set
Suppose we have two streaming datasets in different Kafka topics (source and target), and we need to assess the data quality of the target dataset against the source dataset.
For simplicity, assume that the data in both topics are JSON strings, as shown below:
{"id": 1."name": "Apple"."color": "red"."time": "The 2018-09-12 _06:00:00"}
{"id": 2."name": "Banana"."color": "yellow"."time": "The 2018-09-12 _06:01:00"}...Copy the code
Prepare the environment
Prepare the environment for the Apache Griffin measurement module, including the following components:
- JDK (1.8+)
- Hadoop (2.6.0+)
- Spark (2.2.1+)
- Kafka (0.8.x)
- Zookeeper (3.5+)
For details on how to configure these components, see griffin/griffin-doc/deploy in the source tree. This article assumes that the above environment is already configured. For version compatibility information, see github.com/apache/grif…
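As a quick sanity check before building, you can confirm the installed versions from the shell (these commands assume the component binaries are already on your PATH):
java -version                    # expect 1.8+
hadoop version                   # Hadoop client version
spark-submit --version           # expect 2.2.1+
echo ruok | nc localhost 2181    # Zookeeper liveness probe, expect "imok"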
Build the Apache Griffin measurement module
1. Download the Apache Griffin source package here.
2. Decompress the source package:
unzip griffin-0.4.0-source-release.zip
cd griffin-0.4.0-source-release
3. Build the Apache Griffin JAR
mvn clean install
4. Move the built Apache Griffin JAR to your work path:
mv measure/target/measure-0.4.0.jar <work path>/griffin-measure.jar
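Optionally, confirm that the JAR contains the entry class referenced by the spark-submit command later in this article (jar is the JDK's standard archive tool):
jar tf <work path>/griffin-measure.jar | grep "org/apache/griffin/measure/Application"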
Data preparation
To get started quickly, we use the Kafka shell to create two Kafka topics (source, target) and generate data for them in JSON string format.
# create topics
# Note: this only works for Kafka 0.8
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic source
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic target
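To verify that both topics were created, the same Kafka 0.8 shell tools can list them:
# expected output includes the two topic names: source, target
kafka-topics.sh --list --zookeeper localhost:2181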
The data format looks something like this:
{"id": 1."name": "Apple"."color": "red"."time": "The 2018-09-12 _06:00:00"}
{"id": 2."name": "Banana"."color": "yellow"."time": "The 2018-09-12 _06:01:00"}
The source and target topics may contain data that differ from each other. You can download the demo data and run the ./streaming-data.sh script to generate JSON string data files and produce them into the two Kafka topics.
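If you prefer to push data by hand instead of using the script, the stock console producer also works; the broker address and input file names below are illustrative assumptions, not files shipped with the demo:
# feed pre-generated JSON lines into each topic (file names are hypothetical)
kafka-console-producer.sh --broker-list localhost:9092 --topic source < source-data.json
kafka-console-producer.sh --broker-list localhost:9092 --topic target < target-data.json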
Define data quality metrics
Apache Griffin environment configuration file: env.json
{
    "spark": {
        "log.level": "WARN",
        "checkpoint.dir": "hdfs:///griffin/checkpoint",
        "batch.interval": "20s",
        "process.interval": "1m",
        "init.clear": true,
        "config": {
            "spark.default.parallelism": 4,
            "spark.task.maxFailures": 5,
            "spark.streaming.kafkaMaxRatePerPartition": 1000,
            "spark.streaming.concurrentJobs": 4,
            "spark.yarn.maxAppAttempts": 5,
            "spark.yarn.am.attemptFailuresValidityInterval": "1h",
            "spark.yarn.max.executor.failures": 120,
            "spark.yarn.executor.failuresValidityInterval": "1h",
            "spark.hadoop.fs.hdfs.impl.disable.cache": true
        }
    },
    "sinks": [
        {
            "type": "console"
        },
        {
            "type": "hdfs",
            "config": {
                "path": "hdfs:///griffin/persist"
            }
        },
        {
            "type": "elasticsearch",
            "config": {
                "method": "post",
                "api": "http://es:9200/griffin/accuracy"
            }
        }
    ],
    "griffin.checkpoint": [
        {
            "type": "zk",
            "config": {
                "hosts": "zk:2181",
                "namespace": "griffin/infocache",
                "lock.path": "lock",
                "mode": "persist",
                "init.clear": true,
                "close.clear": false
            }
        }
    ]
}
Define the Griffin data quality (DQ) configuration file: dq.json
{
    "name": "streaming_accu",
    "process.type": "streaming",
    "data.sources": [
        {
            "name": "src",
            "baseline": true,
            "connectors": [
                {
                    "type": "kafka",
                    "version": "0.8",
                    "config": {
                        "kafka.config": {
                            "bootstrap.servers": "kafka:9092",
                            "group.id": "griffin",
                            "auto.offset.reset": "largest",
                            "auto.commit.enable": "false"
                        },
                        "topics": "source",
                        "key.type": "java.lang.String",
                        "value.type": "java.lang.String"
                    },
                    "pre.proc": [
                        {
                            "dsl.type": "df-opr",
                            "rule": "from_json"
                        }
                    ]
                }
            ],
            "checkpoint": {
                "type": "json",
                "file.path": "hdfs:///griffin/streaming/dump/source",
                "info.path": "source",
                "ready.time.interval": "10s",
                "ready.time.delay": "0",
                "time.range": ["-5m", "0"],
                "updatable": true
            }
        },
        {
            "name": "tgt",
            "connectors": [
                {
                    "type": "kafka",
                    "version": "0.8",
                    "config": {
                        "kafka.config": {
                            "bootstrap.servers": "kafka:9092",
                            "group.id": "griffin",
                            "auto.offset.reset": "largest",
                            "auto.commit.enable": "false"
                        },
                        "topics": "target",
                        "key.type": "java.lang.String",
                        "value.type": "java.lang.String"
                    },
                    "pre.proc": [
                        {
                            "dsl.type": "df-opr",
                            "rule": "from_json"
                        }
                    ]
                }
            ],
            "checkpoint": {
                "type": "json",
                "file.path": "hdfs:///griffin/streaming/dump/target",
                "info.path": "target",
                "ready.time.interval": "10s",
                "ready.time.delay": "0",
                "time.range": ["-1m", "0"]
            }
        }
    ],
    "evaluate.rule": {
        "rules": [
            {
                "dsl.type": "griffin-dsl",
                "dq.type": "accuracy",
                "out.dataframe.name": "accu",
                "rule": "src.id = tgt.id AND src.name = tgt.name AND src.color = tgt.color AND src.time = tgt.time",
                "details": {
                    "source": "src",
                    "target": "tgt",
                    "miss": "miss_count",
                    "total": "total_count",
                    "matched": "matched_count"
                },
                "out": [
                    {
                        "type": "metric",
                        "name": "accu"
                    },
                    {
                        "type": "record",
                        "name": "missRecords"
                    }
                ]
            }
        ]
    },
    "sinks": ["CONSOLE", "HDFS"]
}
Measure data quality
Submit the measurement job to Spark, passing the two configuration file paths as parameters.
spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \
--driver-memory 1g --executor-memory 1g --num-executors 3 \
<path>/griffin-measure.jar \
<path>/env.json <path>/dq.json
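Since the job runs in YARN client mode, you can confirm it was accepted with the standard YARN CLI; the grep pattern here is an assumption, as the application name depends on your setup:
yarn application -list | grep -i griffin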
Report data quality metrics
The calculation log is printed to the console, and result metrics are emitted every minute while the job runs. Results are also persisted to HDFS under hdfs:///griffin/persist/<job name>/, in separate directories named after the timestamp of each calculation task.
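To browse the persisted results, list the persist directory; the streaming_accu subdirectory shown here follows the job name defined in dq.json, with one timestamped subdirectory per calculation task:
hdfs dfs -ls hdfs:///griffin/persist/streaming_accu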
Optimize data quality reporting
Based on the measured results and on actual business needs, the data quality measurements can be further refined.
For details about the measurement parameters, see griffin/griffin-doc/measure.