The previous article covered the base class AbstractRegistry; this one continues with the other classes in the same package.
FailbackRegistry
This class extends AbstractRegistry. The register and subscribe methods in AbstractRegistry only update in-memory caches; the actual registration and subscription logic is implemented in FailbackRegistry, which also provides a retry mechanism for failed operations.
Initialization
// Scheduled task executor
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));
// Failure retry timer: periodically checks for failed requests and, if any exist, retries them indefinitely
private final ScheduledFuture<?> retryFuture;
// Set of URLs whose registration failed
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
// Set of URLs whose unregistration failed
private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
// Listeners whose subscription failed, keyed by URL
private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// Listeners whose unsubscription failed, keyed by URL
private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// Notifications that failed, keyed by URL
private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
/** The time in milliseconds the retryExecutor will wait */
// Retry period
private final int retryPeriod;
The constructor
public FailbackRegistry(URL url) {
    super(url);
    // Read the retry period from the URL; defaults to 5000 ms if not set
    this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
    // Create the failure retry timer
    this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            // Check and connect to the registry
            try {
                // Retry
                retry();
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
            }
        }
    }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
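To look at the scheduling call in isolation, here is a minimal sketch of a fixed-delay retry timer with the same defensive catch. RetryTimerSketch and runPeriodically are hypothetical names made up for illustration, and the daemon thread factory stands in for Dubbo's NamedThreadFactory; only the java.util.concurrent calls are real.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; it is not part of Dubbo.
class RetryTimerSketch {

    // Schedules `task` with a fixed delay, waits until it has run at least
    // `runs` times (or 10 seconds pass), then cancels the timer and returns
    // the number of passes that were started.
    static int runPeriodically(Runnable task, long periodMillis, int runs) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, r -> {
            Thread t = new Thread(r, "RetryTimerSketch");
            t.setDaemon(true); // assumption: daemon thread, like a background retry timer
            return t;
        });
        AtomicInteger count = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        ScheduledFuture<?> future = executor.scheduleWithFixedDelay(() -> {
            try {
                count.incrementAndGet();
                task.run();
            } catch (Throwable t) {
                // Defensive catch: an exception escaping the task would
                // suppress all later executions of this schedule.
                System.err.println("retry pass failed: " + t.getMessage());
            } finally {
                done.countDown();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            future.cancel(false);
            executor.shutdownNow();
        }
        return count.get();
    }

    public static void main(String[] args) {
        // The task fails on every pass, yet the timer keeps firing.
        int n = runPeriodically(() -> {
            throw new IllegalStateException("registry unreachable");
        }, 50, 3);
        System.out.println("passes executed: " + n);
    }
}
```

The catch block is the important part: without it, the first exception thrown by the task would silently stop every later retry pass.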
The constructor creates the failure retry timer. The retry period is read from the URL and defaults to 5000 ms if it is not set. The class also overrides the register, unregister, subscribe, and unsubscribe operations; their logic is relatively simple.
@Override
public void register(URL url) {
    super.register(url);
    // First remove the URL from the failure caches
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // Send a registration request to the registry
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;
        // If the startup check is enabled, throw the exception directly
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }
        // Record the failed registration request in the failure cache so it is retried periodically
        failedRegistered.add(url);
    }
}
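During registration, the URL is first removed from the failed-registration and failed-unregistration caches, and then the registration request is sent to the registry. If the request fails and neither the startup check nor SkipFailbackWrapperException applies, the URL is added to failedRegistered so that the timer can retry it later.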
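The failback flow described above can be sketched in a few lines. This is a simplified illustration under stated assumptions, not Dubbo's code: URLs are plain strings, the check flag is passed in directly instead of being read from URL parameters, and the retry() body is a guess at its general shape, since the article does not show that method. FailbackSketch and all of its members are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for the failback idea.
class FailbackSketch {
    // URLs whose registration failed and should be retried
    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    // Stand-in for doRegister(url); it may throw when the registry is unreachable
    private final Consumer<String> doRegister;

    FailbackSketch(Consumer<String> doRegister) {
        this.doRegister = doRegister;
    }

    void register(String url, boolean check) {
        // A fresh attempt supersedes any earlier failure record
        failedRegistered.remove(url);
        try {
            doRegister.accept(url);
        } catch (RuntimeException e) {
            if (check) {
                // Fail fast when the startup check is enabled
                throw new IllegalStateException("Failed to register " + url, e);
            }
            // Otherwise remember the URL for the retry timer
            failedRegistered.add(url);
        }
    }

    // Assumed shape of one retry pass: re-attempt every recorded failure
    void retry() {
        for (String url : failedRegistered) {
            try {
                doRegister.accept(url);
                failedRegistered.remove(url);
            } catch (RuntimeException e) {
                // Still failing: keep the URL for the next pass
            }
        }
    }

    Set<String> failed() {
        return failedRegistered;
    }

    public static void main(String[] args) {
        boolean[] registryUp = {false};
        FailbackSketch registry = new FailbackSketch(url -> {
            if (!registryUp[0]) throw new IllegalStateException("registry down");
        });
        registry.register("dubbo://10.0.0.1/demo", false);
        System.out.println("failed after register: " + registry.failed());
        registryUp[0] = true;
        registry.retry();
        System.out.println("failed after retry: " + registry.failed());
    }
}
```

The set returned by ConcurrentHashMap.newKeySet() tolerates removal during iteration, which is why the retry pass can drop entries as they succeed.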
AbstractRegistryFactory
This class implements the RegistryFactory interface and declares createRegistry as an abstract method. It also serves as the container that holds the created Registry instances.
Initialization
private static final ReentrantLock LOCK = new ReentrantLock();
// Registry cache: Map<RegistryAddress, Registry>
private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();
Destroy all Registry objects and clean up the cache data
public static Collection<Registry> getRegistries() {
    return Collections.unmodifiableCollection(REGISTRIES.values());
}
public static void destroyAll() {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Close all registries " + getRegistries());
    }
    // Lock the registry shutdown process
    LOCK.lock();
    try {
        for (Registry registry : getRegistries()) {
            try {
                // Destroy the registry
                registry.destroy();
            } catch (Throwable e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
        // Clear the cache
        REGISTRIES.clear();
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}
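One detail of destroyAll is worth isolating: each destroy call is wrapped in its own try/catch, so a registry that fails to shut down does not prevent the others from being destroyed or the cache from being cleared. The sketch below illustrates only that pattern; DestroyAllSketch is a hypothetical class, and plain Runnable objects stand in for Registry.destroy().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; Runnable stands in for Registry.destroy().
class DestroyAllSketch {
    private static final ReentrantLock LOCK = new ReentrantLock();
    private static final Map<String, Runnable> DESTROYERS = new ConcurrentHashMap<>();

    static void add(String key, Runnable destroyAction) {
        DESTROYERS.put(key, destroyAction);
    }

    static int size() {
        return DESTROYERS.size();
    }

    // Runs every destroy action under the lock and returns how many of them failed.
    static int destroyAll() {
        int failures = 0;
        LOCK.lock();
        try {
            for (Runnable destroyAction : DESTROYERS.values()) {
                try {
                    destroyAction.run();
                } catch (Throwable t) {
                    // Isolate the failure so the remaining entries are still destroyed
                    failures++;
                }
            }
            // Clear the cache even if some actions failed
            DESTROYERS.clear();
        } finally {
            LOCK.unlock();
        }
        return failures;
    }

    public static void main(String[] args) {
        add("zookeeper", () -> System.out.println("zookeeper registry closed"));
        add("redis", () -> { throw new IllegalStateException("close failed"); });
        System.out.println("failures: " + destroyAll() + ", remaining: " + size());
    }
}
```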
getRegistry implements the method declared by the RegistryFactory interface. The key point here is createRegistry: AbstractRegistryFactory is an abstract class and createRegistry is an abstract method, so each subclass only has to supply the creation logic. A Redis registry and a ZooKeeper registry, for example, must be created in different ways, while the shared steps (rewriting the URL, locking, and caching) are already implemented in AbstractRegistryFactory.
@Override
public Registry getRegistry(URL url) {
    // Rewrite the URL
    url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    // Compute the cache key
    String key = url.toServiceString();
    // Lock the registry access process to ensure a single instance of the registry
    LOCK.lock();
    try {
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        // Create a Registry object
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        // Add to cache
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}
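The division of labor between getRegistry and createRegistry is an instance of the template method pattern. The following sketch shows it with deliberately tiny, hypothetical types: SimpleRegistry, CachingRegistryFactory, and InMemoryRegistryFactory are made up for illustration, and a plain string key replaces the URL handling of the real class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical minimal registry type
interface SimpleRegistry {
    String address();
}

// Hypothetical base class: owns locking and caching, delegates creation
abstract class CachingRegistryFactory {
    private static final ReentrantLock LOCK = new ReentrantLock();
    private static final Map<String, SimpleRegistry> REGISTRIES = new ConcurrentHashMap<>();

    public SimpleRegistry getRegistry(String key) {
        // Lock so that at most one instance is created per key
        LOCK.lock();
        try {
            SimpleRegistry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            registry = createRegistry(key);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + key);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            LOCK.unlock();
        }
    }

    // The only method a concrete factory has to implement
    protected abstract SimpleRegistry createRegistry(String key);
}

// Hypothetical concrete factory; a real one would connect to ZooKeeper, Redis, etc.
class InMemoryRegistryFactory extends CachingRegistryFactory {
    int created = 0;

    @Override
    protected SimpleRegistry createRegistry(String key) {
        created++;
        return () -> "memory://" + key;
    }

    public static void main(String[] args) {
        InMemoryRegistryFactory factory = new InMemoryRegistryFactory();
        SimpleRegistry first = factory.getRegistry("demo");
        SimpleRegistry second = factory.getRegistry("demo");
        System.out.println("same instance: " + (first == second));
        System.out.println("createRegistry calls: " + factory.created);
    }
}
```

A subclass never touches the lock or the cache, which is the reason the article says a concrete factory only needs to implement the abstract method.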