Preface

This article focuses on the parallel execution (parallelism) settings of Flink.

Examples

Operator Level

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]

DataStream<Tuple2<String, Integer>> wordCounts = text
	.flatMap(new LineSplitter())
	.keyBy(0)
	.timeWindow(Time.seconds(5))
	.sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
  • Parallelism can be set for individual operators, data sources, and data sinks by calling setParallelism() on them

Execution Environment Level

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]

DataStream<Tuple2<String, Integer>> wordCounts = [...]

wordCounts.print();

env.execute("Word Count Example");
  • In the ExecutionEnvironment, setParallelism sets the default parallelism for all operators, data sources, and data sinks it executes. If an operator, data source, or data sink sets its own parallelism, that value overrides the environment default, as the sketch below illustrates
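
A minimal sketch of the override behavior (the class name is made up for illustration): the environment default is 3, but the map operator asks for 5, so it runs with 5 parallel instances while the print sink keeps the environment default.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismOverrideSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(3); // environment-level default

		DataStream<String> upper = env
			.fromElements("a", "b", "c") // non-parallel source, always runs with parallelism 1
			.map(new MapFunction<String, String>() {
				@Override
				public String map(String value) {
					return value.toUpperCase();
				}
			})
			.setParallelism(5); // operator-level setting, overrides the default of 3 for this map

		upper.print(); // the print sink keeps the environment default of 3
		env.execute("parallelism override sketch");
	}
}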

Client Level

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

or

try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}
  • At the client level, parallelism can be given with the -p option on the command line, or as the parallelism argument of client.run when submitting programmatically from Java/Scala

System Level

# The parallelism used for programs that did not specify and other parallelism.

parallelism.default: 1
  • A system-level default parallelism for all execution environments can be set through the parallelism.default configuration item in flink-conf.yaml, as shown above; the sketch below reads the same value programmatically
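
A minimal sketch of reading that value back, assuming FLINK_CONF_DIR points at the directory containing flink-conf.yaml (the class name is made up); CoreOptions.DEFAULT_PARALLELISM is the ConfigOption behind the parallelism.default key.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;

public class DefaultParallelismSketch {
	public static void main(String[] args) {
		// loads flink-conf.yaml from the directory given by FLINK_CONF_DIR
		Configuration conf = GlobalConfiguration.loadConfiguration();
		int defaultParallelism = conf.getInteger(CoreOptions.DEFAULT_PARALLELISM); // key: parallelism.default
		System.out.println("parallelism.default = " + defaultParallelism);
	}
}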

ExecutionEnvironment

flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java

@Public
public abstract class ExecutionEnvironment {
	//......

	private final ExecutionConfig config = new ExecutionConfig();

	/**
	 * Sets the parallelism for operations executed through this environment.
	 * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
	 * x parallel instances.
	 *
	 * <p>This method overrides the default parallelism for this environment.
	 * The {@link LocalEnvironment} uses by default a value equal to the number of hardware
	 * contexts (CPU cores / threads). When executing the program via the command line client
	 * from a JAR file, the default parallelism is the one configured for that setup.
	 *
	 * @param parallelism The parallelism
	 */
	public void setParallelism(int parallelism) {
		config.setParallelism(parallelism);
	}

	@Internal
	public Plan createProgramPlan(String jobName, boolean clearSinks) {
		if (this.sinks.isEmpty()) {
			if (wasExecuted) {
				throw new RuntimeException("No new data sinks have been defined since the " +
						"last execution. The last execution refers to the latest call to " +
						"'execute()', 'count()', 'collect()', or 'print()'.");
			} else {
				throw new RuntimeException("No data sinks have been created yet. " +
						"A program needs at least one sink that consumes data. " +
						"Examples are writing the data set or printing it."); }}if (jobName == null) {
			jobName = getDefaultName();
		}

		OperatorTranslation translator = new OperatorTranslation();
		Plan plan = translator.translateToPlan(this.sinks, jobName);

		if (getParallelism() > 0) {
			plan.setDefaultParallelism(getParallelism());
		}
		plan.setExecutionConfig(getConfig());

		// Check plan for GenericTypeInfo's and register the types at the serializers.
		if (!config.isAutoTypeRegistrationDisabled()) {
			plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() {

				private final Set<Class<?>> registeredTypes = new HashSet<>();
				private final Set<org.apache.flink.api.common.operators.Operator<?>> visitedOperators = new HashSet<>();

				@Override
				public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
					if (!visitedOperators.add(visitable)) {
						return false;
					}
					OperatorInformation<?> opInfo = visitable.getOperatorInfo();
					Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, registeredTypes);
					return true;
				}

				@Override
				public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}
			});
		}

		try {
			registerCachedFilesWithPlan(plan);
		} catch (Exception e) {
			throw new RuntimeException("Error while registering cached files: " + e.getMessage(), e);
		}

		// clear all the sinks such that the next execution does not redo everything
		if (clearSinks) {
			this.sinks.clear();
			wasExecuted = true;
		}

		// All types are registered now. Print information.
		int registeredTypes = config.getRegisteredKryoTypes().size() +
				config.getRegisteredPojoTypes().size() +
				config.getRegisteredTypesWithKryoSerializerClasses().size() +
				config.getRegisteredTypesWithKryoSerializers().size();

		int defaultKryoSerializers = config.getDefaultKryoSerializers().size() +
				config.getDefaultKryoSerializerClasses().size();

		LOG.info("The job has {} registered types and {} default Kryo serializers", registeredTypes, defaultKryoSerializers);

		if (config.isForceKryoEnabled() && config.isForceAvroEnabled()) {
			LOG.warn("In the ExecutionConfig, both Avro and Kryo are enforced. Using Kryo serializer");
		}
		if (config.isForceKryoEnabled()) {
			LOG.info("Using KryoSerializer for serializing POJOs");
		}
		if (config.isForceAvroEnabled()) {
			LOG.info("Using AvroSerializer for serializing POJOs");
		}

		if (LOG.isDebugEnabled()) {
			LOG.debug("Registered Kryo types: {}", config.getRegisteredKryoTypes().toString());
			LOG.debug("Registered Kryo with Serializers types: {}", config.getRegisteredTypesWithKryoSerializers().entrySet().toString());
			LOG.debug("Registered Kryo with Serializer Classes types: {}", config.getRegisteredTypesWithKryoSerializerClasses().entrySet().toString());
			LOG.debug("Registered Kryo default Serializers: {}", config.getDefaultKryoSerializers().entrySet().toString());
			LOG.debug("Registered Kryo default Serializers Classes {}", config.getDefaultKryoSerializerClasses().entrySet().toString());
			LOG.debug("Registered POJO types: {}", config.getRegisteredPojoTypes().toString());

			// print information about static code analysis
			LOG.debug("Static code analysis mode: {}", config.getCodeAnalysisMode());
		}

		return plan;
	}

	//......
}
  • ExecutionEnvironment provides the setParallelism method, which stores the given parallelism in its ExecutionConfig; createProgramPlan later reads that value and, if it is greater than 0, sets it as the Plan's default parallelism. A small usage sketch follows
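
For reference, a minimal DataSet-API sketch of that path (class name and data are made up): setParallelism stores the value in the ExecutionConfig, and the Plan built when print() triggers execution picks it up as its default parallelism.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BatchParallelismSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(4); // stored in the ExecutionConfig, later copied to the Plan's default parallelism

		env.fromElements(1, 2, 3, 4)
			.map(new MapFunction<Integer, Integer>() {
				@Override
				public Integer map(Integer value) {
					return value * 2;
				}
			})
			.print(); // print() triggers plan creation and execution
	}
}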

LocalEnvironment

flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/LocalEnvironment.java

@Public
public class LocalEnvironment extends ExecutionEnvironment {

	//......

	public JobExecutionResult execute(String jobName) throws Exception {
		if (executor == null) {
			startNewSession();
		}

		Plan p = createProgramPlan(jobName);

		// Session management is disabled, revert this commit to enable
		//p.setJobId(jobID);
		//p.setSessionTimeout(sessionTimeout);

		JobExecutionResult result = executor.executePlan(p);

		this.lastJobExecutionResult = result;
		return result;
	}

	//......
}
  • LocalEnvironment's execute method builds the program Plan via createProgramPlan and passes it to LocalExecutor's executePlan; the sketch below shows how a LocalEnvironment is typically created
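
A minimal sketch of creating a LocalEnvironment explicitly (the class name and the parallelism value are arbitrary); the parallelism given here ends up on the Plan that execute hands to LocalExecutor.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class LocalEnvironmentSketch {
	public static void main(String[] args) throws Exception {
		// createLocalEnvironment(parallelism) returns a LocalEnvironment with the given default parallelism
		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(2);

		env.fromElements("a", "b", "c")
			.map(new MapFunction<String, String>() {
				@Override
				public String map(String value) {
					return value.toUpperCase();
				}
			})
			.print(); // triggers execution on the embedded local executor
	}
}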

LocalExecutor

flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/LocalExecutor.java

public class LocalExecutor extends PlanExecutor {
	
	//......

	@Override
	public JobExecutionResult executePlan(Plan plan) throws Exception {
		if (plan == null) {
			throw new IllegalArgumentException("The plan may not be null.");
		}

		synchronized (this.lock) {

			// check if we start a session dedicated for this execution
			final boolean shutDownAtEnd;

			if (jobExecutorService == null) {
				shutDownAtEnd = true;

				// configure the number of local slots equal to the parallelism of the local plan
				if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
					int maxParallelism = plan.getMaximumParallelism();
					if (maxParallelism > 0) {
						this.taskManagerNumSlots = maxParallelism;
					}
				}

				// start the cluster for us
				start();
			}
			else {
				// we use the existing session
				shutDownAtEnd = false;
			}

			try {
				// TODO: Set job's default parallelism to max number of slots
				final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
				final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);

				plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);

				Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
				OptimizedPlan op = pc.compile(plan);

				JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
				JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());

				return jobExecutorService.executeJobBlocking(jobGraph);
			} finally {
				if (shutDownAtEnd) {
					stop();
				}
			}
		}
	}

	//......
}
  • LocalExecutor's executePlan method sets the plan's default parallelism to slotsPerTaskManager * numTaskManagers, both read from the executor's configuration; the sketch below shows where those values can come from in local mode
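
A minimal sketch of influencing those two values when running locally (class name and numbers are arbitrary); with this configuration the LocalExecutor would set the plan's default parallelism to 2 * 2 = 4, while operators that set their own parallelism keep it.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;

public class LocalSlotsSketch {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);         // slots per TaskManager
		conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); // number of local TaskManagers

		// createLocalEnvironment(Configuration) hands the configuration to the local executor
		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

		env.fromElements(1, 2, 3).print();
	}
}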

RemoteEnvironment

flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/RemoteEnvironment.java

@Public
public class RemoteEnvironment extends ExecutionEnvironment {

	//......

	public JobExecutionResult execute(String jobName) throws Exception {
		PlanExecutor executor = getExecutor();

		Plan p = createProgramPlan(jobName);

		// Session management is disabled, revert this commit to enable
		//p.setJobId(jobID);
		//p.setSessionTimeout(sessionTimeout);

		JobExecutionResult result = executor.executePlan(p);

		this.lastJobExecutionResult = result;
		return result;
	}

	//......
}
  • RemoteEnvironment's execute method builds the program Plan and passes it to RemoteExecutor's executePlan; the sketch below shows how a RemoteEnvironment is typically created
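
A minimal sketch of creating a RemoteEnvironment (host, port, and jar path are placeholders, and the class name is made up); the parallelism set on the environment travels with the Plan into RemoteExecutor and ClusterClient.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class RemoteEnvironmentSketch {
	public static void main(String[] args) throws Exception {
		// host, port and jar path are placeholders for a real JobManager and user jar
		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
				"jobmanager-host", 6123, "/path/to/user-job.jar");
		env.setParallelism(8); // becomes the Plan's default parallelism

		env.fromElements(1, 2, 3)
			.map(new MapFunction<Integer, Integer>() {
				@Override
				public Integer map(Integer value) {
					return value + 1;
				}
			})
			.print();
	}
}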

RemoteExecutor

flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/RemoteExecutor.java

public class RemoteExecutor extends PlanExecutor {

	private final Object lock = new Object();

	private final List<URL> jarFiles;

	private final List<URL> globalClasspaths;

	private final Configuration clientConfiguration;

	private ClusterClient<?> client;

	//......

	@Override
	public JobExecutionResult executePlan(Plan plan) throws Exception {
		if (plan == null) {
			throw new IllegalArgumentException("The plan may not be null.");
		}

		JobWithJars p = new JobWithJars(plan, this.jarFiles, this.globalClasspaths);
		return executePlanWithJars(p);
	}

	public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception {
		if (program == null) {
			throw new IllegalArgumentException("The job may not be null.");
		}

		synchronized (this.lock) {
			// check if we start a session dedicated for this execution
			final boolean shutDownAtEnd;

			if (client == null) {
				shutDownAtEnd = true;
				// start the executor for us
				start();
			}
			else {
				// we use the existing session
				shutDownAtEnd = false;
			}

			try {
				return client.run(program, defaultParallelism).getJobExecutionResult();
			}
			finally {
				if (shutDownAtEnd) {
					stop();
				}
			}
		}
	}

	//......
}
  • RemoteExecutor's executePlan delegates to executePlanWithJars, which in turn calls ClusterClient's run and passes defaultParallelism as an argument

ClusterClient

flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/program/ClusterClient.java

public abstract class ClusterClient<T> {
	//......

	public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException {
		return run(program, parallelism, SavepointRestoreSettings.none());
	}

	public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
			throws CompilerException, ProgramInvocationException {
		ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
		if (classLoader == null) {
			throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
		}

		OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
		return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
	}

	private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism)
			throws CompilerException, ProgramInvocationException {
		return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
	}

	public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
		Logger log = LoggerFactory.getLogger(ClusterClient.class);

		if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
			log.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
			p.setDefaultParallelism(parallelism);
		}
		log.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());

		return compiler.compile(p);
	}

	//......
}
  • The parallelism passed to ClusterClient's run method is applied to the Plan only when parallelism > 0 and the Plan's own default parallelism is still unset (p.getDefaultParallelism() <= 0)

DataStreamSource

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/DataStreamSource.java

@Public
public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {

	boolean isParallel;

	public DataStreamSource(StreamExecutionEnvironment environment, TypeInformation<T> outTypeInfo,
			StreamSource<T, ?> operator, boolean isParallel, String sourceName) {
		super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));

		this.isParallel = isParallel;
		if (!isParallel) {
			setParallelism(1);
		}
	}

	public DataStreamSource(SingleOutputStreamOperator<T> operator) {
		super(operator.environment, operator.getTransformation());
		this.isParallel = true;
	}

	@Override
	public DataStreamSource<T> setParallelism(int parallelism) {
		if (parallelism != 1 && !isParallel) {
			throw new IllegalArgumentException("Source: " + transformation.getId() +
					" is not a parallel source");
		} else {
			super.setParallelism(parallelism);
			return this;
		}
	}
}
  • DataStreamSource extends SingleOutputStreamOperator; its setParallelism override rejects values other than 1 for non-parallel sources and otherwise delegates to the superclass SingleOutputStreamOperator's setParallelism, as the sketch below shows
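
A minimal sketch of that check (the class name is made up): fromElements creates a non-parallel source, so any parallelism other than 1 on it is rejected with the IllegalArgumentException shown above.

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NonParallelSourceSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStreamSource<Integer> source = env.fromElements(1, 2, 3);

		// fromElements creates a non-parallel source (isParallel == false),
		// so any parallelism other than 1 triggers the IllegalArgumentException above
		try {
			source.setParallelism(4);
		} catch (IllegalArgumentException expected) {
			System.out.println(expected.getMessage()); // "Source: ... is not a parallel source"
		}

		source.setParallelism(1); // allowed
		source.print();
		env.execute("non-parallel source sketch");
	}
}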

SingleOutputStreamOperator

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {
	//......

	/**
	 * Sets the parallelism for this operator.
	 *
	 * @param parallelism
	 *            The parallelism for this operator.
	 * @return The operator with set parallelism.
	 */
	public SingleOutputStreamOperator<T> setParallelism(int parallelism) {
		Preconditions.checkArgument(canBeParallel() || parallelism == 1,
				"The parallelism of non parallel operator must be 1.");

		transformation.setParallelism(parallelism);

		return this;
	}

	//......
}
  • SingleOutputStreamOperator's setParallelism ultimately applies the value to the underlying StreamTransformation, as in the sketch below
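
A minimal sketch (the class name is made up): the value set on a map operator lands on its StreamTransformation and can be read back with getParallelism().

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorTransformationSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		SingleOutputStreamOperator<Integer> doubled = env
			.fromElements(1, 2, 3)
			.map(new MapFunction<Integer, Integer>() {
				@Override
				public Integer map(Integer value) {
					return value * 2;
				}
			});

		doubled.setParallelism(2);
		// getParallelism() reads the value back from the underlying StreamTransformation
		System.out.println("operator parallelism: " + doubled.getParallelism());

		doubled.print();
		env.execute("operator transformation sketch");
	}
}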

DataStreamSink

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/DataStreamSink.java

@Public
public class DataStreamSink<T> {

	private final SinkTransformation<T> transformation;

	//......

	/**
	 * Sets the parallelism for this sink. The degree must be higher than zero.
	 *
	 * @param parallelism The parallelism for this sink.
	 * @return The sink with set parallelism.
	 */
	public DataStreamSink<T> setParallelism(int parallelism) {
		transformation.setParallelism(parallelism);
		return this;
	}

	//......
}
  • DataStreamSink provides a setParallelism method, which ultimately applies the value to the underlying SinkTransformation, as in the sketch below
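
A minimal sketch (the class name is made up): print() returns a DataStreamSink, so the sink's parallelism can be pinned independently of the rest of the job, for example to 1 for a single output stream.

import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkParallelismSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(4); // default for operators

		DataStreamSink<Long> sink = env
			.generateSequence(0, 99) // parallel source, uses the default parallelism of 4
			.print()                 // returns a DataStreamSink
			.setParallelism(1);      // the print sink runs with a single instance

		env.execute("sink parallelism sketch");
	}
}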

Summary

  • Flink allows you to set several levels of parallelism, including Operator level, Execution Environment level, Client level, and System level
  • In flink-conf.yaml, the parallelism.default configuration item specifies the system-level default parallelism for all execution environments. In an ExecutionEnvironment, setParallelism sets the default parallelism for operators, data sources, and data sinks; if an operator, data source, or data sink sets its own parallelism, that value overrides the setting on the ExecutionEnvironment
  • The setParallelism method provided by ExecutionEnvironment stores the parallelism in the ExecutionConfig (with the CLI client, parallelism can instead be given via -p on the command line or as the parallelism argument of client.run for Java/Scala submissions); the parallelism configured for LocalEnvironment and RemoteEnvironment ultimately ends up as the Plan's default parallelism. DataStreamSource extends SingleOutputStreamOperator, and its setParallelism delegates to the superclass SingleOutputStreamOperator's setParallelism; SingleOutputStreamOperator's setParallelism ultimately applies the value to the StreamTransformation; DataStreamSink's setParallelism ultimately applies the value to the SinkTransformation

Doc

  • Parallel Execution