Let’s walk through a quick example of how to implement a custom data source.

  1. Define a case class SensorReading to encapsulate temperature readings
// 'id': sensor ID; 'timestamp': reading timestamp; 'temperature': temperature value
case class SensorReading(id: String,
                         timestamp: Long,
                         temperature: Double)
  2. Extend the RichParallelSourceFunction class, implement its methods, and write the logic that generates an unbounded stream of data (a note on source parallelism follows the code)
import java.util.Calendar

import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction

import scala.util.Random
// The type parameter 'SensorReading' indicates that events in the generated stream are of type SensorReading
class SensorSource extends RichParallelSourceFunction[SensorReading] {
  // Flag indicating whether the source should keep running
  var running: Boolean = true

  // The context parameter is used to emit data
  override def run(ctx: SourceContext[SensorReading]): Unit = {
    val rand = new Random

    // Use Gaussian noise to generate a random initial temperature for each of ten sensors
    var curFTemp = (1 to 10).map(
      i => ("sensor_" + i, rand.nextGaussian() * 20))

    // Generate an unbounded stream of data
    while (running) {
      // Update each temperature with a small random delta
      curFTemp = curFTemp.map(
        t => (t._1, t._2 + (rand.nextGaussian() * 0.5)))

      // Get the current timestamp in milliseconds
      val curTime = Calendar.getInstance.getTimeInMillis

      // Use the collect method of the ctx parameter to emit sensor readings
      curFTemp.foreach(t => ctx.collect(SensorReading(t._1, curTime, t._2)))

      // Emit a new batch of sensor readings every second
      Thread.sleep(1000)
    }
  }

  // cancel is called when the Flink job is cancelled; stop the data source
  override def cancel(): Unit = running = false
}
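Because SensorSource extends RichParallelSourceFunction, Flink can run several instances of the source at the same time. As a rough sketch (the object name ParallelSourceExample, the job name, and the parallelism value 4 are illustrative choices), the source's own parallelism can be set when it is added to the environment:
import org.apache.flink.streaming.api.scala._

object ParallelSourceExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Run four parallel instances of the custom source;
    // each instance produces readings for sensor_1 .. sensor_10
    val stream: DataStream[SensorReading] = env
      .addSource(new SensorSource)
      .setParallelism(4)

    stream.print()

    env.execute("parallel sensor source")
  }
}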
  3. Define a test class and run it (the expected output is described after the code)
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._

object MapExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Add data source
    val stream: DataStream[SensorReading] = env.addSource(new SensorSource)

    // 1) map with a lambda
    val mapped1: DataStream[String] = stream.map(r => r.id)

    // 2) map with a custom MapFunction class
    val mapped2: DataStream[String] = stream.map(new MyMapFunction)

    // 3) map with an anonymous MapFunction
    val mapped3: DataStream[String] = stream
      .map(new MapFunction[SensorReading, String] {
        override def map(value: SensorReading): String = value.id
      })

    mapped1.print()
    mapped2.print()
    mapped3.print()

    env.execute()
  }

  // Input type: SensorReading; output type: String
  class MyMapFunction extends MapFunction[SensorReading, String] {
    override def map(value: SensorReading): String = value.id
  }
}
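When MapExample runs, each of the three mapped streams prints the ten sensor IDs (sensor_1 through sensor_10) to stdout, so every ID should appear three times per batch, with a new batch emitted roughly once a second.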