“This is the 17th day of my participation in the Gwen Challenge in November. Check out the details: The Last Gwen Challenge in 2021.”


preface

GoReplay abstracts two concepts of data flow, namely, using input and output to represent the source and destination of data, collectively called plugin. Middleware between input and output modules is used to realize the expansion mechanism.

Output_http. go: mainly HTTP output plug-in, implement HTTP protocol, implement IO.Writer interface, finally according to the configuration registered in the Plugin.

Parameters that

-output-http value Forwards incoming requests to a forward HTTP address.# Redirect all incoming requests to staging.com address Gor --input-raw :80 --output-http http://staging.com - output-http-elasticSearch string // Sends the request and response status to ElasticSearch Send request and response stats to ElasticSearch: gor --input-raw :8080 --output-http staging.com --output-http-elasticsearch'es_host:api_port/index_name'-output-http-queue-len int // Number of requests that can be queuedfor output, ifAll workers are busy. Default = 1000 (default 1000) -output-http-redirects int Default Value: Enable how often redirects should be followed. -output-http-response-buffer value // Maximum size of received response (buffer) HTTP response buffer size, all data after this size will be discarded. -output-http-skip-verify Don-output-http-stats // Outputs the status of the output queue every 5 seconds Report HTTP output queue stats to console every N milliseconds. See output-http-stats-ms -output-http-stats-ms int Report http output queue stats to console every N milliseconds. default: 5000 (default 5000) -output-http-timeout duration // Specifies the HTTP request/response timeout duration. Specify HTTP request/response timeout. By default 5s. Example: --output-http-timeout 30s (default 5s) -output-http-track-response If turned on, HTTP output responses will be set to all outputs like stdout, file and etc. -output-http-worker-timeout duration Duration to rollback idle workers. (default 2s) -output-http-workers Int //gor defaults to a dynamic number of extension workers, Gor uses dynamic worker scaling. Enter a number to set a maximum number of workers. Default = 0 = unlimited. -output-http-workers-min int Gor uses dynamic worker scaling. Enter a number to set a minimum number of workers. default = 1.Copy the code

By default, Gor creates a dynamic work pool: it starts at 10 and creates more HTTP output coroutines when the HTTP output queue length is greater than 10. The number of coroutines created (N) is equal to the queue length check for that working time and is found to be greater than 10. The queue length is checked each time a message is written to the HTTP output queue. No more coroutines are created until the request to produce N coroutines is satisfied. If the dynamic coroutine pool cannot process a message at that time, it will sleep for 100 milliseconds. If the dynamic working coroutine cannot process a message for 2 seconds, it dies. A fixed number of coroutines can be specified using the — output-HTTP-workers =20 option.

HTTP output number of jobs

NewHTTPOutput Default:

// NewHTTPOutput constructor for HTTPOutput
// Initialize workers
func NewHTTPOutput(address string, config *HTTPOutputConfig) PluginReadWriter {
	o := new(HTTPOutput)
	var err error
	config.url, err = url.Parse(address)
	iferr ! =nil {
		log.Fatal(fmt.Sprintf("[OUTPUT-HTTP] parse HTTP output URL error[%q]", err))
	}
	if config.url.Scheme == "" {
		config.url.Scheme = "http"
	}
	config.rawURL = config.url.String()
	if config.Timeout < time.Millisecond*100 {
		config.Timeout = time.Second
	}
	if config.BufferSize <= 0 {
		config.BufferSize = 100 * 1024 // 100kb
	}
	if config.WorkersMin <= 0 {
		config.WorkersMin = 1
	}
	if config.WorkersMin > 1000 {
		config.WorkersMin = 1000
	}
	if config.WorkersMax <= 0 {
		config.WorkersMax = math.MaxInt32 // idealy so large
	}
	if config.WorkersMax < config.WorkersMin {
		config.WorkersMax = config.WorkersMin
	}
	if config.QueueLen <= 0 {
		config.QueueLen = 1000
	}
	if config.RedirectLimit < 0 {
		config.RedirectLimit = 0
	}
	if config.WorkerTimeout <= 0 {
		config.WorkerTimeout = time.Second * 2
	}
	o.config = config
	o.stop = make(chan bool)
	// Whether to collect statistics and what is the interval between statistics output
	if o.config.Stats {
		o.queueStats = NewGorStat("output_http", o.config.StatsMs)
	}

	o.queue = make(chan *Message, o.config.QueueLen)
	if o.config.TrackResponses {
		o.responses = make(chan *response, o.config.QueueLen)
	}
	// it should not be buffered to avoid races
	o.stopWorker = make(chan struct{})

	ifo.config.ElasticSearch ! ="" {
		o.elasticSearch = new(ESPlugin)
		o.elasticSearch.Init(o.config.ElasticSearch)
	}
	o.client = NewHTTPClient(o.config)
	o.activeWorkers += int32(o.config.WorkersMin)
	for i := 0; i < o.config.WorkersMin; i++ {
		go o.startWorker()
	}
	go o.workerMaster()
	return o
}
Copy the code

Start httpClient after configuration:

o.client = NewHTTPClient(o.config)
	o.activeWorkers += int32(o.config.WorkersMin)
	for i := 0; i < o.config.WorkersMin; i++ {
		go o.startWorker()
	}
Copy the code

Start multiple send coroutines:

func (o *HTTPOutput) startWorker(a) {
	for {
		select {
		case <-o.stopWorker:
			return
		case msg := <-o.queue:
			o.sendRequest(o.client, msg)
		}
	}
}
Copy the code

Execute send:

func (o *HTTPOutput) sendRequest(client *HTTPClient, msg *Message) {
	if! isRequestPayload(msg.Meta) {return
	}

	uuid := payloadID(msg.Meta)
	start := time.Now()
	resp, err := client.Send(msg.Data)
	stop := time.Now()

	iferr ! =nil {
		Debug(1, fmt.Sprintf("[HTTP-OUTPUT] error when sending: %q", err))
		return
	}
	if resp == nil {
		return
	}

	if o.config.TrackResponses {
		o.responses <- &response{resp, uuid, start.UnixNano(), stop.UnixNano() - start.UnixNano()}
	}

	ifo.elasticSearch ! =nil {
		o.elasticSearch.ResponseAnalyze(msg.Data, resp, start, stop)
	}
}
Copy the code

Send details, various configuration effective point:

// Send sends an http request using client create by NewHTTPClient
func (c *HTTPClient) Send(data []byte) ([]byte, error) {
	var req *http.Request
	var resp *http.Response
	var err error

	req, err = http.ReadRequest(bufio.NewReader(bytes.NewReader(data)))
	iferr ! =nil {
		return nil, err
	}
	// we don't send CONNECT or OPTIONS request
	if req.Method == http.MethodConnect {
		return nil.nil
	}

	if! c.config.OriginalHost { req.Host = c.config.url.Host }// fix #862
	if c.config.url.Path == "" && c.config.url.RawQuery == "" {
		req.URL.Scheme = c.config.url.Scheme
		req.URL.Host = c.config.url.Host
	} else {
		req.URL = c.config.url
	}

	// force connection to not be closed, which can affect the global client
	req.Close = false
	// it's an error if this is not equal to empty string
	req.RequestURI = ""

	resp, err = c.Client.Do(req)
	iferr ! =nil {
		return nil, err
	}
	if c.config.TrackResponses {
		return httputil.DumpResponse(resp, true)
	}
	_ = resp.Body.Close()
	return nil.nil
Copy the code

HTTP output queue

What’s a queue for?

Code logic call diagram