sequence

This article focuses on tunny’s workerWrapper

workerWrapper

type workerWrapper struct { worker Worker interruptChan chan struct{} // reqChan is NOT owned by this type, it is used to send requests for work. reqChan chan<- workRequest // closeChan can be closed in order to cleanly shutdown  this worker. closeChan chan struct{} // closedChan is closed by the run() goroutine when it exits. closedChan chan struct{} } func newWorkerWrapper( reqChan chan<- workRequest, worker Worker, ) *workerWrapper { w := workerWrapper{ worker: worker, interruptChan: make(chan struct{}), reqChan: reqChan, closeChan: make(chan struct{}), closedChan: make(chan struct{}), } go w.run() return &w }Copy the code

WorkerWrapper wraps the worker and defines interruptChan, reqChan, closeChan, and closedChan properties

interrupt

func (w *workerWrapper) interrupt() {
	close(w.interruptChan)
	w.worker.Interrupt()
}
Copy the code

The interrupt method closes w.i terruptchan and executes w.ware.interrupt ()

run

func (w *workerWrapper) run() {
	jobChan, retChan := make(chan interface{}), make(chan interface{})
	defer func() {
		w.worker.Terminate()
		close(retChan)
		close(w.closedChan)
	}()

	for {
		// NOTE: Blocking here will prevent the worker from closing down.
		w.worker.BlockUntilReady()
		select {
		case w.reqChan <- workRequest{
			jobChan:       jobChan,
			retChan:       retChan,
			interruptFunc: w.interrupt,
		}:
			select {
			case payload := <-jobChan:
				result := w.worker.Process(payload)
				select {
				case retChan <- result:
				case <-w.interruptChan:
					w.interruptChan = make(chan struct{})
				}
			case _, _ = <-w.interruptChan:
				w.interruptChan = make(chan struct{})
			}
		case <-w.closeChan:
			return
		}
	}
}
Copy the code

Run creates jobChan and retChan, and the for loop executes select to read the reqChan payload. The payload is read, processed, and written to the retChan

stop

func (w *workerWrapper) stop() {
	close(w.closeChan)
}
Copy the code

The stop method closes w. closechan

join

func (w *workerWrapper) join() {
	<-w.closedChan
}
Copy the code

The join method waits for W. closedchan

summary

Tunny’s workerWrapper wraps the worker, defines interruptChan, reqChan, closeChan, and closedChan properties, and provides methods for interrupt, run, stop, and Join.

doc

  • tunny