What is stream processing

If you have Java experience, you have probably come to appreciate the Java 8 Stream API, which greatly simplifies working with collection data.

int sum = widgets.stream()
              .filter(w -> w.getColor() == RED)
              .mapToInt(w -> w.getWeight())
              .sum();

Stream lets us process data in a chained, functional style, as if items were flowing through a pipeline in real time before being aggregated at the end. The idea behind Stream is to abstract data processing into a data stream, returning a new stream after each operation.

Implementing a Stream component like Java's from scratch sounds very difficult. But what would it take to implement a similar Stream in Go?
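For a taste of the goal, here is a hedged sketch of what the same computation might look like with the Go Stream API we are about to build. Widget and the sample values are hypothetical stand-ins for the Java example; Just, Filter, Map, and ForEach follow the API defined later in this article.

type Widget struct {
  Color  string
  Weight int
}

var sum int
Just(
  Widget{Color: "RED", Weight: 30},
  Widget{Color: "BLUE", Weight: 40},
  Widget{Color: "RED", Weight: 50},
).Filter(func(item interface{}) bool {
  return item.(Widget).Color == "RED"
}).Map(func(item interface{}) interface{} {
  return item.(Widget).Weight
}).ForEach(func(item interface{}) {
  sum += item.(int)
})
// sum == 80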

Stream function definition

Before writing any code, think it through: clarifying the requirements is the most important step. Let's put ourselves in the author's shoes and reason about the whole component implementation process. Setting the underlying implementation logic aside, let's try to define the Stream functions from scratch.

A Stream's workflow is really a producer-consumer model, and the whole process closely resembles a factory production line:

  1. Creation stage (raw materials)

  2. Processing stage (assembly-line processing)

  3. Summary stage (final product)

Let's start defining the API around the stream's three lifecycle stages:

Creation stage

This stage creates the Stream abstraction; think of these functions as constructors.

We support three ways to construct a stream: slice conversion, channel conversion, and functional conversion.

Note that the functions at this stage are plain package-level functions; they are not bound to a Stream object as methods.

// Create a stream with a mutable argument pattern
func Just(items ...interface{}) Stream

// Create a stream through a channel
func Range(source <-chan interface{}) Stream

// Create a stream with a function
func From(generate GenerateFunc) Stream

// Splice streams together
func Concat(s Stream, others ...Stream) Stream
Processing stage

The operations in the processing stage usually correspond to business logic: transformation, filtering, deduplication, sorting, and so on.

The APIs at this stage are methods, bound to the Stream object.

Common business scenarios are covered by the following definitions:

// Remove duplicate items
Distinct(keyFunc KeyFunc) Stream
// Filter items by condition
Filter(filterFunc FilterFunc, opts ...Option) Stream
// Group items
Group(fn KeyFunc) Stream
// Return the first n elements
Head(n int64) Stream
// Return the last n elements
Tail(n int64) Stream
// Convert objects
Map(fn MapFunc, opts ...Option) Stream
// Merge items into a slice and generate a new stream
Merge() Stream
// Reverse
Reverse() Stream
// Sort
Sort(fn LessFunc) Stream
// Apply a function to each item
Walk(fn WalkFunc, opts ...Option) Stream
// Splice other streams onto this one
Concat(streams ...Stream) Stream

Each operation in the processing stage returns a new Stream object, and they all follow the same basic implementation paradigm:

  1. Define a new channel.

  2. Read from the current Stream's channel, process the data, and write the results to the new channel.

  3. Close the new channel. Because the channel will be read downstream, it must be closed explicitly.

  4. Return the new Stream.
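A minimal sketch of this paradigm, using the internalStream type and Range constructor introduced later (someOp is a hypothetical pass-through operation, not part of the real API):

func (s internalStream) someOp() Stream {
  // 1. Define a new channel
  source := make(chan interface{})
  go func() {
    // 3. Close the new channel so downstream range loops can terminate
    defer close(source)
    // 2. Read the current Stream's channel, process each item, write to the new channel
    for item := range s.source {
      source <- item // a real operation would transform or filter here
    }
  }()
  // 4. Return the new Stream
  return Range(source)
}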
The summary stage

The summary stage produces the final results we actually want: matching, counting, traversal, and so on.

// Check if all matches
AllMatch(fn PredicateFunc) bool
// Check if there is at least one match
AnyMatch(fn PredicateFunc) bool
// Count the quantity
Count() int
// Drain the stream
Done()
// Perform operations on all elements
ForAll(fn ForAllFunc)
// Perform operations on each element
ForEach(fn ForEachFunc)

After teasing out the component’s requirement boundaries, we have a clearer idea of the Stream we are going to implement. In my opinion, a real architect’s grasp of requirements and subsequent evolution can be extremely accurate, which cannot be achieved without deep thinking about requirements and penetrating the essence behind them. Through the author’s perspective to simulate the construction process of the whole project, we learn the author’s thinking methodology, which is the biggest value of learning open source projects.

Ok, let’s try to define the full Stream interface and its functions.

An interface is more than a template for functions: its power of abstraction lets us build the skeleton of the whole project without getting lost in details at the start. A simple set of interface methods can quickly capture our thinking, and it trains a top-down habit of observing the entire system from a macro point of view. Starting from the details instead makes it easy to get bogged down and lose sight of the whole.

type (
  rxOptions struct {
    unlimitedWorkers bool
    workers          int
  }
  Option func(opts *rxOptions)
  // Key generator
  // item is an element of the stream
  KeyFunc func(item interface{}) interface{}
  // Filter function
  FilterFunc func(item interface{}) bool
  // Object conversion function
  MapFunc func(item interface{}) interface{}
  // Object comparison function
  LessFunc func(a, b interface{}) bool
  // Traversal function
  WalkFunc func(item interface{}, pipe chan<- interface{})
  // Match function
  PredicateFunc func(item interface{}) bool
  // Perform an operation on all elements
  ForAllFunc func(pipe <-chan interface{})
  // Perform an operation on each item
  ForEachFunc func(item interface{})
  // Perform an operation on each element concurrently
  ParallelFunc func(item interface{})
  // Perform aggregation on all elements
  ReduceFunc func(pipe <-chan interface{}) (interface{}, error)
  // Item generation function
  GenerateFunc func(source chan<- interface{})

  Stream interface {
    // Remove duplicate items
    Distinct(keyFunc KeyFunc) Stream
    // Filter items by condition
    Filter(filterFunc FilterFunc, opts ...Option) Stream
    // Group items
    Group(fn KeyFunc) Stream
    // Return the first n elements
    Head(n int64) Stream
    // Return the last n elements
    Tail(n int64) Stream
    // Convert objects
    Map(fn MapFunc, opts ...Option) Stream
    // Merge items into a slice and generate a new stream
    Merge() Stream
    // Reverse
    Reverse() Stream
    // Sort
    Sort(fn LessFunc) Stream
    // Apply a function to each item
    Walk(fn WalkFunc, opts ...Option) Stream
    // Splice other streams onto this one
    Concat(streams ...Stream) Stream
    // Check whether all items match
    AllMatch(fn PredicateFunc) bool
    // Check whether at least one item matches
    AnyMatch(fn PredicateFunc) bool
    // Count the number of items
    Count() int
    // Drain the stream
    Done()
    // Perform an operation on all elements
    ForAll(fn ForAllFunc)
    // Perform an operation on each element
    ForEach(fn ForEachFunc)
    // Internal method to get the internal pipe object
    channel() chan interface{}
  }
)

The channel() method fetches the Stream's internal pipe. Because callers program against the Stream interface rather than the concrete type, a private read method is exposed on the interface.

// Get the internal data container channel, the internal method
channel() chan interface{}

Implementation approach

Now that the functional definitions are sorted out, consider a few engineering implementation issues.

How do you implement chained calls?

The Builder pattern used to construct objects can achieve a chaining effect. The Stream works similarly, except that instead of returning the same builder, each call creates a new Stream and returns it to the user.

// Remove duplicate items
Distinct(keyFunc KeyFunc) Stream
// Filter items by condition
Filter(filterFunc FilterFunc, opts ...Option) Stream
How do you achieve the pipeline effect?

The pipeline can be understood as the Stream's data container. In Go we can use a channel as the data pipeline, so that chained Stream operations run asynchronously without blocking each other.

How do you support parallel processing?

Processing data essentially means consuming the data in a channel, so parallel processing is nothing more than consuming the channel in parallel, which goroutines plus a sync.WaitGroup make very easy.
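A minimal sketch of the idea, independent of the Stream types (consumeParallel and its parameters are illustrative names, not part of go-zero):

import "sync"

// Fan out `workers` goroutines that all consume the same channel until it is closed.
func consumeParallel(source <-chan interface{}, workers int, fn func(interface{})) {
  var wg sync.WaitGroup
  for i := 0; i < workers; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
      for item := range source {
        fn(item)
      }
    }()
  }
  // Block until the channel is drained and every worker has exited
  wg.Wait()
}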

The go-zero implementation

core/fx/stream.go

The Stream implementation in go-zero does not define an interface, but the underlying implementation logic is the same.

To implement the Stream interface, we define an internal implementation struct whose source field is a channel that acts as the pipeline.

internalStream struct {
  source <-chan interface{}
}

Creation APIs

Creating from a channel: Range

Create a stream through a channel

func Range(source <-chan interface{}) Stream {
  return internalStream{
    source: source,
  }
}
Creating from variadic arguments: Just

Just creates a stream from variadic arguments. Closing the channel as soon as the writes are done is good practice.

func Just(items ...interface{}) Stream {
  source := make(chan interface{}, len(items))
  for _, item := range items {
    source <- item
  }
  close(source)
  return Range(source)
}
Creating from a function: From

Create a Stream with a function

func From(generate GenerateFunc) Stream {
  source := make(chan interface{})
  threading.GoSafe(func() {
    defer close(source)
    generate(source)
  })
  return Range(source)
}
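A hedged usage sketch (the generator body is illustrative): the generator runs in its own goroutine, so From returns immediately and items are produced lazily.

squares := From(func(source chan<- interface{}) {
  for i := 1; i <= 5; i++ {
    source <- i * i
  }
})
squares.ForEach(func(item interface{}) {
  fmt.Println(item) // 1 4 9 16 25
})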

Because the generate function is a parameter passed in from outside, its execution is beyond our control, so we need to catch runtime panics to prevent them from propagating upward and crashing the application.

func Recover(cleanups ...func()) {
  for _, cleanup := range cleanups {
    cleanup()
  }
  if r := recover(); r != nil {
    logx.ErrorStack(r)
  }
}

func RunSafe(fn func()) {
  defer rescue.Recover()
  fn()
}

func GoSafe(fn func()) {
  go RunSafe(fn)
}
Splicing streams: Concat

Concatenates other streams into a new Stream by calling the internal Concat method, whose source implementation is analyzed later.

func Concat(s Stream, others ...Stream) Stream {
  return s.Concat(others...)
}

Processing APIs

Deduplication: Distinct

Because a KeyFunc func(item interface{}) interface{} is passed in, deduplication can be customized per business scenario; essentially the value returned by KeyFunc is used as a map key to deduplicate.

Function parameters are very powerful and can greatly increase flexibility.

func (s internalStream) Distinct(keyFunc KeyFunc) Stream {
  source := make(chan interface{})
  threading.GoSafe(func() {
    // Remember: closing channels is a good habit
    defer close(source)
    keys := make(map[interface{}]lang.PlaceholderType)
    for item := range s.source {
      // Custom deduplication logic
      key := keyFunc(item)
      // If the key does not exist yet, write the item to the new channel
      if _, ok := keys[key]; !ok {
        source <- item
        keys[key] = lang.Placeholder
      }
    }
  })
  return Range(source)
}

Use cases:

  // 1, 2, 3, 4, 5
  Just(1, 2, 3, 3, 4, 5, 5).Distinct(func(item interface{}) interface{} {
    return item
  }).ForEach(func(item interface{}) {
    t.Log(item)
  })

  // 1, 2, 3, 4
  Just(1, 2, 3, 3, 4, 5, 5).Distinct(func(item interface{}) interface{} {
    uid := item.(int)
    // Apply special dedup logic to items greater than 3: they share one key, so only one is kept
    if uid > 3 {
      return 4
    }
    return item
  }).ForEach(func(item interface{}) {
    t.Log(item)
  })
Filtering: Filter

The filtering logic is abstracted into a FilterFunc applied to each item; the boolean it returns decides whether the item is written back to the new channel. The actual traversal is delegated to the Walk method.

The Option parameter contains two options:

  1. unlimitedWorkers: do not limit the number of goroutines

  2. workers: limit the number of goroutines

FilterFunc func(item interface{}) bool

func (s internalStream) Filter(filterFunc FilterFunc, opts ...Option) Stream {
  return s.Walk(func(item interface{}, pipe chan<- interface{}) {
    if filterFunc(item) {
      pipe <- item
    }
  }, opts...)
}

Example:

func TestInternalStream_Filter(t *testing.T) {
  // keep the even numbers 2, 4
  channel := Just(1, 2, 3, 4, 5).Filter(func(item interface{}) bool {
    return item.(int)%2 == 0
  }).channel()
  for item := range channel {
    t.Log(item)
  }
}
Traversal: Walk

Perform a WalkFunc operation on each item and write the result to a new Stream.

Notice that the order of the data in the new Stream's channel is random, because goroutines read and write the data asynchronously.

// item: an element of the stream
// pipe: the channel the results are written to
WalkFunc func(item interface{}, pipe chan<- interface{})

func (s internalStream) Walk(fn WalkFunc, opts ...Option) Stream {
  option := buildOptions(opts...)
  if option.unlimitedWorkers {
    return s.walkUnLimited(fn, option)
  }
  return s.walkLimited(fn, option)
}

func (s internalStream) walkUnLimited(fn WalkFunc, option *rxOptions) Stream {
  // Create a buffered channel
  // defaultWorkers is 16; writes block once 16 unconsumed elements are buffered
  pipe := make(chan interface{}, defaultWorkers)
  go func() {
    var wg sync.WaitGroup
    for {
      // Check whether s.source has been closed;
      // if we never checked, this goroutine could stay blocked on the read forever and leak
      item, ok := <-s.source
      if !ok {
        break
      }
      wg.Add(1)
      // Execute the function in safe mode
      threading.GoSafe(func() {
        defer wg.Done()
        fn(item, pipe)
      })
    }
    wg.Wait()
    close(pipe)
  }()
  // Return a new Stream
  return Range(pipe)
}

func (s internalStream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
  pipe := make(chan interface{}, option.workers)
  go func() {
    var wg sync.WaitGroup
    // Controls the number of goroutines
    pool := make(chan lang.PlaceholderType, option.workers)
    for {
      // Blocks here once the goroutine limit is reached
      pool <- lang.Placeholder
      // Check whether s.source has been closed;
      // if we never checked, this goroutine could stay blocked on the read forever and leak
      item, ok := <-s.source
      if !ok {
        // Release the pool slot we just acquired
        <-pool
        break
      }
      wg.Add(1)
      // Execute the function in safe mode
      threading.GoSafe(func() {
        defer func() {
          wg.Add(-1) // equivalent to wg.Done()
          // Reading from pool releases one goroutine slot
          <-pool
        }()
        fn(item, pipe)
      })
    }
    wg.Wait()
    close(pipe)
  }()
  return Range(pipe)
}

Use cases:

The order of return is random.

func Test_internalStream_Walk(t *testing.T) {
  // Returns 300, 100, 200 (order is random)
  Just(1, 2, 3).Walk(func(item interface{}, pipe chan<- interface{}) {
    pipe <- item.(int) * 100
  }, WithWorkers(3)).ForEach(func(item interface{}) {
    t.Log(item)
  })
}
Grouping: Group

Groups items into a map keyed by the result of KeyFunc; each group is then emitted as one element of the new stream.

KeyFunc func(item interface{}) interface{}

func (s internalStream) Group(fn KeyFunc) Stream {
  groups := make(map[interface{}][]interface{})
  for item := range s.source {
    key := fn(item)
    groups[key] = append(groups[key], item)
  }
  source := make(chan interface{})
  go func() {
    for _, group := range groups {
      source <- group
    }
    close(source)
  }()
  return Range(source)
}
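The source shows no test case for Group, so here is a hedged usage sketch (grouping by parity is illustrative). Each emitted item is one []interface{} group, and group order is not guaranteed because map iteration order is random.

Just(1, 2, 3, 4, 5).Group(func(item interface{}) interface{} {
  return item.(int) % 2
}).ForEach(func(item interface{}) {
  fmt.Println(item) // e.g. [1 3 5] then [2 4]
})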
Getting the first n elements: Head

If n is greater than the actual number of elements, all elements are returned.

func (s internalStream) Head(n int64) Stream {
  if n < 1 {
    panic("n must be greater than 0")
  }
  source := make(chan interface{})
  go func() {
    for item := range s.source {
      n--
      // n may be greater than the length of s.source, so check >= 0
      if n >= 0 {
        source <- item
      }
      // let successive method go ASAP even we have more items to skip
      // why we don't just break the loop, because if break,
      // this former goroutine will block forever, which will cause goroutine leak.
      // Since the condition is satisfied at n == 0, why not just break out of the loop?
      // The author mentions preventing goroutine leaks:
      // every operation eventually produces a new Stream, the old Stream is never
      // read again, and downstream range loops need the closed channel to terminate
      if n == 0 {
        close(source)
        break
      }
    }
    // n was greater than the actual length of s.source;
    // the new source still needs to be closed explicitly
    if n > 0 {
      close(source)
    }
  }()
  return Range(source)
}

Example:

// Returns 1, 2
func TestInternalStream_Head(t *testing.T) {
  channel := Just(1, 2, 3, 4, 5).Head(2).channel()
  for item := range channel {
    t.Log(item)
  }
}
Getting the last n elements: Tail

This one is interesting: to guarantee we get the last n elements it uses a ring-slice data structure. Let's look at the implementation of Ring first.

// Ring is a ring slice
type Ring struct {
  elements []interface{}
  index    int
  lock     sync.Mutex
}

func NewRing(n int) *Ring {
  if n < 1 {
    panic("n should be greater than 0")
  }
  return &Ring{
    elements: make([]interface{}, n),
  }
}

// Add adds an element
func (r *Ring) Add(v interface{}) {
  r.lock.Lock()
  defer r.lock.Unlock()
  // Write the element to the slot computed from the write index;
  // writes wrap around cyclically
  r.elements[r.index%len(r.elements)] = v
  // Advance the next write position
  r.index++
}

// Take gets all elements,
// keeping the read order the same as the write order
func (r *Ring) Take() []interface{} {
  r.lock.Lock()
  defer r.lock.Unlock()

  var size int
  var start int
  // Because writes wrap around, the start position must be computed
  // so that elements are read back in the same order they were written
  if r.index > len(r.elements) {
    size = len(r.elements)
    // After wrapping, the current write position holds the oldest data
    start = r.index % len(r.elements)
  } else {
    size = r.index
  }
  elements := make([]interface{}, size)
  for i := 0; i < size; i++ {
    // Read in the same order as written
    elements[i] = r.elements[(start+i)%len(r.elements)]
  }

  return elements
}
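A hedged sketch of the ring's behavior (the values are illustrative): with capacity 3 and six writes, the newest three values overwrite the oldest, and Take returns them in write order.

r := NewRing(3)
for i := 1; i <= 6; i++ {
  r.Add(i)
}
fmt.Println(r.Take()) // [4 5 6]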

To summarize the advantages of the ring slice:

  • It supports automatic rolling overwrites

  • It saves memory

Once its fixed capacity is full, a ring slice keeps overwriting the oldest data with the newest, and this property is exactly what lets us keep just the last n elements of the channel.

func (s internalStream) Tail(n int64) Stream {
  if n < 1 {
    panic("n must be greater than 0")
  }
  source := make(chan interface{})
  go func() {
    ring := collection.NewRing(int(n))
    // Read all elements; if the count exceeds n, the ring slice
    // overwrites old data with new data,
    // guaranteeing that the last n elements are retained
    for item := range s.source {
      ring.Add(item)
    }
    for _, item := range ring.Take() {
      source <- item
    }
    close(source)
  }()
  return Range(source)
}

So why not just use a slice as long as the whole source?

The answer is memory savings. The appeal of any ring-shaped data structure is exactly this: it occupies a fixed, bounded amount of memory rather than growing with the input.

Example:

func TestInternalStream_Tail(t *testing.T) {
  // 4, 5
  channel := Just(1, 2, 3, 4, 5).Tail(2).channel()
  for item := range channel {
    t.Log(item)
  }
  // 1, 2, 3, 4, 5
  channel2 := Just(1, 2, 3, 4, 5).Tail(6).channel()
  for item := range channel2 {
    t.Log(item)
  }
}
Element conversion: Map

Elements are converted concurrently by goroutines; note that the output channel is not guaranteed to preserve the input order.

MapFunc func(item interface{}) interface{}

func (s internalStream) Map(fn MapFunc, opts ...Option) Stream {
  return s.Walk(func(item interface{}, pipe chan<- interface{}) {
    pipe <- fn(item)
  }, opts...)
}

Example:

func TestInternalStream_Map(t *testing.T) {
  channel := Just(1, 2, 3, 4, 5, 2, 2, 2, 2, 2, 2).Map(func(item interface{}) interface{} {
    return item.(int) * 10
  }).channel()
  for item := range channel {
    t.Log(item)
  }
}
Merging: Merge

The implementation is fairly simple; I thought about it for a long time and could not come up with a scenario where this behavior is needed.

func (s internalStream) Merge() Stream {
  var items []interface{}
  for item := range s.source {
    items = append(items, item)
  }
  source := make(chan interface{}, 1)
  source <- items
  // Close the channel so the returned stream can be drained
  close(source)
  return Range(source)
}
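One way to picture it, as a hedged sketch: Merge collapses every item into a single []interface{} element.

Just(1, 2, 3).Merge().ForEach(func(item interface{}) {
  fmt.Println(item) // [1 2 3]
})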
Reversal: Reverse

Reverses the elements in the channel. The reversal algorithm is:

  • Find the middle position

  • Swap the elements on either side of it pairwise

Notice why a slice is used to receive s.source: slices grow automatically. Wouldn't an array be better?

An array can't be used, because we don't know the length in advance: a Stream's source is written asynchronously by goroutines, so the number of elements in each Stream's channel can change dynamically.

func (s internalStream) Reverse() Stream {
  var items []interface{}
  for item := range s.source {
    items = append(items, item)
  }
  for i := len(items)/2 - 1; i >= 0; i-- {
    opp := len(items) - 1 - i
    items[i], items[opp] = items[opp], items[i]
  }
  return Just(items...)
}

Example:

func TestInternalStream_Reverse(t *testing.T) {
  channel := Just(1, 2, 3, 4, 5).Reverse().channel()
  for item := range channel {
    t.Log(item)
  }
}
Sorting: Sort

Internally it calls sort.Slice from the standard library and delegates the comparison logic to the LessFunc passed in.

func (s internalStream) Sort(fn LessFunc) Stream {
  var items []interface{}
  for item := range s.source {
    items = append(items, item)
  }

  sort.Slice(items, func(i, j int) bool {
    // fn compares the items themselves, not the indexes
    return fn(items[i], items[j])
  })
  return Just(items...)
}

Example:

// 5, 4, 3, 2, 1
func TestInternalStream_Sort(t *testing.T) {
  channel := Just(1, 2, 3, 4, 5).Sort(func(a, b interface{}) bool {
    return a.(int) > b.(int)
  }).channel()
  for item := range channel {
    t.Log(item)
  }
}
Splicing: Concat
func (s internalStream) Concat(streams ...Stream) Stream {
  // Create a new unbuffered channel
  source := make(chan interface{})
  go func() {
    // Create a RoutineGroup (a WaitGroup wrapper)
    group := threading.NewRoutineGroup()
    // Asynchronously read data from the original channel
    group.Run(func() {
      for item := range s.source {
        source <- item
      }
    })
    // Asynchronously read the channel data of the Streams to be spliced
    for _, stream := range streams {
      // Shadow the loop variable so each closure captures its own stream
      stream := stream
      // Open one goroutine per Stream
      group.Run(func() {
        for item := range stream.channel() {
          source <- item
        }
      })
    }
    // Block until all reads complete
    group.Wait()
    close(source)
  }()
  // Return a new Stream
  return Range(source)
}
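A hedged usage sketch: splice two extra streams onto the first. The relative order across streams is not guaranteed, because each is drained by its own goroutine.

Just(1, 2).Concat(Just(3, 4), Just(5)).ForEach(func(item interface{}) {
  fmt.Println(item) // 1..5 in some interleaved order
})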

Summary APIs

All match: AllMatch

func (s internalStream) AllMatch(fn PredicateFunc) bool {
  for item := range s.source {
    if !fn(item) {
      return false
    }
  }
  return true
}
Any match: AnyMatch

func (s internalStream) AnyMatch(fn PredicateFunc) bool {
  for item := range s.source {
    if fn(item) {
      return true
    }
  }
  return false
}
Counting: Count

func (s internalStream) Count() int {
  var count int
  for range s.source {
    count++
  }
  return count
}
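A hedged usage sketch of the matching and counting APIs:

all := Just(1, 2, 3).AllMatch(func(item interface{}) bool {
  return item.(int) > 0
})
any := Just(1, 2, 3).AnyMatch(func(item interface{}) bool {
  return item.(int) > 2
})
n := Just(1, 2, 3).Count()
fmt.Println(all, any, n) // true true 3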
Draining: Done

func (s internalStream) Done() {
  // Drain the channel
  for range s.source {
  }
}
Processing all elements: ForAll
func (s internalStream) ForAll(fn ForAllFunc) {
  fn(s.source)
}
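A hedged usage sketch: ForAll hands the whole channel to the callback, which makes it a natural place for aggregate computations.

Just(1, 2, 3).ForAll(func(pipe <-chan interface{}) {
  sum := 0
  for item := range pipe {
    sum += item.(int)
  }
  fmt.Println(sum) // 6
})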
Processing each element: ForEach

func (s internalStream) ForEach(fn ForEachFunc) {
  for item := range s.source {
    fn(item)
  }
}

Summary

At this point, the Stream component is fully implemented. The core logic is to use a channel as the pipe and the data as a stream, with goroutines continuously receiving from and writing to channels to achieve an asynchronous, non-blocking effect.

Going back to the question at the beginning: before actually doing it, implementing a Stream seems very difficult. It's hard to imagine that such a powerful component can be built in just 300+ lines of Go code.

The efficiency of the implementation is founded on three Go language features:

  • Channels

  • Goroutines

  • Functional programming
