Current business status:

1. The existing business cluster uses Jaeger for data collection with the AllInOne strategy. In this mode, all of Jaeger's server-side components run in a single image, which is not suitable for a production environment or for introducing other components.

2. Although data storage under the existing AllInOne mode has been switched to ES, reusing the ES cluster that collects logs, the large number of useless traces occupies too much ES storage space and degrades ES search performance.

Project Architecture

To add trace sampling and introduce other components, the first step is to set up a robust Jaeger cluster.

Official Cluster Architecture Diagram:

Architecture diagram adopted by the actual project:

The original Ingester component is reused for unified management of the back-end DB.

Cluster Setup

1. Jaeger cluster setup

To build the Jaeger cluster on k8s, I use the Jaeger Operator; for details, see www.jaegertracing.io/docs/1.21/o…

With the streaming strategy, only the Collector, Ingester, and storage services are deployed by default. Since we also need to collect trace data from the other services in our cluster, add an Agent to the default configuration:

apiVersion: jaegertracing.io/v1
kind: Jaeger
metadata:
  name: simple-streaming
  namespace: logging
spec:
  strategy: streaming
  collector:
    options:
      kafka: # <1>
        producer:
          topic: flink-span-dev
          brokers: host:9092
  ingester:
    options:
      kafka: # <1>
        consumer:
          topic: test-tracer
          brokers: host:9092
      ingester:
        deadlockInterval: 0 # <2>
  storage:
    type: elasticsearch
    options:
      es:
        server-urls: http://host:9200
  agent:
    strategy: DaemonSet
    options:
      log-level: debug 

Once startup is complete, you can see the Services and Pods in k8s.

Since the Agent is deployed as a DaemonSet, a Service needs to be added so that services in other namespaces can call it:

apiVersion: v1
kind: Service
metadata:
  labels:
    app: jaeger
  name: jaeger-agent
  namespace: logging
spec:
  clusterIP: None
  ports:
  - name: agent-zipkin-thrift
    port: 5775
    protocol: UDP
    targetPort: 5775
  - name: agent-compact
    port: 6831
    protocol: UDP
    targetPort: 6831
  - name: agent-binary
    port: 6832
    protocol: UDP
    targetPort: 6832
  selector:
    app: jaeger
    app.kubernetes.io/component: agent
    app.kubernetes.io/instance: simple-streaming
    app.kubernetes.io/managed-by: jaeger-operator
    app.kubernetes.io/name: simple-streaming-agent
    app.kubernetes.io/part-of: jaeger
  sessionAffinity: None
  type: ClusterIP
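With this headless Service in place, an application in another namespace can point its Jaeger client at jaeger-agent.logging over UDP. Below is a minimal sketch using the jaeger-client Java library; the service name and sampler settings are illustrative assumptions, not from the original setup:

import io.jaegertracing.Configuration;
import io.jaegertracing.Configuration.ReporterConfiguration;
import io.jaegertracing.Configuration.SamplerConfiguration;
import io.jaegertracing.Configuration.SenderConfiguration;
import io.opentracing.Tracer;

// Report spans to the DaemonSet agent through the headless Service above;
// 6831/UDP is the jaeger.thrift compact port exposed by the Service.
Tracer tracer = new Configuration("my-service") // hypothetical service name
        .withSampler(new SamplerConfiguration().withType("const").withParam(1))
        .withReporter(new ReporterConfiguration()
                .withSender(new SenderConfiguration()
                        .withAgentHost("jaeger-agent.logging")
                        .withAgentPort(6831)))
        .getTracer();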

So far, we have set up a complete Jaeger cluster, with Kafka added between the Collector and the Ingester.

2. Flink cluster

For Flink application resources, you can use the company's existing Flink cluster in the dev environment; I used native k8s to build a Flink session cluster for testing. Official cluster setup documentation: ci.apache.org/projects/fl…

Use Flink for consumption:

(Note that the Flink version referenced by the Flink consumer must match the version of the Flink cluster.)

Because Jaeger's Collector sends data to Kafka in protobuf format, Flink needs to deserialize the protobuf bytes into the Java Span model generated from github.com/jaegertraci…

1. Kafka Consumer configuration

kafkaModelSpanConsumer = new FlinkKafkaConsumer011<>(topics, new ProtoUnMarshalerToModelSpan(), kafkaConsumerConfig());

// Deserializes the protobuf bytes produced by the Jaeger Collector into the
// generated Java Model.Span type.
public class ProtoUnMarshalerToModelSpan extends AbstractDeserializationSchema<Model.Span> {
    private static final long serialVersionUID = 7952979116702613945L;

    @Override
    public Model.Span deserialize(byte[] message) throws IOException {
        Model.Span protoSpan = Model.Span.parseFrom(message);
        return protoSpan;
    }
}
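The kafkaConsumerConfig() helper referenced above is not shown in the original. A minimal sketch, assuming the host:9092 brokers from the Jaeger CR (the consumer reads the flink-span-dev topic that the Collector produces to) and a hypothetical consumer group id:

import java.util.Properties;

// Hypothetical sketch of the kafkaConsumerConfig() helper used above.
private static Properties kafkaConsumerConfig() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "host:9092");  // brokers from the Jaeger CR
    props.setProperty("group.id", "flink-span-sampler");  // hypothetical consumer group
    props.setProperty("auto.offset.reset", "latest");
    return props;
}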

2. Use a session window with processing time to aggregate the spans of each trace. When the session gap exceeds the preset time, we can assume the whole trace has arrived and the window fires its processing function. Based on the configured exception policy, each trace is classified as abnormal or normal (this configuration is later improved to update dynamically via Broadcast State). Abnormal traces are always sent downstream, while normal traces are sampled according to the configured ratio. Finally, the classified data is sent to the Sink (a Kafka producer), where the Span objects are converted back to protobuf using the generated Jaeger Java Span classes. A sketch of this windowing logic is given below.
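The original does not include the window code; the following is a minimal sketch of the idea under stated assumptions: the trace-id bytes from span.getTraceId() used opaquely as the key, a 30-second session gap, a hypothetical isAbnormal() exception-policy check, and a fixed sampleRate. These names and values are illustrative, not the author's actual implementation.

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import io.jaegertracing.api_v2.Model;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Source: the FlinkKafkaConsumer011 configured above.
DataStream<Model.Span> spans = env.addSource(kafkaModelSpanConsumer);

DataStream<Model.Span> sampledSpans = spans
        // Group spans belonging to the same trace.
        .keyBy(new KeySelector<Model.Span, String>() {
            @Override
            public String getKey(Model.Span span) {
                return span.getTraceId().toStringUtf8();
            }
        })
        // Fire the window once no span for the trace has arrived within the
        // session gap (30s is an assumed value).
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
        .process(new ProcessWindowFunction<Model.Span, Model.Span, String, TimeWindow>() {
            private final double sampleRate = 0.1; // hypothetical sampling ratio

            @Override
            public void process(String traceId, Context ctx,
                                Iterable<Model.Span> trace, Collector<Model.Span> out) {
                // Abnormal traces are always forwarded; normal traces are sampled.
                if (isAbnormal(trace) || ThreadLocalRandom.current().nextDouble() < sampleRate) {
                    for (Model.Span span : trace) {
                        out.collect(span);
                    }
                }
            }

            // Hypothetical exception policy: a trace is abnormal if any span
            // carries an error=true tag.
            private boolean isAbnormal(Iterable<Model.Span> trace) {
                for (Model.Span span : trace) {
                    for (Model.KeyValue tag : span.getTagsList()) {
                        if ("error".equals(tag.getKey()) && tag.getVBool()) {
                            return true;
                        }
                    }
                }
                return false;
            }
        });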

3. Kafka Producer configuration

// Serializes Model.Span back to protobuf bytes before writing to Kafka.
public class SpanSerialization implements SerializationSchema<Model.Span> {

    @Override
    public byte[] serialize(Model.Span element) {
        return element.toByteArray();
    }
}
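The original shows only the serialization schema; below is a minimal sketch of wiring it into the sink, assuming FlinkKafkaProducer011 (to match the 0.11 consumer above) and the test-tracer topic and host:9092 brokers that the Ingester consumes per the Jaeger CR:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

// Write sampled spans back to the topic the Jaeger Ingester consumes
// (test-tracer / host:9092 in the CR above).
FlinkKafkaProducer011<Model.Span> spanProducer = new FlinkKafkaProducer011<>(
        "host:9092",
        "test-tracer",
        new SpanSerialization());
sampledSpans.addSink(spanProducer);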

At this point the data is written back to Kafka, where the Ingester component of the Jaeger cluster consumes it and writes it to ES.

Status of the job submitted to the Flink cluster: