1. Read data from the collection

  1. fromCollection

Get data from the collection

// Define sample class, sensor ID, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object Sensor {
  def main(args: Array[String) :Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream1 = env
      .fromCollection(List(
        SensorReading("sensor_1".1547718199.35.8),
        SensorReading("sensor_6".1547718201.15.4),
        SensorReading("sensor_7".1547718202.6.7),
        SensorReading("sensor_10".1547718205.38.1)
      ))

    stream1.print("stream1:").setParallelism(1)

    env.execute()
  }
}
Copy the code
  1. fromEleements
/ / not commonly used
env.fromElements(1.2."hhh")
Copy the code
  1. fromParalleCollection
/ / not commonly used
// From the specified sequence
env.fromParallelCollection(new NumberSequenceIterator(1L, 1000L))
Copy the code

2. Read data from the file

 env.readTextFile("YOUR_FILE_PATH")
Copy the code

3. Use Kafka queue data as the data source

  1. Importing dependency packages
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>Flink connector - kafka - 0.11 _2. 11</artifactId>
    <version>1.10.0</version>
</dependency>
Copy the code
  1. The instance
val properties = new Properties()
properties.setProperty("bootstrap.servers"."mayi101:9092")
properties.setProperty("group.id"."consumer-group")
properties.setProperty("key.deserializer"."org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer"."org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset"."latest")

val stream3 = env.addSource(new FlinkKafkaConsumer011[String] ("sensor".new SimpleStringSchema(), properties))
Copy the code
  1. test
kafka-console-producer.sh \
> --broker-list mayi101:9092 --topic flink
Copy the code

4. Customize source

In addition to the above source data sources, we can also customize the source. All you need to do is pass in a SourceFunction

See the custom Source article

 env.addSource( new MySensorSource())Copy the code