
This article focuses on the StateDescriptor of flink


Flink – core – 1.7.0 – sources jar! /org/apache/flink/api/common/functions/

 * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
 * of the function will have a context through which it can access static contextual information (such as
 * the current parallelism) and other constructs like accumulators and broadcast variables.
 * <p>A function can, during runtime, obtain the RuntimeContext via a call to
 * {@link AbstractRichFunction#getRuntimeContext()}.
public interface RuntimeContext {

	<T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties);

	<T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);

	<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);

	<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);

	<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);

	<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);

Copy the code
  • RuntimeContext provides get methods based on the corresponding StateDescriptor. For example, it provides the getState method. ValueStateDescriptor is used to obtain ValueState. GetListState Gets the ListState descriptor by ListStateDescriptor. GetReducingState getReducingState through ReducingState descriptor. GetAggregatingState AggregatingState obtained through AggregatingStateDescriptor; GetFoldingState getFoldingState by FoldingState descriptor. GetMapState obtain the MapState through the MapState descriptor


Flink – core – 1.7.0 – sources jar! /org/apache/flink/api/common/state/

 * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
 * {@link State} in stateful operations.
 * <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}.
 * @param <S> The type of the State objects created from this {@code StateDescriptor}.
 * @param <T> The type of the value of the state object described by this state descriptor.
public abstract class StateDescriptor<S extends State, T> implements Serializable {

	 * An enumeration of the types of supported states. Used to identify the state type
	 * when writing and restoring checkpoints and savepoints.
	// IMPORTANT: Do not change the order of the elements in this enum, ordinal is used in serialization
	public enum Type {
		 * @deprecated Enum for migrating from old checkpoints/savepoint versions.

	private static final long serialVersionUID = 1L;

	// ------------------------------------------------------------------------

	/** Name that uniquely identifies state created from this StateDescriptor. */
	protected final String name;

	/** The serializer for the type. May be eagerly initialized in the constructor,
	 * or lazily once the {@link #initializeSerializerUnlessSet(ExecutionConfig)} method
	 * is called. */
	protected TypeSerializer<T> serializer;

	/** The type information describing the value type. Only used to if the serializer
	 * is created lazily. */
	private TypeInformation<T> typeInfo;

	/** Name for queries against state created from this StateDescriptor. */
	private String queryableStateName;

	/** Name for queries against state created from this StateDescriptor. */
	private StateTtlConfig ttlConfig = StateTtlConfig.DISABLED;

	/** The default value returned by the state when no other value is bound to a key. */
	protected transient T defaultValue;

	// ------------------------------------------------------------------------

	 * Create a new {@code StateDescriptor} with the given name and the given type serializer.
	 * @param name The name of the {@code StateDescriptor}.
	 * @param serializer The type serializer for the values in the state.
	 * @param defaultValue The default value that will be set when requesting state without setting
	 *                     a value before.
	protected StateDescriptor(String name, TypeSerializer<T> serializer, @Nullable T defaultValue) { = checkNotNull(name, "name must not be null");
		this.serializer = checkNotNull(serializer, "serializer must not be null");
		this.defaultValue = defaultValue;

	 * Create a new {@code StateDescriptor} with the given name and the given type information.
	 * @param name The name of the {@code StateDescriptor}.
	 * @param typeInfo The type information for the values in the state.
	 * @param defaultValue The default value that will be set when requesting state without setting
	 *                     a value before.
	protected StateDescriptor(String name, TypeInformation<T> typeInfo, @Nullable T defaultValue) { = checkNotNull(name, "name must not be null");
		this.typeInfo = checkNotNull(typeInfo, "type information must not be null");
		this.defaultValue = defaultValue;

	 * Create a new {@code StateDescriptor} with the given name and the given type information.
	 * <p>If this constructor fails (because it is not possible to describe the type via a class),
	 * consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor.
	 * @param name The name of the {@code StateDescriptor}.
	 * @param type The class of the type of values in the state.
	 * @param defaultValue The default value that will be set when requesting state without setting
	 *                     a value before.
	protected StateDescriptor(String name, Class<T> type, @Nullable T defaultValue) { = checkNotNull(name, "name must not be null");
		checkNotNull(type."type class must not be null");

		try {
			this.typeInfo = TypeExtractor.createTypeInfo(type);
		} catch (Exception e) {
			throw new RuntimeException(
					"Could not create the type information for '" + type.getName() + "'." +
					"The most common reason is failure to infer the generic type information, due to Java's type erasure. " +
					"In that case, please pass a 'TypeHint' instead of a class to describe the type. " +
					"For example, to describe 'Tuple2<String, String>' as a generic type, use " +
					"'new PravegaDeserializationSchema<>(new TypeHint
       >(){}, serializer); '"
      , e);

		this.defaultValue = defaultValue;

	// ------------------------------------------------------------------------

	 * Returns the name of this {@code StateDescriptor}.
	public String getName() {
		return name;

	 * Returns the default value.
	public T getDefaultValue() {
		if(defaultValue ! = null) {if(serializer ! = null) {return serializer.copy(defaultValue);
			} else {
				throw new IllegalStateException("Serializer not yet initialized."); }}else {
			return null;

	 * Returns the {@link TypeSerializer} that can be used to serialize the value in the state.
	 * Note that the serializer may initialized lazily and is only guaranteed to exist after
	 * calling {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
	public TypeSerializer<T> getSerializer() {
		if(serializer ! = null) {return serializer.duplicate();
		} else {
			throw new IllegalStateException("Serializer not yet initialized.");

	 * Sets the name for queries of state created from this descriptor.
	 * <p>If a name is set, the created state will be published for queries
	 * during runtime. The name needs to be unique per job. If there is another
	 * state instance published under the same name, the job will fail during runtime.
	 * @param queryableStateName State name for queries (unique name per job)
	 * @throws IllegalStateException If queryable state name already set
	public void setQueryable(String queryableStateName) {
			ttlConfig.getUpdateType() == StateTtlConfig.UpdateType.Disabled,
			"Queryable state is currently not supported with TTL");
		if (this.queryableStateName == null) {
			this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Registration name");
		} else {
			throw new IllegalStateException("Queryable state name already set");

	 * Returns the queryable state name.
	 * @return Queryable state name or <code>null</code> if not set.
	public String getQueryableStateName() {
		return queryableStateName;

	 * Returns whether the state created from this descriptor is queryable.
	 * @return <code>true</code> if state is queryable, <code>false</code>
	 * otherwise.
	public boolean isQueryable() {
		returnqueryableStateName ! = null; } /** * Configures optional activation of state time-to-live (TTL). * * <p>State user value will expire, become unavailable and be cleaned upin storage
	 * depending on configured {@link StateTtlConfig}.
	 * @param ttlConfig configuration of state TTL
	public void enableTimeToLive(StateTtlConfig ttlConfig) { Preconditions.checkNotNull(ttlConfig); Preconditions.checkArgument( ttlConfig.getUpdateType() ! = StateTtlConfig.UpdateType.Disabled && queryableStateName == null,"Queryable state is currently not supported with TTL");
		this.ttlConfig = ttlConfig;

	public StateTtlConfig getTtlConfig() {
		return ttlConfig;

	// ------------------------------------------------------------------------

	 * Checks whether the serializer has been initialized. Serializer initialization is lazy,
	 * to allow parametrization of serializers with an {@link ExecutionConfig} via
	 * {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
	 * @return True if the serializers have been initialized, false otherwise.
	public boolean isSerializerInitialized() {
		returnserializer ! = null; } /** * Initializes the serializer, unless it has been initialized before. * * @param executionConfig The execution config to use when creating the serializer. */ public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {if (serializer == null) {
			checkState(typeInfo ! = null,"no serializer and no type info");

			// instantiate the serializer
			serializer = typeInfo.createSerializer(executionConfig);

			// we can drop the type info now, no longer needed
			typeInfo  = null;

	// ------------------------------------------------------------------------
	//  Standard Utils
	// ------------------------------------------------------------------------

	public final int hashCode() {
		return name.hashCode() + 31 * getClass().hashCode();

	public final boolean equals(Object o) {
		if (o == this) {
			return true;
		else if(o ! = null && o.getClass() == this.getClass()) { final StateDescriptor<? ,? > that = (StateDescriptor<? ,? >) o;return;
		else {
			return false;

	public String toString() {
		return getClass().getSimpleName() +
				"{name=" + name +
				", defaultValue=" + defaultValue +
				", serializer=" + serializer +
				(isQueryable() ? ", queryableStateName=" + queryableStateName + "" : "") +
				'} ';

	public abstract Type getType();

	// ------------------------------------------------------------------------
	//  Serialization
	// ------------------------------------------------------------------------

	private void writeObject(final ObjectOutputStream out) throws IOException {
		// write all the non-transient fields

		// write the non-serializable default value field
		if (defaultValue == null) {
			// we don't have a default value out.writeBoolean(false); } else { // we have a default value out.writeBoolean(true); byte[] serializedDefaultValue; try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos)) { TypeSerializer
        duplicateSerializer = serializer.duplicate(); duplicateSerializer.serialize(defaultValue, outView); outView.flush(); serializedDefaultValue = baos.toByteArray(); } catch (Exception e) { throw new IOException("Unable to serialize default value of type " + defaultValue.getClass().getSimpleName() + ".", e); } out.writeInt(serializedDefaultValue.length); out.write(serializedDefaultValue); } } private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { // read the non-transient fields in.defaultReadObject(); // read the default value field boolean hasDefaultValue = in.readBoolean(); if (hasDefaultValue) { int size = in.readInt(); byte[] buffer = new byte[size]; in.readFully(buffer); try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer); DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) { defaultValue = serializer.deserialize(inView); } catch (Exception e) { throw new IOException("Unable to deserialize default value.", e); } } else { defaultValue = null; }}}
      Copy the code
  • The StateDescriptor is ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, FoldingStateDescriptor, or AggregatingState Base class of Descriptor, MapStateDescriptor, which defines an abstract method that returns Type Type.VALUE,LIST,EDUCING,FOLDING,AGGREGATING,MAP) for each subclass to express its own Type
  • The StateDescriptor provides several constructors for passing name, TypeSerializer, or TypeInformation, or Class TypeInformation, and defaultValue
  • The StateDescriptor overrides the equals and hashCode methods; It also implements the Serializable interface and customizes the serialization process through writeObject and readObject


  • RuntimeContext provides a get method for each state descriptor, Such as getState, getListState, getReducingState, getAggregatingState, getFoldingState, getMapState
  • The StateDescriptor is ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, FoldingStateDescriptor, or AggregatingState Base class of Descriptor, MapStateDescriptor, which defines an abstract method that returns Type Type.VALUE,LIST,EDUCING,FOLDING,AGGREGATING,MAP) for each subclass to express its own Type
  • The StateDescriptor overrides the equals and hashCode methods; It also implements the Serializable interface and customizes the serialization process through writeObject and readObject


  • Using Managed Keyed State