Learning is not so utilitarian, two brothers with you from a higher dimension to easily read the source ~
Nacos uses THE UDP communication mode in the service registration function, which is used to assist in notifying clients when service instances change. However, most programmers using Nacos may not know about this feature, let alone use it flexibly.
Look at the implementation of the complete source code, or to point a praise for this function, can be said to be very clever and practical. However, there are some deficiencies in the implementation, which will be pointed out at the end of this paper.
This article takes a look at Nacos 2.0’s UDP protocol for notification of service instance changes from a source code perspective.
Basic principles of UDP notification
Before analyzing the source code, let’s take a look at the implementation of UDP in Nacos as a whole.
As we know, UDP protocol communication is bidirectional, there is no so-called client and server, so the client and server will enable UDP monitoring. The client starts a separate thread to process UDP messages. When communicating with the registry over HTTP, the UPD information (IP and port) of the client is sent to the registry when the client invokes the service subscription interface. The registry is encapsulated and stored in the PushClient object.
When the registry has an instance change, it issues a ServiceChangeEvent. The registry listens for this event and then iterates through the stored PushClient, notifying the client based on UDP. After receiving the UDP notification, the client can update the list of locally cached instances.
As we already know, when registering a service over HTTP, there is a lag in instance updates because the list of instances in the server is pulled periodically by the client. If the pull is too frequent, the registry will be stressed, and if the pull cycle is long, the instance changes will not be quickly perceived. UDP makes up for this shortcoming, so give UDP notification a thumbs up.
Here’s how it works at the source level.
Client UDP notification monitoring and processing
When a client instantiates NamingHttpClientProxy, it initializes PushReceiver in its constructor.
public NamingHttpClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListManager serverListManager, Properties properties, ServiceInfoHolder serviceInfoHolder) { // ... // Build BeatReactor this. BeatReactor = new BeatReactor(this, properties); PushReceiver = new pushReceiver (serviceInfoHolder); / /... }Copy the code
The constructor of PushReceiver looks like this:
Public PushReceiver(ServiceInfoHolder ServiceInfoHolder) {try {// Holds ServiceInfoHolder reference this. ServiceInfoHolder = serviceInfoHolder; String udpPort = getPushReceiverUdpPort(); If (stringutils.isempty (udpPort)) {this.udpSocket = new DatagramSocket(); if (stringutils.isempty) {this.udpSocket = new DatagramSocket(); } else { this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort))); } / / to create only one thread ScheduledExecutorService enclosing the executorService = new ScheduledThreadPoolExecutor (1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.push.receiver"); return thread; }}); / / thread of execution, PushReceiver implements Runnable interface enclosing the executorService. Execute (this); } catch (Exception e) { NAMING_LOGGER.error("[NA] init udp socket failed", e); }}Copy the code
The constructor for PushReceiver does the following:
- First, hold a ServiceInfoHolder object reference.
- Second, obtain the UDP port;
- 3. Instantiate DatagramSocket objects for sending and receiving Socket data.
- Fourth, create a thread pool and implement PushReceiver (which implements the Runnable interface).
Since PushReceiver implements the Runnable interface, the run method definitely needs to be reimplemented:
@Override public void run() { while (! closed) { try { // byte[] is initialized with 0 full filled by default byte[] buffer = new byte[UDP_MSS]; DatagramPacket packet = new DatagramPacket(buffer, buffer.length); Udpsocket. receive(packet); // The thread blocks the packet if no packet is received. String json = new String(ioutils.tryDecompress (packet.getData()), UTF_8).trim(); NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString()); PushPacket PushPacket = Jacksonutils.toobj (json, pushpacket.class); String ack; // If the conditions are met, ServiceInfoHolder is called to receive the packet. And returns a response message if (PUSH_PACKAGE_TYPE_DOM equals (pushPacket. Type) | | PUSH_PACKAGE_TYPE_SERVICE. Equals (pushPacket. Type)) { serviceInfoHolder.processServiceInfo(pushPacket.data); // send ack to server ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":" + "\"\"}"; } else if (PUSH_PACKAGE_TYPE_DUMP.equals(pushPacket.type)) { // dump data to server ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":" + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap())) + "\"}"; } else { // do nothing send ack only ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":" + "\"\"}"; } udpsocket. send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, packet.getSocketAddress())); } catch (Exception e) { if (closed) { return; } NAMING_LOGGER.error("[NA] error while receiving push data", e); }}}Copy the code
The PushReceiver#run method handles the following operations:
- First, construct DatagramPacket for receiving packet data;
- The DatagramSocket#receive method blocks and waits for packets to arrive.
- DatagramSocket#receive after receiving the packet, the method continues execution.
- Fourth, parse the JSON packet as PushPacket object;
- Fifth, determine the type of the packet and call ServiceInfoHolder#processServiceInfo to process the received packet. In this method, PushPacket is converted to ServiceInfo.
- Sixth, encapsulate ACK information (that is, reply message information);
- 7. Send reply packets through DatagramSocket;
We saw above how the Nacos client listens and processes packets based on UDP, but did not find out how the client sends UDP information to the registry. Here we will comb through the logic of sending UDP information.
The client sends UDP information
NamingHttpClientProxy stores UDP_PORT_PARAM, the UDP port parameter information.
UDP port information is transmitted through instance query interfaces, such as the instance list, single health instance, all instances, subscription interface, and UpdateTask interface. The NamingClientProxy#queryInstancesOfService method is called in each of these methods.
NamingHttpClientProxy queryInstancesOfService
@Override public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort, boolean healthyOnly) throws NacosException { final Map<String, String> params = new HashMap<String, String>(8); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, NamingUtils.getGroupedName(serviceName, groupName)); params.put(CLUSTERS_PARAM, clusters); Put (UDP_PORT_PARAM, string.valueof (udpPort)); params.put(CLIENT_IP_PARAM, NetUtils.localIP()); params.put(HEALTHY_ONLY_PARAM, String.valueOf(healthyOnly)); String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET); if (StringUtils.isNotEmpty(result)) { return JacksonUtils.toObj(result, ServiceInfo.class); } return new ServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), clusters); }Copy the code
However, if you check the source code, you can find that the parameter value passed by the UDP port is 0 in the instance list query, single health instance query, all instances query, and subscribed UpdateTask UpdateTask. Only the UDP port number in PushReceiver is used for the HTTP subscription interface.
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
return queryInstancesOfService(serviceName, groupName, clusters, pushReceiver.getUdpPort(), false);
}
Copy the code
We already know that PushReceiver has a getPushReceiverUdpPort method in the above code:
public static String getPushReceiverUdpPort() {
return System.getenv(PropertyKeyConst.PUSH_RECEIVER_UDP_PORT);
}
Copy the code
Obviously, the UDP port is set by the environment variable, and the corresponding key is “push.receiver.udp.port”.
As of version 1.4.2, the queryList method of the NamingProxy member variable in HostReactor also passes UDP ports:
public void updateService(String serviceName, String clusters) throws NacosException { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); if (StringUtils.isNotEmpty(result)) { processServiceJson(result); } } finally { // ... }}Copy the code
About 1.4.2 version of the implementation, you can see the source code, here is no longer expanded.
After the basic UDP information is transferred to the client, let’s see how the server receives and stores this information.
UDP service Storage
The server processes the UDP port on the interface for obtaining the instance list.
@GetMapping("/list") @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ) public Object list(HttpServletRequest request) throws Exception { // ... Int udpPort = integer.parseint (webutils. optional(request, "udpPort", "0")); / /... // encapsulate the clientIP and UDP ports to the Subscriber object. Subscriber = new Subscriber(clientIP + ":" + udpPort, agent, app, clientIP, namespaceId, serviceName, udpPort, clusters); return getInstanceOperator().listInstance(namespaceId, serviceName, subscriber, clusters, healthyOnly); }Copy the code
The current protocol is obtained in the getInstanceOperator() method, and the corresponding handler class is selected:
* @return V1: Jraft protocol (server side); V2: gRpc protocol (client) * / private InstanceOperator getInstanceOperator () {return upgradeJudgement. IsUseGrpcFeatures ()? instanceServiceV2 : instanceServiceV1; }Copy the code
Here the specific implementation class for InstanceOperatorServiceImpl:
@Override public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster, boolean healthOnly) throws Exception { ClientInfo clientInfo = new ClientInfo(subscriber.getAgent()); String clientIP = subscriber.getIp(); ServiceInfo result = new ServiceInfo(serviceName, cluster); Service service = serviceManager.getService(namespaceId, serviceName); long cacheMillis = switchDomain.getDefaultCacheMillis(); // Now try to enable the push try {// Process the UDP client information if (subscriber.getport () > 0 && pushService.canEnablePush(subscriber.getAgent())) { subscriberServiceV1.addClient(namespaceId, serviceName, cluster, subscriber.getAgent(), new InetSocketAddress(clientIP, subscriber.getPort()), pushDataSource, StringUtils.EMPTY, StringUtils.EMPTY); cacheMillis = switchDomain.getPushCacheMillis(serviceName); } } catch (Exception e) { // ... } / /... }Copy the code
When UDP port is greater than zero, and the agent parameters defined support UDP client, client information encapsulation of corresponding to the InetSocketAddress object, and then into the NamingSubscriberServiceV1Impl (the class has already been abandoned, See how to tweak this method implementation later).
In NamingSubscriberServiceV1Impl will encapsulate the corresponding parameters for PushClient, stored in the Map.
public void addClient(String namespaceId, String serviceName, String clusters, String agent,
InetSocketAddress socketAddr, DataSource dataSource, String tenant, String app) {
PushClient client = new PushClient(namespaceId, serviceName, clusters, agent, socketAddr, dataSource, tenant,
app);
addClient(client);
}
Copy the code
ConcurrentMap<String, ConcurrentMap<String, PushClient>>
private final ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap = new ConcurrentHashMap<>();
public void addClient(PushClient client) {
// client is stored by key 'serviceName' because notify event is driven by serviceName change
String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName());
ConcurrentMap<String, PushClient> clients = clientMap.get(serviceKey);
if (clients == null) {
clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<>(1024));
clients = clientMap.get(serviceKey);
}
PushClient oldClient = clients.get(client.toString());
if (oldClient != null) {
oldClient.refresh();
} else {
PushClient res = clients.putIfAbsent(client.toString(), client);
// ...
}
}
Copy the code
At this point, the UDP IP and port information has been encapsulated into PushClient, and stored in NamingSubscriberServiceV1Impl member variables.
UDP notifications for the registry
When the server detects that an instance has changed, for example, that it has been unregistered, it issues a ServiceChangeEvent. UdpPushService listens to this event and processes the event.
The onApplicationEvent method of UdpPushService removes or sends UDP notifications based on PushClient. The core logic in onApplicationEvent is as follows:
ConcurrentMap<String, PushClient> clients = subscriberServiceV1.getClientMap() .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); if (MapUtils.isEmpty(clients)) { return; } Map<String, Object> cache = new HashMap<>(16); long lastRefTime = System.nanoTime(); For (PushClient client: client.values ()) {// Remove zombie client if (client.zombie()) {loggers.push. Debug ("client is zombie: " + client); clients.remove(client.toString()); Loggers.PUSH.debug("client is zombie: " + client); continue; } AckEntry ackEntry; String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent()); byte[] compressData = null; Map<String, Object> data = null; if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) { org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key); compressData = (byte[]) (pair.getValue0()); data = (Map<String, Object>) pair.getValue1(); } // Encapsulate the AckEntry object if (compressData! = null) { ackEntry = prepareAckEntry(client, compressData, data, lastRefTime); } else { ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime); if (ackEntry ! = null) { cache.put(key, new org.javatuples.Pair<>(ackEntry.getOrigin().getData(), ackEntry.getData())); }} // Notify other clients through UDP udpPush(ackEntry); }Copy the code
The core logic of event processing is to check the state of PushClient and remove it if it is already a zombie client. Then encapsulate the information about the sent UDP packet and the receiving client into an AckEntry object, and invoke udpPush to send UDP messages.
UDP reception for the registry
When looking at the client source, we see that the client will not only receive UDP requests, but also reply. So how does the registry receive replies? Also in the UdpPushService class, a static block of code inside the class initializes a UDP DatagramSocket for receiving messages:
static { try { udpSocket = new DatagramSocket(); Receiver receiver = new Receiver(); Thread inThread = new Thread(receiver); inThread.setDaemon(true); inThread.setName("com.alibaba.nacos.naming.push.receiver"); inThread.start(); } catch (SocketException e) { Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service"); }}Copy the code
Receiver is an internal class that realizes the Runnable interface. In its run method, Receiver mainly receives message information and then determines the message. According to the judgment result, it operates the data in the local Map.
Inadequate UDP design
As the article begins, UDP is designed to compensate for HTTP timed pull without affecting performance too much. But currently Nacos has some UDP shortcomings, which may be a personal quibble.
First, there is no clear description of how UDP functions are used, so many users are not aware of the existence of UDP functions and the limitations of using them.
Second, it is not friendly to cloud services. The UDP port on the client can be customized, but the UDP port on the server is randomly obtained. In cloud services, even Intranet services, UDP ports are restricted by firewalls. If the UDP port on the server is obtained randomly (and by default on the client), the UDP traffic is blocked by the firewall, and the user does not see any exception (UDP protocol does not care whether the client receives the message).
As for these two points, it is a matter of flaws, read the source code or read my article friends probably already know how to use. You can submit an Issue to the official later to see if it can be improved.
summary
This article focuses on three aspects of Nacos udP-based service instance change notification:
First, the client listens to the UDP port and updates the local instance cache in time when receiving service instance changes from the registry.
Second, the client sends its UDP information to the registry through the subscription interface, and the registry stores it.
Third, the instance in the registry has changed, and the change information is sent to the client through UDP protocol through the event mechanism.
Through this article, you’ve learned more than just the notification mechanism of UDP in Nacos. At the same time, it also opens up a new idea, that is, how to use UDP, in what scenarios to use UDP, and the possible problems when using UDP in cloud services. If this article has helped you, follow it or like it.
About the blogger: Author of the technology book SpringBoot Inside Technology, loves to delve into technology and writes technical articles.
Public account: “program new vision”, the blogger’s public account, welcome to follow ~
Technical exchange: Please contact the weibo user at Zhuan2quan