Flink supports reading from and writing to files, sockets, collections, and so on. Flink also ships with many connectors, such as Kafka, Hadoop, and Redis. These built-in connectors take care of maintaining exactly-once semantics, whereas with Spark Streaming the user has to maintain exactly-once semantics themselves when connecting to Kafka, Redis, and the like. Next, let's look at how to write a custom Source in Flink. This article does not cover checkpointing.

Custom Source

Flink provides the following three interfaces for implementing a custom Source:

  • SourceFunction: the top-level interface for all stream sources. A Source that implements this interface directly cannot have its parallelism set to greater than 1
/**
 * run() is called when the Source starts emitting data; cancel() is called when the Source is stopped.
 */
class MyNonParallelSource extends SourceFunction[Access] {

  private var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Access]): Unit = {
    val domains = List("ruozedata.com", "dongqiudi.com", "zhibo8.com")
    val random = new Random()
    while (isRunning) {
      val time = System.currentTimeMillis() + ""
      val domain = domains(random.nextInt(domains.length))
      val flow = random.nextInt(10000)
      1.to(10).foreach(x => ctx.collect(Access(time, domain, flow)))
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

// The parallelism cannot be set to more than 1
val ds = env.addSource(new MyNonParallelSource)

  • ParallelSourceFunction: a Source that implements this interface can have its parallelism set to greater than 1
class MyParallelSource extends ParallelSourceFunction[Access] {

  private var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Access]): Unit = {
    val domains = List("ruozedata.com", "dongqiudi.com", "zhibo8.com")
    val random = new Random()
    while (isRunning) {
      val time = System.currentTimeMillis() + ""
      val domain = domains(random.nextInt(domains.length))
      val flow = random.nextInt(10000)
      1.to(10).foreach(x => ctx.collect(Access(time, domain, flow)))
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

// Custom generated data; the parallelism can be set to greater than 1
val ds2 = env.addSource(new MyParallelSource)

  • RichParallelSourceFunction: implementing this interface not only allows parallelism greater than 1 but also provides lifecycle methods such as open() and close()
/**
 * MySQL as a Source for Flink.
 */
class MySQLSource extends RichParallelSourceFunction[City] {

  private var conn: Connection = _
  private var state: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    val url = "jdbc:mysql://localhost:3306/g7"
    val user = "ruoze"
    val password = "ruozedata"
    conn = MySQLUtil.getConnection(url, user, password)
  }

  override def close(): Unit = {
    MySQLUtil.close(conn, state)
  }

  override def run(ctx: SourceFunction.SourceContext[City]): Unit = {
    val sql = "select * from city_info"
    state = conn.prepareStatement(sql)
    val rs = state.executeQuery()
    while (rs.next()) {
      val id = rs.getInt(1)
      val name = rs.getString(2)
      val area = rs.getString(3)
      ctx.collect(City(id, name, area))
    }
  }

  override def cancel(): Unit = {}
}

// Read data from MySQL; the parallelism can be set to greater than 1
val ds3 = env.addSource(new MySQLSource)
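The Access and City case classes and the MySQLUtil helper used in the examples above are not shown in this article. The following is only a minimal sketch, written to be consistent with how they are used here (plain JDBC, no connection pooling); the real helper in the project may look different.

import java.sql.{Connection, DriverManager, PreparedStatement}

// Hypothetical reconstructions, inferred from how the fields are accessed above
case class Access(time: String, domain: String, flow: Int)
case class City(id: Int, name: String, area: String)

// Minimal JDBC helper sketch; the original MySQLUtil is not shown in the article
object MySQLUtil {

  def getConnection(url: String, user: String, password: String): Connection =
    DriverManager.getConnection(url, user, password)

  def close(conn: Connection, state: PreparedStatement): Unit = {
    if (state != null) state.close()
    if (conn != null) conn.close()
  }
}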

Kafka Source Connector

Flink supports all versions of Kafka. The mapping between Maven dependency, consumer/producer class names, and Kafka version is as follows:

  • flink-connector-kafka-0.8_2.11: FlinkKafkaConsumer08 / FlinkKafkaProducer08, for Kafka 0.8.x
  • flink-connector-kafka-0.9_2.11: FlinkKafkaConsumer09 / FlinkKafkaProducer09, for Kafka 0.9.x
  • flink-connector-kafka-0.10_2.11: FlinkKafkaConsumer010 / FlinkKafkaProducer010, for Kafka 0.10.x
  • flink-connector-kafka-0.11_2.11: FlinkKafkaConsumer011 / FlinkKafkaProducer011, for Kafka 0.11.x
  • flink-connector-kafka_2.11: FlinkKafkaConsumer / FlinkKafkaProducer, for Kafka >= 1.0.0

Import the Kafka connector dependency that matches your Kafka version. For example, my Kafka version is 4.1.0, so pom.xml adds the following dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.9.0</version>
</dependency>

The code to consume data from Kafka as a Source is as follows:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8, which still uses ZooKeeper
// properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val stream = env
    .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
    .print()

Flink's Kafka consumer also provides the following ways to configure the position within the Kafka partitions from which to start reading:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val myConsumer = new FlinkKafkaConsumer08[String](...)
myConsumer.setStartFromEarliest()      // start from the earliest record possible
myConsumer.setStartFromLatest()        // start from the latest record
myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets()  // the default behaviour

val stream = env.addSource(myConsumer)
...
  • setStartFromGroupOffsets() (the default): start reading each partition from the offsets committed by the consumer group in the Kafka brokers (or in ZooKeeper for Kafka 0.8). If no committed offset is found for a partition, the auto.offset.reset setting from the provided properties is used
  • setStartFromEarliest() / setStartFromLatest(): start reading from the earliest / latest record. In this mode, offsets committed to Kafka are ignored and not used as the starting position
  • setStartFromTimestamp(long): start reading from the specified epoch timestamp (milliseconds). For each partition, the first record with a timestamp greater than or equal to the specified timestamp is used as the start position; if all of a partition's records are earlier than the timestamp, the partition is read from its latest record. In this mode, offsets committed to Kafka are ignored as well

You can also specify the exact offset from which each partition should be read. In the following example, data is consumed from three partitions of the topic myTopic starting at different offsets. If the specified offset is not available for a partition, the consumer falls back to the default setStartFromGroupOffsets behaviour for that partition.

val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)

myConsumer.setStartFromSpecificOffsets(specificStartOffsets)

Note ⚠️: the above configuration does not affect the start position when a job is automatically restored after a failure or manually restored from a savepoint, because in that case reading resumes from the offsets stored in the savepoint or checkpoint. We'll talk about checkpoints and savepoints in a future article.

Custom Sink

Flink supports writing data out in the following ways (a short example follows this list):

  • writeAsText: writes elements as text, line by line
  • writeAsCsv: writes data in CSV format
  • print() / printToErr(): prints the toString value of each element to standard output / standard error. You can provide a prefix (msg) that is prepended to the output to help distinguish between different print calls. If the parallelism is greater than 1, the output is also prefixed with the identifier of the task that produced it
  • writeUsingOutputFormat(): base method for custom file output
  • writeToSocket: writes serialized data to a socket
  • addSink: invokes a custom sink function. Flink also ships with built-in connectors (such as Kafka) that are implemented as sink functions
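As a quick illustration of a few of the methods listed above, here is a small, self-contained sketch. The topic name "access-topic", the output path, and the socket address are made up for the example, and the FlinkKafkaProducer line assumes the universal Kafka connector from the previous section is on the classpath.

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

val env = StreamExecutionEnvironment.getExecutionEnvironment
val demo = env.fromElements("ruozedata.com,100", "dongqiudi.com,200")

demo.print("demo")                                              // prefix "demo" distinguishes this print from others
demo.writeAsText("output/demo")                                 // text output, one element per line
demo.writeToSocket("localhost", 9999, new SimpleStringSchema()) // serialized output to a socket

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
demo.addSink(new FlinkKafkaProducer[String]("access-topic", new SimpleStringSchema(), props)) // built-in Kafka sink

env.execute("sink demo")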

How do we write a custom Sink? The answer is in how the official writeAsText method is implemented: following it into the source code quickly leads to the snippet below, where we find the familiar addSink method, and OutputFormatSinkFunction is an example of the kind of SinkFunction we need to implement ourselves.

@PublicEvolving
	public DataStreamSink<T> writeUsingOutputFormat(OutputFormat<T> format) {
		return addSink(new OutputFormatSinkFunction<>(format));
	}

Going one step further into the source code, OutputFormatSinkFunction simply writes its input using the specified OutputFormat. RichSinkFunction appears in its class declaration, and that is the key to writing our own Sink.

@PublicEvolving
@Deprecated
public class OutputFormatSinkFunction<IN> extends RichSinkFunction<IN> implements InputTypeConfigurable {

As with sources, SinkFunction is the base interface, but we usually don't implement it directly. Instead we extend RichSinkFunction, because its lifecycle methods let us open a connection once and reuse it for every element, as in the following custom sink that writes to MySQL:

class MySQLRichSink extends RichSinkFunction[Access] {

  private var conn: Connection = _
  private var state: PreparedStatement = _
  private val sql = "INSERT INTO access_log (time, domain, flow) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE flow = ?"

  override def open(parameters: Configuration): Unit = {
    val url = "jdbc:mysql://localhost:3306/g7"
    val user = "ruoze"
    val password = "ruozedata"
    conn = MySQLUtil.getConnection(url, user, password)
  }

  override def close(): Unit = {
    MySQLUtil.close(conn, state)
  }

  override def invoke(value: Access, context: SinkFunction.Context[_]): Unit = {
    state = conn.prepareStatement(sql)
    state.setString(1, value.time)
    state.setString(2, value.domain)
    state.setInt(3, value.flow)
    state.setInt(4, value.flow)
    state.executeUpdate()
  }
}
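Wiring the custom source and sink together is just another addSink call; the following snippet is illustrative only and reuses the classes defined above:

env.addSource(new MyParallelSource)
  .addSink(new MySQLRichSink)

env.execute("mysql sink demo")

Note that the ON DUPLICATE KEY UPDATE clause in the SQL above assumes access_log has a unique key, presumably on (time, domain).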

Also note ⚠️: none of the sinks above use checkpointing, so they are not exactly-once; at best they provide at-least-once semantics, and data may be duplicated after the job restarts.

Redis Connector

Flink has a ready-made connector (from Apache Bahir) for writing to Redis. Before using it, add the following dependency to pom.xml:

<dependency>
   <groupId>org.apache.bahir</groupId>
   <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
   <version>1.0</version>
</dependency>

It can talk to different types of Redis deployments in three different ways: a single Redis server, a Redis cluster, or Redis Sentinel. The following example is for a single Redis server; sketches for the cluster and sentinel configurations follow the example:

// Redis here is the latest version; the mapper writes each (domain, flow) pair into the hash "traffic" via HSET
class RedisSinkMapper extends RedisMapper[(String, Int)] {

  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "traffic")

  override def getKeyFromData(data: (String, Int)): String = data._1

  override def getValueFromData(data: (String, Int)): String = data._2 + ""
}

val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").build()
env.readTextFile("input/access.log")
  .map(x => {
    val splits = x.split(",")
    val domain = splits(1)
    val flow = splits(2).toInt
    (domain, flow)
  }).keyBy(0).sum(1)
  .addSink(new RedisSink[(String, Int)](conf, new RedisSinkMapper))
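For the other two deployment modes, the Bahir connector also provides FlinkJedisClusterConfig and FlinkJedisSentinelConfig, which can be passed to RedisSink in place of FlinkJedisPoolConfig. The following is only a rough sketch based on the builder methods exposed by the 1.0 connector; the node and sentinel addresses are placeholders.

import java.net.InetSocketAddress
import scala.collection.JavaConverters._

import org.apache.flink.streaming.connectors.redis.common.config.{FlinkJedisClusterConfig, FlinkJedisSentinelConfig}

// Redis cluster: pass the set of cluster nodes
val clusterConf = new FlinkJedisClusterConfig.Builder()
  .setNodes(Set(new InetSocketAddress("node1", 6379), new InetSocketAddress("node2", 6379)).asJava)
  .build()

// Redis Sentinel: pass the master name and the sentinel addresses
val sentinelConf = new FlinkJedisSentinelConfig.Builder()
  .setMasterName("mymaster")
  .setSentinels(Set("sentinel1:26379", "sentinel2:26379").asJava)
  .build()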

The sample code and data mentioned above have been uploaded to GitHub. If you need them, clone the repository to your local machine and run it directly. 🔗 The link is: github.com/liverrrr/fl… If there are any mistakes above, 👏 you are welcome to point them out in the comments below so we can learn from each other 📖