Welcome to my GitHub
Github.com/zq2599/blog…
Content: a categorized summary of all original articles with supporting source code, covering Java, Docker, Kubernetes, DevOps, etc.
Overview of this article
- This article is the second part of the “CoProcessFunction combat trilogy”. The scenario we tackle is connecting two streams: when processing an element in stream 1, we also need the data for the same key from stream 2.
- The simplest example: the value of key aaa in stream 1 is added to the value of key aaa in stream 2, and the sum is emitted downstream, as shown in the figure below. The value from stream 1 is stored in state; when the same key arrives in stream 2, the stored value is taken out and added, and the result is emitted downstream:
- This article implements the function shown above in code.
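Before turning to Flink, the pairing idea described above can be sketched in plain Java. This is only a minimal simulation, not the Flink implementation: two `HashMap`s stand in for keyed state, and the class and method names (`TwoStreamSumSketch`, `onStream1`, `onStream2`) are hypothetical, introduced purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java simulation of the dual-stream sum. HashMaps stand in for Flink keyed state.
// All names here are hypothetical and exist only for this illustration.
class TwoStreamSumSketch {
    // Values seen on stream 1 that are still waiting for a partner from stream 2
    private final Map<String, Integer> pendingFromStream1 = new HashMap<>();
    // Values seen on stream 2 that are still waiting for a partner from stream 1
    private final Map<String, Integer> pendingFromStream2 = new HashMap<>();

    // Handles an element from stream 1; returns the sum if stream 2 already delivered this key
    Optional<Integer> onStream1(String key, int value) {
        return pair(key, value, pendingFromStream1, pendingFromStream2);
    }

    // Handles an element from stream 2; returns the sum if stream 1 already delivered this key
    Optional<Integer> onStream2(String key, int value) {
        return pair(key, value, pendingFromStream2, pendingFromStream1);
    }

    private static Optional<Integer> pair(String key, int value,
                                          Map<String, Integer> own,
                                          Map<String, Integer> other) {
        // Take the other side's stored value out and clear it in one step
        Integer partner = other.remove(key);
        if (partner == null) {
            // Nothing to pair with yet: remember our own value for later
            own.put(key, value);
            return Optional.empty();
        }
        return Optional.of(partner + value);
    }

    public static void main(String[] args) {
        TwoStreamSumSketch sketch = new TwoStreamSumSketch();
        System.out.println(sketch.onStream1("aaa", 111)); // Optional.empty
        System.out.println(sketch.onStream2("aaa", 222)); // Optional[333]
    }
}
```

Whichever stream delivers a key first, the first arrival is stored and the second arrival produces the sum, which is the behavior the Flink code in this article aims for.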
Reference article
To understand state, see “In-depth understanding of State Operations for ProcessFunction (Flink-1.10)”
Download the source code
If you don’t want to write the code yourself, the source code for the entire series can be downloaded from GitHub. The addresses and links are listed in the table below (github.com/zq2599/blog…):
Name | Link | Note |
---|---|---|
Project home page | Github.com/zq2599/blog… | The project’s home page on GitHub |
Git repository address (HTTPS) | Github.com/zq2599/blog… | The project’s source repository address, HTTPS protocol |
Git repository address (SSH) | git@github.com:zq2599/blog_demos.git | The project’s source repository address, SSH protocol |
The Git project contains multiple folders. The code for this article is in the FlinkStudy folder, as shown in the red box below:
Coding
- The Map function that converts a String to a Tuple2, as well as the abstract class AbstractCoProcessFunctionExecutor, are the same as in the previous article, the first part of the “CoProcessFunction combat trilogy” (basic functions);
- Create AddTwoSourceValue.java, a subclass of AbstractCoProcessFunctionExecutor. The source code is as follows, and a few key points are explained afterwards:
package com.bolingcavalry.coprocessfunction;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author will
 * @email [email protected]
 * @date 2020-11-11 09:48
 * @description Functions
 */
public class AddTwoSourceValue extends AbstractCoProcessFunctionExecutor {

    private static final Logger logger = LoggerFactory.getLogger(AddTwoSourceValue.class);

    @Override
    protected CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> getCoProcessFunctionInstance() {
        return new CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            // State for the value of a key stored by processElement1
            private ValueState<Integer> state1;

            // State for the value of a key stored by processElement2
            private ValueState<Integer> state2;

            @Override
            public void open(Configuration parameters) throws Exception {
                // Initialize the two states
                state1 = getRuntimeContext().getState(new ValueStateDescriptor<>("myState1", Integer.class));
                state2 = getRuntimeContext().getState(new ValueStateDescriptor<>("myState2", Integer.class));
            }

            @Override
            public void processElement1(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                logger.info("Processing element 1: {}", value);

                String key = value.f0;

                Integer value2 = state2.value();

                // value2 is null, which means processElement2 has not yet processed this key,
                // so save value1
                if (null == value2) {
                    logger.info("Stream 2 has not received [{}], save the value [{}] received by stream 1", key, value.f1);
                    state1.update(value.f1);
                } else {
                    logger.info("Stream 2 received [{}] with the value [{}], now add the two values and print", key, value2);

                    // Emit a new element to the downstream node
                    out.collect(new Tuple2<>(key, value.f1 + value2));

                    // Clear the state of stream 2
                    state2.clear();
                }
            }

            @Override
            public void processElement2(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                logger.info("Processing element 2: {}", value);

                String key = value.f0;

                Integer value1 = state1.value();

                // value1 is null, which means processElement1 has not yet processed this key,
                // so save value2
                if (null == value1) {
                    logger.info("Stream 1 has not received [{}], save the value [{}] received by stream 2", key, value.f1);
                    state2.update(value.f1);
                } else {
                    logger.info("Stream 1 received [{}] with the value [{}], now add the two values and print", key, value1);

                    // Emit a new element to the downstream node
                    out.collect(new Tuple2<>(key, value.f1 + value1));

                    // Clear the state of stream 1
                    state1.clear();
                }
            }
        };
    }

    public static void main(String[] args) throws Exception {
        new AddTwoSourceValue().execute();
    }
}
- Key point 1: for the key aaa, there is no way to know whether it will arrive first from source 1 or from source 2. If it arrives first from source 1, processElement1 should save the value in state1, so that when aaa later appears in source 2, processElement2 can take the source 1 value out of state1, add the two values, and emit the sum downstream;
- Key point 2: once a result has been emitted downstream, the data has been processed and the saved state should be cleared;
- For more details on state access in low-level functions, see “In-depth understanding of State Operations for ProcessFunction (Flink-1.10)”
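Key point 2 is easier to see with a small experiment. The following plain-Java sketch is a simulation under stated assumptions, not Flink code: a `HashMap` stands in for state1, only the stream 2 side is modeled, and the names `ClearStateSketch` and `replay` are hypothetical. It replays several stream 2 values for the key aaa against one value already stored by stream 1, once with clearing and once without.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation: what stream 2 emits for one key, with and without clearing state1.
class ClearStateSketch {

    // Replays stream 2 values against a value already stored by stream 1
    // and returns every sum that would be emitted downstream.
    static List<Integer> replay(int storedFromStream1, int[] stream2Values, boolean clearAfterMatch) {
        Map<String, Integer> state1 = new HashMap<>(); // stands in for Flink keyed state
        state1.put("aaa", storedFromStream1);

        List<Integer> emitted = new ArrayList<>();
        for (int value : stream2Values) {
            Integer stored = state1.get("aaa");
            if (stored == null) {
                // No partner available: the real function would save the value in state2 here
                continue;
            }
            emitted.add(stored + value);
            if (clearAfterMatch) {
                state1.remove("aaa"); // the equivalent of state1.clear()
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(replay(111, new int[]{222, 5}, true));  // [333]
        System.out.println(replay(111, new int[]{222, 5}, false)); // [333, 116]
    }
}
```

Without clearing, the stale value 111 is paired again with the later element 5, producing an extra result. With clearing, each stored value is used for exactly one sum.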
Validation
- Open ports 9998 and 9999 on the local machine. I am on a MacBook, so I run nc -l 9998 and nc -l 9999 in two separate consoles
- Start the Flink application. If you are on a Mac like me, simply run the AddTwoSourceValue.main method (I have not tried this on Windows, but you can package it as a JAR and deploy it instead).
- In the console listening on port 9998, type aaa,111. The Flink log is shown below, indicating that aaa appears for the first time and its value is saved in state:
22:35:12,135 INFO AddTwoSourceValue - Processing element 1: (aaa,111)
22:35:12,136 INFO AddTwoSourceValue - Stream 2 has not received [aaa], save the value [111] received by stream 1
- In the console listening on port 9999, type BBB,123. The Flink log is shown below, indicating that BBB also appears for the first time and its value is saved in state:
22:35:34,473 INFO AddTwoSourceValue - Processing element 2: (BBB,123)
22:35:34,473 INFO AddTwoSourceValue - Stream 1 has not received [BBB], save the value [123] received by stream 2
- In the console listening on port 9999, type aaa,222. The Flink log is shown below: the value previously saved in state is retrieved, so processElement2 adds the aaa values 111 and 222 from the two sources and emits the sum downstream, where it is printed directly:
22:35:38,072 INFO AddTwoSourceValue - Processing element 2: (aaa,222)
22:35:38,072 INFO AddTwoSourceValue - Stream 1 received [aaa] with the value [111], now add the two values and print
(aaa,333)
- With that, we have completed a hands-on exercise in sharing state between the two streams of a dual-stream scenario. The next article adds timers and side outputs to handle data in dual-stream scenarios more thoroughly.
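Each line typed during validation, such as aaa,111, has the form key,value and must be turned into a key/value pair before it reaches the CoProcessFunction. The series' actual String-to-Tuple2 map function comes from the previous article and is not reproduced here. The following is only a hedged plain-Java sketch of that parsing step, using `Map.Entry` in place of Flink's `Tuple2`; the names `LineParserSketch` and `parse` are hypothetical.

```java
import java.util.Map;

// Hypothetical helper: turns a text line such as "aaa,111" into a (key, value) pair.
// Map.Entry stands in for Flink's Tuple2<String, Integer> so the sketch runs without Flink.
class LineParserSketch {

    static Map.Entry<String, Integer> parse(String line) {
        String[] parts = line.trim().split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected \"key,value\" but got: " + line);
        }
        // Integer.parseInt throws NumberFormatException if the value is not an integer
        return Map.entry(parts[0].trim(), Integer.parseInt(parts[1].trim()));
    }

    public static void main(String[] args) {
        System.out.println(parse("aaa,111")); // aaa=111
    }
}
```

Rejecting malformed lines early, as this sketch does, is one possible design choice. A real job might prefer to log and skip bad input so that a single typo in the console does not fail the stream.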
You are not alone: Xinchen’s original articles accompany you all the way
- Java series
- Spring series
- The Docker series
- Kubernetes series
- Database + middleware series
- The conversation series
Welcome to follow my WeChat official account: programmer Xin Chen
Search WeChat for “programmer Xin Chen”. I am Xin Chen, and I look forward to exploring the Java world with you…