Preface
This article focuses on the TopNDatabaseStatement of Skywalking
TopNDatabaseStatement
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java
@Stream(name = TopNDatabaseStatement.INDEX_NAME, scopeId = DefaultScopeDefine.DATABASE_SLOW_STATEMENT, builder = TopNDatabaseStatement.Builder.class, processor = TopNStreamProcessor.class)
public class TopNDatabaseStatement extends TopN {
public static final String INDEX_NAME = "top_n_database_statement";
@Setter private String id;
@Override public String id() {
return id;
}
@Override public boolean equals(Object o) {
if (this == o)
return true;
if(o == null || getClass() ! = o.getClass())return false;
TopNDatabaseStatement statement = (TopNDatabaseStatement)o;
return getServiceId() == statement.getServiceId();
}
@Override public int hashCode() {
return Objects.hash(getServiceId());
}
public static class Builder implements StorageBuilder<TopNDatabaseStatement> {
@Override public TopNDatabaseStatement map2Data(Map<String, Object> dbMap) {
TopNDatabaseStatement statement = new TopNDatabaseStatement();
statement.setStatement((String)dbMap.get(STATEMENT));
statement.setTraceId((String)dbMap.get(TRACE_ID));
statement.setLatency(((Number)dbMap.get(LATENCY)).longValue());
statement.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
statement.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
return statement;
}
@Override public Map<String, Object> data2Map(TopNDatabaseStatement storageData) {
Map<String, Object> map = new HashMap<>();
map.put(STATEMENT, storageData.getStatement());
map.put(TRACE_ID, storageData.getTraceId());
map.put(LATENCY, storageData.getLatency());
map.put(SERVICE_ID, storageData.getServiceId());
map.put(TIME_BUCKET, storageData.getTimeBucket());
returnmap; }}}Copy the code
- TopNDatabaseStatement inherits TopN and defines Builder
TopN
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java
public abstract class TopN extends Record implements ComparableStorageData {
public static final String STATEMENT = "statement";
public static final String LATENCY = "latency";
public static final String TRACE_ID = "trace_id";
public static final String SERVICE_ID = "service_id";
@Getter @Setter @Column(columnName = STATEMENT, content = true) private String statement;
@Getter @Setter @Column(columnName = LATENCY) private long latency;
@Getter @Setter @Column(columnName = TRACE_ID) private String traceId;
@Getter @Setter @Column(columnName = SERVICE_ID) private int serviceId;
@Override public int compareTo(Object o) {
TopN target = (TopN)o;
return(int)(latency - target.latency); }}Copy the code
- TopN defines the statement, latency, trace_id, and service_id attributes
DatabaseStatementDispatcher
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java
public class DatabaseStatementDispatcher implements SourceDispatcher<DatabaseSlowStatement> {
@Override public void dispatch(DatabaseSlowStatement source) { TopNDatabaseStatement statement = new TopNDatabaseStatement(); statement.setId(source.getId()); statement.setServiceId(source.getDatabaseServiceId()); statement.setLatency(source.getLatency()); statement.setStatement(source.getStatement()); statement.setTimeBucket(source.getTimeBucket()); statement.setTraceId(source.getTraceId()); TopNStreamProcessor.getInstance().in(statement); }}Copy the code
- DatabaseStatementDispatcher implements the SourceDispatcher interface; its dispatch method converts a DatabaseSlowStatement into a TopNDatabaseStatement and then calls TopNStreamProcessor.getInstance().in(statement)
TopNStreamProcessor
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java
public class TopNStreamProcessor implements StreamProcessor<TopN> { private static final TopNStreamProcessor PROCESSOR = new TopNStreamProcessor(); @Getter private List<TopNWorker> persistentWorkers = new ArrayList<>(); private Map<Class<? extends Record>, TopNWorker> workers = new HashMap<>(); @Setter @Getter private int topNWorkerReportCycle = 10; @Setter @Getter private int topSize = 50; public static TopNStreamProcessorgetInstance() {
return PROCESSOR;
}
@SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends TopN> topNClass) {
if (DisableRegister.INSTANCE.include(stream.name())) {
return;
}
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRecordDAO recordDAO;
try {
recordDAO = storageDAO.newRecordDao(stream.builder().newInstance());
} catch (InstantiationException | IllegalAccessException e) {
throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " top n record DAO failure.", e);
}
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true.true, Downsampling.Second), true);
TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model, topSize, topNWorkerReportCycle * 60 * 1000L, recordDAO);
persistentWorkers.add(persistentWorker);
workers.put(topNClass, persistentWorker);
}
public void in(TopN topN) {
TopNWorker worker = workers.get(topN.getClass());
if(worker ! = null) { worker.in(topN); }}}Copy the code
- TopNStreamProcessor implements the StreamProcessor interface, and its in method obtains TopNWorker from workers and executes worker.in(topN).
summary
TopNDatabaseStatement extends TopN and defines a Builder. DatabaseStatementDispatcher implements the SourceDispatcher interface; its dispatch method converts a DatabaseSlowStatement into a TopNDatabaseStatement and then calls TopNStreamProcessor.getInstance().in(statement)
doc
- TopNDatabaseStatement