Preface

This article looks at Flink's CsvReader.

Example

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
                .pojoType(RecordDto.class, "playerName", "country", "year", "game", "gold", "silver", "bronze", "total");

        csvInput.map(new MapFunction<RecordDto, RecordDto>() {
            @Override
            public RecordDto map(RecordDto value) throws Exception {
                LOGGER.info("execute map:{}",value);
                TimeUnit.SECONDS.sleep(5);
                return value;
            }
        }).print();

ExecutionEnvironment.readCsvFile

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java

	/**
	 * Creates a CSV reader to read a comma separated value (CSV) file. The reader has options to
	 * define parameters and field types and will eventually produce the DataSet that corresponds to
	 * the read and parsed CSV input.
	 *
	 * @param filePath The path of the CSV file.
	 * @return A CsvReader that can be used to configure the CSV input.
	 */
	public CsvReader readCsvFile(String filePath) {
		return new CsvReader(filePath, this);
	}
  • readCsvFile creates a CsvReader from the given filePath and the current ExecutionEnvironment

CsvReader

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/CsvReader.java

	public CsvReader(String filePath, ExecutionEnvironment executionContext) {
		this(new Path(Preconditions.checkNotNull(filePath, "The file path may not be null.")), executionContext);
	}

	public CsvReader(Path filePath, ExecutionEnvironment executionContext) {
		Preconditions.checkNotNull(filePath, "The file path may not be null.");
		Preconditions.checkNotNull(executionContext, "The execution context may not be null.");

		this.path = filePath;
		this.executionContext = executionContext;
	}

	/**
	 * Configures the reader to read the CSV data and parse it to the given type. The all fields of the type
	 * must be public or able to set value. The type information for the fields is obtained from the type class.
	 *
	 * @param pojoType The class of the target POJO.
	 * @param pojoFields The fields of the POJO which are mapped to CSV fields.
	 * @return The DataSet representing the parsed CSV data.
	 */
	public <T> DataSource<T> pojoType(Class<T> pojoType, String... pojoFields) {
		Preconditions.checkNotNull(pojoType, "The POJO type class must not be null.");
		Preconditions.checkNotNull(pojoFields, "POJO fields must be specified (not null) if output type is a POJO.");

		final TypeInformation<T> ti = TypeExtractor.createTypeInfo(pojoType);
		if (!(ti instanceof PojoTypeInfo)) {
			throw new IllegalArgumentException(
				"The specified class is not a POJO. The type class must meet the POJO requirements. Found: " + ti);
		}
		final PojoTypeInfo<T> pti = (PojoTypeInfo<T>) ti;

		CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, pti, pojoFields, this.includedMask);

		configureInputFormat(inputFormat);

		return new DataSource<T>(executionContext, inputFormat, pti, Utils.getCallLocationName());
	}
  • CsvReader provides a pojoType method that maps CSV data to a Java type and returns it as a Flink DataSource. The DataSource is created with a PojoCsvInputFormat and the PojoTypeInfo of the target class
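
The RecordDto class itself is not shown in this article. As a rough sketch, a class that passes the PojoTypeInfo check could look like the one below: a public class with a public no-argument constructor and fields that are public (or reachable through getters and setters). The field types and the wrapper class RecordDtoSketch are assumptions, chosen only to match the column names used in the example.

```java
// Hypothetical sketch; the real RecordDto is not shown in the article.
public class RecordDtoSketch {

    /** A POJO-style class: public, with a public no-argument constructor. */
    public static class RecordDto {
        // Field types are assumptions based on the column names.
        public String playerName;
        public String country;
        public int year;
        public String game;
        public int gold;
        public int silver;
        public int bronze;
        public int total;

        // Required so that instances can be created without arguments.
        public RecordDto() {
        }

        @Override
        public String toString() {
            return "RecordDto{playerName=" + playerName + ", country=" + country
                    + ", year=" + year + ", game=" + game + ", gold=" + gold
                    + ", silver=" + silver + ", bronze=" + bronze + ", total=" + total + "}";
        }
    }

    public static void main(String[] args) {
        RecordDto dto = new RecordDto();
        dto.playerName = "Michael Phelps";
        dto.gold = 8;
        System.out.println(dto);
    }
}
```

The order of the names passed to pojoType decides which CSV column lands in which field, so it has to follow the column order of the file.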

Task

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {

	//......

	/**
	 * The core work method that bootstraps the task and executes its code.
	 */
	@Override
	public void run() {
		//......

		// now load and instantiate the task's invokable code
		invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

		// ----------------------------------------------------------------
		//  actual task core work
		// ----------------------------------------------------------------

		// we must make strictly sure that the invokable is accessible to the cancel() call
		// by the time we switched to running.
		this.invokable = invokable;

		// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
		if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
			throw new CancelTaskException();
		}

		// notify everyone that we switched to running
		notifyObservers(ExecutionState.RUNNING, null);
		taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

		// make sure the user code classloader is accessible thread-locally
		executingThread.setContextClassLoader(userCodeClassLoader);

		// run the invokable
		invokable.invoke();

		//......
	}
}
  • Task's run method calls invokable.invoke(); in this example the invokable is a DataSourceTask

DataSourceTask.invoke

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/DataSourceTask.java

	@Override
	public void invoke() throws Exception {
		// --------------------------------------------------------------------
		// Initialize
		// --------------------------------------------------------------------
		initInputFormat();

		LOG.debug(getLogString("Start registering input and output"));

		try {
			initOutputs(getUserCodeClassLoader());
		} catch (Exception ex) {
			throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " +
					ex.getMessage(), ex);
		}

		LOG.debug(getLogString("Finished registering input and output"));

		// --------------------------------------------------------------------
		// Invoke
		// --------------------------------------------------------------------
		LOG.debug(getLogString("Starting data source operator"));

		RuntimeContext ctx = createRuntimeContext();

		final Counter numRecordsOut;
		{
			Counter tmpNumRecordsOut;
			try {
				OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
				ioMetricGroup.reuseInputMetricsForTask();
				if (this.config.getNumberOfChainedStubs() == 0) {
					ioMetricGroup.reuseOutputMetricsForTask();
				}
				tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter();
			} catch (Exception e) {
				LOG.warn("An exception occurred during the metrics setup.", e);
				tmpNumRecordsOut = new SimpleCounter();
			}
			numRecordsOut = tmpNumRecordsOut;
		}
		
		Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");

		if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
			((RichInputFormat) this.format).setRuntimeContext(ctx);
			LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
			((RichInputFormat) this.format).openInputFormat();
			LOG.debug(getLogString("Rich Source detected. Opening the InputFormat."));
		}

		ExecutionConfig executionConfig = getExecutionConfig();

		boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();

		LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
		
		final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();
		
		try {
			// start all chained tasks
			BatchTask.openChainedTasks(this.chainedTasks, this);
			
			// get input splits to read
			final Iterator<InputSplit> splitIterator = getInputSplits();
			
			// for each assigned input split
			while (!this.taskCanceled && splitIterator.hasNext()) {

				// get start and end
				final InputSplit split = splitIterator.next();

				LOG.debug(getLogString("Opening input split " + split.toString()));
				
				final InputFormat<OT, InputSplit> format = this.format;
			
				// open input format
				format.open(split);
	
				LOG.debug(getLogString("Starting to read input from split " + split.toString()));
				
				try {
					final Collector<OT> output = new CountingCollector<>(this.output, numRecordsOut);

					if (objectReuseEnabled) {
						OT reuse = serializer.createInstance();

						// as long as there is data to read
						while (!this.taskCanceled && !format.reachedEnd()) {

							OT returned;
							if ((returned = format.nextRecord(reuse)) != null) {
								output.collect(returned);
							}
						}
					} else {
						// as long as there is data to read
						while (!this.taskCanceled && !format.reachedEnd()) {
							OT returned;
							if ((returned = format.nextRecord(serializer.createInstance())) != null) {
								output.collect(returned);
							}
						}
					}

					if (LOG.isDebugEnabled() && !this.taskCanceled) {
						LOG.debug(getLogString("Closing input split " + split.toString()));
					}
				} finally {
					// close. We close here such that a regular close throwing an exception marks a task as failed.
					format.close();
				}
				completedSplitsCounter.inc();
			} // end for all input splits

			// close the collector. if it is a chaining task collector, it will close its chained tasks
			this.output.close();

			// close all chained tasks letting them report failure
			BatchTask.closeChainedTasks(this.chainedTasks, this);

		}
		catch (Exception ex) {
			// close the input, but do not report any exceptions, since we already have another root cause
			try {
				this.format.close();
			} catch (Throwable ignored) {}

			BatchTask.cancelChainedTasks(this.chainedTasks);

			ex = ExceptionInChainedStubException.exceptionUnwrap(ex);

			if (ex instanceof CancelTaskException) {
				// forward canceling exception
				throw ex;
			}
			else if (!this.taskCanceled) {
				// drop exception, if the task was canceled
				BatchTask.logAndThrowException(ex, this);
			}
		} finally {
			BatchTask.clearWriters(eventualOutputs);
			// --------------------------------------------------------------------
			// Closing
			// --------------------------------------------------------------------
			if (this.format != null && RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
				((RichInputFormat) this.format).closeInputFormat();
				LOG.debug(getLogString("Rich Source detected. Closing the InputFormat."));
			}
		}

		if (!this.taskCanceled) {
			LOG.debug(getLogString("Finished data source operator"));
		}
		else {
			LOG.debug(getLogString("Data source operator cancelled"));
		}
	}
  • In DataSourceTask's invoke method, as long as the task has not been canceled and format.reachedEnd() is false, format.nextRecord(serializer.createInstance()) is called to pull the next record (when object reuse is enabled, a single reuse instance is passed instead); each non-null result is then handed to output.collect(returned)
  • The format here is a CsvInputFormat (specifically PojoCsvInputFormat), but the nextRecord and reachedEnd methods that get called are those of its ancestor class DelimitedInputFormat
  • PojoCsvInputFormat extends the abstract class CsvInputFormat, which extends the abstract class GenericCsvInputFormat, which in turn extends the abstract class DelimitedInputFormat
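
The pull loop described above can be reduced to a few lines of plain Java. The following is only a sketch under simplifying assumptions: LineFormat and ListLineFormat are hypothetical stand-ins rather than Flink classes, cancellation and input splits are left out, and skipped lines are modelled as lines starting with '#'.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for the read loop in DataSourceTask.invoke().
// LineFormat and ListLineFormat are hypothetical names, not Flink classes.
public class ReadLoopSketch {

    /** The two methods of the input format that the loop relies on. */
    interface LineFormat<T> {
        boolean reachedEnd();

        /** Returns the next record, or null if the line was skipped or the input is exhausted. */
        T nextRecord(T reuse);
    }

    /** Serves lines from a list; lines starting with '#' are skipped by returning null. */
    static class ListLineFormat implements LineFormat<String> {
        private final Iterator<String> lines;
        private boolean end;

        ListLineFormat(List<String> lines) {
            this.lines = lines.iterator();
        }

        @Override
        public boolean reachedEnd() {
            return end;
        }

        @Override
        public String nextRecord(String reuse) {
            if (!lines.hasNext()) {
                // the end flag is only set once a read attempt finds nothing
                end = true;
                return null;
            }
            String line = lines.next();
            return line.startsWith("#") ? null : line;
        }
    }

    /** Pulls records until the format reports its end, forwarding only non-null results. */
    static <T> List<T> readAll(LineFormat<T> format) {
        List<T> output = new ArrayList<>();
        while (!format.reachedEnd()) {
            T returned = format.nextRecord(null);
            if (returned != null) {
                output.add(returned);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a,1", "# comment", "b,2");
        System.out.println(readAll(new ListLineFormat(lines)));
    }
}
```

The null check is what lets a format drop individual lines (comments, unparsable rows) without ending the loop; only reachedEnd() terminates it.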

DelimitedInputFormat

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/DelimitedInputFormat.java

	/**
	 * The default read buffer size = 1MB.
	 */
	private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;

	private transient byte[] readBuffer;

	private int bufferSize = -1;

	private void initBuffers() {
		this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize;

		if (this.bufferSize <= this.delimiter.length) {
			throw new IllegalArgumentException("Buffer size must be greater than length of delimiter.");
		}

		if (this.readBuffer == null || this.readBuffer.length != this.bufferSize) {
			this.readBuffer = new byte[this.bufferSize];
		}
		if (this.wrapBuffer == null || this.wrapBuffer.length < 256) {
			this.wrapBuffer = new byte[256];
		}

		this.readPos = 0;
		this.limit = 0;
		this.overLimit = false;
		this.end = false;
	}

	/**
	 * Checks whether the current split is at its end.
	 * 
	 * @return True, if the split is at its end, false otherwise.
	 */
	@Override
	public boolean reachedEnd() {
		return this.end;
	}
	
	@Override
	public OT nextRecord(OT record) throws IOException {
		if (readLine()) {
			return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
		} else {
			this.end = true;
			return null;
		}
	}

	/**
	 * Fills the read buffer with bytes read from the file starting from an offset.
	 */
	private boolean fillBuffer(int offset) throws IOException {
		int maxReadLength = this.readBuffer.length - offset;
		// special case for reading the whole split.
		if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
			int read = this.stream.read(this.readBuffer, offset, maxReadLength);
			if (read == -1) {
				this.stream.close();
				this.stream = null;
				return false;
			} else {
				this.readPos = offset;
				this.limit = read;
				return true;
			}
		}

		// else ..
		int toRead;
		if (this.splitLength > 0) {
			// if we have more data, read that
			toRead = this.splitLength > maxReadLength ? maxReadLength : (int) this.splitLength;
		}
		else {
			// if we have exhausted our split, we need to complete the current record, or read one
			// more across the next split.
			// the reason is that the next split will skip over the beginning until it finds the first
			// delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the
			// previous split.
			toRead = maxReadLength;
			this.overLimit = true;
		}

		int read = this.stream.read(this.readBuffer, offset, toRead);

		if (read == -1) {
			this.stream.close();
			this.stream = null;
			return false;
		} else {
			this.splitLength -= read;
			this.readPos = offset; // position from where to start reading
			this.limit = read + offset; // number of valid bytes in the read buffer
			return true;
		}
	}
  • DelimitedInputFormat's nextRecord first calls readLine() to read data into currBuffer; if a line was read, it calls the readRecord method implemented by the subclass CsvInputFormat, passing currBuffer, currOffset and currLen
  • readLine() calls fillBuffer, which determines toRead from splitLength (the length of the FileInputSplit, as seen in the DelimitedInputFormat.getStatistics method) and maxReadLength, then reads up to toRead bytes from the file into readBuffer starting at offset; readLine then sets currBuffer, currOffset and currLen
  • readBuffer is allocated in initBuffers with bufferSize bytes. bufferSize is initialized to -1, is set to 4*1024 in getStatistics, and falls back to DEFAULT_READ_BUFFER_SIZE (1024*1024) in initBuffers when it is not positive
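
To make the toRead decision easier to follow, here is a small sketch that isolates it from the stream handling. ReadPlan and planRead are hypothetical names, and the READ_WHOLE_SPLIT_FLAG special case shown in fillBuffer is deliberately left out.

```java
// Sketch of how fillBuffer decides how many bytes to request.
// ReadPlan and planRead are hypothetical names used only for illustration.
public class FillBufferSketch {

    /** Result of the decision: bytes to request and whether the read goes past the split. */
    static final class ReadPlan {
        final int toRead;
        final boolean overLimit;

        ReadPlan(int toRead, boolean overLimit) {
            this.toRead = toRead;
            this.overLimit = overLimit;
        }
    }

    static ReadPlan planRead(long splitLength, int bufferLength, int offset) {
        int maxReadLength = bufferLength - offset;
        if (splitLength > 0) {
            // bytes remain in the split: read at most what the buffer can still hold
            int toRead = splitLength > maxReadLength ? maxReadLength : (int) splitLength;
            return new ReadPlan(toRead, false);
        }
        // split exhausted: keep reading to finish the record that straddles the boundary
        return new ReadPlan(maxReadLength, true);
    }

    public static void main(String[] args) {
        ReadPlan plan = planRead(10, 4, 0);
        System.out.println(plan.toRead + " " + plan.overLimit);
    }
}
```

Reading past the end of the split is safe because, as the comment in fillBuffer explains, the reader of the next split skips everything up to its first delimiter.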

CsvInputFormat.readRecord

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/CsvInputFormat.java

	@Override
	public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
		/*
		 * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
		 */
		// Found window's end line, so find carriage return before the newline
		if (this.lineDelimiterIsLinebreak && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
			//reduce the number of bytes so that the Carriage return is not taken as data
			numBytes--;
		}

		if (commentPrefix != null && commentPrefix.length <= numBytes) {
			//check record for comments
			boolean isComment = true;
			for (int i = 0; i < commentPrefix.length; i++) {
				if (commentPrefix[i] != bytes[offset + i]) {
					isComment = false;
					break;
				}
			}
			if (isComment) {
				this.commentCount++;
				return null;
			}
		}

		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
			return fillRecord(reuse, parsedValues);
		} else {
			this.invalidLineCount++;
			return null;
		}
	}
  • CsvInputFormat's readRecord method takes the raw bytes of one line, parses them with parseRecord to populate parsedValues (an Object[]), and then calls the subclass's fillRecord method (here PojoCsvInputFormat's) to copy parsedValues into the reuse object, which is the object DataSourceTask passed in when it called format.nextRecord (for example serializer.createInstance())
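
Before parsing, readRecord applies two checks: it drops a trailing carriage return and it skips comment lines. The sketch below isolates those two checks in plain Java. The class and method names are hypothetical, and it assumes the line delimiter is a line break (the lineDelimiterIsLinebreak case).

```java
import java.nio.charset.StandardCharsets;

// Sketch of the two checks readRecord performs before parsing a line.
// The class and method names are hypothetical.
public class LinePreprocessSketch {

    /** Returns the number of bytes that should be parsed, or -1 if the line is a comment. */
    static int effectiveLength(byte[] bytes, int offset, int numBytes, byte[] commentPrefix) {
        // drop a trailing carriage return so "\r\n" line endings do not leak into the data
        if (numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
            numBytes--;
        }
        if (commentPrefix != null && commentPrefix.length <= numBytes) {
            boolean isComment = true;
            for (int i = 0; i < commentPrefix.length; i++) {
                if (commentPrefix[i] != bytes[offset + i]) {
                    isComment = false;
                    break;
                }
            }
            if (isComment) {
                return -1;
            }
        }
        return numBytes;
    }

    public static void main(String[] args) {
        byte[] line = "Phelps,USA\r".getBytes(StandardCharsets.UTF_8);
        byte[] prefix = "#".getBytes(StandardCharsets.UTF_8);
        System.out.println(effectiveLength(line, 0, line.length, prefix));
    }
}
```

In the Flink method both the comment case and a failed parseRecord return null, which is why the read loop in DataSourceTask has to tolerate null records.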

PojoCsvInputFormat.fillRecord

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/PojoCsvInputFormat.java

/**
 * Input format that reads csv into POJOs.
 * @param <OUT> resulting POJO type
 */
@Internal
public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {

	//......

	@Override
	public void open(FileInputSplit split) throws IOException {
		super.open(split);

		pojoFields = new Field[pojoFieldNames.length];

		Map<String, Field> allFields = new HashMap<String, Field>();

		findAllFields(pojoTypeClass, allFields);

		for (int i = 0; i < pojoFieldNames.length; i++) {
			pojoFields[i] = allFields.get(pojoFieldNames[i]);

			if (pojoFields[i] != null) {
				pojoFields[i].setAccessible(true);
			} else {
				throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName());
			}
		}
	}

	@Override
	public OUT fillRecord(OUT reuse, Object[] parsedValues) {
		for (int i = 0; i < parsedValues.length; i++) {
			try {
				pojoFields[i].set(reuse, parsedValues[i]);
			} catch (IllegalAccessException e) {
				throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\" ", e);
			}
		}
		return reuse;
	}

	//......
}
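  • PojoCsvInputFormat's open method runs while the plan is being executed (executor's executePlan), when DataSourceTask.invoke calls format.open(split); it uses reflection to resolve the required Field objects ahead of time and makes them accessible
  • The fillRecord method simply uses reflection to set each entry of parsedValues on the POJO
  • If a reflective set fails with an IllegalAccessException, the exception is caught and rethrown wrapped in a RuntimeException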
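
The split between "resolve fields once" and "set values per record" can be shown with plain java.lang.reflect. This is a minimal sketch, not the Flink implementation: Medal, resolveFields and fill are hypothetical names, and getDeclaredField only looks at the class itself, whereas the findAllFields helper used by PojoCsvInputFormat is not shown in this article.

```java
import java.lang.reflect.Field;

// Sketch of the reflection-based fill; Medal and the helper names are hypothetical.
public class ReflectionFillSketch {

    /** Example target type with private fields, to show why setAccessible(true) matters. */
    static class Medal {
        private String country;
        private Integer gold;

        String describe() {
            return country + ":" + gold;
        }
    }

    /** Resolves the named fields once, as open() does, and makes them writable. */
    static Field[] resolveFields(Class<?> type, String... names) {
        Field[] fields = new Field[names.length];
        for (int i = 0; i < names.length; i++) {
            try {
                fields[i] = type.getDeclaredField(names[i]);
            } catch (NoSuchFieldException e) {
                throw new RuntimeException(
                        "There is no field called \"" + names[i] + "\" in " + type.getName(), e);
            }
            fields[i].setAccessible(true);
        }
        return fields;
    }

    /** Copies the parsed values into the target object, as fillRecord does. */
    static <T> T fill(T reuse, Field[] fields, Object[] parsedValues) {
        for (int i = 0; i < parsedValues.length; i++) {
            try {
                fields[i].set(reuse, parsedValues[i]);
            } catch (IllegalAccessException e) {
                throw new RuntimeException(
                        "Parsed value could not be set in field " + fields[i].getName(), e);
            }
        }
        return reuse;
    }

    public static void main(String[] args) {
        Field[] fields = resolveFields(Medal.class, "country", "gold");
        Medal medal = fill(new Medal(), fields, new Object[] {"USA", 8});
        System.out.println(medal.describe());
    }
}
```

Resolving the Field objects up front keeps the per-record work down to one Field.set call per column.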

CountingCollector.collect

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java

public class CountingCollector<OUT> implements Collector<OUT> {
	private final Collector<OUT> collector;
	private final Counter numRecordsOut;

	public CountingCollector(Collector<OUT> collector, Counter numRecordsOut) {
		this.collector = collector;
		this.numRecordsOut = numRecordsOut;
	}

	@Override
	public void collect(OUT record) {
		this.numRecordsOut.inc();
		this.collector.collect(record);
	}

	@Override
	public void close() {
		this.collector.close();
	}
}
  • Here the wrapped collector is org.apache.flink.runtime.operators.chaining.ChainedMapDriver

ChainedMapDriver

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java

	@Override
	public void collect(IT record) {
		try {
			this.numRecordsIn.inc();
			this.outputCollector.collect(this.mapper.map(record));
		} catch (Exception ex) {
			throw new ExceptionInChainedStubException(this.taskName, ex);
		}
	}
  • collect calls the mapper's map method to run the user's map logic, then calls outputCollector.collect to pass the result on
  • Here outputCollector is a CountingCollector whose wrapped collector is org.apache.flink.runtime.operators.shipping.OutputCollector
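
The chain described so far (counting, then map, then counting again, then the shipping collector) is a stack of decorators around a single collect call. The sketch below mirrors that shape in plain Java; Sink, CountingSink, MappingSink and ListSink are hypothetical stand-ins for Collector, CountingCollector, ChainedMapDriver and OutputCollector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch of the decorator chain around the map function.
// All type names here are hypothetical stand-ins, not Flink classes.
public class CollectorChainSketch {

    interface Sink<T> {
        void collect(T record);
    }

    /** Counts each record, then forwards it unchanged. */
    static class CountingSink<T> implements Sink<T> {
        private final Sink<T> delegate;
        long count;

        CountingSink(Sink<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void collect(T record) {
            count++;
            delegate.collect(record);
        }
    }

    /** Applies the user function and forwards the result. */
    static class MappingSink<I, O> implements Sink<I> {
        private final Function<I, O> mapper;
        private final Sink<O> out;

        MappingSink(Function<I, O> mapper, Sink<O> out) {
            this.mapper = mapper;
            this.out = out;
        }

        @Override
        public void collect(I record) {
            out.collect(mapper.apply(record));
        }
    }

    /** Terminal sink that keeps the records it receives. */
    static class ListSink<T> implements Sink<T> {
        final List<T> records = new ArrayList<>();

        @Override
        public void collect(T record) {
            records.add(record);
        }
    }

    public static void main(String[] args) {
        ListSink<String> shipped = new ListSink<>();
        CountingSink<String> mapOutput = new CountingSink<>(shipped);
        CountingSink<String> sourceOutput =
                new CountingSink<>(new MappingSink<String, String>(String::toUpperCase, mapOutput));

        sourceOutput.collect("usa");
        sourceOutput.collect("china");
        System.out.println(shipped.records + " " + sourceOutput.count + " " + mapOutput.count);
    }
}
```

Because every layer runs inside the caller's collect call, the map function executes on the same thread as the source, which is what operator chaining amounts to in this example.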

OutputCollector

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/shipping/OutputCollector.java

	/**
	 * Collects a record and emits it to all writers.
	 */
	@Override
	public void collect(T record)  {
		if (record != null) {
			this.delegate.setInstance(record);
			try {
				for (RecordWriter<SerializationDelegate<T>> writer : writers) {
					writer.emit(this.delegate);
				}
			}
			catch (IOException e) {
				throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
			}
			catch (InterruptedException e) {
				throw new RuntimeException("Emitting the record was interrupted: " + e.getMessage(), e);
			}
		}
		else {
			throw new NullPointerException("The system does not support records that are null."
								+ "Null values are only supported as fields inside other objects.");
		}
	}
  • Here each RecordWriter's emit method is called to send the record

RecordWriter

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

	public void emit(T record) throws IOException, InterruptedException {
		for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
			sendToTarget(record, targetChannel);
		}
	}
  • channelSelector.selectChannels returns the target channels to send the record to; here channelSelector is an OutputEmitter

OutputEmitter

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/shipping/OutputEmitter.java

	@Override
	public final int[] selectChannels(SerializationDelegate<T> record, int numberOfChannels) {
		switch (strategy) {
		case FORWARD:
			return forward();
		case PARTITION_RANDOM:
		case PARTITION_FORCED_REBALANCE:
			return robin(numberOfChannels);
		case PARTITION_HASH:
			return hashPartitionDefault(record.getInstance(), numberOfChannels);
		case BROADCAST:
			return broadcast(numberOfChannels);
		case PARTITION_CUSTOM:
			return customPartition(record.getInstance(), numberOfChannels);
		case PARTITION_RANGE:
			return rangePartition(record.getInstance(), numberOfChannels);
		default:
			throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name());
		}
	}

	private int[] forward() {
		return this.channels;
	}
  • Here strategy is FORWARD, so selectChannels returns this.channels
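
As an illustration of strategy-based channel selection, the sketch below implements only two strategies in plain Java. It is a hypothetical simplification, not the OutputEmitter itself: in particular, the single fixed channel with index 0 used for FORWARD is an assumption, since the article does not show how this.channels is initialized.

```java
import java.util.Arrays;

// Sketch of strategy-based channel selection; Strategy and the selector are
// hypothetical simplifications, not the Flink OutputEmitter itself.
public class ChannelSelectorSketch {

    enum Strategy { FORWARD, BROADCAST }

    private final Strategy strategy;
    // assumption: a forwarding selector always targets one fixed channel, index 0
    private final int[] forwardChannels = new int[] {0};

    ChannelSelectorSketch(Strategy strategy) {
        this.strategy = strategy;
    }

    int[] selectChannels(int numberOfChannels) {
        switch (strategy) {
            case FORWARD:
                // same array on every call, independent of the record
                return forwardChannels;
            case BROADCAST:
                // every channel receives the record
                int[] all = new int[numberOfChannels];
                for (int i = 0; i < numberOfChannels; i++) {
                    all[i] = i;
                }
                return all;
            default:
                throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(new ChannelSelectorSketch(Strategy.FORWARD).selectChannels(4)));
        System.out.println(Arrays.toString(new ChannelSelectorSketch(Strategy.BROADCAST).selectChannels(4)));
    }
}
```

RecordWriter.emit then loops over whatever array the selector returns, so the strategy alone decides how many copies of a record are sent.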

Summary

  • The input format created by CsvReader's pojoType method is a PojoCsvInputFormat. Its key method is fillRecord, which populates the POJO through reflection. The data itself is read by DelimitedInputFormat's readLine method, which calls fillBuffer; fillBuffer determines toRead from splitLength (the length of the FileInputSplit, as seen in the DelimitedInputFormat.getStatistics method) and maxReadLength, and then reads up to toRead bytes from the file into readBuffer starting at offset
  • Inside its invoke method, DataSourceTask repeatedly calls format.nextRecord and hands each record to output.collect (output is a CountingCollector; further down the chain another CountingCollector wraps org.apache.flink.runtime.operators.shipping.OutputCollector) until the task is canceled or format.reachedEnd() returns true
  • In output.collect, output is a CountingCollector whose wrapped collector is ChainedMapDriver. ChainedMapDriver applies the map function to each record and passes the result to its outputCollector, a CountingCollector that wraps OutputCollector. OutputCollector then uses RecordWriter to emit the data

Docs

  • CsvReader