In real production, we often need to take a raw data stream and join it against a number of external tables to supplement attributes. For example, for order data we may want the name of the province where the order recipient resides. An order usually records only a province ID, so we must query an external dimension table by that ID to fill in the province name.
Difference between flow table and dimension table:
Flow table: a table mapped from real-time stream data. In a join, every incoming record actively queries the dimension table for matching data.
Dimension table: a table of dimension information, usually queried passively from external storage (Redis, MySQL). Currently, a Flink SQL dimension table JOIN only supports associating a snapshot of the dimension table at the current time (see the sketch below).
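As an illustration, a current-time snapshot association in Flink SQL is written with FOR SYSTEM_TIME AS OF against the stream's processing-time attribute. A minimal sketch, where the table and column names (orders, dim_province, proc_time) are hypothetical and their DDL is assumed to have been executed already:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SnapshotJoinSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Hypothetical tables: `orders` is a Kafka-backed flow table with a
        // processing-time attribute `proc_time`; `dim_province` is a lookup-capable
        // (e.g. JDBC) dimension table. Their CREATE TABLE DDL is assumed to have run.
        tableEnv.executeSql(
                "SELECT o.order_id, o.province_id, p.province_name " +
                "FROM orders AS o " +
                "JOIN dim_province FOR SYSTEM_TIME AS OF o.proc_time AS p " +
                "ON o.province_id = p.id");
    }
}
```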
Join the flow table and dimension table
In Flink streaming computation, dimension attributes are generally stored in MySQL/HBase/Redis. These dimension tables are updated periodically and must be joined against according to business needs. Depending on how timely the business needs the associated dimension data to be, there are several solutions:
- Query the dimension table in real time, record by record
- Preload the dimension table (see the sketch after this list)
- Hot storage association
- Broadcast the dimension table
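To make the preload option concrete, here is a minimal sketch: a RichMapFunction loads the entire dimension table into each subtask's memory in open() and refreshes it on a timer, so no per-record external I/O is needed. loadAllFromMySql() is a hypothetical helper standing in for real JDBC code; this approach only suits small, slowly changing dimension tables:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

/** Sketch of the "preload" approach: cache the whole dimension table in memory. */
public class PreloadDimMapFunction extends RichMapFunction<String, Tuple2<String, String>> {

    private transient Map<String, String> dim;
    private transient ScheduledExecutorService refresher;

    @Override
    public void open(Configuration parameters) {
        dim = new ConcurrentHashMap<>();
        dim.putAll(loadAllFromMySql());
        // Refresh the full cache periodically to pick up dimension changes.
        refresher = Executors.newSingleThreadScheduledExecutor();
        refresher.scheduleAtFixedRate(() -> dim.putAll(loadAllFromMySql()), 10, 10, TimeUnit.MINUTES);
    }

    @Override
    public Tuple2<String, String> map(String provinceId) {
        // Local lookup only: no network round trip per record.
        return Tuple2.of(provinceId, dim.getOrDefault(provinceId, "unknown"));
    }

    @Override
    public void close() {
        if (refresher != null) {
            refresher.shutdown();
        }
    }

    /** Hypothetical helper: load id -> province name from MySQL (JDBC code omitted). */
    private Map<String, String> loadAllFromMySql() {
        return new ConcurrentHashMap<>();
    }
}
```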
Dimension table is a concept from data warehousing: the dimension attributes in a dimension table are the angles from which data is observed, and they supplement the information in the fact table. The real-time data warehouse also has fact tables and dimension tables. The fact table is usually Kafka's real-time stream data, while the dimension table is usually stored externally (such as MySQL and HBase). Each stream record can be associated with an external dimension table data source to provide lookups for real-time computation. Since the dimension table may change constantly, a dimension table join associates each record with a snapshot of the dimension table at a specified time.
- 1. Difference between flow table and dimension table
- 2. Data flow analysis of flow table and dimension table JOIN
- 3. Data flow analysis of dual-stream Join
- Code examples and scenarios
In Flink stream processing, it is often necessary to interact with external systems, for example dimension completion: using the dimension table to fill in fields of the fact table. By default, in a MapFunction, a single parallel instance can only interact synchronously: send a request to external storage, block on I/O, wait for the response, and then move on to the next request. This synchronous interaction spends most of its time waiting on the network. Raising the MapFunction's parallelism improves throughput, but more parallelism means more resources, which is not a great solution. Flink introduced Async I/O in version 1.2. In asynchronous mode, I/O operations are issued without blocking: a single parallel instance can send requests continuously and process whichever response returns first, so consecutive requests do not wait on each other, which greatly improves stream-processing efficiency.
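For contrast, the synchronous pattern described above looks roughly like this sketch, in which queryExternalStore() is a hypothetical stand-in for a blocking Redis/MySQL/HBase client call; each map() call stalls until its own round trip completes:

```java
import org.apache.flink.api.common.functions.RichMapFunction;

/** The synchronous pattern that Async I/O replaces: one blocking request per record. */
public class SyncLookupMapFunction extends RichMapFunction<String, String> {

    @Override
    public String map(String key) throws Exception {
        // Blocks this subtask until the external store answers; the next record
        // cannot be processed until the round trip completes.
        return queryExternalStore(key);
    }

    /** Hypothetical blocking client call to Redis/MySQL/HBase. */
    private String queryExternalStore(String key) throws Exception {
        Thread.sleep(10); // stand-in for network latency
        return "value-for-" + key;
    }
}
```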
Note:
To use Async I/O, an external storage client that supports asynchronous requests is required.
To use Async I/O, extend the RichAsyncFunction abstract class (which implements the AsyncFunction<IN, OUT> interface) and override or implement its three methods: open (establish connections), close (close connections), and asyncInvoke (the asynchronous call). The custom ElasticsearchAsyncFunction class below obtains dimension data from ES this way.
Async I/O is best combined with a cache, which reduces the number of requests made to external storage and improves efficiency.
Async I/O provides the Timeout parameter to control the maximum waiting time for a request. By default, when an asynchronous I/O request times out, an exception is raised and the job restarts or stops. If you want to handle timeouts yourself, override the AsyncFunction#timeout method.
Async I/O provides the Capacity parameter to control the number of concurrent requests. Once the capacity is used up, the backpressure mechanism is triggered to throttle upstream data intake.
Async I/O output supports both out-of-order and sequential modes.
Out-of-order: use the AsyncDataStream.unorderedWait(...) API; within each parallel instance, the output order may differ from the input order.
Ordered: use the AsyncDataStream.orderedWait(...) API; within each parallel instance, the output order matches the input order. To guarantee order, output records must be buffered and sorted, which is less efficient. Both modes are sketched below.
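The two modes differ only in the factory method used, as the sketch below shows; the 500 ms timeout and capacity of 10 are arbitrary values mirroring the full example later in this article:

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;

public class AsyncModes {

    /** Out-of-order mode: output order may differ from input order, lower latency. */
    public static <IN, OUT> DataStream<OUT> unordered(DataStream<IN> input, AsyncFunction<IN, OUT> fn) {
        return AsyncDataStream.unorderedWait(input, fn, 500, TimeUnit.MILLISECONDS, 10);
    }

    /** Ordered mode: results are buffered so the output order matches the input order. */
    public static <IN, OUT> DataStream<OUT> ordered(DataStream<IN> input, AsyncFunction<IN, OUT> fn) {
        return AsyncDataStream.orderedWait(input, fn, 500, TimeUnit.MILLISECONDS, 10);
    }
}
```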
Use Async I/O to join the flow table and dimension table
Background
Complete dimension fields in the flow table in real time; here, fill in each user's age.
Data sources
- Flow table: logs of user behavior, recording that a user clicked or browsed a product at some point in time. Self-made test data, with samples such as:

```json
{"userID": "user_1", "eventTime": "2016-06-06 07:03:42", "eventType": "browse", "productID": 2}
```

- Dimension table: basic user information. Self-made test data stored in ES. Example (GET dim_user/dim_user/user_1):

```json
{
  "_index": "dim_user",
  "_type": "dim_user",
  "_id": "user_1",
  "_version": 1,
  "found": true,
  "_source": {
    "age": 22
  }
}
```
Implementation logic
```java
package com.bigdata.flink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.*;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * Author: zuoziwen
 * Summary: join a flow table with a dimension table via Async I/O
 */
public class FlinkAsyncIO {
    public static void main(String[] args) throws Exception {

        /** Parse command-line parameters */
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String kafkaBootstrapServers = parameterTool.get("kafka.bootstrap.servers");
        String kafkaGroupID = parameterTool.get("kafka.group.id");
        String kafkaAutoOffsetReset = parameterTool.get("kafka.auto.offset.reset");
        String kafkaTopic = parameterTool.get("kafka.topic");
        int kafkaParallelism = parameterTool.getInt("kafka.parallelism");

        String esHost = parameterTool.get("es.host");
        Integer esPort = parameterTool.getInt("es.port");
        String esUser = parameterTool.get("es.user");
        String esPassword = parameterTool.get("es.password");
        String esIndex = parameterTool.get("es.index");
        String esType = parameterTool.get("es.type");

        /** Flink DataStream running environment */
        Configuration config = new Configuration();
        config.setInteger(RestOptions.PORT, 8081);
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);

        /** Kafka source */
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers", kafkaBootstrapServers);
        kafkaProperties.put("group.id", kafkaGroupID);
        kafkaProperties.put("auto.offset.reset", kafkaAutoOffsetReset);
        FlinkKafkaConsumer010<String> kafkaConsumer =
                new FlinkKafkaConsumer010<>(kafkaTopic, new SimpleStringSchema(), kafkaProperties);
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        SingleOutputStreamOperator<String> source = env.addSource(kafkaConsumer)
                .name("KafkaSource")
                .setParallelism(kafkaParallelism);

        /** Data conversion: parse each JSON log into a Tuple4 */
        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceMap = source
                .map((MapFunction<String, Tuple4<String, String, String, Integer>>) value -> {
                    Tuple4<String, String, String, Integer> output = new Tuple4<>();
                    try {
                        JSONObject obj = JSON.parseObject(value);
                        output.f0 = obj.getString("userID");
                        output.f1 = obj.getString("eventTime");
                        output.f2 = obj.getString("eventType");
                        output.f3 = obj.getInteger("productID");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return output;
                })
                .returns(new TypeHint<Tuple4<String, String, String, Integer>>() {})
                .name("Map: ExtractTransform");

        /** Filter out abnormal data */
        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceFilter = sourceMap
                .filter((FilterFunction<Tuple4<String, String, String, Integer>>) value -> value.f3 != null)
                .name("Filter: FilterExceptionData");

        /**
         * Async I/O joins the flow table with the dimension table.
         * Timeout (500 ms): by default, a timed-out request raises an exception and the job
         * restarts or stops; override AsyncFunction#timeout to handle timeouts yourself.
         * Capacity (10): the number of concurrent asynchronous requests.
         */
        SingleOutputStreamOperator<Tuple5<String, String, String, Integer, Integer>> result =
                AsyncDataStream.orderedWait(
                        sourceFilter,
                        new ElasticsearchAsyncFunction(esHost, esPort, esUser, esPassword, esIndex, esType),
                        500, TimeUnit.MILLISECONDS, 10)
                .name("Join: JoinWithDim");

        result.print().name("PrintToConsole");
        env.execute();
    }
}
```
ElasticsearchAsyncFunction
```java
package com.bigdata.flink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

/**
 * Author: zuoziwen
 * Summary: custom ElasticsearchAsyncFunction that queries dimension data from ES
 */
public class ElasticsearchAsyncFunction
        extends RichAsyncFunction<Tuple4<String, String, String, Integer>,
                                  Tuple5<String, String, String, Integer, Integer>> {

    private String host;
    private Integer port;
    private String user;
    private String password;
    private String index;
    private String type;

    public ElasticsearchAsyncFunction(String host, Integer port, String user,
                                      String password, String index, String type) {
        this.host = host;
        this.port = port;
        this.user = user;
        this.password = password;
        this.index = index;
        this.type = type;
    }

    private transient RestHighLevelClient restHighLevelClient;
    private transient Cache<String, Integer> cache;

    /** Establish the connection and initialize the cache */
    @Override
    public void open(Configuration parameters) {
        // ES client
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
        restHighLevelClient = new RestHighLevelClient(
                RestClient.builder(new HttpHost(host, port))
                        .setHttpClientConfigCallback(httpAsyncClientBuilder ->
                                httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));

        // Local cache to reduce the number of requests to ES
        cache = CacheBuilder.newBuilder()
                .maximumSize(2)
                .expireAfterAccess(5, TimeUnit.MINUTES)
                .build();
    }

    /** Close the connection */
    @Override
    public void close() throws Exception {
        restHighLevelClient.close();
    }

    /** Asynchronous call: look up the cache first, then ES */
    @Override
    public void asyncInvoke(Tuple4<String, String, String, Integer> input,
                            ResultFuture<Tuple5<String, String, String, Integer, Integer>> resultFuture) {
        // Look up the cache first
        Integer cachedValue = cache.getIfPresent(input.f0);
        if (cachedValue != null) {
            System.out.println("Get dimension data from cache: key=" + input.f0 + ",value=" + cachedValue);
            resultFuture.complete(Collections.singleton(
                    new Tuple5<>(input.f0, input.f1, input.f2, input.f3, cachedValue)));
        } else {
            // When there is no data in the cache, query ES
            searchFromES(input, resultFuture);
        }
    }

    /** Query the dimension data from ES with the asynchronous client */
    private void searchFromES(Tuple4<String, String, String, Integer> input,
                              ResultFuture<Tuple5<String, String, String, Integer, Integer>> resultFuture) {
        // 1. Construct the output object
        Tuple5<String, String, String, Integer, Integer> output = new Tuple5<>();
        output.f0 = input.f0;
        output.f1 = input.f1;
        output.f2 = input.f2;
        output.f3 = input.f3;

        // 2. The query key
        String dimKey = input.f0;

        // 3. Construct an Ids Query
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices(index);
        searchRequest.types(type);
        searchRequest.source(SearchSourceBuilder.searchSource()
                .query(QueryBuilders.idsQuery().addIds(dimKey)));

        // 4. Query the data with the asynchronous client
        restHighLevelClient.searchAsync(searchRequest, new ActionListener<SearchResponse>() {
            // Handle a successful response
            @Override
            public void onResponse(SearchResponse searchResponse) {
                SearchHit[] searchHits = searchResponse.getHits().getHits();
                if (searchHits.length > 0) {
                    JSONObject obj = JSON.parseObject(searchHits[0].getSourceAsString());
                    Integer dimValue = obj.getInteger("age");
                    output.f4 = dimValue;
                    cache.put(dimKey, dimValue);
                    System.out.println("Put dimension data into cache: key=" + dimKey + ",value=" + dimValue);
                }
                resultFuture.complete(Collections.singleton(output));
            }

            // Handle a failure: emit the record without the dimension value
            @Override
            public void onFailure(Exception e) {
                output.f4 = null;
                resultFuture.complete(Collections.singleton(output));
            }
        });
    }

    /** On timeout, retry the ES query instead of failing the job */
    @Override
    public void timeout(Tuple4<String, String, String, Integer> input,
                        ResultFuture<Tuple5<String, String, String, Integer, Integer>> resultFuture) {
        searchFromES(input, resultFuture);
    }
}
```
Flink SQL implements join of flow table and dimension table
Flow tables hold data from message queues such as Kafka; dimension tables hold relatively static data, for example in MySQL.
When joining the two with Flink SQL, the static dimension table acts like a dictionary. As records arrive on the stream, the in-memory dictionary is consulted to enrich the stream table, and the result is inserted into a MySQL table.
Dependencies
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
</dependency>
```
Implementation
```java
package com.bigdata.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.*;

/**
 * @author zuoziwen
 */
public class JoinDemo {

    private static String dimTable = "CREATE TABLE dimTable (\n" +
            "  id int,\n" +
            "  user_name STRING,\n" +
            "  age INT,\n" +
            "  gender STRING,\n" +
            "  PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector'='jdbc',\n" +
            "  'username'='root',\n" +
            "  'password'='root',\n" +
            "  'url'='jdbc:mysql://localhost:3306/aspirin',\n" +
            "  'table-name'='user_data_for_join'\n" +
            ")";

    private static String kafkaTable = "CREATE TABLE KafkaTable (\n" +
            "  `user` STRING,\n" +
            "  `site` STRING,\n" +
            "  `time` STRING\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'test-old',\n" +
            "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
            "  'properties.group.id' = 'testGroup',\n" +
            "  'scan.startup.mode' = 'earliest-offset',\n" +
            "  'format' = 'json'\n" +
            ")";

    private static String wideTable = "CREATE TABLE wideTable (\n" +
            "  id int,\n" +
            "  site STRING,\n" +
            "  user_name STRING,\n" +
            "  age INT,\n" +
            "  ts STRING,\n" +
            "  PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector'='jdbc',\n" +
            "  'username'='root',\n" +
            "  'password'='root',\n" +
            "  'url'='jdbc:mysql://localhost:3306/aspirin',\n" +
            "  'table-name'='wide_table'\n" +
            ")";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        // Register the three tables.
        tableEnvironment.executeSql(dimTable);
        tableEnvironment.executeSql(kafkaTable);
        tableEnvironment.executeSql(wideTable);

        // Table API handles to the registered tables (shown for illustration;
        // the join below is expressed in SQL).
        Table mysqlSource = tableEnvironment.from("dimTable")
                .select($("id"), $("user_name"), $("age"), $("gender"));
        Table kafkaSource = tableEnvironment.from("KafkaTable")
                .select($("user"), $("site"), $("time"));

        // Join the flow table with the dimension table and write the wide table.
        String joinSql = "insert into wideTable " +
                "select " +
                "  dimTable.id as `id`, " +
                "  t.site as site, " +
                "  dimTable.user_name as user_name, " +
                "  dimTable.age as age, " +
                "  t.`time` as ts " +
                "from KafkaTable as t " +
                "left join dimTable on dimTable.user_name = t.`user`";

        // executeSql submits the insert job itself; no further env.execute() is needed.
        tableEnvironment.executeSql(joinSql);
    }
}
```
Refer to Kangaroo Cloud's open-source flinkStreamSQL:
Github.com/DTStack/fli… extends real-time SQL on top of open-source Flink. Source tables: Kafka. Dimension tables: MySQL, SQLServer, Oracle, HBase, MongoDB, Redis, Cassandra. Result tables: MySQL, SQLServer, Oracle, HBase, Elasticsearch 5.x, MongoDB, Redis, Cassandra.
The core is
```java
AsyncReqRow asyncDbReq = loadAsyncReq(sideType, sqlRootDir, rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);

// TODO How much should the degree of parallelism be? Timeout? Capacity settings?
if (ORDERED.equals(sideTableInfo.getCacheMode())) {
    return AsyncDataStream.orderedWait(inputStream, asyncDbReq, sideTableInfo.getAsyncTimeout(),
            TimeUnit.MILLISECONDS, sideTableInfo.getAsyncCapacity())
        .setParallelism(sideTableInfo.getParallelism());
} else {
    return AsyncDataStream.unorderedWait(inputStream, asyncDbReq, sideTableInfo.getAsyncTimeout(),
            TimeUnit.MILLISECONDS, sideTableInfo.getAsyncCapacity())
        .setParallelism(sideTableInfo.getParallelism());
}
```
Here, inputStream is our flow table, and loadAsyncReq returns a RichAsyncFunction (public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row>) that queries data from the dimension table.
The implementation consists of two steps:
Implement the dimension table function with the Flink API
The corresponding Flink API is the abstract class RichAsyncFunction, whose open (initialization), asyncInvoke (asynchronous call per record), and close (cleanup) methods must be implemented. The main work is in asyncInvoke.
A join between a stream and a dimension table encounters two problems:
The first is performance. If the stream is fast, every record must be joined against the dimension table, but the dimension data lives in a third-party storage system. Accessing that system in real time makes the join slow, since every lookup costs a network I/O round trip, and it puts heavy pressure on the third-party store, which may even fail under the load.
Therefore, the solution is to cache the dimension data: a full cache when the dimension table is small, or an LRU cache when the dimension table is large, as sketched below.
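As an illustration, both cache strategies can be expressed with Guava's CacheBuilder, the same library used in ElasticsearchAsyncFunction above; the size and TTL below are made-up values:

```java
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class DimCaches {

    /** Full cache: the dimension table is small, so preload every row and keep them all. */
    static Cache<String, Integer> fullCache() {
        return CacheBuilder.newBuilder().build(); // unbounded; the caller preloads all rows
    }

    /** LRU cache: the dimension table is large, so keep only the hottest keys in memory. */
    static Cache<String, Integer> lruCache() {
        return CacheBuilder.newBuilder()
                .maximumSize(10_000)                    // evicts least-recently-used entries
                .expireAfterWrite(10, TimeUnit.MINUTES) // bounds the staleness of cached rows
                .build();
    }
}
```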
Parse the SQL syntax of stream and dimension-table joins and translate it into the underlying Flink API
Since Flink SQL already covers most SQL scenarios, it is impractical to parse all of SQL's syntax ourselves and convert it to the underlying Flink API.
So what we do is parse the SQL, check whether a dimension table appears among the joined tables, and if so split out the dimension-join clause, use the Flink Table API and Stream API to generate a new DataStream, and then join that DataStream with the other tables. In this way, the join between a stream and a dimension table can be written in SQL.
SQL parsing is done with Apache Calcite, the same framework Flink itself uses for SQL parsing, so all the syntax can be parsed.
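As a very rough illustration of that detection step, the sketch below uses Calcite's parser to check whether the right side of a top-level join references a known dimension table. The table name is hypothetical, and a real implementation such as flinkStreamSQL handles far more cases (subqueries, multiple joins, the dimension table on the left side):

```java
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlJoin;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParser;

public class DimTableDetector {

    /** Hypothetical name of a registered dimension table. */
    private static final String DIM_TABLE = "dimTable";

    /** True when the query's FROM clause is a join whose right side is the dimension table. */
    public static boolean joinsDimTable(String sql) throws Exception {
        SqlNode stmt = SqlParser.create(sql).parseStmt();
        if (!(stmt instanceof SqlSelect)) {
            return false;
        }
        SqlNode from = ((SqlSelect) stmt).getFrom();
        return from instanceof SqlJoin && isDimTable(((SqlJoin) from).getRight());
    }

    /** Resolve "table" or "table AS alias" to the base table name and compare. */
    private static boolean isDimTable(SqlNode node) {
        if (node.getKind() == SqlKind.AS) {
            node = ((SqlBasicCall) node).operand(0);
        }
        // Calcite upper-cases unquoted identifiers by default, so compare case-insensitively.
        return DIM_TABLE.equalsIgnoreCase(node.toString());
    }
}
```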