This is the 19th day of my participation in the August More Text Challenge

The source code for this article is available at: simple-rpc

A distributed service framework is deployed across multiple machines: service callers run in one cluster (cluster A) and service providers run in another (cluster B).

This raises the following problems:

  • How do service callers in cluster A discover service providers in cluster B?
  • How does a service caller in cluster A choose a service provider machine in cluster B to call?
  • After a provider machine in cluster B goes offline, how does a service caller in cluster A learn that the machine is offline and stop invoking it?
  • How does a service provided by cluster B know which machines in cluster A are consuming it?

All of these problems are solved by a service registry. The registry stores up-to-date service provider information, together with the service's current caller information, in real time.

Service registries have the following advantages:

  • Soft load balancing and transparent routing: service providers and service callers are decoupled from each other, so callers do not need to hard-code provider addresses.
  • Dynamic service discovery and scalability: callers learn through the registry when provider machines are added or removed, so a service can be scaled elastically by adding or removing machines.
  • Service governance: the registry can dynamically monitor service quality and service dependencies, which provides governance capabilities for services.
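
To make the first advantage concrete, here is a minimal sketch of how a caller might pick one provider from the list it got from the registry instead of using a hard-coded address. The article does not show its selection strategy, so the weighted random choice below is an assumption, and the Endpoint and WeightedSelector names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical endpoint type for illustration; not the article's Provider class. */
final class Endpoint {
    final String address;
    final int weight;

    Endpoint(String address, int weight) {
        this.address = address;
        this.weight = weight;
    }
}

/** Assumed weighted random selection over the providers returned by the registry. */
final class WeightedSelector {

    /** Picks the endpoint whose cumulative weight range contains the ticket. */
    static Endpoint pick(List<Endpoint> endpoints, int ticket) {
        if (ticket < 0) {
            throw new IllegalArgumentException("ticket must not be negative");
        }
        int cumulative = 0;
        for (Endpoint endpoint : endpoints) {
            cumulative += endpoint.weight;
            if (ticket < cumulative) {
                return endpoint;
            }
        }
        throw new IllegalArgumentException("ticket " + ticket + " is outside the total weight " + cumulative);
    }

    /** Draws a random ticket in [0, totalWeight) and delegates to pick(List, int). */
    static Endpoint pickRandom(List<Endpoint> endpoints) {
        int totalWeight = 0;
        for (Endpoint endpoint : endpoints) {
            totalWeight += endpoint.weight;
        }
        if (totalWeight <= 0) {
            throw new IllegalArgumentException("no endpoint with a positive weight");
        }
        return pick(endpoints, ThreadLocalRandom.current().nextInt(totalWeight));
    }
}
```

Because the caller only sees whatever list the registry currently holds, adding or removing a provider machine changes the candidates without any change to caller code.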

Implementing the service registry with ZooKeeper

For details about how to deploy Zookeeper, see Using Docker to set up a Zookeeper cluster.

ZkClient

ZkClient is an open source ZooKeeper client. It wraps the native ZooKeeper API and is easier to use. Internally, ZkClient handles session timeout reconnection and the repeated registration of watchers, so these tedious details of the ZooKeeper client are transparent to developers. For details, see ZkClient of ZooKeeper.

Service registry implementation

Goals:

  • When the service starts, the service provider information (host IP address, service port, and service interface class name) is assembled into a znode path and written to ZooKeeper as a temporary (ephemeral) leaf node. This completes service registration.
  • Before initiating a service call, the service consumer connects to ZooKeeper, registers a listener on the service provider node path, and loads the service provider information into its local cache.
  • The service registry can detect that a machine in the service provider cluster has gone offline, delete that machine's provider information from the registry, and proactively notify every machine in the service caller cluster so that callers stop calling it.
  • Service consumer information can be collected through the service registry.
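
The first three goals can be sketched with a small in-memory stand-in for the registry. This is not the ZooKeeper or ZkClient API; every name below is hypothetical. It only models the behavior described above: temporary nodes tied to a session, listeners on a service path, and automatic removal plus notification when a session ends.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** In-memory model of ephemeral nodes and child listeners, for illustration only. */
final class EphemeralRegistrySketch {
    // service path -> (child node name -> id of the session that owns the node)
    private final Map<String, Map<String, String>> nodesByPath = new ConcurrentHashMap<>();
    // service path -> listeners that receive the current child list after each change
    private final Map<String, List<Consumer<List<String>>>> listenersByPath = new ConcurrentHashMap<>();

    /** Provider side: creates a temporary node that lives as long as its session. */
    void registerEphemeral(String sessionId, String servicePath, String nodeName) {
        nodesByPath.computeIfAbsent(servicePath, p -> new ConcurrentHashMap<>()).put(nodeName, sessionId);
        notifyListeners(servicePath);
    }

    /** Consumer side: subscribes to changes and returns the current provider list. */
    List<String> subscribe(String servicePath, Consumer<List<String>> listener) {
        listenersByPath.computeIfAbsent(servicePath, p -> new CopyOnWriteArrayList<>()).add(listener);
        return currentChildren(servicePath);
    }

    /** Simulates a provider going offline: every node owned by the session disappears. */
    void closeSession(String sessionId) {
        for (Map.Entry<String, Map<String, String>> entry : nodesByPath.entrySet()) {
            boolean removed = entry.getValue().values().removeIf(sessionId::equals);
            if (removed) {
                notifyListeners(entry.getKey());
            }
        }
    }

    /** Returns a sorted snapshot of the child node names under a service path. */
    List<String> currentChildren(String servicePath) {
        List<String> names = new ArrayList<>();
        Map<String, String> nodes = nodesByPath.get(servicePath);
        if (nodes != null) {
            names.addAll(nodes.keySet());
        }
        Collections.sort(names);
        return names;
    }

    private void notifyListeners(String servicePath) {
        List<Consumer<List<String>>> listeners = listenersByPath.get(servicePath);
        if (listeners == null) {
            return;
        }
        List<String> snapshot = currentChildren(servicePath);
        for (Consumer<List<String>> listener : listeners) {
            listener.accept(snapshot);
        }
    }
}
```

In the real implementation ZooKeeper plays this role: it owns the session tracking and pushes child changes to the listeners that ZkClient registers.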

The main methods

import java.util.List;
import java.util.Map;

public interface IRegisterCenter {

    /**
     * Register service provider information.
     * @param providers service providers
     */
    void registerProvider(List<Provider> providers);

    /** @return the service providers, keyed by interface name */
    Map<String, List<Provider>> getProvidersMap();

    /**
     * Destroy.
     * @param serviceItfName the interface name
     */
    void destroy(String serviceItfName);

    /**
     * Initialize the consumer's local service cache.
     * @param remoteAppKey unique identifier of the service provider
     * @param groupName    service group name
     */
    void loadProviderMap(String remoteAppKey, String groupName);

    /** @return service provider information */
    Map<String, List<Provider>> getServiceMetadata();

    /**
     * Register service consumer information, used for monitoring.
     * @param invoker the service consumer
     */
    void registerInvoker(Invoker invoker);
}

The full implementation is long, so let’s go through it in sections. Service provider registration and service consumer registration are similar, and the destroy operation only touches the local cache, so we focus on two methods: void registerProvider(List<Provider> providers) and void loadProviderMap(String remoteAppKey, String groupName).

Let’s first look at the relevant field definitions:

    private volatile ZkClient zkClient = null;

    /** ZooKeeper server list, for example 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 */
    private static final String ZK_SERVERS = SimpleRpcPropertiesUtil.getZkServers();
    /** * zk session timeout duration */
    private static final int ZK_SESSION_TIMEOUT = SimpleRpcPropertiesUtil.getSessionTimeout();
    /** * zk connection timeout */
    private static final int ZK_CONNECT_TIMEOUT = SimpleRpcPropertiesUtil.getConnectionTimeout();

    /** * zk root directory */
    private static final String ROOT_PATH = "/config_register";
    
    private static final String PROVIDER_TYPE = "/provider";
    private static final String INVOKER_TYPE = "/invoker";

    private static final int SERVER_PORT = SimpleRpcPropertiesUtil.getServerPort();
    private static final String LOCAL_IP = IpUtil.getLocalIP();

    /** Service provider list. key: interface name, value: the providers registered for that interface */
    private static final Map<String, List<Provider>> PROVIDER_MAP = Maps.newConcurrentMap();

    /** * The list of services imported by the client, added to the local cache at startup * key:interface name */
    private static final Map<String, List<Provider>> SERVICE_METADATA = Maps.newConcurrentMap();

PROVIDER_MAP and SERVICE_METADATA are the most important fields. Both are concurrent maps, which keeps them thread safe: PROVIDER_MAP holds the services exposed by the service provider, and SERVICE_METADATA holds the services imported by the client. The key is the fully qualified class name of the service interface, and the value is the provider information for that service. Service discovery currently works at the class level, so the methods of one class are treated as multiple services grouped under a single key.
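
As a rough illustration of this kind of cache, the sketch below groups provider addresses under the fully qualified interface name using only the JDK. It is a simplified stand-in, not the article's setLocalCache implementation (which is not shown); ProviderCacheSketch and its methods are hypothetical names, and plain address strings replace the Provider type.

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical cache keyed by interface name; a simplified stand-in for PROVIDER_MAP. */
final class ProviderCacheSketch {
    private final ConcurrentMap<String, List<String>> cache = new ConcurrentHashMap<>();

    /** Adds one provider address under the interface's fully qualified class name. */
    void add(Class<?> serviceInterface, String address) {
        // computeIfAbsent creates the list atomically, so concurrent callers share one list
        cache.computeIfAbsent(serviceInterface.getName(), key -> new CopyOnWriteArrayList<>())
                .add(address);
    }

    /** Returns the cached providers of an interface, or an empty list if none are known. */
    List<String> providersOf(Class<?> serviceInterface) {
        return cache.getOrDefault(serviceInterface.getName(), Collections.<String>emptyList());
    }

    /** Returns the interface names currently present in the cache. */
    Set<String> keys() {
        return cache.keySet();
    }
}
```

A thread-safe list is used for the values as well, because a concurrent map on its own only protects the map structure, not the lists stored in it.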

Let’s move on to the implementation of two important methods.

registerProvider: completing service registration

As the name suggests, this method performs service registration. The code is as follows:

@Override
public void registerProvider(List<Provider> providers) {
    if (CollectionUtils.isEmpty(providers)) {
        log.debug("RegisterCenterImpl registerProvider providers is empty, ignore it, providers={}", providers);
    } else {
        synchronized (RegisterCenterImpl.class) {
            // Lazily initialize zkClient
            this.lazyInitZkClient();

            // Set the local service list
            this.setLocalCache(providers);

            String rootNode = ROOT_PATH + UrlConstants.SLASH + providers.get(0).getAppKey();
            // Create root node
            this.createRootNode(rootNode);

            for (Map.Entry<String, List<Provider>> entry : PROVIDER_MAP.entrySet()) {
                // Fully qualified name of the service interface
                String serviceItfName = entry.getKey();
                Provider provider = entry.getValue().get(0);
                // Service group name
                String groupName = provider.getGroupName();
                // Create a service provider node
                String servicePath = rootNode + UrlConstants.SLASH + groupName + UrlConstants.SLASH + serviceItfName + PROVIDER_TYPE;
                this.createServiceNode(servicePath);

                // Create a temporary node for the current server
                String currentServiceIpNode = servicePath + UrlConstants.SLASH + LOCAL_IP + UrlConstants.VERTICAL_LINE
                        + SERVER_PORT + UrlConstants.VERTICAL_LINE + provider.getWeight() + UrlConstants.VERTICAL_LINE
                        + provider.getWorkerThreads() + UrlConstants.VERTICAL_LINE + groupName;

                this.createCurrentServiceIpNode(currentServiceIpNode);
                log.debug("create current service node success, node path = {}", currentServiceIpNode);

                // Listen for local service changes and update the local cache
                this.subscribeChildChanges(servicePath, PROVIDER_MAP);
            }
        }
    }
}

The logic is fairly simple. In essence, each entry in the service list obtained from service discovery (described later) is turned into a znode path and saved in ZooKeeper as a temporary node, and a listener is registered to watch for service changes. ZooKeeper monitors whether the server is alive: once the server goes offline, its temporary node is deleted automatically and the change is pushed to service consumers, so an offline service provider is taken out of service automatically. The method also includes some special handling for safety and performance, such as locking and lazy initialization.
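
To see what ends up in ZooKeeper, the sketch below rebuilds the two paths that registerProvider concatenates. ROOT_PATH and PROVIDER_TYPE are taken from the fields shown earlier; the values "/" and "|" for UrlConstants.SLASH and UrlConstants.VERTICAL_LINE are assumptions, and ProviderPathSketch is a hypothetical helper, not part of the project.

```java
/** Rebuilds the znode paths used during registration, under the assumptions stated above. */
final class ProviderPathSketch {
    static final String ROOT_PATH = "/config_register";
    static final String PROVIDER_TYPE = "/provider";
    static final String SLASH = "/";          // assumed value of UrlConstants.SLASH
    static final String VERTICAL_LINE = "|";  // assumed value of UrlConstants.VERTICAL_LINE

    /** Node under which every instance of one service registers itself. */
    static String servicePath(String appKey, String groupName, String serviceItfName) {
        return ROOT_PATH + SLASH + appKey + SLASH + groupName + SLASH + serviceItfName + PROVIDER_TYPE;
    }

    /** Name of the temporary child node that describes one provider instance. */
    static String instanceNode(String ip, int port, int weight, int workerThreads, String groupName) {
        return String.join(VERTICAL_LINE,
                ip,
                String.valueOf(port),
                String.valueOf(weight),
                String.valueOf(workerThreads),
                groupName);
    }
}
```

With an illustrative appKey of demo-app, group default, and interface com.example.HelloService, the service path is /config_register/demo-app/default/com.example.HelloService/provider, and each provider instance appears beneath it as a child named like 192.168.1.10|8080|2|10|default.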

loadProviderMap: loading the service list

The specific code is as follows:

@Override
public void loadProviderMap(String remoteAppKey, String groupName) {
    if (MapUtils.isEmpty(SERVICE_METADATA)) {
        SERVICE_METADATA.putAll(fetchOrUpdateServiceMetaData(remoteAppKey, groupName));
    }
}

/**
 * Get or update service metadata.
 *
 * @param remoteAppKey appKey of the service provider
 * @param groupName    service group name
 * @return providers keyed by service interface name
 */
private Map<String, List<Provider>> fetchOrUpdateServiceMetaData(String remoteAppKey, String groupName) {
    final Map<String, List<Provider>> providerServiceMap = Maps.newHashMap();
    // Connect to ZooKeeper
    this.lazyInitZkClient();

    // Service provider node
    String providerNode = ROOT_PATH + UrlConstants.SLASH + remoteAppKey + UrlConstants.SLASH + groupName;
    // Get the list of services registered under the provider node from ZooKeeper
    List<String> providerServices = zkClient.getChildren(providerNode);
    for (String serviceName : providerServices) {
        String servicePath = providerNode + UrlConstants.SLASH + serviceName + PROVIDER_TYPE;
        List<String> nodeList = zkClient.getChildren(servicePath);
        log.info("get zk nodeList={} from path={}", nodeList, servicePath);
        for (String node : nodeList) {
            // Node name format: ip|port|weight|workerThreads|groupName
            String[] serverAddress = StringUtils.split(node, UrlConstants.VERTICAL_LINE);
            // All five fields are read below, so require at least five parts
            if (serverAddress != null && serverAddress.length >= 5) {
                String serverIp = serverAddress[0];
                int serverPort = Integer.parseInt(serverAddress[1]);
                int weight = Integer.parseInt(serverAddress[2]);
                int workerThreads = Integer.parseInt(serverAddress[3]);
                String group = serverAddress[4];

                List<Provider> providerList = providerServiceMap.get(serviceName);
                if (providerList == null) {
                    providerList = new ArrayList<>();
                }
                Provider provider = Provider.builder().serverIp(serverIp).serverPort(serverPort).weight(weight)
                        .workerThreads(workerThreads).groupName(group).build();
                try {
                    provider.setServiceItf(ClassUtils.getClass(serviceName));
                } catch (Exception e) {
                    log.error("get service interface class error", e);
                    throw new SRpcException("get service interface class error", e);
                }
                providerList.add(provider);
                providerServiceMap.put(serviceName, providerList);
            } else {
                log.debug("illegal service address, ignore it");
            }
        }
        // Listen for remote service changes and update the local cache
        this.subscribeChildChanges(servicePath, SERVICE_METADATA);
    }
    return providerServiceMap;
}

Loading the service list is a key step when a consumer imports a service. Using the appKey and groupName, it loads every matching service into the local cache and registers listeners that watch for changes to the remote services, so that the cached provider list stays usable as providers come and go.
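
The parsing step and the cache refresh can be sketched in isolation. The node format ip|port|weight|workerThreads|groupName comes from the code above. The refresh method is an assumption about what a child-change listener might do, since the body of subscribeChildChanges is not shown, and ProviderAddress and ProviderNodeParser are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical value type holding the five fields encoded in a provider node name. */
final class ProviderAddress {
    final String ip;
    final int port;
    final int weight;
    final int workerThreads;
    final String groupName;

    ProviderAddress(String ip, int port, int weight, int workerThreads, String groupName) {
        this.ip = ip;
        this.port = port;
        this.weight = weight;
        this.workerThreads = workerThreads;
        this.groupName = groupName;
    }
}

final class ProviderNodeParser {

    /** Parses "ip|port|weight|workerThreads|groupName"; returns empty for malformed names. */
    static Optional<ProviderAddress> parse(String node) {
        String[] parts = node.split("\\|");
        if (parts.length != 5) {
            return Optional.empty();
        }
        try {
            return Optional.of(new ProviderAddress(parts[0], Integer.parseInt(parts[1]),
                    Integer.parseInt(parts[2]), Integer.parseInt(parts[3]), parts[4]));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    /** Replaces the cached providers of one service with the children from a change event. */
    static void refresh(Map<String, List<ProviderAddress>> cache, String serviceName,
                        List<String> currentChildren) {
        List<ProviderAddress> fresh = new ArrayList<>();
        for (String child : currentChildren) {
            parse(child).ifPresent(fresh::add);
        }
        if (fresh.isEmpty()) {
            cache.remove(serviceName);   // no live providers are left for this service
        } else {
            cache.put(serviceName, fresh);
        }
    }
}
```

Replacing the whole list on every notification keeps the sketch simple: the cache always mirrors the latest child list, so a provider whose temporary node has vanished drops out without any per-node bookkeeping.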

Summary

The registry is a core component of the whole distributed service framework. It underpins service quality in a distributed system and provides an entry point for service monitoring and service governance.