Preface
This article focuses on Flink’s Table Formats
Examples
CSV Format
.withFormat(
  // Describes a CSV format; fields are declared in the order they appear in each record.
  new Csv()
    .field("field1", Types.STRING) // required: ordered format fields
    .field("field2", Types.TIMESTAMP)
    .fieldDelimiter(",") // optional: string delimiter "," by default
    .lineDelimiter("\n") // optional: string delimiter "\n" by default
    .quoteCharacter('"') // optional: single character for string values, empty by default
    .commentPrefix("#") // optional: string to indicate comments, empty by default
    .ignoreFirstLine() // optional: ignore the first line, by default it is not skipped
    .ignoreParseErrors() // optional: skip records with parse error instead of failing by default
)
Copy the code
- Flink supports CSV format built-in without additional dependencies
JSON Format
.withFormat(
  new Json()
    .failOnMissingField(true) // optional: flag whether to fail if a field is missing or not, false by default

    // required: define the schema either by using type information which parses numbers to corresponding types
    .schema(Type.ROW(...))

    // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
    .jsonSchema(
      "{" +
      " type: 'object'," +
      " properties: {" +
      " lon: {" +
      " type: 'number'" +
      "}," +
      " rideTime: {" +
      " type: 'string'," +
      " format: 'date-time'" +
      "}" +
      "}" +
      "}"
    )

    // or use the table's schema
    .deriveSchema()
)
- The JSON format can be defined using schema, jsonSchema, or deriveSchema; it requires adding the flink-json dependency
Apache Avro Format
.withFormat(
  new Avro()
    // required: define the schema either by using an Avro specific record class
    .recordClass(User.class)

    // or by using an Avro schema
    .avroSchema(
      "{" +
      " \"type\": \"record\"," +
      " \"name\": \"test\"," +
      " \"fields\" : [" +
      " {\"name\": \"a\", \"type\": \"long\"}," +
      " {\"name\": \"b\", \"type\": \"string\"}" +
      "]" +
      "}"
    )
)
- The Avro format can be defined using recordClass or avroSchema; it requires adding the flink-avro dependency
ConnectTableDescriptor
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala
/**
 * Abstract table descriptor that is constructed from a table environment and a
 * connector descriptor (excerpt: the parts marked `//......` are elided here).
 *
 * The self-type `this: D =>` requires every concrete instance to also be a `D`,
 * which is what allows the fluent setters below to return `this` typed as `D`.
 *
 * @param tableEnv table environment passed in at construction; not used in the visible excerpt
 * @param connectorDescriptor connector descriptor passed in at construction; not used in the visible excerpt
 * @tparam D the concrete descriptor type returned by the fluent methods
 */
abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]](
private val tableEnv: TableEnvironment,
private val connectorDescriptor: ConnectorDescriptor)
extends TableDescriptor
with SchematicDescriptor[D]
with RegistrableDescriptor { this: D =>
// Both start out as None, i.e. no format and no schema have been configured yet.
private var formatDescriptor: Option[FormatDescriptor] = None
private var schemaDescriptor: Option[Schema] = None
//......
/**
 * Stores the given format descriptor, replacing any previously stored one.
 *
 * @param format the format descriptor to remember
 * @return this descriptor (typed as `D`) so that calls can be chained
 */
override def withFormat(format: FormatDescriptor): D = {
formatDescriptor = Some(format)
this
}
//......
}
Copy the code
- StreamTableEnvironment’s connect method creates a StreamTableDescriptor; StreamTableDescriptor inherits ConnectTableDescriptor; ConnectTableDescriptor provides a withFormat method that accepts a FormatDescriptor, stores it, and returns the descriptor itself for chaining
FormatDescriptor
flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/descriptors/FormatDescriptor.java
/**
 * Abstract base class for format descriptors. It holds a type identifier and a
 * property version, and combines them with the subclass-specific properties
 * when the descriptor is converted to a property map.
 */
@PublicEvolving
public abstract class FormatDescriptor extends DescriptorBase implements Descriptor {
// Identifier of the format; written under FormatDescriptorValidator.FORMAT_TYPE.
private String type;
// Property version; written under FormatDescriptorValidator.FORMAT_PROPERTY_VERSION.
private int version;
/**
* Constructs a {@link FormatDescriptor}.
*
* @param type string that identifies this format
* @param version property version for backwards compatibility
*/
public FormatDescriptor(String type, int version) {
this.type = type;
this.version = version;
}
/**
 * Builds the complete property map of this descriptor: the format type, the
 * property version, and then everything returned by {@link #toFormatProperties()}.
 * The method is final, so subclasses contribute only through
 * {@link #toFormatProperties()}.
 *
 * @return the assembled properties as a map
 */
@Override
public final Map<String, String> toProperties() {
final DescriptorProperties properties = new DescriptorProperties();
properties.putString(FormatDescriptorValidator.FORMAT_TYPE, type);
properties.putInt(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, version);
// Subclass-specific properties are added after the two common entries.
properties.putProperties(toFormatProperties());
return properties.asMap();
}
/**
* Converts this descriptor into a set of format properties. Usually prefixed with
* {@link FormatDescriptorValidator#FORMAT}.
*/
protected abstract Map<String, String> toFormatProperties();
}
Copy the code
- FormatDescriptor is an abstract class; Csv, Json, and Avro are subclasses of it
Csv
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Csv.scala
/**
 * Format descriptor for CSV. Passes `FORMAT_TYPE_VALUE` and property version 1
 * to [[FormatDescriptor]]. Every setter mutates this instance and returns it,
 * so calls can be chained.
 */
class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {
// Optional settings stay None until the corresponding setter is called;
// toFormatProperties only writes the ones that are defined.
private var fieldDelim: Option[String] = None
private var lineDelim: Option[String] = None
// Field name -> type string. A LinkedHashMap keeps insertion order, so fields
// are emitted in the order in which they were declared.
private val schema: mutable.LinkedHashMap[String, String] =
mutable.LinkedHashMap[String, String]()
private var quoteCharacter: Option[Character] = None
private var commentPrefix: Option[String] = None
private var isIgnoreFirstLine: Option[Boolean] = None
private var lenient: Option[Boolean] = None
/**
 * Sets the field delimiter.
 *
 * @param delim the delimiter string
 * @return this descriptor
 */
def fieldDelimiter(delim: String): Csv = {
this.fieldDelim = Some(delim)
this
}
/**
 * Sets the line delimiter.
 *
 * @param delim the delimiter string
 * @return this descriptor
 */
def lineDelimiter(delim: String): Csv = {
this.lineDelim = Some(delim)
this
}
/**
 * Replaces all previously declared fields with the fields of the given table
 * schema: the current field map is cleared, then each name/type pair is added
 * through `field`.
 *
 * @param schema the table schema whose field names and types are copied
 * @return this descriptor
 */
def schema(schema: TableSchema): Csv = {
this.schema.clear()
// Inside this method `schema` is the parameter; the field map is `this.schema`.
schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) =>
field(n, t)
}
this
}
/**
 * Appends a field, converting the type information to its string form with
 * `TypeStringUtils.writeTypeInfo` and delegating to the string-based overload.
 *
 * @param fieldName the field name
 * @param fieldType the type information of the field
 * @return this descriptor
 */
def field(fieldName: String, fieldType: TypeInformation[_]): Csv = {
field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))
this
}
/**
 * Appends a field with the given name and type string.
 *
 * @param fieldName the field name
 * @param fieldType the type string of the field
 * @return this descriptor
 * @throws ValidationException if a field with the same name was already added
 */
def field(fieldName: String, fieldType: String): Csv = {
if (schema.contains(fieldName)) {
throw new ValidationException(s"Duplicate field name $fieldName.")
}
schema += (fieldName -> fieldType)
this
}
/**
 * Sets the quote character.
 *
 * @param quote the quote character; `Option(...)` turns a null argument into None
 * @return this descriptor
 */
def quoteCharacter(quote: Character): Csv = {
this.quoteCharacter = Option(quote)
this
}
/**
 * Sets the comment prefix.
 *
 * @param prefix the prefix string; `Option(...)` turns a null argument into None
 * @return this descriptor
 */
def commentPrefix(prefix: String): Csv = {
this.commentPrefix = Option(prefix)
this
}
/**
 * Sets the ignore-first-line flag to true.
 *
 * @return this descriptor
 */
def ignoreFirstLine(): Csv = {
this.isIgnoreFirstLine = Some(true)
this
}
/**
 * Sets the ignore-parse-errors (lenient) flag to true.
 *
 * @return this descriptor
 */
def ignoreParseErrors(): Csv = {
this.lenient = Some(true)
this
}
/**
 * Converts the configured settings into format properties. Each optional
 * setting is written only if it has been set; the declared fields are always
 * passed to `putIndexedFixedProperties` under `FORMAT_FIELDS`.
 *
 * @return the format properties as a Java map
 */
override protected def toFormatProperties: util.Map[String, String] = {
val properties = new DescriptorProperties()
fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _))
lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _))
// One (name, type) value list per declared field, matched positionally with subKeys.
// NOTE(review): the exact key layout is decided by putIndexedFixedProperties, which is not shown here.
val subKeys = util.Arrays.asList(
DescriptorProperties.TABLE_SCHEMA_NAME,
DescriptorProperties.TABLE_SCHEMA_TYPE)
val subValues = schema.map(e => util.Arrays.asList(e._1, e._2)).toList.asJava
properties.putIndexedFixedProperties(
FORMAT_FIELDS,
subKeys,
subValues)
quoteCharacter.foreach(properties.putCharacter(FORMAT_QUOTE_CHARACTER, _))
commentPrefix.foreach(properties.putString(FORMAT_COMMENT_PREFIX, _))
isIgnoreFirstLine.foreach(properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, _))
lenient.foreach(properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, _))
properties.asMap()
}
}
Copy the code
- Csv provides methods such as field, fieldDelimiter, lineDelimiter, quoteCharacter, commentPrefix, ignoreFirstLine, and ignoreParseErrors
Json
flink-json-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Json.java
public class Json extends FormatDescriptor {
private Boolean failOnMissingField;
private Boolean deriveSchema;
private String jsonSchema;
private String schema;
public Json() {
super(FORMAT_TYPE_VALUE, 1);
}
public Json failOnMissingField(boolean failOnMissingField) {
this.failOnMissingField = failOnMissingField;
return this;
}
public Json jsonSchema(String jsonSchema) {
Preconditions.checkNotNull(jsonSchema);
this.jsonSchema = jsonSchema;
this.schema = null;
this.deriveSchema = null;
return this;
}
public Json schema(TypeInformation<Row> schemaType) {
Preconditions.checkNotNull(schemaType);
this.schema = TypeStringUtils.writeTypeInfo(schemaType);
this.jsonSchema = null;
this.deriveSchema = null;
return this;
}
public Json deriveSchema() {
this.deriveSchema = true;
this.schema = null;
this.jsonSchema = null;
return this;
}
@Override
protected Map<String, String> toFormatProperties() {
final DescriptorProperties properties = new DescriptorProperties();
if(deriveSchema ! = null) { properties.putBoolean(FORMAT_DERIVE_SCHEMA, deriveSchema); }if(jsonSchema ! = null) { properties.putString(FORMAT_JSON_SCHEMA, jsonSchema); }if(schema ! = null) { properties.putString(FORMAT_SCHEMA, schema); }if(failOnMissingField ! = null) { properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, failOnMissingField); }returnproperties.asMap(); }}Copy the code
- Json provides schema, jsonSchema, and deriveSchema to define the JSON format
Avro
flink-avro-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Avro.java
public class Avro extends FormatDescriptor {
private Class<? extends SpecificRecord> recordClass;
private String avroSchema;
public Avro() {
super(AvroValidator.FORMAT_TYPE_VALUE, 1);
}
public Avro recordClass(Class<? extends SpecificRecord> recordClass) {
Preconditions.checkNotNull(recordClass);
this.recordClass = recordClass;
return this;
}
public Avro avroSchema(String avroSchema) {
Preconditions.checkNotNull(avroSchema);
this.avroSchema = avroSchema;
return this;
}
@Override
protected Map<String, String> toFormatProperties() {
final DescriptorProperties properties = new DescriptorProperties();
if(null ! = recordClass) { properties.putClass(AvroValidator.FORMAT_RECORD_CLASS, recordClass); }if(null ! = avroSchema) { properties.putString(AvroValidator.FORMAT_AVRO_SCHEMA, avroSchema); }returnproperties.asMap(); }}Copy the code
- Avro provides two ways to define the Avro format: recordClass and avroSchema
summary
- StreamTableEnvironment’s connect method creates a StreamTableDescriptor; StreamTableDescriptor inherits ConnectTableDescriptor
- ConnectTableDescriptor provides a withFormat method that accepts a FormatDescriptor, stores it, and returns the descriptor itself for chaining; FormatDescriptor is an abstract class; Csv, Json, and Avro are subclasses of it
- Csv provides methods such as field, fieldDelimiter, lineDelimiter, quoteCharacter, commentPrefix, ignoreFirstLine, and ignoreParseErrors. Json provides schema, jsonSchema, and deriveSchema to define the JSON format. Avro provides two ways to define the Avro format: recordClass and avroSchema
doc
- Table Formats