Plan 1

Alibaba Cloud DTS supports data migration among many kinds of data sources, covering structure migration, full migration, and incremental migration. By combining the three, users can synchronize data to the target in real time and migrate services smoothly.

Plan 2

Migration with downtime. Use the mongodump command to back up the MongoDB data and the mongorestore command to restore the backup.

mongodump -h <hostname><:port> -d dbname -o dbdirectory
# -h: address (and port) of the server to back up
# -d: name of the database to back up
# -o: directory where the backup data is stored

mongorestore -h <hostname><:port> -d dbname <path>
# <path>: directory containing the backup to restore
# --drop: before restoring, delete the current data, then restore the backup

Plan 3

Migration without downtime. Building on Plan 2, use ChangeStream to listen for changes on the databases, collections, or deployment being migrated.

This article focuses on the implementation of Plan 3.

ChangeStream

ChangeStream gives applications access to real-time data changes. An application can use ChangeStream to listen for all data changes on a single collection, a database, or an entire deployment and respond to them immediately. Because ChangeStream is built on the aggregation framework, applications can also filter for specific changes or transform the notifications.

Availability

ChangeStream is available for replica sets and sharded clusters:

  • Storage engine: replica sets and sharded clusters must use the WiredTiger storage engine

  • Replica set protocol version: replica sets and sharded clusters must use replica set protocol version 1 (pv1)

  • Read concern "majority" support

    In MongoDB 4.0 and earlier, ChangeStream is only available if "majority" read concern support is enabled (it is enabled by default).

    Starting with MongoDB 4.2, "majority" read concern is no longer required for ChangeStream
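On MongoDB 4.0 and earlier, majority read concern is controlled by the replication.enableMajorityReadConcern setting (enabled by default). As a sketch, a minimal mongod.conf fragment for an assumed replica set named rs0 might look like:

```yaml
# mongod.conf: majority read concern must stay enabled for ChangeStream
# on MongoDB 4.0 and earlier (true is already the default)
replication:
  replSetName: rs0                  # assumed replica set name
  enableMajorityReadConcern: true
```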

Listening on collections, databases, and deployments

  • Collection: you can open a ChangeStream cursor on a single collection (except collections in the admin, local, and config databases). Use db.collection.watch().
  • Database: starting with MongoDB 4.0, you can open a ChangeStream cursor on a single database (except admin, local, and config) to watch all of its changes. Use db.watch().
  • Deployment: starting with MongoDB 4.0, you can open a ChangeStream cursor on a whole deployment (replica set or sharded cluster) to watch all changes to all non-system collections in every database except admin, local, and config. Use Mongo.watch().

Open a ChangeStream

  • Replica set: you can open a ChangeStream against any data-bearing member
  • Sharded cluster: you must open a ChangeStream against a mongos

Let's use Python to connect to a Mongo replica set, open a ChangeStream, and iterate over the cursor to read data change events.

Starting with MongoDB 4.0, you can specify startAtOperationTime to open the cursor at a specific point in time. If the specified start point is in the past, it must be within the oplog's time range.

from bson import Timestamp
from pymongo import MongoClient

listen_db = "change_stream"

def get_client():
    con_url = 'mongodb://192.168.131.130:27000,192.168.131.130:27001,192.168.131.130:27002/?replicaSet=rs0'
    client = MongoClient(con_url)
    return client

def open_change_stream():
    client = get_client()
    db = client.get_database(listen_db)
    # Timestamp(seconds, increment): start from this point in the oplog
    cursor = db.watch(start_at_operation_time=Timestamp(1623730169, 1))
    for data in cursor:
        mongo_operator(data)  # mongo_operator is defined in the full script below

As long as the MongoDB connection stays open, the cursor stays open as well.

The cursor is closed when:

  • The cursor is explicitly closed
  • An invalidate event occurs:
    • drop, rename, and dropDatabase invalidate a ChangeStream opened on a collection
    • dropDatabase invalidates a ChangeStream opened on a database
  • In a sharded cluster, removing a shard may cause open ChangeStream cursors to close, and the closed cursors may not be fully resumable
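The closing rules above can be exercised without a live server by treating the cursor as a plain iterator of event documents. A minimal sketch (the sample events and the handle callback are our own illustration, not a PyMongo API):

```python
def consume_until_invalidate(events, handle):
    """Feed change events to `handle` until an invalidate event arrives,
    mirroring how an invalidate event closes the ChangeStream cursor."""
    processed = 0
    for event in events:
        if event.get('operationType') == 'invalidate':
            # the real cursor closes here; resuming needs startAfter (4.2+)
            break
        handle(event)
        processed += 1
    return processed

# Hypothetical stream: two inserts, then the watched collection is dropped.
sample = [
    {'operationType': 'insert', 'documentKey': {'_id': 1}},
    {'operationType': 'insert', 'documentKey': {'_id': 2}},
    {'operationType': 'invalidate'},
    {'operationType': 'insert', 'documentKey': {'_id': 3}},  # never delivered
]
seen = []
consume_until_invalidate(sample, seen.append)  # seen now holds two events
```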

Modify the ChangeStream output

You can control the output of ChangeStream by providing a pipeline.

$replaceWith, $set, and $unset are available starting with MongoDB 4.2.
def open_change_stream():
    client = get_client()
    db = client.get_database(listen_db)
    pipeline = [
        {'$match': {'fullDocument.username': 'alice'}},
        {'$addFields': {'newField': 'this is an added field!'}}
    ]
    cursor = db.inventory.watch(pipeline=pipeline)
    document = next(cursor)

Note: the _id field of a ChangeStream event document acts as the resume token. Do not use the pipeline to modify or remove the _id field of ChangeStream events; if you change _id, ChangeStream will throw an exception, for example with this pipeline:

pipeline = [{'$unset': '_id'}]
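A small guard before opening the stream can catch this mistake early. The helper below is our own sketch, not a PyMongo API:

```python
def pipeline_touches_id(pipeline):
    """Return True if a pipeline stage removes or excludes the _id field,
    which would break the resume token and make the stream raise."""
    for stage in pipeline:
        unset = stage.get('$unset')
        if unset is not None:
            fields = unset if isinstance(unset, list) else [unset]
            if '_id' in fields:
                return True
        if stage.get('$project', {}).get('_id') == 0:
            return True
    return False

# The broken pipeline above is rejected; a plain $match passes.
assert pipeline_touches_id([{'$unset': '_id'}])
assert not pipeline_touches_id([{'$match': {'fullDocument.username': 'alice'}}])
```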

Looking up the full document for update operations

By default, for update operations ChangeStream only returns the delta of the changed fields. You can configure ChangeStream to also return the most recently majority-committed version of the document, delivered in the fullDocument field of the update event.

cursor = db.inventory.watch(full_document='updateLookup')
document = next(cursor)
# part of the resulting event:
# 'fullDocument': {'_id': ObjectId('60c2d488c95f85e0a915dece'), 'name': 'Zhang Si', 'age': 23.0}

If one or more majority-committed operations modified the document between the update itself and the lookup, the fullDocument returned may differ substantially from the document at the time of the update. However, the deltas in the ChangeStream event always correctly describe the change to the watched collection that the event reports.
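Because the deltas are always correct, a consumer can apply them to its own copy of the document instead of relying on the fullDocument lookup. A sketch for top-level fields (dotted paths for nested updates are not handled here):

```python
import copy

def apply_update_description(doc, update_description):
    """Apply a change event's updatedFields / removedFields to a copy of
    the local document and return the updated version."""
    new_doc = copy.deepcopy(doc)
    for field, value in update_description.get('updatedFields', {}).items():
        new_doc[field] = value
    for field in update_description.get('removedFields', []):
        new_doc.pop(field, None)
    return new_doc

before = {'_id': 1, 'name': 'alice', 'age': 22, 'tmp': 'x'}
delta = {'updatedFields': {'age': 23}, 'removedFields': ['tmp']}
after = apply_update_description(before, delta)
# after == {'_id': 1, 'name': 'alice', 'age': 23}
```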

Resuming a ChangeStream

A ChangeStream can be resumed by specifying a resume token when the cursor is opened.

resumeAfter

You can resume a ChangeStream after a specific event by passing a resume token to resumeAfter when opening the cursor. The resume token is the _id of a ChangeStream event document.

Notes

  • The oplog must retain enough history to locate the operation associated with the token, or with the timestamp if the timestamp is in the past.

  • A ChangeStream cannot be resumed with resumeAfter after an invalidate event closes the stream. Starting with MongoDB 4.2, you can use startAfter to start a new ChangeStream after an invalidate event.

In pymongo, the modifier is resume_after. It takes a resume token, i.e. the _id value of a ChangeStream event document, as resume_token does in the following example.

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = next(cursor)
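For the resume to survive a process restart, the token must be persisted after each handled event. Resume tokens are small documents of the form {'_data': '<hex string>'}, so a JSON file is enough for a sketch (the file name and the shortened token value are assumptions):

```python
import json
import os

TOKEN_FILE = 'resume_token.json'  # hypothetical checkpoint location

def save_token(token):
    """Persist the latest resume token (cursor.resume_token)."""
    with open(TOKEN_FILE, 'w') as f:
        json.dump(token, f)

def load_token():
    """Load the last checkpoint, or None to start a fresh stream."""
    if os.path.exists(TOKEN_FILE):
        with open(TOKEN_FILE) as f:
            return json.load(f)
    return None

save_token({'_data': '8260C8...'})  # token value shortened for illustration
restored = load_token()             # pass as resume_after= when not None
```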

startAfter

You can start a new ChangeStream after a specific event by passing a resume token to startAfter when opening the cursor. Unlike resumeAfter, startAfter can resume notifications after an invalidate event by creating a new ChangeStream. The resume token is the _id of a ChangeStream event document.

Notes

  • The oplog must retain enough history to locate the operation associated with the token, or with the timestamp if the timestamp is in the past.

Event notification

ChangeStream only notifies the application of data changes that have been persisted to a majority of the replica set's data-bearing members.

For example, consider a replica set with three members and a ChangeStream cursor opened against the primary node. If a client issues an insert, ChangeStream notifies the application of the change only once the insert has been persisted to a majority of the data-bearing members.
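The majority here is a strict count over data-bearing members; a trivial sketch:

```python
def majority(data_bearing_members):
    """Smallest number of members that constitutes a majority."""
    return data_bearing_members // 2 + 1

# In the three-member example, the insert must reach two members
# before the ChangeStream emits the event.
assert majority(3) == 2
assert majority(5) == 3
```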

Change Event

Change events include the following operation types:

insert, update, replace, delete, drop, rename, dropDatabase, invalidate

A ChangeStream event document can contain the following fields:

{
   _id : { <BSON Object> },
   "operationType" : "<operation>",
   "fullDocument" : { <document> },
   "ns" : {
      "db" : "<database>",
      "coll" : "<collection>"
   },
   "to" : {
      "db" : "<database>",
      "coll" : "<collection>"
   },
   "documentKey" : { "_id" : <value> },
   "updateDescription" : {
      "updatedFields" : { <document> },
      "removedFields" : [ "<field>", ... ]
   },
   "clusterTime" : <Timestamp>,
   "txnNumber" : <NumberLong>,
   "lsid" : {
      "id" : <UUID>,
      "uid" : <BinData>
   }
}
  • _id: a BSON object identifying the ChangeStream event. This value is used as the resume token for the resumeAfter parameter when resuming a ChangeStream.
  • operationType: one of insert, update, replace, delete, drop, rename, dropDatabase, invalidate.
  • fullDocument: for insert and replace operations, the new document created by the operation. For delete operations, this field is omitted because the document no longer exists. For update operations, this field appears only when ChangeStream is opened with fullDocument set to updateLookup; it then holds the most recently majority-committed version of the document modified by the update. If other majority-committed operations modified the document between the original update and the lookup, this document may differ from the change described by updateDescription.
  • ns: the namespace (database and/or collection) affected by the event.
  • to: when operationType is rename, this document shows the new namespace of the collection. It is omitted for all other operation types.
  • documentKey: the _id of the document affected by the CRUD operation.
  • updateDescription: a document describing the fields that were updated or removed by an update operation. Appears only when operationType is update.
  • clusterTime: the timestamp of the oplog entry associated with the event.
  • txnNumber: the transaction number. Present only if the operation is part of a multi-document transaction.
  • lsid: the identifier of the session associated with the transaction. Present only if the operation is part of a multi-document transaction.

Special notes

Replace events

A replace operation uses the update command and consists of two phases:

  1. Delete the original document identified by documentKey
  2. Insert the new document with the same documentKey

The fullDocument of a replace event is the document as inserted in the second phase.
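On the target side a replace event can therefore be replayed as delete-then-insert, which is what the migration script in this article does. A sketch against a plain dict standing in for the target collection:

```python
def replay_replace(store, document_key, full_document):
    """Replay a replace event on `store` (a dict keyed by _id):
    phase 1 deletes by documentKey, phase 2 inserts the new document
    under the same key."""
    key = document_key['_id']
    store.pop(key, None)        # phase 1: delete the original document
    store[key] = full_document  # phase 2: insert the replacement

target = {1: {'_id': 1, 'name': 'old'}}
replay_replace(target, {'_id': 1}, {'_id': 1, 'name': 'new'})
# target[1] == {'_id': 1, 'name': 'new'}
```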

Invalidate events

  • drop, rename, and dropDatabase invalidate a ChangeStream opened on a collection
  • dropDatabase invalidates a ChangeStream opened on a database

An invalidate event closes the ChangeStream cursor.

A ChangeStream cannot be resumed with resumeAfter after an invalidate event closes the stream. Starting with MongoDB 4.2, you can use startAfter to start a new ChangeStream after an invalidate event.

Scripts for Mongo data migration

The script only handles inserts, updates, deletes, and replaces on collections. In our setup, users are normally not allowed to drop or rename collections and databases, so those operations are not handled.

The script listens for data changes on certain collections in the change_stream database. You could also filter with a pipeline instead of the check "if coll not in listener_coll: return".
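The pipeline alternative filters on the event's ns.coll field server-side, so events for unwatched collections never reach the client. A sketch of the equivalent pipeline, with a pure-Python model of what the $match stage keeps (collection names taken from the script below):

```python
listener_coll = ['inventory', 'student', 'user', 'school']

# Server-side replacement for `if coll not in listener_coll: return`;
# pass it to db.watch(pipeline=pipeline).
pipeline = [{'$match': {'ns.coll': {'$in': listener_coll}}}]

def kept_by_match(event):
    """Client-side model of the $match stage above."""
    return event['ns']['coll'] in listener_coll

assert kept_by_match({'ns': {'db': 'change_stream', 'coll': 'user'}})
assert not kept_by_match({'ns': {'db': 'change_stream', 'coll': 'orders'}})
```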

from bson import ObjectId, Timestamp
from pymongo import MongoClient

listen_db = "change_stream"
target_db = "change_stream_copy"
listener_coll = ['inventory', 'student', 'user', 'school']


def get_client():
    con_url = 'mongodb://192.168.131.130:27000,192.168.131.130:27001,192.168.131.130:27002/?replicaSet=rs0'
    client = MongoClient(con_url)
    return client


def get_target_collection(coll):
    client = get_client()
    db = client.get_database(target_db)
    coll_instance = db.get_collection(coll)
    return coll_instance


def mongo_insert(collection, data):
    collection.insert_one(data)


def mongo_find(collection, condition):
    return collection.find_one(condition)


def mongo_update(collection, conditions, data):
    print(conditions)
    print(data)
    collection.update_one(filter=conditions, update=data)


def mongo_delete(collection, data):
    collection.delete_one(data)


def mongo_operator(event_dict):
    print(event_dict)
    namespace = event_dict['ns']
    coll = namespace['coll']
    if coll not in listener_coll:
        return
    # document_key = {'_id': ObjectId('60c8595a9ffc11572249d198')}
    document_key = event_dict['documentKey']
    operate = event_dict['operationType']
    coll_instance = get_target_collection(coll)
    if operate == 'insert':
        result = mongo_find(coll_instance, document_key)
        if result:
            print(document_key, "already exists")
        else:
            document = event_dict['fullDocument']
            mongo_insert(coll_instance, document)
            print("insert success")
    elif operate == 'update':
        desc = event_dict['updateDescription']
        upd = {}
        if desc.get('updatedFields'):
            upd['$set'] = desc['updatedFields']
        if desc.get('removedFields'):
            # removed fields must be unset on the target as well
            upd['$unset'] = {field: "" for field in desc['removedFields']}
        mongo_update(coll_instance, document_key, upd)
        print("update success")
    elif operate == 'delete':
        result = mongo_find(coll_instance, document_key)
        if result:
            # For delete operations, the fullDocument field is omitted because
            # the document no longer exists, so delete by documentKey.
            mongo_delete(coll_instance, document_key)
            print("delete success")
        else:
            print(document_key, "is already deleted")
    elif operate == 'replace':
        # A replace is replayed as delete + insert with the same documentKey.
        mongo_delete(coll_instance, document_key)
        document_insert = event_dict['fullDocument']
        mongo_insert(coll_instance, document_insert)
        print("replace success")
    else:
        print("Unhandled operation type, event: %s" % event_dict)


def open_change_stream():
    client = get_client()
    db = client.get_database(listen_db)
    # Timestamp(seconds, increment): start from this point in the oplog
    cursor = db.watch(start_at_operation_time=Timestamp(1623730169, 1))
    for data in cursor:
        mongo_operator(data)


if __name__ == '__main__':
    open_change_stream()

Reference documents

1. MongoDB official documentation, ChangeStream module: docs.mongodb.com/manual/chan…
2. Python getting started: docs.python.org/zh-cn/3.11/…