
This article takes a look at Flink's Table Formats (CSV, JSON and Apache Avro).

Examples

CSV Format

  // Every Csv setter returns the Csv descriptor itself, so the calls below are chained.
  new Csv()
    .field("field1", Types.STRING)    // required: ordered format fields
    .field("field2", Types.TIMESTAMP)
    .fieldDelimiter(",")              // optional: string delimiter "," by default
    .lineDelimiter("\n")              // optional: string delimiter "\n" by default
    .quoteCharacter('"')              // optional: single character for string values, empty by default
    .commentPrefix("#")               // optional: string (not a char) to indicate comments, empty by default
    .ignoreFirstLine()                // optional: ignore the first line, by default it is not skipped
    .ignoreParseErrors()              // optional: skip records with parse error instead of failing by default
  • Flink supports the CSV format out of the box; no additional dependency is required

JSON Format

  new Json()
    .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

    // required: define the schema in exactly ONE of the three ways below. schema(), jsonSchema()
    // and deriveSchema() each reset the other two, so in a chain only the last call takes effect.

    // (a) by using type information which parses numbers to corresponding types
    .schema(Types.ROW(...))

    // (b) or by using a JSON schema which parses to DECIMAL and TIMESTAMP
    .jsonSchema(
      "{" +
      " type: 'object'," +
      " properties: {" +
      " lon: {" +
      " type: 'number'" +
      "}," +
      " rideTime: {" +
      " type: 'string'," +
      " format: 'date-time'" +
      "}" +
      "}" +
      "}"                        // closes the outermost object
    )

    // (c) or use the table's schema
    .deriveSchema()
  • The JSON schema can be defined via schema, jsonSchema, or deriveSchema; the JSON format requires the additional flink-json dependency

Apache Avro Format

  new Avro()

    // required: define the schema either by using an Avro specific record class

    // or by using an Avro schema
      "{" +
      " \"type\": \"record\"," +
      " \"name\": \"test\"," +
      " \"fields\" : [" +
      " {\"name\": \"a\", \"type\": \"long\"}," +
      " {\"name\": \"b\", \"type\": \"string\"}" +
      "]" +
      "}"))Copy the code
  • The Avro schema can be defined via recordClass or avroSchema; the Avro format requires the additional flink-avro dependency


flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala

/**
  * Base class of the table descriptors that connect a table to an external system.
  *
  * It holds the connector descriptor plus an optional format and schema. Thanks to the
  * self-type `this: D =>`, builder methods return the concrete descriptor type `D`, so
  * calls can be chained.
  *
  * Excerpt: only the members relevant to formats are shown.
  */
abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]](
    private val tableEnv: TableEnvironment,
    private val connectorDescriptor: ConnectorDescriptor)
  extends TableDescriptor
  with SchematicDescriptor[D]
  with RegistrableDescriptor { this: D =>

  private var formatDescriptor: Option[FormatDescriptor] = None
  private var schemaDescriptor: Option[Schema] = None

  // ... (other members omitted from this excerpt)

  /**
    * Specifies the format descriptor (e.g. Csv, Json or Avro) of this table.
    *
    * @param format the format descriptor to store
    * @return this descriptor (typed as `D`), not the format, so further calls can be chained
    */
  override def withFormat(format: FormatDescriptor): D = {
    formatDescriptor = Some(format)
    this
  }

  // ... (other members omitted from this excerpt)
}

  • StreamTableEnvironment's connect method creates a StreamTableDescriptor, which extends ConnectTableDescriptor. ConnectTableDescriptor's withFormat method stores the given FormatDescriptor and returns the table descriptor itself (type D), not the FormatDescriptor, so further calls can be chained


flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/descriptors/FormatDescriptor.java

public abstract class FormatDescriptor extends DescriptorBase implements Descriptor {

	private String type;

	private int version;

	 * Constructs a {@link FormatDescriptor}.
	 * @param type string that identifies this format
	 * @param version property version for backwards compatibility
	public FormatDescriptor(String type, int version) {
		this.type = type;
		this.version = version;

	public final Map<String, String> toProperties() {
		final DescriptorProperties properties = new DescriptorProperties();
		properties.putString(FormatDescriptorValidator.FORMAT_TYPE, type);
		properties.putInt(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, version);
		return properties.asMap();

	 * Converts this descriptor into a set of format properties. Usually prefixed with
	 * {@link FormatDescriptorValidator#FORMAT}.
	protected abstract Map<String, String> toFormatProperties();
  • FormatDescriptor is an abstract class; Csv, Json, and Avro are its subclasses, and each one implements toFormatProperties to supply its format-specific properties


flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Csv.scala

/**
  * Format descriptor for comma-separated values (CSV).
  *
  * Every setter records a value and returns this descriptor, so calls can be chained.
  * Options that are never set stay `None` and are omitted from the properties
  * produced by `toFormatProperties`.
  */
class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {

  private var fieldDelim: Option[String] = None
  private var lineDelim: Option[String] = None
  // insertion-ordered, so the declared field order is kept when the fields are written out
  private val schema: mutable.LinkedHashMap[String, String] =
    mutable.LinkedHashMap[String, String]()
  private var quoteCharacter: Option[Character] = None
  private var commentPrefix: Option[String] = None
  private var isIgnoreFirstLine: Option[Boolean] = None
  private var lenient: Option[Boolean] = None

  /**
    * Sets the field delimiter.
    *
    * @param delim the field delimiter
    */
  def fieldDelimiter(delim: String): Csv = {
    this.fieldDelim = Some(delim)
    this
  }

  /**
    * Sets the line delimiter.
    *
    * @param delim the line delimiter
    */
  def lineDelimiter(delim: String): Csv = {
    this.lineDelim = Some(delim)
    this
  }

  /**
    * Replaces all previously declared fields with the fields of the given table schema,
    * in the schema's field order.
    *
    * @param schema the table schema whose field names and types are copied
    */
  def schema(schema: TableSchema): Csv = {
    // `this.schema` is the field map; plain `schema` is the parameter
    this.schema.clear()
    schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) =>
      field(n, t)
    }
    this
  }

  /**
    * Adds a format field; its type information is serialized to the type-string form.
    *
    * @param fieldName the field name
    * @param fieldType the type information of the field
    * @throws ValidationException if a field with the same name was already declared
    */
  def field(fieldName: String, fieldType: TypeInformation[_]): Csv =
    field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))

  /**
    * Adds a format field whose type is given as a type string.
    *
    * @param fieldName the field name
    * @param fieldType the type string of the field
    * @throws ValidationException if a field with the same name was already declared
    */
  def field(fieldName: String, fieldType: String): Csv = {
    if (schema.contains(fieldName)) {
      throw new ValidationException(s"Duplicate field name $fieldName.")
    }
    schema += (fieldName -> fieldType)
    this
  }

  /**
    * Sets the character used to quote string values; a `null` argument unsets the option.
    *
    * @param quote the quote character
    */
  def quoteCharacter(quote: Character): Csv = {
    this.quoteCharacter = Option(quote)
    this
  }

  /**
    * Sets the prefix that marks comment lines; a `null` argument unsets the option.
    *
    * @param prefix the comment prefix
    */
  def commentPrefix(prefix: String): Csv = {
    this.commentPrefix = Option(prefix)
    this
  }

  /**
    * Sets the flag to ignore the first line.
    */
  def ignoreFirstLine(): Csv = {
    this.isIgnoreFirstLine = Some(true)
    this
  }

  /**
    * Sets the flag to skip records with parse errors instead of failing.
    */
  def ignoreParseErrors(): Csv = {
    this.lenient = Some(true)
    this
  }

  override protected def toFormatProperties: util.Map[String, String] = {
    val properties = new DescriptorProperties()

    fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _))
    lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _))

    // the fields are written as an indexed list of (name, type) pairs, in declaration order
    val subKeys = util.Arrays.asList(
      DescriptorProperties.TABLE_SCHEMA_NAME,
      DescriptorProperties.TABLE_SCHEMA_TYPE)

    val subValues = schema.map { case (name, tpe) => util.Arrays.asList(name, tpe) }.toList.asJava

    properties.putIndexedFixedProperties(
      FORMAT_FIELDS,
      subKeys,
      subValues)

    quoteCharacter.foreach(properties.putCharacter(FORMAT_QUOTE_CHARACTER, _))
    commentPrefix.foreach(properties.putString(FORMAT_COMMENT_PREFIX, _))
    isIgnoreFirstLine.foreach(properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, _))
    lenient.foreach(properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, _))

    properties.asMap()
  }
}

  • Csv provides methods such as field, schema, fieldDelimiter, lineDelimiter, quoteCharacter, commentPrefix, ignoreFirstLine, and ignoreParseErrors


flink-json-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Json.java

public class Json extends FormatDescriptor {

	private Boolean failOnMissingField;
	private Boolean deriveSchema;
	private String jsonSchema;
	private String schema;

	public Json() {
		super(FORMAT_TYPE_VALUE, 1);

	public Json failOnMissingField(boolean failOnMissingField) {
		this.failOnMissingField = failOnMissingField;
		return this;

	public Json jsonSchema(String jsonSchema) {
		this.jsonSchema = jsonSchema;
		this.schema = null;
		this.deriveSchema = null;
		return this;

	public Json schema(TypeInformation<Row> schemaType) {
		this.schema = TypeStringUtils.writeTypeInfo(schemaType);
		this.jsonSchema = null;
		this.deriveSchema = null;
		return this;

	public Json deriveSchema() {
		this.deriveSchema = true;
		this.schema = null;
		this.jsonSchema = null;
		return this;

	protected Map<String, String> toFormatProperties() {
		final DescriptorProperties properties = new DescriptorProperties();

		if(deriveSchema ! = null) { properties.putBoolean(FORMAT_DERIVE_SCHEMA, deriveSchema); }if(jsonSchema ! = null) { properties.putString(FORMAT_JSON_SCHEMA, jsonSchema); }if(schema ! = null) { properties.putString(FORMAT_SCHEMA, schema); }if(failOnMissingField ! = null) { properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, failOnMissingField); }returnproperties.asMap(); }}Copy the code
  • Json offers three mutually exclusive ways to define the JSON schema: schema, jsonSchema, and deriveSchema (each of these setters clears the other two)


flink-avro-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Avro.java

public class Avro extends FormatDescriptor {

	private Class<? extends SpecificRecord> recordClass;
	private String avroSchema;

	public Avro() {
		super(AvroValidator.FORMAT_TYPE_VALUE, 1);

	public Avro recordClass(Class<? extends SpecificRecord> recordClass) {
		this.recordClass = recordClass;
		return this;

	public Avro avroSchema(String avroSchema) {
		this.avroSchema = avroSchema;
		return this;

	protected Map<String, String> toFormatProperties() {
		final DescriptorProperties properties = new DescriptorProperties();

		if(null ! = recordClass) { properties.putClass(AvroValidator.FORMAT_RECORD_CLASS, recordClass); }if(null ! = avroSchema) { properties.putString(AvroValidator.FORMAT_AVRO_SCHEMA, avroSchema); }returnproperties.asMap(); }}Copy the code
  • Avro provides two ways to define the Avro format: recordClass and avroSchema


  • Reference: the "Table Formats" section of the Flink 1.7 documentation