Preface
This article examines SkyWalking's DatabaseSlowStatement.
DatabaseSlowStatement
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java
@ScopeDeclaration(id = DATABASE_SLOW_STATEMENT, name = "DatabaseSlowStatement")
public class DatabaseSlowStatement extends Source {
@Getter @Setter private String id;
@Getter @Setter private int databaseServiceId;
@Getter @Setter private String statement;
@Getter @Setter private long latency;
@Getter @Setter private String traceId;
@Override public int scope() {
return DefaultScopeDefine.DATABASE_SLOW_STATEMENT;
}
@Override public String getEntityId() {
returnConst.EMPTY_STRING; }}Copy the code
- DatabaseSlowStatement extends Source and defines the id, databaseServiceId, statement, latency, and traceId properties. Its scope method returns DefaultScopeDefine.DATABASE_SLOW_STATEMENT.
MultiScopesSpanListener
skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java
public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListener, GlobalTraceIdsListener {
private static final Logger logger = LoggerFactory.getLogger(MultiScopesSpanListener.class);
private final SourceReceiver sourceReceiver;
private final ServiceInstanceInventoryCache instanceInventoryCache;
private final ServiceInventoryCache serviceInventoryCache;
private final EndpointInventoryCache endpointInventoryCache;
private final List<SourceBuilder> entrySourceBuilders;
private final List<SourceBuilder> exitSourceBuilders;
private final List<DatabaseSlowStatement> slowDatabaseAccesses;
private final TraceServiceModuleConfig config;
private final NetworkAddressInventoryCache networkAddressInventoryCache;
private SpanDecorator entrySpanDecorator;
private long minuteTimeBucket;
private String traceId;
private MultiScopesSpanListener(ModuleManager moduleManager, TraceServiceModuleConfig config) {
this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
this.entrySourceBuilders = new LinkedList<>();
this.exitSourceBuilders = new LinkedList<>();
this.slowDatabaseAccesses = new ArrayList<>(10);
this.instanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
this.endpointInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
this.networkAddressInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(NetworkAddressInventoryCache.class);
this.config = config;
this.traceId = null;
}
@Override public boolean containsPoint(Point point) {
return Point.Entry.equals(point) || Point.Exit.equals(point) || Point.TraceIds.equals(point);
}
@Override
public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
this.minuteTimeBucket = segmentCoreInfo.getMinuteTimeBucket();
if (spanDecorator.getRefsCount() > 0) {
for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
ReferenceDecorator reference = spanDecorator.getRefs(i);
SourceBuilder sourceBuilder = new SourceBuilder();
if (reference.getParentEndpointId() == Const.INEXISTENCE_ENDPOINT_ID) {
sourceBuilder.setSourceEndpointId(Const.USER_ENDPOINT_ID);
} else {
sourceBuilder.setSourceEndpointId(reference.getParentEndpointId());
}
final int networkAddressId = reference.getNetworkAddressId();
final int serviceIdByPeerId = serviceInventoryCache.getServiceId(networkAddressId);
final String address = networkAddressInventoryCache.get(networkAddressId).getName();
if (spanDecorator.getSpanLayer().equals(SpanLayer.MQ) || config.getUninstrumentedGatewaysConfig().isAddressConfiguredAsGateway(address)) {
int instanceIdByPeerId = instanceInventoryCache.getServiceInstanceId(serviceIdByPeerId, networkAddressId);
sourceBuilder.setSourceServiceInstanceId(instanceIdByPeerId);
sourceBuilder.setSourceServiceId(serviceIdByPeerId);
} else {
sourceBuilder.setSourceServiceInstanceId(reference.getParentServiceInstanceId());
sourceBuilder.setSourceServiceId(instanceInventoryCache.get(reference.getParentServiceInstanceId()).getServiceId());
}
sourceBuilder.setDestEndpointId(spanDecorator.getOperationNameId());
sourceBuilder.setDestServiceInstanceId(segmentCoreInfo.getServiceInstanceId());
sourceBuilder.setDestServiceId(segmentCoreInfo.getServiceId());
sourceBuilder.setDetectPoint(DetectPoint.SERVER);
sourceBuilder.setComponentId(spanDecorator.getComponentId());
setPublicAttrs(sourceBuilder, spanDecorator);
entrySourceBuilders.add(sourceBuilder); }}else {
SourceBuilder sourceBuilder = new SourceBuilder();
sourceBuilder.setSourceEndpointId(Const.USER_ENDPOINT_ID);
sourceBuilder.setSourceServiceInstanceId(Const.USER_INSTANCE_ID);
sourceBuilder.setSourceServiceId(Const.USER_SERVICE_ID);
sourceBuilder.setDestEndpointId(spanDecorator.getOperationNameId());
sourceBuilder.setDestServiceInstanceId(segmentCoreInfo.getServiceInstanceId());
sourceBuilder.setDestServiceId(segmentCoreInfo.getServiceId());
sourceBuilder.setDetectPoint(DetectPoint.SERVER);
sourceBuilder.setComponentId(spanDecorator.getComponentId());
setPublicAttrs(sourceBuilder, spanDecorator);
entrySourceBuilders.add(sourceBuilder);
}
this.entrySpanDecorator = spanDecorator;
}
@Override public void parseExit(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
if (this.minuteTimeBucket == 0) {
this.minuteTimeBucket = segmentCoreInfo.getMinuteTimeBucket();
}
SourceBuilder sourceBuilder = new SourceBuilder();
int peerId = spanDecorator.getPeerId();
if (peerId == 0) {
return;
}
int destServiceId = serviceInventoryCache.getServiceId(peerId);
int mappingServiceId = serviceInventoryCache.get(destServiceId).getMappingServiceId();
int destInstanceId = instanceInventoryCache.getServiceInstanceId(destServiceId, peerId);
int mappingServiceInstanceId = instanceInventoryCache.get(destInstanceId).getMappingServiceInstanceId();
sourceBuilder.setSourceServiceInstanceId(segmentCoreInfo.getServiceInstanceId());
sourceBuilder.setSourceServiceId(segmentCoreInfo.getServiceId());
if (Const.NONE == mappingServiceId) {
sourceBuilder.setDestServiceId(destServiceId);
} else {
sourceBuilder.setDestServiceId(mappingServiceId);
}
if (Const.NONE == mappingServiceInstanceId) {
sourceBuilder.setDestServiceInstanceId(destInstanceId);
} else {
sourceBuilder.setDestServiceInstanceId(mappingServiceInstanceId);
}
sourceBuilder.setDetectPoint(DetectPoint.CLIENT);
sourceBuilder.setComponentId(spanDecorator.getComponentId());
setPublicAttrs(sourceBuilder, spanDecorator);
exitSourceBuilders.add(sourceBuilder);
if (sourceBuilder.getType().equals(RequestType.DATABASE)) {
boolean isSlowDBAccess = false;
DatabaseSlowStatement statement = new DatabaseSlowStatement();
statement.setId(segmentCoreInfo.getSegmentId() + "-" + spanDecorator.getSpanId());
statement.setDatabaseServiceId(sourceBuilder.getDestServiceId());
statement.setLatency(sourceBuilder.getLatency());
statement.setTimeBucket(TimeBucket.getRecordTimeBucket(segmentCoreInfo.getStartTime()));
statement.setTraceId(traceId);
for (KeyStringValuePair tag : spanDecorator.getAllTags()) {
if (SpanTags.DB_STATEMENT.equals(tag.getKey())) {
String sqlStatement = tag.getValue();
if (StringUtil.isEmpty(sqlStatement)) {
statement.setStatement("[No statement]/" + sourceBuilder.getDestEndpointName());
} else if (sqlStatement.length() > config.getMaxSlowSQLLength()) {
statement.setStatement(sqlStatement.substring(0, config.getMaxSlowSQLLength()));
} else{ statement.setStatement(sqlStatement); }}else if (SpanTags.DB_TYPE.equals(tag.getKey())) {
String dbType = tag.getValue();
DBLatencyThresholdsAndWatcher thresholds = config.getDbLatencyThresholdsAndWatcher();
int threshold = thresholds.getThreshold(dbType);
if (sourceBuilder.getLatency() > threshold) {
isSlowDBAccess = true; }}}if(isSlowDBAccess) { slowDatabaseAccesses.add(statement); }}} / /... @Override public voidbuild() {
entrySourceBuilders.forEach(entrySourceBuilder -> {
entrySourceBuilder.setTimeBucket(minuteTimeBucket);
sourceReceiver.receive(entrySourceBuilder.toAll());
sourceReceiver.receive(entrySourceBuilder.toService());
sourceReceiver.receive(entrySourceBuilder.toServiceInstance());
sourceReceiver.receive(entrySourceBuilder.toEndpoint());
sourceReceiver.receive(entrySourceBuilder.toServiceRelation());
sourceReceiver.receive(entrySourceBuilder.toServiceInstanceRelation());
EndpointRelation endpointRelation = entrySourceBuilder.toEndpointRelation();
/**
* Parent endpoint could be none, because in SkyWalking Cross Process Propagation Headers Protocol v2,
* endpoint in ref could be empty, based on that, endpoint relation maybe can* So, I am making this source as optional. * * Also, since 6.6.0, source endpoint could be none, if this trace begins by an internal task(local span or exit span), such as Timer, * rather than, normally begin as an entry span, like a RPC server side. */ if (endpointRelation ! = null) { sourceReceiver.receive(endpointRelation); }}); exitSourceBuilders.forEach(exitSourceBuilder -> { if (nonNull(entrySpanDecorator)) { exitSourceBuilder.setSourceEndpointId(entrySpanDecorator.getOperationNameId()); } else { exitSourceBuilder.setSourceEndpointId(Const.USER_ENDPOINT_ID); } exitSourceBuilder.setSourceEndpointName(endpointInventoryCache.get(exitSourceBuilder.getSourceEndpointId()).getName()); exitSourceBuilder.setTimeBucket(minuteTimeBucket); sourceReceiver.receive(exitSourceBuilder.toServiceRelation()); sourceReceiver.receive(exitSourceBuilder.toServiceInstanceRelation()); if (RequestType.DATABASE.equals(exitSourceBuilder.getType())) { sourceReceiver.receive(exitSourceBuilder.toDatabaseAccess()); }}); slowDatabaseAccesses.forEach(sourceReceiver::receive); } / /... }Copy the code
- MultiScopesSpanListener implements the EntrySpanListener, ExitSpanListener, and GlobalTraceIdsListener interfaces. Its parseExit method creates a DatabaseSlowStatement when sourceBuilder.getType() is RequestType.DATABASE. While iterating over the span's tags, when tag.getKey() is SpanTags.DB_TYPE it obtains the DBLatencyThresholdsAndWatcher through config.getDbLatencyThresholdsAndWatcher() and sets isSlowDBAccess to true if the latency is greater than the threshold for that database type. Finally, if isSlowDBAccess is true, it adds the DatabaseSlowStatement to slowDatabaseAccesses. Its build method calls
slowDatabaseAccesses.forEach(sourceReceiver::receive)
to notify sourceReceiver.
SourceReceiverImpl
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java
public class SourceReceiverImpl implements SourceReceiver {
@Getter
private final DispatcherManager dispatcherManager;
public SourceReceiverImpl() {
this.dispatcherManager = new DispatcherManager();
}
@Override public void receive(Source source) {
dispatcherManager.forward(source); } public void scan() throws IOException, InstantiationException, IllegalAccessException { dispatcherManager.scan(); }}Copy the code
- SourceReceiverImpl implements the SourceReceiver interface; its receive method calls dispatcherManager.forward(source).
summary
DatabaseSlowStatement extends Source and defines the id, databaseServiceId, statement, latency, and traceId properties; its scope method returns DefaultScopeDefine.DATABASE_SLOW_STATEMENT. MultiScopesSpanListener implements the EntrySpanListener, ExitSpanListener, and GlobalTraceIdsListener interfaces. Its parseExit method creates a DatabaseSlowStatement when sourceBuilder.getType() is RequestType.DATABASE. While iterating over the span's tags, when tag.getKey() is SpanTags.DB_TYPE it obtains the DBLatencyThresholdsAndWatcher through config.getDbLatencyThresholdsAndWatcher() and sets isSlowDBAccess to true if the latency is greater than the threshold for that database type. Finally, if isSlowDBAccess is true, it adds the DatabaseSlowStatement to slowDatabaseAccesses. The build method notifies sourceReceiver through slowDatabaseAccesses.forEach(sourceReceiver::receive).
doc
- DatabaseSlowStatement