Overview
This article focuses on the parallel execution (parallelism) settings of Flink.
Examples
Operator Level
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
- The parallelism of an individual operator, data source, or data sink can be set by calling setParallelism() on it
Execution Environment Level
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]

wordCounts.print();

env.execute("Word Count Example");
- Calling setParallelism on the ExecutionEnvironment sets the default parallelism for all operators, data sources, and data sinks it executes. If an operator, data source, or data sink sets its own parallelism explicitly, that value overrides the environment-level setting
Client Level
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
or
try {
PackagedProgram program = new PackagedProgram(file, args);
InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
Configuration config = new Configuration();
Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());
// set the parallelism to 10 here
client.run(program, 10, true);
} catch (ProgramInvocationException e) {
e.printStackTrace();
}
- With the CLI client, parallelism can be specified with the -p option for command-line submissions, or passed as the parallelism argument to client.run for Java/Scala programs
System Level
# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 1
- The parallelism.default configuration item in flink-conf.yaml specifies a system-level default parallelism for all execution environments
ExecutionEnvironment
flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java
@Public
public abstract class ExecutionEnvironment {
//......
private final ExecutionConfig config = new ExecutionConfig();
/**
* Sets the parallelism for operations executed through this environment.
* Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
* x parallel instances.
*
 * <p>This method overrides the default parallelism for this environment.
 * The {@link LocalEnvironment} uses by default a value equal to the number of hardware
 * contexts (CPU cores / threads). When executing the program via the command line client
* from a JAR file, the default parallelism is the one configured for that setup.
*
* @param parallelism The parallelism
*/
public void setParallelism(int parallelism) {
config.setParallelism(parallelism);
}
@Internal
public Plan createProgramPlan(String jobName, boolean clearSinks) {
if (this.sinks.isEmpty()) {
if (wasExecuted) {
throw new RuntimeException("No new data sinks have been defined since the " +
"last execution. The last execution refers to the latest call to " +
"'execute()', 'count()', 'collect()', or 'print()'.");
} else {
throw new RuntimeException("No data sinks have been created yet. " +
"A program needs at least one sink that consumes data. " +
"Examples are writing the data set or printing it."); }}if (jobName == null) {
jobName = getDefaultName();
}
OperatorTranslation translator = new OperatorTranslation();
Plan plan = translator.translateToPlan(this.sinks, jobName);
if (getParallelism() > 0) {
plan.setDefaultParallelism(getParallelism());
}
plan.setExecutionConfig(getConfig());
// Check plan for GenericTypeInfo's and register the types at the serializers.
if (!config.isAutoTypeRegistrationDisabled()) {
    plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() {

        private final Set<Class<?>> registeredTypes = new HashSet<>();
        private final Set<org.apache.flink.api.common.operators.Operator<?>> visitedOperators = new HashSet<>();

        @Override
        public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
            if (!visitedOperators.add(visitable)) {
                return false;
            }
            OperatorInformation<?> opInfo = visitable.getOperatorInfo();
            Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, registeredTypes);
            return true;
        }

        @Override
        public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}
    });
}

try {
    registerCachedFilesWithPlan(plan);
} catch (Exception e) {
    throw new RuntimeException("Error while registering cached files: " + e.getMessage(), e);
}

// clear all the sinks such that the next execution does not redo everything
if (clearSinks) {
    this.sinks.clear();
    wasExecuted = true;
}

// All types are registered now. Print information.
int registeredTypes = config.getRegisteredKryoTypes().size() +
    config.getRegisteredPojoTypes().size() +
    config.getRegisteredTypesWithKryoSerializerClasses().size() +
    config.getRegisteredTypesWithKryoSerializers().size();

int defaultKryoSerializers = config.getDefaultKryoSerializers().size() +
    config.getDefaultKryoSerializerClasses().size();

LOG.info("The job has {} registered types and {} default Kryo serializers", registeredTypes, defaultKryoSerializers);

if (config.isForceKryoEnabled() && config.isForceAvroEnabled()) {
    LOG.warn("In the ExecutionConfig, both Avro and Kryo are enforced. Using Kryo serializer");
}
if (config.isForceKryoEnabled()) {
    LOG.info("Using KryoSerializer for serializing POJOs");
}
if (config.isForceAvroEnabled()) {
    LOG.info("Using AvroSerializer for serializing POJOs");
}

if (LOG.isDebugEnabled()) {
    LOG.debug("Registered Kryo types: {}", config.getRegisteredKryoTypes().toString());
    LOG.debug("Registered Kryo with Serializers types: {}", config.getRegisteredTypesWithKryoSerializers().entrySet().toString());
    LOG.debug("Registered Kryo with Serializer Classes types: {}", config.getRegisteredTypesWithKryoSerializerClasses().entrySet().toString());
    LOG.debug("Registered Kryo default Serializers: {}", config.getDefaultKryoSerializers().entrySet().toString());
    LOG.debug("Registered Kryo default Serializers Classes {}", config.getDefaultKryoSerializerClasses().entrySet().toString());
    LOG.debug("Registered POJO types: {}", config.getRegisteredPojoTypes().toString());

    // print information about static code analysis
    LOG.debug("Static code analysis mode: {}", config.getCodeAnalysisMode());
}

return plan;
}

//......
}
- ExecutionEnvironment provides the setParallelism method, which stores the parallelism in its ExecutionConfig. The createProgramPlan method later reads the parallelism from the ExecutionConfig and, if it is greater than 0, sets it as the Plan's defaultParallelism
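A minimal sketch of that flow (the job below is a made-up example; only the setParallelism/createProgramPlan behaviour comes from the source above):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // stored in the environment's ExecutionConfig

// a trivial DataSet job, purely for illustration
env.fromElements(1, 2, 3)
    .map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) {
            return value * 2;
        }
    })
    .print(); // triggers execution; createProgramPlan() copies the parallelism (4) onto the Plan as defaultParallelism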
LocalEnvironment
flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/LocalEnvironment.java
@Public
public class LocalEnvironment extends ExecutionEnvironment {
//......
public JobExecutionResult execute(String jobName) throws Exception {
if (executor == null) {
startNewSession();
}
Plan p = createProgramPlan(jobName);
// Session management is disabled, revert this commit to enable
//p.setJobId(jobID);
//p.setSessionTimeout(sessionTimeout);
JobExecutionResult result = executor.executePlan(p);
this.lastJobExecutionResult = result;
return result;
}
//......
}
- The execute method of LocalEnvironment builds the program Plan and calls LocalExecutor's executePlan
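For a local run, the environment-level parallelism can also be supplied when the LocalEnvironment is created; a small sketch using the public factory methods (the value 2 is arbitrary):

// creates a LocalEnvironment whose default parallelism is 2
ExecutionEnvironment localEnv = ExecutionEnvironment.createLocalEnvironment(2);

// equivalent: create it first, then set the parallelism on its ExecutionConfig
ExecutionEnvironment localEnv2 = ExecutionEnvironment.createLocalEnvironment();
localEnv2.setParallelism(2);
// execute() then hands the resulting Plan to LocalExecutor.executePlan, shown below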
LocalExecutor
flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/LocalExecutor.java
public class LocalExecutor extends PlanExecutor {
//......
@Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
if (plan == null) {
throw new IllegalArgumentException("The plan may not be null.");
}
synchronized (this.lock) {
// check if we start a session dedicated for this execution
final boolean shutDownAtEnd;
if (jobExecutorService == null) {
shutDownAtEnd = true;
// configure the number of local slots equal to the parallelism of the local plan
if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
int maxParallelism = plan.getMaximumParallelism();
if (maxParallelism > 0) {
this.taskManagerNumSlots = maxParallelism;
}
}
// start the cluster for us
start();
}
else {
// we use the existing session
shutDownAtEnd = false;
}
try {
// TODO: Set job's default parallelism to max number of slots
final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);

plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);

Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
OptimizedPlan op = pc.compile(plan);

JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());

return jobExecutorService.executeJobBlocking(jobGraph);
} finally {
if (shutDownAtEnd) {
stop();
}
}
}
}
//......
}
- LocalExecutor's executePlan sets the plan's defaultParallelism to slotsPerTaskManager * numTaskManagers (for example, 4 slots per TaskManager and 2 local TaskManagers yield a default parallelism of 8)
RemoteEnvironment
flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/RemoteEnvironment.java
@Public
public class RemoteEnvironment extends ExecutionEnvironment {
//......
public JobExecutionResult execute(String jobName) throws Exception {
PlanExecutor executor = getExecutor();
Plan p = createProgramPlan(jobName);
// Session management is disabled, revert this commit to enable
//p.setJobId(jobID);
//p.setSessionTimeout(sessionTimeout);
JobExecutionResult result = executor.executePlan(p);
this.lastJobExecutionResult = result;
return result;
}
//......
}
- The execute method of RemoteEnvironment builds the program Plan and calls RemoteExecutor's executePlan
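A sketch of the remote case (host, port and jar path are placeholder values): the parallelism set on the RemoteEnvironment is written into the Plan via createProgramPlan before the Plan reaches RemoteExecutor and ClusterClient:

// "jobmanager-host", 6123 and the jar path are hypothetical
ExecutionEnvironment remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host", 6123, "/path/to/wordcount.jar");
remoteEnv.setParallelism(8); // becomes the Plan's defaultParallelism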
RemoteExecutor
flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/RemoteExecutor.java
public class RemoteExecutor extends PlanExecutor {

private final Object lock = new Object();
private final List<URL> jarFiles;
private final List<URL> globalClasspaths;
private final Configuration clientConfiguration;
private ClusterClient<?> client;

//......

@Override
public JobExecutionResult executePlan(Plan plan) throws Exception {
if (plan == null) {
throw new IllegalArgumentException("The plan may not be null.");
}
JobWithJars p = new JobWithJars(plan, this.jarFiles, this.globalClasspaths);
return executePlanWithJars(p);
}
public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception {
if (program == null) {
throw new IllegalArgumentException("The job may not be null.");
}
synchronized (this.lock) {
// check if we start a session dedicated for this execution
final boolean shutDownAtEnd;
if (client == null) {
shutDownAtEnd = true;
// start the executor for us
start();
}
else {
// we use the existing session
shutDownAtEnd = false;
}
try {
return client.run(program, defaultParallelism).getJobExecutionResult();
}
finally {
if (shutDownAtEnd) {
stop();
}
}
}
}
//......
}
- RemoteExecutor's executePlan calls executePlanWithJars, which in turn calls ClusterClient's run method, passing defaultParallelism as an argument
ClusterClient
flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/program/ClusterClient.java
public abstract class ClusterClient<T> {
//......
public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException {
return run(program, parallelism, SavepointRestoreSettings.none());
}
public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
throws CompilerException, ProgramInvocationException {
ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
if (classLoader == null) {
throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
}
OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
}
private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism)
throws CompilerException, ProgramInvocationException {
return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
}
public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
Logger log = LoggerFactory.getLogger(ClusterClient.class);
if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
log.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
p.setDefaultParallelism(parallelism);
}
log.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
return compiler.compile(p);
}
//......
}
- The parallelism argument of ClusterClient's run method is applied to the Plan only when parallelism > 0 and p.getDefaultParallelism() <= 0; in other words, a default parallelism already set on the plan (for example via the environment) takes precedence over the client-level value
DataStreamSource
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@Public
public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {

boolean isParallel;

public DataStreamSource(StreamExecutionEnvironment environment, TypeInformation<T> outTypeInfo,
        StreamSource<T, ?> operator, boolean isParallel, String sourceName) {
super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));
this.isParallel = isParallel;
if (!isParallel) {
setParallelism(1);
}
}
public DataStreamSource(SingleOutputStreamOperator<T> operator) {
super(operator.environment, operator.getTransformation());
this.isParallel = true;
}
@Override
public DataStreamSource<T> setParallelism(int parallelism) {
if (parallelism != 1 && !isParallel) {
throw new IllegalArgumentException("Source: " + transformation.getId() + " is not a parallel source");
} else {
super.setParallelism(parallelism);
return this;
}
}
}
- DataStreamSource extends SingleOutputStreamOperator and overrides setParallelism; after checking that a non-parallel source only gets parallelism 1, it delegates to the superclass SingleOutputStreamOperator.setParallelism
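A small sketch of the non-parallel source check (localhost:9999 is just a placeholder): socketTextStream is backed by a non-parallel SourceFunction, so its DataStreamSource is created with isParallel == false and its parallelism is forced to 1:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> lines = env.socketTextStream("localhost", 9999); // non-parallel source

try {
    lines.setParallelism(2); // rejected by the check above
} catch (IllegalArgumentException e) {
    // "Source: ... is not a parallel source"
}

// A source built on a ParallelSourceFunction is created with isParallel == true
// and accepts a parallelism greater than 1.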
SingleOutputStreamOperator
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {
//......
/**
* Sets the parallelism for this operator.
*
* @param parallelism
* The parallelism for this operator.
* @return The operator with set parallelism.
*/
public SingleOutputStreamOperator<T> setParallelism(int parallelism) {
Preconditions.checkArgument(canBeParallel() || parallelism == 1,
"The parallelism of non parallel operator must be 1.");
transformation.setParallelism(parallelism);
return this;
}
//......
}
- SingleOutputStreamOperator.setParallelism ultimately applies the parallelism to the underlying StreamTransformation
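A minimal sketch (the map function is arbitrary) showing that an operator-level setParallelism call is stored on the operator's underlying StreamTransformation:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<Integer> doubled = env
        .fromElements(1, 2, 3)
        .map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) {
                return value * 2;
            }
        })
        .setParallelism(3);

// the value lives on the transformation, which the StreamGraph later reads:
// doubled.getTransformation().getParallelism() == 3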
DataStreamSink
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@Public
public class DataStreamSink<T> {

private final SinkTransformation<T> transformation;

//......

/**
* Sets the parallelism for this sink. The degree must be higher than zero.
*
* @param parallelism The parallelism for this sink.
* @return The sink with set parallelism.
*/
public DataStreamSink<T> setParallelism(int parallelism) {
transformation.setParallelism(parallelism);
return this;
}
//......
}
- DataStreamSink provides a setParallelism method that ultimately sets the parallelism on its SinkTransformation
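A short sketch of sink-level parallelism (PrintSinkFunction is a sink shipped with Flink; the elements are arbitrary): the value passed to setParallelism ends up on the sink's SinkTransformation:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("a", "b", "c")
    .addSink(new PrintSinkFunction<>()) // returns a DataStreamSink
    .setParallelism(1);                 // stored on the SinkTransformation
env.execute("Sink Parallelism Example");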
summary
- Flink allows you to set several levels of parallelism, including Operator level, Execution Environment level, Client level, and System level
- The parallelism.default entry in flink-conf.yaml specifies a system-level default parallelism for all execution environments. In an ExecutionEnvironment, setParallelism sets the default parallelism for operators, data sources, and data sinks; if an operator, data source, or data sink sets its own parallelism, that value overrides the environment's setting
- The setParallelism method provided by ExecutionEnvironment stores the parallelism in the ExecutionConfig; for both LocalEnvironment and RemoteEnvironment this value ultimately ends up as the default parallelism of the generated Plan. With the CLI client, parallelism can be specified via -p on the command line or via the parallelism argument of client.run for Java/Scala submissions
- DataStreamSource extends SingleOutputStreamOperator and overrides setParallelism, delegating to the superclass SingleOutputStreamOperator.setParallelism; SingleOutputStreamOperator.setParallelism applies the value to the underlying StreamTransformation; similarly, DataStreamSink.setParallelism applies it to its SinkTransformation
doc
- Parallel Execution