It takes about 10 minutes to read this article.

Background knowledge

  • Connector: Apache Pulsar Connector, including Source and Sink components.
  • Functions: Lightweight computing component of Apache Pulsar.

The Instance architecture

When you use pulsar-admin source, pulsar-admin sink, or pulsar-admin function to operate a Source, Sink, or Function, an Instance is started on a Worker. The Instance architecture is shown below.

  • Worker Service: Where Instances run.
  • Source: Inputs data from external systems into Pulsar. From the command line, you can use pulsar-admin to operate a Source.
  • Sink: Outputs Pulsar data to external systems. From the command line, you can use pulsar-admin to operate a Sink.
  • Function: Performs lightweight computation. From the command line, you can use pulsar-admin to operate a Function.
  • IdentityFunction, PulsarSource, and PulsarSink: Components of an Instance.
  • Consumer: A Pulsar consumer.
  • Producer: A Pulsar producer.
  • Queue: The data structure used to pass data between components.
  • Red arrows: Data flowing into Pulsar from an external system, and data flowing out of Pulsar to an external system.
  • Yellow arrows: Data flowing into and out of a Pulsar topic.

As mentioned earlier, when starting a Source, Sink, or Function with pulsar-admin, you actually start an Instance that runs on the Worker (the dark blue square labeled Worker Service in the figure).

The figure above has two Worker services:

  • Worker Service1 has two Instances: a Source and a Function.
  • Worker Service2 has one Instance: a Sink.

In addition, the PulsarSource, IdentityFunction, and PulsarSink components in the figure are all part of an Instance. To bring data from an external system into Pulsar, you start a source Instance with pulsar-admin source, which initializes the three components as follows:

// Sink component
setupOutput(contextImpl);
// Source component, responsible for data input
setupInput(contextImpl);
// Function component, responsible for simple calculations, filtering, etc.
return new JavaInstance(contextImpl, object);

The data output (the sink) is initialized first. If there is a problem with the data destination, initialization does not proceed any further.
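The ordering described above can be illustrated with a minimal sketch. The class `InstanceSetupSketch` and its `sinkReachable` flag are hypothetical stand-ins invented for this example, not Pulsar's real classes; only the order of the setup calls follows the snippet above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an Instance; not Pulsar's real implementation.
class InstanceSetupSketch {
    // Records the order in which components were initialized.
    final List<String> initialized = new ArrayList<>();
    // Assumption for the sketch: a flag that simulates whether the destination is usable.
    private final boolean sinkReachable;

    InstanceSetupSketch(boolean sinkReachable) {
        this.sinkReachable = sinkReachable;
    }

    // Same order as in the article: output first, then input, then the function.
    void setup() {
        setupOutput();
        setupInput();
        initialized.add("function");
    }

    private void setupOutput() {
        if (!sinkReachable) {
            // Failing here means setupInput() is never reached.
            throw new IllegalStateException("data destination is not available");
        }
        initialized.add("sink");
    }

    private void setupInput() {
        initialized.add("source");
    }

    public static void main(String[] args) {
        InstanceSetupSketch ok = new InstanceSetupSketch(true);
        ok.setup();
        System.out.println(ok.initialized); // [sink, source, function]

        InstanceSetupSketch broken = new InstanceSetupSketch(false);
        try {
            broken.setup();
        } catch (IllegalStateException e) {
            // Nothing was initialized because the output failed first.
            System.out.println("setup aborted, initialized=" + broken.initialized);
        }
    }
}
```

Setting up the output first means no data is read from the input until there is somewhere to write it.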

The setup of each component makes the following decision:

if (sinkSpec.getClassName().isEmpty()) {
    // If className is not specified, initialize with the default PulsarSink component mentioned above
} else {
    // If className is specified, use it to initialize the sink
}

// If source classname is not set, we default pulsar source
if (sourceSpec.getClassName().isEmpty()) {
    // If className is not specified, initialize with the system default PulsarSource component
} else {
    // If className is specified, use it to initialize the source
}

// create the functions
if (userClassObject instanceof Function) {
    // The object implements Pulsar's Function interface (the default IdentityFunction takes this branch)
    this.function = (Function) userClassObject;
} else {
    // The object is a user-defined java.util.function.Function
    this.javaUtilFunction = (java.util.function.Function) userClassObject;
}
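The "default unless a className is given" decision can be sketched as a small helper. Everything named here (`ComponentSelectionSketch`, `SimpleSource`, `DefaultSource`, `UserSource`, `create`) is hypothetical and only illustrates the pattern; Pulsar's own loading code is more involved.

```java
import java.util.function.Supplier;

class ComponentSelectionSketch {
    // Hypothetical minimal component type; Pulsar's real Source interface is richer.
    interface SimpleSource {
        String name();
    }

    // Stand-in for the system default component.
    public static class DefaultSource implements SimpleSource {
        public String name() { return "default"; }
    }

    // Stand-in for a user-provided component that is referenced by class name.
    public static class UserSource implements SimpleSource {
        public String name() { return "user"; }
    }

    // Returns the default when className is empty; otherwise loads the named class reflectively.
    static <T> T create(String className, Class<T> type, Supplier<T> defaultFactory) {
        if (className == null || className.isEmpty()) {
            return defaultFactory.get();
        }
        try {
            Class<?> clazz = Class.forName(className);
            return type.cast(clazz.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot load component " + className, e);
        }
    }

    public static void main(String[] args) {
        SimpleSource a = create("", SimpleSource.class, DefaultSource::new);
        SimpleSource b = create(UserSource.class.getName(), SimpleSource.class, DefaultSource::new);
        System.out.println(a.name() + " " + b.name()); // default user
    }
}
```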

After the source command starts the Instance and the Instance finds that a className is provided, it uses that class in place of the system default Source component to receive data from the external system. Received data is placed in a queue.

Whether the source is user-defined or the system default PulsarSource, it executes the following code upon receiving data:

consume(record);

public void consume(Record<T> record) {
    try {
        queue.put(record);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
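The hand-off through the queue can be reproduced with a blocking queue from the JDK. This is a minimal sketch, assuming plain `String` records, an arbitrary capacity of 1000, and a hypothetical `readNext` method for the consuming side; it also restores the interrupt flag, which the snippet above does not do.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueHandoffSketch {
    // Bounded queue between the source side and the function side (capacity is an arbitrary choice).
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);

    // Called by the source for every record it receives; blocks while the queue is full.
    void consume(String record) {
        try {
            queue.put(record);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new RuntimeException(e);
        }
    }

    // Called by the processing side; blocks until a record is available.
    String readNext() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        QueueHandoffSketch handoff = new QueueHandoffSketch();
        // The source runs on its own thread and pushes records into the queue.
        Thread source = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                handoff.consume("record-" + i);
            }
        });
        source.start();
        // The processing loop pulls records in the order they were queued.
        for (int i = 0; i < 3; i++) {
            System.out.println(handoff.readNext());
        }
        source.join();
    }
}
```

A bounded queue gives simple back-pressure: when the processing side falls behind, `put` blocks the source instead of letting records pile up in memory.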

The data is put into a queue for the Function to process. Because only a Source is specified here, the system default IdentityFunction is used. PulsarSource and a user-defined source differ as follows:

  • User-defined Source: Integrates with external systems such as databases and logs (red arrow in the figure).
  • System default PulsarSource: Starts a Consumer to consume data from a Pulsar topic (yellow arrow pointing to Consumer in the figure).

When data flows into Function, the following logic is executed:

// process the message
result = javaInstance.handleMessage(currentRecord, currentRecord.getValue());

public JavaExecutionResult handleMessage(Record<?> record, Object input) {
    ...
    if (function != null) {
        // User-defined function
        output = function.process(input, context);
    } else {
        output = javaUtilFunction.apply(input);
    }
    ...
}
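The dispatch between the two kinds of function object can be tried in isolation. In the sketch below, `Context` and `ContextFunction` are hypothetical stand-ins for Pulsar's context and Function interface; only the two-branch structure follows the snippet above.

```java
import java.util.function.Function;

class DispatchSketch {
    // Hypothetical stand-ins for Pulsar's Context and Function interfaces.
    interface Context { String functionName(); }
    interface ContextFunction<I, O> { O process(I input, Context context); }

    // Exactly one of these two fields is set, depending on the type of the user object.
    private ContextFunction<Object, Object> function;
    private Function<Object, Object> javaUtilFunction;
    private final Context context = () -> "sketch";

    @SuppressWarnings("unchecked")
    DispatchSketch(Object userClassObject) {
        if (userClassObject instanceof ContextFunction) {
            this.function = (ContextFunction<Object, Object>) userClassObject;
        } else {
            this.javaUtilFunction = (Function<Object, Object>) userClassObject;
        }
    }

    // Calls whichever function object was provided at construction time.
    Object handleMessage(Object input) {
        if (function != null) {
            return function.process(input, context);
        }
        return javaUtilFunction.apply(input);
    }

    public static void main(String[] args) {
        ContextFunction<Object, Object> identity = (in, ctx) -> in; // passes the input through
        Function<Object, Object> exclaim = in -> in + "!";          // plain java.util.function.Function
        System.out.println(new DispatchSketch(identity).handleMessage("hello")); // hello
        System.out.println(new DispatchSketch(exclaim).handleMessage("hello"));  // hello!
    }
}
```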

After the Function finishes processing, the data moves on to the next component, the Sink, which makes a similar decision. If a user-defined Sink is configured, the user's class is used for initialization; otherwise the default PulsarSink is used to complete the data output. In this example, PulsarSink is used.

PulsarSink and user-defined Sink have the following differences:

  • User-defined Sink: Outputs data to an external system (second red arrow in the figure).
  • System default PulsarSink: Initializes a Pulsar Producer and outputs data to a Pulsar topic (yellow arrow pointing outward in the figure).

The flow above is what happens when you run pulsar-admin source on the command line. When you run pulsar-admin function instead, IdentityFunction is replaced with the user's Function object, while the Source and Sink are initialized with the system defaults, PulsarSource and PulsarSink (see the blue square labeled Function in the figure). pulsar-admin sink works the same way and is not described here.
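The three cases differ only in which one of the three components is user-supplied. The sketch below models that with in-memory lists standing in for Pulsar topics and external systems. All names are hypothetical, and a real Instance uses a Consumer, a Producer, and a queue rather than lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class InstancePipelineSketch {
    private final List<String> source;               // where records come from
    private final Function<String, String> function; // what is done to each record
    private final Consumer<String> sink;             // where results go

    InstancePipelineSketch(List<String> source, Function<String, String> function, Consumer<String> sink) {
        this.source = source;
        this.function = function;
        this.sink = sink;
    }

    // Source -> Function -> Sink, one record at a time.
    void run() {
        for (String record : source) {
            sink.accept(function.apply(record));
        }
    }

    public static void main(String[] args) {
        List<String> inputTopic = List.of("a", "b");     // stand-in for the topic read by the default source
        List<String> outputTopic = new ArrayList<>();    // stand-in for the topic written by the default sink
        List<String> externalInput = List.of("x", "y");  // stand-in for an external system to read from
        List<String> externalOutput = new ArrayList<>(); // stand-in for an external system to write to

        // Source case: custom source, identity function, default sink.
        new InstancePipelineSketch(externalInput, Function.identity(), outputTopic::add).run();
        // Function case: default source, user function, default sink.
        new InstancePipelineSketch(inputTopic, String::toUpperCase, outputTopic::add).run();
        // Sink case: default source, identity function, custom sink.
        new InstancePipelineSketch(inputTopic, Function.identity(), externalOutput::add).run();

        System.out.println(outputTopic);    // [x, y, A, B]
        System.out.println(externalOutput); // [a, b]
    }
}
```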

Conclusion

This article described the relationship between Source, Sink, and Function and how data flows between them. An Instance actually contains three components: Source, Sink, and Function. These are three distinct components, so it is inaccurate to say that Source and Sink are a special kind of Function. The article also described how each Instance encapsulates the pub/sub model. As separate components within an Instance, Source and Sink play an important role in building an ecosystem of integrations with external systems.