Flink has three time semantics: processing time, event time, and ingestion time. For a detailed explanation of these semantics, please refer to Flink's Time and Watermarks. This article explains how to define time semantics for time-based operators in the Flink Table API & SQL. From this article you can learn:

  • An introduction to time attributes
  • Processing time
  • Event time

Introduction to time attributes

Time-based operations in the Flink Table API & SQL (such as windows) require time semantics, which are provided by a logical time attribute derived from a specified timestamp.

The time attribute is part of the table schema. It is defined when a table is created with DDL, when a DataStream is converted to a table, or when a TableSource is used. Once defined, the time attribute can be referenced like a regular field in time-based operations.

A time attribute can be accessed and computed like an ordinary timestamp. However, once a time attribute is used in a computation, it is materialized into a regular timestamp. A regular timestamp is not compatible with Flink's time and watermark mechanism and cannot be used in time-based operations.
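As a hypothetical sketch of this materialization (assuming the user_actions table with a processing time attribute user_action_time that is defined later in this article):

```sql
-- user_action_time is a time attribute, so it can drive a window:
SELECT TUMBLE_START(user_action_time, INTERVAL '1' MINUTE), COUNT(*)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '1' MINUTE);

-- but once it passes through a computation such as a CAST, the result
-- is materialized into a plain TIMESTAMP and can no longer be used
-- inside TUMBLE(...) or other time-based operations:
SELECT CAST(user_action_time AS TIMESTAMP(3)) AS plain_ts FROM user_actions;
```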

The time characteristic required by the Flink Table API & SQL can be specified on the DataStream execution environment as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // the default

// You can select:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Processing time

Processing time uses the local machine clock. It is the simplest time semantics and requires no timestamp extraction or watermark generation, but it cannot guarantee consistent results across runs. There are three ways to define a processing time attribute, as follows.

Defining processing time in a DDL statement

The processing time attribute can be defined as a computed column in a DDL statement, using the PROCTIME() function as follows:

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- declare an extra field as the processing time attribute
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); -- 10-minute tumbling window

Defining processing time when converting a DataStream to a Table

When converting a DataStream to a table, you can specify the time attribute in the schema definition using the .proctime suffix; it must be placed at the end, after the other schema fields, as follows:

// Declare an additional logical field as the processing time attribute
DataStream<Tuple2<String, String>> stream = ...;
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.proctime");

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

Using TableSource

Define a TableSource that implements the DefinedProctimeAttribute interface, as follows:

// Define a table source with a processing time attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

	@Override
	public TypeInformation<Row> getReturnType() {
		String[] names = new String[] {"user_name", "data"};
		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
		return Types.ROW(names, types);
	}

	@Override
	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
		// create the stream
		DataStream<Row> stream = ...;
		return stream;
	}

	@Override
	public String getProctimeAttribute() {
		// This field is appended to the schema as the third field
		return "user_action_time";
	}
}

// register the table source
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
	.from("user_actions")
	.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

Event time

Event time is based on the timestamp carried by each record, so consistent results can be guaranteed even when data arrives out of order or late. There are three ways to define an event time attribute, as follows.

Defining event time in a DDL statement

The event time attribute can be defined by the WATERMARK statement as follows:

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- declare user_action_time as the event time attribute and allow a 5-second delay
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

Defining event time when converting a DataStream to a Table

The .rowtime suffix is used to specify the event time attribute when defining the schema. Timestamps and watermarks must already be assigned on the DataStream. For example, if the event time field in the dataset is event_time, you can declare it as the table's event time attribute with event_time.rowtime.
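A minimal sketch of that timestamp/watermark assignment (assuming the epoch-millisecond event timestamp sits in the first field of a Tuple3, and using the BoundedOutOfOrdernessTimestampExtractor that ships with the same Flink versions as setStreamTimeCharacteristic; inputStream is a hypothetical source stream):

```java
// Assumption: field f0 of the tuple holds the event timestamp in epoch milliseconds
DataStream<Tuple3<Long, String, String>> stream = inputStream
    .assignTimestampsAndWatermarks(
        // allow records to arrive up to 5 seconds out of order
        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Long, String, String>>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Tuple3<Long, String, String> element) {
                return element.f0;
            }
        });
```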

Flink currently supports two ways to define the event time field, as follows:

// Option 1:
// extract timestamps and assign watermarks
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// Declare an additional logical field as the event time attribute by
// appending user_action_time.rowtime at the end of the table schema;
// the system derives the event time attribute in the TableEnvironment
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.rowtime");

// Option 2:
// extract the timestamp from the first field and assign watermarks
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// The first field is already used to extract the timestamp, so the
// corresponding field can be used directly as the event time attribute
Table table = tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");

// Usage:

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

Using TableSource

Alternatively, you can implement the DefinedRowtimeAttributes interface when creating the TableSource. Its getRowtimeAttributeDescriptors method returns the descriptors that mark a field as the event time attribute.

// Define the table source with the RowTime attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

	@Override
	public TypeInformation<Row> getReturnType() {
		String[] names = new String[] {"user_name", "data", "user_action_time"};
		TypeInformation[] types =
		    new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
		return Types.ROW(names, types);
	}

	@Override
	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {

        // create a stream and assign watermarks based on the user_action_time attribute
        DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
        return stream;
	}

	@Override
	public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        // mark the user_action_time field as the event time attribute
        // Create a user_action_time descriptor that identifies the time property field
		RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
			"user_action_time".new ExistingField("user_action_time"),
			new AscendingTimestamps());
		List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
		return listRowtimeAttrDescr;
	}
}

// register the table source
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
	.from("user_actions")
	.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

Summary

This article explained how to use time attributes in the Flink Table API & SQL, covering two time semantics in detail: processing time and event time.
