Registries — Multicast
Objectives: Explain the principles of the registry implemented for multicast, understand the differences between unicast, broadcast, and multicast, and interpret the source code for Duubo-Registry-multicast
This is the second way dubbo implements the registry, and it is the way dubbo implements the registry in the Demo module. Multicast is actually implemented with MulticastSocket.
Let me add a little bit about multicast, or MulticastSocket. The MulticastSocket class inherits from the DatagramSocket class, which only allows datagrams to be sent to a specified destination address. MulticastSocket can broadcast data to multiple clients. The idea is that MulticastSocket will send a datagram to a particular multicast address, a special set of network addresses that clients can simply join when they want to send or receive broadcast messages. The IP protocol provides a special set of IP addresses for multicast, ranging from 224.0.0.0 to 239.255.255.255. The MulticastSocket class can both send datagrams to multicast addresses and receive broadcast messages from other hosts.
That’s a brief overview of the multicast background, but let’s look at how Dubbo applies MulticastSocket to the registry implementation.
Let’s take a look at the classes under the package:
You can see that the package structure is very similar to the default registry. Let’s take a look at these two classes.
(a) MulticastRegistry
This class inherits FailbackRegistry class, which is aimed at the core functions of the registry registration, subscription, unregistration, unsubscription, query registration list to expand, using the broadcast way to achieve.
1. The attribute
// logging output
// Log output
private static final Logger logger = LoggerFactory.getLogger(MulticastRegistry.class);
// The default multicast port
private static final int DEFAULT_MULTICAST_PORT = 1234;
// Multicast address
private final InetAddress mutilcastAddress;
// Multipoint broadcast
private final MulticastSocket mutilcastSocket;
// Multicast broadcast port
private final int mutilcastPort;
// The received URL
private final ConcurrentMap<URL, Set<URL>> received = new ConcurrentHashMap<URL, Set<URL>>();
// Task scheduler
private final ScheduledExecutorService cleanExecutor = Executors.newScheduledThreadPool(1.new NamedThreadFactory("DubboMulticastRegistryCleanTimer".true));
// Clean up the expired url
private finalScheduledFuture<? > cleanFuture;// The interval between cleanups
private final int cleanPeriod;
// Admin permission
private volatile boolean admin = false;
Copy the code
Looking at the above attributes, you need to pay attention to the following points:
- MutilcastSocket, the key to the implementation of muticast registry, is added to the distinction of singlebroadcast, broadcast, and multicast, as described below. Unicast is in which only two entities communicate with each other at a time, and the sender and receiver are uniquely identified. The broadcast destination address is the whole target in the network, while the multicast destination address is a group of targets, and the members joining the group are the destination of packets.
- Focus on task schedulers and cleanup timers, which encapsulate policies for periodically cleaning up expired services.
2. Construction method
public MulticastRegistry(URL url) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
if(! isMulticastAddress(url.getHost())) {throw new IllegalArgumentException("Invalid multicast address " + url.getHost() + ", scope: 224.0.0.0-239.255.255.255");
}
try {
mutilcastAddress = InetAddress.getByName(url.getHost());
// If the configuration carried by the URL does not contain a port number, the default port number is used
mutilcastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
mutilcastSocket = new MulticastSocket(mutilcastPort);
// Disable local loopback for multicast datagrams
mutilcastSocket.setLoopbackMode(false);
// Join the same broadcast group
mutilcastSocket.joinGroup(mutilcastAddress);
Thread thread = new Thread(new Runnable() {
@Override
public void run(a) {
byte[] buf = new byte[2048];
// Instantiate datagrams
DatagramPacket recv = new DatagramPacket(buf, buf.length);
while(! mutilcastSocket.isClosed()) {try {
// Receive packets
mutilcastSocket.receive(recv);
String msg = new String(recv.getData()).trim();
int i = msg.indexOf('\n');
if (i > 0) {
msg = msg.substring(0, i).trim();
}
// Receive message requests and act accordingly, such as registration, subscription, etc
MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
Arrays.fill(buf, (byte) 0);
} catch (Throwable e) {
if(! mutilcastSocket.isClosed()) { logger.error(e.getMessage(), e); }}}}},"DubboMulticastRegistryReceiver");
// Set it as a daemon
thread.setDaemon(true);
// Start the thread
thread.start();
} catch (IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// The clearing delay is obtained from the URL first. If not, the default value is 60s
this.cleanPeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
// If yes, you need to clear it
if (url.getParameter("clean".true)) {
// Start the timer
this.cleanFuture = cleanExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run(a) {
try {
// Clean up expired services
clean(); // Remove the expired
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected exception occur at clean expired provider, cause: " + t.getMessage(), t);
}
}
}, cleanPeriod, cleanPeriod, TimeUnit.MILLISECONDS);
} else {
this.cleanFuture = null; }}Copy the code
The key to this constructor is a thread and a scheduled cleanup task.
- The work of the thread is to determine the request according to the received message and perform corresponding operations. As long as mutilcastSocket is not disconnected, the message is always received, and the internal realization is embodied in the receive method, which will be described below.
- The scheduled clearing task is to clear expired registered services. Determines expiration by two socket attempts. The clean method is described below
3.isMulticastAddress
private static boolean isMulticastAddress(String ip) {
int i = ip.indexOf('. ');
if (i > 0) {
String prefix = ip.substring(0, i);
if (StringUtils.isInteger(prefix)) {
int p = Integer.parseInt(prefix);
return p >= 224 && p <= 239; }}return false;
}
Copy the code
The address range is 224.0.0.0 to 239.255.255.255.
4.clean
private void clean(a) {
// You can clear the url only when the service interface carried in the URL is set to *
if (admin) {
for (Set<URL> providers : new HashSet<Set<URL>>(received.values())) {
for (URL url : new HashSet<URL>(providers)) {
// Determine whether it expires
if (isExpired(url)) {
if (logger.isWarnEnabled()) {
logger.warn("Clean expired provider " + url);
}
// Cancel registration
doUnregister(url);
}
}
}
}
}
Copy the code
This method is also relatively simple, shutdown is how to determine expiration and do unregister operations. These methods will be explained in detail below.
5.isExpired
private boolean isExpired(URL url) {
// If the protocol is non-dynamic management mode or consumer, route, or Override, it does not expire
if(! url.getParameter(Constants.DYNAMIC_KEY,true)
|| url.getPort() <= 0
|| Constants.CONSUMER_PROTOCOL.equals(url.getProtocol())
|| Constants.ROUTE_PROTOCOL.equals(url.getProtocol())
|| Constants.OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
return false;
}
Socket socket = null;
try {
// Instantiate the socket with the host address and port number carried by the URL
socket = new Socket(url.getHost(), url.getPort());
} catch (Throwable e) {
// If the instantiation fails, wait 100ms and try again
try {
/ / wait for 100 ms
Thread.sleep(100);
} catch (Throwable e2) {
}
Socket socket2 = null;
try {
socket2 = new Socket(url.getHost(), url.getPort());
} catch (Throwable e2) {
return true;
} finally {
if(socket2 ! =null) {
try {
socket2.close();
} catch (Throwable e2) {
}
}
}
} finally {
if(socket ! =null) {
try {
socket.close();
} catch (Throwable e) {
}
}
}
return false;
}
Copy the code
This method is to determine whether the service is expired. There are two attempts on the socket operation. If the attempt fails, it is judged as expired.
6.receive
private void receive(String msg, InetSocketAddress remoteAddress) {
if (logger.isInfoEnabled()) {
logger.info("Receive multicast message: " + msg + " from " + remoteAddress);
}
// If the message starts with register, unregister, or subscribe, do the corresponding operation
if (msg.startsWith(Constants.REGISTER)) {
URL url = URL.valueOf(msg.substring(Constants.REGISTER.length()).trim());
// Register service
registered(url);
} else if (msg.startsWith(Constants.UNREGISTER)) {
URL url = URL.valueOf(msg.substring(Constants.UNREGISTER.length()).trim());
// Cancel the registration service
unregistered(url);
} else if (msg.startsWith(Constants.SUBSCRIBE)) {
URL url = URL.valueOf(msg.substring(Constants.SUBSCRIBE.length()).trim());
// Get and register a collection of urls
Set<URL> urls = getRegistered();
if(urls ! =null && !urls.isEmpty()) {
for (URL u : urls) {
// Check whether it is valid
if(UrlUtils.isMatch(url, u)) { String host = remoteAddress ! =null&& remoteAddress.getAddress() ! =null
? remoteAddress.getAddress().getHostAddress() : url.getIp();
// It is recommended that the service provider and service consumer run on different machines. If they are on the same machine, unicast=false should be set
// Multiple processes in the same machine cannot unicast unicast, or only one process can receive information. Unicast messages sent to consumers may be preempted by providers. The same is true for two consumers on the same machine.
// Only multicast registries have this problem
if (url.getParameter("unicast".true) // Whether the consumer's machine has only one process
&& !NetUtils.getLocalHost().equals(host)) { // Multiple processes in the same machine cannot be unicast with unicast or there will be only one process receiving information
unicast(Constants.REGISTER + "" + u.toFullString(), host);
} else {
broadcast(Constants.REGISTER + "" + u.toFullString());
}
}
}
}
}/* else if (msg.startsWith(UNSUBSCRIBE)) { }*/
}
Copy the code
It is clear what type of operation needs to be done based on the data received at the beginning of the message. The emphasis is on subscription. You can choose unicast or broadcast subscription depending on what configuration the URL carries.
7.broadcast
private void broadcast(String msg) {
if (logger.isInfoEnabled()) {
logger.info("Send broadcast message: " + msg + " to " + mutilcastAddress + ":" + mutilcastPort);
}
try {
byte[] data = (msg + "\n").getBytes();
// Instantiate the datagram. The destination address is mutilcastAddress
DatagramPacket hi = new DatagramPacket(data, data.length, mutilcastAddress, mutilcastPort);
// Send datagrams
mutilcastSocket.send(hi);
} catch (Exception e) {
throw newIllegalStateException(e.getMessage(), e); }}Copy the code
This is the method of broadcast, in which the destination address of a data packet is mutilcastAddress. It represents a set of addresses
8.unicast
private void unicast(String msg, String host) {
if (logger.isInfoEnabled()) {
logger.info("Send unicast message: " + msg + " to " + host + ":" + mutilcastPort);
}
try {
byte[] data = (msg + "\n").getBytes();
// Instantiate datagrams. The destination address is a single address
DatagramPacket hi = new DatagramPacket(data, data.length, InetAddress.getByName(host), mutilcastPort);
// Send datagrams
mutilcastSocket.send(hi);
} catch (Exception e) {
throw newIllegalStateException(e.getMessage(), e); }}Copy the code
This is a unicast implementation, and the difference with broadcast is just the destination address is not the same, unicast destination address is just an address, and broadcast is a group of addresses.
9.doRegister && doUnregister && doSubscribe && doUnsubscribe
@Override
protected void doRegister(URL url) {
broadcast(Constants.REGISTER + "" + url.toFullString());
}
@Override
protected void doUnregister(URL url) {
broadcast(Constants.UNREGISTER + "" + url.toFullString());
}
@Override
protected void doSubscribe(URL url, NotifyListener listener) {
// You can clear the url only when the service interface carried in the URL is set to *
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
admin = true;
}
broadcast(Constants.SUBSCRIBE + "" + url.toFullString());
// Lock the listener synchronously
synchronized (listener) {
try {
listener.wait(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
} catch (InterruptedException e) {
}
}
}
@Override
protected void doUnsubscribe(URL url, NotifyListener listener) {
if(! Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY,true)) {
unregister(url);
}
broadcast(Constants.UNSUBSCRIBE + "" + url.toFullString());
}
Copy the code
These methods are abstract methods that implement the parent class FailbackRegistry. Both call the broadcast method.
10.destroy
@Override
public void destroy(a) {
super.destroy();
try {
// Cancel the cleanup task
if(cleanFuture ! =null) {
cleanFuture.cancel(true); }}catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
// Remove the address from the group
mutilcastSocket.leaveGroup(mutilcastAddress);
/ / close mutilcastSocket
mutilcastSocket.close();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
// Close the thread pool
ExecutorUtil.gracefulShutdown(cleanExecutor, cleanPeriod);
}
Copy the code
The logic of this method is similar to the Destroy method in the Dubbo registry, with the addition of removing the address from the group. The grace shutdown method is described in dubbo source Code Parsing (4) Registry — Dubbo.
11.register
@Override
public void register(URL url) {
super.register(url);
registered(url);
}
Copy the code
protected void registered(URL url) {
// Iterate over the subscribed listener collection
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL key = entry.getKey();
// Check whether it is valid
if (UrlUtils.isMatch(key, url)) {
// Get the collection of received service urls from the consumer URL
Set<URL> urls = received.get(key);
if (urls == null) {
received.putIfAbsent(key, new ConcurrentHashSet<URL>());
urls = received.get(key);
}
// Add the service URL
urls.add(url);
List<URL> list = toList(urls);
for (NotifyListener listener : entry.getValue()) {
// Notify listeners of service URL changes
notify(key, listener, list);
synchronized (listener) {
listener.notify();
}
}
}
}
}
Copy the code
As you can see, this class overrides the parent class’s register method, but the logic doesn’t change much, except to put the url to be registered in the cache and notify the listener of the URL change.
12.unregister
@Override
public void unregister(URL url) {
super.unregister(url);
unregistered(url);
}
Copy the code
protected void unregistered(URL url) {
// Iterate over the subscribed listener collection
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL key = entry.getKey();
if (UrlUtils.isMatch(key, url)) {
Set<URL> urls = received.get(key);
// Remove from cache
if(urls ! =null) {
urls.remove(url);
}
if (urls == null || urls.isEmpty()){
if (urls == null){
urls = new ConcurrentHashSet<URL>();
}
// Set the URL carrying the empty protocol
URL empty = url.setProtocol(Constants.EMPTY_PROTOCOL);
urls.add(empty);
}
List<URL> list = toList(urls);
// Notify listener service URL changes
for(NotifyListener listener : entry.getValue()) { notify(key, listener, list); }}}}Copy the code
The logic is also clear: remove the service url that needs to be unregistered from the cache, and if there are no received service urls, add a URL that carries the Empty protocol and notify the listener of the service change.
13.lookup
@Override
public List<URL> lookup(URL url) {
List<URL> urls = new ArrayList<URL>();
// Get a listener for the subscribed service through the consumer URL
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
// Get a collection of registered service urls
if(notifiedUrls ! =null && notifiedUrls.size() > 0) {
for(List<URL> values : notifiedUrls.values()) { urls.addAll(values); }}// If it is empty, the relevant value is obtained from the memory cache properties and returned as the registered service
if (urls.isEmpty()) {
List<URL> cacheUrls = getCacheUrls(url);
if(cacheUrls ! =null&&! cacheUrls.isEmpty()) { urls.addAll(cacheUrls); }}// If still empty, get the set of registered service urls from the cache registered
if (urls.isEmpty()) {
for (URL u : getRegistered()) {
if(UrlUtils.isMatch(url, u)) { urls.add(u); }}}// If the URL carries the configuration service interface *, that is, all services, the collection of registered service urls was obtained from the cache
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
for (URL u : getSubscribed().keySet()) {
if(UrlUtils.isMatch(url, u)) { urls.add(u); }}}return urls;
}
Copy the code
This method returns a list of registered service urls. As you can see, there are a number of methods that can be obtained. These caches are stored in the AbstractRegistry class.
14.subscribe && unsubscribe
@Override
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
subscribed(url, listener);
}
@Override
public void unsubscribe(URL url, NotifyListener listener) {
super.unsubscribe(url, listener);
received.remove(url);
}
Copy the code
protected void subscribed(URL url, NotifyListener listener) {
// Query the registration list
List<URL> urls = lookup(url);
/ / notification url
notify(url, listener, urls);
}
Copy the code
The two methods override the parent class, subscribe and unsubscribe. The logic is simple.
(2) MulticastRegistryFactory
This class inherits the AbstractRegistryFactory class and implements the AbstractRegistryFactory createRegistry method.
public class MulticastRegistryFactory extends AbstractRegistryFactory {
@Override
public Registry createRegistry(URL url) {
return newMulticastRegistry(url); }}Copy the code
You can see that this is just instantiating MulticastRegistry, so I won’t explain it here.
Afterword.
The source code for this section is github.com/CrazyHZM/in…
This article explains dubbo’s use of multicast to implement registries. The key is to understand MulticastSocket and the concepts of unicast, broadcast, and multicast. The rest of the logic is not complicated. If I didn’t write enough or made mistakes in any part, please give me your advice.