1. Read data from a collection
- fromCollection
Read data from an in-memory collection, which is mainly useful for local testing.
// Imports for the Flink Scala streaming API
import org.apache.flink.streaming.api.scala._

// Sample case class: sensor ID, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object Sensor {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream1 = env
      .fromCollection(List(
        SensorReading("sensor_1", 1547718199, 35.8),
        SensorReading("sensor_6", 1547718201, 15.4),
        SensorReading("sensor_7", 1547718202, 6.7),
        SensorReading("sensor_10", 1547718205, 38.1)
      ))
    stream1.print("stream1:").setParallelism(1)
    env.execute()
  }
}
- fromElements
// Not commonly used; emits the given elements directly
env.fromElements(1, 2, "hhh")
- fromParallelCollection
// Not commonly used
// Read from a generated number sequence (NumberSequenceIterator comes from org.apache.flink.util)
env.fromParallelCollection(new NumberSequenceIterator(1L, 1000L))
2. Read data from a file
env.readTextFile("YOUR_FILE_PATH")
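In practice the raw lines are usually parsed into the SensorReading case class from step 1. A minimal sketch, assuming a file sensor.txt whose lines look like sensor_1,1547718199,35.8 (the file name and line format are illustrative):
// Sketch: parse a CSV-like text file into SensorReading records
// Requires import org.apache.flink.streaming.api.scala._ as in step 1
val stream2 = env
  .readTextFile("sensor.txt")
  .map { line =>
    val fields = line.split(",")
    SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
  }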
3. Use a Kafka topic as the data source
- Import the connector dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>Flink connector - kafka - 0.11 _2. 11</artifactId>
<version>1.10.0</version>
</dependency>
- Example
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

val properties = new Properties()
properties.setProperty("bootstrap.servers", "mayi101:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
// Consume the "sensor" topic as a stream of strings
val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
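The line above only builds the stream; to actually see the Kafka messages, print the stream and start the job, just as in step 1 (the job name is arbitrary):
// Print the raw Kafka messages and launch the job
stream3.print("kafka").setParallelism(1)
env.execute("kafka source test")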
- Test
# Produce test messages to the topic consumed above
kafka-console-producer.sh --broker-list mayi101:9092 --topic sensor
4. Custom source
In addition to the built-in sources above, we can also define a custom source; all that is needed is to pass a SourceFunction to addSource.
See the custom Source article for a full walkthrough; a minimal sketch follows the snippet below.
env.addSource(new MySensorSource())
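For reference, here is a minimal sketch of what such a MySensorSource might look like: it emits random SensorReading values until the job is cancelled (the generation logic is purely illustrative).
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random

// Illustrative custom source: emits a random SensorReading once per second until cancelled
class MySensorSource extends SourceFunction[SensorReading] {
  // Flag controlling the emission loop
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    while (running) {
      val reading = SensorReading(
        "sensor_" + rand.nextInt(10),
        System.currentTimeMillis(),
        60 + rand.nextGaussian() * 20
      )
      ctx.collect(reading)
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}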