sequence

Those of you who have learned Swift know that RxSwift is like Chou’s Mojito

Start small

Moreover it

Why learn RxSwift?

Camille said

Elegant and timeless


An RxSwift for my love, please

RxSwift is the Swift version of the Rx series, compared to the OC version of ReactiveCocoa

Functional Responsive Programming (FRP)

What is functional responsive programming?


Functional:

The core idea of functional programming is stateless. The function itself doesn’t care about the input

It simply maps the input values to the output values inside the function, i.e. input => output

Such as:

func changeNum(input: Int) -> Int {
    returnInput * 3} // changeNum does not change the input, but outputs the calculated valueCopy the code

Stateless means that the function itself does not change the state of the outside, nor does it change the state of the input value


Q: multiply all elements of array [1,2,3,4] by 2 to return the new array

So the general approach might be:

Imperative programming

letArray = [1,2,3,4] var newArray: [Int] = []for item inArray {var num = item num *= 2 newarray.append (num)} // [2,4,6,8]Copy the code

What does imperative programming tend to do, how do I get everything to be equal to 2, so this involves a mutable array called newArray

If newArray is changed somewhere at any time, it will have an unexpected effect


So what does functional programming do?

letArray = [1, 2, 3, 4]let newArray = array.compactMap {
    return $0* 2} // [2,4,6,8]Copy the code

The ideas of functional and declarative programming are largely the same

They’re all about what to do, not how to do it?


Functional programming: what do you prefer to do, save the tedious process, a more secure, intuitive, understandable way of programming


Response:

An asynchronous programming approach to abstract event streams

For example, a user clicks a button, sends a network request, and displays the result on a label

The network request is asynchronous

To display on the label, get the callback of the network request and display it further

Forming a flow of events is written as


Subscribe (onNext: {// hit homeapi.getTitle ().asObservable() // initiate network request.map {(title) -> Stringin// Get the callback to map ("I am \" (title)) } .bind(to: (titleLabel? .rx.text)!) Disposed (by: rx.disposebag) // Manage life cycle})Copy the code


Such a complex operation, and includes the flow of asynchronous operation

Is it easier to understand after the changes to RxSwift? Event distribution and maintenance can be done in one place

Greatly improved code readability, as well as maintenance costs

The entire event flow is as follows:

We don’t care about every element in the sequence, whether it’s asynchronous or synchronous, whether it’s thread safe

It is only when we click the button and send the signal that the function body within the code block is executed and the whole sequence of events is generated

This makes us more business logic oriented

Not the details of each step


So how exactly does RxSwift do it?

You’ll find out after mojito


I love the furrowed brow when I read it

For beginners

RxSwift’s learning curve is indeed steep. What does it say about protocol oriented programming

The process is obscure

But the road is long and blocked

A true master always has the heart of an apprentice


rx

In the world of RxSwift, everything is RX, and sequences are everywhere.

Sounds like iOS everything

Yes, that’s right. Let’s look at the definition of RX Reactive, and the first thing we see is a protocol called Reactivecomble

public protocol ReactiveCompatible {
    # Association protocol
    associatedtype ReactiveBase 

    # rx is Reactive and passes ReactiveBase
    static var rx: Reactive<ReactiveBase>.Type { get set }

    var rx: Reactive<ReactiveBase> { get set}}Copy the code


Reactive also extends the ReactiveCompatible protocol, where, by calling Rx, returns

Reactive Type or instance of Reactive

extension ReactiveCompatible {
    # Reactive type
    public static var rx: Reactive<Self>.Type {
        get {  return Reactive<Self>.self }
    }
    # Reactive instance
    public var rx: Reactive<Self> {
        get {  return Reactive(self) }
    }
}
Copy the code


Look at the implementation of Reactive, which is a structure that contains the parameter generic Base

public struct Reactive<Base> {
    public let base: Base
    Set the initial caller to Base on Reactive
    public init(_ base: Base) {
        self.base = base
    }
}
Copy the code


Tap the tap button, button.rx. Tap, of type UIButton type, and set the instance of UIButton to base

So to achieve rX for everything, it’s a simple step

extension NSObject: ReactiveCompatible { }
Copy the code

This allows all objects that inherit from NSObjce to follow the Reactivecomble protocol, that is, everything is RX


Observable

Observable means Observable, Observable sequence, what is a sequence?

What I understand is a signal that has the ability to send out an event


Hungry -> Eat

Being hungry can serve as an observable sequence, and when our brain perceives hunger, it can perform an action to eat


TextField input -> display

The TextField input operation can be used as a sequence to listen for input


The following

I started debugging mojito

Take a look at the subscription process:

# to create
let observable = Observable<String>.create { (observe) -> Disposable in
    # to send
    observe.onNext("mojito")
    return Disposables.create()
}
# subscription
observable.subscribe(onNext: { text in
    print(text)
}).disposed(by: rx.disposeBag)

// print "mojito"
Copy the code





Observable observes a sequence

  • step1
ObservableType is an extension of ObservableType
public class Observable<Element> : ObservableType {
    Resource reference count +1
    init() {
        _ = Resources.incrementTotal()
    }
     # Provide the ability to be subscribed, implemented by subclasses
    public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        rxAbstractMethod()
    }
     Turn an Observable class into an Observable instance
    public func asObservable() -> Observable<Element> {
        return self
    }
     The resource reference count is 1
    deinit {
        _ = Resources.decrementTotal()
    }
}
Copy the code


Observable

: ObservableType (ObservableType); ObservableType (ObservableType)

Enter the ObservableType

  • step2
# ObservableType agreement, in ObservableConvertibleType inheritance
public protocol ObservableType: ObservableConvertibleType {
    func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}

# ObservableType extension
extension ObservableType {
    ObservableType provides a method to turn ObservableType compliant objects into Observable entities
    public func asObservable() -> Observable<Element> {
        return Observable.create { o in
            return self.subscribe(o)
        }
    }
}
Copy the code

I still don’t see the subscription method here

Also found that his father is a protocol, and grandpa ObservableConvertibleType


Hold a skeptical, you point into the ObservableConvertibleType again

  • step3
# is also a protocol
public protocol ObservableConvertibleType {

    associatedtype Element
    typealias E = Element
    
    Observable # defines a method that returns an Observable sequence of types
    func asObservable() -> Observable<Element>
}

Copy the code


damn

Since this road is not going anywhere, we have to stay

Where to fall

Where am I lying

To achieve a sequence of everything, we need to find a way to transform all events into a sequence. AsObservable () is the essence of RxSwift


Observable.create()

When I click on Creat, it becomes clear that I created it using an ObservableType extension, which also demonstrates the benefits of OOP and scalability

ObservableType is like a chain company called ObservableType

It can open a branch anywhere

Realize the business of their own company

  • step4
# extension of ObservableType
extension ObservableType {
    public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        # Return an anonymous observation sequence, passing in the SUBSCRIBE escape closure
        return AnonymousObservable(subscribe)
    }
}
Copy the code

Click on AnonymousObservable to enter

  • step5
Private methods cannot be shared by the outside world
# AnonymousObservable inherits from Producer
final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    Define closure properties
    let _subscribeHandler: SubscribeHandler
    Save closures that are passed in from the outside world
    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }
    # Override the run method provided by its parent Producer
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        Initialize the anonymous pipe, passing in a subscriber
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}
Copy the code


There’s Producer again. Click Producer

  • step6
# Producer also inherits from Observable
class Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }

If CurrentThreadScheduler specifies a thread, run will be executed in that thread
Otherwise, run will be executed in the current thread
# SinkDisposer Instance Disposer, used to manage the release of resources
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
         if! CurrentThreadScheduler.isScheduleRequired { // The returned disposable needs to release all references once it was disposed.let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
                return disposer
            }
        }
  }
Abstract method, subclass to implement, namely anonymous sequence AnonymousObservable
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    rxAbstractMethod()
    }
}
Copy the code

By this point, you and I are already tipsy

You ask me what is a sequence?

I pointed in the direction of the sea


For now, I can only draw a picture and continue to see


summary

  • summary

    • We callParent agreementCreat method to generate anonymous observation sequence, i.eProducerA subclass ofAnonymousObservable
    • AnonymousObservableSaves closures passed in from the outside world
    • Responsible for resource management, reference counting isObservableAbstract classes that do not implement methods
    • ProducerThe subscribe class implements the external subscribe method and schedules thread scheduling
    • specificrunBy theAnonymousObservableImplementation, parent classProducerIs not responsible for


Ok, keep going down

The subscribe (onNext:) subscription

Click subscribe to enter, and you can see the extension of ObservableType, which provides two methods: subscribe. On and subscribe. OnNext

Subscribe. On is omitted

  • step7
extension ObservableType {
    ...
    
    public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            ....
            Create an anonymous subscriber AnonymousObserver
            Save the execution closure passed in from the outside world
            let observer = AnonymousObserver<Element> { event in
                switch event {
                case .next(letvalue): onNext? (value)case .error(let error):
                    if let onError = onError {
                        onError(error)
                    } else { Hooks.defaultErrorHandler(callStack, error) }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
}
Copy the code

The closure that the outside world needs to execute, print(text) in this case, generates an instance of AnonymousObserver and is passed in

self.asObservable().subscribe(observer)

That is, the AnonymousObserver instance calls SUBSCRIBE via Producer

The run method is then called by subclass AnonymousObservable, the sequence instance


Coming to the article, the run method of step 5 is as follows

Pass into AnonymousObservableSink the closures that the outside world needs to execute, along with the primitives generated by the resource destruction instance
Create the sink channel instance and execute run
Give subscription the instance generated after run and return subscription and sink

override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
    let subscription = sink.run(self)
    return (sink: sink, subscription: subscription)
}
Copy the code


Enter the AnonymousObservableSink

  • step8
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Element = Observer.Element 
    typealias Parent = AnonymousObservable<Element>
    
    The channel AnonymousObservableSink holds these two attributes by calling the initialization method of Sink, passing in observer and cancel
    override init(observer: Observer, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
    ### familiar things there
    # Here you see _subscribeHandler, which is the signal emitted and the closure saved
    
    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
}

Copy the code

When we get here, we’ll find the Sink pipeline which is very important

It holds the

Emits the closure of the sequence and executes the closure of the sequence

AnyObserver(self) here is intended to be compatible with the type of the closure passed in; in this article, it corresponds to StringCopy the code


So the train sequence starts going, but how do you respond? Where do we go next?

So that’s AnyObserver(self), what does it do, go into AnyObserver, init


 public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
    self.observer = observer.on
}
Copy the code

You’ll notice that self.observer here keeps its own on method

So I saved a function

The forwardOn of Sink will be called after the on of step8


  • step9
# the superclass Sink
class Sink<Observer: ObserverType> : Disposable {
    final func forwardOn(_ event: Event<Observer.Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        # the subscriber
        self._observer.on(event)
    }
}
Copy the code


In the parent forwardOn, the subscriber executes the on event

But the subscriber AnonymousObserver class has no on methods, only onCore

So look for ObserverBase in the parent class of AnonymousObserver

class ObserverBase<Element> : Disposable, ObserverType {
    private let _isStopped = AtomicInt(0)

    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }
    # subclass implementation
    func onCore(_ event: Event<Element>) {
        rxAbstractMethod()
    }
}
Copy the code

Finally, AnonymousObserver calls its own onCore to execute the eventHandler closure

To this

The whole execution process is over

Recycling will be covered in a future article


The first mojito experience is over

summary

  • summary
    • throughAnonymousObservableSave the observable sequence
    • throughAnonymousObserveSave the execution closure
    • The outside world starts to subscribe, and the Producer schedules the thread to subscribe
    • generateSinkDisposerAs well asobserverExamples of tuples
    • Inject the ancestorSinkThe pipe
    • SinkHandle events, send signals, respond to sequences
    • Resource recovery


A simple flow chart is shown below


And the way I write it, it’s easy as magic

With RxSwift, daily development is fun, for example

  • Listen to the tableView scroll:
 tableView.rx.contentOffset.subscribe(onNext: { contentOffset inDisposed (by: rx.disposebag)Copy the code


  • Listen for textField input
 textField.rx.text.skip(1).subscribe(onNext: { (text) in
    print("输入的是 : \(text!)")
 })
 .disposed(by: rx.disposeBag)
Copy the code


  • Click on the button
 self.messageBtn.rx.tap.subscribe(onNext: { in
    Navigator.push("")
 })
 .disposed(by: rx.disposeBag)
Copy the code


  • The tableView is bound to the datasource proxy
  Import RxDataSources
  dataSource = RxTableViewSectionedReloadDataSource(configureCell: { (_, tab, indexPath, item) -> UITableViewCell in
    let cell = tab.dequeue(Reusable.settingCell, for: indexPath)
    cell.bind(to: item)
    return cell
  })
  
  # or
  let items = Observable.just([
    "Just"."Relay"."From"."Driver"."merge"
  ])
  
 items.bind(to: tableView.rx.items) { (tableView,_,element) in
    let cell = self.tableView.dequeue(TestV.normalCell)
    cell?.textLabel?.text = element
    return cell!
  }
  .disposed(by: rx.disposeBag)
Copy the code


  • TableView hits proxy
 tableView.rx.itemSelected.subscribe(onNext: { indexPath in
   /// doSomething
 })
 .disposed(by: rx.disposeBag)
Copy the code


  • Coordinate HandyJSON to Model
extension Response {
    func mapHandyJsonModel<T: HandyJSON>(_ type: T.Type) -> T {
        let jsonString = String.init(data: data, encoding: .utf8)
        if let modelT = JSONDeserializer<T>.deserializeFrom(json: jsonString) {
            return modelT
        }
        return JSONDeserializer<T>.deserializeFrom(json: "{\" MSG \":\" parsing error \"}")!
    }
}

extension ObservableType whereElement == Response {public func mapHandyJsonModel<T: HandyJSON>(_)type: T.Type) -> Observable<T> {
        return flatMap { response -> Observable<T> in
            return Observable.just(response.mapHandyJsonModel(T.self))
        }
    }
}

# with Moya
static func getTopList() -> Observable<HomeResponseModel> {
    return HomeApiProvider.rx.request(.Top).asObservable().mapHandyJsonModel(HomeResponseModel.self)
}
Copy the code


  • Multiple request merge
 Observable.zip(HomeApi.getTopList(), HomeApi.getRecommondList()).subscribe(onNext: { topResponse, recommodResponse inDisposed (by: self.rx.disposebag)Copy the code


A little bit of simple usage

It will be updated gradually


The world is free of your suffering because of me

Once familiar with RxSwift, it makes our code simple and elegant

It’s protocol oriented, we’re business oriented

The RxSwift needs to be savoured

Listening to Mojito once is certainly not enough

Don’t say the

I went to listen to music


RxSwift Chinese website

If you haven’t had mojito, start here