What is stream processing
If you have Java experience, you have probably come to appreciate the Java 8 Stream API, which greatly improves the ability to process collection data.
int sum = widgets.stream()
.filter(w -> w.getColor() == RED)
.mapToInt(w -> w.getWeight())
.sum();
Stream supports chained calls and a functional programming style, processing data as if it were flowing through a real-time pipeline and then aggregating it. The idea behind Stream is to abstract data processing into a data stream, returning a new stream after each operation.
Implementing a Stream component like Java's would be very difficult, but what about implementing a similar Stream in Go?
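To make the goal concrete, here is a sketch of the usage we are aiming for, mirroring the Java example above. Just, Filter, Map and ForEach are the APIs defined in the rest of this article; Widget, Red and the widgets slice are hypothetical stand-ins for the Java types.

// Hypothetical types for illustration only
type Widget struct {
	Color  int
	Weight int
}

const Red = 1

// widgets is assumed to be a []interface{} holding Widget values
var sum int
Just(widgets...).
	Filter(func(item interface{}) bool {
		return item.(Widget).Color == Red
	}).
	Map(func(item interface{}) interface{} {
		return item.(Widget).Weight
	}).
	ForEach(func(item interface{}) {
		sum += item.(int)
	})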
Stream function definition
Before writing any code, think the problem through: clarifying the requirements is the most important step. Let's put ourselves in the author's shoes and walk through the design of the whole component. Setting the underlying implementation logic aside, let's try to define the Stream API from scratch.
The Stream workflow is essentially a producer-consumer model, and the whole flow is very similar to a factory production line:
- Creation stage (raw material)
- Processing stage (assembly-line processing)
- Summary stage (final product)
Let's define the API around these three stages of a stream's life cycle:
Creation stage
This stage creates the abstract Stream object; think of these functions as constructors.
We support three ways to construct a stream: slice conversion, channel conversion, and functional conversion.
Note that the functions at this stage are plain exported functions, not methods bound to a Stream object.
// Create a stream with a mutable argument pattern
func Just(items ...interface{}) Stream
// Create a stream through a channel
func Range(source <-chan interface{}) Stream
// Create a stream with a function
func From(generate GenerateFunc) Stream
// Concatenate streams
func Concat(s Stream, others ...Stream) Stream
Processing stage
The operations that need to be carried out in the processing stage often correspond to our business logic, such as: transformation, filtering, de-duplication, sorting and so on.
The APIs at this stage are methods bound to the Stream object.
Common service scenarios are defined as follows:
// Remove duplicate items
Distinct(keyFunc KeyFunc) Stream
// Filter items by condition
Filter(filterFunc FilterFunc, opts ...Option) Stream
// Group items
Group(fn KeyFunc) Stream
// Return the first n elements
Head(n int64) Stream
// Return the last n elements
Tail(n int64) Stream
// Convert objects
Map(fn MapFunc, opts ...Option) Stream
// Merge items into a slice and generate a new stream
Merge() Stream
// Reverse
Reverse() Stream
// Sort
Sort(fn LessFunc) Stream
// Apply a function to each item
Walk(fn WalkFunc, opts ...Option) Stream
// Concatenate other streams
Concat(streams ...Stream) Stream
Each processing method returns a new Stream object, following a basic implementation paradigm:
- Define a new channel
- Read the current Stream's channel, process the data, and write the results to the new channel
- Close the new channel; downstream consumers range over it, so it must be closed explicitly
- Return a new Stream wrapping the new channel
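A minimal sketch of this paradigm, using a do-nothing pass-through operation as an example (illustrative only, not part of go-zero; internalStream and Range are defined later in this article):

func (s internalStream) passThrough() Stream {
	// 1. Define a new channel
	pipe := make(chan interface{})
	go func() {
		// 3. Close the new channel so downstream range loops can finish
		defer close(pipe)
		// 2. Read the current stream's channel, process each item,
		// and write the result to the new channel
		for item := range s.source {
			pipe <- item // a real operation would transform or filter here
		}
	}()
	// 4. Return a new Stream wrapping the new channel
	return Range(pipe)
}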
Summary stage
The summary stage produces the final results we want, such as matching, counting, and traversal.
// Check if all matches
AllMatch(fn PredicateFunc) bool
// Check if there is at least one match
AnyMatch(fn PredicateFunc) bool
// Count the quantity
Count() int
// Drain the stream
Done()
// Perform operations on all elements
ForAll(fn ForAllFunc)
// Perform operations on each element
ForEach(fn ForEachFunc)
After teasing out the component's requirement boundaries, we have a much clearer idea of the Stream we are about to implement. In my view, a real architect's grasp of requirements and of their later evolution is extremely precise, and that precision only comes from thinking deeply about the requirements and seeing through to the essence behind them. By simulating the author's construction process for the whole project, we learn the author's methodology of thinking, and that is the greatest value of studying open-source projects.
Ok, let’s try to define the full Stream interface and its functions.
An interface is more than a template for functions. Using its power of abstraction to set up the project's overall framework at the start, without sinking into details, lets us express our thinking process concisely through the interface. It teaches the top-down method of thinking: observe the whole system from a macro point of view first, because whoever starts with the details easily gets lost in them and loses sight of the whole.
rxOptions struct {
	unlimitedWorkers bool
	workers          int
}
Option func(opts *rxOptions)
// Key generator
// item - an element in the stream
KeyFunc func(item interface{}) interface{}
// Filter function
FilterFunc func(item interface{}) bool
// Object conversion function
MapFunc func(item interface{}) interface{}
// Object comparison function
LessFunc func(a, b interface{}) bool
// Traversal function
WalkFunc func(item interface{}, pipe chan<- interface{})
// Match function
PredicateFunc func(item interface{}) bool
// Perform an operation on all elements
ForAllFunc func(pipe <-chan interface{})
// Perform an operation on each item
ForEachFunc func(item interface{})
// Perform an operation on each element concurrently
ParallelFunc func(item interface{})
// Perform aggregation on all elements
ReduceFunc func(pipe <-chan interface{}) (interface{}, error)
// Item generation function
GenerateFunc func(source chan<- interface{})
Stream interface {
	// Remove duplicate items
	Distinct(keyFunc KeyFunc) Stream
	// Filter items by condition
	Filter(filterFunc FilterFunc, opts ...Option) Stream
	// Group items
	Group(fn KeyFunc) Stream
	// Return the first n elements
	Head(n int64) Stream
	// Return the last n elements
	Tail(n int64) Stream
	// Convert objects
	Map(fn MapFunc, opts ...Option) Stream
	// Merge items into a slice and generate a new stream
	Merge() Stream
	// Reverse
	Reverse() Stream
	// Sort
	Sort(fn LessFunc) Stream
	// Apply a function to each item
	Walk(fn WalkFunc, opts ...Option) Stream
	// Concatenate other streams
	Concat(streams ...Stream) Stream
	// Check whether all elements match
	AllMatch(fn PredicateFunc) bool
	// Check whether at least one element matches
	AnyMatch(fn PredicateFunc) bool
	// Count the elements
	Count() int
	// Drain the stream
	Done()
	// Perform an operation on all elements
	ForAll(fn ForAllFunc)
	// Perform an operation on each element
	ForEach(fn ForEachFunc)
	// Internal method to get the internal pipe object
	channel() chan interface{}
}
The channel() method retrieves the Stream's pipe property. Because callers work with the interface type, a package-private method is exposed on the interface so the pipe can be read internally.
// Get the internal data container channel, the internal method
channel() chan interface{}
Implementation approach
Now that the functional definitions are sorted out, consider a few engineering implementation issues.
How do you implement chain calls
Chained calls: the Builder pattern used to create objects can achieve a chaining effect. The Stream implementation works the same way: each call creates a new Stream and returns it to the caller.
// Remove duplicate items
Distinct(keyFunc KeyFunc) Stream
// Filter items by condition
Filter(filterFunc FilterFunc, opts ...Option) Stream
How to achieve the pipeline effect
The so-called pipeline can be understood as the Stream's data container. In Go we can use a channel as the data pipeline, achieving an asynchronous, non-blocking effect when a Stream chain performs multiple operations.
How can parallel processing be supported
Processing data is essentially consuming the data in a channel, so parallel processing is nothing more than consuming the channel in parallel. Using goroutines plus the sync.WaitGroup mechanism, this is very convenient to implement, as the sketch below shows.
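A minimal sketch of this mechanism (an illustrative helper, not go-zero code; consumeParallel and fn are hypothetical names):

import "sync"

// consumeParallel drains a channel with a fixed number of worker goroutines
// and blocks until the channel is closed and all workers have finished.
func consumeParallel(source <-chan interface{}, workers int, fn func(interface{})) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker pulls from the shared channel until it is closed
			for item := range source {
				fn(item)
			}
		}()
	}
	wg.Wait()
}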
The go-zero implementation
core/fx/stream.go
The Stream implementation in go-zero does not define an interface, but the underlying logic is the same.
To implement the Stream interface we defined above, we create an internal implementation type whose source field, a channel, simulates the pipeline.
internalStream struct {
	source <-chan interface{}
}
Creation APIs
Create through a channel Range
Create a stream through a channel
func Range(source <-chan interface{}) Stream {
	return internalStream{
		source: source,
	}
}
Create with variadic arguments Just
Just creates a stream from variadic arguments. Closing the channel as soon as writing is complete is a good practice.
func Just(items ...interface{}) Stream {
	source := make(chan interface{}, len(items))
	for _, item := range items {
		source <- item
	}
	close(source)
	return Range(source)
}
Create with a function From
Create a Stream with a function
func From(generate GenerateFunc) Stream {
	source := make(chan interface{})
	threading.GoSafe(func() {
		defer close(source)
		generate(source)
	})
	return Range(source)
}
Because generate is a function parameter passed in from outside, its execution is beyond our control. We must catch runtime panics to prevent them from propagating upward and crashing the application.
func Recover(cleanups ...func()) {
	for _, cleanup := range cleanups {
		cleanup()
	}
	if r := recover(); r != nil {
		logx.ErrorStack(r)
	}
}

func RunSafe(fn func()) {
	defer rescue.Recover()
	fn()
}

func GoSafe(fn func()) {
	go RunSafe(fn)
}
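An illustrative use of From in the same test style as the later examples (a hypothetical test, not from go-zero):

func TestFrom(t *testing.T) {
	// Outputs 0, 1, 2
	From(func(source chan<- interface{}) {
		for i := 0; i < 3; i++ {
			source <- i
		}
	}).ForEach(func(item interface{}) {
		t.Log(item)
	})
}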
Concatenate streams Concat
Concatenate other streams to create a new stream; this calls the internal Concat method, whose source implementation is analyzed later.
func Concat(s Stream, others ...Stream) Stream {
	return s.Concat(others...)
}
Processing APIs
De-duplication Distinct
Because a function of type KeyFunc (func(item interface{}) interface{}) is passed in, Distinct also supports custom de-duplication logic for specific business scenarios. In essence, it de-duplicates via a map keyed by the results returned by KeyFunc.
Function parameters are very powerful and can greatly increase flexibility.
func (s internalStream) Distinct(keyFunc KeyFunc) Stream {
	source := make(chan interface{})
	threading.GoSafe(func() {
		// Remember: closing channels is a good habit
		defer close(source)
		keys := make(map[interface{}]lang.PlaceholderType)
		for item := range s.source {
			// Custom de-duplication logic
			key := keyFunc(item)
			// If the key does not exist, write the item to the new channel
			if _, ok := keys[key]; !ok {
				source <- item
				keys[key] = lang.Placeholder
			}
		}
	})
	return Range(source)
}
Use cases:
// Outputs 1, 2, 3, 4, 5
Just(1, 2, 3, 3, 4, 5, 5).Distinct(func(item interface{}) interface{} {
	return item
}).ForEach(func(item interface{}) {
	t.Log(item)
})

// Outputs 1, 2, 3, 4
Just(1, 2, 3, 3, 4, 5, 5).Distinct(func(item interface{}) interface{} {
	uid := item.(int)
	// Apply special de-duplication logic to items greater than 3:
	// they all map to the same key, so only the first of them is kept
	if uid > 3 {
		return 4
	}
	return item
}).ForEach(func(item interface{}) {
	t.Log(item)
})
Filtering Filter
The filtering logic is abstracted into a FilterFunc applied to each item; the boolean that FilterFunc returns decides whether the item is written back to the new channel. The actual traversal is delegated to the Walk method.
The Option parameter contains two options:
- unlimitedWorkers: do not limit the number of goroutines
- workers: limit the number of goroutines
FilterFunc func(item interface{}) bool

func (s internalStream) Filter(filterFunc FilterFunc, opts ...Option) Stream {
	return s.Walk(func(item interface{}, pipe chan<- interface{}) {
		if filterFunc(item) {
			pipe <- item
		}
	}, opts...)
}
Example:
func TestInternalStream_Filter(t *testing.T) {
	// Keep the even numbers: 2, 4
	channel := Just(1, 2, 3, 4, 5).Filter(func(item interface{}) bool {
		return item.(int)%2 == 0
	}).channel()
	for item := range channel {
		t.Log(item)
	}
}
Traverse and execute Walk
Walk performs the WalkFunc operation on each item and writes the results to a new Stream.
Notice that the order of the data in the new Stream's channel is random, because goroutines read and write the data asynchronously.
// item - an element of the stream
// pipe - the channel processed items are written to
WalkFunc func(item interface{}, pipe chan<- interface{})

func (s internalStream) Walk(fn WalkFunc, opts ...Option) Stream {
	option := buildOptions(opts...)
	if option.unlimitedWorkers {
		return s.walkUnLimited(fn, option)
	}
	return s.walkLimited(fn, option)
}
func (s internalStream) walkUnLimited(fn WalkFunc, option *rxOptions) Stream {
	// Create a buffered channel
	// defaultWorkers is 16; once 16 elements are buffered, further writes block
	pipe := make(chan interface{}, defaultWorkers)
	go func() {
		var wg sync.WaitGroup
		for {
			item, ok := <-s.source
			// Check whether s.source is closed.
			// If it were never closed, this goroutine could block forever and leak
			if !ok {
				break
			}
			wg.Add(1)
			// Execute the function in safe mode
			threading.GoSafe(func() {
				defer wg.Done()
				fn(item, pipe)
			})
		}
		wg.Wait()
		close(pipe)
	}()
	// Return a new Stream
	return Range(pipe)
}
func (s internalStream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
	pipe := make(chan interface{}, option.workers)
	go func() {
		var wg sync.WaitGroup
		// Control the number of goroutines
		pool := make(chan lang.PlaceholderType, option.workers)
		for {
			// Blocks once the goroutine limit is reached
			pool <- lang.Placeholder
			// Check whether s.source is closed.
			// If it were never closed, this goroutine could block forever and leak
			item, ok := <-s.source
			if !ok {
				// Release the pool slot just acquired
				<-pool
				break
			}
			wg.Add(1)
			// Execute the function in safe mode
			threading.GoSafe(func() {
				defer func() {
					wg.Done()
					// Reading from pool releases one goroutine slot
					<-pool
				}()
				fn(item, pipe)
			})
		}
		wg.Wait()
		close(pipe)
	}()
	return Range(pipe)
}
Use cases:
The order of return is random.
func Test_internalStream_Walk(t *testing.T) {
	// Outputs 100, 200, 300 in random order
	Just(1, 2, 3).Walk(func(item interface{}, pipe chan<- interface{}) {
		pipe <- item.(int) * 100
	}, WithWorkers(3)).ForEach(func(item interface{}) {
		t.Log(item)
	})
}
Grouping Group
Items are grouped into a map keyed by the result of KeyFunc; each group is then written to the new stream as a slice.
KeyFunc func(item interface{}) interface{}

func (s internalStream) Group(fn KeyFunc) Stream {
	groups := make(map[interface{}][]interface{})
	for item := range s.source {
		key := fn(item)
		groups[key] = append(groups[key], item)
	}
	source := make(chan interface{})
	go func() {
		for _, group := range groups {
			source <- group
		}
		close(source)
	}()
	return Range(source)
}
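Use case (an illustrative test in the same style as the others, not from go-zero):

func TestInternalStream_Group(t *testing.T) {
	// Outputs two groups in random order: [1 3 5] and [2 4]
	Just(1, 2, 3, 4, 5).Group(func(item interface{}) interface{} {
		return item.(int) % 2
	}).ForEach(func(item interface{}) {
		t.Log(item.([]interface{}))
	})
}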
Get the first n elements Head
If n is greater than the actual number of elements, all elements are returned.
func (s internalStream) Head(n int64) Stream {
	if n < 1 {
		panic("n must be greater than 0")
	}
	source := make(chan interface{})
	go func() {
		for item := range s.source {
			n--
			// n may be greater than the length of s.source,
			// so check n >= 0 before writing
			if n >= 0 {
				source <- item
			}
			// let successive method go ASAP even we have more items to skip
			// why we don't just break the loop, because if break,
			// this former goroutine will block forever, which will cause goroutine leak.
			// When n == 0 the new source is already full; it is closed here so the
			// downstream range loop can finish. Since every operation produces a
			// new Stream, the old Stream is never read again, and leaving source
			// open would block the downstream goroutine forever.
			if n == 0 {
				close(source)
				break
			}
		}
		// n is greater than the actual length of s.source;
		// the new source still needs to be closed explicitly
		if n > 0 {
			close(source)
		}
	}()
	return Range(source)
}
Example:
// Outputs 1, 2
func TestInternalStream_Head(t *testing.T) {
	channel := Just(1, 2, 3, 4, 5).Head(2).channel()
	for item := range channel {
		t.Log(item)
	}
}
Get the last n elements Tail
This one is interesting: to make sure we get exactly the last n elements, a ring-slice data structure is used. Let's first look at the implementation of Ring.
// Ring is a circular slice
type Ring struct {
	elements []interface{}
	index    int
	lock     sync.Mutex
}

func NewRing(n int) *Ring {
	if n < 1 {
		panic("n should be greater than 0")
	}
	return &Ring{
		elements: make([]interface{}, n),
	}
}

// Add adds an element
func (r *Ring) Add(v interface{}) {
	r.lock.Lock()
	defer r.lock.Unlock()
	// Write the element at the proper position in the slice; writes wrap around
	r.elements[r.index%len(r.elements)] = v
	// Update the position of the next write
	r.index++
}

// Take returns all elements,
// keeping the read order the same as the write order
func (r *Ring) Take() []interface{} {
	r.lock.Lock()
	defer r.lock.Unlock()
	var size int
	var start int
	// Because writes wrap around, the start position must be computed
	// so that elements are read out in the order they were written
	if r.index > len(r.elements) {
		size = len(r.elements)
		// After wrapping, the current write position index holds the oldest data
		start = r.index % len(r.elements)
	} else {
		size = r.index
	}
	elements := make([]interface{}, size)
	for i := 0; i < size; i++ {
		// Read in the same order as written
		elements[i] = r.elements[(start+i)%len(r.elements)]
	}
	return elements
}
To summarize the advantages of the circular slice:
- It supports automatic rolling updates
- It saves memory
A circular slice of fixed capacity keeps overwriting the oldest data with new data once it is full; this property is exactly what lets us keep the last n elements of a channel.
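A quick illustrative example of this behavior (assuming the Ring type above; fmt is used only for the demo):

// Capacity 3, six writes: the oldest values are overwritten,
// and Take returns the survivors in write order.
r := NewRing(3)
for i := 1; i <= 6; i++ {
	r.Add(i)
}
fmt.Println(r.Take()) // [4 5 6]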
func (s internalStream) Tail(n int64) Stream {
	if n < 1 {
		panic("n must be greater than 0")
	}
	source := make(chan interface{})
	go func() {
		ring := collection.NewRing(int(n))
		// Read all elements; if the count exceeds n, the ring slice
		// overwrites old data with new data,
		// ensuring that only the last n elements are retained
		for item := range s.source {
			ring.Add(item)
		}
		for _, item := range ring.Take() {
			source <- item
		}
		close(source)
	}()
	return Range(source)
}
So why not just use a slice of length len(source)?
The answer is memory savings. The great thing about any ring-style data structure is precisely that: its memory use is bounded by its capacity, so no matter how many elements flow through, it occupies a fixed amount of space.
Example:
func TestInternalStream_Tail(t *testing.T) {
	// Outputs 4, 5
	channel := Just(1, 2, 3, 4, 5).Tail(2).channel()
	for item := range channel {
		t.Log(item)
	}
	// Outputs 1, 2, 3, 4, 5
	channel2 := Just(1, 2, 3, 4, 5).Tail(6).channel()
	for item := range channel2 {
		t.Log(item)
	}
}
Element conversion Map
Elements are converted by goroutines internally. Note that the output channel is not guaranteed to preserve the original order.
MapFunc func(item interface{}) interface{}

func (s internalStream) Map(fn MapFunc, opts ...Option) Stream {
	return s.Walk(func(item interface{}, pipe chan<- interface{}) {
		pipe <- fn(item)
	}, opts...)
}
Example:
func TestInternalStream_Map(t *testing.T) {
	channel := Just(1, 2, 3, 4, 5, 2, 2, 2, 2, 2, 2).Map(func(item interface{}) interface{} {
		return item.(int) * 10
	}).channel()
	for item := range channel {
		t.Log(item)
	}
}
Merging Merge
The implementation is relatively simple. I thought about it for a long time and could not come up with a scenario that needs it, though one possibility is sketched after the code.
func (s internalStream) Merge() Stream {
	var items []interface{}
	for item := range s.source {
		items = append(items, item)
	}
	source := make(chan interface{}, 1)
	source <- items
	// Close so downstream consumers ranging over the channel can finish
	close(source)
	return Range(source)
}
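One plausible use is collapsing the stream into a single slice item, for example to hand the whole batch to a downstream consumer at once (an illustrative test, not from go-zero):

func TestInternalStream_Merge(t *testing.T) {
	// Outputs a single item: [1 2 3]
	Just(1, 2, 3).Merge().ForEach(func(item interface{}) {
		t.Log(item.([]interface{}))
	})
}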
Reversal Reverse
Reverse the elements in a channel. The reversal algorithm is:
- Find the middle position
- Swap elements pairwise from the two ends toward the middle
Notice why a slice is used to receive s.source: slices grow automatically, so wouldn't a fixed-size array be better? We can't use an array here, because we don't know the stream's length in advance: writes to source happen asynchronously in goroutines, so the number of elements in each Stream's channel may change dynamically.
func (s internalStream) Reverse() Stream {
	var items []interface{}
	for item := range s.source {
		items = append(items, item)
	}
	for i := len(items)/2 - 1; i >= 0; i-- {
		opp := len(items) - 1 - i
		items[i], items[opp] = items[opp], items[i]
	}
	return Just(items...)
}
Example:
func TestInternalStream_Reverse(t *testing.T) {
	// Outputs 5, 4, 3, 2, 1
	channel := Just(1, 2, 3, 4, 5).Reverse().channel()
	for item := range channel {
		t.Log(item)
	}
}
Sorting Sort
Internally this calls sort.Slice from the standard library, passing our comparison function in to supply the ordering logic.
func (s internalStream) Sort(fn LessFunc) Stream {
	var items []interface{}
	for item := range s.source {
		items = append(items, item)
	}
	sort.Slice(items, func(i, j int) bool {
		// LessFunc compares the items themselves, not the indexes
		return fn(items[i], items[j])
	})
	return Just(items...)
}
Example:
// Outputs 5, 4, 3, 2, 1
func TestInternalStream_Sort(t *testing.T) {
	channel := Just(1, 2, 3, 4, 5).Sort(func(a, b interface{}) bool {
		return a.(int) > b.(int)
	}).channel()
	for item := range channel {
		t.Log(item)
	}
}
The Concat method
func (s internalStream) Concat(streams ...Stream) Stream {
	// Create a new unbuffered channel
	source := make(chan interface{})
	go func() {
		// Create a RoutineGroup (a WaitGroup wrapper)
		group := threading.NewRoutineGroup()
		// Asynchronously read data from the original channel
		group.Run(func() {
			for item := range s.source {
				source <- item
			}
		})
		// Asynchronously read the channel data of the streams to be concatenated
		for _, stream := range streams {
			// Shadow the loop variable so each goroutine captures its own stream
			stream := stream
			// Each stream gets its own goroutine
			group.Run(func() {
				for item := range stream.channel() {
					source <- item
				}
			})
		}
		// Block until all reads complete
		group.Wait()
		close(source)
	}()
	// Return a new Stream
	return Range(source)
}
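Use case (an illustrative test in the same style as the others):

func TestInternalStream_Concat(t *testing.T) {
	// Outputs 1 through 6 in random order
	Just(1, 2, 3).Concat(Just(4, 5, 6)).ForEach(func(item interface{}) {
		t.Log(item)
	})
}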
Summary APIs
Match all AllMatch
func (s internalStream) AllMatch(fn PredicateFunc) bool {
	for item := range s.source {
		if !fn(item) {
			return false
		}
	}
	return true
}
Match any AnyMatch
func (s internalStream) AnyMatch(fn PredicateFunc) bool {
	for item := range s.source {
		if fn(item) {
			return true
		}
	}
	return false
}
Count elements Count
func (s internalStream) Count() int {
	var count int
	for range s.source {
		count++
	}
	return count
}
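Illustrative usage of the three matching/counting methods above (a hypothetical test in the same style as the earlier examples):

func TestInternalStream_Aggregation(t *testing.T) {
	// true: every element is positive
	t.Log(Just(1, 2, 3).AllMatch(func(item interface{}) bool {
		return item.(int) > 0
	}))
	// true: at least one element is even
	t.Log(Just(1, 2, 3).AnyMatch(func(item interface{}) bool {
		return item.(int)%2 == 0
	}))
	// 3
	t.Log(Just(1, 2, 3).Count())
}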
Drain the stream Done
func (s internalStream) Done() {
	// Drain the channel
	for range s.source {
	}
}
Iterate over all elements ForAll
func (s internalStream) ForAll(fn ForAllFunc) {
	fn(s.source)
}
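An illustrative ForAll usage, summing all elements through the raw channel (a hypothetical test, not from go-zero):

func TestInternalStream_ForAll(t *testing.T) {
	// Outputs 15
	Just(1, 2, 3, 4, 5).ForAll(func(pipe <-chan interface{}) {
		var sum int
		for item := range pipe {
			sum += item.(int)
		}
		t.Log(sum)
	})
}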
Iterate over each element ForEach
func (s internalStream) ForEach(fn ForEachFunc) {
	for item := range s.source {
		fn(item)
	}
}
Summary
At this point, the Stream component is fully implemented. The core logic is to use a channel as the pipe and treat data as a stream, continuously using goroutines to receive from and write to channels for an asynchronous, non-blocking effect.
Going back to the question raised at the beginning: before actually doing it, implementing a stream seems very difficult, and it is hard to imagine that such a powerful component can be implemented in 300+ lines of Go code.
The foundation of this efficiency comes from three Go language features:
- channels
- goroutines
- functional programming
References
- Slice inversion algorithm
- Pipeline pattern