Original address: medium.com/smsjunk/han…
By Marcio Castilho
Having worked for over 15 years at various companies in the anti-spam, anti-virus, and anti-malware industries, I know how complex these systems can end up being due to the sheer volume of data we deal with every day.
Currently, I am the CEO of Smsjunk.com and Chief Architect of KnowBe4, both companies active in the cybersecurity industry.
Interestingly, most of the web backend development I’ve been involved in as a software engineer over the past decade or so has been done in Ruby on Rails. Don’t get me wrong, I like Ruby on Rails and believe it is a great framework, but after a while you start thinking and designing systems the Ruby way, and you forget how efficient and simple your software architecture could have been if you could take advantage of multithreading, parallelization, fast execution, and low memory overhead. I was a C/C++, Delphi, and C# developer for many years, and I’m only beginning to realize how uncomplicated things can be with the right tool for the job.
I’m not very interested in the language and framework wars the Internet keeps fighting over. I believe efficiency, productivity, and code maintainability depend mostly on how simply you can design your solution.
The problem
While working on our anonymous telemetry and analytics system, our goal was to handle a large number of POST requests from millions of clients. The web handler would receive a JSON document that might contain a collection of many payloads, which needed to be written to Amazon S3 so that our map-reduce system could operate on the data later.
Traditionally, we have considered the following techniques to build the system architecture:
- Sidekiq
- Resque
- DelayedJob
- Elasticbeanstalk Worker Tier
- RabbitMQ
- And so on…
And setting up two different clusters, one for the web front end and another for the workers, so we could scale up the amount of background work we could handle.
But from the beginning our team knew we should do this in Go, because during the discussion phase it became clear that this could be a very high-traffic system. I had been using Go for about two years, and we had developed a few systems with it here, but none of them had to handle anywhere near this much load.
Let’s start by creating a couple of structs to define the web request payload that we will receive through POST calls, along with a method to upload it to an S3 bucket.
```go
type PayloadCollection struct {
	WindowsVersion string    `json:"version"`
	Token          string    `json:"token"`
	Payloads       []Payload `json:"data"`
}

type Payload struct {
	// [redacted]
}

func (p *Payload) UploadToS3() error {
	// The storageFolder method ensures that no name collisions occur
	// if we get the same timestamp in the key name
	storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

	bucket := S3Bucket

	b := new(bytes.Buffer)
	encodeErr := json.NewEncoder(b).Encode(p)
	if encodeErr != nil {
		return encodeErr
	}

	// Everything we post to the S3 bucket should be marked 'private'
	var acl = s3.Private
	var contentType = "application/octet-stream"

	return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
```
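To make the request format concrete, here is a sketch of the JSON document such a handler accepts, with the field names taken from the struct tags above. The values are made up, and the payload objects are shown empty because the Payload fields are redacted in the original:

```go
// decodeSample illustrates the expected request shape. The field names in
// sampleBody come from the PayloadCollection struct tags above; the values
// and the empty payload objects are hypothetical.
func decodeSample() (*PayloadCollection, error) {
	const sampleBody = `{
		"version": "6.3.9600",
		"token":   "xxxxx-xxxxx-xxxxx",
		"data":    [{}, {}, {}]
	}`

	collection := &PayloadCollection{}
	if err := json.Unmarshal([]byte(sampleBody), collection); err != nil {
		return nil, err
	}
	// collection.Payloads now holds three (empty) payloads
	return collection, nil
}
```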
Naive Go sample program
Initially, we went with a very naive implementation of the POST handler that simply tried to parallelize the job processing in a plain goroutine:
```go
func payloadHandler(w http.ResponseWriter, r *http.Request) {

	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Read the body into a string for JSON decoding
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
	if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// Go through each payload and queue each item individually to be posted to S3
	for _, payload := range content.Payloads {
		go payload.UploadToS3() // <----- DON'T DO THIS
	}

	w.WriteHeader(http.StatusOK)
}
```
For moderate loads this might work for most people, but it quickly proved ineffective at scale. We were expecting a lot of requests, but nothing near the order of magnitude we started seeing once we deployed the first version to production. We completely underestimated the traffic.
The approach above is bad in several ways. There is no way to control how many goroutines we spawn, and since we were getting a million POST requests per minute, the code crashed and burned very quickly.
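For context, the standard way to bound this kind of fan-out in Go is a semaphore built from a buffered channel. The sketch below is purely illustrative and is not what we did next; uploadBounded, maxInFlight, and the sync.WaitGroup usage are inventions for this example (it assumes the Payload type above plus the standard "sync" and "log" packages):

```go
// uploadBounded caps the number of concurrent S3 uploads instead of
// spawning one unchecked goroutine per payload.
func uploadBounded(payloads []Payload, maxInFlight int) {
	sem := make(chan struct{}, maxInFlight)
	var wg sync.WaitGroup

	for _, payload := range payloads {
		sem <- struct{}{} // blocks once maxInFlight uploads are in flight
		wg.Add(1)
		go func(p Payload) {
			defer wg.Done()
			defer func() { <-sem }()
			if err := p.UploadToS3(); err != nil {
				log.Printf("Error uploading to S3: %s", err)
			}
		}(payload)
	}
	wg.Wait()
}
```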
Try it again
We needed to find a different way. From the beginning we discussed how to keep the lifetime of the request handler very short and spawn the processing in the background. Of course, this is what you must do in the Ruby on Rails world, otherwise you block all the available worker web processors, regardless of whether you are using puma, Unicorn, or Passenger (and please, let’s not get into the JRuby discussion). Then we would have needed to leverage common solutions such as Resque, Sidekiq, SQS, and so on; the list goes on, since there are many ways of achieving this.
So the second iteration was to create a buffered channel where we could queue up jobs and upload them to S3. Since we could control the maximum number of items in the queue, and we had plenty of RAM available to queue jobs in memory, we thought it would be fine to simply buffer the jobs in the channel queue.
```go
var Queue chan Payload

func init() {
	Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	...
	// Go through each payload and queue each item individually to be posted to S3
	for _, payload := range content.Payloads {
		Queue <- payload
	}
	...
}
```
Then to actually queue up the job and process it, we use a similar approach:
```go
func StartProcessor() {
	for {
		select {
		case job := <-Queue:
			job.UploadToS3() // <-- STILL NOT GOOD
		}
	}
}
```
Honestly, I have no idea what we were thinking. It must have been a late night full of Red Bulls. This approach didn’t buy us anything; we had traded flawed concurrency for a buffered queue that simply postponed the problem. Our synchronous processor uploaded only one payload at a time to S3, and since the rate of incoming requests was far greater than the rate at which a single processor could upload to S3, our buffered channel quickly reached its limit and blocked the request handlers from queuing more items.
We were merely avoiding the problem and had started a countdown to the death of our system. Our latency rates kept increasing at a constant rate in the minutes after we deployed this flawed version.
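A side note on that blocking behavior: once the buffer fills, the plain `Queue <- payload` send stalls the HTTP handler indefinitely. A non-blocking send via select with a default case, again a sketch rather than something from our codebase, would at least shed load fast instead of hanging:

```go
select {
case Queue <- payload:
	// enqueued successfully
default:
	// the queue is full: reject the request instead of blocking the handler
	w.WriteHeader(http.StatusServiceUnavailable)
	return
}
```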
A better solution
We decided to use a common pattern with Go channels and create a two-tier channel system: one to queue jobs, and another to control how many workers operate on the JobQueue concurrently.
The idea was to parallelize the uploads to S3 at a somewhat sustainable rate, one that would neither cripple the machine nor trigger connection errors from S3. So we opted for a Job/Worker pattern. For those familiar with Java, C#, and the like, think of this as the Golang way of implementing a worker thread pool using channels.
```go
var (
	MaxWorker = os.Getenv("MAX_WORKERS")
	MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
	Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
	WorkerPool chan chan Job
	JobChannel chan Job
	quit       chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		quit:       make(chan bool),
	}
}

// Start begins the worker's run loop, listening on the quit channel in
// case we need to stop it
func (w Worker) Start() {
	go func() {
		for {
			// Register the current worker's job channel into the worker pool.
			w.WorkerPool <- w.JobChannel

			select {
			case job := <-w.JobChannel:
				// We have received a work request.
				if err := job.Payload.UploadToS3(); err != nil {
					log.Errorf("Error uploading to S3: %s", err.Error())
				}

			case <-w.quit:
				// We have received a signal to stop.
				return
			}
		}
	}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
	go func() {
		w.quit <- true
	}()
}
```
We have modified the Web request handler to create an instance of the Job structure with the payload and send it to the JobQueue channel for workers to pick up.
```go
func payloadHandler(w http.ResponseWriter, r *http.Request) {

	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Read the body into a string for json decoding
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
	if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// Go through each payload and queue items individually to be posted to S3
	for _, payload := range content.Payloads {

		// let's create a job with the payload
		work := Job{Payload: payload}

		// Push the work onto the queue.
		JobQueue <- work
	}

	w.WriteHeader(http.StatusOK)
}
```
During our Web server initialization, we create a Dispatcher and call Run() to create a work pool and start listening for jobs to appear in JobQueue.
```go
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
```
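One detail worth flagging: JobQueue is declared earlier but its initialization is never shown in this post. A minimal sketch of what it could look like, assuming MaxQueue has already been parsed into an integer (see the note on environment variables below):

```go
func init() {
	// Buffer size here assumes MaxQueue has been converted to an int.
	JobQueue = make(chan Job, MaxQueue)
}
```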
Here is the code for our dispatcher implementation:
```go
type Dispatcher struct {
	// A pool of workers channels that are registered with the dispatcher
	WorkerPool chan chan Job
	maxWorkers int
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan chan Job, maxWorkers)
	return &Dispatcher{WorkerPool: pool, maxWorkers: maxWorkers}
}

func (d *Dispatcher) Run() {
	// starting n number of workers
	for i := 0; i < d.maxWorkers; i++ {
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}

	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			// a job request has been received
			go func(job Job) {
				// try to obtain a worker job channel that is available.
				// this will block until a worker is idle
				jobChannel := <-d.WorkerPool

				// dispatch the job to the worker job channel
				jobChannel <- job
			}(job)
		}
	}
}
```
Note that we provide the maximum number of workers to be instantiated and added to our pool of workers. Since we have been using Amazon Elastic Beanstalk for this project with a dockerized Go environment, and we always try to follow the 12-factor methodology for configuring systems in production, we read these values from environment variables. This way we can control the maximum number of workers and the maximum size of the job queue, and quickly tweak these values without redeploying the cluster.
```go
var (
	MaxWorker = os.Getenv("MAX_WORKERS")
	MaxQueue  = os.Getenv("MAX_QUEUE")
)
```
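Since os.Getenv returns strings while NewDispatcher(MaxWorker) expects an int, some conversion is implied but not shown in the post. A sketch of what that parsing could look like; parseEnvInt and the fallback values are invented for this example (it assumes the standard "os" and "strconv" packages):

```go
// parseEnvInt reads an integer from the environment, falling back to a
// default when the variable is unset or malformed.
func parseEnvInt(key string, fallback int) int {
	if v := os.Getenv(key); v != "" {
		if n, err := strconv.Atoi(v); err == nil {
			return n
		}
	}
	return fallback
}

var (
	MaxWorker = parseEnvInt("MAX_WORKERS", 100)   // 100: hypothetical default
	MaxQueue  = parseEnvInt("MAX_QUEUE", 10000)   // 10000: hypothetical default
)
```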
Immediately after we deployed it, we noticed all of our latency rates drop to insignificant numbers and our capacity to handle requests surge dramatically.
A few minutes after our Elastic Load Balancers were fully warmed up, we saw our ElasticBeanstalk application serving close to one million requests per minute. We usually have a few hours during the morning in which our traffic spikes to over a million per minute.
As soon as we deployed the new code, the number of servers dropped substantially, from 100 servers to about 20.
After we had properly configured our cluster and the auto-scaling settings, we were able to lower that even further, to only 4x EC2 c4.Large instances, with Elastic Auto-Scaling set to spawn a new instance whenever CPU stays above 90% for 5 minutes straight.
Conclusion
In my book, simplicity always wins. We could have designed a complex system with many queues, background workers, and complex deployments, but instead we decided to leverage ElasticBeanstalk’s auto-scaling and the efficient, simple approach to concurrency that Golang gives us out of the box.
It’s not every day that you see a cluster of just four machines, probably much less powerful than my current MacBook Pro, handling POST requests that write to an Amazon S3 bucket a million times every minute.
There is always a right tool for the job. Sometimes, when your Ruby on Rails system needs a very powerful web handler, think a little outside the Ruby ecosystem for simpler yet more powerful alternative solutions.