Eureka is Netflix's open source service registration and discovery component. Together with other Netflix components (such as load balancers, circuit breakers, and gateways), it is integrated into the Spring Cloud Netflix module. Development of Eureka 2.0 has been discontinued, but 1.x is still maintained and can be used. This article takes a closer look at the Eureka registry so that it can be used and tuned more effectively.
About versions: this article uses Spring Cloud Hoxton.SR8, Spring Boot 2.3.3.RELEASE, and Eureka 1.9.25.
A first look at Eureka
Eureka is divided into Eureka Server and Eureka Client. Eureka Server is the registry, and Eureka Client is the client that registers with it and discovers services from it. This section starts with a demo that sets up a skeleton registry in order to look at its basic architecture.
Eureka Server
1. Create the registry service: sunny-register
Start by creating a Maven project with the service name sunny-register and add the registry server dependency in pom.xml.
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
    </dependency>
</dependencies>
2. Add the configuration file
Add the application.yml configuration file under resources and add the registry-related configuration.
server:
  port: 8000
spring:
  application:
    name: sunny-register
eureka:
  instance:
    hostname: dev.lyyzoo.com
  client:
    # Whether to register itself with the registry
    register-with-eureka: false
    # Whether to fetch the registry
    fetch-registry: false
    service-url:
      defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/
3. Add a startup class
Add a startup class and annotate it with @EnableEurekaServer to enable the registry.
package com.lyyzoo.sunny.register;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@EnableEurekaServer
@SpringBootApplication
public class RegisterApplication {

    public static void main(String[] args) {
        SpringApplication.run(RegisterApplication.class, args);
    }
}
4. Start the registry
After starting the registry, visit http://dev.lyyzoo.com:8000/ to see the registry page; no instances are registered yet. (dev.lyyzoo.com is mapped to 127.0.0.1 in the local hosts file.)
Eureka Client
Create two demo services. The demo-producer service provides an interface as a producer, and the demo-consumer service calls the demo-producer interface as a consumer.
1. Create a client service: demo-producer
Create a Maven project with the service name demo-producer, add the registry client dependency in pom.xml, and add the web dependency.
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
2. Add the configuration file
Add the application.yml configuration file under resources and add the registry client-related configuration.
server:
  port: 8010
spring:
  application:
    name: demo-producer
eureka:
  client:
    serviceUrl:
      defaultZone: ${EUREKA_DEFAULT_ZONE:http://dev.lyyzoo.com:8000/eureka}
3. Add a startup class
Add a startup class and annotate it with @EnableEurekaClient to enable the client.
@EnableEurekaClient
@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}
4. Add a REST interface
Add an interface to test the call:
@RestController
public class DemoController {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @GetMapping("/v1/uuid")
    public ResponseEntity<String> getUUID() {
        String uuid = UUID.randomUUID().toString();
        logger.info("generate uuid: {}", uuid);
        return ResponseEntity.ok(uuid);
    }
}
5. Create a client service: demo-consumer
Similarly, we can create a consumer service, demo-consumer, which adds a consumer interface and uses a load-balanced RestTemplate to invoke the demo-producer interface.
Therefore, you need to configure a RestTemplate with load balancing:
@EnableEurekaClient
@SpringBootApplication
public class ConsumerApplication {

    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}
Add the consumer interface. Note that the URL uses the service name rather than a specific IP address or port. In a microservice scenario, services cannot hard-code each other's addresses.
@RestController
public class DemoController {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private RestTemplate restTemplate;

    @GetMapping("/v1/id")
    public ResponseEntity<String> getId() {
        ResponseEntity<String> result = restTemplate.getForEntity("http://demo-producer/v1/uuid", String.class);
        String uuid = result.getBody();
        logger.info("request id: {}", uuid);
        return ResponseEntity.ok(uuid);
    }
}
6. Start the registry clients
Start demo-producer on two different ports, which can be specified using environment variables. Then start demo-consumer.
Once startup is complete, you can see two registered demo-producer instances and one demo-consumer instance in the registry, all in the UP state.
7. Test the interface
Visit http://dev.lyyzoo.com:8020/v1/id to call the consumer service interface. After calling it several times, you will find that the consoles of the two demo-producer instances output log information alternately. This shows that the consumer client accesses the producer through its service name.
Eureka infrastructure
The demo above shows that a service only needs to know another service's name to call its API, without specifying a concrete IP address and port. How is this achieved?
As you can see, Eureka’s infrastructure consists of three roles:
- Service registry: Eureka Server, which provides service registration and discovery
- Service provider: Eureka Client, which provides services (it can also be a consumer itself)
- Service consumer: Eureka Client, which consumes services (it can also be a provider itself)
First a service registry is needed. Clients register with it and submit their own information, such as the service name, IP address, and port. Clients also fetch from the registry the list of services that have been registered. With this registry information, a client can find all instances of a service by its service name, select one instance through load balancing, and then call that service's API using the instance's IP address and port.
This is the most basic architecture and function of the registry: it provides service registration and discovery, and supplies registry information to every client. To do this and to ensure high availability, Eureka has many mechanisms, such as service registration, service renewal, the service registry list, service cancellation, and service eviction. Eureka also provides a number of parameters that let us tune its behavior for the actual scenario, such as the heartbeat interval, the registry fetch interval, and the self-preservation mechanism. Below we analyze these functions and parameters at the source code level to understand how they work and to learn from the design.
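The register, fetch, and select flow described above can be illustrated with a small in-memory sketch. This is not Eureka code: the class `SimpleRegistryDemo`, its methods, and the second port 8011 are hypothetical, and plain round-robin is assumed as the load-balancing rule.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory stand-in for a registry plus a client-side load balancer.
final class SimpleRegistryDemo {

    // serviceName -> list of "host:port" addresses
    private final Map<String, List<String>> registry = new ConcurrentHashMap<>();
    private final AtomicInteger counter = new AtomicInteger();

    // A provider registers its address under its service name.
    void register(String serviceName, String address) {
        registry.computeIfAbsent(serviceName, k -> new CopyOnWriteArrayList<>()).add(address);
    }

    // A consumer fetches every instance known for a service name.
    List<String> getInstances(String serviceName) {
        return new ArrayList<>(registry.getOrDefault(serviceName, Collections.<String>emptyList()));
    }

    // Round-robin selection over the fetched instance list.
    String choose(String serviceName) {
        List<String> instances = getInstances(serviceName);
        if (instances.isEmpty()) {
            throw new IllegalStateException("No instance available for " + serviceName);
        }
        int index = Math.floorMod(counter.getAndIncrement(), instances.size());
        return instances.get(index);
    }

    public static void main(String[] args) {
        SimpleRegistryDemo demo = new SimpleRegistryDemo();
        demo.register("demo-producer", "127.0.0.1:8010");
        demo.register("demo-producer", "127.0.0.1:8011"); // assumed second port
        for (int i = 0; i < 4; i++) {
            // The consumer only knows the service name; the address is resolved per call.
            System.out.println("http://" + demo.choose("demo-producer") + "/v1/uuid");
        }
    }
}
```

The consumer code never contains an address; it only asks for an instance of `demo-producer`, which is the same idea as the `http://demo-producer/v1/uuid` URL in the demo.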
Eureka source code preparation
Although we depend on spring-cloud-starter-netflix-eureka-server and spring-cloud-starter-netflix-eureka-client in pom.xml, these starters merely wrap Eureka so that it can be started and initialized by Spring Boot. Underneath are Netflix's eureka-core and eureka-client. So we first analyze the Netflix Eureka source code, and finally look at the spring-cloud-starter-netflix source code.
Source environment preparation
1. Download the source code
- Netflix Eureka: Github.com/Netflix/eur…
- Spring Cloud Netflix: Github.com/spring-clou…
Clone eureka source code to local:
$ git clone https://github.com/Netflix/eureka.git
Since we depend on version 1.9.25, after cloning the code, create a local branch from the 1.9.25 release tag (v1.9.25 in the repository):
$ git checkout -b 1.9.25 v1.9.25
Then go to the eureka root directory and execute the build command:
$ ./gradlew clean build -x test
2. Open the source code in IDEA
Eureka uses Gradle to manage dependencies, so install the Gradle plugin in IDEA and then open the project in IDEA.
Eureka project structure
Eureka mainly contains the following modules:
- eureka-client: Eureka client
- eureka-core: Eureka server, the core function of the registry
- eureka-resources: JSP-based Eureka console for viewing which service instances are registered
- eureka-server: The registry, which integrates eureka-client, eureka-core, and eureka-resources. Because it depends on eureka-client, eureka-server is also a client. In Eureka Server cluster mode, eureka-server registers with the other registries as a client
- eureka-examples: Eureka examples
- eureka-test-utils: Eureka unit testing tools
- eureka-core-jersey2 / eureka-client-jersey2: Encapsulation of the Jersey framework. Jersey is similar to Spring MVC and supports HTTP RESTful requests. Communication between eureka-client and eureka-server is based on the Jersey framework
Eureka Server startup and initialization
The first thing to look at is eureka-server. Once the registry is up, clients can register and discover services.
eureka-server module
1. eureka-server directory
- The resources directory mainly contains the configuration files for the Eureka client and server
- Under webapp there is a web.xml, so eureka-server is packaged as a WAR to run
- Under test there is a unit test class, EurekaClientServerRestIntegrationTest, which contains unit tests for service registration, renewal, cancellation, etc., that we can run to debug the code
2. web.xml
Contents of web.xml:
<web-app version="2.5"
         xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">

    <!-- Eureka initialization class -->
    <listener>
        <listener-class>com.netflix.eureka.EurekaBootStrap</listener-class>
    </listener>

    <!-- Status filter -->
    <filter>
        <filter-name>statusFilter</filter-name>
        <filter-class>com.netflix.eureka.StatusFilter</filter-class>
    </filter>
    <!-- Authentication filter -->
    <filter>
        <filter-name>requestAuthFilter</filter-name>
        <filter-class>com.netflix.eureka.ServerRequestAuthFilter</filter-class>
    </filter>
    <!-- Rate-limiting filter -->
    <filter>
        <filter-name>rateLimitingFilter</filter-name>
        <filter-class>com.netflix.eureka.RateLimitingFilter</filter-class>
    </filter>
    <filter>
        <filter-name>gzipEncodingEnforcingFilter</filter-name>
        <filter-class>com.netflix.eureka.GzipEncodingEnforcingFilter</filter-class>
    </filter>
    <!-- Jersey container -->
    <filter>
        <filter-name>jersey</filter-name>
        <filter-class>com.sun.jersey.spi.container.servlet.ServletContainer</filter-class>
        <init-param>
            <param-name>com.sun.jersey.config.property.WebPageContentRegex</param-name>
            <param-value>/(flex|images|js|css|jsp)/.*</param-value>
        </init-param>
        <init-param>
            <param-name>com.sun.jersey.config.property.packages</param-name>
            <param-value>com.sun.jersey;com.netflix</param-value>
        </init-param>
    </filter>

    <filter-mapping>
        <filter-name>statusFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>
    <filter-mapping>
        <filter-name>requestAuthFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>
    <filter-mapping>
        <filter-name>jersey</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <!-- Welcome page -->
    <welcome-file-list>
        <welcome-file>jsp/status.jsp</welcome-file>
    </welcome-file-list>
</web-app>
You can obtain the following information from web.xml:
- When the Eureka server starts, the com.netflix.eureka.EurekaBootStrap class first performs the startup and initialization work.
- The filters StatusFilter (server status filter), ServerRequestAuthFilter (authentication filter), and RateLimitingFilter (rate-limiting filter) are configured, but RateLimitingFilter is not enabled by default.
- The Jersey servlet container is configured. It is similar to Spring MVC's DispatcherServlet: it intercepts and handles HTTP RESTful requests, so we don't need to pay much attention to it.
- Finally, the welcome page of Eureka Server is configured as jsp/status.jsp. This page is in the eureka-resources module and is the Eureka console page seen earlier.
3. The unit test class EurekaClientServerRestIntegrationTest
Start with the setUp method, which is run before each test case is run to initialize the run environment.
@BeforeClass
public static void setUp() throws Exception {
    // Initialize the Eureka configuration
    injectEurekaConfiguration();
    // Start eureka server from the eureka-server.*.war package under build/libs
    // When this step runs, the web.xml configuration file is loaded and the EurekaBootStrap class is initialized
    startServer();
    // Eureka server configuration
    createEurekaServerConfig();

    // Create a Jersey client to use to invoke resources
    httpClientFactory = JerseyEurekaHttpClientFactory.newBuilder()
            .withClientName("testEurekaClient")
            .withConnectionTimeout(1000)
            .withReadTimeout(1000)
            .withMaxConnectionsPerHost(1)
            .withMaxTotalConnections(1)
            .withConnectionIdleTimeout(1000)
            .build();

    jerseyEurekaClient = httpClientFactory.newClient(new DefaultEndpoint(eurekaServiceUrl));

    ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);
    jerseyReplicationClient = JerseyReplicationClient.createReplicationClient(
            eurekaServerConfig,
            serverCodecs,
            eurekaServiceUrl
    );
}
This class provides the following test cases that you can run to debug.
EurekaBootStrap initialization
EurekaBootStrap is the listener entry point. It implements the ServletContextListener interface and mainly completes the startup and initialization of Eureka Server.
As the contextInitialized method shows, initialization is divided overall into Eureka environment initialization and Eureka Server context initialization.
@Override
public void contextInitialized(ServletContextEvent event) {
    try {
        // Eureka environment initialization
        initEurekaEnvironment();
        // Eureka Server context initialization
        initEurekaServerContext();

        ServletContext sc = event.getServletContext();
        sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
    } catch (Throwable e) {
        logger.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
}
1. Initialize the Eureka environment
The initEurekaEnvironment method sets the data center and runtime environment parameters:
- archaius.deployment.datacenter = default
- archaius.deployment.environment = test
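The defaulting behaviour can be sketched with plain `java.util.Properties`. This is only an illustration, not the Eureka implementation (which goes through Archaius): the class `EnvironmentDefaults` is hypothetical, and the input keys `eureka.datacenter` and `eureka.environment` are assumptions about where the values come from.

```java
import java.util.Properties;

// Hypothetical helper: resolve the deployment properties, falling back to defaults.
final class EnvironmentDefaults {

    static Properties resolve(Properties source) {
        Properties resolved = new Properties();
        // Data center: use the configured value, otherwise "default"
        resolved.setProperty("archaius.deployment.datacenter",
                source.getProperty("eureka.datacenter", "default"));
        // Runtime environment: use the configured value, otherwise "test"
        resolved.setProperty("archaius.deployment.environment",
                source.getProperty("eureka.environment", "test"));
        return resolved;
    }

    public static void main(String[] args) {
        Properties resolved = resolve(new Properties());
        System.out.println(resolved.getProperty("archaius.deployment.datacenter"));
        System.out.println(resolved.getProperty("archaius.deployment.environment"));
    }
}
```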
2. Initialize eureka Server context
The initEurekaServerContext context initialization involves a number of phases:
- Construct the Eureka registry configuration: EurekaServerConfig
- Construct the Eureka instance configuration: EurekaInstanceConfig
- Construct the instance information: InstanceInfo
- Construct the instance manager: ApplicationInfoManager
- Construct the Eureka client configuration: EurekaClientConfig
- Create the Eureka client: EurekaClient (DiscoveryClient)
- Create the registry that is aware of the Eureka cluster: PeerAwareInstanceRegistry
- Create the cluster: PeerEurekaNodes
- Encapsulate the information into the Eureka context: EurekaServerContext
- Put the Eureka context in a global container: EurekaServerContextHolder
- Initialize the Eureka context
- Synchronize the registry from the other Eureka servers
- Open the registry for traffic
- Register the monitoring statistics
protected void initEurekaServerContext() throws Exception {
    // 1. Eureka registry configuration
    EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();

    // For backward compatibility
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);

    logger.info("Initializing the eureka client...");
    logger.info(eurekaServerConfig.getJsonCodecName());
    ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);

    ApplicationInfoManager applicationInfoManager = null;

    if (eurekaClient == null) {
        // 2. Eureka instance configuration
        EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
                ? new CloudInstanceConfig()
                : new MyDataCenterInstanceConfig();

        // 3. Construct the InstanceInfo instance information
        // 4. Create the ApplicationInfoManager application manager
        applicationInfoManager = new ApplicationInfoManager(
                instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());

        // 5. Eureka client configuration
        EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
        // 6. Construct EurekaClient; DiscoveryClient encapsulates client-related operations
        eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
    } else {
        applicationInfoManager = eurekaClient.getApplicationInfoManager();
    }

    PeerAwareInstanceRegistry registry;
    if (isAws(applicationInfoManager.getInfo())) {
        registry = new AwsInstanceRegistry(
                eurekaServerConfig,
                eurekaClient.getEurekaClientConfig(),
                serverCodecs,
                eurekaClient
        );
        awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
        awsBinder.start();
    } else {
        // 7. Create a registry that is aware of the Eureka cluster
        registry = new PeerAwareInstanceRegistryImpl(
                eurekaServerConfig,
                eurekaClient.getEurekaClientConfig(),
                serverCodecs,
                eurekaClient
        );
    }

    // 8. Construct the eureka-server cluster information
    PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
            registry,
            eurekaServerConfig,
            eurekaClient.getEurekaClientConfig(),
            serverCodecs,
            applicationInfoManager
    );

    // 9. Create the EurekaServerContext from the objects constructed above
    serverContext = new DefaultEurekaServerContext(
            eurekaServerConfig,
            serverCodecs,
            registry,
            peerEurekaNodes,
            applicationInfoManager
    );

    // 10. Put serverContext into EurekaServerContextHolder,
    // so that other places can get the EurekaServerContext through EurekaServerContextHolder
    EurekaServerContextHolder.initialize(serverContext);

    // 11. Initialize the eureka-server context
    serverContext.initialize();
    logger.info("Initialized server context");

    // 12. Synchronize the registry from the adjacent eureka-server
    int registryCount = registry.syncUp();
    // 13. Open the registry for traffic and start some scheduled tasks
    registry.openForTraffic(applicationInfoManager, registryCount);

    // 14. Register the monitoring statistics
    EurekaMonitors.registerAllStats();
}
Interface oriented configuration reading
There are three configuration interfaces in the initialization, EurekaServerConfig, EurekaInstanceConfig and EurekaClientConfig, which correspond to the configuration of the registry, eureka instance and Eureka client respectively.
As can be seen from the constructors of their default implementation classes, EurekaServerConfig reads the eureka-server.properties configuration file, with property names prefixed by eureka.server; EurekaInstanceConfig and EurekaClientConfig read the eureka-client.properties configuration file, with property names prefixed by eureka.
As you can see, Eureka code obtains configuration through interface methods, with the configuration keys and default values hard-coded in the default implementation classes. This interface-based approach to reading configuration is worth borrowing: there is no pile of constants to maintain, and if a configuration key changes, only the implementation class needs to change.
For example:
@Override
public int getExpectedClientRenewalIntervalSeconds() {
    final int configured = configInstance.getIntProperty(
            namespace + "expectedClientRenewalIntervalSeconds", 30).get();
    return configured > 0 ? configured : 30;
}

@Override
public double getRenewalPercentThreshold() {
    return configInstance.getDoubleProperty(
            namespace + "renewalPercentThreshold", 0.85).get();
}

@Override
public boolean shouldEnableReplicatedRequestCompression() {
    return configInstance.getBooleanProperty(
            namespace + "enableReplicatedRequestCompression", false).get();
}
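The same idea can be tried outside Eureka with a minimal sketch. Everything here is hypothetical (`RegistryConfig`, `PropertiesRegistryConfig`, the key names, and the `demo.server.` namespace), and `java.util.Properties` stands in for the Archaius configuration instance used by Eureka.

```java
import java.util.Properties;

// Hypothetical configuration interface: callers depend only on these methods.
interface RegistryConfig {
    int getRenewalIntervalSeconds();

    double getRenewalPercentThreshold();
}

// Default implementation: property keys and default values live in one place.
final class PropertiesRegistryConfig implements RegistryConfig {
    private final String namespace;
    private final Properties props;

    PropertiesRegistryConfig(String namespace, Properties props) {
        this.namespace = namespace;
        this.props = props;
    }

    @Override
    public int getRenewalIntervalSeconds() {
        int configured = Integer.parseInt(
                props.getProperty(namespace + "renewalIntervalSeconds", "30"));
        // Fall back to the default when the configured value is not positive.
        return configured > 0 ? configured : 30;
    }

    @Override
    public double getRenewalPercentThreshold() {
        return Double.parseDouble(
                props.getProperty(namespace + "renewalPercentThreshold", "0.85"));
    }
}

final class RegistryConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("demo.server.renewalPercentThreshold", "0.5");
        RegistryConfig config = new PropertiesRegistryConfig("demo.server.", props);
        System.out.println(config.getRenewalIntervalSeconds());
        System.out.println(config.getRenewalPercentThreshold());
    }
}
```

Callers never see a property key; renaming a key or changing a default touches only the implementation class.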
Construct a service instance based on the builder pattern
Look at the code new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get(): the get method builds the service instance information. It uses the Builder design pattern to build LeaseInfo and InstanceInfo. InstanceInfo, for example, has a static nested Builder class; a builder is created with the newBuilder() method, and its setter methods are then called to set the properties. Related checks are performed while the properties are set, and final validation can also be done in the build method. The Builder pattern is well suited to constructing such complex objects.
public synchronized InstanceInfo get() {
    if (instanceInfo == null) {
        // Build the lease information: mainly the renewal interval (default 30 seconds)
        // and the lease expiration duration (default 90 seconds)
        LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
                .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
                .setDurationInSecs(config.getLeaseExpirationDurationInSeconds());

        if (vipAddressResolver == null) {
            vipAddressResolver = new Archaius1VipAddressResolver();
        }

        // Create InstanceInfo based on the Builder pattern
        InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder(vipAddressResolver);

        // set the appropriate id for the InstanceInfo, falling back to datacenter Id if applicable, else hostname
        String instanceId = config.getInstanceId();
        //...

        String defaultAddress;
        if (config instanceof RefreshableInstanceConfig) {
            // Refresh AWS data center info, and return up to date address
            defaultAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(false);
        } else {
            defaultAddress = config.getHostName(false);
        }

        // fail safe
        if (defaultAddress == null || defaultAddress.isEmpty()) {
            defaultAddress = config.getIpAddress();
        }

        // Set the properties
        builder.setNamespace(config.getNamespace())
                .setInstanceId(instanceId)
                .setAppName(config.getAppName())
                .setAppGroupName(config.getAppGroupName())
                .setDataCenterInfo(config.getDataCenterInfo())
                .setIPAddr(config.getIpAddress())
                .setHostName(defaultAddress)
                .setPort(config.getNonSecurePort())
                //...
                ;

        builder.setStatus(initialStatus);
        //...

        // Build the InstanceInfo
        instanceInfo = builder.build();
        instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
    }
    return instanceInfo;
}
LeaseInfo is the lease (renewal) information; its two main settings are the renewal interval and the duration after which an instance that has not renewed expires. InstanceInfo is then populated from config. InstanceInfo is the instance information, including the instance ID, host name, port, LeaseInfo, and so on.
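For readers less familiar with the pattern, here is a reduced sketch of a static nested Builder with per-property checks and a final validation in build(). The class `ServiceInstance` and its three fields are hypothetical and far simpler than InstanceInfo; the specific checks are assumptions chosen for illustration.

```java
// Hypothetical, reduced instance descriptor built with a static nested Builder.
final class ServiceInstance {
    private final String appName;
    private final String hostName;
    private final int port;

    private ServiceInstance(Builder builder) {
        this.appName = builder.appName;
        this.hostName = builder.hostName;
        this.port = builder.port;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    String getAppName() {
        return appName;
    }

    String getHostName() {
        return hostName;
    }

    int getPort() {
        return port;
    }

    static final class Builder {
        private String appName;
        private String hostName;
        private int port = 80;

        Builder setAppName(String appName) {
            // Check performed while the property is being set
            if (appName == null || appName.trim().isEmpty()) {
                throw new IllegalArgumentException("appName must not be blank");
            }
            this.appName = appName.trim();
            return this;
        }

        Builder setHostName(String hostName) {
            this.hostName = hostName;
            return this;
        }

        Builder setPort(int port) {
            if (port <= 0 || port > 65535) {
                throw new IllegalArgumentException("port out of range: " + port);
            }
            this.port = port;
            return this;
        }

        ServiceInstance build() {
            // Final validation before the immutable object is created
            if (appName == null || hostName == null) {
                throw new IllegalStateException("appName and hostName are required");
            }
            return new ServiceInstance(this);
        }
    }

    public static void main(String[] args) {
        ServiceInstance instance = ServiceInstance.newBuilder()
                .setAppName("demo-producer")
                .setHostName("dev.lyyzoo.com")
                .setPort(8010)
                .build();
        System.out.println(instance.getAppName() + " @ " + instance.getHostName() + ":" + instance.getPort());
    }
}
```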
The registry constructs DiscoveryClient
In cluster mode, Eureka Server registers with the other registries as a client, so it is itself a Eureka client. This is why an EurekaClient is built here; its default implementation class is DiscoveryClient. DiscoveryClient contains most of the core functions of the Eureka client, such as service registration, renewal, heartbeat maintenance, and fetching the registry.
Step into DiscoveryClient's most complex constructor. We first analyze what it does overall, focusing on the main flow and setting aside the details of the many components; their specific functions will be examined later.
- Save EurekaClientConfig, EurekaInstanceConfig, EurekaTransportConfig, InstanceInfo, ApplicationInfoManager, and so on into member variables
- If the registry is to be fetched, create a registry staleness monitor
- If the client is to register with the registry, create a heartbeat staleness monitor
- If the client neither fetches the registry nor registers with it, no scheduler, heartbeat thread pool, etc. are created, which frees up some resources
- Otherwise, initialize some scheduling resources:
  - A scheduled thread pool with two core threads is created; as shown later, it handles the heartbeat and cache refresh tasks
  - A thread pool for maintaining the heartbeat is created, with 1 core thread and a default maximum of 5 threads
  - A thread pool for refreshing the cache is created, with 1 core thread and a default maximum of 5 threads
  - EurekaTransport, the component through which the Eureka client communicates with the Eureka server, is created and initialized. The clients inside EurekaTransport mainly wrap the API calls to the server for convenient use
- Then, if the registry is to be fetched, it is fetched. fetchRegistry does either a full fetch or an incremental fetch; the first fetch at startup is a full fetch
- Initialize the scheduled tasks:
  - If the registry is to be fetched, a cache refresh task is created and scheduled, fetching the registry every 30 seconds by default
  - If the client is to register with the registry, a heartbeat task is created and scheduled, sending a heartbeat every 30 seconds by default
  - If the client is to register with the registry, an instance info replicator (also an internal scheduled task) and a listener for instance status changes are created as well
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    //...

    // Save instance information and configuration information locally
    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();
    clientConfig = config;
    staticClientConfig = clientConfig;
    transportConfig = config.getTransportConfig();
    instanceInfo = myInfo;
    if (myInfo != null) {
        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    } else {
        logger.warn("Setting instanceInfo to a passed in null value");
    }

    // Set Applications
    localRegionApps.set(new Applications());
    fetchRegistryGeneration = new AtomicLong(0);
    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    // Array of remote regions to fetch the registry from; atomic references are used because the value may be updated dynamically at run time
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

    // If the registry is to be fetched, create the registry staleness monitor
    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    // If the client is to register with eureka-server, create the heartbeat staleness monitor
    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

    // If the client neither registers with the registry nor fetches it, the scheduler, thread pools, etc. are not created
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
        //...
        return;  // no need to setup up an network tasks and we are done
    }

    try {
        // Create a scheduled thread pool with 2 core threads; it mainly dispatches the heartbeat task and the cache refresh task
        scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-%d").setDaemon(true).build());

        // Heartbeat thread pool: 1 core thread, maximum number of threads 5 by default.
        // A SynchronousQueue is used as the work queue, so tasks are handed directly to a thread rather than queued
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build()
        );  // use direct handoff

        // Cache refresh thread pool: 1 core thread, maximum number of threads 5 by default
        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build()
        );  // use direct handoff

        // Eureka HTTP client component that supports communication between the Eureka client and the Eureka server
        eurekaTransport = new EurekaTransport();
        // Initialize eurekaTransport
        scheduleServerEndpointTask(eurekaTransport, args);

        //...
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    if (clientConfig.shouldFetchRegistry()) {
        try {
            // Fetch the registry: full fetch or incremental fetch
            boolean primaryFetchRegistryResult = fetchRegistry(false);
            if (!primaryFetchRegistryResult) {
                logger.info("Initial registry fetch from primary servers failed");
            }
            //...
        } catch (Throwable th) {
            logger.error("Fetch registry error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // call and execute the pre registration handler before all background tasks (inc registration) is started
    if (this.preRegistrationHandler != null) {
        this.preRegistrationHandler.beforeRegistration();
    }

    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
            if (!register()) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // Initialize the scheduled tasks: the cache refresh task, the heartbeat task, and the instance info replicator
    initScheduledTasks();
    //...
}
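The "direct handoff" configuration used for the two executors above (one core thread, a bounded maximum, and a SynchronousQueue) can be observed in isolation with a short sketch. The class `DirectHandoffDemo` and its method are hypothetical; only standard `java.util.concurrent` classes are used, and the default rejection policy is assumed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of a pool with 1 core thread, a bounded maximum, and no task queueing.
final class DirectHandoffDemo {

    // Submits `tasks` blocking tasks and returns how many the pool accepted.
    static int countAccepted(int maxThreads, int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, maxThreads, 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                // Each task blocks until released, so its thread stays busy.
                executor.execute(() -> awaitQuietly(release));
                accepted++;
            } catch (RejectedExecutionException e) {
                // No idle thread can take the task and the pool is already at maxThreads.
            }
        }
        release.countDown();
        executor.shutdown();
        return accepted;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted " + countAccepted(5, 7) + " of 7");
    }
}
```

Because a SynchronousQueue holds no elements, a task is either handed to a thread immediately (a new one is started if the maximum has not been reached) or rejected. The pool therefore never builds up a backlog of stale heartbeat or refresh tasks.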
Initializing a scheduling task:
private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // The interval between registry fetches, default 30 seconds
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        // Upper bound on the back-off multiplier of the cache refresh delay. If a task times out, the delay is extended
        // This avoids frequent useless scheduling when the network is unstable or eureka-server is unavailable
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        // Registry refresh scheduled task
        cacheRefreshTask = new TimedSupervisorTask(
                "cacheRefresh", scheduler, cacheRefreshExecutor,
                registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound,
                new CacheRefreshThread() // The task that refreshes the registry
        );
        // The registry refresh task will be scheduled after 30 seconds
        scheduler.schedule(cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
if (clientConfig.shouldRegisterWithEureka()) {
// Renewal interval. Default: 30 seconds
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
// Exponential back-off bound of the heartbeat executor, 10 by default
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat scheduled task
heartbeatTask = new TimedSupervisorTask(
"heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs,
TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread()
);
// The heartbeat task will be scheduled after 30 seconds
scheduler.schedule(heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS);
// Instance info replicator, which periodically pushes the local instance's state to the server
instanceInfoReplicator = new InstanceInfoReplicator(this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);
// A listener for instance state changes
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
// ...
// Trigger an on-demand update so that the instance is re-registered
instanceInfoReplicator.onDemandUpdate();
}
};
// Register the listener with ApplicationInfoManager
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// Start the instance info replicator with an initial delay of 40 seconds by default
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
Design of timed task supervisor
As you can see, the Eureka client uses scheduled tasks and a scheduler to send heartbeats and fetch the registry periodically. I think the way scheduling is designed here is worth borrowing.
Take the code for the registry refresh task as an example:
if (clientConfig.shouldFetchRegistry()) {
// The interval between fetching the registry, default 30 seconds
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
// Exponential back-off bound of the cache refresh executor. If a task times out, the delay before the next run is extended
// This avoids frequent futile scheduling during network jitter or while eureka-server is unavailable
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// Registry refresh task
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh", scheduler, cacheRefreshExecutor,
registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound,
new CacheRefreshThread() // Refresh the registry task
);
// The registry refresh task will be scheduled after 30 seconds
scheduler.schedule(cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
The above code is not that complicated. It creates a scheduled task and then uses the scheduler to start it after a certain delay. But it does not hand the task itself (CacheRefreshThread) to the scheduler, nor does it schedule the task at a fixed rate (every 30 seconds). Instead it defines a supervisor for the task, TimedSupervisorTask, which is created with the scheduler, the executor, the task to run, the interval, and so on, and it is this TimedSupervisorTask that the scheduler schedules.
Looking at the constructor of TimedSupervisorTask, the main points are:
- The task timeout equals the scheduling interval, 30 seconds by default, and the initial delay equals the timeout. If Eureka Server is down or the network is faulty, a timeout may occur
- A maximum delay is set. With the defaults it is 300 seconds, that is, 10 times the timeout
- Finally, counters are created for successes, timeouts, rejections and exceptions, so the scheduling of the task is tracked with statistics
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
this.name = name;
this.scheduler = scheduler;
this.executor = executor;
// The task timeout is equal to the task scheduling interval
this.timeoutMillis = timeUnit.toMillis(timeout);
this.task = task;
// The delay initially equals the timeout
this.delay = new AtomicLong(timeoutMillis);
// Maximum delay time, which is 10 times larger than the timeout by default
this.maxDelay = timeoutMillis * expBackOffBound;
// Initialize the counter and register it
successCounter = Monitors.newCounter("success");
timeoutCounter = Monitors.newCounter("timeouts");
rejectedCounter = Monitors.newCounter("rejectedExecutions");
throwableCounter = Monitors.newCounter("throwables");
threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
Monitors.registerObject(name, this);
}
The run method of TimedSupervisorTask works as follows:
- 1) First, the task is submitted to the thread pool asynchronously instead of being run directly. This makes it possible to wait for the result with a timeout without blocking the supervisor indefinitely
- 2) If the task times out, for example because of network delay or because Eureka Server is unavailable, scheduling it again 30 seconds later would probably hit the same problem, so the delay is doubled, up to the maximum. With the defaults, consecutive timeouts push the delay from 30 seconds to 60, 120, 240 and finally 300 seconds. If Eureka Server recovers during such a long wait and a new service instance registers, the client will not notice it in time. So it may be worth setting the bound returned by getCacheRefreshExecutorExponentialBackOffBound to something smaller than the default of 10.
- 3) If the task does not time out, the delay is reset to the default timeout after the task completes successfully. In either case, the next run is scheduled in the finally block.
public void run() {
Future<?> future = null;
try {
// Submit the task to the thread pool
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
// block until the task completes or times out
future.get(timeoutMillis, TimeUnit.MILLISECONDS);
// After the task is complete, reset the delay to the timeout period, that is, 30 seconds
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
// Number of successes +1
successCounter.increment();
} catch (TimeoutException e) {
logger.warn("task supervisor timed out", e);
// Timeout count +1
timeoutCounter.increment();
// If the task times out, the delay is increased: double the current delay, capped at the maximum delay
long currentDelay = delay.get();
long newDelay = Math.min(maxDelay, currentDelay * 2);
// Store the new delay with a compare-and-set
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
// ...
rejectedCounter.increment();
} catch (Throwable e) {
// ...
throwableCounter.increment();
} finally {
if (future != null) {
future.cancel(true);
}
if (!scheduler.isShutdown()) {
// Schedule the next run after the current delay
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
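To make the back-off concrete, the sketch below replays the doubling rule from the run method for a series of consecutive timeouts. The class and method names (BackoffDelays, nextDelay, delaysAfterTimeouts) are made up for illustration and are not part of Eureka; only the rule Math.min(maxDelay, currentDelay * 2) and the defaults (30-second timeout, bound 10) come from the code above.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative helper (not a Eureka class): replays the delay doubling for consecutive timeouts. */
final class BackoffDelays {

    /** Same rule as in run(): double the current delay, capped at the maximum delay. */
    static long nextDelay(long currentDelayMillis, long maxDelayMillis) {
        return Math.min(maxDelayMillis, currentDelayMillis * 2);
    }

    /** Returns the delay (in ms) that applies after each of the given number of consecutive timeouts. */
    static List<Long> delaysAfterTimeouts(long timeoutMillis, int expBackOffBound, int timeouts) {
        long maxDelay = timeoutMillis * expBackOffBound; // as in the TimedSupervisorTask constructor
        long delay = timeoutMillis;                      // the delay starts at the timeout
        List<Long> delays = new ArrayList<>();
        for (int i = 0; i < timeouts; i++) {
            delay = nextDelay(delay, maxDelay);
            delays.add(delay);
        }
        return delays;
    }

    public static void main(String[] args) {
        // 30-second interval, bound 10, five consecutive timeouts
        System.out.println(delaysAfterTimeouts(30_000L, 10, 5));
        // prints [60000, 120000, 240000, 300000, 300000]
    }
}
```

With a bound of 2 instead of 10, the same call would never exceed 60 seconds, which is the trade-off to weigh when tuning the back-off bound.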
To summarize the design:
- 1) A remote call can fail because the network is unavailable or the server is down, so the call timeout has to be taken into account. Submitting the task to a thread pool asynchronously makes it possible to wait for it with a timeout.
- 2) After a timeout, it is reasonable to assume that the service needs some time to recover. Scheduling again at the original interval would probably time out again, so the delay is increased. Once a call succeeds, the service has recovered and the delay is reset.
- 3) The task is scheduled with a delay (schedule) that can change with the actual situation, rather than at a fixed rate from the beginning (scheduleAtFixedRate).
- 4) For scheduled tasks and tasks in a thread pool, it is better to keep statistics on how the tasks execute, so that the scheduling can be observed.
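The four points above can be put together in a small, self-contained sketch. This is a simplified, hypothetical supervisor (SimpleSupervisor and its fields are invented for illustration), not the Eureka TimedSupervisorTask: it uses plain AtomicInteger counters instead of Servo monitors and folds rejections and other failures into a single counter.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, simplified supervisor illustrating the pattern; not the Eureka class. */
final class SimpleSupervisor implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final ExecutorService executor;
    private final Runnable task;
    private final long timeoutMillis;
    private final long maxDelayMillis;
    private final AtomicLong delay;
    // Point 4: keep simple statistics about how the task runs
    final AtomicInteger successes = new AtomicInteger();
    final AtomicInteger timeouts = new AtomicInteger();
    final AtomicInteger failures = new AtomicInteger();

    SimpleSupervisor(ScheduledExecutorService scheduler, ExecutorService executor,
                     long timeoutMillis, int backOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.task = task;
        this.timeoutMillis = timeoutMillis;
        this.maxDelayMillis = timeoutMillis * backOffBound;
        this.delay = new AtomicLong(timeoutMillis);
    }

    long currentDelayMillis() {
        return delay.get();
    }

    @Override
    public void run() {
        Future<?> future = null;
        try {
            // Point 1: run the task on a worker thread and wait with a timeout
            future = executor.submit(task);
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            // Point 2: success means the service is healthy again, so reset the delay
            delay.set(timeoutMillis);
            successes.incrementAndGet();
        } catch (TimeoutException e) {
            timeouts.incrementAndGet();
            // Point 2: back off by doubling the delay, capped at the maximum
            long current = delay.get();
            delay.compareAndSet(current, Math.min(maxDelayMillis, current * 2));
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
            failures.incrementAndGet();
        } finally {
            if (future != null) {
                future.cancel(true);
            }
            // Point 3: one-shot schedule with the current delay, not a fixed rate
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        SimpleSupervisor supervisor = new SimpleSupervisor(scheduler, executor, 100, 10,
                () -> System.out.println("refreshing cache"));
        scheduler.schedule(supervisor, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(550);
        scheduler.shutdownNow();
        executor.shutdownNow();
        System.out.println("successes=" + supervisor.successes.get());
    }
}
```

Because the supervisor reschedules itself in finally, a single schedule call is enough to start the loop, and the interval adapts on every run.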
Constructing the registry
Next, PeerAwareInstanceRegistry is constructed. Judging by the name, it is a registry that is aware of the Eureka cluster: in cluster mode, a server pulls the registry from the other server nodes. Its default implementation is PeerAwareInstanceRegistryImpl, which extends AbstractInstanceRegistry, the instance registry.
1. Constructing PeerAwareInstanceRegistry
Enter the PeerAwareInstanceRegistryImpl constructor:
- The EurekaServerConfig, EurekaClientConfig and EurekaClient constructed earlier are passed into the constructor to build PeerAwareInstanceRegistry
- The super constructor is called, which initializes the following:
  - A circular queue that holds recently offline instances
  - A circular queue that holds recently registered instances
  - A counter of the number of renewals in the last minute
  - A scheduled task that removes expired instances from the recentlyChangedQueue
- Then a counter of cluster replications in the last minute, numberOfReplicationsLastMin, is created. We will look at the design of MeasuredRate later.
public PeerAwareInstanceRegistryImpl(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs, EurekaClient eurekaClient) {
super(serverConfig, clientConfig, serverCodecs);
this.eurekaClient = eurekaClient;
// Counter of cluster replications in the last minute
this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
// We first check if the instance is STARTING or DOWN, then we check explicit overrides,
// then we check the status of a potentially existing lease.
this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
}
///////////////////////////////////////////////
protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
this.serverConfig = serverConfig;
this.clientConfig = clientConfig;
this.serverCodecs = serverCodecs;
// Circular queue of recently cancelled (offline) instances
this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
// Circular queue of recently registered instances
this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);
// Counter of renewals in the last minute
this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
// A scheduled task that periodically removes expired entries from the recently changed queue
this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
serverConfig.getDeltaRetentionTimerIntervalInMs(),
serverConfig.getDeltaRetentionTimerIntervalInMs());
}
The details of this will be discussed later when we analyze the specific functions. We just need to know that there are queues and counters.
2. Design of CircularQueue
As you can see from the constructor, a circular queue holds the most recently offline and most recently registered instances, with a fixed capacity of 1000, so at most 1000 recent entries are kept.
CircularQueue is a custom circular queue derived from AbstractQueue. Internally it delegates to an ArrayBlockingQueue and overrides the offer method: when the queue is full, it removes the element at the head (the oldest one) and then adds the new element at the tail.
static class CircularQueue<E> extends AbstractQueue<E> {
private final ArrayBlockingQueue<E> delegate;
private final int capacity;
public CircularQueue(int capacity) {
this.capacity = capacity;
this.delegate = new ArrayBlockingQueue<>(capacity);
}
@Override
public Iterator<E> iterator() {
return delegate.iterator();
}
@Override
public int size() {
return delegate.size();
}
@Override
public boolean offer(E e) {
// While the queue is full, drop the element at the head, then add the new element at the tail
while (!delegate.offer(e)) {
delegate.poll();
}
return true;
}
@Override
public E poll() {
return delegate.poll();
}
@Override
public E peek() {
return delegate.peek();
}
}
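The effect of the overridden offer is easiest to see with a tiny capacity. The following sketch applies the same loop directly to an ArrayBlockingQueue; CircularOfferDemo and its methods are hypothetical helper names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

/** Illustrative demo (not Eureka code) of the eviction rule used by CircularQueue.offer. */
final class CircularOfferDemo {

    /** Adds e at the tail; while the queue is full, the oldest element is dropped first. */
    static <E> void offerEvictingOldest(ArrayBlockingQueue<E> queue, E e) {
        while (!queue.offer(e)) {
            queue.poll();
        }
    }

    /** Offers all items into a bounded queue and returns what is left, oldest first. */
    static List<String> keepMostRecent(int capacity, String... items) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        for (String item : items) {
            offerEvictingOldest(queue, item);
        }
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        // Capacity 3, five offers: the two oldest entries are evicted
        System.out.println(keepMostRecent(3, "a", "b", "c", "d", "e")); // prints [c, d, e]
    }
}
```

With a capacity of 1000, as in the registry, the queue therefore always holds the latest 1000 registrations or cancellations.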
Create and initialize the Eureka Server context
Next, PeerEurekaNodes is created, which presumably represents the Eureka cluster. The Eureka server context, EurekaServerContext, is then created from the objects constructed earlier. As the DefaultEurekaServerContext constructor shows, it only wraps those objects so that they are convenient to use globally. The serverContext is then placed into EurekaServerContextHolder, so that other places can obtain it through the holder.
Initialize the Eureka Server context:
- Start the Eureka cluster:
  - Start a scheduled task (the default interval is 10 minutes) that updates the cluster node information, PeerEurekaNode, based on the configured Eureka server addresses. In this way, when a Eureka server goes offline or comes online, the other server nodes can detect it. PeerEurekaNode is used to synchronize data between cluster nodes and will be analyzed later in the cluster analysis.
- Registry initialization:
  - Start the counter created earlier: numberOfReplicationsLastMin
  - Initialize the response cache. Eureka Server builds a multi-level cache to answer client requests that fetch the registry. This multi-level cache is the core of handling frequent registry fetches and will be analyzed later, together with how the client fetches the registry
  - Schedule the task that periodically updates the renewal threshold numberOfRenewsPerMinThreshold, the number of renewals per minute, which will be analyzed together with renewals
  - Initialize RemoteRegionRegistry, which is presumably related to deploying Eureka across multiple regions
public void initialize() {
logger.info("Initializing ...");
// Start the Eureka cluster nodes
peerEurekaNodes.start();
try {
// The registry is initialized
registry.init(peerEurekaNodes);
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("Initialized");
}
PeerEurekaNodes start method:
public void start() {
// Single-threaded scheduled executor backed by a daemon thread
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
});
try {
// Update the PeerEurekaNode list from the cluster addresses. Each PeerEurekaNode holds a client for calling another registry node
updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
// Periodically refresh the cluster node information, so that a eureka-server going offline or a new eureka-server coming online is detected
taskExecutor.scheduleWithFixedDelay(peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}
PeerAwareInstanceRegistryImpl init method:
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
// Start the counter
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
// Initialize the response cache. Eureka server constructs a multi-level cache to respond to client requests to fetch the registry
initializedResponseCache();
// Schedule the task that periodically updates the renewal threshold, mainly numberOfRenewsPerMinThreshold, the number of renewals per minute
scheduleRenewalThresholdUpdateTask();
// Initialize RemoteRegionRegistry
initRemoteRegionRegistry();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
Completing the Eureka Server initialization
Here are the last steps:
- First, call registry.syncUp() to synchronize the instances held locally by the EurekaClient into the registry. In cluster mode, Eureka Server is also a client, so it fetches the registry from the other registry nodes and synchronizes it into the registry of the current server. By default, it retries five times, 30 seconds apart. In single-machine mode, the number of retries should be set to 0.
- Then call registry.openForTraffic to do some final initialization:
  - Update the per-minute renewal threshold
  - Set the instance status to UP
  - Start the counter that counts renewals in the last minute
  - Start the scheduled task that evicts expired instances. By default it runs every 60 seconds
- The final step is to register some of Eureka's own monitoring statistics
syncUp method:
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
// Number of registry synchronization retries. The default value is 5
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
// Synchronization retry time, 30 seconds by default
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
// Register an instance
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
openForTraffic method:
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Number of clients expected to send renewals
this.expectedNumberOfClientsSendingRenews = count;
// Update the per-minute renewal threshold
updateRenewsPerMinThreshold();
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
// Set the instance status to UP
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
super.postInit();
}
///////////////////////////////////////
protected void postInit() {
// Start the counter that counts renewals in the last minute
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
// Schedule the eviction task, which periodically removes expired instances
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
Eureka Server startup flowchart
The following figure shows the eureka Server startup initialization process.
The Eureka Client starts initialization
Let's look at the ExampleEurekaClient class in the eureka-examples module. Its main method simulates a Eureka client: it starts and initializes the client and then sends a request to a service discovered through the registry.
The initialization of Eureka Server actually includes the initialization of the client. It can be seen that the initialization of the client mainly includes the following things:
- Read the eureka-client.properties configuration file and create EurekaInstanceConfig
- Create the instance information InstanceInfo based on InstanceConfig
- Create the application instance manager ApplicationInfoManager based on InstanceConfig and InstanceInfo
- Read the eureka-client.properties configuration file and create EurekaClientConfig
- Create EurekaClient (DiscoveryClient) based on the application instance manager and clientConfig. The initialization process is the same as that of DiscoveryClient in the Eureka Server initialization process
public static void main(String[] args) {
ExampleEurekaClient sampleClient = new ExampleEurekaClient();
// Create an application instance manager based on instance configuration and instance information
ApplicationInfoManager applicationInfoManager = initializeApplicationInfoManager(new MyDataCenterInstanceConfig());
// Create EurekaClient (DiscoveryClient) based on application instance manager and client configuration
EurekaClient client = initializeEurekaClient(applicationInfoManager, new DefaultEurekaClientConfig());
// use the client
sampleClient.sendRequestToServiceUsingEureka(client);
// shutdown the client
eurekaClient.shutdown();
}