Github.com/agelessman/…

Although this article mainly explains how to write a custom Subscriber, you rarely need one in real development. As the figure above shows, a Subscriber does three things:

  • Subscribes to the Publisher
  • Sends a request
  • Receives data

Generally, when a Subscriber subscribes to a Publisher and receives a subscription, it sends a request immediately and waits for the data.

  • If you want to control when you subscribe, for example only after a button is clicked, call .sink() in the button’s click handler. You don’t need a custom sink.

  • If you want to control when the request is sent, for example 5 seconds later, you don’t need a custom sink either. Just call .sink() after a 5-second delay.

  • If you want to process the data, do it in the closure; there is no need to encapsulate the processing details.

This article discusses a custom sink only so that readers can learn how Sink is implemented in Combine.
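The first point above can be sketched with the built-in .sink() alone. This is a minimal illustration, not code from the article: ButtonDrivenListener and buttonTapped() are hypothetical names, and the "button" is simulated by calling the method directly.

```swift
import Combine

// Hypothetical helper type for illustration; not part of Combine.
final class ButtonDrivenListener {
    private let publisher: PassthroughSubject<Int, Never>
    private var cancellable: AnyCancellable?
    private(set) var received: [Int] = []

    init(publisher: PassthroughSubject<Int, Never>) {
        self.publisher = publisher
    }

    // Call this from the button's action: the subscription starts here.
    func buttonTapped() {
        cancellable = publisher.sink { [weak self] value in
            self?.received.append(value)
        }
    }
}

let subject = PassthroughSubject<Int, Never>()
let listener = ButtonDrivenListener(publisher: subject)

subject.send(1)          // no subscriber yet, so this value is dropped
listener.buttonTapped()  // subscribe "on click"
subject.send(2)          // delivered to the sink

print(listener.received) // [2]
```

Because a PassthroughSubject does not buffer values, only what is sent after the call to .sink() reaches the closure, which is exactly the timing control described above.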

Let’s first look at the definition of the Sink class in Combine:

extension Subscribers {

    /// A simple subscriber that requests an unlimited number of values upon subscription.
    final public class Sink<Input, Failure> : Subscriber, Cancellable, CustomStringConvertible, CustomReflectable, CustomPlaygroundDisplayConvertible where Failure : Error {

        /// The closure to execute on receipt of a value.
        final public var receiveValue: (Input) -> Void { get }

        /// The closure to execute on completion.
        final public var receiveCompletion: (Subscribers.Completion<Failure>) -> Void { get }

        final public var description: String { get }

        final public var customMirror: Mirror { get }

        /// A custom playground description for this instance.
        final public var playgroundDescription: Any { get }

        public init(receiveCompletion: @escaping ((Subscribers.Completion<Failure>) -> Void), receiveValue: @escaping ((Input) -> Void))

        final public func receive(subscription: Subscription)

        final public func receive(_ value: Input) -> Subscribers.Demand

        final public func receive(completion: Subscribers.Completion<Failure>)

        /// Cancel the activity.
        final public func cancel()
    }
}

As the code above shows, Sink is a class that conforms to several protocols, including Subscriber and Cancellable, so the methods listed are all requirements of those protocols.

What we care about is the Subscriber protocol. Since Sink conforms to it, we can use a Sink instance to subscribe to a Publisher, as follows:

import Combine

let publisher = PassthroughSubject<Int, Never>()

// Create the subscriber by hand instead of calling .sink()
let sink = Subscribers.Sink<Int, Never>(receiveCompletion: {
    print($0)
}, receiveValue: {
    print($0)
})

publisher.subscribe(sink)

publisher.send(1)

The above code is equivalent to:

publisher
    .sink(receiveCompletion: {
        print($0)
    }, receiveValue: {
        print($0)
    })

I think it’s worth explaining why the above code is equivalent. The key is the sink method in the above code:

extension Publisher {

    public func sink(receiveCompletion: @escaping ((Subscribers.Completion<Self.Failure>) -> Void), receiveValue: @escaping ((Self.Output) -> Void)) -> AnyCancellable
}

First, it is declared in an extension of the Publisher protocol, so it is available on every Publisher. Second, it simply creates a Subscribers.Sink internally, subscribes it, and returns it wrapped in an AnyCancellable, roughly as follows:

extension Publisher {
    public func testSink(receiveCompletion: @escaping ((Subscribers.Completion<Self.Failure>) -> Void),
                     receiveValue: @escaping ((Self.Output) -> Void)) -> AnyCancellable {
        let sink = Subscribers.Sink<Self.Output, Self.Failure>(receiveCompletion: {
            receiveCompletion($0)
        }, receiveValue: {
            receiveValue($0)
        })
        self.subscribe(sink)
        return AnyCancellable(sink)
    }
}

In the code above, I deliberately named the function testSink to distinguish it from the built-in sink. In essence, we create a Sink instance inside testSink, so we can use it as follows:

let publisher = PassthroughSubject<Int, Never>()

// Keep the returned AnyCancellable alive; releasing it cancels the subscription
let cancellable = publisher.testSink(receiveCompletion: {
    print($0)
}, receiveValue: {
    print($0)
})

publisher.send(1)

Note that .sink() is only a convenience method exposed to callers. The real core is Sink, because it implements the Subscriber and Cancellable protocols.
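To see both protocols at work without .sink(), here is a small sketch that drives a Subscribers.Sink by hand. The variable names (subject, received) are illustrative assumptions; only the Combine types come from the text above.

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()
var received: [Int] = []

// Build the subscriber directly instead of calling .sink().
let sink = Subscribers.Sink<Int, Never>(
    receiveCompletion: { _ in },
    receiveValue: { received.append($0) }
)

subject.subscribe(sink) // Subscriber role: attach to the publisher
subject.send(1)         // delivered to receiveValue
sink.cancel()           // Cancellable role: tear down the subscription
subject.send(2)         // ignored after cancellation

print(received)         // [1]
```

The AnyCancellable returned by .sink() wraps this same cancel() call, which is why the convenience method and the hand-built subscriber behave alike.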

So the point is, what does Sink do in these protocol methods?

extension Subscribers {
    final public class CustomSink<Input, Failure>: Subscriber, Cancellable where Failure: Error {
        let receiveCompletion: (Subscribers.Completion<Failure>) -> Void
        let receiveValue: (Input) -> Void
        
        var subscription: Subscription?
        
        init(receiveCompletion: @escaping ((Subscribers.Completion<Failure>) -> Void),
             receiveValue: @escaping ((Input) -> Void)) {
            self.receiveCompletion = receiveCompletion
            self.receiveValue = receiveValue
        }
        
        public func receive(subscription: Subscription) {
            self.subscription = subscription
            self.subscription?.request(.unlimited)
        }
        
        public func receive(_ input: Input) -> Subscribers.Demand {
            receiveValue(input)
            return .none
        }
        
        public func receive(completion: Subscribers.Completion<Failure>) {
            receiveCompletion(completion)
            subscription = nil
        }
        
        public func cancel() {
            subscription?.cancel()
            subscription = nil
        }
    }
}

CustomSink is our custom class that implements the Subscriber and Cancellable protocols. The code is easy to follow, so I won’t go into more detail. Two points are worth noting:

  • A request is sent as soon as the subscription is received.
  • The receive(_ input: Input) function returns a Subscribers.Demand. Why is a return value needed? Each time CustomSink receives a value through this method, the demand it returns is added to what it has already requested, so it can tell the Publisher to deliver more values than the original request allowed. For example, suppose CustomSink should receive at most three values instead of an unlimited number. The request then looks like the line below, and we can accept at most 3 values. If we then change receive(_ input: Input) to return .max(1) when the third value arrives, we can receive four values.

self.subscription?.request(.max(3))

Let’s first set the parameters in the request to accept a maximum of 3 values, and then try:

let publisher = PassthroughSubject<Int, Never>()

let cancellable = publisher.customSink(receiveCompletion: {
    print($0)
}, receiveValue: {
    print($0)
})

publisher.send(1)
publisher.send(2)
publisher.send(3)
publisher.send(4)
publisher.send(5)

Print result:

1
2
3

Only three values are received. Next, modify the code as follows:

extension Subscribers {
    final public class CustomSink<Input, Failure>: Subscriber, Cancellable where Failure: Error {
        ...
        
        var count = 0
        
        ...
        
        public func receive(_ input: Input) -> Subscribers.Demand {
            receiveValue(input)
            count += 1
            if count == 3 {
                return .max(1)
            } else {
                return .none
            }
        }
        ...
    }
}

When the third value arrives, receive returns .max(1), which raises the demand by one. Print result:

1
2
3
4

Does that make sense? This approach is flexible: in some scenarios it lets you raise the number of values you are willing to receive, as shown above.
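The same demand rule can be checked against a finite publisher. The sketch below is an illustration under assumptions: LimitedSubscriber and its initial/extra parameters are hypothetical names, and the publisher is the built-in sequence publisher rather than the PassthroughSubject used in the article.

```swift
import Combine

// Hypothetical subscriber for illustration: asks for `initial` values up front
// and grants `extra` more when the last of them arrives.
final class LimitedSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private let initial: Int
    private let extra: Int
    private var count = 0
    private(set) var received: [Int] = []

    init(initial: Int, extra: Int) {
        self.initial = initial
        self.extra = extra
    }

    func receive(subscription: Subscription) {
        // Initial demand, like request(.max(3)) in the text above.
        subscription.request(.max(initial))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        count += 1
        // The returned demand is added to the outstanding demand.
        return count == initial ? .max(extra) : .none
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let subscriber = LimitedSubscriber(initial: 3, extra: 1)
(1...10).publisher.subscribe(subscriber)

print(subscriber.received) // [1, 2, 3, 4]
```

Although the sequence holds ten values, the publisher stops after the fourth because no further demand was signalled.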

All you need to do is expose an interface under Publisher:

extension Publisher {
    public func customSink(receiveCompletion: @escaping ((Subscribers.Completion<Self.Failure>) -> Void),
                     receiveValue: @escaping ((Self.Output) -> Void)) -> AnyCancellable {
        let sink = Subscribers.CustomSink<Self.Output, Self.Failure>(receiveCompletion: {
            receiveCompletion($0)
        }, receiveValue: {
            receiveValue($0)
        })
        self.subscribe(sink)
        return AnyCancellable(sink)
    }
}

Conclusion

In general, a custom Subscriber is simple to write but rarely necessary. The core job of a Subscriber is to receive data and events; it does not need to carry any other logic.