There are currently no built-in JavaScript objects with the [Symbol.asyncIterator] property set by default. However, some Node.js core modules (Stream, Events) and some third-party npm modules (mongodb) already support Symbol.asyncIterator. This article explores the use of asynchronous iterators in Node.js.

Table of contents

  • Use asyncIterator in Events
    • events.on() example 1
    • events.on() example 2
    • Starting a Node.js server with events.on()
    • How the Node.js source implements the events.on() asynchronous iterator
  • Use asyncIterator in Stream
    • Asynchronous iterators versus Readable
    • How Readable implements asyncIterator in the Node.js source
    • Asynchronous iterators and Writable
  • Use asyncIterator in MongoDB
    • The MongoDB cursor
    • MongoDB asynchronous iterator source code analysis
    • Using for await...of to traverse the iterable cursor
    • Piping a cursor to a writable stream

Use asyncIterator in Events

The events.on(emitter, eventName) method, added in Node.js v12.16.0, returns an asynchronous iterator that iterates over eventName events.

events.on() example 1

In the following example, the for await...of loop prints only Hello; when the error event is emitted, the error is caught by the surrounding try...catch.

const { on, EventEmitter } = require('events');

(async () => {
  const ee = new EventEmitter();
  const ite = on(ee, 'foo');

  process.nextTick(() => {
    ee.emit('foo', 'Hello');
    ee.emit('error', new Error('unknown mistake.'));
    ee.emit('foo', 'Node.js');
  });

  try {
    for await (const event of ite) {
      console.log(event); // prints ['Hello']
    }
  } catch (err) {
    console.log(err.message); // unknown mistake.
  }
})();

In the example above, when the EventEmitter instance ee emits an error event, the error is thrown, the loop exits, and all event listeners registered on the instance by events.on() are removed.
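We can verify the cleanup behavior described above: once the loop has exited, the listeners that events.on() registered on ee should be gone. A minimal check, placed right after the try...catch in example 1:

// Expected 0: the internally registered listeners are removed when iteration ends
console.log(ee.listenerCount('foo'));
console.log(ee.listenerCount('error'));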

events.on() example 2

The body of a for await...of loop executes serially and can only handle one event at a time, even if the next event is already waiting to be processed. This is not recommended where concurrent handling is required, as will be explained when parsing the events.on() source code below.

As shown below, although the two events are emitted back to back, a 2s delay is simulated in the loop body, so handling of the next event is delayed as well.

const { on, EventEmitter } = require('events');
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms)); // delay helper

const ee = new EventEmitter();
const ite = on(ee, 'foo');

process.nextTick(() => {
  ee.emit('foo', 'Hello');
  ee.emit('foo', 'Node.js');
  // ite.return(); // calling this ends the for await...of traversal
  // ite.throw(); // makes the iterator object throw an error
});

(async () => {
  try {
    for await (const event of ite) {
      console.log(event); // prints ['Hello'] ['Node.js']
      await sleep(2000);
    }
  } catch (err) {
    console.log(err.message);
  }

  // Unreachable
  console.log('This will not be executed');
})();

The last statement in the example above is never executed: the loop keeps waiting for more events. Although both events have been emitted, the iterator does not terminate on its own. It ends only when an internal error occurs or when we manually call the iterator's return() or throw() method.
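For example, calling the iterator's return() method lets the loop finish normally. A sketch based on the snippet above; per the next() implementation walked through later, events already queued are still delivered before done becomes true:

const { on, EventEmitter } = require('events');

(async () => {
  const ee = new EventEmitter();
  const ite = on(ee, 'foo');

  process.nextTick(() => {
    ee.emit('foo', 'Hello');
    ee.emit('foo', 'Node.js');
    ite.return(); // signal the iterator to finish
  });

  for await (const event of ite) {
    console.log(event); // ['Hello'] ['Node.js']
  }

  // Reachable now: queued events are consumed first, then done becomes true
  console.log('This will be executed');
})();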

Starting a Node.js server with events.on()

In a previous article I wrote a "Hello Node.js" in a way you may not have seen before: a snippet that starts an HTTP server using events.on(). Now it can be explained properly.

The relevant code is as follows:

import { createServer as server } from 'http';
import { on } from 'events';

const ee = on(server().listen(3000), 'request');
for await (const [{ url }, res] of ee)
  if (url === '/hello')
    res.end('Hello Node.js!');
  else
    res.end('OK!');

The code above may look novel. Its core is to use events.on() to return an asynchronous iterable for the request event of the server created by createServer(), and then traverse it with for await...of; each time a client makes a request, the server emits 'request' with (req, res).

Since the loop body executes serially, the next request can only be handled after the previous one completes. For an HTTP server that must handle concurrent requests, do not use this approach!
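If you want to keep this style but still process requests concurrently, one possible pattern (a sketch, not the article's original code) is to dispatch each request to an async handler without awaiting it, so the loop immediately moves on to the next event:

import { createServer } from 'http';
import { on } from 'events';

// Hypothetical handler: any rejection must be caught here,
// because nothing awaits it in the loop below
async function handle(req, res) {
  res.end(req.url === '/hello' ? 'Hello Node.js!' : 'OK!');
}

for await (const [req, res] of on(createServer().listen(3000), 'request')) {
  handle(req, res).catch(err => res.destroy(err)); // not awaited: runs concurrently
}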

How the Node.js source implements the events.on() asynchronous iterator

The Events module exports the on() method directly. The on() method combines an asynchronous iterator with an EventEmitter instance, and the implementation is very clever. Once you understand it, you can implement an events.on() yourself; a minimal sketch appears at the end of this section.

  • Line {1}: ObjectSetPrototypeOf sets a new prototype on an object containing three methods: next(), return(), and throw().
  • Line {2}: per the asynchronous iterable protocol, an iterable must have a Symbol.asyncIterator property, a no-argument function that returns the iterator itself; this is SymbolAsyncIterator in the code below.
  • Line {3}: the new prototype is AsyncIteratorPrototype, the second argument to ObjectSetPrototypeOf.
  • Line {4}: eventTargetAgnosticAddListener registers the event listener; internally it calls the emitter's on(name, listener) method.
  • Line {5}: addErrorHandlerIfEventEmitter also registers an 'error' event listener if the event name is not 'error'; the concrete implementation is the same as in {4}.
  • Line {6}: eventHandler() is the listener function registered above. It runs when the event fires, and this is where the asynchronous iterator and the event emitter meet: if you know the async iterator specification, PromiseResolve(createIterResult(args, false)) is exactly the standard shape of the value returned by an async iterator's next() method.
  • Let's continue and see where unconsumedPromises comes from.
module.exports = EventEmitter;
module.exports.on = on;

function on(emitter, event) {
  const unconsumedEvents = [];
  const unconsumedPromises = [];
  const iterator = ObjectSetPrototypeOf({ // {1}
    next() { ... },
    return() { ... },
    throw(err) { ... },
    [SymbolAsyncIterator]() { // {2}
      return this;
    }
  }, AsyncIteratorPrototype); // {3}

  eventTargetAgnosticAddListener(emitter, event, eventHandler); // {4}
  if (event !== 'error') {
    addErrorHandlerIfEventEmitter(emitter, errorHandler); // {5}
  }
  return iterator;

  function eventHandler(...args) { // {6}
    const promise = unconsumedPromises.shift();
    if (promise) {
      // Equivalent to resolving the pending next() with { value: args, done: false }
      promise.resolve(createIterResult(args, false));
    } else {
      // The body of a for await...of loop executes serially, so only one
      // event is processed at a time. Events that fire before the current
      // one is consumed are queued in unconsumedEvents; the loop's next
      // call to next() consumes them.
      unconsumedEvents.push(args);
    }
  }
}

function eventTargetAgnosticAddListener(emitter, name, listener, flags) {
  ...
  emitter.on(name, listener);
  ...
}

Here is the implementation of the next() method on the iterator object:

  • Line {1}: first consume any unread events.
  • Line {2}: check whether an error has occurred and, if so, reject with that error. For example, after the iterator's throw() method is called, the error is stored until next() is invoked again, at which point this code runs.
  • Line {3}: if the iterator is finished, return a Promise whose result has done set to true, ending the traversal. The finished flag is set after the iterator's return() method is called.
  • Line {4}: this is where the unconsumedPromises mentioned above come from. When a for await...of statement iterates over the asynchronous iterator, the iterator's next() method is triggered automatically; line {4} creates a new Promise whose resolve is not invoked right away. This answers the question raised in events.on() example 2: the loop body does not run until an event fires and releases that resolve function; then for await...of automatically calls next() again, a new pending Promise is created, and the cycle repeats until an error event is emitted or the iterator's return() method is called.
const iterator = ObjectSetPrototypeOf({
  next() {
    // {1} First, consume all unread events
    const value = unconsumedEvents.shift();
    if (value) {
      return PromiseResolve(createIterResult(value, false));
    }

    // {2} If an error occurred, return a rejected promise that throws the
    // error; event listening also stops afterwards
    if (error) {
      const p = PromiseReject(error);
      // Only the first element errors
      error = null;
      return p;
    }

    // {3} If the iterator is finished, resolve with done set to true
    if (finished) {
      return PromiseResolve(createIterResult(undefined, true));
    }

    // {4} Wait until an event occurs
    return new Promise(function(resolve, reject) {
      unconsumedPromises.push({ resolve, reject });
    });
  },
  ...
}
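Putting these pieces together, here is the minimal userland sketch of events.on() promised above. It follows the same queueing logic but is a simplification under stated assumptions: return() does not remove the registered listeners, and throw() only stores the error for the next call to next().

const { EventEmitter } = require('events');

// A simplified events.on(): queue events when no next() is pending,
// queue pending promises otherwise
function on(emitter, event) {
  const unconsumedEvents = [];
  const unconsumedPromises = [];
  let error = null;
  let finished = false;

  emitter.on(event, (...args) => {
    const promise = unconsumedPromises.shift();
    if (promise) promise.resolve({ value: args, done: false });
    else unconsumedEvents.push(args);
  });

  if (event !== 'error') {
    emitter.on('error', (err) => {
      const promise = unconsumedPromises.shift();
      if (promise) promise.reject(err);
      else error = err;
    });
  }

  return {
    next() {
      const value = unconsumedEvents.shift();
      if (value) return Promise.resolve({ value, done: false });
      if (error) {
        const p = Promise.reject(error);
        error = null;
        return p;
      }
      if (finished) return Promise.resolve({ value: undefined, done: true });
      return new Promise((resolve, reject) =>
        unconsumedPromises.push({ resolve, reject }));
    },
    return() {
      finished = true;
      for (const promise of unconsumedPromises)
        promise.resolve({ value: undefined, done: true });
      unconsumedPromises.length = 0;
      return Promise.resolve({ value: undefined, done: true });
    },
    throw(err) {
      error = err; // surfaced on the next call to next()
      return Promise.resolve({ value: undefined, done: true });
    },
    [Symbol.asyncIterator]() {
      return this;
    }
  };
}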

Use asyncIterator in Stream

Readable stream objects in the Node.js Stream module gained experimental support for the [Symbol.asyncIterator] property in v10.0.0, allowing a readable stream to be traversed with a for await...of statement; the feature became stable in v11.14.0.

Asynchronous iterators versus Readable

Create a readable stream object, readable, with the fs module:

const fs = require('fs');
const readable = fs.createReadStream('./hello.txt', {
  encoding: 'utf-8',
  highWaterMark: 1
});

In the past, to read a file we needed to listen for the data event, concatenate the chunks, and detect completion in the end event, as follows:

function readText(readable) {
  let data = '';
  return new Promise((resolve, reject) => {
    readable.on('data', chunk => {
      data += chunk;
    });
    readable.on('end', () => {
      resolve(data);
    });
    readable.on('error', err => {
      reject(err);
    });
  });
}

It can now be implemented in a simpler way with asynchronous iterators, as follows:

async function readText(readable) {
  let data = '';
  for await (const chunk of readable) {
    data += chunk;
  }
  return data;
}

Now we can call readText to test.

(async () => {
  try {
    const res = await readText(readable);
    console.log(res); // Hello Node.js
  } catch (err) {
    console.log(err.message);
  }
})();

When a for await...of statement iterates over readable, if the loop terminates because of a break or a thrown error, the stream is destroyed as well.

In the example above, the amount of data chunk receives each time is determined by the highWaterMark option passed when creating the readable stream. To make the effect obvious, highWaterMark is set to 1 here, so only one character is read at a time.
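A quick check of the destroy-on-break behavior, a sketch reusing the readable object created above:

(async () => {
  for await (const chunk of readable) {
    console.log(chunk); // 'H'
    break; // exiting the loop early...
  }
  console.log(readable.destroyed); // true: ...destroys the stream
})();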

How Readable implements asyncIterator in the Node.js source

Traversing an asynchronous iterator with for await...of is similar to traversing a synchronous iterator with for...of. The for await...of statement internally calls the iterable's Symbol.asyncIterator() method (here, on readable) to get an asynchronous iterator object, then repeatedly calls the iterator's next() method to get each result.
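Conceptually, the earlier readText() can be written with those underlying calls spelled out; this is a rough desugaring of what for await...of does, not the engine's exact algorithm:

// Roughly equivalent to the for await...of version of readText()
async function readText(readable) {
  let data = '';
  const iterator = readable[Symbol.asyncIterator]();
  while (true) {
    const { value, done } = await iterator.next();
    if (done) break;
    data += value;
  }
  return data;
}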

This article uses the Node.js v14.x source as an example. When we call fs.createReadStream() to create a readable stream object, the ReadStream constructor is invoked internally:

// https://github.com/nodejs/node/blob/v14.x/lib/fs.js#L2001
function createReadStream(path, options) {
  lazyLoadStreams();
  return new ReadStream(path, options);
}

The ReadStream constructor itself contains nothing we are looking for; the point is that it inherits from the Stream module's Readable constructor.

function ReadStream(path, options) {
  ...
  Readable.call(this, options);
  ...
}

Now let’s focus on the implementation of the Readable constructor.

The SymbolAsyncIterator property is defined on the Readable prototype; it returns an iterator object created by a generator function.

// for await...of will call this
Readable.prototype[SymbolAsyncIterator] = function() {
  let stream = this;
  ...
  const iter = createAsyncIterator(stream);
  iter.stream = stream;
  return iter;
};

// A generator function that creates the asynchronous iterator object
async function* createAsyncIterator(stream) {
  let callback = nop;

  function next(resolve) {
    if (this === stream) {
      callback();
      callback = nop;
    } else {
      callback = resolve;
    }
  }

  const state = stream._readableState;

  let error = state.errored;
  let errorEmitted = state.errorEmitted;
  let endEmitted = state.endEmitted;
  let closeEmitted = state.closeEmitted;

  // The error, end, and close events control when iteration ends
  stream
    .on('readable', next)
    .on('error', function(err) {
      error = err;
      errorEmitted = true;
      next.call(this);
    })
    .on('end', function() {
      endEmitted = true;
      next.call(this);
    })
    .on('close', function() {
      closeEmitted = true;
      next.call(this);
    });

  try {
    while (true) {
      // stream.read() pulls data from the internal buffer; it returns null
      // if no data is readable. stream.destroyed is true after the stream's
      // destroy() method has been called.
      const chunk = stream.destroyed ? null : stream.read();
      if (chunk !== null) {
        // The key point: per the iterator protocol, yield produces the
        // value returned by each call to the iterator object's next()
        yield chunk;
      } else if (errorEmitted) {
        throw error;
      } else if (endEmitted) {
        break;
      } else if (closeEmitted) {
        break;
      } else {
        await new Promise(next);
      }
    }
  } catch (err) {
    destroyImpl.destroyer(stream, err);
    throw err;
  } finally {
    if (state.autoDestroy || !endEmitted) {
      // TODO(ronag): ERR_PREMATURE_CLOSE?
      destroyImpl.destroyer(stream, null);
    }
  }
}

Because a readable stream's asynchronous iterator is implemented with a generator's yield, besides traversing a readable object with for await...of, you can also call the returned generator object's next() method directly.

const ret = readable[Symbol.asyncIterator]();
console.log(await ret.next()); // { value: 'H', done: false }
console.log(await ret.next()); // { value: 'e', done: false }

Asynchronous iterators and Writable

We've seen how to get data out of a readable object by traversing its asynchronous iterator, but have you ever wondered how to pass an asynchronous iterable to a writable stream? That is what this section covers.

Creating a readable stream from an iterable

The Node.js stream module provides a utility method, stream.Readable.from(), which builds a Node.js readable stream from any iterable that conforms to Symbol.asyncIterator or Symbol.iterator.

The following example reuses the Range iterable from my earlier article on mastering ES6 iterators. r1 is the iterable we create; stream.Readable.from() turns it into a readable stream object, readable.

const stream = require('stream');

function Range(start, end) {
  this.id = start;
  this.end = end;
}
Range.prototype[Symbol.asyncIterator] = async function* () {
  while (this.id <= this.end) {
    yield this.id++;
  }
};

const r1 = new Range(0, 3);
const readable = stream.Readable.from(r1);
readable.on('data', chunk => {
  console.log(chunk); // 0 1 2 3
});
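Readable.from() accepts synchronous iterables too; for instance, a plain generator function works just as well. A sketch:

// A synchronous generator is also a valid source for Readable.from()
function* range(start, end) {
  for (let i = start; i <= end; i++) yield i;
}

stream.Readable.from(range(0, 3)).on('data', chunk => {
  console.log(chunk); // 0 1 2 3
});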

Pass an asynchronous iterator to a writable stream

stream.pipeline() lets you send data through a series of streams and generator functions and get notified when the pipeline completes.

Convert the pipeline to a promise form using util.promisify.

const fs = require('fs');
const stream = require('stream');
const util = require('util');
const pipeline = util.promisify(stream.pipeline); // convert to the promise form

(async () => {
  try {
    const readable = stream.Readable.from(r1);
    const writeable = fs.createWriteStream('range.txt');
    await pipeline(
      readable,
      async function* (source) {
        for await (const chunk of source) {
          yield chunk.toString();
        }
      },
      writeable
    );
    console.log('Pipeline success');
  } catch (err) {
    console.log(err.message);
  }
})();

When writing data, each chunk must be a String, Buffer, or Uint8Array; otherwise the writable stream reports an error. Since our custom iterable r1 yields values of type Number, a conversion is needed: the generator function in the middle of the pipeline converts each received value to a string.
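Alternatively, if the destination is created in object mode, the conversion step is unnecessary, since an object-mode writable accepts arbitrary values. A sketch with a hypothetical custom writable, reusing pipeline, stream, and Range from above:

const { Writable } = require('stream');

// Hypothetical object-mode destination: accepts the Number values directly
const objectWriteable = new Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    console.log(chunk); // 0 1 2 3, no string conversion needed
    callback();
  }
});

(async () => {
  await pipeline(stream.Readable.from(new Range(0, 3)), objectWriteable);
})();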

Use asyncIterator in MongoDB

Beyond the Node.js built-in modules, MongoDB also supports asynchronous iteration, though there is little documentation on this point. In MongoDB it is implemented through the concept of a cursor.

The MongoDB cursor

When we call db.collection.find(), it returns a cursor. To access the documents, we need to iterate over the cursor object. Usually, though, we just call its toArray() method directly.
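For example, the usual shortcut looks like this (a sketch; bookColl is the collection handle created in the example below, and this would run inside the same async function):

// Load all matching documents into memory at once
const books = await bookColl.find().toArray();
console.log(books.map(book => book.name));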

Suppose we have a database named example and a books collection containing two documents.

The myCursor variable defined in the code below is the cursor object. It is not iterated automatically; you can call the cursor's hasNext() method to check whether a next document exists and, if so, fetch it with the next() method.

As the logged output below shows, the third hasNext() call returns false; calling next() after that raises an error because the cursor is closed, i.e., there is no more data to traverse.

const MongoClient = require('mongodb').MongoClient;
const dbConnectionUrl = 'mongodb://127.0.0.1:27017/example';

(async () => {
  const client = await MongoClient.connect(dbConnectionUrl, { useUnifiedTopology: true });
  const bookColl = client.db('example').collection('books');
  const myCursor = await bookColl.find();

  console.log(await myCursor.hasNext()); // true
  console.log((await myCursor.next()).name); // Node.js
  console.log(await myCursor.hasNext()); // true
  console.log((await myCursor.next()).name); // Node.js in Action
  console.log(await myCursor.hasNext()); // false
  console.log((await myCursor.next()).name); // MongoError: Cursor is closed
})();

Calling next() directly also detects the end: it returns the document if one remains, or null if not.

console.log((await myCursor.next()).name);
console.log((await myCursor.next()).name);
console.log((await myCursor.next()));

MongoDB asynchronous iterator source code analysis

MongoDB determines whether the end of the cursor has been reached by hasNext() returning false or next() returning null. By contrast, the JavaScript asynchronous iterable protocol requires a Symbol.asyncIterator property that returns an iterator object whose next() method resolves to a { value, done } result.

Fortunately, the MongoDB Node.js driver already implements this for us. Let's look at the implementation through a few pieces of source code.

  • The find method

The find method returns an iterable cursor object.

// https://github.com/mongodb/node-mongodb-native/blob/3.6/lib/collection.js#L470

Collection.prototype.find = deprecateOptions(
  {
    name: 'collection.find',
    deprecatedOptions: DEPRECATED_FIND_OPTIONS,
    optionsIndex: 1
  },
  function(query, options, callback) {
    ...
    const cursor = this.s.topology.cursor(
      new FindOperation(this, this.s.namespace, findCommand, newOptions),
      newOptions
    );

    return cursor;
  }
);
  • CoreCursor

CoreCursor is the base class for all cursors in the MongoDB Node.js driver. If the current environment supports asynchronous iterators, the Symbol.asyncIterator property is set on the CoreCursor prototype; it returns a Promise-based asynchronous iterator object that conforms to the standard JavaScript definition of an asynchronous iterable.

// https://github.com/mongodb/node-mongodb-native/blob/3.6/lib/core/cursor.js#L610

if (SUPPORTS.ASYNC_ITERATOR) {
  CoreCursor.prototype[Symbol.asyncIterator] = require('../async/async_iterator').asyncIterator;
}
// https://github.com/mongodb/node-mongodb-native/blob/3.6/lib/async/async_iterator.js#L16

// async function* asyncIterator() {
//   while (true) {
//     const value = await this.next();
//     if (!value) {
//       await this.close();
//       return;
//     }
//
//     yield value;
//   }
// }

// TODO: change this to the async generator function above
function asyncIterator() {
  const cursor = this;

  return {
    next: function() {
      return Promise.resolve()
        .then(() => cursor.next())
        .then(value => {
          if (!value) {
            return cursor.close().then(() => ({ value, done: true }));
          }
          return { value, done: false };
        });
    }
  };
}

At present it is implemented with plain Promises. As the TODO in the code above notes, the driver's asynchronous iterator may be reimplemented with a generator function in the future; either way, usage stays the same for us.

Using for await...of to traverse the iterable cursor

Based on the example above, replacing the manual hasNext()/next() calls with a for await...of traversal is much simpler:

const myCursor = await bookColl.find();
for await (const val of myCursor) {
  console.log(val.name);
}

The same applies to MongoDB aggregation pipelines, with no further analysis needed:

const myCursor = await bookColl.aggregate();
for await (const val of myCursor) {
  console.log(val.name);
}

When traversing large data sets, a cursor loads MongoDB data in batches, so we don't have to worry about holding all the data in the server's memory at once and creating excessive memory pressure.
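The batch size can be tuned: the driver's cursor exposes a batchSize() method controlling how many documents each network round trip fetches. A sketch, again assuming the bookColl handle from earlier:

// Each round trip to MongoDB fetches up to 100 documents;
// iteration still yields one document at a time
const myCursor = bookColl.find().batchSize(100);
for await (const doc of myCursor) {
  console.log(doc.name);
}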

Piping a cursor to a writable stream

Since the MongoDB cursor object is itself an async iterable, it can be combined with the stream module's Readable.from() method to convert it into a readable stream object and write a file as a stream.

Note that the MongoDB cursor yields one document at a time, of type Object. Written directly, the writable stream would report a type error, because a writable stream is in non-object mode by default (accepting only String, Buffer, or Uint8Array). That is why a generator function sits in the middle of the pipeline, converting each received chunk into a writable Buffer.

const myCursor = await bookColl.find();
const readable = stream.Readable.from(myCursor);
await pipeline(
  readable,
  async function* (source) {
    for await (const chunk of source) {
      yield Buffer.from(JSON.stringify(chunk));
    }
  },
  fs.createWriteStream('books.txt')
);
