Preface
Today we introduce another component of the go-zero ecosystem: go-stash. go-stash is a Go port of Logstash, and in our deployments it saves about 2/3 of the server resources compared with the original Logstash. If you are using Logstash, give it a try; it is also a good example of how easy it is to build such a tool on top of go-zero, which the authors did in just two days.
The overall architecture
Starting with its configuration, let’s look at the design architecture.
Clusters:
- Input:
    Kafka:
      # kafka configuration --> linked to go-queue
  Filters:
    # filter actions
    - Action: drop
    - Action: remove_field
    - Action: transfer
  Output:
    ElasticSearch:
      # es configuration {host, index}
Kafka is the data input, es is the data output, and filter abstracts the data processing in between.
Yes, the entire go-stash is what-you-see-is-what-you-get, exactly as the config shows.
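To make the mapping concrete, here is a minimal sketch of Go structs that such a YAML could be unmarshalled into. The names (KafkaConf, EsConf, FilterConf, ClusterConf) are illustrative assumptions, not the actual go-stash definitions:

// Hypothetical structs mirroring the YAML above; the real go-stash
// definitions may differ in naming and detail.
type KafkaConf struct {
	Brokers []string // kafka broker addresses
	Topics  []string // topics to consume
	Group   string   // consumer group
}

type EsConf struct {
	Hosts []string // es hosts
	Index string   // target index (possibly a pattern)
}

type FilterConf struct {
	Action string // drop / remove_field / transfer
	// plus action-specific fields (conditions, field lists, targets ...)
}

type ClusterConf struct {
	Input struct {
		Kafka KafkaConf // handed to go-queue for consumption
	}
	Filters []FilterConf
	Output struct {
		ElasticSearch EsConf
	}
}

type Config struct {
	Clusters []ClusterConf // one entry per cluster pipeline
}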
Startup
The startup process in stash.go is roughly divided into several parts. Since multiple clusters can be configured, let's analyze a single cluster:
- Establish the connection to es [from the es configuration]
- Build the filter processors [es preprocessors that do the data filtering and processing; several can be configured]
- Complete the es index configuration, start the handle, and attach the filters to the handle
- Connect to the upstream kafka, passing in the handle created above, which completes the loop of consuming data from kafka and writing data to es
MessageHandler
In the architecture diagram above, the filter in the middle is only how things look from the config. In fact it is part of MessageHandler, which does the detailed data filtering and transformation.
The code: github.com/tal-tech/go…
type MessageHandler struct {
	writer  *es.Writer          // writes documents into es
	indexer *es.Index           // resolves the target index for each message
	filters []filter.FilterFunc // the filter chain applied to each message
}
MessageHandler connects to the downstream es, but performs no kafka operations itself.
Instead, by its interface design MessageHandler implements the ConsumeHandler interface from go-queue.
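For reference, that contract in go-queue's kq package looks essentially like this (reproduced from memory; consult the go-queue source for the authoritative definition):

// go-queue's kq package (paraphrased): anything passed to
// kq.MustNewQueue must satisfy this interface.
type ConsumeHandler interface {
	Consume(key, value string) error
}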
Here, the upstream and downstream are connected:

- MessageHandler takes over the es side, handling everything from data processing to data writing
- It also serves as kafka's Consume operation, so the handler writes to es during the consumption process

In fact, Consume() does exactly that:
func (mh *MessageHandler) Consume(_, val string) error {
	var m map[string]interface{}
	// deserialize the message from kafka
	if err := jsoniter.Unmarshal([]byte(val), &m); err != nil {
		return err
	}
	// resolve the es index to write to
	index := mh.indexer.GetIndex(m)
	// run the filter chain [map in, map out]; a nil result drops the message
	for _, proc := range mh.filters {
		if m = proc(m); m == nil {
			return nil
		}
	}
	bs, err := jsoniter.Marshal(m)
	if err != nil {
		return err
	}
	// write to es
	return mh.writer.Write(index, string(bs))
}
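The [map in, map out] contract of the filter chain is worth making concrete. The FilterFunc signature below is implied by the loop above; the two example filters are illustrative sketches, not go-stash's actual implementations:

// FilterFunc takes the decoded message map and returns the (possibly
// modified) map, or nil to drop the message entirely.
type FilterFunc func(map[string]interface{}) map[string]interface{}

// RemoveFieldsFilter returns a filter that deletes the given fields.
func RemoveFieldsFilter(fields ...string) FilterFunc {
	return func(m map[string]interface{}) map[string]interface{} {
		for _, f := range fields {
			delete(m, f)
		}
		return m
	}
}

// DropByLevelFilter drops every message whose "level" field equals
// the given value, demonstrating the nil-means-drop convention.
func DropByLevelFilter(level string) FilterFunc {
	return func(m map[string]interface{}) map[string]interface{} {
		if m["level"] == level {
			return nil
		}
		return m
	}
}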
The data flow
So far we have data processing and the upstream/downstream connection. But data does not move from kafka -> es by itself: something has to pull it from kafka and write it out.
So how is the data flow driven? Let's go back to the main program: github.com/tal-tech/go…
Starting the whole flow is in fact an exercise in service composition:
func main() {
	// parse command-line flags and set up graceful shutdown
	// service group: the composition pattern
	group := service.NewServiceGroup()
	defer group.Stop()

	for _, processor := range c.Clusters {
		// connect to es
		// build the filter processors
		// prepare the es writer {write index, writer}
		handle := handler.NewHandler(writer, indexer)
		handle.AddFilters(filters...)
		handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))
		// start kafka as configured, passing in the consume handler, and add it to the group
		for _, k := range toKqConf(processor.Input.Kafka) {
			group.Add(kq.MustNewQueue(k, handle))
		}
	}
	// start the group
	group.Start()
}
The whole data flow hangs off this group combinator:
group.Start()
|- group.doStart()
|- [service.Start() for service in group.services]
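For context, what a group member must look like is defined by go-zero's Service interface (paraphrased from the core/service package; check the source for the exact definitions):

type Starter interface {
	Start()
}

type Stopper interface {
	Stop()
}

// Anything implementing both can be Add()-ed to a ServiceGroup.
// group.Start() starts every member; group.Stop() stops them all.
type Service interface {
	Starter
	Stopper
}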
Every service added to the group implements Start(). For the kafka queue, the data flow begins in its Start():
func (q *kafkaQueue) Start() {
	q.startConsumers()
	q.startProducers()
	q.producerRoutines.Wait()
	close(q.channel)
	q.consumerRoutines.Wait()
}
- Start the kafka consumer routines (the processing side)
- Start the kafka pull routines [the "producer" naming may be confusing, but these actually pull messages from kafka into q.channel]
- Once pulling ends, the channel is closed, the consumers drain the remaining messages, and the work finishes up
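A minimal standalone model of this pull-then-consume pattern, to make the channel handoff concrete (a simplified sketch, not go-queue's actual code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	ch := make(chan string, 16)
	var pullers, consumers sync.WaitGroup

	// the "producer" side: pull messages (fabricated here) into the channel
	pullers.Add(1)
	go func() {
		defer pullers.Done()
		for i := 0; i < 3; i++ {
			ch <- fmt.Sprintf("msg-%d", i)
		}
	}()

	// the "consumer" side: drain the channel and handle each message
	consumers.Add(1)
	go func() {
		defer consumers.Done()
		for msg := range ch {
			fmt.Println("consumed:", msg)
		}
	}()

	pullers.Wait() // pulling is done
	close(ch)      // signal consumers that nothing more is coming
	consumers.Wait()
}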
The handle we pass to kafka exposes Consume, and that method is executed inside q.startConsumers():
q.startConsumers()
|- [q.consumeOne(key, value) for msg in q.channel]
|- q.handler.Consume(key, value)
At this point, the whole data stream is completely strung together.
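End to end, the path of a single message is:

kafka -> kq pull routines -> q.channel -> MessageHandler.Consume() -> filter chain -> es.Writer.Write() -> ElasticSearch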
Conclusion
As the first article on go-stash, this one gives an overview of its architecture and design. The next article will reveal more about its performance and why we built such a component.
Github.com/tal-tech/go…
Stay tuned for more articles on the design and implementation of go-zero.
Github.com/tal-tech/go…
You are welcome to use go-zero, and star it to support us!
WeChat communication group
Follow the "microservice practice" WeChat official account and reply "join group" to obtain the community QR code.