overview

This article mainly looks at Flink's JDBCOutputFormat.

JDBCOutputFormat

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java

/**
 * OutputFormat to write Rows into a JDBC database.
 * The OutputFormat has to be configured using the supplied OutputFormatBuilder.
 *
 * @see Row
 * @see DriverManager
 */
public class JDBCOutputFormat extends RichOutputFormat<Row> {
	private static final long serialVersionUID = 1L;
	static final int DEFAULT_BATCH_INTERVAL = 5000;

	private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);

	private String username;
	private String password;
	private String drivername;
	private String dbURL;
	private String query;
	private int batchInterval = DEFAULT_BATCH_INTERVAL;

	private Connection dbConn;
	private PreparedStatement upload;

	private int batchCount = 0;

	private int[] typesArray;

	public JDBCOutputFormat() {
	}

	@Override
	public void configure(Configuration parameters) {
	}

	/**
	 * Connects to the target database and initializes the prepared statement.
	 *
	 * @param taskNumber The number of the parallel instance.
	 * @throws IOException Thrown, if the output could not be opened due to an
	 * I/O problem.
	 */
	@Override
	public void open(int taskNumber, int numTasks) throws IOException {
		try {
			establishConnection();
			upload = dbConn.prepareStatement(query);
		} catch (SQLException sqe) {
			throw new IllegalArgumentException("open() failed.", sqe);
		} catch (ClassNotFoundException cnfe) {
			throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
		}
	}

	private void establishConnection() throws SQLException, ClassNotFoundException {
		Class.forName(drivername);
		if (username == null) {
			dbConn = DriverManager.getConnection(dbURL);
		} else {
			dbConn = DriverManager.getConnection(dbURL, username, password);
		}
	}

	/**
	 * Adds a record to the prepared statement.
	 *
	 * <p>When this method is called, the output format is guaranteed to be opened.
	 *
	 * <p>WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to
	 * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null))
	 *
	 * @param row The records to add to the output.
	 * @see PreparedStatement
	 * @throws IOException Thrown, if the records could not be added due to an I/O problem.
	 */
	@Override
	public void writeRecord(Row row) throws IOException {

		if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
			LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
		}
		try {
			if (typesArray == null) {
				// no types provided
				for (int index = 0; index < row.getArity(); index++) {
					LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.", index + 1, row.getField(index));
					upload.setObject(index + 1, row.getField(index));
				}
			} else {
				// types provided
				for (int index = 0; index < row.getArity(); index++) {

					if (row.getField(index) == null) {
						upload.setNull(index + 1, typesArray[index]);
					} else {
						// casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
						switch (typesArray[index]) {
							case java.sql.Types.NULL:
								upload.setNull(index + 1, typesArray[index]);
								break;
							case java.sql.Types.BOOLEAN:
							case java.sql.Types.BIT:
								upload.setBoolean(index + 1, (boolean) row.getField(index));
								break;
							case java.sql.Types.CHAR:
							case java.sql.Types.NCHAR:
							case java.sql.Types.VARCHAR:
							case java.sql.Types.LONGVARCHAR:
							case java.sql.Types.LONGNVARCHAR:
								upload.setString(index + 1, (String) row.getField(index));
								break;
							case java.sql.Types.TINYINT:
								upload.setByte(index + 1, (byte) row.getField(index));
								break;
							case java.sql.Types.SMALLINT:
								upload.setShort(index + 1, (short) row.getField(index));
								break;
							case java.sql.Types.INTEGER:
								upload.setInt(index + 1, (int) row.getField(index));
								break;
							case java.sql.Types.BIGINT:
								upload.setLong(index + 1, (long) row.getField(index));
								break;
							case java.sql.Types.REAL:
								upload.setFloat(index + 1, (float) row.getField(index));
								break;
							case java.sql.Types.FLOAT:
							case java.sql.Types.DOUBLE:
								upload.setDouble(index + 1, (double) row.getField(index));
								break;
							case java.sql.Types.DECIMAL:
							case java.sql.Types.NUMERIC:
								upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.getField(index));
								break;
							case java.sql.Types.DATE:
								upload.setDate(index + 1, (java.sql.Date) row.getField(index));
								break;
							case java.sql.Types.TIME:
								upload.setTime(index + 1, (java.sql.Time) row.getField(index));
								break;
							case java.sql.Types.TIMESTAMP:
								upload.setTimestamp(index + 1, (java.sql.Timestamp) row.getField(index));
								break;
							case java.sql.Types.BINARY:
							case java.sql.Types.VARBINARY:
							case java.sql.Types.LONGVARBINARY:
								upload.setBytes(index + 1, (byte[]) row.getField(index));
								break;
							default:
								upload.setObject(index + 1, row.getField(index));
								LOG.warn("Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.", typesArray[index], index + 1, row.getField(index)); // case java.sql.Types.SQLXML // case java.sql.Types.ARRAY: // case java.sql.Types.JAVA_OBJECT: // case java.sql.Types.BLOB: // case java.sql.Types.CLOB: // case java.sql.Types.NCLOB: // case java.sql.Types.DATALINK: // case java.sql.Types.DISTINCT: // case java.sql.Types.OTHER: // case java.sql.Types.REF: // case java.sql.Types.ROWID: // case java.sql.Types.STRUC } } } } upload.addBatch(); batchCount++; } catch (SQLException e) { throw new RuntimeException("Preparation of JDBC statement failed.", e); } if (batchCount >= batchInterval) { // execute batch flush(); } } void flush() { try { upload.executeBatch(); batchCount = 0; } catch (SQLException e) { throw new RuntimeException("Execution of JDBC statement failed.", e); } } int[] getTypesArray() { return typesArray; } /** * Executes prepared statement and closes all resources of this instance. * * @throws IOException Thrown, if the input could not be closed properly. */ @Override public void close() throws IOException { if (upload ! = null) { flush(); // close the connection try { upload.close(); } catch (SQLException e) { LOG.info("JDBC statement could not be closed: " + e.getMessage()); } finally { upload = null; } } if (dbConn ! = null) { try { dbConn.close(); } catch (SQLException se) { LOG.info("JDBC connection could not be closed: " + se.getMessage()); } finally { dbConn = null; } } } public static JDBCOutputFormatBuilder buildJDBCOutputFormat() { return new JDBCOutputFormatBuilder(); } / /... }Copy the code
  • JDBCOutputFormat extends RichOutputFormat; the generic type parameter here is org.apache.flink.types.Row
  • open() calls establishConnection() to load the JDBC driver and initialize dbConn, then calls dbConn.prepareStatement(query) to obtain the upload PreparedStatement
  • writeRecord() checks whether typesArray was provided: if not, it falls back to setObject to set each value; if it was, it casts each field according to the corresponding SQL type (many of the types in java.sql.Types are supported)
  • writeRecord() performs an addBatch on the PreparedStatement; once batchCount reaches batchInterval (the default is 5000), flush() is executed, which calls PreparedStatement.executeBatch and then resets batchCount; so that records that never reach batchInterval are not lost, close() flushes the PreparedStatement one more time before closing it and the Connection
  • JDBCOutputFormat provides a JDBCOutputFormatBuilder that makes it easy to build a JDBCOutputFormat; a minimal usage sketch follows this list
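
Below is a minimal usage sketch (not from the Flink sources above): it builds a JDBCOutputFormat with the builder and writes a DataSet of Rows through it. The MySQL driver, JDBC URL, credentials, and the user(id, name) table are made-up examples; any JDBC-compatible database would do.

import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class JdbcOutputFormatDemo {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// build the OutputFormat via the builder described above;
		// driver, URL, credentials and table are hypothetical
		JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
			.setDrivername("com.mysql.jdbc.Driver")
			.setDBUrl("jdbc:mysql://localhost:3306/test")
			.setUsername("root")
			.setPassword("secret")
			.setQuery("INSERT INTO user (id, name) VALUES (?, ?)")
			.setSqlTypes(new int[]{java.sql.Types.INTEGER, java.sql.Types.VARCHAR})
			.setBatchInterval(1000)
			.finish();

		DataSet<Row> rows = env.fromCollection(
			Arrays.asList(Row.of(1, "alice"), Row.of(2, "bob")),
			new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));

		// open/writeRecord/addBatch run inside the format when the job executes;
		// with only 2 rows the batch is flushed by close(), not by batchInterval
		rows.output(jdbcOutput);
		env.execute("jdbc output format demo");
	}
}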

Row

flink-core-1.7.0-sources.jar!/org/apache/flink/types/Row.java

/**
 * A Row can have arbitrary number of fields and contain a set of fields, which may all be
 * different types. The fields in Row can be null. Due to Row is not strongly typed, Flink's
 * type extraction mechanism can't extract correct field types. So that users should manually
 * tell Flink the type information via creating a {@link RowTypeInfo}.
 *
 * <p>
 * The fields in the Row can be accessed by position (zero-based) {@link #getField(int)}. And can
 * set fields by {@link #setField(int, Object)}.
 * <p>
 * Row is in principle serializable. However, it may contain non-serializable fields,
 * in which case serialization will fail.
 *
 */
@PublicEvolving
public class Row implements Serializable {

	private static final long serialVersionUID = 1L;

	/** The array to store actual values. */
	private final Object[] fields;

	/**
	 * Create a new Row instance.
	 * @param arity The number of fields in the Row
	 */
	public Row(int arity) {
		this.fields = new Object[arity];
	}

	/**
	 * Get the number of fields in the Row.
	 * @return The number of fields in the Row.
	 */
	public int getArity() {
		return fields.length;
	}

	/**
	 * Gets the field at the specified position.
	 * @param pos The position of the field, 0-based.
	 * @return The field at the specified position.
	 * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
	 */
	public Object getField(int pos) {
		return fields[pos];
	}

	/**
	 * Sets the field at the specified position.
	 *
	 * @param pos The position of the field, 0-based.
	 * @param value The value to be assigned to the field at the specified position.
	 * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
	 */
	public void setField(int pos, Object value) {
		fields[pos] = value;
	}

	@Override
	public String toString() {
		StringBuilder sb = new StringBuilder();
		for (int i = 0; i < fields.length; i++) {
			if (i > 0) {
				sb.append(",");
			}
			sb.append(StringUtils.arrayAwareToString(fields[i]));
		}
		return sb.toString();
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null || getClass() != o.getClass()) {
			return false;
		}

		Row row = (Row) o;

		return Arrays.deepEquals(fields, row.fields);
	}

	@Override
	public int hashCode() {
		return Arrays.deepHashCode(fields);
	}

	/**
	 * Creates a new Row and assigns the given values to the Row's fields.
	 * This is more convenient than using the constructor.
	 *
	 * <p>For example:
	 *
	 * <pre>
	 *     Row.of("hello", true, 1L);
	 * </pre>
	 *
	 * instead of
	 *
	 * <pre>
	 *     Row row = new Row(3);
	 *     row.setField(0, "hello");
	 *     row.setField(1, true);
	 *     row.setField(2, 1L);
	 * </pre>
	 */
	public static Row of(Object... values) {
		Row row = new Row(values.length);
		for (int i = 0; i < values.length; i++) {
			row.setField(i, values[i]);
		}
		return row;
	}

	/**
	 * Creates a new Row which copied from another row.
	 * This method does not perform a deep copy.
	 *
	 * @param row The row being copied.
	 * @return The cloned new Row
	 */
	public static Row copy(Row row) {
		final Row newRow = new Row(row.fields.length);
		System.arraycopy(row.fields, 0, newRow.fields, 0, row.fields.length);
		return newRow;
	}

	/**
	 * Creates a new Row with projected fields from another row.
	 * This method does not perform a deep copy.
	 *
	 * @param fields fields to be projected
	 * @return the new projected Row
	 */
	public static Row project(Row row, int[] fields) {
		final Row newRow = new Row(fields.length);
		for (int i = 0; i < fields.length; i++) {
			newRow.fields[i] = row.fields[fields[i]];
		}
		return newRow;
	}
}

  • Row is the record type that JDBCOutputFormat writes; it stores field values in an Object[] array and also provides static helpers such as of, copy, and project (a short sketch follows)
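
A quick standalone sketch of these Row helpers (no Flink job needed; the values are arbitrary):

import org.apache.flink.types.Row;

public class RowDemo {
	public static void main(String[] args) {
		// of(...) is shorthand for new Row(n) plus setField calls
		Row row = Row.of("hello", true, 1L);
		System.out.println(row.getArity());   // 3
		System.out.println(row.getField(0));  // hello

		// copy(...) copies the underlying Object[] (not a deep copy of the values)
		Row copied = Row.copy(row);
		copied.setField(0, "world");
		System.out.println(row.getField(0));  // still hello

		// project(...) picks fields by position into a new Row
		Row projected = Row.project(row, new int[]{2, 0});
		System.out.println(projected);        // 1,hello
	}
}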

JDBCOutputFormatBuilder

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java

	/**
	 * Builder for a {@link JDBCOutputFormat}.
	 */
	public static class JDBCOutputFormatBuilder {
		private final JDBCOutputFormat format;

		protected JDBCOutputFormatBuilder() {
			this.format = new JDBCOutputFormat();
		}

		public JDBCOutputFormatBuilder setUsername(String username) {
			format.username = username;
			return this;
		}

		public JDBCOutputFormatBuilder setPassword(String password) {
			format.password = password;
			return this;
		}

		public JDBCOutputFormatBuilder setDrivername(String drivername) {
			format.drivername = drivername;
			return this;
		}

		public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
			format.dbURL = dbURL;
			return this;
		}

		public JDBCOutputFormatBuilder setQuery(String query) {
			format.query = query;
			return this;
		}

		public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
			format.batchInterval = batchInterval;
			return this;
		}

		public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
			format.typesArray = typesArray;
			return this;
		}

		/**
		 * Finalizes the configuration and checks validity.
		 *
		 * @return Configured JDBCOutputFormat
		 */
		public JDBCOutputFormat finish() {
			if (format.username == null) {
				LOG.info("Username was not supplied.");
			}
			if (format.password == null) {
				LOG.info("Password was not supplied.");
			}
			if (format.dbURL == null) {
				throw new IllegalArgumentException("No database URL supplied.");
			}
			if (format.query == null) {
				throw new IllegalArgumentException("No query supplied.");
			}
			if (format.drivername == null) {
				throw new IllegalArgumentException("No driver supplied.");
			}

			return format;
		}
	}
  • JDBCOutputFormatBuilder provides setter methods for the username, password, dbURL, query, drivername, batchInterval, and typesArray properties; finish() checks that dbURL, query, and drivername were supplied (see the sketch below)
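
A small sketch of what finish() enforces, as described above: dbURL, query, and drivername are mandatory, while a missing username or password only produces an INFO log. The H2 driver and URL below are made-up examples.

import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;

public class BuilderValidationDemo {
	public static void main(String[] args) {
		try {
			JDBCOutputFormat.buildJDBCOutputFormat()
				.setDrivername("org.h2.Driver")   // hypothetical driver
				.setDBUrl("jdbc:h2:mem:test")     // hypothetical URL
				// no setQuery(...) on purpose
				.finish();
		} catch (IllegalArgumentException e) {
			System.out.println(e.getMessage());   // "No query supplied."
		}
	}
}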

JDBCAppendTableSink

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java

/**
 * An at-least-once Table sink for JDBC.
 *
 * <p>The mechanisms of Flink guarantees delivering messages at-least-once to this sink (if
 * checkpointing is enabled). However, one common use case is to run idempotent queries
 * (e.g., <code>REPLACE</code> or <code>INSERT OVERWRITE</code>) to upsert into the database and
 * achieve exactly-once semantic.</p>
 */
public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {

	private final JDBCOutputFormat outputFormat;

	private String[] fieldNames;
	private TypeInformation[] fieldTypes;

	JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
		this.outputFormat = outputFormat;
	}

	public static JDBCAppendTableSinkBuilder builder() {
		return new JDBCAppendTableSinkBuilder();
	}

	@Override
	public void emitDataStream(DataStream<Row> dataStream) {
		dataStream
				.addSink(new JDBCSinkFunction(outputFormat))
				.name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
	}

	@Override
	public void emitDataSet(DataSet<Row> dataSet) {
		dataSet.output(outputFormat);
	}

	@Override
	public TypeInformation<Row> getOutputType() {
		return new RowTypeInfo(fieldTypes, fieldNames);
	}

	@Override
	public String[] getFieldNames() {
		return fieldNames;
	}

	@Override
	public TypeInformation<?>[] getFieldTypes() {
		return fieldTypes;
	}

	@Override
	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
		int[] types = outputFormat.getTypesArray();

		String sinkSchema =
			String.join(",", IntStream.of(types).mapToObj(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
		String tableSchema =
			String.join(",", Stream.of(fieldTypes).map(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
		String msg = String.format("Schema of output table is incompatible with JDBCAppendTableSink schema. " +
			"Table schema: [%s], sink schema: [%s]", tableSchema, sinkSchema);

		Preconditions.checkArgument(fieldTypes.length == types.length, msg);
		for (int i = 0; i < types.length; ++i) {
			Preconditions.checkArgument(
				JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
				msg);
		}

		JDBCAppendTableSink copy;
		try {
			copy = new JDBCAppendTableSink(InstantiationUtil.clone(outputFormat));
		} catch (IOException | ClassNotFoundException e) {
			throw new RuntimeException(e);
		}

		copy.fieldNames = fieldNames;
		copy.fieldTypes = fieldTypes;
		return copy;
	}

	@VisibleForTesting
	JDBCOutputFormat getOutputFormat() {
		return outputFormat;
	}
}
  • JDBCAppendTableSink uses JDBCOutputFormat internally and implements the AppendStreamTableSink and BatchTableSink interfaces
  • Its emitDataStream method adds a JDBCSinkFunction sink (which wraps the JDBCOutputFormat) to the DataStream; its emitDataSet method sets the JDBCOutputFormat as the output of the DataSet
  • AppendStreamTableSink and BatchTableSink both extend TableSink, so JDBCAppendTableSink also implements TableSink's getOutputType, getFieldNames, getFieldTypes, and configure methods; configure checks that the table schema matches the sink's SQL types and returns a new JDBCAppendTableSink built from a clone of the JDBCOutputFormat (a wiring sketch follows this list)
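
A minimal wiring sketch for the Table API path. The builder returned by JDBCAppendTableSink.builder() is JDBCAppendTableSinkBuilder, whose source is not shown in this article; the setter names below follow the Flink 1.7 documentation but should be treated as assumptions, and the Derby driver, URL, and books table are made-up examples.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class JdbcAppendTableSinkDemo {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

		JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
			.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
			.setDBUrl("jdbc:derby:memory:demo;create=true")
			.setQuery("INSERT INTO books (id) VALUES (?)")
			.setParameterTypes(BasicTypeInfo.INT_TYPE_INFO)
			.build();

		// configure(...) is invoked during registration and checks that the table
		// schema matches the sink's SQL types before returning a configured copy
		tableEnv.registerTableSink(
			"jdbcOutputTable",
			new String[]{"id"},
			new TypeInformation<?>[]{BasicTypeInfo.INT_TYPE_INFO},
			sink);

		// an INSERT INTO jdbcOutputTable ... statement would then flow through
		// emitDataStream -> JDBCSinkFunction -> JDBCOutputFormat
	}
}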

JDBCSinkFunction

flink-jdbc_2.11-1.7.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java

class JDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
	final JDBCOutputFormat outputFormat;

	JDBCSinkFunction(JDBCOutputFormat outputFormat) {
		this.outputFormat = outputFormat;
	}

	@Override
	public void invoke(Row value) throws Exception {
		outputFormat.writeRecord(value);
	}

	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		outputFormat.flush();
	}

	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {
	}

	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);
		RuntimeContext ctx = getRuntimeContext();
		outputFormat.setRuntimeContext(ctx);
		outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
	}

	@Override
	public void close() throws Exception {
		outputFormat.close();
		super.close();
	}
}
  • JDBCSinkFunction extends RichSinkFunction and also implements the CheckpointedFunction interface; its invoke method calls JDBCOutputFormat.writeRecord, and its snapshotState method calls JDBCOutputFormat.flush so that batched records are committed in a timely manner at each checkpoint (see the snippet below)
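
Since snapshotState is only triggered when checkpointing is enabled, enabling it is what turns the flush into a periodic commit; a tiny sketch (the 10-second interval is an arbitrary example):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointFlushDemo {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// every checkpoint calls JDBCSinkFunction.snapshotState(), which calls
		// outputFormat.flush(), so buffered rows are committed even if they
		// never reach batchInterval
		env.enableCheckpointing(10_000L);
	}
}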

summary

  • JDBCOutputFormat extends RichOutputFormat; open() calls establishConnection to load the driver and initialize dbConn, then calls dbConn.prepareStatement(query) to obtain the upload PreparedStatement; writeRecord performs an addBatch on the PreparedStatement, and once batchCount reaches batchInterval (the default is 5000) flush() is executed, which calls PreparedStatement.executeBatch and resets batchCount; so that records that never reach batchInterval are not lost, close() flushes the PreparedStatement one more time before closing it and the Connection
  • Row is the record type that JDBCOutputFormat writes; it stores field values in an Object[] array
  • JDBCOutputFormatBuilder provides setter methods for the username, password, dbURL, query, drivername, batchInterval, and typesArray properties
  • JDBCAppendTableSink uses JDBCOutputFormat; its emitDataStream method adds a JDBCSinkFunction sink to the DataStream, and its emitDataSet method sets the JDBCOutputFormat as the output of the DataSet
  • JDBCSinkFunction extends RichSinkFunction and implements the CheckpointedFunction interface; invoke calls JDBCOutputFormat.writeRecord, and snapshotState calls JDBCOutputFormat.flush to commit batched records at each checkpoint

doc

  • JDBCOutputFormat