Preface
This article looks at the iterate operation of the Flink DataStream API.
Example
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map(/* do something */);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});
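- This example shows the basic use of IterativeStream: iterate() creates the IterativeStream, and its closeWith method closes the iteration by specifying the feedback stream. Elements of iterationBody with value > 0 are fed back to the iteration head and go through the loop again, while elements with value <= 0 leave the loop through output.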
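To make the data flow of the example concrete, here is a plain-Java simulation that does not use Flink at all. It is only a sketch under stated assumptions: the class IterationSimulation, its run method, and the loop body v -> v - 1 (standing in for the elided "do something" map) are hypothetical, and a single-threaded queue plays the role of the iteration head plus feedback edge.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongUnaryOperator;

public class IterationSimulation {

    // Runs every element through the loop body until the output filter lets it out.
    // 'body' is a hypothetical stand-in for the map step of the example.
    public static List<Long> run(List<Long> initial, LongUnaryOperator body) {
        Deque<Long> head = new ArrayDeque<>(initial); // iteration head: initial input plus fed-back elements
        List<Long> output = new ArrayList<>();
        while (!head.isEmpty()) {
            long value = body.applyAsLong(head.poll());
            if (value > 0) {
                head.add(value);   // feedback filter: back to the head, like closeWith(feedback)
            } else {
                output.add(value); // output filter: value <= 0 leaves the loop
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Hypothetical body: decrement by 1, so positive elements circle until they reach 0.
        List<Long> out = run(List.of(3L, 1L, -2L), v -> v - 1);
        System.out.println(out); // prints [0, -3, 0]
    }
}
```

Each element keeps circling while the feedback filter (value > 0) accepts it, and the output filter (value <= 0) is the only exit; the two filters in the example are complementary, so no element is dropped or emitted twice.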
DataStream.iterate
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
@Public
public class DataStream<T> {
//......
@PublicEvolving
public IterativeStream<T> iterate() {
return new IterativeStream<>(this, 0);
}
@PublicEvolving
public IterativeStream<T> iterate(long maxWaitTimeMillis) {
return new IterativeStream<>(this, maxWaitTimeMillis);
}
//......
}
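- DataStream provides two iterate methods, both of which create and return an IterativeStream; the no-argument iterate() passes a maxWaitTimeMillis of 0. A wait time of 0 means the iteration head never times out, so by default a job containing an iteration does not terminate on its own; pass a positive maxWaitTimeMillis if the iteration head should shut down after that long without receiving a feedback element.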
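The effect of maxWaitTimeMillis on the iteration head can be sketched with a BlockingQueue as the feedback channel. This is a simplified model based on our reading of how Flink's iteration head treats the wait time (a positive value means a bounded poll, 0 means a blocking take), not Flink code; the class IterationHeadSketch and its drain method are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class IterationHeadSketch {

    // Drains the feedback channel the way an iteration head might:
    // waitTimeMillis > 0  -> stop after waiting that long without a new element;
    // waitTimeMillis == 0 -> block on take(), so the loop never ends on its own.
    public static int drain(BlockingQueue<Long> feedbackChannel, long waitTimeMillis) {
        boolean shouldWait = waitTimeMillis > 0;
        int received = 0;
        try {
            while (true) {
                Long next = shouldWait
                        ? feedbackChannel.poll(waitTimeMillis, TimeUnit.MILLISECONDS)
                        : feedbackChannel.take();
                if (next == null) {
                    break; // timed out: the head shuts down and accepts no more feedback
                }
                received++; // a real head would forward 'next' into the loop body here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status and stop draining
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<Long> channel = new LinkedBlockingQueue<>();
        channel.add(1L);
        channel.add(2L);
        // With a 100 ms wait time the head consumes both elements, then times out.
        System.out.println(drain(channel, 100)); // prints 2
    }
}
```

Calling drain(channel, 0) on an empty channel would block forever, which is the behavior the no-argument iterate() opts into.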
IterativeStream
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/IterativeStream.java
@PublicEvolving
public class IterativeStream<T> extends SingleOutputStreamOperator<T> {
// We store these so that we can create a co-iteration if we need to
private DataStream<T> originalInput;
private long maxWaitTime;
protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
super(dataStream.getExecutionEnvironment(),
new FeedbackTransformation<>(dataStream.getTransformation(), maxWaitTime));
this.originalInput = dataStream;
this.maxWaitTime = maxWaitTime;
setBufferTimeout(dataStream.environment.getBufferTimeout());
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public DataStream<T> closeWith(DataStream<T> feedbackStream) {
Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
if (!predecessors.contains(this.transformation)) {
throw new UnsupportedOperationException("Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
}
((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation());
return feedbackStream;
}
public <F> ConnectedIterativeStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
return withFeedbackType(TypeInformation.of(feedbackTypeClass));
}
public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeHint<F> feedbackTypeHint) {
return withFeedbackType(TypeInformation.of(feedbackTypeHint));
}
public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
return new ConnectedIterativeStreams<>(originalInput, feedbackType, maxWaitTime);
}
@Public
public static class ConnectedIterativeStreams<I, F> extends ConnectedStreams<I, F> {
private CoFeedbackTransformation<F> coFeedbackTransformation;
public ConnectedIterativeStreams(DataStream<I> input,
TypeInformation<F> feedbackType,
long waitTime) {
super(input.getExecutionEnvironment(),
input,
new DataStream<>(input.getExecutionEnvironment(),
new CoFeedbackTransformation<>(input.getParallelism(),
feedbackType,
waitTime)));
this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecondInput().getTransformation();
}
public DataStream<F> closeWith(DataStream<F> feedbackStream) {
Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();
if (!predecessors.contains(this.coFeedbackTransformation)) {
throw new UnsupportedOperationException("Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
}
coFeedbackTransformation.addFeedbackEdge(feedbackStream.getTransformation());
return feedbackStream;
}
private UnsupportedOperationException groupingException =
new UnsupportedOperationException("Cannot change the input partitioning of an" +
"iteration head directly. Apply the partitioning on the input and" +
"feedback streams instead.");
@Override
public ConnectedStreams<I, F> keyBy(int[] keyPositions1, int[] keyPositions2) {
throw groupingException;
}
@Override
public ConnectedStreams<I, F> keyBy(String field1, String field2) {
throw groupingException;
}
@Override
public ConnectedStreams<I, F> keyBy(String[] fields1, String[] fields2) {
throw groupingException;
}
@Override
public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {
throw groupingException;
}
@Override
public <KEY> ConnectedStreams<I, F> keyBy(KeySelector<I, KEY> keySelector1, KeySelector<F, KEY> keySelector2, TypeInformation<KEY> keyType) {
throw groupingException;
}
}
}
- IterativeStream extends SingleOutputStreamOperator. Its constructor takes two parameters, the input DataStream (kept as originalInput) and maxWaitTime; it creates a FeedbackTransformation from dataStream.getTransformation() and maxWaitTime, and also sets the bufferTimeout of the transformation from dataStream.environment.getBufferTimeout()
- IterativeStream provides two main methods. The first is closeWith, which closes the iteration: it defines which part of the iteration is fed back to the iteration head. In the example above, the elements that pass the feedback filter re-enter the head of the IterativeStream and are processed again. closeWith first checks that the transitive predecessors of the feedback stream include this iteration's transformation, throwing UnsupportedOperationException otherwise, and then registers the feedback stream through addFeedbackEdge
- The second is withFeedbackType, which creates a ConnectedIterativeStreams. ConnectedIterativeStreams extends ConnectedStreams and allows the feedback type to differ from the type of originalInput. It defines its own closeWith method, and overrides the keyBy methods of ConnectedStreams to throw UnsupportedOperationException, since partitioning has to be applied to the input and feedback streams instead
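The guard in closeWith boils down to a reachability check over the transformation graph. The following plain-Java sketch mimics that check with a tiny hypothetical Node class standing in for StreamTransformation; CloseWithCheckSketch, Node and transitivePredecessors are illustrative names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class CloseWithCheckSketch {

    // Hypothetical stand-in for a StreamTransformation: a named node with upstream inputs.
    public static final class Node {
        final String name;
        final List<Node> inputs = new ArrayList<>();

        public Node(String name, Node... inputs) {
            this.name = name;
            Collections.addAll(this.inputs, inputs);
        }

        // Like getTransitivePredecessors(): this node plus everything upstream of it.
        public Set<Node> transitivePredecessors() {
            Set<Node> result = new LinkedHashSet<>();
            result.add(this);
            for (Node input : inputs) {
                result.addAll(input.transitivePredecessors());
            }
            return result;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    // Mirrors the guard in closeWith: the feedback must be derived from the iteration head.
    public static void closeWith(Node iterationHead, Node feedback, List<Node> feedbackEdges) {
        if (!feedback.transitivePredecessors().contains(iterationHead)) {
            throw new UnsupportedOperationException(
                    "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
        }
        feedbackEdges.add(feedback);
    }

    public static void main(String[] args) {
        Node source = new Node("source");
        Node head = new Node("feedback", source);
        Node body = new Node("map", head);
        Node positive = new Node("filter > 0", body);
        Node unrelated = new Node("other source");

        List<Node> edges = new ArrayList<>();
        closeWith(head, positive, edges);      // accepted: filter > 0 <- map <- feedback
        System.out.println(edges);             // prints [filter > 0]
        try {
            closeWith(head, unrelated, edges); // rejected: not downstream of the head
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected");
        }
    }
}
```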
FeedbackTransformation
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
@Internal
public class FeedbackTransformation<T> extends StreamTransformation<T> {
private final StreamTransformation<T> input;
private final List<StreamTransformation<T>> feedbackEdges;
private final Long waitTime;
public FeedbackTransformation(StreamTransformation<T> input, Long waitTime) {
super("Feedback", input.getOutputType(), input.getParallelism());
this.input = input;
this.waitTime = waitTime;
this.feedbackEdges = Lists.newArrayList();
}
public StreamTransformation<T> getInput() {
return input;
}
public void addFeedbackEdge(StreamTransformation<T> transform) {
if (transform.getParallelism() != this.getParallelism()) {
throw new UnsupportedOperationException("Parallelism of the feedback stream must match the parallelism of the original" +
" stream. Parallelism of original stream: " + this.getParallelism() +
"; parallelism of feedback stream: " + transform.getParallelism() +
". Parallelism can be modified using DataStream#setParallelism() method");
}
feedbackEdges.add(transform);
}
public List<StreamTransformation<T>> getFeedbackEdges() {
return feedbackEdges;
}
public Long getWaitTime() {
return waitTime;
}
@Override
public final void setChainingStrategy(ChainingStrategy strategy) {
throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation.");
}
@Override
public Collection<StreamTransformation<?>> getTransitivePredecessors() {
List<StreamTransformation<?>> result = Lists.newArrayList();
result.add(this);
result.addAll(input.getTransitivePredecessors());
return result;
}
}
- FeedbackTransformation extends StreamTransformation and holds the input transformation, the list of feedbackEdges, and waitTime
- addFeedbackEdge adds a feedback edge; IterativeStream's closeWith calls it to register the StreamTransformation of the feedback stream. The edge is rejected with UnsupportedOperationException if its parallelism differs from that of the FeedbackTransformation
- waitTime specifies how long the feedback operator waits for feedback elements; once it passes without a new element arriving, the operator shuts down and accepts no further feedback elements
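The parallelism rule enforced by addFeedbackEdge can be illustrated without Flink. Below is a minimal sketch; FeedbackEdgeSketch is a hypothetical class that keeps only the fields discussed above and represents feedback edges by name rather than by StreamTransformation.

```java
import java.util.ArrayList;
import java.util.List;

public class FeedbackEdgeSketch {

    private final int parallelism; // parallelism of the iteration head (taken from its input)
    private final long waitTime;   // kept only to mirror the fields listed above
    private final List<String> feedbackEdges = new ArrayList<>();

    public FeedbackEdgeSketch(int parallelism, long waitTime) {
        this.parallelism = parallelism;
        this.waitTime = waitTime;
    }

    // Same rule as addFeedbackEdge: reject an edge whose parallelism differs from the head's.
    public void addFeedbackEdge(String edgeName, int edgeParallelism) {
        if (edgeParallelism != parallelism) {
            throw new UnsupportedOperationException(
                    "Parallelism of the feedback stream must match the parallelism of the original stream."
                    + " Parallelism of original stream: " + parallelism
                    + "; parallelism of feedback stream: " + edgeParallelism);
        }
        feedbackEdges.add(edgeName);
    }

    public List<String> getFeedbackEdges() {
        return feedbackEdges;
    }

    public long getWaitTime() {
        return waitTime;
    }

    public static void main(String[] args) {
        FeedbackEdgeSketch head = new FeedbackEdgeSketch(4, 5_000L);
        head.addFeedbackEdge("filter(value > 0)", 4);            // accepted
        try {
            head.addFeedbackEdge("filter with parallelism 2", 2); // rejected
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(head.getFeedbackEdges()); // prints [filter(value > 0)]
    }
}
```

In a real job, the fix suggested by the original exception message is to call setParallelism on the feedback stream so that it matches the stream that iterate() was called on.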
Summary
- DataStream provides two iterate methods that create and return an IterativeStream; the no-argument iterate() uses a maxWaitTimeMillis of 0
- IterativeStream's constructor takes two arguments, the original input DataStream and maxWaitTime. It creates a FeedbackTransformation from dataStream.getTransformation() and maxWaitTime, and sets the bufferTimeout of the transformation from dataStream.environment.getBufferTimeout(). FeedbackTransformation extends StreamTransformation and has fields such as feedbackEdges and waitTime; waitTime specifies how long the feedback operator waits for feedback elements, and once it expires the operator shuts down and accepts no further feedback elements
- IterativeStream extends SingleOutputStreamOperator and mainly provides two methods. closeWith closes the iteration and defines which part of the iteration is fed back to the iteration head. withFeedbackType creates a ConnectedIterativeStreams, which extends ConnectedStreams and allows the feedback type to differ from the type of originalInput; it defines its own closeWith method and overrides the keyBy methods of ConnectedStreams to throw UnsupportedOperationException
Docs
- DataStream Transformations