Starting from Spoon’s main, kettleEnvironment.init () initializes the kettle environment and loads the Step
ExecutorService executor = Executors.newCachedThreadPool(); Future<KettleException> pluginRegistryFuture = executor.submit( new Callable<KettleException>() { @Override public KettleException call() throws Exception { registerUIPluginObjectTypes(); KettleClientEnvironment.getInstance().setClient( KettleClientEnvironment.ClientType.SPOON ); try { KettleEnvironment.init(); } catch ( KettleException e ) { return e; } return null; }});Copy the code
Multithreading technology related
The thread pool
Executors. NewCachedThreadPool () to use this way to create a thread pool, the number of threads are not fixed. A task queue is a SynchronousQueue in which a task is placed until it is consumed by the thread pool.
Threads concurrent
Enter the kettleEnvironment.init method
public static void init( boolean simpleJndi ) throws KettleException {
init( Arrays.asList(
RowDistributionPluginType.getInstance(),
StepPluginType.getInstance(),
StepDialogFragmentType.getInstance(),
PartitionerPluginType.getInstance(),
JobEntryPluginType.getInstance(),
JobEntryDialogFragmentType.getInstance(),
LogTablePluginType.getInstance(),
RepositoryPluginType.getInstance(),
LifecyclePluginType.getInstance(),
KettleLifecyclePluginType.getInstance(),
ImportRulePluginType.getInstance(),
CartePluginType.getInstance(),
CompressionPluginType.getInstance(),
AuthenticationProviderPluginType.getInstance(),
AuthenticationConsumerPluginType.getInstance(),
EnginePluginType.getInstance()
), simpleJndi );
}
Copy the code
Initialize a set of classes that define plug-in types
RowDistributionPluginType.getInstance()
public static RowDistributionPluginType getInstance() {
if ( pluginType == null ) {
pluginType = new RowDistributionPluginType();
}
return pluginType;
}
Copy the code
In the singleton mode, double-checked locking and delayed initialization are used to cause problems with instruction reordering. One solution is to use class initialization solutions. The JVM performs Class initialization during the Class initialization phase (after the Class is loaded and before it is used by a thread). When performing class initialization, the JVM acquires a lock that can synchronize multiple threads initializing the same class. Become Initialzation On Demand Holder idiom
initialized.compareAndSet( null, ready = SettableFuture.create() )
Enter the init method and use cas to determine that only one thread is calling the call method and that the other competing threads block until the initialization of the initializer thread completes. Initialized is a variable of the AtomicReference type, which can be used in other cases. Votaile cannot have memory visibility for reading and writing multiple variables, using locking mode, or AtomicReference type variables can also be implemented.
Enter the following code
pluginClasses.forEach( PluginRegistry::addPluginType );
PluginRegistry.init();
Copy the code
The above code will take KettleEnvironment. The init method RowDistributionPluginType… PluginTypes pluginregistry.init () method call
. for ( final PluginTypeInterface pluginType : pluginTypes ) { log.snap( Metrics.METRIC_PLUGIN_REGISTRY_PLUGIN_TYPE_REGISTRATION_START, pluginType.getName() ); registry.registerType( pluginType ); log.snap( Metrics.METRIC_PLUGIN_REGISTRY_PLUGIN_TYPE_REGISTRATION_STOP, pluginType.getName() ); } log.snap( Metrics.METRIC_PLUGIN_REGISTRY_PLUGIN_REGISTRATION_STOP ); .Copy the code
When pluginType is StepPluginType, registry. RegisterType (pluginType) is the method we care about. The final call
public void searchPlugins() throws KettlePluginException {
registerNatives();
registerPluginJars();
registerXmlPlugins();
}
Copy the code
There are two ways to extend Step
Built-in Step loading
RegisterNatives source code is as follows
protected void registerNatives() throws KettlePluginException { // Scan the native steps... // StepPluginType.getXmlPluginFile() xmlFile ="kettle-steps.xml" String xmlFile = getXmlPluginFile(); String alternative = null; if ( ! Utils.isEmpty( getAlternativePluginFile() ) ) { alternative = getPropertyExternal( getAlternativePluginFile(), null ); if ( ! Utils.isEmpty( alternative ) ) { xmlFile = alternative; } } // Load the plugins for this file... // InputStream inputStream = null; try { inputStream = getResAsStreamExternal( xmlFile ); if ( inputStream == null ) { inputStream = getResAsStreamExternal( "/" + xmlFile ); } if ( ! Utils.isEmpty( getAlternativePluginFile() ) ) { // Retry to load a regular file... if ( inputStream == null && ! Utils.isEmpty( alternative ) ) { try { inputStream = getFileInputStreamExternal( xmlFile ); } catch ( Exception e ) { throw new KettlePluginException( "Unable to load native plugins '" + xmlFile + "'", e ); } } } if ( inputStream == null ) { if ( isReturn() ) { return; } else { throw new KettlePluginException( "Unable to find native plugins definition file: " + xmlFile ); } } // This method registers plugins from the InputStream with the XML Resource registerPlugins( inputStream ); } catch ( KettleXMLException e ) { throw new KettlePluginException( "Unable to read the kettle XML config file: " + xmlFile, e ); } finally { IOUtils.closeQuietly( inputStream ); }}Copy the code
registerPlugins
protected void registerPlugins( InputStream inputStream ) throws KettlePluginException, KettleXMLException {// Parse XML Document Document = xmlHandler. loadXMLFile(inputStream, null, true, false); Node repsNode = XMLHandler.getSubNode( document, getMainTag() ); List<Node> repsNodes = XMLHandler.getNodes( repsNode, getSubTag() ); for ( Node repNode : RepsNodes) {// Parsed data, Instantiation PluginInterface, registered to PluginRegistry registerPluginFromXmlResource (repNode, getPath (), enclosing getClass (), true, null); }}Copy the code
Step loading in plug-in mode
registerPluginJars();
protected void registerPluginJars() throws KettlePluginException { List<JarFileAnnotationPlugin> jarFilePlugins = findAnnotatedClassFiles( pluginType.getName() ); for ( JarFileAnnotationPlugin jarFilePlugin : jarFilePlugins ) { URLClassLoader urlClassLoader = createUrlClassLoader( jarFilePlugin.getJarFile(), getClass().getClassLoader() ); try { Class<? > clazz = urlClassLoader.loadClass( jarFilePlugin.getClassName() ); if ( clazz == null ) { throw new KettlePluginException( "Unable to load class: " + jarFilePlugin.getClassName() ); } List<String> libraries = Arrays.stream( urlClassLoader.getURLs() ) .map( URL::getFile ) .collect( Collectors.toList() ); Annotation annotation = clazz.getAnnotation( pluginType ); // Handle an annotated plugin handlePluginAnnotation( clazz, annotation, libraries, false, jarFilePlugin.getPluginFolder() ); } catch ( Exception e ) { // Ignore for now, don't know if it's even possible. LogChannel.GENERAL.logError( "Unexpected error registering jar plugin file: " + jarFilePlugin.getJarFile(), e ); } finally { if ( urlClassLoader ! = null && urlClassLoader instanceof KettleURLClassLoader ) { ( (KettleURLClassLoader) urlClassLoader ).closeClassLoader(); }}}}Copy the code
registerPluginJars
protected void registerPluginJars() throws KettlePluginException { List<JarFileAnnotationPlugin> jarFilePlugins = findAnnotatedClassFiles( pluginType.getName() ); for ( JarFileAnnotationPlugin jarFilePlugin : jarFilePlugins ) { URLClassLoader urlClassLoader = createUrlClassLoader( jarFilePlugin.getJarFile(), getClass().getClassLoader() ); try { Class<? > clazz = urlClassLoader.loadClass( jarFilePlugin.getClassName() ); if ( clazz == null ) { throw new KettlePluginException( "Unable to load class: " + jarFilePlugin.getClassName() ); } List<String> libraries = Arrays.stream( urlClassLoader.getURLs() ) .map( URL::getFile ) .collect( Collectors.toList() ); Annotation annotation = clazz.getAnnotation( pluginType ); // Parse jar classes with @step annotations, HandlePluginAnnotation (Clazz, Annotation, libraries, false, jarFilePlugin.getPluginFolder() ); } catch ( Exception e ) { // Ignore for now, don't know if it's even possible. LogChannel.GENERAL.logError( "Unexpected error registering jar plugin file: " + jarFilePlugin.getJarFile(), e ); } finally { if ( urlClassLoader ! = null && urlClassLoader instanceof KettleURLClassLoader ) { ( (KettleURLClassLoader) urlClassLoader ).closeClassLoader(); }}}}Copy the code
registerXmlPlugins();
Default is not implemented