This is the 17th day of my participation in the First Challenge 2022.

pipe.go analysis

This file uses the errors package and the sync package; the atomicError type shown below additionally relies on atomic.Value from sync/atomic.

Pipe is an adapter used to connect readers and writers.
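As a rough illustration of the adapter role, the sketch below pushes a string into one end of a pipe and reads it back from the other end. The helper name readAllThroughPipe is made up for this example; io.Pipe and io.ReadAll are the standard library calls.

```go
package main

import (
	"fmt"
	"io"
)

// readAllThroughPipe is a hypothetical helper: it writes msg into a pipe
// from one goroutine and reads everything back from the other end.
func readAllThroughPipe(msg string) (string, error) {
	r, w := io.Pipe()

	go func() {
		// Write blocks until the reader has consumed the data.
		_, err := w.Write([]byte(msg))
		// With a nil error the reader sees io.EOF; otherwise it sees err.
		w.CloseWithError(err)
	}()

	data, err := io.ReadAll(r)
	return string(data), err
}

func main() {
	got, err := readAllThroughPipe("hello pipe")
	fmt.Println(got, err)
}
```

The write has to happen in its own goroutine because the pipe has no internal buffer: a Write only returns once a Read has taken the data.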

Structure analysis

The file exposes a constructor, Pipe, and the two objects it constructs, PipeReader and PipeWriter. Each object exposes its own methods, and both share one underlying pipe object. The exposed methods simply delegate to that underlying object, so the core logic lives in pipe, while its details stay hidden behind the two objects and the constructor.

pipe struct analysis

There is not much code in pipe, but it contains plenty of ideas worth noting.

type atomicError struct{ v atomic.Value }

func (a *atomicError) Store(err error) {
  a.v.Store(struct{ error }{err})
}
func (a *atomicError) Load() error {
  err, _ := a.v.Load().(struct{ error })
  return err.error
}

atomicError provides atomic reads and writes for an error value.

type pipe struct {
  wrMu sync.Mutex // Serializes Write operations
  wrCh chan []byte
  rdCh chan int

  once sync.Once // Protects closing done
  done chan struct{}
  rerr atomicError
  werr atomicError
}

The pipe structure is divided into two parts:

  • Read/write channels
    • Two unbuffered channels: wrCh carries the writer's byte slice to the reader, and rdCh carries back the number of bytes the reader consumed
    • A mutex, wrMu, that serializes calls to Write
  • Termination state
    • once ensures that done is closed only once
    • done signals that reading and writing have ended
    • rerr and werr store the read-side and write-side errors
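The once/done pairing can be sketched in isolation. The closer type below is hypothetical and is not the library's code; it keeps only the two fields in question to show why sync.Once is needed when either side may trigger the shutdown.

```go
package main

import (
	"fmt"
	"sync"
)

// closer is a hypothetical type that mirrors only the once/done pair of pipe.
type closer struct {
	once sync.Once
	done chan struct{}
}

func newCloser() *closer {
	return &closer{done: make(chan struct{})}
}

// Close may be called any number of times, from any goroutine;
// the channel is closed exactly once.
func (c *closer) Close() {
	c.once.Do(func() { close(c.done) })
}

// Closed reports, without blocking, whether Close has been called.
func (c *closer) Closed() bool {
	select {
	case <-c.done:
		return true
	default:
		return false
	}
}

func main() {
	c := newCloser()
	fmt.Println(c.Closed())
	c.Close()
	c.Close() // closing the channel twice without Once would panic
	fmt.Println(c.Closed())
}
```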

PipeReader/PipeWriter analysis

PipeReader exposes Read, Close, and CloseWithError

type PipeReader struct {
  p *pipe
}

func (r *PipeReader) Read(data []byte) (n int, err error) {
  return r.p.Read(data)
}

func (r *PipeReader) Close() error {
  return r.CloseWithError(nil)
}

func (r *PipeReader) CloseWithError(err error) error {
  return r.p.CloseRead(err)
}

PipeWriter exposes Write, Close, and CloseWithError

type PipeWriter struct {
  p *pipe
}

func (w *PipeWriter) Write(data []byte) (n int, err error) {
  return w.p.Write(data)
}

func (w *PipeWriter) Close() error {
  return w.CloseWithError(nil)
}

func (w *PipeWriter) CloseWithError(err error) error {
  return w.p.CloseWrite(err)
}

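All of their methods use pointer receivers. The methods on the underlying pipe are named more explicitly. On the read side: Read, readCloseError (get the read error), and CloseRead (end reading and writing and record the read error). On the write side: Write, writeCloseError (get the write error), and CloseWrite (end reading and writing and record the write error). The design is easy to follow.

The following focuses on pipe's Read and Write methods.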

func (p *pipe) Read(b []byte) (n int, err error) {
  select {
  case <-p.done:
    return 0, p.readCloseError()
  default:
  }

  select {
  case bw := <-p.wrCh:
    nr := copy(b, bw)
    p.rdCh <- nr
    return nr, nil
  case <-p.done:
    return 0, p.readCloseError()
  }
}

func (p *pipe) Write(b []byte) (n int, err error) {
  select {
  case <-p.done:
    return 0, p.writeCloseError()
  default:
    p.wrMu.Lock()
    defer p.wrMu.Unlock()
  }

  for once := true; once || len(b) > 0; once = false {
    select {
    case p.wrCh <- b:
      nw := <-p.rdCh
      b = b[nw:]
      n += nw
    case <-p.done:
      return n, p.writeCloseError()
    }
  }
  return n, nil
}

Both Read and Write use two select statements. The first select checks, without blocking, whether the pipe has already been closed (in Write, its default branch also takes the write mutex). The second select performs the actual transfer while still watching done.

  • Read
    • Copies from the slice received on wrCh and sends the number of bytes copied to rdCh
  • Write
    • Sends the buffer on wrCh, receives the number of bytes consumed from rdCh, and advances the buffer by that amount
    • If the reader's buffer is smaller than the data written, the exchange repeats until the whole buffer has been consumed
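The repeated exchange can be observed from the outside. In this sketch a single Write of eight bytes is read through a three-byte buffer, so the reader needs three Read calls. The helper readInChunks is made up for the example and assumes bufSize is greater than zero.

```go
package main

import (
	"fmt"
	"io"
)

// readInChunks is a hypothetical helper: it writes msg with a single Write
// call and reads it back through a buffer of bufSize bytes (bufSize > 0),
// returning each chunk that Read delivered.
func readInChunks(msg string, bufSize int) []string {
	r, w := io.Pipe()

	go func() {
		w.Write([]byte(msg)) // one Write, possibly many Reads
		w.Close()
	}()

	var chunks []string
	buf := make([]byte, bufSize)
	for {
		n, err := r.Read(buf)
		if n > 0 {
			chunks = append(chunks, string(buf[:n]))
		}
		if err != nil { // io.EOF once the writer has closed
			return chunks
		}
	}
}

func main() {
	fmt.Println(readInChunks("abcdefgh", 3)) // prints [abc def gh]
}
```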

Coding notes

To close a PipeWriter/PipeReader, a single CloseWithError method would be enough, but for the convenience of the caller it is split into Close and CloseWithError. Performance tests found little difference between splitting it into two methods and writing one function with an optional parameter, so the main benefit of this approach is that the exposed methods are easier to understand.

In pipe.Write, the for loop uses the once variable to ensure that the loop body runs at least once, even when b is empty, which is a way of emulating do-while in Go.
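The same loop shape can be tried without any channels. The function drain below is a hypothetical stand-in that consumes a slice in fixed steps and counts how often the body runs; it assumes step is greater than zero.

```go
package main

import "fmt"

// drain is a hypothetical helper: it consumes b in steps of at most step
// bytes (step > 0) and returns how many times the loop body ran.
// The body runs once even when b is empty, as in a do-while loop.
func drain(b []byte, step int) (iterations int) {
	for once := true; once || len(b) > 0; once = false {
		iterations++
		n := step
		if n > len(b) {
			n = len(b)
		}
		b = b[n:]
	}
	return iterations
}

func main() {
	fmt.Println(drain(nil, 4))                 // prints 1
	fmt.Println(drain([]byte("abcdefgh"), 4))  // prints 2
	fmt.Println(drain([]byte("abcdefghi"), 4)) // prints 3
}
```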

Conclusion

PipeReader implements Reader and PipeWriter implements Writer, while the underlying pipe provides both Read and Write.

Some details are not covered here: read/write errors and EOF.

Reflection: in this reading, I first went through the code and only then looked at the documentation, and found that I had not paid attention to the error handling. Next time I will read the documentation first and then the code, which should be a little more efficient.