Registries — Multicast

Objectives: Explain the principles of the registry implemented with multicast, understand the differences between unicast, broadcast, and multicast, and read through the source code of dubbo-registry-multicast.

Multicast is the second registry implementation in dubbo, and it is the one used in the Demo module. It is built on MulticastSocket.

First, a little background on multicast and MulticastSocket. The MulticastSocket class extends DatagramSocket. A plain DatagramSocket sends each datagram to one specified destination address, whereas a MulticastSocket can deliver data to multiple receivers at once. It does this by sending the datagram to a multicast address: an address from a special range that identifies a group, which hosts simply join when they want to receive the group's messages. IP reserves the range 224.0.0.0 to 239.255.255.255 for multicast. A MulticastSocket can both send datagrams to a multicast address and receive the group messages sent by other hosts.
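To make the join, send, and receive steps concrete, here is a minimal sketch. It is not Dubbo code: the class name MulticastSketch, the group 224.5.6.7, and the port 4446 are made up for illustration, and whether the round trip in main actually works depends on the network setup of the machine.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.nio.charset.StandardCharsets;

class MulticastSketch {

    // Build a datagram addressed to the given destination (here: a multicast group).
    static DatagramPacket packetFor(String message, InetAddress destination, int port) {
        byte[] data = message.getBytes(StandardCharsets.UTF_8);
        return new DatagramPacket(data, data.length, destination, port);
    }

    // Decode only the bytes that belong to the packet, not the whole backing buffer.
    static String textOf(DatagramPacket packet) {
        return new String(packet.getData(), packet.getOffset(), packet.getLength(),
                StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        int port = 4446; // hypothetical port
        try (MulticastSocket socket = new MulticastSocket(port)) {
            InetAddress group = InetAddress.getByName("224.5.6.7"); // hypothetical group
            InetSocketAddress groupAddress = new InetSocketAddress(group, port);
            socket.setSoTimeout(2000);            // do not block forever in receive()
            socket.joinGroup(groupAddress, null); // null: leave the interface choice to the platform
            socket.send(packetFor("hello group", group, port));

            DatagramPacket received = new DatagramPacket(new byte[2048], 2048);
            socket.receive(received);             // our own message, if loopback delivers it
            System.out.println("received: " + textOf(received));
            socket.leaveGroup(groupAddress, null);
        } catch (IOException e) {
            System.out.println("multicast not available here: " + e.getMessage());
        }
    }
}
```

Every process that joins the same group on the same port would see the message, which is the property the multicast registry relies on.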

That’s a brief overview of the multicast background, but let’s look at how Dubbo applies MulticastSocket to the registry implementation.

Let’s take a look at the classes under the package:

You can see that the package structure is very similar to the default registry. Let’s take a look at these two classes.

(1) MulticastRegistry

This class extends FailbackRegistry and implements the core functions of a registry (registration, unregistration, subscription, unsubscription, and lookup of the registration list) on top of multicast.

1. Attributes
// Log output
private static final Logger logger = LoggerFactory.getLogger(MulticastRegistry.class);

// The default multicast port
private static final int DEFAULT_MULTICAST_PORT = 1234;

// Multicast address
private final InetAddress mutilcastAddress;

// Multipoint broadcast
private final MulticastSocket mutilcastSocket;

// Multicast broadcast port
private final int mutilcastPort;

// The received URL
private final ConcurrentMap<URL, Set<URL>> received = new ConcurrentHashMap<URL, Set<URL>>();

// Task scheduler
private final ScheduledExecutorService cleanExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboMulticastRegistryCleanTimer", true));

// Scheduled task that cleans up expired URLs
private final ScheduledFuture<?> cleanFuture;

// The interval between cleanups
private final int cleanPeriod;

// Admin permission
private volatile boolean admin = false;

Looking at the above attributes, you need to pay attention to the following points:

  1. mutilcastSocket is the key to the multicast registry implementation. This is where the distinction between unicast, broadcast, and multicast matters. In unicast, only two entities communicate at a time, and both the sender and the receiver are uniquely identified. In broadcast, the destination is every host on the network. In multicast, the destination is a group, and the hosts that have joined the group are the recipients of the packets.
  2. cleanExecutor and cleanFuture, the task scheduler and the cleanup timer, encapsulate the policy for periodically cleaning up expired services.
2. Constructor
public MulticastRegistry(URL url) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    if (!isMulticastAddress(url.getHost())) {
        throw new IllegalArgumentException("Invalid multicast address " + url.getHost() + ", scope: 224.0.0.0-239.255.255.255");
    }
    try {
        mutilcastAddress = InetAddress.getByName(url.getHost());
        // If the configuration carried by the URL does not contain a port number, the default port number is used
        mutilcastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
        mutilcastSocket = new MulticastSocket(mutilcastPort);
        // setLoopbackMode takes a "disable" flag, so false keeps local loopback of multicast datagrams enabled
        mutilcastSocket.setLoopbackMode(false);
        // Join the multicast group
        mutilcastSocket.joinGroup(mutilcastAddress);
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                byte[] buf = new byte[2048];
                // Instantiate datagrams
                DatagramPacket recv = new DatagramPacket(buf, buf.length);
                while (!mutilcastSocket.isClosed()) {
                    try {
                        // Receive packets
                        mutilcastSocket.receive(recv);
                        String msg = new String(recv.getData()).trim();
                        int i = msg.indexOf('\n');
                        if (i > 0) {
                            msg = msg.substring(0, i).trim();
                        }
                        // Receive message requests and act accordingly, such as registration, subscription, etc
                        MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
                        Arrays.fill(buf, (byte) 0);
                    } catch (Throwable e) {
                        if (!mutilcastSocket.isClosed()) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
            }
        }, "DubboMulticastRegistryReceiver");
        // Set it as a daemon
        thread.setDaemon(true);
        // Start the thread
        thread.start();
    } catch (IOException e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
    // The cleanup interval is read from the URL first; if absent, the default of 60s is used
    this.cleanPeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
    // Schedule the cleanup task unless the URL sets clean=false
    if (url.getParameter("clean", true)) {
        // Start the timer
        this.cleanFuture = cleanExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    // Clean up expired services
                    clean(); // Remove the expired
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected exception occur at clean expired provider, cause: " + t.getMessage(), t);
                }
            }
        }, cleanPeriod, cleanPeriod, TimeUnit.MILLISECONDS);
    } else {
        this.cleanFuture = null;
    }
}

The key parts of this constructor are a receiver thread and a scheduled cleanup task.

  1. The thread reads each received message, determines the type of request, and performs the corresponding operation. As long as mutilcastSocket is not closed, it keeps receiving messages. The handling itself lives in the receive method, which is described below.
  2. The scheduled task removes expired registered services. Expiration is determined by two socket connection attempts. The clean method is described below.
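The scheduling pattern used for the cleanup task can be tried out in isolation. The sketch below is my own illustration, not Dubbo code: CleanTimerSketch and runPeriodically are hypothetical names, and the latch exists only so the example can stop after a few runs. It shows why the task body is wrapped in a catch-all: an exception that escapes a task scheduled with scheduleWithFixedDelay suppresses all later runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

class CleanTimerSketch {

    // Runs task every periodMillis; returns true once it has run `runs` times within maxWaitMillis.
    static boolean runPeriodically(final Runnable task, long periodMillis, int runs, long maxWaitMillis) {
        ThreadFactory daemonThreads = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "CleanTimerSketch");
                t.setDaemon(true); // a daemon thread does not keep the JVM alive
                return t;
            }
        };
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, daemonThreads);
        final CountDownLatch latch = new CountDownLatch(runs);
        ScheduledFuture<?> future = executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();
                } catch (Throwable t) {
                    // Defensive: log and continue so that later runs still happen
                    System.err.println("cleanup failed: " + t);
                } finally {
                    latch.countDown();
                }
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        try {
            return latch.await(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            future.cancel(true);   // same call destroy() makes on cleanFuture
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean ok = runPeriodically(new Runnable() {
            @Override
            public void run() {
                System.out.println("cleaning...");
            }
        }, 100, 3, 5000);
        System.out.println("ran three times: " + ok);
    }
}
```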
3. isMulticastAddress
private static boolean isMulticastAddress(String ip) {
    int i = ip.indexOf('.');
    if (i > 0) {
        String prefix = ip.substring(0, i);
        if (StringUtils.isInteger(prefix)) {
            int p = Integer.parseInt(prefix);
            return p >= 224 && p <= 239;
        }
    }
    return false;
}

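This method checks whether the first octet of the address is between 224 and 239, that is, whether the address lies in the multicast range 224.0.0.0 to 239.255.255.255.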
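For comparison, the same check can be written against the JDK. This is only a sketch with hypothetical names (MulticastAddressSketch, byFirstOctet, byInetAddress). Note that byInetAddress is meant for IP literals: if a host name is passed, InetAddress.getByName performs a DNS lookup.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

class MulticastAddressSketch {

    // Same idea as isMulticastAddress above: only the first octet is inspected.
    static boolean byFirstOctet(String ip) {
        int dot = ip.indexOf('.');
        if (dot <= 0) {
            return false;
        }
        try {
            int first = Integer.parseInt(ip.substring(0, dot));
            return first >= 224 && first <= 239;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // JDK alternative: parse the literal and ask InetAddress.
    static boolean byInetAddress(String ipLiteral) {
        try {
            return InetAddress.getByName(ipLiteral).isMulticastAddress();
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (String ip : new String[] {"224.0.0.0", "239.255.255.255", "192.168.1.1"}) {
            System.out.println(ip + " -> " + byFirstOctet(ip) + " / " + byInetAddress(ip));
        }
    }
}
```

One difference worth knowing: a first-octet check accepts a string such as "224.foo", because everything after the first dot is ignored.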

4. clean
private void clean() {
    // Cleanup runs only in admin mode, i.e. after a subscription whose service interface is *
    if (admin) {
        for (Set<URL> providers : new HashSet<Set<URL>>(received.values())) {
            for (URL url : new HashSet<URL>(providers)) {
                // Determine whether it expires
                if (isExpired(url)) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Clean expired provider " + url);
                    }
                    // Cancel registration
                    doUnregister(url);
                }
            }
        }
    }
}

This method is also relatively simple. The key points are how expiration is determined and how the unregister operation is performed. Both are explained in detail below.

5. isExpired
private boolean isExpired(URL url) {
    // Non-dynamic URLs, URLs without a port, and consumer, route, or override URLs never expire
    if (!url.getParameter(Constants.DYNAMIC_KEY, true)
            || url.getPort() <= 0
            || Constants.CONSUMER_PROTOCOL.equals(url.getProtocol())
            || Constants.ROUTE_PROTOCOL.equals(url.getProtocol())
            || Constants.OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
        return false;
    }
    Socket socket = null;
    try {
        // Instantiate the socket with the host address and port number carried by the URL
        socket = new Socket(url.getHost(), url.getPort());
    } catch (Throwable e) {
        // If the connection fails, wait 100 ms and try again
        try {
            // Wait for 100 ms
            Thread.sleep(100);
        } catch (Throwable e2) {
        }
        Socket socket2 = null;
        try {
            socket2 = new Socket(url.getHost(), url.getPort());
        } catch (Throwable e2) {
            return true;
        } finally {
            if (socket2 != null) {
                try {
                    socket2.close();
                } catch (Throwable e2) {
                }
            }
        }
    } finally {
        if (socket != null) {
            try {
                socket.close();
            } catch (Throwable e) {
            }
        }
    }
    return false;
}

This method determines whether a service has expired. It tries to open a socket to the provider's host and port, and retries once after 100 ms. Only if both attempts fail is the service judged to be expired.
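The two-attempt probe can be written more compactly with try-with-resources. The following is a sketch under my own assumptions rather than the Dubbo implementation: the names ReachabilitySketch, canConnect, and isExpired are hypothetical, and it adds an explicit connect timeout, which the original code does not set.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class ReachabilitySketch {

    // One connection attempt; the socket is always closed again.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Expired only if two attempts, pauseMillis apart, both fail.
    static boolean isExpired(String host, int port, int timeoutMillis, long pauseMillis) {
        if (canConnect(host, port, timeoutMillis)) {
            return false;
        }
        try {
            Thread.sleep(pauseMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
        }
        return !canConnect(host, port, timeoutMillis);
    }

    public static void main(String[] args) {
        // Hypothetical provider address; prints true when nothing listens there
        System.out.println(isExpired("127.0.0.1", 20880, 500, 100));
    }
}
```

The second attempt guards against a single transient failure causing a healthy provider to be unregistered.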

6. receive
private void receive(String msg, InetSocketAddress remoteAddress) {
    if (logger.isInfoEnabled()) {
        logger.info("Receive multicast message: " + msg + " from " + remoteAddress);
    }
    // If the message starts with register, unregister, or subscribe, do the corresponding operation
    if (msg.startsWith(Constants.REGISTER)) {
        URL url = URL.valueOf(msg.substring(Constants.REGISTER.length()).trim());
        // Register service
        registered(url);
    } else if (msg.startsWith(Constants.UNREGISTER)) {
        URL url = URL.valueOf(msg.substring(Constants.UNREGISTER.length()).trim());
        // Cancel the registration service
        unregistered(url);
    } else if (msg.startsWith(Constants.SUBSCRIBE)) {
        URL url = URL.valueOf(msg.substring(Constants.SUBSCRIBE.length()).trim());
        // Get the set of registered URLs
        Set<URL> urls = getRegistered();
        if (urls != null && !urls.isEmpty()) {
            for (URL u : urls) {
                // Check whether the registered URL matches the subscription
                if (UrlUtils.isMatch(url, u)) {
                    String host = remoteAddress != null && remoteAddress.getAddress() != null
                            ? remoteAddress.getAddress().getHostAddress() : url.getIp();
                    // It is recommended that the service provider and service consumer run on different machines. If they are on the same machine, unicast=false should be set
                    // Several processes on the same machine cannot each receive a unicast message: only one process gets it, so a unicast reply meant for a consumer may be taken by a provider. The same is true for two consumers on the same machine.
                    // Only the multicast registry has this problem
                    if (url.getParameter("unicast", true) // Whether the consumer's machine has only one process
                            && !NetUtils.getLocalHost().equals(host)) { // Never unicast to the local machine, for the reason given above
                        unicast(Constants.REGISTER + " " + u.toFullString(), host);
                    } else {
                        broadcast(Constants.REGISTER + " " + u.toFullString());
                    }
                    }
                }
            }
        }
    }/* else if (msg.startsWith(UNSUBSCRIBE)) { }*/
}

The method decides which operation to perform from the prefix of the received message. The interesting branch is subscribe: depending on the unicast parameter carried by the URL, the matching provider URLs are sent back either by unicast to the subscriber or by multicast to the whole group.
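The wire format is simply a command word followed by a URL on one line. Here is a small sketch of that parsing step on its own. The class MulticastMessageSketch and its parse method are hypothetical, and the three command strings are my assumption about the values of Constants.REGISTER, Constants.UNREGISTER, and Constants.SUBSCRIBE.

```java
class MulticastMessageSketch {

    // Assumed values of the corresponding Constants fields
    static final String REGISTER = "register";
    static final String UNREGISTER = "unregister";
    static final String SUBSCRIBE = "subscribe";

    // Returns {command, payload}, or null when the message has no known prefix.
    static String[] parse(String raw) {
        // trim() also strips the zero bytes left over from a partly filled receive buffer
        String msg = raw.trim();
        int newline = msg.indexOf('\n');
        if (newline > 0) {
            msg = msg.substring(0, newline).trim(); // only the first line counts
        }
        for (String command : new String[] {REGISTER, UNREGISTER, SUBSCRIBE}) {
            if (msg.startsWith(command)) {
                return new String[] {command, msg.substring(command.length()).trim()};
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String[] parsed = parse("register dubbo://10.20.153.10:20880/com.foo.BarService\n");
        System.out.println(parsed[0] + " | " + parsed[1]);
    }
}
```

A message starting with "unsubscribe" matches none of the three prefixes, which mirrors the commented-out branch at the end of receive: such messages are ignored.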

7. broadcast
private void broadcast(String msg) {
    if (logger.isInfoEnabled()) {
        logger.info("Send broadcast message: " + msg + " to " + mutilcastAddress + ":" + mutilcastPort);
    }
    try {
        byte[] data = (msg + "\n").getBytes();
        // Instantiate the datagram. The destination address is mutilcastAddress
        DatagramPacket hi = new DatagramPacket(data, data.length, mutilcastAddress, mutilcastPort);
        // Send datagrams
        mutilcastSocket.send(hi);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}

This is the broadcast method. The destination address of the datagram is mutilcastAddress, which stands for a group of hosts rather than a single one.

8. unicast
private void unicast(String msg, String host) {
    if (logger.isInfoEnabled()) {
        logger.info("Send unicast message: " + msg + " to " + host + ":" + mutilcastPort);
    }
    try {
        byte[] data = (msg + "\n").getBytes();
        // Instantiate datagrams. The destination address is a single address
        DatagramPacket hi = new DatagramPacket(data, data.length, InetAddress.getByName(host), mutilcastPort);
        // Send datagrams
        mutilcastSocket.send(hi);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}

This is the unicast implementation. The only difference from broadcast is the destination address: unicast sends to a single host, while broadcast sends to the group address.
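Since both methods send through the same socket, the whole difference sits in the DatagramPacket. A tiny sketch, with a hypothetical DestinationSketch class and made-up addresses, makes that visible:

```java
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;

class DestinationSketch {

    // Same payload format as broadcast/unicast above; only the destination varies.
    static DatagramPacket packet(String msg, String ipLiteral, int port) {
        byte[] data = (msg + "\n").getBytes(StandardCharsets.UTF_8);
        try {
            return new DatagramPacket(data, data.length, InetAddress.getByName(ipLiteral), port);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("cannot resolve " + ipLiteral, e);
        }
    }

    public static void main(String[] args) {
        DatagramPacket toGroup = packet("register dubbo://h:1/x", "224.5.6.7", 1234);   // hypothetical group
        DatagramPacket toHost = packet("register dubbo://h:1/x", "10.20.153.10", 1234); // hypothetical host
        System.out.println(toGroup.getAddress().isMulticastAddress()); // true
        System.out.println(toHost.getAddress().isMulticastAddress());  // false
    }
}
```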

9. doRegister && doUnregister && doSubscribe && doUnsubscribe
@Override
protected void doRegister(URL url) {
    broadcast(Constants.REGISTER + " " + url.toFullString());
}
@Override
protected void doUnregister(URL url) {
    broadcast(Constants.UNREGISTER + " " + url.toFullString());
}
@Override
protected void doSubscribe(URL url, NotifyListener listener) {
    // Subscribing to all services (*) switches this registry into admin mode, which enables cleanup
    if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
        admin = true;
    }
    broadcast(Constants.SUBSCRIBE + " " + url.toFullString());
    // Wait on the listener for up to the timeout; registered() calls listener.notify() when a matching URL arrives
    synchronized (listener) {
        try {
            listener.wait(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
        } catch (InterruptedException e) {
        }
    }
}
@Override
protected void doUnsubscribe(URL url, NotifyListener listener) {
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) {
        unregister(url);
    }
    broadcast(Constants.UNSUBSCRIBE + " " + url.toFullString());
}

These methods implement the abstract methods of the parent class FailbackRegistry. All of them call the broadcast method.
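The synchronized block in doSubscribe pairs with the listener.notify() call in the registered method shown later: the subscriber blocks until a provider answers or the timeout passes. Below is a minimal sketch of that handshake. SubscribeWaitSketch is a hypothetical class, and the answered flag is my addition so that an answer arriving before the wait starts is not lost. The Dubbo code waits on the listener object directly, without such a flag.

```java
class SubscribeWaitSketch {

    private final Object monitor = new Object();
    private boolean answered = false; // guard flag, not present in the Dubbo code

    // Blocks until markAnswered() is called or the timeout passes; returns whether an answer arrived.
    boolean awaitAnswer(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        synchronized (monitor) {
            while (!answered) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false;
                }
                try {
                    monitor.wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }

    // Called by the receiving side once a matching provider URL has arrived.
    void markAnswered() {
        synchronized (monitor) {
            answered = true;
            monitor.notifyAll();
        }
    }

    public static void main(String[] args) {
        final SubscribeWaitSketch sketch = new SubscribeWaitSketch();
        new Thread(new Runnable() {
            @Override
            public void run() {
                sketch.markAnswered();
            }
        }).start();
        System.out.println("answered in time: " + sketch.awaitAnswer(1000));
    }
}
```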

10. destroy
@Override
public void destroy() {
    super.destroy();
    try {
        // Cancel the cleanup task
        if (cleanFuture != null) {
            cleanFuture.cancel(true);
        }
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
    try {
        // Leave the multicast group
        mutilcastSocket.leaveGroup(mutilcastAddress);
        // Close mutilcastSocket
        mutilcastSocket.close();
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
    // Close the thread pool
    ExecutorUtil.gracefulShutdown(cleanExecutor, cleanPeriod);
}

The logic of this method is similar to the destroy method in the Dubbo registry, with the addition of leaving the multicast group. The gracefulShutdown method is described in Dubbo Source Code Parsing (4) Registry — Dubbo.

11. register
@Override
public void register(URL url) {
    super.register(url);
    registered(url);
}
protected void registered(URL url) {
    // Iterate over the subscribed listener collection
    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL key = entry.getKey();
        // Check whether it is valid
        if (UrlUtils.isMatch(key, url)) {
            // Get the collection of received service urls from the consumer URL
            Set<URL> urls = received.get(key);
            if (urls == null) {
                received.putIfAbsent(key, new ConcurrentHashSet<URL>());
                urls = received.get(key);
            }
            // Add the service URL
            urls.add(url);
            List<URL> list = toList(urls);
            for (NotifyListener listener : entry.getValue()) {
                // Notify listeners of service URL changes
                notify(key, listener, list);
                synchronized (listener) {
                    listener.notify();
                }
            }
        }
    }
}

As you can see, this class overrides the parent class's register method. The parent logic is unchanged. The override additionally puts the registered URL into the received cache and notifies the listeners of the URL change.

12. unregister
@Override
public void unregister(URL url) {
    super.unregister(url);
    unregistered(url);
}
protected void unregistered(URL url) {
    // Iterate over the subscribed listener collection
    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL key = entry.getKey();
        if (UrlUtils.isMatch(key, url)) {
            Set<URL> urls = received.get(key);
            // Remove from cache
            if (urls != null) {
                urls.remove(url);
            }
            if (urls == null || urls.isEmpty()){
                if (urls == null){
                    urls = new ConcurrentHashSet<URL>();
                }
                // Set the URL carrying the empty protocol
                URL empty = url.setProtocol(Constants.EMPTY_PROTOCOL);
                urls.add(empty);
            }
            List<URL> list = toList(urls);
            // Notify listener service URL changes
            for (NotifyListener listener : entry.getValue()) {
                notify(key, listener, list);
            }
        }
    }
}

The logic is also clear: remove the service URL being unregistered from the cache. If no received service URLs remain, add a URL that carries the empty protocol. Finally, notify the listeners of the service change.
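To see the cache behaviour of registered and unregistered side by side, here is a simplified sketch. It uses plain strings instead of Dubbo's URL class, the names ReceivedCacheSketch and withEmptyProtocol are hypothetical, and, unlike the original, it returns the empty marker without storing it in the cached set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ReceivedCacheSketch {

    // consumer key -> provider URLs received for it
    private final ConcurrentMap<String, Set<String>> received =
            new ConcurrentHashMap<String, Set<String>>();

    // Stand-in for url.setProtocol(Constants.EMPTY_PROTOCOL)
    static String withEmptyProtocol(String url) {
        int i = url.indexOf("://");
        return "empty" + (i >= 0 ? url.substring(i) : "://" + url);
    }

    // Returns the list that would be passed to the listeners after a registration.
    List<String> registered(String consumerKey, String providerUrl) {
        Set<String> urls = received.get(consumerKey);
        if (urls == null) {
            // putIfAbsent keeps whichever set was stored first if two threads race
            received.putIfAbsent(consumerKey, ConcurrentHashMap.<String>newKeySet());
            urls = received.get(consumerKey);
        }
        urls.add(providerUrl);
        return new ArrayList<String>(urls);
    }

    // Returns the list that would be passed to the listeners after an unregistration.
    List<String> unregistered(String consumerKey, String providerUrl) {
        Set<String> urls = received.get(consumerKey);
        if (urls != null) {
            urls.remove(providerUrl);
        }
        if (urls == null || urls.isEmpty()) {
            // Nothing left: tell listeners explicitly via a marker URL
            List<String> marker = new ArrayList<String>();
            marker.add(withEmptyProtocol(providerUrl));
            return marker;
        }
        return new ArrayList<String>(urls);
    }

    public static void main(String[] args) {
        ReceivedCacheSketch cache = new ReceivedCacheSketch();
        System.out.println(cache.registered("consumer-a", "dubbo://10.0.0.1:20880/com.foo.Bar"));
        System.out.println(cache.unregistered("consumer-a", "dubbo://10.0.0.1:20880/com.foo.Bar"));
    }
}
```

The marker matters because an empty notification list could not be told apart from "nothing has changed", whereas a URL with the empty protocol states that the last provider is gone.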

13. lookup
@Override
public List<URL> lookup(URL url) {
    List<URL> urls = new ArrayList<URL>();
    // Get the URLs already notified for this consumer URL
    Map<String, List<URL>> notifiedUrls = getNotified().get(url);
    // Collect the notified service URLs
    if (notifiedUrls != null && notifiedUrls.size() > 0) {
        for (List<URL> values : notifiedUrls.values()) {
            urls.addAll(values);
        }
    }
    // If still empty, fall back to the URLs held in the in-memory cache properties
    if (urls.isEmpty()) {
        List<URL> cacheUrls = getCacheUrls(url);
        if (cacheUrls != null && !cacheUrls.isEmpty()) {
            urls.addAll(cacheUrls);
        }
    }
    // If still empty, fall back to the matching URLs in the registered set
    if (urls.isEmpty()) {
        for (URL u : getRegistered()) {
            if (UrlUtils.isMatch(url, u)) {
                urls.add(u);
            }
        }
    }
    // If the service interface is * (all services), also add the matching subscribed URLs
    if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
        for (URL u : getSubscribed().keySet()) {
            if (UrlUtils.isMatch(url, u)) {
                urls.add(u);
            }
        }
    }
    return urls;
}

This method returns the list of registered service URLs. As you can see, it draws on several sources in turn: the notified URLs, the cached URLs, and the registered set. These caches are all held in the AbstractRegistry class.
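The first three steps of lookup form a fallback chain: each source is consulted only if the previous ones produced nothing. Reduced to its core, and with the hypothetical names LookupFallbackSketch and firstNonEmpty, the idea looks like this. The final step for the * interface is additive rather than a fallback, so it is left out here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class LookupFallbackSketch {

    // Returns a copy of the first non-empty source, or an empty list if all are empty.
    @SafeVarargs
    static <T> List<T> firstNonEmpty(List<T>... sourcesInPriorityOrder) {
        for (List<T> source : sourcesInPriorityOrder) {
            if (source != null && !source.isEmpty()) {
                return new ArrayList<T>(source);
            }
        }
        return new ArrayList<T>();
    }

    public static void main(String[] args) {
        List<String> notified = Collections.emptyList();                 // nothing notified yet
        List<String> cached = Arrays.asList("dubbo://10.0.0.1:20880/x"); // hypothetical cache entry
        List<String> registered = Arrays.asList("dubbo://10.0.0.2:20880/x");
        // The cache wins because it is the first source with content
        System.out.println(firstNonEmpty(notified, cached, registered));
    }
}
```

Unlike this sketch, the real method computes each source lazily, so getCacheUrls and getRegistered are only called when needed.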

14. subscribe && unsubscribe
@Override
public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    subscribed(url, listener);
}

@Override
public void unsubscribe(URL url, NotifyListener listener) {
    super.unsubscribe(url, listener);
    received.remove(url);
}
protected void subscribed(URL url, NotifyListener listener) {
    // Query the registration list
    List<URL> urls = lookup(url);
    // Notify the listener of the URLs
    notify(url, listener, urls);
}


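These two methods override subscribe and unsubscribe in the parent class. subscribe additionally looks up the current service URLs and notifies the listener right away, while unsubscribe additionally removes the consumer's entry from the received cache.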

(2) MulticastRegistryFactory

This class extends AbstractRegistryFactory and implements its createRegistry method.

public class MulticastRegistryFactory extends AbstractRegistryFactory {

    @Override
    public Registry createRegistry(URL url) {
        return new MulticastRegistry(url);
    }
}

You can see that this is just instantiating MulticastRegistry, so I won’t explain it here.

Afterword.

The source code for this section is github.com/CrazyHZM/in…

This article explains dubbo’s use of multicast to implement registries. The key is to understand MulticastSocket and the concepts of unicast, broadcast, and multicast. The rest of the logic is not complicated. If I didn’t write enough or made mistakes in any part, please give me your advice.