The article directories
-
- A brief introduction.
- Example 2.
-
- 2.1 Converting a Table to DataStream
- 2.2 Converting a Table to a DataSet
- 2.3 DataStrearm Converts Table objects
- 2.4 DataSet Converts the Table object
A brief introduction.
DataStream/DataSet Table conversion:
- DataStream/DataSet conversion Table
- Table transformation DataStream/DataSet
Example 2.
2.1 Converting a Table to DataStream
There are two modes for converting tables to DataStream
- Append Mode appends a table to a stream
- Retract Mode converts tables to streams
grammar
// get TableEnvironment.
// registration of a DataSet is equivalent
// ge val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table=...// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String.Int)] dsTuple =
tableEnv.toAppendStream[(String.Int)](table)
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream[(Boolean, X)].
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean.Row)] = tableEnv.toRetractStream[Row](table)
Copy the code
The sample
object TableToDataStream {
def main(args: Array[String) :Unit = {
// construct data and convert it to table
val data = List(
Peoject(1L, 1."Hello"),
Peoject(2L, 2."Hello"),
Peoject(3L, 3."Hello"),
Peoject(4L, 4."Hello"),
Peoject(5L, 5."Hello"),
Peoject(6L, 6."Hello"),
Peoject(7L, 7."Hello World"),
Peoject(8L, 8."Hello World"),
Peoject(8L, 8."Hello World"),
Peoject(20L, 20."Hello World"))
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
import org.apache.flink.api.scala._
val stream = bsEnv.fromCollection(data)
val table = tEnv.fromDataStream(stream)
//TODO converts table to DataStream---- appends a table to the stream Append Mode
val appendStream: DataStream[Peoject] = tEnv.toAppendStream[Peoject](table)
Retract Mode true for add message, false for undo message
val retractStream: DataStream[(Boolean.Peoject)] = tEnv.toRetractStream[Peoject](table)
retractStream.print()
bsEnv.execute()
}
case class Peoject(user: Long, index: Int, content: String)
}
Copy the code
The results of
3> (true.Peoject(6.6.Hello))
2> (true.Peoject(5.5.Hello))
5> (true.Peoject(8.8.Hello World))
6> (true.Peoject(1.1.Hello))
8> (true.Peoject(3.3.Hello))
7> (true.Peoject(2.2.Hello))
7> (true.Peoject(20.20.Hello World))
4> (true.Peoject(7.7.Hello World))
1> (true.Peoject(4.4.Hello))
6> (true.Peoject(8.8.Hello World))
Copy the code
2.2 Converting a Table to a DataSet
grammar
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table=...// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String.Int)] = tableEnv.toDataSet[(String.Int)](table)
Copy the code
The sample
object TableToDataSet {
def main(args: Array[String) :Unit = {
// construct data and convert it to table
val data = List(
Peoject(1L, 1."Hello"),
Peoject(2L, 2."Hello"),
Peoject(3L, 3."Hello"),
Peoject(4L, 4."Hello"),
Peoject(5L, 5."Hello"),
Peoject(6L, 6."Hello"),
Peoject(7L, 7."Hello World"),
Peoject(8L, 8."Hello World"),
Peoject(8L, 8."Hello World"),
Peoject(20L, 20."Hello World"))
// Initialize the environment and load the table data
val fbEnv = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv = BatchTableEnvironment.create(fbEnv)
import org.apache.flink.api.scala._
val collection: DataSet[Peoject] = fbEnv.fromCollection(data)
val table: Table = fbTableEnv.fromDataSet(collection)
//TODO converts the table to the dataSet
val toDataSet: DataSet[Peoject] = fbTableEnv.toDataSet[Peoject](table)
toDataSet.print()
}
case class Peoject(user: Long, index: Int, content: String)
}
Copy the code
The results of
Peoject(1.1.Hello)
Peoject(2.2.Hello)
Peoject(3.3.Hello)
Peoject(4.4.Hello)
Peoject(5.5.Hello)
Peoject(6.6.Hello)
Peoject(7.7.Hello World)
Peoject(8.8.Hello World)
Peoject(8.8.Hello World)
Peoject(20.20.Hello World)
Copy the code
2.3 DataStrearm Converts Table objects
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long.String)] =...// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)
// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong.'myString)
Copy the code
2.4 DataSet Converts the Table object
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = ... // see "Create a TableEnvironment" section
val stream: DataSet[(Long.String)] =...// convert the DataSet into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataSet(stream)
// convert the DataSet into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataSet(stream, 'myLong.'myString)
Copy the code
The public,
Wechat id: Bigdata_limeng