- Define the struct
type RestChan struct { Status uint32 //true: closed Rw sync.RWMutex Ch chan *azmode.ResultMode Wg sync.WaitGroup }Copy the code
- Constructor: NewRestChan
func NewRestChan(len int16) *RestChan { var ch chan *azmode.ResultMode ch = make(chan *azmode.ResultMode, len) return &RestChan{Ch:ch} }Copy the code
- Struct method: Get
// Get data from chan func (r *RestChan)Get(a)(bool, *azmode.ResultMode) { var( ok bool rest *azmode.ResultMode ) select { case rest, ok = <- r.Ch: if! ok{ fmt.Println("***********RestChan is closed") return false.nil } case <-time.After(time.Second*1) :return false.nil //default: // // FMT.Println("*********** no data in RestChan ") // return false, nil } return true, rest }Copy the code
- Struct method: put
// Send data to chan func (r *RestChan)put(rest *azmode.ResultMode) bool { select { case r.Ch <- rest: return true default: / / FMT. Println (" * * * * * * * * * * * RestChan is full ") return false}}Copy the code
- Struct method: Close
/ / close chan func (r *RestChan)Close(a) { var( state uint32 ) r.Rw.Lock() state = atomic.LoadUint32(&r.Status) // Check whether chan is disabled ifstate! =1{ if atomic.CompareAndSwapUint32(&r.Status, state, 1){ fmt.Println("*********** start shutting down RestChan") close(r.Ch) } } defer r.Rw.Unlock() }Copy the code
- In practice: sending data to the channel
// Continue sending data to restChan until LT24SChan is closed func (r *RestChan)PushSRest(a) { var( bl bool rest *azmode.ResultMode ) for{ ifbl, rest = LT24SChan.Get(); ! bl{// Check whether chan is closed ifatomic.LoadUint32(<24SChan.Status) ! =1{ //fmt.Println("**************LT24SChan not available yet ") continue } fmt.Println("***************** closed lt24schan.ch") break }else { ifrest ! =nil{ // Send data to RestChan ifbl = RestCh.put(rest); ! bl{// Send asynchronously, synchronization will cause blocking here r.Wg.Add(1) go func(rest *azmode.ResultMode) { var bl bool for{ ifbl = RestCh.put(rest); ! bl{continue } break } defer r.Wg.Done() }(rest) } } // FMT.Println("key already exists ") continue}}}Copy the code
- In practice: closing the channel
go func() {
	// Wait for every asynchronous send registered on RestCh.Wg to finish
	// before closing, so no sender is left writing to a closed channel.
	// NOTE(review): Wait only covers sends already registered via Wg.Add;
	// presumably this is started after the producer has finished. Confirm.
	chanpip.RestCh.Wg.Wait()
	chanpip.RestCh.Close()
}()