In real-world development we rarely need to write a custom Publisher, but understanding how one is built is still worthwhile. In the next three articles I will explain how to customize a Publisher, an Operator, and a Subscriber, and I will try to explain each of them clearly. After working through these three articles, you should have a clear understanding of how Combine is implemented.

The main code for this article comes from CombineExt

Github.com/agelessman/…

Combination

/// request data
static func fetch(url: URL) -> AnyPublisher<Data, GithubAPIError> {
    return URLSession.shared.dataTaskPublisher(for: url)
        .handleEvents(receiveCompletion: { _ in
            networkActivityPublisher.send(false)
        }, receiveCancel: {
            networkActivityPublisher.send(false)
        }, receiveRequest: { _ in
            networkActivityPublisher.send(true)
        })
        .tryMap { data, response in
            guard let httpResponse = response as? HTTPURLResponse else {
                throw GithubAPIError.unknown
            }
            switch httpResponse.statusCode {
            case 401:
                throw GithubAPIError.apiError(reason: "Unauthorized")
            case 403:
                throw GithubAPIError.apiError(reason: "Resource forbidden")
            case 404:
                throw GithubAPIError.apiError(reason: "Resource not found")
            case 405..<500:
                throw GithubAPIError.apiError(reason: "client error")
            case 500..<600:
                throw GithubAPIError.apiError(reason: "server error")
            default: break
            }

            return data
        }
        .mapError { error in
            if let err = error as? GithubAPIError {
                return err
            }
            if let err = error as? URLError {
                return GithubAPIError.networkError(from: err)
            }
            return GithubAPIError.unknown
        }
        .eraseToAnyPublisher()
}

The code above is an example of combining operators. We did not write a custom Publisher, yet we ended up with a value of type AnyPublisher<Data, GithubAPIError>.

If you think about it, implementing a feature by composing operators is, from the caller's point of view, no different from writing a custom Publisher. In other words, prefer this kind of composition wherever possible when solving problems in real development.
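As a smaller illustration of the same idea, the sketch below builds a publisher purely by composing built-in operators. The function name evenSquares(upTo:) is hypothetical and not part of the original project; it only shows that the caller receives an ordinary AnyPublisher even though no Publisher type was written.

```swift
import Combine

// Hypothetical helper: the publisher is assembled only from built-in operators.
func evenSquares(upTo limit: Int) -> AnyPublisher<Int, Never> {
    (1...limit).publisher
        .filter { $0.isMultiple(of: 2) }   // keep even numbers
        .map { $0 * $0 }                   // square them
        .eraseToAnyPublisher()             // hide the concrete operator types
}

var received: [Int] = []
let cancellable = evenSquares(upTo: 6).sink { received.append($0) }
// received is now [4, 16, 36]
```

The caller only sees AnyPublisher<Int, Never>, so the composition could later be swapped for a custom Publisher without changing any call site.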

Customize Create (a new Publisher)

In this section we walk through a complete example of a custom Publisher. Create is used in much the same way as Combine's Record, which makes it an ideal example: the functionality is similar, but here we can read the implementation. Record can be used as follows:

let recordPublisher = Record<String, MyCustomError> { recording in
    recording.receive("You")
    recording.receive("Good")
    recording.receive("吗")
    recording.receive(completion: Subscribers.Completion.finished)
}
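To see what a Record does once it is subscribed to, here is a minimal runnable sketch. It assumes a Failure type of Never and English strings instead of the MyCustomError type used above, which is not defined in this article.

```swift
import Combine

// A Record replays the values and completion captured in its closure.
let replay = Record<String, Never> { recording in
    recording.receive("Hello")
    recording.receive("World!")
    recording.receive(completion: .finished)
}

var events: [String] = []
let cancellable = replay.sink(
    receiveCompletion: { _ in events.append("finished") },
    receiveValue: { events.append($0) }
)
// events is now ["Hello", "World!", "finished"]
```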

The use of Create is as follows:

AnyPublisher<String, MyError>.create { subscriber in
  // Values
  subscriber.send("Hello")
  subscriber.send("World!")
  
  // Complete with error
  subscriber.send(completion: .failure(MyError.someError))
  
  // Or, complete successfully
  subscriber.send(completion: .finished)

  return AnyCancellable {
    // Perform cleanup
  }
}

When learning a new technique, it helps to infer the design behind the code by observing how it is used. Let's analyze the design of the code above:

  • AnyPublisher<String, MyError>.create shows that create is a static function on AnyPublisher that takes a closure as its argument
  • The closure's parameter, subscriber, has at least two methods: send() and send(completion:). One sends values, the other sends the completion event
  • The closure returns an AnyCancellable
  • The key design idea: use a closure to encapsulate the process of sending data, and invoke that closure when a subscription is received to start the process
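The last point can be modeled without Combine at all. The sketch below is a deliberately simplified, hypothetical model (MiniCreate and MiniSubscriber are invented names, not CombineExt types) that only demonstrates the idea of storing a closure and running it when someone subscribes.

```swift
// Hypothetical stand-in for the subscriber handed to the closure.
struct MiniSubscriber<Value> {
    let onValue: (Value) -> Void
    func send(_ value: Value) { onValue(value) }
}

// Hypothetical stand-in for Create: it only stores the closure.
struct MiniCreate<Value> {
    let factory: (MiniSubscriber<Value>) -> Void

    // The stored closure runs only when someone subscribes.
    func subscribe(_ receive: @escaping (Value) -> Void) {
        factory(MiniSubscriber(onValue: receive))
    }
}

var log: [String] = []
let greeting = MiniCreate<String> { subscriber in
    log.append("factory invoked")
    subscriber.send("Hello")
    subscriber.send("World!")
}
let logBeforeSubscribe = log          // still empty: nothing has run yet

greeting.subscribe { log.append($0) }
// log is now ["factory invoked", "Hello", "World!"]
```

The real Create adds demand handling and cancellation on top of this, which is what the rest of the article covers.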

Next, we further analyze the concrete implementation process of the above code from the code level.

// MARK: - Publisher
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publishers {
    /// A publisher which accepts a closure with a subscriber argument,
    /// to which you can dynamically send value or completion events.
    ///
    /// You should return a `Cancellable`-conforming object from the closure in
    /// which you can define any cleanup actions to execute when the publisher
    /// completes or the subscription to the publisher is canceled.
    struct Create<Output, Failure: Swift.Error>: Publisher {
        public typealias SubscriberHandler = (Subscriber) -> Cancellable
        private let factory: SubscriberHandler

        /// Initialize the publisher with a provided factory
        ///
        /// - parameter factory: A factory with a closure to which you can
        /// dynamically push value or completion events
        public init(factory: @escaping SubscriberHandler) {
            self.factory = factory
        }

        public func receive<S: Combine.Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
            subscriber.receive(subscription: Subscription(factory: factory, downstream: subscriber))
        }
    }
}

When we want to customize Publisher, we should consider the following two points from a macro perspective:

  • Write an extension of Publishers so that the type is conveniently namespaced. In the code above, the exported type is Publishers.Create
  • Implement the Publisher protocol. The core of this is sending a Subscription to the subscriber. The Subscription is the most important part, and we explain it in detail below

In the code above, Create is initialized with a closure (SubscriberHandler = (Subscriber) -> Cancellable). The closure's parameter is of type Subscriber. Note that this is the nested Publishers.Create.Subscriber type, not the Combine.Subscriber protocol. Let's look at its code:

public extension Publishers.Create {
    struct Subscriber {
        private let onValue: (Output) -> Void
        private let onCompletion: (Subscribers.Completion<Failure>) -> Void

        fileprivate init(onValue: @escaping (Output) -> Void,
                         onCompletion: @escaping (Subscribers.Completion<Failure>) -> Void) {
            self.onValue = onValue
            self.onCompletion = onCompletion
        }

        /// Sends a value to the subscriber.
        ///
        /// - Parameter value: The value to send.
        public func send(_ input: Output) {
            onValue(input)
        }

        /// Sends a completion event to the subscriber.
        ///
        /// - Parameter completion: A `Completion` instance which indicates whether publishing has finished normally or failed with an error.
        public func send(completion: Subscribers.Completion<Failure>) {
            onCompletion(completion)
        }
    }
}

The above code contains the following information:

  • Subscriber itself is a struct
  • Its initializer takes two closures, onValue and onCompletion. Both are stored as private properties, so they cannot be called from outside
  • Calling subscriber.send("Hello") essentially calls onValue
  • Calling subscriber.send(completion: .finished) essentially calls onCompletion

To sum up: Subscriber exposes two functions, and calling either one triggers the corresponding closure. What those closures actually do is covered later in this article.

Now we come to the key part: we need a custom Subscription. The data-processing logic lives in it, and it is the core link between the publisher above it and the subscriber below it.

private extension Publishers.Create {
    class Subscription<Downstream: Combine.Subscriber>: Combine.Subscription where Output == Downstream.Input, Failure == Downstream.Failure {
        private let buffer: DemandBuffer<Downstream>
        private var cancelable: Cancellable?

        init(factory: @escaping SubscriberHandler, downstream: Downstream) {
            self.buffer = DemandBuffer(subscriber: downstream)

            let subscriber = Subscriber(onValue: { [weak self] in _ = self?.buffer.buffer(value: $0) },
                                        onCompletion: { [weak self] in self?.buffer.complete(completion: $0) })

            self.cancelable = factory(subscriber)
        }

        func request(_ demand: Subscribers.Demand) {
            _ = self.buffer.demand(demand)
        }

        func cancel() {
            self.cancelable?.cancel()
        }
    }
}

The custom Subscription implements the Combine.Subscription protocol and serves two purposes:

  • Through func request(_ demand: Subscribers.Demand), it receives requests for data from the subscriber
  • Through func cancel(), it receives cancellation requests from the subscriber

A closer look at the code shows that the Subscription's Output and Failure types must match those of the downstream subscriber, and that a private property, private let buffer: DemandBuffer<Downstream>, is introduced to serve as the cache for the data.

self.buffer = DemandBuffer(subscriber: downstream)

This line shows that DemandBuffer is initialized with downstream. Remember that downstream is the subscriber, so all you need to take away is that DemandBuffer holds the subscriber.

let subscriber = Subscriber(onValue: { [weak self] in _ = self?.buffer.buffer(value: $0) },
                            onCompletion: { [weak self] in self?.buffer.complete(completion: $0) })

This code wires up the Subscriber. Its onValue closure is bound to self?.buffer.buffer(value: $0), so calling subscriber.send("Hello") actually performs self?.buffer.buffer(value: "Hello"). In the same way, its onCompletion closure is bound to self?.buffer.complete(completion: $0), so calling subscriber.send(completion: .finished) actually performs self?.buffer.complete(completion: .finished).

self.cancelable = factory(subscriber)

This line is where the closure passed to Create's initializer is actually invoked.

As you can see by now, the custom Subscription is at the heart of data management, so we need to understand the implementation of DemandBuffer, which also plays a central role in the other two articles of this series.

Finally, let's analyze the DemandBuffer source:

class DemandBuffer<S: Subscriber> {
    private let lock = NSRecursiveLock()
    private var buffer = [S.Input]()
    private let subscriber: S
    private var completion: Subscribers.Completion<S.Failure>?
    private var demandState = Demand()

    init(subscriber: S) {
        self.subscriber = subscriber
    }

    func buffer(value: S.Input) -> Subscribers.Demand {
        precondition(self.completion == nil, "How could a completed publisher sent values?! Beats me 🤷‍♂️")

        switch demandState.requested {
        case .unlimited:
            return subscriber.receive(value)
        default:
            buffer.append(value)
            return flush()
        }
    }

    func complete(completion: Subscribers.Completion<S.Failure>) {
        precondition(self.completion == nil, "Completion have already occured, which is quite awkward 🥺")

        self.completion = completion
        _ = flush()
    }

    func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand {
        flush(adding: demand)
    }

    private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand {
        lock.lock()
        defer { lock.unlock() }

        if let newDemand = newDemand {
            demandState.requested += newDemand
        }

        // If buffer isn't ready for flushing, return immediately
        guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none }

        while !buffer.isEmpty && demandState.processed < demandState.requested {
            demandState.requested += subscriber.receive(buffer.remove(at: 0))
            demandState.processed += 1
        }

        if let completion = completion {
            // Completion event was already sent
            buffer = []
            demandState = .init()
            self.completion = nil
            subscriber.receive(completion: completion)
            return .none
        }

        let sentDemand = demandState.requested - demandState.sent
        demandState.sent += sentDemand
        return sentDemand
    }
}

The above code looks long, but it doesn’t contain much. We can divide it into three parts:

  • Initialization
  • External interface
  • Internal core logic

Let’s first look at the initialization code:

private let lock = NSRecursiveLock()
private var buffer = [S.Input]()
private let subscriber: S
private var completion: Subscribers.Completion<S.Failure>?
private var demandState = Demand()

init(subscriber: S) {
    self.subscriber = subscriber
}

As you can see from the property let lock = NSRecursiveLock(), DemandBuffer guards its data operations with a lock. This is necessary because a pipeline is designed to handle asynchronous data streams. In everyday development, we can use the same kind of lock to protect access to shared data, as follows:

lock.lock()
defer { lock.unlock() }
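As a self-contained illustration of the lock/defer pattern, the sketch below protects a counter that is incremented from many threads. SafeCounter is a hypothetical type invented for this example and is not part of CombineExt.

```swift
import Foundation

// Hypothetical counter whose state is only touched while the lock is held.
final class SafeCounter {
    private let lock = NSRecursiveLock()
    private var value = 0

    func increment() {
        lock.lock()
        defer { lock.unlock() }   // runs on every exit path
        value += 1
    }

    var current: Int {
        lock.lock()
        defer { lock.unlock() }
        return value
    }
}

let counter = SafeCounter()
// Run 1,000 increments spread across multiple threads.
DispatchQueue.concurrentPerform(iterations: 1_000) { _ in
    counter.increment()
}
// counter.current is 1000; without the lock, some increments could be lost
```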

As the property var buffer = [S.Input]() shows, values are stored internally in an array with a single element type, the Input type of the Subscriber.

As the property let subscriber: S shows, DemandBuffer holds the subscriber, which is used later in the code.

The property var completion: Subscribers.Completion<S.Failure>? stores the completion event. It exists mainly because the buffer array cannot hold a value of this type, so the completion has to be stored separately.

var demandState = Demand() represents the current demand state. Demand is a separate struct whose source is as follows:

private extension DemandBuffer {
    /// A model that tracks the downstream's
    /// accumulated demand state
    struct Demand {
        var processed: Subscribers.Demand = .none
        var requested: Subscribers.Demand = .none
        var sent: Subscribers.Demand = .none
    }
}

Subscribers.Demand may be unfamiliar at this point, so let's look at its definition:

/// A requested number of items, sent to a publisher from a subscriber through the subscription.
@frozen public struct Demand : Equatable, Comparable, Hashable, Codable, CustomStringConvertible {

    /// A request for as many values as the publisher can produce.
    public static let unlimited: Subscribers.Demand

    /// A request for no elements from the publisher.
    ///
    /// This is equivalent to `Demand.max(0)`.
    public static let none: Subscribers.Demand

    /// Creates a demand for the given maximum number of elements.
    ///
    /// The publisher is free to send fewer than the requested maximum number of elements.
    ///
    /// - Parameter value: The maximum number of elements. Providing a negative value for this parameter results in a fatal error.
    @inlinable public static func max(_ value: Int) -> Subscribers.Demand
}

Because it conforms to Equatable, Comparable, and Hashable, and also provides arithmetic operators such as + and -, it can be treated much like a number that can be computed with and compared. .none can be treated as 0, .unlimited as the largest possible value, and .max(_:) specifies a concrete value.
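The number-like behavior can be checked directly. The short sketch below only uses operators that Subscribers.Demand itself provides; the constant names are chosen just for this example.

```swift
import Combine

let two = Subscribers.Demand.max(2)
let five = two + .max(3)                                  // finite demands add like integers
let stillUnlimited = Subscribers.Demand.unlimited + two   // unlimited absorbs any addition
let remaining = five - two                                // subtraction works as well

let noneIsZero = Subscribers.Demand.none == .max(0)       // .none is the same as .max(0)
let finiteIsSmaller = five < .unlimited                   // any finite demand is below .unlimited
```

This is what makes expressions such as demandState.requested += newDemand in DemandBuffer possible.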

So what does this value mean? Quite simply, it is the maximum number of values a Subscriber is willing to receive. Consider the following printed output:

receive subscription: (PassthroughSubject)
request unlimited

This output means the Subscriber can receive an unlimited number of values. In DemandBuffer, every field of var demandState = Demand() starts at .none.
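For contrast with an unlimited request, here is a minimal sketch of a subscriber that asks for a limited number of values. TwoValueSubscriber is a hypothetical class written for this example; it requests .max(2) and never asks for more.

```swift
import Combine

// Hypothetical subscriber that accepts at most two values.
final class TwoValueSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var values: [Int] = []

    func receive(subscription: Subscription) {
        subscription.request(.max(2))   // initial demand: two values
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        values.append(input)
        return .none                    // no additional demand
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let subscriber = TwoValueSubscriber()
[1, 2, 3, 4].publisher.subscribe(subscriber)
// subscriber.values is [1, 2]; 3 and 4 are never delivered
```

The value returned from receive(_:) is added to the outstanding demand, which is the same mechanism that flush() relies on when it adds the result of subscriber.receive(...) to demandState.requested.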

Next, let’s look at the code in Part 2, which mainly exposes the interface for external calls:

func buffer(value: S.Input) -> Subscribers.Demand {
    precondition(self.completion == nil, "How could a completed publisher sent values?! Beats me 🤷‍♂️")

    switch demandState.requested {
    case .unlimited:
        return subscriber.receive(value)
    default:
        buffer.append(value)
        return flush()
    }
}


func complete(completion: Subscribers.Completion<S.Failure>) {
    precondition(self.completion == nil, "Completion have already occured, which is quite awkward 🥺")

    self.completion = completion
    _ = flush()
}


func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand {
    flush(adding: demand)
}

  • buffer(value:) handles incoming values. When it is called, if the requested demand is .unlimited, the value is sent straight to the Subscriber. Otherwise the value is appended to the array and flush() is called
  • complete(completion:) receives the completion event from outside, stores it, and calls flush()
  • demand(_:) is a subtle but important method. It responds to a demand request by calling flush() with the new demand. In essence, the demand parameter of this function is the Subscriber's request

I find text a bit harder than video for this kind of walkthrough, but take another look at the custom Subscription code above:

func request(_ demand: Subscribers.Demand) {
    _ = self.buffer.demand(demand)
}

Subscription implements the Combine.Subscription protocol. func request(_ demand: Subscribers.Demand) is the protocol method that the Subscriber calls to request values.

If you have any questions, please leave a message. Let’s look at part 3:

private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand {
    lock.lock()
    defer { lock.unlock() }

    if let newDemand = newDemand {
        demandState.requested += newDemand
    }

    // If buffer isn't ready for flushing, return immediately
    guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none }

    while !buffer.isEmpty && demandState.processed < demandState.requested {
        demandState.requested += subscriber.receive(buffer.remove(at: 0))
        demandState.processed += 1
    }

    if let completion = completion {
        // Completion event was already sent
        buffer = []
        demandState = .init()
        self.completion = nil
        subscriber.receive(completion: completion)
        return .none
    }

    let sentDemand = demandState.requested - demandState.sent
    demandState.sent += sentDemand
    return sentDemand
}

The core of the code above is subscriber.receive(buffer.remove(at: 0)), which hands a buffered value to the Subscriber. Let's review the Subscription flow once more:

First, we initialize:

init(factory: @escaping SubscriberHandler, downstream: Downstream) {
    self.buffer = DemandBuffer(subscriber: downstream)

    let subscriber = Subscriber(onValue: { [weak self] in _ = self?.buffer.buffer(value: $0) },
                                onCompletion: { [weak self] in self?.buffer.complete(completion: $0) })

    self.cancelable = factory(subscriber)
}

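After initialization, any values sent by the factory are stored by the buffer, but its flush() function does not yet pass them through to the Subscriber, because no demand has been requested. Delivery happens only once the Subscriber's request arrives and the following code is called: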

func request(_ demand: Subscribers.Demand) {
    _ = self.buffer.demand(demand)
}

That calls the buffer's demand(_:) function, which in turn calls flush(). flush() then walks the array and passes the buffered values through to the Subscriber, up to the requested demand.
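The buffer-then-release flow can be reproduced in a few lines. The sketch below is a simplified, hypothetical model (MiniDemandBuffer is an invented name): demand is a plain Int rather than Subscribers.Demand, the subscriber is just a closure, and locking and completion handling are left out.

```swift
// Simplified model of the buffering idea, not the real DemandBuffer.
final class MiniDemandBuffer<Value> {
    private var pending: [Value] = []
    private var requested = 0
    private var processed = 0
    private let deliver: (Value) -> Void

    init(deliver: @escaping (Value) -> Void) {
        self.deliver = deliver
    }

    // Called by the producer: store the value, then try to flush.
    func buffer(value: Value) {
        pending.append(value)
        flush()
    }

    // Called when the downstream requests more values.
    func demand(_ additional: Int) {
        requested += additional
        flush()
    }

    // Deliver stored values while the requested count allows it.
    private func flush() {
        while !pending.isEmpty && processed < requested {
            deliver(pending.removeFirst())
            processed += 1
        }
    }
}

var delivered: [String] = []
let demandBuffer = MiniDemandBuffer<String> { delivered.append($0) }

demandBuffer.buffer(value: "Hello")     // no demand yet, so the value is only stored
demandBuffer.buffer(value: "World!")
let deliveredBeforeDemand = delivered   // []

demandBuffer.demand(1)                  // releases exactly one value
let deliveredAfterOne = delivered       // ["Hello"]

demandBuffer.demand(5)                  // releases the rest
// delivered is now ["Hello", "World!"]
```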

If this is still a little confusing, reread the code and the explanation above and let it sink in.

Finally, let’s go back to where we started and look at the following code:

public extension AnyPublisher {
    
    init(_ factory: @escaping Publishers.Create<Output, Failure>.SubscriberHandler) {
        self = Publishers.Create(factory: factory).eraseToAnyPublisher()
    }

    static func create(_ factory: @escaping Publishers.Create<Output, Failure>.SubscriberHandler)
        -> AnyPublisher<Output, Failure> {
        AnyPublisher(factory)
    }
}

Conclusion

The key to a custom Publisher is the custom Subscription, which in turn manages data through DemandBuffer. The idea behind DemandBuffer is to put values into an array and then release them through func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand.