Introduction to the Java SPI mechanism

The Java SPI (Service Provider Interface) mechanism is a dynamic loading mechanism built into Java that combines interface-oriented programming, the strategy pattern, and configuration files. Callers can enable, extend, or replace a framework's existing implementation strategy as needed. Java provides a concrete implementation of the SPI idea, ServiceLoader; with this class it is easy to implement service-oriented registration and discovery and to decouple service provision from service use.

Common examples of the Java SPI mechanism are:

  • Loading database driver implementation classes: JDBC can load different database drivers, such as OracleDriver, SQLServerDriver, or MySQL's Driver, according to which one is actually used.
  • Loading logging framework bindings for the SLF4J facade: SLF4J is a logging facade, not a logging framework, so it relies on the Java SPI mechanism to load a qualifying implementation class and bind to a concrete logging framework such as Log4j or Logback.

For detailed usage, refer to the article "The SPI mechanism in Java that advanced developers must understand".
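Before diving into Flink, a minimal sketch of the SPI convention itself may help: an interface, a provider class, and a META-INF/services configuration file named after the interface. Greeter and EnglishGreeter are hypothetical names, and the configuration file is written to a temporary directory instead of shipping inside a jar:

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class SpiDemo {
    public interface Greeter { String greet(); }

    // A provider: a public class with a public no-arg constructor.
    public static class EnglishGreeter implements Greeter {
        public String greet() { return "hello"; }
    }

    // Write the provider-configuration file exactly as a jar would ship it,
    // then load it through a class loader that can see that directory.
    public static List<String> loadGreetings() throws Exception {
        Path root = Files.createTempDirectory("spi");
        Path config = root.resolve("META-INF/services/" + Greeter.class.getName());
        Files.createDirectories(config.getParent());
        Files.write(config, List.of(EnglishGreeter.class.getName()));

        ClassLoader cl = new URLClassLoader(
                new URL[]{root.toUri().toURL()}, SpiDemo.class.getClassLoader());

        List<String> greetings = new ArrayList<>();
        for (Greeter g : ServiceLoader.load(Greeter.class, cl)) {
            greetings.add(g.greet());
        }
        return greetings;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(loadGreetings());   // [hello]
    }
}
```

This is the same contract Flink's TableFactory implementations follow: each connector jar ships a configuration file listing its factory classes.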

Application of Java SPI mechanism in Flink

The Java SPI mechanism is used in Flink SQL programs to dynamically load the various Factory implementation classes. Taking the TableFactory interface as an example, a Flink program scans the META-INF/services/org.apache.flink.table.factories.TableFactory files in its dependencies, instantiates the listed implementations of the TableFactory interface via reflection, and then screens out the qualifying TableFactory implementation classes with the TableFactoryService#filter() method. For example, when a Flink SQL program reads data from Kafka (version 0.11), it first obtains all available TableFactory implementation classes, then uses TableFactoryService#filter() to pick the qualifying implementation class, a Kafka011TableSourceSinkFactory instance. This article focuses on how the Java SPI mechanism is applied in Flink SQL programs; readers interested in how the TableFactory implementations are filtered can read that code on their own.

Note: The source version of Flink covered in this article is version 1.9.

tEnv
    .connect(
        new Kafka()
            .version("0.11")
            .topic(topic)
            .startFromLatest()
            .properties(props))
    .withSchema(schema)
    .withFormat(format)
    .registerTableSource("record");

The above program establishes a connection to Kafka and specifies the structure and format of the data to be read; finally, registerTableSource registers the table source. Looking at the code, we can see that the TableFactoryService#find() method is called internally to locate a TableSourceFactory instance, whose createTableSource() then creates a Kafka011TableSource instance.

# TableFactoryUtil.java
private static <T> TableSource<T> findAndCreateTableSource(Map<String, String> properties) {
    try {
        return TableFactoryService
            .find(TableSourceFactory.class, properties)
            .createTableSource(properties);
    } catch (Throwable t) {
        throw new TableException("findAndCreateTableSource failed.", t);
    }
}
# TableFactoryService
public static <T extends TableFactory> T find(Class<T> factoryClass, Map<String, String> propertyMap) {
    return findSingleInternal(factoryClass, propertyMap, Optional.empty());
}
# TableFactoryService.java
private static <T extends TableFactory> T findSingleInternal(
        Class<T> factoryClass,
        Map<String, String> properties,
        Optional<ClassLoader> classLoader) {
    List<TableFactory> tableFactories = discoverFactories(classLoader);
    List<T> filtered = filter(tableFactories, factoryClass, properties);
    ...
}

In the TableFactoryService#findSingleInternal() method, two main methods are used: discoverFactories() queries the implementation classes of the TableFactory interface available to the current Flink SQL program, and filter() filters out the implementation classes that meet the criteria. Clearly, the Java SPI mechanism is used inside the discoverFactories() method.

# TableFactoryService.java
private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
    try {
        List<TableFactory> result = new LinkedList<>();
        if (classLoader.isPresent()) {
            ServiceLoader
                .load(TableFactory.class, classLoader.get())
                .iterator()
                .forEachRemaining(result::add);
        } else {
            defaultLoader.iterator().forEachRemaining(result::add);
        }
        return result;
    } catch (ServiceConfigurationError e) {
        LOG.error("Could not load service provider for table factories.", e);
        throw new TableException("Could not load service provider for table factories.", e);
    }
}

In the discoverFactories() method, the else block is executed because the classLoader passed in is Optional.empty(), so classLoader.isPresent() is false.

private static final ServiceLoader<TableFactory> defaultLoader = ServiceLoader.load(TableFactory.class);

You can see that defaultLoader is a static class variable, and precisely because of this, the Flink SQL 1.9 code has a potential bug. This bug will be explained at the end of this article.

public static <S> ServiceLoader<S> load(Class<S> service) {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    return ServiceLoader.load(service, cl);
}

public static <S> ServiceLoader<S> load(Class<S> service,ClassLoader loader) {
    return new ServiceLoader<>(service, loader);
}

# service => TableFactory, loader => AppClassLoader, acc => null
private ServiceLoader(Class<S> svc, ClassLoader cl) {
    service = Objects.requireNonNull(svc, "Service interface cannot be null");
    loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
    acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
    reload();
}

In the ServiceLoader constructor, we can see that the service, loader, and acc variables are assigned.

// Cached providers, in instantiation order
private LinkedHashMap<String,S> providers = new LinkedHashMap<>();

// The current lazy-lookup iterator
private LazyIterator lookupIterator;
public void reload() {
    providers.clear();
    lookupIterator = new LazyIterator(service, loader);
}

In the reload() method, the providers variable is first cleared and a LazyIterator instance is created. The providers variable stores the TableFactory implementation instances read from the META-INF/services files. LazyIterator, as the name implies, performs fully lazy provider lookup.
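The laziness can be observed with ServiceLoader directly: hasNext() only parses the configuration file, while the provider class is instantiated in next(). A sketch with a hypothetical Greeter service whose provider records when its constructor runs (the configuration file is written to a temporary directory for the demo):

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;

public class LazyDemo {
    public interface Greeter { String greet(); }

    static final List<String> events = new ArrayList<>();

    // Records the moment the provider is actually constructed.
    public static class Noisy implements Greeter {
        public Noisy() { events.add("constructed"); }
        public String greet() { return "hi"; }
    }

    static List<String> run() throws Exception {
        events.clear();
        Path root = Files.createTempDirectory("spi");
        Path config = root.resolve("META-INF/services/" + Greeter.class.getName());
        Files.createDirectories(config.getParent());
        Files.write(config, List.of(Noisy.class.getName()));

        ClassLoader cl = new URLClassLoader(
                new URL[]{root.toUri().toURL()}, LazyDemo.class.getClassLoader());

        Iterator<Greeter> it = ServiceLoader.load(Greeter.class, cl).iterator();
        it.hasNext();                // parses the config file; no instance yet
        events.add("after hasNext");
        it.next();                   // the provider is instantiated only here
        return events;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());   // [after hasNext, constructed]
    }
}
```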

private class LazyIterator implements Iterator<S> {
    Class<S> service;
    ClassLoader loader;
    # Used to hold the URLs of all META-INF/services configuration files found in the project's dependencies
    Enumeration<URL> configs = null;
    # Used to hold the full path names of the TableFactory implementation classes read from one dependency's services file
    Iterator<String> pending = null;
    # Used to hold the full path name of the current TableFactory implementation class
    String nextName = null;

    # service -> TableFactory, loader -> AppClassLoader
    private LazyIterator(Class<S> service, ClassLoader loader) {
        this.service = service;
        this.loader = loader;
    }

    private boolean hasNextService() { ... }
    private S nextService() { ... }
    public boolean hasNext() { ... }
    public S next() { ... }
    public void remove() { ... }
}

After looking at the defaultLoader variable, let’s move on.

defaultLoader.iterator().forEachRemaining(result::add);
# ServiceLoader.java
public Iterator<S> iterator() {
    return new Iterator<S>() {
        Iterator<Map.Entry<String,S>> knownProviders 
            = providers.entrySet().iterator();

        public boolean hasNext() {
            if (knownProviders.hasNext())
                return true;
            return lookupIterator.hasNext();
        }

        public S next() {
            if (knownProviders.hasNext())
                return knownProviders.next().getValue();
            return lookupIterator.next();
        }

        public void remove() { throw new UnsupportedOperationException(); }
    };
}

defaultLoader.iterator() creates an anonymous inner class implementing the Iterator interface, with a knownProviders instance and hasNext(), next(), and remove() methods. After looking at the iterator() method, we move on to forEachRemaining().
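The pattern of this anonymous iterator (drain the cache of already instantiated providers first, then fall back to the lazy lookup) can be sketched generically; composite(), "cachedFactory", and "freshFactory" are hypothetical names standing in for the knownProviders and lookupIterator sources:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class CompositeIteratorDemo {
    // Sketch of ServiceLoader.iterator(): consult the cache iterator first,
    // falling back to the lazy lookup iterator once the cache is exhausted.
    static <T> Iterator<T> composite(Iterator<T> known, Iterator<T> lookup) {
        return new Iterator<T>() {
            public boolean hasNext() { return known.hasNext() || lookup.hasNext(); }
            public T next() { return known.hasNext() ? known.next() : lookup.next(); }
        };
    }

    // "cachedFactory" stands in for an already instantiated provider,
    // "freshFactory" for one still to be looked up lazily.
    static List<String> drain() {
        Iterator<String> it = composite(
                List.of("cachedFactory").iterator(),
                List.of("freshFactory").iterator());
        List<String> out = new ArrayList<>();
        it.forEachRemaining(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain());   // [cachedFactory, freshFactory]
    }
}
```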

#Iterator.java
default void forEachRemaining(Consumer<? super E> action) {
    Objects.requireNonNull(action);
    while (hasNext())
        action.accept(next());
}

It's worth noting that the hasNext() and next() methods there actually call the hasNext() and next() methods of the anonymous inner Iterator class. First, let's look at the implementation of hasNext().

public boolean hasNext() {
    # providers starts out empty (cleared by reload()), since the application is looking up
    # the TableFactory implementation classes for the first time, so
    # knownProviders = providers.entrySet().iterator() has no elements,
    # knownProviders.hasNext() is always false, and we enter lookupIterator.hasNext().
    if (knownProviders.hasNext())
        return true;
    return lookupIterator.hasNext();
}

# lookupIterator
public boolean hasNext() {
    # acc is always null when Flink SQL queries the TableFactory interface implementation classes
    if (acc == null) {
        return hasNextService();
    }
    ...
}

# lookupIterator
private boolean hasNextService() {
    # nextName holds the full path name of the next TableFactory implementation class to be queried
    if (nextName != null) {
        return true;
    }
    # Enumeration<URL> configs = null
    if (configs == null) {
        try {
            # PREFIX = "META-INF/services/"
            # service.getName() is the full path name of TableFactory.
            # This is where the Java SPI mechanism reads the implementation classes of the interface.
            String fullName = PREFIX + service.getName();
            # Use the class loader to load resource information according to this path.
            # The addresses of all dependency jars in the project that contain
            # META-INF/services/org.apache.flink.table.factories.TableFactory, together with
            # class loader information, are stored in the configs variable (Enumeration<URL> configs).
            # After the TableFactory implementation classes in one jar have been instantiated,
            # the services file of the next jar is read via configs.nextElement().
            if (loader == null)
                configs = ClassLoader.getSystemResources(fullName);
            else
                configs = loader.getResources(fullName);
        } catch (IOException x) {
            fail(service, "Error locating configuration files", x);
        }
    }
    # The pending variable stores the full path names of the TableFactory implementation classes
    # read from one dependency jar. It is an Iterator<String>; each time one is needed,
    # pending.next() is called and the resulting full path name is assigned to nextName.
    # (pending == null) || !pending.hasNext() == true means the services file of the current
    # dependency jar has been fully read and we should continue with the next dependency jar.
    while ((pending == null) || !pending.hasNext()) {
        # configs.hasMoreElements() returns false when all dependencies have been traversed,
        # which means the lookup of TableFactory implementation classes is finished.
        if (!configs.hasMoreElements()) {
            return false;
        }
        # The parse() method reads the full path names of the TableFactory interface implementation
        # classes from the services file of one dependency jar and saves them into pending.
        # Notably, if a full path name A has already been saved, it is not added to pending again,
        # even if the current dependency jar also contains A. This ensures that each TableFactory
        # implementation instance saved in providers is unique, even if the services files of
        # multiple dependencies contain the full path name of the same implementation class.
        pending = parse(service, configs.nextElement());
    }
    # Take a full path name of a TableFactory interface implementation class from pending
    # and save it in the nextName variable.
    nextName = pending.next();
    return true;
}

  • Load resources according to the resource path name (META-INF/services/org.apache.flink.table.factories.TableFactory) and assign them to the configs variable.
  • Get one dependency jar from the configs variable, read the full path names of the TableFactory interface implementation classes from it, and save these full path names into the pending variable.
  • Take the full path name of one TableFactory interface implementation class from the pending variable and store it in the nextName variable for use in the next() method.
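The first of these steps can be demonstrated in isolation: getResources() returns one URL per classpath entry that ships the configuration file. In this sketch, two temporary directories stand in for two dependency jars, and demo.Factory is a hypothetical service name:

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

public class ConfigsDemo {
    // Hypothetical service name; each "jar" is simulated by a temp directory.
    static final String FULL_NAME = "META-INF/services/demo.Factory";

    static int countConfigs() throws Exception {
        List<URL> roots = new ArrayList<>();
        for (String impl : List.of("demo.AFactory", "demo.BFactory")) {
            Path root = Files.createTempDirectory("dep");
            Path config = root.resolve(FULL_NAME);
            Files.createDirectories(config.getParent());
            Files.write(config, List.of(impl));
            roots.add(root.toUri().toURL());
        }
        try (URLClassLoader loader = new URLClassLoader(roots.toArray(new URL[0]))) {
            // Same call as in hasNextService(): one URL per dependency
            // that ships the configuration file.
            Enumeration<URL> configs = loader.getResources(FULL_NAME);
            int n = 0;
            while (configs.hasMoreElements()) {
                configs.nextElement();
                n++;
            }
            return n;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(countConfigs());   // 2
    }
}
```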

Let’s look at the next() method.

public S next() {
    if (knownProviders.hasNext())
        return knownProviders.next().getValue();
    return lookupIterator.next();
}

# lookupIterator
public S next() {
    # acc is always null when Flink SQL queries the TableFactory interface implementation classes
    if (acc == null) {
        return nextService();
    }
    ...
}

# lookupIterator
private S nextService() {
    # hasNextService() works as described above; since nextName != null, it returns true.
    if (!hasNextService())
        throw new NoSuchElementException();
    String cn = nextName;
    # nextName is set back to null so that the full path name of the next TableFactory
    # implementation class can be assigned to it.
    nextName = null;
    Class<?> c = null;
    ...
    # Use Class.forName() to load the class for the full path name of the TableFactory
    # interface implementation class, then instantiate it.
    c = Class.forName(cn, false, loader);
    ...
    S p = service.cast(c.newInstance());
    # Save the instantiated TableFactory interface implementation class into the providers variable.
    providers.put(cn, p);
    return p;
    ...
}

At this point, one lookup of a TableFactory interface implementation class is complete. The next step is to look up the remaining TableFactory implementation classes in the same dependency jar, and then in all other dependency jars in the current project that contain META-INF/services/org.apache.flink.table.factories.TableFactory. The process is the same, so it is not repeated here.

Problems with the Java SPI mechanism in Flink SQL 1.9

Consider the following scenario: there are two MQ (message queue) products, named A and B, and Flink has the corresponding connectors flink-connector-a and flink-connector-b, both of which contain a META-INF/services/org.apache.flink.table.factories.TableFactory file. If Flink SQL program 1 uses flink-connector-a and Flink SQL program 2 uses flink-connector-b in the same cluster, the following exception is reported:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: 
           Could not find a suitable table factory for
           'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
...
50395:The following factories have been considered:
...
51110:org.apache.flink.streaming.connectors.kafka.A
...

Clearly, the TableFactory implementation class information queried by Flink SQL program 2 is actually the information queried by Flink SQL program 1. This problem is caused by the defaultLoader variable.

# TableFactoryService
private static final ServiceLoader<TableFactory> defaultLoader = ServiceLoader.load(TableFactory.class);

The internal variables and methods of the TableFactoryService class are static. When Flink SQL program 1 searched for the TableFactory interface implementation classes, the providers variable referenced by defaultLoader saved the instances of those classes. When Flink SQL program 2 is ready to find its TableFactory implementation classes, the defaultLoader populated by Flink SQL program 1 is used directly (because it is a static variable, it stays in the JVM unless the cluster is stopped).
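The effect of the static cache can be sketched in a few lines. This only models the caching behavior, not Flink's actual classes; discover(), AFactory, and BFactory are hypothetical names:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StaticCacheSketch {
    // Stands in for the providers cache behind TableFactoryService.defaultLoader.
    static final Map<String, String> providers = new LinkedHashMap<>();

    // Stands in for discoverFactories(): because the cache is static,
    // only the first caller's classpath is ever scanned.
    static List<String> discover(List<String> classpath) {
        if (providers.isEmpty()) {
            for (String factory : classpath) {
                providers.put(factory, factory);
            }
        }
        return new ArrayList<>(providers.values());
    }

    public static void main(String[] args) {
        // "Program 1" runs with flink-connector-a's factory on its classpath...
        System.out.println(discover(List.of("AFactory")));
        // ...but "program 2", submitted later with flink-connector-b, still
        // sees program 1's factories because the static cache is already filled.
        System.out.println(discover(List.of("BFactory")));
    }
}
```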

defaultLoader.iterator().forEachRemaining(result::add);

default void forEachRemaining(Consumer<? super E> action) {
    Objects.requireNonNull(action);
    while (hasNext())
        action.accept(next());
}

public Iterator<S> iterator() {
    return new Iterator<S>() {
        # providers still holds the TableFactory implementation instances read by Flink SQL program 1
        Iterator<Map.Entry<String,S>> knownProviders
            = providers.entrySet().iterator();

        public boolean hasNext() {
            # At this point, knownProviders.hasNext() is true
            if (knownProviders.hasNext())
                return true;
            return lookupIterator.hasNext();
        }

        public S next() {
            # knownProviders.hasNext() is true
            if (knownProviders.hasNext())
                # Get an instance of a TableFactory implementation class from knownProviders
                return knownProviders.next().getValue();
            return lookupIterator.next();
        }

        public void remove() { throw new UnsupportedOperationException(); }
    };
}

At this point, we clearly understand why Flink SQL program 2 reads the TableFactory implementation class data queried by Flink SQL program 1. Thankfully, this bug has been fixed in Flink 1.10.

private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
    try {
        List<TableFactory> result = new LinkedList<>();
        ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader());
        ServiceLoader
            .load(TableFactory.class, cl)
            .iterator()
            .forEachRemaining(result::add);
        return result;
    } catch (ServiceConfigurationError e) {
        LOG.error("Could not load service provider for table factories.", e);
        throw new TableException("Could not load service provider for table factories.", e);
    }
}
