DevUI is a team with both design and engineering perspectives, serving huawei DevCloud platform and huawei internal background systems, as well as designers and front-end engineers. Add devui Helper (Devui-Official) DevUIHelper plugin: Devuihelper-lsp (welcome Star)

The introduction

I haven’t talked about Scheduler until now, so this chapter will start to explain the design and basic structure of Scheduler. RxJS exists to handle asynchronous IO, and the apis that asynchronous IO contains must be further encapsulated to be used by asynchronous operations in RxJS.

As you can see, it mainly designs these basic structures based on the asynchronous capabilities that JS can provide.

  • AsyncScheduler: Asynchronous scheduler, usedsetIntervalThe implementation.
  • QueueScheduler: queue asynchronous scheduler, inheritedAsyncScheduler, butQueueActionIs a chain structure that allows scheduling to take the form of iterators.
  • Use AnimationFrameScheduler:reqeustAnimationFrameThe frame scheduler is implemented.
  • Use AsapScheduler:Promise.resolve().then()Implementation of the microtask scheduler.

SchedulerLike, Scheduler & Action

First, SchedulerLike provides the following two interfaces.

Export interface SchedulerLike {// Now (): number; Schedule <T>(work: (this: SchedulerAction<T>, state? : T) => void, delay? : number, state? : T ): Subscription; }Copy the code

Scheduler implements these interfaces.

Public static now: () => number = () => date.now (); constructor( private SchedulerAction: typeof Action, now: () => number = Scheduler.now ) { this.now = now; } public now: () => number; Schedule public schedule<T>(work: (this: SchedulerAction<T>, state? : T) => void, delay: number = 0, state? : T): Subscription { return new this.SchedulerAction<T>(this, work).schedule(state, delay); }}Copy the code

The Scheduler defines how subsequent schedulers that inherit it are created, passing in an Action factory so that specific actions can be constructed internally. Actions inherit Subscription, which means that an Action is, in effect, a subscriber.

export class Action<T> extends Subscription { constructor(scheduler: Scheduler, work: (this: SchedulerAction<T>, state? : T) => void) { super(); } // Action start schedule public schedule(state? : T, delay: number = 0): Subscription { return this; }}Copy the code

The design above is a design pattern called Template Method, which effectively constrains the implementation of subsequent different Schedulers.

Define the skeleton of an algorithm in an operation, deferring some steps to subclasses. It allows subclasses to redefine specific steps of an algorithm without changing its structure.

Asynchronous scheduler

Let’s take a look at Scheduler’s subclass AsyncScheduler, which all remaining schedulers inherit. Before we rush into source code analysis, we need to understand how the scheduler encapsulates asynchronous apis in order to understand how it works.

First, the scheduler itself is also designed based on the observer mode, but it is independent of the Rxjs Observable. In general, AsyncScheduler is called like this.

const scheduler = AsyncScheduler(AsyncAction); const subscription = async.schedule(function (counter) { console.log(counter); // This binds AsyncAction this.schedule(counter + 1, 1000); }, 1000, 1); // subscription.unsubscribe();Copy the code

Its call stack looks like this.

AsyncScheduler. Schedule AsyncAction. Schedule AsyncAction. RequestAsyncId listOnTimeout / / native processTimers / / native events AsyncScheduler.flush AsyncAction.execute AsyncAction._execute AsyncAction.workCopy the code

AsyncAction.schedule

In the Schedule method of AsyncScheduler, it first constructs AsyncAction and then calls its schedule. In this method, you are actually updating the internal state of the Action, so the focus here is on how the Schedule triggers the asynchronous API.

class AsyncAction<T> extends Action<T> { constructor( protected scheduler: AsyncScheduler, protected work: (this: SchedulerAction<T>, state? : T) => void ) { super(scheduler, work); } public schedule(state? : T, delay: number = 0): Subscription { if (this.closed) { return this; } this.state = state; const id = this.id; const scheduler = this.scheduler; If (id! = null) { this.id = this.recycleAsyncId(scheduler, id, delay); } this.pending = true; this.delay = delay; / / reconfigure the asynchronous API. This id = this. Id | | this. RequestAsyncId (scheduler, enclosing id, delay); return this; }}Copy the code

As you can see, the callback function passed in from the Scheduler ends up being held by the Action, so the work that the call stack ends up executing is actually the callback function.

AsyncAction.requestAsyncId

RequestAsyncId is a method that calls the asynchronous API that eventually triggers setInterval when AsyncAction is performed. In fact, according to the design of Template Method, all actions that inherit AsyncAction will implement the corresponding asynchronous API through this Method.

AsyncAction uses setInterval instead of setTimeout.

Actions only execute once by default, unless rescheduled from within the scheduled callback. This allows us to implement single and repeat actions via the same code path, without adding API surface area, as well as mimic traditional recursion but across asynchronous boundaries. However, JS runtimes and timers distinguish between intervals achieved by serial setTimeout calls vs. a single setInterval call. An interval of serial setTimeout calls can be individufenally delayed, which delays scheduling the next setTimeout, and so on. setInterval attempts to guarantee the interval callback will be invoked more precisely to the interval period, regardless of load. Therefore, we use setInterval to schedule single and repeat actions. If the action reschedules itself with the same delay, the interval is not canceled. If the action doesn’t reschedule, or reschedules with a different delay, the interval will be canceled after scheduled callback execution.

An Action is executed only once by default unless it is rescheduled in a scheduled callback. This approach allows us to schedule single or repeated Actions through uniform code without adding apis and mimics traditional recursive extension asyncio. However, the JS runtime or timer gets the called timer through a serial setTimout or a single setInterval, respectively. Serial setTimout timers can be delayed separately, which will delay c’s next setTimout schedule, and so on. SetInterval, on the other hand, tries to ensure that the callbacks of each timer are properly timed, regardless of the load on which the program is running. Therefore, we use setInterval to schedule single or repeated Actions so that the current timer will not be cancelled if the Actions themselves are scheduled with the same delay. If the action is not rescheduled or rescheduled with a different delay, the change timer will be cancelled after the scheduled callback is executed.

class AsyncAction<T> extends Action<T> { protected requestAsyncId( scheduler: AsyncScheduler, id? : any, delay: number = 0): any {// Bind scheduler and pass the current AsyncAction as an argument. return setInterval(scheduler.flush.bind(scheduler, this), delay); }}Copy the code

AsyncScheduler.flush

So, in AsyncScheduler, the new flush method actually serves setInterval as a callback function of the asynchronous API. The main steps are as follows.

• If there is a running Action, it saves the Action used to call it. If no Action is running, it will execute all action.execute in the call queue.

export class AsyncScheduler extends Scheduler { public flush(action: AsyncAction<any>): void { const {actions} = this; If (this.active) {// Uses a queue to hold all input Actions actions.push(action); return; } let error: any; this.active = true; // The default action is also a member of the queue. do { if (error = action.execute(action.state, action.delay)) { break; } } while (action = actions.shift()); this.active = false; If (error) {// Note that the unsubscribe will not be repeated here, because the action executing the error will be dequeued before the loop is executed. while (action = actions.shift()) { action.unsubscribe(); } throw error; }}}Copy the code

AsyncAction.execute

Flush calls the execute method of the action. This method also gets execution results by handling the internal state of the action, which calls the _execute internal method, which calls asyncAction.work and handles its exceptions.

class AsyncAction<T> extends Action<T> { public execute(state: T, delay: number): any { if (this.closed) { return new Error('executing a cancelled action'); } this.pending = false; Const error = this._execute(state, delay); if (error) { return error; } else if (this.pending === false && this.id ! = null) { this.id = this.recycleAsyncId(this.scheduler, this.id, null); } } protected _execute(state: T, delay: number): any { let errored: boolean = false; let errorValue: any = undefined; try { // work this.work(state); } catch (e) { errored = true; errorValue = !! e && e || new Error(e); } if (errored) { this.unsubscribe(); return errorValue; }}}Copy the code

AsyncAction.recycleAsyncId

In the analysis of action. schedule, the source code is referenced to the internal notes, there is a very important sentence, that is, “if the Action schedule itself with the same delay, then the current timer will not be cancelled”, so the recycleAsyncId method is needed to deal with this situation.

class AsyncAction<T> extends Action<T> { protected recycleAsyncId(scheduler: AsyncScheduler, id: any, delay: Number = 0): any {// this.delay === delay if (delay ! == null && this.delay === delay && this.pending === false) { return id; } // Cancel the current timer clearInterval(id); return undefined; }}Copy the code

Using the Template Method

AsyncScheduler has laid all the groundwork, so it can be used directly, or it can inherit and rewrite some related interfaces to replace the corresponding asynchronous API.

Queue scheduler

The queue scheduler determines whether to use synchronous or setInterval scheduling based on the delay passed in by the caller.

QueueScheduler simply inherits AsyncScheduler. QueueAction overrides schedule, Execute, and requestAsyncId to implement this function.

export class QueueAction<T> extends AsyncAction<T> { public schedule(state? : T, delay: number = 0): Subscription {if (delay > 0) {return super.schedule(state, delay); } this.delay = delay; this.state = state; Scheduler. flush(this); return this; } public execute(state: T, delay: number): Any {/ / according to the delay of the incoming judge whether direct execution work (synchronous) return (delay > 0 | | this. Closed)? super.execute(state, delay) : this._execute(state, delay) ; } protected requestAsyncId(scheduler: QueueScheduler, id? : any, delay: number = 0): any {if ((delay! == null && delay > 0) || (delay === null && this.delay > 0)) { return super.requestAsyncId(scheduler, id, delay); Return scheduler. Flush (this); }}Copy the code

Frame scheduler and microtask scheduler

The frame scheduler uses requestAnimationFrame or setInterval based on the delay passed in by the caller, and the microtask scheduler uses promise.reslove ().then() or setInterval based on the delay.

The calls are so similar that they can be analyzed together.

Action

Both action methods rewrite requestAsyncId and recycleAsyncId, mainly to handle different asynchronous apis.

protected requestAsyncId(scheduler: AnimationFrameScheduler, id? : any, delay: number = 0): any { if (delay ! == null && delay > 0) { return super.requestAsyncId(scheduler, id, delay); } // Add the current action to the end of the actions queue scheduler.actions.push(this); if (! * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * null)); // AnimationFrameAction Const scheduled = requestAnimationFrame(scheduler.flush. Bind (scheduler, null)); scheduler.scheduled = scheduled; } return scheduler.scheduled; } protected recycleAsyncId(scheduler: AnimationFrameScheduler, id? : any, delay: number = 0): any { if ((delay ! == null && delay > 0) || (delay === null && this.delay > 0)) { return super.recycleAsyncId(scheduler, id, delay); } if (scheduler.actions.length === 0) { // AsapAction Immediate.clearImmediate(id); // AnimationFrameAction cancelAnimationFrame(id); scheduler.scheduled = undefined; } return undefined; }Copy the code

Scheduler

Their flush, similar to the implementation of AsyncScheduler’s Flush, polls the Actions queue to call action.execute, except that their flush needs to handle the following additional details.

• Action may be passed in empty. • Handle actions state. • Empty scheduled so that scheduler can make the next scheduled.

// export class AnimationFrameScheduler extends AsyncScheduler { export class AsapScheduler extends AsyncScheduler { public flush(action? : AsyncAction<any>): void { this.active = true; this.scheduled = undefined; const {actions} = this; let error: any; let index: number = -1; / / order cannot be here, because this action = action | | actions. The shift ()! ; let count: number = actions.length; do { if (error = action.execute(action.state, action.delay)) { break; } } while (++index < count && (action = actions.shift())); this.active = false; if (error) { while (++index < count && (action = actions.shift())) { action.unsubscribe(); } throw error; }}}Copy the code

Immediate

An interesting point here is that AsapScheduler is not implemented directly through promise.reslove ().then(). Instead, it is encapsulated as Immediate, with the setImmediate and clearImmediate apis, thus making the microtask call indistinctly to any other timing API.

The internal implementation uses a Map to store the number of microtasks that are currently being performed. Instead of saving the Promise directly, all it needs is a tag because the Promise is released when it completes.

let nextHandle = 1; const RESOLVED = (() => Promise.resolve())(); const activeHandles: { [key: number]: any } = {}; function findAndClearHandle(handle: number): boolean { if (handle in activeHandles) { delete activeHandles[handle]; return true; } return false; } export const Immediate = { setImmediate(cb: () => void): number { const handle = nextHandle++; activeHandles[handle] = true; RESOLVED.then(() => findAndClearHandle(handle) && cb()); return handle; }, clearImmediate(handle: number): void { findAndClearHandle(handle); }};Copy the code

conclusion

This paper analyzes the RxJS scheduler related to a series of content, through the packaging OF JS asynchronous API, scheduler to achieve the corresponding asynchronous function, enhance the CONTROL of RxJS asynchronous IO.

Join us

We are DevUI team, welcome to come here and build elegant and efficient human-computer design/research and development system with us. Email: [email protected].

ZCX (官 码 : Coder)

The original link: mp.weixin.qq.com/s/vG0aaQmDy…

Previous articles are recommended

RxJS source code parsing (5) — Operator III

Dark Mode and Thematic Development of Web Interfaces

“How to build a grayscale Publishing environment”