Preface
This article examines tunny's workerWrapper.
workerWrapper
type workerWrapper struct {
	worker        Worker
	interruptChan chan struct{}
	// reqChan is NOT owned by this type, it is used to send requests for work.
	reqChan chan<- workRequest
	// closeChan can be closed in order to cleanly shutdown this worker.
	closeChan chan struct{}
	// closedChan is closed by the run() goroutine when it exits.
	closedChan chan struct{}
}

func newWorkerWrapper(
	reqChan chan<- workRequest,
	worker Worker,
) *workerWrapper {
	w := workerWrapper{
		worker:        worker,
		interruptChan: make(chan struct{}),
		reqChan:       reqChan,
		closeChan:     make(chan struct{}),
		closedChan:    make(chan struct{}),
	}
	go w.run()
	return &w
}
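workerWrapper wraps a Worker and holds four channels: interruptChan, reqChan, closeChan, and closedChan. newWorkerWrapper creates the three channels the wrapper owns (reqChan is passed in by the caller) and starts run in its own goroutine.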
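To make the wrapped type concrete, here is a minimal sketch of a Worker that the wrapper could drive. The interface is restated locally from the four calls that appear in the code in this article (Process, BlockUntilReady, Interrupt, Terminate) so the example compiles without the library; doubleWorker is a hypothetical type invented for illustration.

```go
package main

import "fmt"

// Worker restates the four methods that workerWrapper calls on its worker.
// It is declared here only so the sketch is self-contained.
type Worker interface {
	Process(interface{}) interface{}
	BlockUntilReady()
	Interrupt()
	Terminate()
}

// doubleWorker is a hypothetical worker that doubles an int payload.
type doubleWorker struct {
	terminated bool
}

// Process returns twice the payload, or nil if the payload is not an int.
func (d *doubleWorker) Process(payload interface{}) interface{} {
	n, ok := payload.(int)
	if !ok {
		return nil
	}
	return n * 2
}

// BlockUntilReady returns immediately: this worker is always ready.
func (d *doubleWorker) BlockUntilReady() {}

// Interrupt is a no-op because Process never blocks.
func (d *doubleWorker) Interrupt() {}

// Terminate records that cleanup ran.
func (d *doubleWorker) Terminate() { d.terminated = true }

func main() {
	var w Worker = &doubleWorker{}
	fmt.Println(w.Process(21))
	w.Terminate()
}
```

A worker that holds real resources would release them in Terminate and use Interrupt to cancel whatever Process is currently doing.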
interrupt
func (w *workerWrapper) interrupt() {
	close(w.interruptChan)
	w.worker.Interrupt()
}
The interrupt method closes w.interruptChan and then calls w.worker.Interrupt().
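Closing a channel wakes every select that is receiving from it, which is why a close is enough to signal an interruption. The sketch below isolates that behavior in a hypothetical helper, waitOrInterrupt, which is not part of tunny: it tries to deliver a result but gives up once the interrupt channel is closed. A closed channel cannot be reopened, and closing it a second time panics, so a fresh channel has to be allocated before the next interruption can be signaled.

```go
package main

import "fmt"

// waitOrInterrupt is a hypothetical helper. It tries to hand result to
// retChan and reports whether delivery happened before interruptChan closed.
func waitOrInterrupt(retChan chan<- interface{}, interruptChan <-chan struct{}, result interface{}) bool {
	select {
	case retChan <- result:
		return true
	case <-interruptChan:
		return false
	}
}

func main() {
	retChan := make(chan interface{}) // unbuffered, and nobody is receiving
	interruptChan := make(chan struct{})

	close(interruptChan) // the signal that interrupt() sends

	// The send can never proceed, so only the interrupt case is ready.
	delivered := waitOrInterrupt(retChan, interruptChan, "late result")
	fmt.Println(delivered)
}
```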
run
func (w *workerWrapper) run() {
	jobChan, retChan := make(chan interface{}), make(chan interface{})
	defer func() {
		w.worker.Terminate()
		close(retChan)
		close(w.closedChan)
	}()
	for {
		// NOTE: Blocking here will prevent the worker from closing down.
		w.worker.BlockUntilReady()
		select {
		case w.reqChan <- workRequest{
			jobChan:       jobChan,
			retChan:       retChan,
			interruptFunc: w.interrupt,
		}:
			select {
			case payload := <-jobChan:
				result := w.worker.Process(payload)
				select {
				case retChan <- result:
				case <-w.interruptChan:
					w.interruptChan = make(chan struct{})
				}
			case _, _ = <-w.interruptChan:
				w.interruptChan = make(chan struct{})
			}
		case <-w.closeChan:
			return
		}
	}
}
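The run method creates jobChan and retChan and then loops. After w.worker.BlockUntilReady() returns, a select either sends a workRequest carrying both channels on reqChan or returns when closeChan is closed. Once a caller has taken the request, run waits for a payload on jobChan, passes it to w.worker.Process, and writes the result to retChan. If interruptChan is closed while run is waiting for the payload or trying to deliver the result, it abandons the job and replaces interruptChan with a new channel. On exit, the deferred function calls w.worker.Terminate() and closes retChan and closedChan.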
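The handshake in run can be easier to follow from the caller's side. The following is a simplified sketch, not tunny's actual pool code: it leaves out interruption and BlockUntilReady, the channel directions on the workRequest fields are an assumption, and submit is a hypothetical stand-in for whatever receives from reqChan.

```go
package main

import "fmt"

// workRequest is a reduced version of the request sent on reqChan.
// The field directions are assumed for this sketch.
type workRequest struct {
	jobChan chan<- interface{}
	retChan <-chan interface{}
}

// worker offers itself on reqChan, then serves the caller that picked it up.
func worker(reqChan chan<- workRequest, closeChan <-chan struct{}, process func(interface{}) interface{}) {
	jobChan, retChan := make(chan interface{}), make(chan interface{})
	for {
		select {
		case reqChan <- workRequest{jobChan: jobChan, retChan: retChan}:
			// A caller now holds both channels: read its payload, reply.
			retChan <- process(<-jobChan)
		case <-closeChan:
			return
		}
	}
}

// submit is a hypothetical caller: it claims an idle worker, sends the
// payload, and waits for the result.
func submit(reqChan <-chan workRequest, payload interface{}) interface{} {
	req := <-reqChan
	req.jobChan <- payload
	return <-req.retChan
}

func main() {
	reqChan := make(chan workRequest)
	closeChan := make(chan struct{})
	double := func(p interface{}) interface{} { return p.(int) * 2 }

	go worker(reqChan, closeChan, double)

	fmt.Println(submit(reqChan, 21))
	close(closeChan)
}
```

Because reqChan is unbuffered, the send in the worker succeeds only when a caller is actually receiving, so an accepted request always means the worker is idle and paired with exactly one caller.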
stop
func (w *workerWrapper) stop() {
	close(w.closeChan)
}
The stop method closes w.closeChan, which makes run return the next time it reaches its outer select.
join
func (w *workerWrapper) join() {
	<-w.closedChan
}
The join method blocks until w.closedChan is closed, that is, until run has exited.
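Together, stop and join form a two-channel shutdown: one channel requests the exit and the other confirms it. Here is a minimal sketch of that pattern on a hypothetical loop type, with a cleanedUp flag standing in for the work done by the deferred function in run.

```go
package main

import "fmt"

// loop is a hypothetical stand-in for workerWrapper with only the
// shutdown channels.
type loop struct {
	closeChan  chan struct{} // closed by stop to request shutdown
	closedChan chan struct{} // closed by run once cleanup has finished
	cleanedUp  bool
}

func newLoop() *loop {
	l := &loop{
		closeChan:  make(chan struct{}),
		closedChan: make(chan struct{}),
	}
	go l.run()
	return l
}

func (l *loop) run() {
	defer func() {
		l.cleanedUp = true  // cleanup happens first...
		close(l.closedChan) // ...then the exit is announced
	}()
	<-l.closeChan // block until stop is called
}

// stop requests shutdown without waiting for it.
func (l *loop) stop() { close(l.closeChan) }

// join waits until run has finished its cleanup.
func (l *loop) join() { <-l.closedChan }

func main() {
	l := newLoop()
	l.stop()
	l.join()
	fmt.Println(l.cleanedUp) // prints "true"
}
```

Splitting the two steps lets a caller signal many workers first and only then wait for each of them, so the workers shut down concurrently. Note that stop must be called at most once per instance, since closing an already closed channel panics.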
Summary
tunny's workerWrapper wraps a Worker, holds the interruptChan, reqChan, closeChan, and closedChan channels, and provides the interrupt, run, stop, and join methods.
Doc
- tunny