Preface
This article takes a look at SkyWalking's ServiceAndEndpointRegisterClient.
ServiceAndEndpointRegisterClient
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
@DefaultImplementor public class ServiceAndEndpointRegisterClient implements BootService, Runnable, GRPCChannelListener { private static final ILog logger = LogManager.getLogger(ServiceAndEndpointRegisterClient.class); private static String INSTANCE_UUID; private static List<KeyStringValuePair> SERVICE_INSTANCE_PROPERTIES; private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT; private volatile RegisterGrpc.RegisterBlockingStub registerBlockingStub; private volatile ServiceInstancePingGrpc.ServiceInstancePingBlockingStub serviceInstancePingStub; private volatile ScheduledFuture<? > applicationRegisterFuture; private volatile long coolDownStartTime = -1; @Override public void statusChanged(GRPCChannelStatus status) {if (GRPCChannelStatus.CONNECTED.equals(status)) {
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
registerBlockingStub = RegisterGrpc.newBlockingStub(channel);
serviceInstancePingStub = ServiceInstancePingGrpc.newBlockingStub(channel);
} else {
registerBlockingStub = null;
serviceInstancePingStub = null;
}
this.status = status;
}
@Override
public void prepare() throws Throwable {
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
INSTANCE_UUID = StringUtil.isEmpty(Config.Agent.INSTANCE_UUID) ? UUID.randomUUID().toString()
.replaceAll("-"."") : Config.Agent.INSTANCE_UUID;
SERVICE_INSTANCE_PROPERTIES = new ArrayList<KeyStringValuePair>();
for (String key : Config.Agent.INSTANCE_PROPERTIES.keySet()) {
SERVICE_INSTANCE_PROPERTIES.add(KeyStringValuePair.newBuilder()
.setKey(key).setValue(Config.Agent.INSTANCE_PROPERTIES.get(key)).build());
}
}
@Override
public void boot() throws Throwable {
applicationRegisterFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("ServiceAndEndpointRegisterClient"))
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override
public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
}
@Override
public void onComplete() throws Throwable {
}
@Override
public void shutdown() throws Throwable {
applicationRegisterFuture.cancel(true);
}
@Override
public void run() {
logger.debug("ServiceAndEndpointRegisterClient running, status:{}.", status);
if (coolDownStartTime > 0) {
final long coolDownDurationInMillis = TimeUnit.MINUTES.toMillis(Config.Agent.COOL_DOWN_THRESHOLD);
if (System.currentTimeMillis() - coolDownStartTime < coolDownDurationInMillis) {
logger.warn("The agent is cooling down, won't register itself");
return;
} else {
logger.warn("The agent is re-registering itself to backend");
}
}
coolDownStartTime = -1;
boolean shouldTry = true;
while (GRPCChannelStatus.CONNECTED.equals(status) && shouldTry) {
shouldTry = false;
try {
if (RemoteDownstreamConfig.Agent.SERVICE_ID == DictionaryUtil.nullValue()) {
if(registerBlockingStub ! = null) { ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).doServiceRegister( Services.newBuilder().addServices(Service.newBuilder().setServiceName(Config.Agent.SERVICE_NAME)).build());if(serviceRegisterMapping ! = null) {for (KeyIntValuePair registered : serviceRegisterMapping.getServicesList()) {
if (Config.Agent.SERVICE_NAME.equals(registered.getKey())) {
RemoteDownstreamConfig.Agent.SERVICE_ID = registered.getValue();
shouldTry = true;
}
}
}
}
} else {
if(registerBlockingStub ! = null) {if (RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID == DictionaryUtil.nullValue()) {
ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
.doServiceInstanceRegister(ServiceInstances.newBuilder()
.addInstances(
ServiceInstance.newBuilder()
.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID)
.setInstanceUUID(INSTANCE_UUID)
.setTime(System.currentTimeMillis())
.addAllProperties(OSUtil.buildOSInfo())
.addAllProperties(SERVICE_INSTANCE_PROPERTIES)
).build());
for (KeyIntValuePair serviceInstance : instanceMapping.getServiceInstancesList()) {
if (INSTANCE_UUID.equals(serviceInstance.getKey())) {
int serviceInstanceId = serviceInstance.getValue();
if(serviceInstanceId ! = DictionaryUtil.nullValue()) { RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID = serviceInstanceId; RemoteDownstreamConfig.Agent.INSTANCE_REGISTERED_TIME = System.currentTimeMillis(); }}}}else {
final Commands commands = serviceInstancePingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
.doPing(ServiceInstancePingPkg.newBuilder()
.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID)
.setTime(System.currentTimeMillis())
.setServiceInstanceUUID(INSTANCE_UUID)
.build());
NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS));
EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS));
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}
}
}
} catch (Throwable t) {
logger.error(t, "ServiceAndEndpointRegisterClient execute fail.");
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
}
}
}
public void coolDown() { this.coolDownStartTime = System.currentTimeMillis(); }}Copy the code
- ServiceAndEndpointRegisterClient implements the BootService, Runnable, and GRPCChannelListener interfaces. The prepare method sets INSTANCE_UUID and initializes SERVICE_INSTANCE_PROPERTIES. The boot method schedules the run method at a fixed rate of Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL seconds. The shutdown method cancels the scheduled task. The statusChanged method updates registerBlockingStub, serviceInstancePingStub, and status. The run method mainly performs doServiceRegister, doServiceInstanceRegister, doPing, NetworkAddressDictionary.INSTANCE.syncRemoteDictionary, and EndpointNameDictionary.INSTANCE.syncRemoteDictionary.
NetworkAddressDictionary
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/dictionary/NetworkAddressDictionary.java
public enum NetworkAddressDictionary {
INSTANCE;
private Map<String, Integer> serviceDictionary = new ConcurrentHashMap<String, Integer>();
private Set<String> unRegisterServices = new ConcurrentSet<String>();
public PossibleFound find(String networkAddress) {
Integer applicationId = serviceDictionary.get(networkAddress);
if(applicationId ! = null) {return new Found(applicationId);
} else {
if (serviceDictionary.size() + unRegisterServices.size() < SERVICE_CODE_BUFFER_SIZE) {
unRegisterServices.add(networkAddress);
}
return new NotFound();
}
}
public void syncRemoteDictionary(
RegisterGrpc.RegisterBlockingStub networkAddressRegisterServiceBlockingStub) {
if (unRegisterServices.size() > 0) {
NetAddressMapping networkAddressMappings = networkAddressRegisterServiceBlockingStub.doNetworkAddressRegister(
NetAddresses.newBuilder().addAllAddresses(unRegisterServices).build());
if (networkAddressMappings.getAddressIdsCount() > 0) {
for (KeyIntValuePair keyWithIntegerValue : networkAddressMappings.getAddressIdsList()) {
unRegisterServices.remove(keyWithIntegerValue.getKey());
serviceDictionary.put(keyWithIntegerValue.getKey(), keyWithIntegerValue.getValue());
}
}
}
}
public void clear() { this.serviceDictionary.clear(); }}Copy the code
- The syncRemoteDictionary method of NetworkAddressDictionary calls networkAddressRegisterServiceBlockingStub.doNetworkAddressRegister.
EndpointNameDictionary
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/dictionary/EndpointNameDictionary.java
public enum EndpointNameDictionary {
INSTANCE;
private static final ILog logger = LogManager.getLogger(EndpointNameDictionary.class);
private Map<OperationNameKey, Integer> endpointDictionary = new ConcurrentHashMap<OperationNameKey, Integer>();
private Set<OperationNameKey> unRegisterEndpoints = new ConcurrentSet<OperationNameKey>();
public PossibleFound findOrPrepare4Register(int serviceId, String endpointName) {
return find0(serviceId, endpointName, true);
}
public PossibleFound findOnly(int serviceId, String endpointName) {
return find0(serviceId, endpointName, false);
}
private PossibleFound find0(int serviceId, String endpointName,
boolean registerWhenNotFound) {
if (endpointName == null || endpointName.length() == 0) {
return new NotFound();
}
OperationNameKey key = new OperationNameKey(serviceId, endpointName);
Integer operationId = endpointDictionary.get(key);
if(operationId ! = null) {return new Found(operationId);
} else {
if (registerWhenNotFound &&
endpointDictionary.size() + unRegisterEndpoints.size() < ENDPOINT_NAME_BUFFER_SIZE) {
unRegisterEndpoints.add(key);
}
return new NotFound();
}
}
public void syncRemoteDictionary(
RegisterGrpc.RegisterBlockingStub serviceNameDiscoveryServiceBlockingStub) {
if (unRegisterEndpoints.size() > 0) {
Endpoints.Builder builder = Endpoints.newBuilder();
for (OperationNameKey operationNameKey : unRegisterEndpoints) {
Endpoint endpoint = Endpoint.newBuilder()
.setServiceId(operationNameKey.getServiceId())
.setEndpointName(operationNameKey.getEndpointName())
.setFrom(DetectPoint.server)
.build();
builder.addEndpoints(endpoint);
}
EndpointMapping serviceNameMappingCollection = serviceNameDiscoveryServiceBlockingStub.doEndpointRegister(builder.build());
if (serviceNameMappingCollection.getElementsCount() > 0) {
for (EndpointMappingElement element : serviceNameMappingCollection.getElementsList()) {
OperationNameKey key = new OperationNameKey(
element.getServiceId(),
element.getEndpointName());
unRegisterEndpoints.remove(key);
endpointDictionary.put(key, element.getEndpointId());
}
}
}
}
public void clear() { endpointDictionary.clear(); } / /... }Copy the code
- The syncRemoteDictionary method of EndpointNameDictionary calls serviceNameDiscoveryServiceBlockingStub.doEndpointRegister(builder.build()).
Summary
ServiceAndEndpointRegisterClient mainly performs doServiceRegister, doServiceInstanceRegister, doPing, NetworkAddressDictionary.INSTANCE.syncRemoteDictionary, and EndpointNameDictionary.INSTANCE.syncRemoteDictionary.
doc
- ServiceAndEndpointRegisterClient