“This is the 31st day of my participation in the First Challenge 2022. For details: First Challenge 2022”

The implementation of JSON-RPC 2.0

Request

Request represents a JSON-RPC 2.0 request or notification.

type Request struct {
  Method string           `json:"method"`
  Params *json.RawMessage `json:"params,omitempty"`
  ID     ID               `json:"id"`
  Notif  bool             `json:"-"`

  Meta *json.RawMessage `json:"meta,omitempty"`
}

The Meta field is not part of the JSON-RPC spec; it exists to carry things such as tracing context. The Notif field indicates whether the request is a notification. The Params and Meta fields hold raw, already-serialized JSON (*json.RawMessage).

The Request has several methods:

  • MarshalJSON serializes to a JSON string
  • UnmarshalJSON deserializes a JSON string into a Request object
  • SetParams/SetMeta sets the Params/Meta fields respectively

ID is also defined by the spec and may be a string, a number, or null. The current implementation of this package does not support a null ID.

type ID struct {
  Num uint64
  Str string

  IsString bool
}

In most cases, either Num or Str has a non-zero value. When both are zero, IsString distinguishes which field is used as the ID. Its methods are as follows:

  • String supports printing
  • MarshalJSON serialization
  • UnmarshalJSON deserialization

Response

Response represents a JSON-RPC 2.0 response; it is never used for notifications.

type Response struct {
  ID     ID               `json:"id"`
  Result *json.RawMessage `json:"result,omitempty"`
  Error  *Error           `json:"error,omitempty"`

  Meta *json.RawMessage `json:"meta,omitempty"`
}

Only one of Result or Error is returned. For simplicity, the package ignores one particular case: an error in detecting the ID of the request.

Response also contains several methods:

  • MarshalJSON serializes to a JSON string
  • UnmarshalJSON deserializes into a Response object
  • SetResult Sets the Result field

As the source code shows, Request and Response are serialized and deserialized in two different ways, which is a trick on the author’s part.

Response.Error is specified in the spec:

type Error struct {
  Code    int64            `json:"code"`
  Message string           `json:"message"`
  Data    *json.RawMessage `json:"data"`
}

Error methods:

  • SetError sets Data in Error
  • Error implements the error interface

The following error codes are specified in the spec:

const (
  CodeParseError     = -32700
  CodeInvalidRequest = -32600
  CodeMethodNotFound = -32601
  CodeInvalidParams  = -32602
  CodeInternalError  = -32603
)

Conn

Conn is a JSON-RPC connection between client and server. The two sides are symmetric, so both client and server use Conn objects.

type Conn struct {
  stream ObjectStream

  h Handler

  mu       sync.Mutex
  shutdown bool
  closing  bool
  seq      uint64
  pending  map[ID]*call

  sending sync.Mutex

  disconnect chan struct{}

  logger Logger

  onRecv []func(*Request, *Response)
  onSend []func(*Request, *Response)
}

Conn involves many other objects, so let’s take a look at them first, and finally look at the capabilities Conn provides.

ObjectStream

ObjectStream represents a bidirectional stream of JSON-RPC 2.0 objects. JSON-RPC 2.0 objects can be written to the stream and read from it.

type ObjectStream interface {
  WriteObject(obj interface{}) error
  ReadObject(v interface{}) error

  io.Closer
}

Looking up the implementing types with GoImplements, you can see that ObjectStream is implemented by bufferedObjectStream and by a type in a subpackage. Let’s start with bufferedObjectStream:

type bufferedObjectStream struct {
  conn io.Closer // all writes should go through w, all reads through r
  w    *bufio.Writer
  r    *bufio.Reader

  codec ObjectCodec

  mu sync.Mutex
}

bufferedObjectStream wraps an io.ReadWriteCloser with buffering for sending and receiving JSON-RPC 2.0 objects. Before looking at bufferedObjectStream in detail, let’s look at ObjectCodec:

ObjectCodec specifies how to codec jsonrpc2.0 objects in a stream.

type ObjectCodec interface {
  WriteObject(stream io.Writer, obj interface{}) error
  ReadObject(stream *bufio.Reader, v interface{}) error
}

ObjectCodec is implemented by two types, indicating that there are two encoding methods: VarintObjectCodec and VSCodeObjectCodec.

VarintObjectCodec uses a variable-length header: the header is the length of the payload, encoded as a varint. Let’s look at writing an object first:

    type VarintObjectCodec struct{}

    func (VarintObjectCodec) WriteObject(
      stream io.Writer, obj interface{}) error {
      data, err := json.Marshal(obj)
      if err != nil {
        return err
      }
      var buf [binary.MaxVarintLen64]byte
      b := binary.PutUvarint(buf[:], uint64(len(data)))
      if _, err := stream.Write(buf[:b]); err != nil {
        return err
      }
      if _, err := stream.Write(data); err != nil {
        return err
      }
      return nil
    }

binary.MaxVarintLen64 is 10, the maximum number of bytes a varint-encoded 64-bit integer can occupy. binary.PutUvarint encodes a uint64 into a byte slice and returns the number of bytes written. So the VarintObjectCodec.WriteObject process is as follows:

  • Serialize the data with JSON
  • Write the varint-encoded length to the stream
  • Write the JSON-serialized data to the stream

The JSON payload has a variable length, and the length header that announces it is itself a variable-length integer (a varint), so this encoding is also known as variable-length encoding.

func (VarintObjectCodec) ReadObject(
  stream *bufio.Reader, v interface{}) error {
  b, err := binary.ReadUvarint(stream)
  if err != nil {
    return err
  }
  return json.NewDecoder(io.LimitReader(stream, int64(b))).Decode(v)
}

ReadObject first reads the varint-encoded uint64 length from the stream, then reads exactly that many bytes from the stream. These bytes are the JSON-serialized data.
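As a quick check of the varint framing described above, here is a self-contained round trip over an in-memory buffer. writeFrame and readFrame follow the same steps as the codec; roundTrip is a hypothetical helper that also reports how many bytes the length header took.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
)

// writeFrame writes a uvarint length prefix followed by the JSON payload.
func writeFrame(w io.Writer, obj interface{}) error {
	data, err := json.Marshal(obj)
	if err != nil {
		return err
	}
	var buf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(buf[:], uint64(len(data)))
	if _, err := w.Write(buf[:n]); err != nil {
		return err
	}
	_, err = w.Write(data)
	return err
}

// readFrame reads the length prefix, then decodes at most that many bytes.
func readFrame(r *bufio.Reader, v interface{}) error {
	n, err := binary.ReadUvarint(r)
	if err != nil {
		return err
	}
	return json.NewDecoder(io.LimitReader(r, int64(n))).Decode(v)
}

// roundTrip frames a small message, reads it back, and returns the header
// size in bytes together with the decoded "method" value.
func roundTrip(method string) (headerLen int, got string, err error) {
	var stream bytes.Buffer
	msg := map[string]string{"method": method}
	if err = writeFrame(&stream, msg); err != nil {
		return 0, "", err
	}
	payload, _ := json.Marshal(msg)
	headerLen = stream.Len() - len(payload)

	var out map[string]string
	if err = readFrame(bufio.NewReader(&stream), &out); err != nil {
		return 0, "", err
	}
	return headerLen, out["method"], nil
}

func main() {
	headerLen, method, err := roundTrip("ping")
	if err != nil {
		panic(err)
	}
	fmt.Println(headerLen, method) // 1 ping: a 17-byte payload needs a 1-byte header
}
```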

The other encoding is VSCodeObjectCodec, whose header carries the length and the type. It is the framing that Microsoft defined for LSP (the Language Server Protocol).

type VSCodeObjectCodec struct{}

func (VSCodeObjectCodec) WriteObject(
  stream io.Writer, obj interface{}) error {
  data, err := json.Marshal(obj)
  if err != nil {
    return err
  }
  if _, err := fmt.Fprintf(
    stream, "Content-Length: %d\r\n\r\n", len(data)); err != nil {
    return err
  }
  if _, err := stream.Write(data); err != nil {
    return err
  }
  return nil
}

As you can see, the Content-Type is fixed (JSON-serialized data), so the implementation only writes the length. The header and the content are separated by a blank line, and line endings are ‘\r\n’.

    func (VSCodeObjectCodec) ReadObject(
      stream *bufio.Reader, v interface{}) error {
      var contentLength uint64
      for {
        line, err := stream.ReadString('\r')
        if err != nil {
          return err
        }
        b, err := stream.ReadByte()
        if err != nil {
          return err
        }
        if b != '\n' {
          return fmt.Errorf(`jsonrpc2: line endings must be \r\n`)
        }
        if line == "\r" {
          break
        }
        if strings.HasPrefix(line, "Content-Length: ") {
          line = strings.TrimPrefix(line, "Content-Length: ")
          line = strings.TrimSpace(line)
          var err error
          contentLength, err = strconv.ParseUint(line, 10, 32)
          if err != nil {
            return err
          }
        }
      }
      if contentLength == 0 {
        return fmt.Errorf("jsonrpc2: no Content-Length header found")
      }
      return json.NewDecoder(io.LimitReader(stream, int64(contentLength))).Decode(v)
    }

Bufio.read.readstring (), which reads the specified character directly, including the specified character strings.HasPrefix(), which checks whether the prefix of a string is XXX. Strings.trimprefix (), which removes the prefix or returns the string if no prefix is specified. Strings.trimspace (), which removes the leading and trailing Spaces.

With a for loop, both ‘\r\n’ are cleverly handled, and the exact length of the contents is calculated when the for loop exits. Finally, the JSON library is called to deserialize.

Now that we’ve gone through the two encodings, let’s go back to bufferedObjectStream.

bufferedObjectStream implements ObjectStream:

// Write an object
func (t *bufferedObjectStream) WriteObject(obj interface{}) error {
  t.mu.Lock()
  defer t.mu.Unlock()
  if err := t.codec.WriteObject(t.w, obj); err != nil {
    return err
  }
  // Flush the buffered writer so the object actually reaches the stream.
  return t.w.Flush()
}

// Read an object
func (t *bufferedObjectStream) ReadObject(v interface{}) error {
  return t.codec.ReadObject(t.r, v)
}

func (t *bufferedObjectStream) Close() error {
  return t.conn.Close()
}

bufferedObjectStream is not exported; it is constructed with NewBufferedStream. Again, reading and writing on the stream is delegated to a specific codec object, so bufferedObjectStream really just provides the stream itself, which is what NewBufferedStream sets up:

func NewBufferedStream(
  conn io.ReadWriteCloser, codec ObjectCodec) ObjectStream {
  return &bufferedObjectStream{
    conn:  conn,
    w:     bufio.NewWriter(conn),
    r:     bufio.NewReader(conn),
    codec: codec,
  }
}

As you can see from this constructor, the underlying stream being read and written is an io.ReadWriteCloser.

At this point, everything related to ObjectStream is analyzed (except in subpackages). Let’s go back to the Conn object.

Handler

Handler is the second field in the Conn struct.

type Handler interface {
  Handle(context.Context, *Conn, *Request)
}

Handler is an interface that handles JSON-RPC requests and notifications. Handle() is called for requests one at a time, in order. If ordering is not required, AsyncHandler can be used. We will look at the synchronous version, HandlerWithErrorConfigurer, first, and then at the asynchronous version, AsyncHandler.

type HandlerWithErrorConfigurer struct {
  handleFunc func(
    context.Context, *Conn, *Request) (result interface{}, err error)
  suppressErrClosed bool
}

This structure is relatively simple and contains a function and a flag bit. The constructor is HandlerWithError:

func HandlerWithError(
  handleFunc func(context.Context, *Conn, *Request) (result interface{}, err error),
) *HandlerWithErrorConfigurer {
  return &HandlerWithErrorConfigurer{handleFunc: handleFunc}
}

The implementation of Handler is as follows:

    func (h *HandlerWithErrorConfigurer) Handle(
      ctx context.Context, conn *Conn, req *Request) {
      result, err := h.handleFunc(ctx, conn, req)
      if req.Notif {
        if err != nil {
          conn.logger.Printf("jsonrpc2 handler: notification %q handling error: %v\n",
            req.Method, err)
        }
        return
      }

      resp := &Response{ID: req.ID}
      if err == nil {
        err = resp.SetResult(result)
      }
      if err != nil {
        if e, ok := err.(*Error); ok {
          resp.Error = e
        } else {
          resp.Error = &Error{Message: err.Error()}
        }
      }

      if !req.Notif {
        if err := conn.SendResponse(ctx, resp); err != nil {
          if err != ErrClosed || !h.suppressErrClosed {
            conn.logger.Printf("jsonrpc2 handler: sending response %s: %v\n",
              resp.ID, err)
          }
        }
      }
    }

The processing flow is as follows:

  • Call the handler function that was supplied at construction time
    • If the Request is a notification, log the error if there is one, and return in either case
  • If the Request is a regular request, the handler’s result or error goes into a response
    • Construct a Response
    • Call conn.SendResponse to send the response

HandlerWithErrorConfigurer also supports suppressing the log message for the error that is returned when the connection is closed (ErrClosed). Let’s look at asynchronous handlers:

func AsyncHandler(h Handler) Handler {
  return asyncHandler{h}
}

type asyncHandler struct {
  Handler
}

func (h asyncHandler) Handle(ctx context.Context, conn *Conn, req *Request) {
  go h.Handler.Handle(ctx, conn, req)
}

The asynchronous version is relatively simple: it embeds a Handler interface value and uses go to process each request asynchronously. Note that the constructor AsyncHandler is exported while the implementing struct asyncHandler is not. Since the asynchronous handler embeds a Handler, the Handle method of the wrapped handler, such as a HandlerWithErrorConfigurer, is what eventually gets called.
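The wrapping pattern can be tried in isolation. In this sketch the Handler interface is simplified to take only a string (the real one takes a context, a *Conn and a *Request), and HandlerFunc and countHandled are hypothetical helpers added for the demonstration.

```go
package main

import (
	"fmt"
	"sync"
)

// Handler is a simplified stand-in for the package's Handler interface.
type Handler interface {
	Handle(req string)
}

// HandlerFunc adapts a plain function to Handler.
type HandlerFunc func(req string)

func (f HandlerFunc) Handle(req string) { f(req) }

// asyncHandler embeds a Handler and runs it in a new goroutine per request.
type asyncHandler struct {
	Handler
}

func (h asyncHandler) Handle(req string) {
	go h.Handler.Handle(req)
}

// AsyncHandler is the exported constructor; the struct stays unexported.
func AsyncHandler(h Handler) Handler { return asyncHandler{h} }

// countHandled dispatches every request through an async handler and
// waits until all of them have been processed.
func countHandled(reqs []string) int {
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		count int
	)
	wg.Add(len(reqs))
	h := AsyncHandler(HandlerFunc(func(req string) {
		defer wg.Done()
		mu.Lock()
		count++
		mu.Unlock()
	}))
	for _, r := range reqs {
		h.Handle(r) // returns immediately; ordering is no longer guaranteed
	}
	wg.Wait()
	return count
}

func main() {
	fmt.Println(countHandled([]string{"a", "b", "c"})) // 3
}
```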

call

call represents the entire life cycle of a JSON-RPC call.

type call struct {
  request  *Request
  response *Response
  seq      uint64 // the seq of the request
  done     chan error
}

Conn’s pending field, a map[ID]*call, records all in-flight calls.
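The idea behind a pending map is that the sender registers a call under its ID and waits on the call's done channel, while the reader looks the ID up when a response arrives and signals that channel. Here is a minimal sketch of that correlation; the conn type, the uint64 IDs and the register/complete helpers are simplifications invented for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// call tracks one in-flight request.
type call struct {
	result string
	done   chan error
}

// conn holds the sequence counter and the pending map, guarded by mu.
type conn struct {
	mu      sync.Mutex
	seq     uint64
	pending map[uint64]*call
}

// register allocates the next ID and records the call as pending.
func (c *conn) register() (uint64, *call) {
	c.mu.Lock()
	defer c.mu.Unlock()
	id := c.seq
	c.seq++
	cc := &call{done: make(chan error, 1)} // buffered: complete never blocks
	c.pending[id] = cc
	return id, cc
}

// complete is what the reader does when a response with this ID arrives.
// It reports whether a matching pending call was found.
func (c *conn) complete(id uint64, result string, err error) bool {
	c.mu.Lock()
	cc := c.pending[id]
	delete(c.pending, id)
	c.mu.Unlock()
	if cc == nil {
		return false // response without a corresponding request
	}
	cc.result = result
	cc.done <- err
	close(cc.done)
	return true
}

func main() {
	c := &conn{pending: map[uint64]*call{}}
	id, cc := c.register()
	go c.complete(id, "pong", nil) // simulate the reader goroutine

	if err := <-cc.done; err != nil {
		panic(err)
	}
	fmt.Println(id, cc.result)             // 0 pong
	fmt.Println(c.complete(42, "x", nil)) // false
}
```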

Logger

Logger is an interface; the standard library’s *log.Logger implements it:

type Logger interface {
  Printf(format string, v ...interface{})
}

Analysis of Conn

A compile-time assertion makes sure Conn implements the JSONRPC2 interface; the constructor follows:

var _ JSONRPC2 = (*Conn)(nil)

func NewConn(ctx context.Context, stream ObjectStream,
  h Handler, opts ...ConnOpt) *Conn {
  c := &Conn{
    stream:     stream,
    h:          h,
    pending:    map[ID]*call{},
    disconnect: make(chan struct{}),
    logger:     log.New(os.Stderr, "", log.LstdFlags),
  }
  for _, opt := range opts {
    if opt == nil {
      continue
    }
    opt(c)
  }
  go c.readMessages(ctx)
  return c
}

ObjectStream and Handler are both interfaces, so concrete values have to be constructed before being passed in. By default, logs are written to os.Stderr. The constructor also does two things:

  • Iterates over the opts and applies each one to the newly constructed Conn
  • Starts the read goroutine
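The option loop in the constructor is an instance of the functional options pattern: each option is a function that mutates the freshly built value. A minimal sketch, with a cut-down Conn and two made-up options (WithName, and an OnRecv whose callback takes a plain string), might look like this:

```go
package main

import "fmt"

// Conn is a cut-down stand-in with just enough state to configure.
type Conn struct {
	name   string
	onRecv []func(msg string)
}

// ConnOpt customizes a Conn during construction.
type ConnOpt func(*Conn)

// WithName is a hypothetical option that overrides the default name.
func WithName(name string) ConnOpt {
	return func(c *Conn) { c.name = name }
}

// OnRecv is a simplified option that registers a receive callback.
func OnRecv(f func(msg string)) ConnOpt {
	return func(c *Conn) { c.onRecv = append(c.onRecv, f) }
}

// NewConn sets defaults, then applies every non-nil option in order.
func NewConn(opts ...ConnOpt) *Conn {
	c := &Conn{name: "default"}
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		opt(c)
	}
	return c
}

func main() {
	c := NewConn(WithName("client"), nil, OnRecv(func(msg string) {
		fmt.Println("recv:", msg)
	}))
	fmt.Println(c.name, len(c.onRecv)) // client 1
	for _, f := range c.onRecv {
		f("hello") // recv: hello
	}
}
```

Skipping nil options, as the constructor above does, lets callers pass an option conditionally without extra branching.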

ConnOpt provides custom handling of Conn. Conn.readMessages() is the read goroutine. Before delving further, let’s examine the anyMessage type:

anyMessage

anyMessage is either a Request or a Response:

type anyMessage struct {
  request  *Request
  response *Response
}

func (m anyMessage) MarshalJSON() ([]byte, error) {
  var v interface{}
  switch {
  case m.request != nil && m.response == nil:
    v = m.request
  case m.request == nil && m.response != nil:
    v = m.response
  }
  if v != nil {
    return json.Marshal(v)
  }
  return nil, errors.New(
    "jsonrpc2: message must have exactly one of the request or response fields set")
}

JSON serialization simply delegates to the serialization of the Request or the Response.

    func (m *anyMessage) UnmarshalJSON(data []byte) error {
      type msg struct {
        ID     interface{}              `json:"id"`
        Method *string                  `json:"method"`
        Result anyValueWithExplicitNull `json:"result"`
        Error  interface{}              `json:"error"`
      }

      var isRequest, isResponse bool
      checkType := func(m *msg) error {
        mIsRequest := m.Method != nil
        mIsResponse := m.Result.null || m.Result.value != nil || m.Error != nil
        if (!mIsRequest && !mIsResponse) || (mIsRequest && mIsResponse) {
          return errors.New(
            "jsonrpc2: unable to determine message type (request or response)")
        }
        if (mIsRequest && isResponse) || (mIsResponse && isRequest) {
          return errors.New(
            "jsonrpc2: batch message type mismatch (must be all requests or all responses)")
        }
        isRequest = mIsRequest
        isResponse = mIsResponse
        return nil
      }

      if isArray := len(data) > 0 && data[0] == '['; isArray {
        var msgs []msg
        if err := json.Unmarshal(data, &msgs); err != nil {
          return err
        }
        if len(msgs) == 0 {
          return errors.New("jsonrpc2: invalid empty batch")
        }
        for i := range msgs {
          if err := checkType(&msg{
            ID:     msgs[i].ID,
            Method: msgs[i].Method,
            Result: msgs[i].Result,
            Error:  msgs[i].Error,
          }); err != nil {
            return err
          }
        }
      } else {
        var m msg
        if err := json.Unmarshal(data, &m); err != nil {
          return err
        }
        if err := checkType(&m); err != nil {
          return err
        }
      }

      var v interface{}
      switch {
      case isRequest && !isResponse:
        v = &m.request
      case !isRequest && isResponse:
        v = &m.response
      }
      if err := json.Unmarshal(data, v); err != nil {
        return err
      }
      if !isRequest && isResponse && m.response.Error == nil &&
        m.response.Result == nil {
        m.response.Result = &jsonNull
      }
      return nil
    }

This code can be broken into three parts:

  • Define a function that checks whether a decoded message is a Request or a Response
  • Deserialize the JSON into a probe struct and run the check on it; both single messages and arrays (all Requests or all Responses) are supported
  • Deserialize the JSON into the matching object (Request or Response)
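The detection step boils down to probing which members are present: a "method" member marks a request, a "result" or "error" member marks a response, and anything else is rejected. The sketch below shows that rule for a single message only; classify is a hypothetical helper, it uses json.RawMessage instead of the package's anyValueWithExplicitNull, and it does not handle batches.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// classify reports whether a single JSON-RPC message looks like a
// request or a response, based only on which members are present.
func classify(data []byte) (string, error) {
	var probe struct {
		Method *string         `json:"method"`
		Result json.RawMessage `json:"result"`
		Error  json.RawMessage `json:"error"`
	}
	if err := json.Unmarshal(data, &probe); err != nil {
		return "", err
	}
	isRequest := probe.Method != nil
	isResponse := probe.Result != nil || probe.Error != nil
	switch {
	case isRequest && !isResponse:
		return "request", nil
	case isResponse && !isRequest:
		return "response", nil
	}
	return "", errors.New("unable to determine message type")
}

func main() {
	for _, raw := range []string{
		`{"jsonrpc":"2.0","method":"ping","id":1}`,
		`{"jsonrpc":"2.0","id":1,"result":"pong"}`,
		`{"jsonrpc":"2.0","id":1}`,
	} {
		kind, err := classify([]byte(raw))
		fmt.Println(kind, err)
	}
}
```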

Conn’s read goroutine

The read goroutine is started when Conn is constructed.

// The read goroutine does two things:
// it reads in a loop, and it releases resources once reading fails.
func (c *Conn) readMessages(ctx context.Context) {
  var err error
  for err == nil {
    // Read one message from the stream
    var m anyMessage
    err = c.stream.ReadObject(&m)
    if err != nil {
      break
    }

    switch {
    // For a Request, call onRecv first, then let the Handler process it
    case m.request != nil:
      for _, onRecv := range c.onRecv {
        onRecv(m.request, nil)
      }
      c.h.Handle(ctx, c, m.request)

    case m.response != nil:
      resp := m.response
      if resp != nil {
        id := resp.ID
        // A Response has arrived: remove its call from the pending list
        c.mu.Lock()
        call := c.pending[id]
        delete(c.pending, id)
        c.mu.Unlock()

        if call != nil {
          call.response = resp
        }

        if len(c.onRecv) > 0 {
          var req *Request
          if call != nil {
            req = call.request
          }
          for _, onRecv := range c.onRecv {
            onRecv(req, resp)
          }
        }

        // The final handling falls into three cases; afterwards the
        // request-response life cycle is marked as finished
        switch {
        case call == nil:
          c.logger.Printf("jsonrpc2: ignoring response #%s with no corresponding request\n",
            id)
        case resp.Error != nil:
          call.done <- resp.Error
          close(call.done)
        default:
          call.done <- nil
          close(call.done)
        }
      }
    }
  }

  // Release resources after a read from the stream has failed
  c.sending.Lock()
  c.mu.Lock()
  c.shutdown = true
  closing := c.closing
  if err == io.EOF {
    if closing {
      err = ErrClosed
    } else {
      err = io.ErrUnexpectedEOF
    }
  }
  for _, call := range c.pending {
    call.done <- err
    close(call.done)
  }
  c.mu.Unlock()
  c.sending.Unlock()
  if err != io.ErrUnexpectedEOF && !closing {
    c.logger.Printf("jsonrpc2: protocol error: %v\n", err)
  }
  close(c.disconnect)
}

Conn’s implementation of the JSONRPC2 interface

In the JSONRPC2 interface, Call sends a standard request, Notify sends a notification, and Close closes the connection.

type JSONRPC2 interface {
  Call(ctx context.Context, method string,
    params, result interface{}, opt ...CallOption) error
  Notify(ctx context.Context, method string,
    params interface{}, opt ...CallOption) error
  Close() error
}

Implementation of Call:

func (c *Conn) Call(ctx context.Context, method string,
  params, result interface{}, opts ...CallOption) error {
  // Build the request
  req := &Request{Method: method}
  if err := req.SetParams(params); err != nil {
    return err
  }
  // Preprocess the request
  for _, opt := range opts {
    if opt == nil {
      continue
    }
    if err := opt.apply(req); err != nil {
      return err
    }
  }
  // Send the request
  call, err := c.send(ctx, &anyMessage{request: req}, true)
  if err != nil {
    return err
  }
  // Wait for the outcome
  select {
  // The read goroutine has read the response from the stream (or failed)
  case err, ok := <-call.done:
    if !ok {
      err = ErrClosed
    }
    if err != nil {
      return err
    }
    if result != nil {
      if call.response.Result == nil {
        call.response.Result = &jsonNull
      }
      // Finally, decode the Result of the response into result
      if err := json.Unmarshal(*call.response.Result, result); err != nil {
        return err
      }
    }
    return nil
  // Timed out or cancelled
  case <-ctx.Done():
    return ctx.Err()
  }
}

A notification gets no response, so Notify returns as soon as the notification has been sent:

func (c *Conn) Notify(ctx context.Context, method string,
  params interface{}, opts ...CallOption) error {
  req := &Request{Method: method, Notif: true}
  if err := req.SetParams(params); err != nil {
    return err
  }
  for _, opt := range opts {
    if opt == nil {
      continue
    }
    if err := opt.apply(req); err != nil {
      return err
    }
  }
  _, err := c.send(ctx, &anyMessage{request: req}, false)
  return err
}

Notify is similar to Call in implementation, but it does not wait for a response. In addition, a notification has no ID, whereas a Call needs one: it is either set by a CallOption during preprocessing or, failing that, taken from the sequence number in send.

Implementation of Close:

func (c *Conn) Close() error {
  c.mu.Lock()
  if c.shutdown || c.closing {
    c.mu.Unlock()
    return ErrClosed
  }
  c.closing = true
  c.mu.Unlock()
  return c.stream.Close()
}

Here are Conn’s other methods:

func (c *Conn) send(_ context.Context, m *anyMessage,
  wait bool) (cc *call, err error) {
  c.sending.Lock()
  defer c.sending.Unlock()

  var id ID

  c.mu.Lock()
  if c.shutdown || c.closing {
    c.mu.Unlock()
    return nil, ErrClosed
  }

  // For a standard request, maintain the pending list; each element
  // represents the complete life cycle of one request.
  if m.request != nil && wait {
    cc = &call{request: m.request, seq: c.seq, done: make(chan error, 1)}
    if !m.request.ID.IsString && m.request.ID.Num == 0 {
      m.request.ID.Num = c.seq
    }
    id = m.request.ID
    c.pending[id] = cc
    c.seq++
  }
  c.mu.Unlock()

  if len(c.onSend) > 0 {
    var (
      req  *Request
      resp *Response
    )
    switch {
    case m.request != nil:
      req = m.request
    case m.response != nil:
      resp = m.response
    }
    for _, onSend := range c.onSend {
      onSend(req, resp)
    }
  }

  // If sending fails, remove the call from the pending list
  defer func() {
    if err != nil {
      if cc != nil {
        c.mu.Lock()
        delete(c.pending, id)
        c.mu.Unlock()
      }
    }
  }()

  if err := c.stream.WriteObject(m); err != nil {
    return nil, err
  }
  return cc, nil
}

Conn.Reply sends a successful response; Conn.ReplyWithError sends a failed response. The response carries an ID that matches the ID of the request.

func (c *Conn) Reply(ctx context.Context, id ID, result interface{}) error {
  resp := &Response{ID: id}
  if err := resp.SetResult(result); err != nil {
    return err
  }
  _, err := c.send(ctx, &anyMessage{response: resp}, false)
  return err
}

func (c *Conn) ReplyWithError(
  ctx context.Context, id ID, respErr *Error) error {
  _, err := c.send(ctx, &anyMessage{
    response: &Response{ID: id, Error: respErr}}, false)
  return err
}

func (c *Conn) SendResponse(ctx context.Context, resp *Response) error {
  _, err := c.send(ctx, &anyMessage{response: resp}, false)
  return err
}

Finally, there is a “disconnected channel”:

func (c *Conn) DisconnectNotify() <-chan struct{} {
  return c.disconnect
}
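Returning a receive-only channel that is closed on disconnect is a common Go broadcast idiom: closing wakes every waiter at once, and no value ever has to be sent. A minimal sketch follows, in which the conn type, the shutdown method and the sync.Once guard are assumptions added for this example rather than part of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// conn exposes a channel that is closed when the connection goes away.
type conn struct {
	disconnect chan struct{}
	once       sync.Once
}

func (c *conn) DisconnectNotify() <-chan struct{} { return c.disconnect }

// shutdown closes the channel exactly once; a second close would panic.
func (c *conn) shutdown() {
	c.once.Do(func() { close(c.disconnect) })
}

// wakeAll starts n waiters, disconnects, and counts how many woke up.
func wakeAll(n int) int {
	c := &conn{disconnect: make(chan struct{})}
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		woken int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-c.DisconnectNotify() // blocks until the channel is closed
			mu.Lock()
			woken++
			mu.Unlock()
		}()
	}
	c.shutdown()
	c.shutdown() // safe: guarded by sync.Once
	wg.Wait()
	return woken
}

func main() {
	fmt.Println(wakeAll(3)) // 3
}
```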

Analysis

At this point we have read through the source of the jsonrpc2 package, but the pieces are still islands and have not formed a comprehensive picture. The websocket subpackage has been skipped for now.

The extension points provided so far are as follows:

  • OnSend/OnRecv
  • Handler, which decides how requests are handled and is part of the business logic
  • ObjectStream, which the subpackage implements on top of WebSocket

At this point it turns out that many of the structures exist only for testing; the functionality that jsonrpc2 provides is actually fairly simple. Apart from the test code, the package is very concise.

Finally

bufferedObjectStream implements ObjectStream on top of bufio; the subpackage sourcegraph/jsonrpc2/websocket provides an ObjectStream based on WebSocket.

type ObjectStream struct {
  conn *ws.Conn
}

func NewObjectStream(conn *ws.Conn) ObjectStream {
  return ObjectStream{conn: conn}
}

func (t ObjectStream) WriteObject(obj interface{}) error {
  return t.conn.WriteJSON(obj)
}

func (t ObjectStream) ReadObject(v interface{}) error {
  err := t.conn.ReadJSON(v)
  if e, ok := err.(*ws.CloseError); ok {
    if e.Code == ws.CloseAbnormalClosure &&
        e.Text == io.ErrUnexpectedEOF.Error() {
      err = io.ErrUnexpectedEOF
    }
  }
  return err
}

func (t ObjectStream) Close() error {
  return t.conn.Close()
}

ws here refers to gorilla/websocket.

With this subpackage, all incoming and outgoing traffic goes over WebSocket.
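Because ObjectStream is a three-method interface, other transports can be plugged in the same way. As an illustration only, the sketch below implements it as newline-delimited JSON over any io.ReadWriteCloser; jsonLineStream, newJSONLineStream, loopback and echo are hypothetical names, and the in-memory loopback is not safe for concurrent use.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// ObjectStream is the interface shown earlier in this article.
type ObjectStream interface {
	WriteObject(obj interface{}) error
	ReadObject(v interface{}) error
	io.Closer
}

// jsonLineStream is a hypothetical implementation that writes one JSON
// value per line using the standard encoder and decoder.
type jsonLineStream struct {
	conn io.Closer
	enc  *json.Encoder
	dec  *json.Decoder
}

func newJSONLineStream(rwc io.ReadWriteCloser) ObjectStream {
	return &jsonLineStream{conn: rwc, enc: json.NewEncoder(rwc), dec: json.NewDecoder(rwc)}
}

func (s *jsonLineStream) WriteObject(obj interface{}) error { return s.enc.Encode(obj) }
func (s *jsonLineStream) ReadObject(v interface{}) error    { return s.dec.Decode(v) }
func (s *jsonLineStream) Close() error                      { return s.conn.Close() }

// loopback is an in-memory io.ReadWriteCloser used only for the demo.
type loopback struct{ bytes.Buffer }

func (*loopback) Close() error { return nil }

// echo writes one object to the stream and reads it back.
func echo(method string) (string, error) {
	s := newJSONLineStream(&loopback{})
	defer s.Close()
	if err := s.WriteObject(map[string]string{"method": method}); err != nil {
		return "", err
	}
	var got map[string]string
	if err := s.ReadObject(&got); err != nil {
		return "", err
	}
	return got["method"], nil
}

func main() {
	got, err := echo("ping")
	fmt.Println(got, err) // ping <nil>
}
```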

Usage in ion-sfu

ion-sfu only constructs a Conn and listens on the disconnect channel.

jc := jsonrpc2.NewConn(r.Context(), websocketjsonrpc2.NewObjectStream(c), p)
<-jc.DisconnectNotify()

So sourcegraph/jsonrpc2 has many extension points, but the ion project only uses ObjectStream and Handler, with ObjectStream backed by WebSocket through the subpackage.

The third argument to jsonrpc2.NewConn is the Handler. We will first analyze when it is called and then what is inside it.

When Conn is constructed, a read goroutine is started. In this goroutine, a for loop reads from the stream (here, from a WebSocket). When a Request is read, the Handler is called:

c.h.Handle(ctx, c, m.request)

Handle() is called through the Handler interface; the concrete Handler here is p, which is constructed as follows:

s := sfu.NewSFU(conf)
p := server.NewJSONSignal(sfu.NewPeer(s))