preface

This article takes a look at Flink's MemoryStateBackend.

StateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StateBackend.java

@PublicEvolving
public interface StateBackend extends java.io.Serializable {

	// ------------------------------------------------------------------------
	//  Checkpoint storage - the durable persistence of checkpoint data
	// ------------------------------------------------------------------------

	/**
	 * Resolves the given pointer to a checkpoint/savepoint into a checkpoint location. The location
	 * supports reading the checkpoint metadata, or disposing the checkpoint storage location.
	 *
	 * <p>If the state backend cannot understand the format of the pointer (for example because it
	 * was created by a different state backend) this method should throw an {@code IOException}.
	 *
	 * @param externalPointer The external checkpoint pointer to resolve.
	 * @return The checkpoint location handle.
	 *
	 * @throws IOException Thrown, if the state backend does not understand the pointer, or if
	 *                     the pointer could not be resolved due to an I/O error.
	 */
	CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

	/**
	 * Creates a storage for checkpoints for the given job. The checkpoint storage is
	 * used to write checkpoint data and metadata.
	 *
	 * @param jobId The job to store checkpoint data for.
	 * @return A checkpoint storage for the given job.
	 *
	 * @throws IOException Thrown if the checkpoint storage cannot be initialized.
	 */
	CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException;

	// ------------------------------------------------------------------------
	//  Structure Backends 
	// ------------------------------------------------------------------------

	/**
	 * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
	 * and checkpointing it. Uses default TTL time provider.
	 *
	 * <p><i>Keyed State</i> is state where each value is bound to a key.
	 *
	 * @param <K> The type of the keys by which the state is organized.
	 *
	 * @return The Keyed State Backend for the given job, operator, and key group range.
	 *
	 * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
	 */
	default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
			Environment env,
			JobID jobID,
			String operatorIdentifier,
			TypeSerializer<K> keySerializer,
			int numberOfKeyGroups,
			KeyGroupRange keyGroupRange,
			TaskKvStateRegistry kvStateRegistry) throws Exception {
		return createKeyedStateBackend(
			env,
			jobID,
			operatorIdentifier,
			keySerializer,
			numberOfKeyGroups,
			keyGroupRange,
			kvStateRegistry,
			TtlTimeProvider.DEFAULT
		);
	}

	/**
	 * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
	 * and checkpointing it.
	 *
	 * <p><i>Keyed State</i> is state where each value is bound to a key.
	 *
	 * @param <K> The type of the keys by which the state is organized.
	 *
	 * @return The Keyed State Backend for the given job, operator, and key group range.
	 *
	 * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
	 */
	default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
		Environment env,
		JobID jobID,
		String operatorIdentifier,
		TypeSerializer<K> keySerializer,
		int numberOfKeyGroups,
		KeyGroupRange keyGroupRange,
		TaskKvStateRegistry kvStateRegistry,
		TtlTimeProvider ttlTimeProvider
	) throws Exception {
		return createKeyedStateBackend(
			env,
			jobID,
			operatorIdentifier,
			keySerializer,
			numberOfKeyGroups,
			keyGroupRange,
			kvStateRegistry,
			ttlTimeProvider,
			new UnregisteredMetricsGroup());
	}

	/**
	 * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
	 * and checkpointing it.
	 *
	 * <p><i>Keyed State</i> is state where each value is bound to a key.
	 *
	 * @param <K> The type of the keys by which the state is organized.
	 *
	 * @return The Keyed State Backend for the given job, operator, and key group range.
	 *
	 * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
	 */
	<K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
		Environment env,
		JobID jobID,
		String operatorIdentifier,
		TypeSerializer<K> keySerializer,
		int numberOfKeyGroups,
		KeyGroupRange keyGroupRange,
		TaskKvStateRegistry kvStateRegistry,
		TtlTimeProvider ttlTimeProvider,
		MetricGroup metricGroup) throws Exception;
	
	/**
	 * Creates a new {@link OperatorStateBackend} that can be used for storing operator state.
	 *
	 * <p>Operator state is state that is associated with parallel operator (or function) instances,
	 * rather than with keys.
	 *
	 * @param env The runtime environment of the executing task.
	 * @param operatorIdentifier The identifier of the operator whose state should be stored.
	 *
	 * @return The OperatorStateBackend for operator identified by the job and operator identifier.
	 *
	 * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
	 */
	OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception;
}
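The three createKeyedStateBackend overloads above form a chain: the two shorter ones are default methods that fill in TtlTimeProvider.DEFAULT and then a new UnregisteredMetricsGroup before delegating, so an implementation only has to provide the longest, abstract overload. Below is a minimal, Flink-free sketch of the same pattern; BackendFactory, TimeProvider, and Metrics are hypothetical stand-ins, not Flink types.

```java
// Hypothetical stand-in for a time provider used by TTL logic.
interface TimeProvider {
    long currentTimestamp();
}

// Hypothetical stand-in for a metric group.
interface Metrics {
    String name();
}

interface BackendFactory {
    // Default time provider, analogous to a DEFAULT constant on the provider type.
    TimeProvider DEFAULT_TIME = System::currentTimeMillis;

    // Shortest overload: supplies the default time provider and delegates.
    default String create(String operatorId) {
        return create(operatorId, DEFAULT_TIME);
    }

    // Middle overload: supplies an "unregistered" metrics group and delegates.
    default String create(String operatorId, TimeProvider time) {
        return create(operatorId, time, () -> "unregistered");
    }

    // The only abstract method: implementations override just this one.
    String create(String operatorId, TimeProvider time, Metrics metrics);
}

public class TelescopingDefaults {
    public static void main(String[] args) {
        // A single lambda is enough because BackendFactory has one abstract method.
        BackendFactory factory = (op, time, metrics) -> op + " with metrics=" + metrics.name();
        System.out.println(factory.create("my-operator")); // my-operator with metrics=unregistered
    }
}
```

Keeping a single abstract method means each backend implements the full signature once, while callers can still use the shorter forms.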
  • The StateBackend interface defines how the state of a stateful streaming application is stored and checkpointed
  • Flink supports MemoryStateBackend, FsStateBackend, and RocksDBStateBackend. If no backend is configured, MemoryStateBackend is used by default. A global default can be set in flink-conf.yaml, and each job can override it by calling setStateBackend on its StreamExecutionEnvironment
  • MemoryStateBackend accepts a maximum state size in its constructor. The default is 5 MB; it can be raised, but not beyond the Akka frame size. FsStateBackend keeps the TaskManager's working state in memory but writes checkpoint state to a file system (typically a replicated, highly available one such as HDFS); RocksDBStateBackend keeps working state in RocksDB and writes checkpoint state to a file system
  • The StateBackend interface declares the createCheckpointStorage, createKeyedStateBackend, and createOperatorStateBackend methods and extends Serializable; implementations of StateBackend are required to be thread-safe
  • AbstractStateBackend directly implements StateBackend. AbstractFileStateBackend and RocksDBStateBackend extend AbstractStateBackend, while MemoryStateBackend and FsStateBackend extend AbstractFileStateBackend
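The precedence described above (a job-level setStateBackend call first, then the global default from flink-conf.yaml, then MemoryStateBackend) can be sketched as a small lookup. This is a hypothetical helper, not Flink's own loading code; it assumes the global default is given under the state.backend key using the short names jobmanager, filesystem, and rocksdb, and it reports backends by class name only.

```java
import java.util.Map;

public class BackendSelectionSketch {

    // Hypothetical helper: returns the name of the backend a job would end up with.
    // fromApplication: backend set via StreamExecutionEnvironment.setStateBackend (null if none)
    // flinkConf: key/value pairs as read from flink-conf.yaml
    static String resolveBackend(String fromApplication, Map<String, String> flinkConf) {
        if (fromApplication != null) {
            return fromApplication; // per-job setting wins over the global default
        }
        String configured = flinkConf.get("state.backend");
        if (configured == null) {
            return "MemoryStateBackend"; // nothing configured anywhere
        }
        switch (configured) {
            case "jobmanager": return "MemoryStateBackend";
            case "filesystem": return "FsStateBackend";
            case "rocksdb":    return "RocksDBStateBackend";
            default:
                throw new IllegalArgumentException("not handled by this sketch: " + configured);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of("state.backend", "filesystem");
        System.out.println(resolveBackend(null, conf));                  // FsStateBackend
        System.out.println(resolveBackend("RocksDBStateBackend", conf)); // RocksDBStateBackend
        System.out.println(resolveBackend(null, Map.of()));              // MemoryStateBackend
    }
}
```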

AbstractStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractStateBackend.java

/**
 * An abstract base implementation of the {@link StateBackend} interface.
 *
 * <p>This class has currently no contents and only kept to not break the prior class hierarchy for users.
 */
@PublicEvolving
public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {

	private static final long serialVersionUID = 4620415814639230247L;

	// ------------------------------------------------------------------------
	//  State Backend - State-Holding Backends
	// ------------------------------------------------------------------------

	@Override
	public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
		Environment env,
		JobID jobID,
		String operatorIdentifier,
		TypeSerializer<K> keySerializer,
		int numberOfKeyGroups,
		KeyGroupRange keyGroupRange,
		TaskKvStateRegistry kvStateRegistry,
		TtlTimeProvider ttlTimeProvider,
		MetricGroup metricGroup) throws IOException;

	@Override
	public abstract OperatorStateBackend createOperatorStateBackend(
		Environment env,
		String operatorIdentifier) throws Exception;
}
  • AbstractStateBackend declares that it implements the StateBackend and Serializable interfaces and adds nothing else; according to its Javadoc, it is kept only so as not to break the existing class hierarchy for users
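Although AbstractStateBackend adds no fields, its re-declared createKeyedStateBackend narrows the interface's `throws Exception` to `throws IOException`, and MemoryStateBackend's override further below declares no checked exception at all. This is ordinary Java overriding: an abstract class may re-declare an inherited interface method as abstract with a narrower checked exception, and a subclass may drop it entirely. A minimal sketch with hypothetical Factory, AbstractFactory, and HeapFactory types:

```java
import java.io.IOException;

// Hypothetical stand-in for StateBackend: the interface allows any Exception.
interface Factory {
    Object create() throws Exception;
}

// Like AbstractStateBackend: re-declares the method as abstract with a narrower throws clause.
abstract class AbstractFactory implements Factory {
    @Override
    public abstract Object create() throws IOException;
}

// Like MemoryStateBackend: the concrete override throws no checked exception.
class HeapFactory extends AbstractFactory {
    @Override
    public Object create() {
        return "heap backend";
    }
}

public class NarrowedThrows {
    // Callers holding the abstract type only have to handle IOException.
    static Object viaAbstract(AbstractFactory f) {
        try {
            return f.create();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(viaAbstract(new HeapFactory())); // heap backend
        // Through the concrete type, no try/catch is needed.
        System.out.println(new HeapFactory().create());     // heap backend
    }
}
```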

AbstractFileStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java

@PublicEvolving
public abstract class AbstractFileStateBackend extends AbstractStateBackend {

	private static final long serialVersionUID = 1L;

	// ------------------------------------------------------------------------
	//  State Backend Properties
	// ------------------------------------------------------------------------

	/** The path where checkpoints will be stored, or null, if none has been configured. */
	@Nullable
	private final Path baseCheckpointPath;

	/** The path where savepoints will be stored, or null, if none has been configured. */
	@Nullable
	private final Path baseSavepointPath;

	// ...

	// ------------------------------------------------------------------------
	//  Initialization and metadata storage
	// ------------------------------------------------------------------------

	@Override
	public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
		return AbstractFsCheckpointStorage.resolveCheckpointPointer(pointer);
	}

	// ------------------------------------------------------------------------
	//  Utilities
	// ------------------------------------------------------------------------

	/**
	 * Checks the validity of the path's scheme and path.
	 *
	 * @param path The path to check.
	 * @return The URI as a Path.
	 *
	 * @throws IllegalArgumentException Thrown, if the URI misses scheme or path.
	 */
	private static Path validatePath(Path path) {
		final URI uri = path.toUri();
		final String scheme = uri.getScheme();
		final String pathPart = uri.getPath();

		// some validity checks
		if (scheme == null) {
			throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
					"Please specify the file system scheme explicitly in the URI.");
		}
		if (pathPart == null) {
			throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
					"Please specify a directory path for the checkpoint data.");
		}
		if (pathPart.length() == 0 || pathPart.equals("/")) {
			throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
		}

		return path;
	}

	@Nullable
	private static Path parameterOrConfigured(@Nullable Path path, Configuration config, ConfigOption<String> option) {
		if (path != null) {
			return path;
		}
		else {
			String configValue = config.getString(option);
			try {
				return configValue == null ? null : new Path(configValue);
			}
			catch (IllegalArgumentException e) {
				throw new IllegalConfigurationException("Cannot parse value for " +
						option.key() + " : " + configValue + " . Not a valid path.");
			}
		}
	}
}
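  • AbstractFileStateBackend extends AbstractStateBackend and adds two properties, baseCheckpointPath and baseSavepointPath, both of which may be null. A configured path must carry an explicit file system scheme (such as hdfs:// or file://) and must not point at the root directory; validatePath enforces this. If a path is not passed in explicitly, parameterOrConfigured falls back to the value from the configuration. The resolveCheckpoint method, which resolves a checkpoint or savepoint pointer into a storage location, delegates to AbstractFsCheckpointStorage.resolveCheckpointPointer(pointer)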
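The checks in validatePath and the fallback in parameterOrConfigured can be tried out without Flink on the classpath. The sketch below is a simplified re-implementation on java.net.URI rather than Flink's Path and Configuration classes, with a plain Map standing in for the configuration; state.checkpoints.dir (Flink's checkpoint-directory option) is used only as an example key, and the isValid helper is hypothetical.

```java
import java.net.URI;
import java.util.Map;

public class CheckpointPathSketch {

    // Same three checks as AbstractFileStateBackend.validatePath, applied to java.net.URI.
    static URI validatePath(URI uri) {
        if (uri.getScheme() == null) {
            throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null.");
        }
        String pathPart = uri.getPath(); // null for opaque URIs such as "mailto:x"
        if (pathPart == null) {
            throw new IllegalArgumentException("The path to store the checkpoint data in is null.");
        }
        if (pathPart.isEmpty() || pathPart.equals("/")) {
            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
        }
        return uri;
    }

    // Mirrors parameterOrConfigured: an explicit value wins, otherwise the configured one (may be null).
    static URI parameterOrConfigured(URI explicit, Map<String, String> config, String key) {
        if (explicit != null) {
            return explicit;
        }
        String value = config.get(key);
        try {
            return value == null ? null : URI.create(value);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Cannot parse value for " + key + " : " + value + " . Not a valid path.", e);
        }
    }

    // Hypothetical helper: true if validatePath accepts the given string.
    static boolean isValid(String path) {
        try {
            validatePath(URI.create(path));
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("hdfs:///flink/checkpoints")); // true
        System.out.println(isValid("/flink/checkpoints"));        // false: no scheme
        System.out.println(isValid("file:///"));                  // false: root directory
        Map<String, String> conf = Map.of("state.checkpoints.dir", "file:///tmp/checkpoints");
        System.out.println(parameterOrConfigured(null, conf, "state.checkpoints.dir")); // file:///tmp/checkpoints
    }
}
```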

MemoryStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/memory/MemoryStateBackend.java

@PublicEvolving
public class MemoryStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {

	private static final long serialVersionUID = 4109305377809414635L;

	/** The default maximal size that the snapshotted memory state may have (5 MiBytes). */
	public static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;

	/** The maximal size that the snapshotted memory state may have. */
	private final int maxStateSize;

	/** Switch to chose between synchronous and asynchronous snapshots.
	 * A value of 'UNDEFINED' means not yet configured, in which case the default will be used. */
	private final TernaryBoolean asynchronousSnapshots;

	// ------------------------------------------------------------------------

	/**
	 * Creates a new memory state backend that accepts states whose serialized forms are
	 * up to the default state size (5 MB).
	 *
	 * <p>Checkpoint and default savepoint locations are used as specified in the
	 * runtime configuration.
	 */
	public MemoryStateBackend() {
		this(null, null, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.UNDEFINED);
	}

	/**
	 * Creates a new memory state backend that accepts states whose serialized forms are
	 * up to the default state size (5 MB). The state backend uses asynchronous snapshots
	 * or synchronous snapshots as configured.
	 *
	 * <p>Checkpoint and default savepoint locations are used as specified in the
	 * runtime configuration.
	 *
	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
	 */
	public MemoryStateBackend(boolean asynchronousSnapshots) {
		this(null, null, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.fromBoolean(asynchronousSnapshots));
	}

	/**
	 * Creates a new memory state backend that accepts states whose serialized forms are
	 * up to the given number of bytes.
	 *
	 * <p>Checkpoint and default savepoint locations are used as specified in the
	 * runtime configuration.
	 *
	 * <p><b>WARNING:</b> Increasing the size of this value beyond the default value
	 * ({@value #DEFAULT_MAX_STATE_SIZE}) should be done with care.
	 * The checkpointed state needs to be send to the JobManager via limited size RPC messages, and there
	 * and the JobManager needs to be able to hold all aggregated state in its memory.
	 *
	 * @param maxStateSize The maximal size of the serialized state
	 */
	public MemoryStateBackend(int maxStateSize) {
		this(null, null, maxStateSize, TernaryBoolean.UNDEFINED);
	}

	/**
	 * Creates a new memory state backend that accepts states whose serialized forms are
	 * up to the given number of bytes and that uses asynchronous snashots as configured.
	 *
	 * <p>Checkpoint and default savepoint locations are used as specified in the
	 * runtime configuration.
	 *
	 * <p><b>WARNING:</b> Increasing the size of this value beyond the default value
	 * ({@value #DEFAULT_MAX_STATE_SIZE}) should be done with care.
	 * The checkpointed state needs to be send to the JobManager via limited size RPC messages, and there
	 * and the JobManager needs to be able to hold all aggregated state in its memory.
	 *
	 * @param maxStateSize The maximal size of the serialized state
	 * @param asynchronousSnapshots Switch to enable asynchronous snapshots.
	 */
	public MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots) {
		this(null, null, maxStateSize, TernaryBoolean.fromBoolean(asynchronousSnapshots));
	}

	/**
	 * Creates a new MemoryStateBackend, setting optionally the path to persist checkpoint metadata
	 * to, and to persist savepoints to.
	 *
	 * @param checkpointPath The path to write checkpoint metadata to. If null, the value from
	 *                       the runtime configuration will be used.
	 * @param savepointPath  The path to write savepoints to. If null, the value from
	 *                       the runtime configuration will be used.
	 */
	public MemoryStateBackend(@Nullable String checkpointPath, @Nullable String savepointPath) {
		this(checkpointPath, savepointPath, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.UNDEFINED);
	}

	/**
	 * Creates a new MemoryStateBackend, setting optionally the paths to persist checkpoint metadata
	 * and savepoints to, as well as configuring state thresholds and asynchronous operations.
	 *
	 * <p><b>WARNING:</b> Increasing the size of this value beyond the default value
	 * ({@value #DEFAULT_MAX_STATE_SIZE}) should be done with care.
	 * The checkpointed state needs to be send to the JobManager via limited size RPC messages, and there
	 * and the JobManager needs to be able to hold all aggregated state in its memory.
	 *
	 * @param checkpointPath The path to write checkpoint metadata to. If null, the value from
	 *                       the runtime configuration will be used.
	 * @param savepointPath  The path to write savepoints to. If null, the value from
	 *                       the runtime configuration will be used.
	 * @param maxStateSize   The maximal size of the serialized state.
	 * @param asynchronousSnapshots Flag to switch between synchronous and asynchronous
	 *                              snapshot mode. If null, the value configured in the
	 *                              runtime configuration will be used.
	 */
	public MemoryStateBackend(
			@Nullable String checkpointPath,
			@Nullable String savepointPath,
			int maxStateSize,
			TernaryBoolean asynchronousSnapshots) {

		super(checkpointPath == null ? null : new Path(checkpointPath),
				savepointPath == null ? null : new Path(savepointPath));

		checkArgument(maxStateSize > 0, "maxStateSize must be > 0");
		this.maxStateSize = maxStateSize;

		this.asynchronousSnapshots = asynchronousSnapshots;
	}

	/**
	 * Private constructor that creates a re-configured copy of the state backend.
	 *
	 * @param original The state backend to re-configure
	 * @param configuration The configuration
	 */
	private MemoryStateBackend(MemoryStateBackend original, Configuration configuration) {
		super(original.getCheckpointPath(), original.getSavepointPath(), configuration);

		this.maxStateSize = original.maxStateSize;

		// if asynchronous snapshots were configured, use that setting,
		// else check the configuration
		this.asynchronousSnapshots = original.asynchronousSnapshots.resolveUndefined(
				configuration.getBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS));
	}

	// ------------------------------------------------------------------------
	//  Properties
	// ------------------------------------------------------------------------

	/**
	 * Gets the maximum size that an individual state can have, as configured in the
	 * constructor (by default {@value #DEFAULT_MAX_STATE_SIZE}).
	 *
	 * @return The maximum size that an individual state can have
	 */
	public int getMaxStateSize() {
		return maxStateSize;
	}

	/**
	 * Gets whether the key/value data structures are asynchronously snapshotted.
	 *
	 * <p>If not explicitly configured, this is the default value of
	 * {@link CheckpointingOptions#ASYNC_SNAPSHOTS}.
	 */
	public boolean isUsingAsynchronousSnapshots() {
		return asynchronousSnapshots.getOrDefault(CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue());
	}

	// ------------------------------------------------------------------------
	//  Reconfiguration
	// ------------------------------------------------------------------------

	/**
	 * Creates a copy of this state backend that uses the values defined in the configuration
	 * for fields where that were not specified in this state backend.
	 *
	 * @param config the configuration
	 * @return The re-configured variant of the state backend
	 */
	@Override
	public MemoryStateBackend configure(Configuration config) {
		return new MemoryStateBackend(this, config);
	}

	// ------------------------------------------------------------------------
	//  checkpoint state persistence
	// ------------------------------------------------------------------------

	@Override
	public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
		return new MemoryBackendCheckpointStorage(jobId, getCheckpointPath(), getSavepointPath(), maxStateSize);
	}

	// ------------------------------------------------------------------------
	//  state holding structures
	// ------------------------------------------------------------------------

	@Override
	public OperatorStateBackend createOperatorStateBackend(
			Environment env,
			String operatorIdentifier) throws Exception {

		return new DefaultOperatorStateBackend(
				env.getUserClassLoader(),
				env.getExecutionConfig(),
				isUsingAsynchronousSnapshots());
	}

	@Override
	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
			Environment env,
			JobID jobID,
			String operatorIdentifier,
			TypeSerializer<K> keySerializer,
			int numberOfKeyGroups,
			KeyGroupRange keyGroupRange,
			TaskKvStateRegistry kvStateRegistry,
			TtlTimeProvider ttlTimeProvider,
			MetricGroup metricGroup) {

		TaskStateManager taskStateManager = env.getTaskStateManager();
		HeapPriorityQueueSetFactory priorityQueueSetFactory =
			new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
		return new HeapKeyedStateBackend<>(
				kvStateRegistry,
				keySerializer,
				env.getUserClassLoader(),
				numberOfKeyGroups,
				keyGroupRange,
				isUsingAsynchronousSnapshots(),
				env.getExecutionConfig(),
				taskStateManager.createLocalRecoveryConfig(),
				priorityQueueSetFactory,
				ttlTimeProvider);
	}

	// ------------------------------------------------------------------------
	//  utilities
	// ------------------------------------------------------------------------

	@Override
	public String toString() {
		return "MemoryStateBackend (data in heap memory / checkpoints to JobManager) " +
				"(checkpoints: '" + getCheckpointPath() +
				"', savepoints: '" + getSavepointPath() +
				"', asynchronous: " + asynchronousSnapshots +
				", maxStateSize: " + maxStateSize + ")";
	}
}
  • MemoryStateBackend extends AbstractFileStateBackend and implements the ConfigurableStateBackend interface (its configure method). It keeps both the TaskManager's working state and the checkpoint state held by the JobManager in the JVM heap (for high availability, checkpoint data can additionally be persisted to a file system). MemoryStateBackend is intended only for experimentation, such as local setups or jobs with very little state. For production, use FsStateBackend, which keeps the TaskManager's working state in memory but writes checkpoint state to a file system, so it can handle much larger state
  • MemoryStateBackend has a maxStateSize property, which defaults to DEFAULT_MAX_STATE_SIZE (5 MB). The serialized size of each individual state must not exceed maxStateSize; all state of a single task must fit within the RPC system's message size limit (10 MB by default; it can be raised, but this is not recommended); and the total size of all retained checkpoints must fit into the JobManager's JVM heap. If checkpointPath and savepointPath are not passed to the constructor, the global defaults from flink-conf.yaml are used. MemoryStateBackend also has an asynchronousSnapshots property of type TernaryBoolean (TRUE, FALSE, UNDEFINED), where UNDEFINED means the value has not been set explicitly and the default will be used
  • In MemoryStateBackend, the createCheckpointStorage method creates a MemoryBackendCheckpointStorage, createOperatorStateBackend creates a DefaultOperatorStateBackend, and createKeyedStateBackend creates a HeapKeyedStateBackend
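The two MemoryStateBackend settings discussed above, maxStateSize and the three-valued asynchronousSnapshots flag, can be modelled in a few lines. In this hypothetical sketch, a null Boolean plays the role of TernaryBoolean.UNDEFINED, configure(boolean) mirrors the re-configuring private constructor (an explicit setting wins, UNDEFINED takes the configured value), and checkStateSize is a simplified stand-in for the size limit, not the place where Flink actually enforces it.

```java
import java.io.IOException;

public class MemoryBackendSketch {

    static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024; // 5 MiB, as in MemoryStateBackend

    private final int maxStateSize;
    // null stands in for TernaryBoolean.UNDEFINED
    private final Boolean asynchronousSnapshots;

    MemoryBackendSketch(int maxStateSize, Boolean asynchronousSnapshots) {
        if (maxStateSize <= 0) {
            throw new IllegalArgumentException("maxStateSize must be > 0");
        }
        this.maxStateSize = maxStateSize;
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    // Like configure(Configuration): returns a copy; an explicit setting wins,
    // UNDEFINED (null) is resolved to the configured value.
    MemoryBackendSketch configure(boolean configuredAsync) {
        Boolean resolved = asynchronousSnapshots != null ? asynchronousSnapshots : configuredAsync;
        return new MemoryBackendSketch(maxStateSize, resolved);
    }

    // Like isUsingAsynchronousSnapshots: falls back to a default if still undefined.
    boolean isUsingAsynchronousSnapshots(boolean defaultValue) {
        return asynchronousSnapshots != null ? asynchronousSnapshots : defaultValue;
    }

    // Hypothetical size check: reject serialized state larger than maxStateSize.
    byte[] checkStateSize(byte[] serializedState) throws IOException {
        if (serializedState.length > maxStateSize) {
            throw new IOException("state of " + serializedState.length
                    + " bytes exceeds maxStateSize of " + maxStateSize + " bytes");
        }
        return serializedState;
    }

    public static void main(String[] args) throws IOException {
        MemoryBackendSketch undefined = new MemoryBackendSketch(DEFAULT_MAX_STATE_SIZE, null);
        MemoryBackendSketch explicitSync = new MemoryBackendSketch(DEFAULT_MAX_STATE_SIZE, false);

        System.out.println(undefined.configure(true).isUsingAsynchronousSnapshots(false));   // true
        System.out.println(explicitSync.configure(true).isUsingAsynchronousSnapshots(true)); // false

        MemoryBackendSketch tiny = new MemoryBackendSketch(16, null);
        tiny.checkStateSize(new byte[16]); // fits exactly
        try {
            tiny.checkStateSize(new byte[17]);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```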

summary

  • The StateBackend interface defines how the state of a stateful streaming application is stored and checkpointed. Flink supports MemoryStateBackend, FsStateBackend, and RocksDBStateBackend; if no backend is configured, MemoryStateBackend is used by default. A global default can be set in flink-conf.yaml, and each job can override it by calling setStateBackend on its StreamExecutionEnvironment
  • The StateBackend interface declares the createCheckpointStorage, createKeyedStateBackend, and createOperatorStateBackend methods and extends Serializable; implementations of StateBackend are required to be thread-safe. AbstractStateBackend directly implements StateBackend. AbstractFileStateBackend and RocksDBStateBackend extend AbstractStateBackend, while MemoryStateBackend and FsStateBackend extend AbstractFileStateBackend
  • MemoryStateBackend extends AbstractFileStateBackend and implements the ConfigurableStateBackend interface (its configure method). It keeps both the TaskManager's working state and the checkpoint state held by the JobManager in the JVM heap. Its createCheckpointStorage method creates a MemoryBackendCheckpointStorage, createOperatorStateBackend creates a DefaultOperatorStateBackend, and createKeyedStateBackend creates a HeapKeyedStateBackend

doc

  • State Backends