Plan 1
Aliyun DTS migration supports data migration among multiple data sources, covering structural migration, full migration, and incremental migration. By combining the three, users can synchronize data to the target in real time and migrate services smoothly.
Plan 2
Shutdown migration. Use the mongodump command to back up MongoDB data and the mongorestore command to restore the backup.
```shell
mongodump -h <hostname><:port> -d dbname -o dbdirectory
# -h: host (and port) to connect to
# -d: database to back up
# -o: directory where the backup data is stored

mongorestore -h <hostname><:port> -d dbname --drop <path>
# --drop: drop existing data first, then restore the backup
```
Plan 3
Non-stop migration. Based on Plan 2, use ChangeStream to listen for changes to collections, databases, or the whole deployment.
This article focuses on the implementation of Plan 3.
ChangeStream
ChangeStream gives applications access to real-time data changes. An application can use ChangeStream to listen for all data changes on a single collection, a database, or an entire deployment and react to them immediately. Because ChangeStream uses the aggregation framework, the application can also filter for specific changes or transform the notifications.
Availability
ChangeStream is available for replica sets and sharded clusters:

- Storage engine: replica sets and sharded clusters must use the WiredTiger storage engine.
- Replica set protocol: replica sets and sharded clusters must use replica set protocol version 1 (pv1).
- "majority" read concern: in MongoDB 4.0 and earlier, ChangeStream is available only if "majority" read concern support is enabled (the default). Starting with MongoDB 4.2, "majority" read concern is no longer required for ChangeStream.
Listening to collections, databases, and deployments
target | description | usage
---|---|---
collection | You can open a ChangeStream cursor on a single collection (except collections in the admin, local, and config databases). | db.collection.watch()
database | Starting with MongoDB 4.0, you can open a ChangeStream cursor on a single database (except admin, local, and config) to watch all of its changes. | db.watch()
deployment | Starting with MongoDB 4.0, you can open a ChangeStream cursor on an entire deployment (replica set or sharded cluster) to watch changes to all non-system collections in all databases except admin, local, and config. | Mongo.watch()
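The three watch targets map directly onto the three levels at which PyMongo exposes `watch()`. A minimal dispatcher sketch (the function name is ours; `client` only needs to behave like pymongo's `MongoClient`, supporting `watch()` and `[]` access):

```python
def open_stream(client, db=None, coll=None):
    """Open a ChangeStream at the appropriate level for the given target.

    Collection-level streams need MongoDB 3.6+; database- and
    deployment-level streams need MongoDB 4.0+.
    """
    if db is None:
        return client.watch()          # whole deployment
    if coll is None:
        return client[db].watch()      # a single database
    return client[db][coll].watch()    # a single collection
```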
Open a ChangeStream
- Replica set: you can open a ChangeStream against any data-bearing member
- Sharded cluster: you must open a ChangeStream against a mongos

Let's use Python to connect to a Mongo replica set, open a ChangeStream, and iterate the cursor for data change events.
Starting with MongoDB 4.0, you can specify startAtOperationTime to open the cursor at a specific point in time. If the specified start point is in the past, it must be within the oplog's time range.
```python
from bson import Timestamp
from pymongo import MongoClient

listen_db = "change_stream"

def get_client():
    con_url = 'mongodb://192.168.131.130:27000,192.168.131.130:27001,192.168.131.130:27002/?replicaSet=rs0'
    client = MongoClient(con_url)
    return client

def open_change_stream():
    client = get_client()
    db = client.get_database(listen_db)
    cursor = db.watch(start_at_operation_time=Timestamp(1623730169, 1))
    for data in cursor:
        mongo_operator(data)  # handle each change event (defined in the full script below)
```
When the MongoDB connection remains open, the cursor also remains open
The cursor is closed when:

- The cursor is explicitly closed.
- An invalidate event occurs:
  - For a ChangeStream opened on a collection, drop, rename, and dropDatabase cause an invalidate event.
  - For a ChangeStream opened on a database, dropDatabase causes an invalidate event.
- In a sharded cluster, removing a shard may cause open ChangeStream cursors to close, and the closed cursors may not be fully resumable.
Modify the ChangeStream output
You can control the output of ChangeStream by providing a pipeline. Supported stages include $match, $project, $addFields, $replaceRoot, and, starting with MongoDB 4.2, $replaceWith, $set, and $unset.
```python
def open_change_stream():
    client = get_client()
    db = client.get_database(listen_db)
    pipeline = [
        {'$match': {'fullDocument.username': 'alice'}},
        {'$addFields': {'newField': 'this is an added field!'}}
    ]
    cursor = db.inventory.watch(pipeline=pipeline)
    document = next(cursor)
```
Note: the _id field of a ChangeStream event document acts as the resume token. Do not use the pipeline to modify or remove the _id field of a ChangeStream event; if you change _id, the ChangeStream throws an exception.
```python
pipeline = [{'$unset': "_id"}]
```
The full document of update operations
By default, ChangeStream only returns the delta of the fields changed by an update operation. You can configure ChangeStream to return the most recent majority-committed version of the document, which then appears in the fullDocument field of the update event.
```python
cursor = db.inventory.watch(full_document='updateLookup')
document = next(cursor)
```
```python
'fullDocument': {'_id': ObjectId('60c2d488c95f85e0a915dece'), 'name': 'Zhang Si', 'age': 23.0}
```
If other majority-committed operations modified the document between the update and the lookup, the returned fullDocument may differ significantly from the document at the time of the update. However, the deltas included in the ChangeStream event always correctly describe the change to the watched collection that the event reports.
Restore ChangeStream
A ChangeStream can be resumed by specifying a resume token when the cursor is opened.
resumeAfter
You can resume a ChangeStream after a specific event by passing a resume token to resumeAfter when opening the cursor. Use the _id of a ChangeStream event document as the resume token.
Notes

- The oplog must have enough history to locate the operation associated with the token or the timestamp (if the timestamp is in the past).
- A ChangeStream cannot be resumed with resumeAfter after an invalidate event closes the stream. Starting with MongoDB 4.2, you can use startAfter to start a new ChangeStream after an invalidate event.
In PyMongo, pass the resume token to resume_after to resume notifications after the operation the token identifies. The resume token is the _id value of a ChangeStream event document, e.g. resume_token in the following example.
```python
resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = next(cursor)
```
startAfter
You can start a new ChangeStream after a specific event by passing a resume token to startAfter when opening the cursor. Unlike resumeAfter, startAfter can resume notifications after an invalidate event by creating a new ChangeStream. Use the _id of a ChangeStream event document as the resume token.
Notes

- The oplog must have enough history to locate the operation associated with the token or the timestamp (if the timestamp is in the past).
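The difference from resumeAfter can be wrapped in a small helper; a sketch assuming MongoDB 4.2+ and a pymongo-like Database object (the helper name is ours):

```python
def reopen_after_invalidate(db, invalidate_event):
    """Start a new ChangeStream after an invalidate event closed the old one.

    Passing the invalidate event's _id to resume_after would fail;
    start_after (MongoDB 4.2+) creates a fresh stream from that token.
    """
    token = invalidate_event['_id']
    return db.watch(start_after=token)
```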
Event notification
ChangeStream only notifies of data changes that have been persisted to a majority of the replica set's data-bearing members.
For example, consider a three-member replica set with a ChangeStream cursor open against the primary. If a client issues an insert, ChangeStream notifies the application of the change only after the insert has been persisted to a majority of the data-bearing members.
Change Event
Change event types include:

insert, update, replace, delete, drop, rename, dropDatabase, invalidate
The full set of fields that a change event document may contain:
```
{
   "_id" : { <BSON Object> },
   "operationType" : "<operation>",
   "fullDocument" : { <document> },
   "ns" : {
      "db" : "<database>",
      "coll" : "<collection>"
   },
   "to" : {
      "db" : "<database>",
      "coll" : "<collection>"
   },
   "documentKey" : { "_id" : <value> },
   "updateDescription" : {
      "updatedFields" : { <document> },
      "removedFields" : [ "<field>", ... ]
   },
   "clusterTime" : <Timestamp>,
   "txnNumber" : <NumberLong>,
   "lsid" : {
      "id" : <UUID>,
      "uid" : <BinData>
   }
}
```
key | description
---|---
_id | BSON object that identifies the ChangeStream event. This value is used as the resume token for the resumeAfter parameter when resuming a ChangeStream.
operationType | The operation type: insert, update, replace, delete, drop, rename, dropDatabase, or invalidate.
fullDocument | For insert and replace operations, the new document created by the operation. For delete operations, this field is omitted because the document no longer exists. For update operations, this field appears only when the ChangeStream's fullDocument option is set to updateLookup; it then holds the most recent majority-committed version of the modified document, which may differ from the changes described in updateDescription if other majority-committed operations modified the document between the update and the lookup.
ns | The namespace (database and/or collection) affected by the event.
to | When operationType is rename, the new name of the collection. Omitted for all other operation types.
documentKey | The _id of the document affected by the CRUD operation.
updateDescription | A document describing the fields set or removed by an update operation. Appears only when operationType is update.
clusterTime | The timestamp of the oplog entry associated with the event.
txnNumber | The transaction number. Present only if the operation is part of a multi-document transaction.
lsid | The identifier of the session associated with the transaction. Present only if the operation is part of a multi-document transaction.
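For replication, updateDescription maps directly onto an update document for the target: updatedFields become a $set, removedFields a $unset. A small helper sketch (the function name is ours):

```python
def update_from_description(update_description):
    """Translate a change event's updateDescription into an update document.

    updatedFields holds the new values; removedFields lists fields the
    update unset, which must be removed on the target as well.
    """
    update = {}
    if update_description.get('updatedFields'):
        update['$set'] = update_description['updatedFields']
    if update_description.get('removedFields'):
        update['$unset'] = {f: '' for f in update_description['removedFields']}
    return update
```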
Special notes
The replace event
A replace operation uses the update command and consists of two phases:
- Delete the original document by its documentKey
- Insert the new document with the same documentKey
The fullDocument of a replace event is the new document after the replacement.
The invalidate event
- For a ChangeStream opened on a collection, drop, rename, and dropDatabase cause an invalidate event
- For a ChangeStream opened on a database, dropDatabase causes an invalidate event
An invalidate event closes the ChangeStream cursor.
A ChangeStream cannot be resumed with resumeAfter after an invalidate event closes the stream. Starting with MongoDB 4.2, you can use startAfter to start a new ChangeStream after an invalidate event.
A script for Mongo data migration
The script only handles inserts, updates, deletes, and replaces on collections. In this scenario users are normally not allowed to drop or rename collections and databases, so those operations are not handled.
The script listens for data changes on certain collections in the change_stream database. You could also filter server-side with a pipeline instead of `if coll not in listener_coll: return`.
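For reference, the pipeline-based alternative could look like this (a sketch; the helper name is ours). A $match stage on ns.coll moves the check server-side, so unwanted events never reach the client:

```python
def build_coll_filter_pipeline(collections):
    # Only pass through events whose collection is in the watch list.
    return [{'$match': {'ns.coll': {'$in': list(collections)}}}]
```

The result would be passed as `db.watch(pipeline=build_coll_filter_pipeline(listener_coll))`.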
```python
from bson import Timestamp
from pymongo import MongoClient

listen_db = "change_stream"
target_db = "change_stream_copy"
listener_coll = ['inventory', 'student', 'user', 'school']

def get_client():
    con_url = 'mongodb://192.168.131.130:27000,192.168.131.130:27001,192.168.131.130:27002/?replicaSet=rs0'
    client = MongoClient(con_url)
    return client

def get_target_collection(coll):
    client = get_client()
    db = client.get_database(target_db)
    coll_instance = db.get_collection(coll)
    return coll_instance

def mongo_insert(collection, data):
    collection.insert_one(data)

def mongo_find(collection, condition):
    return collection.find_one(condition)

def mongo_update(collection, conditions, data):
    print(conditions)
    print(data)
    collection.update_one(filter=conditions, update=data)

def mongo_delete(collection, data):
    collection.delete_one(data)

def mongo_operator(event_dict):
    print(event_dict)
    namespace = event_dict['ns']
    coll = namespace['coll']
    if coll not in listener_coll:
        return
    # document_key looks like {'_id': ObjectId('60c8595a9ffc11572249d198')}
    document_key = event_dict['documentKey']
    operate = event_dict['operationType']
    coll_instance = get_target_collection(coll)
    if operate == 'insert':
        result = mongo_find(coll_instance, document_key)
        if result:
            print(document_key, "already exists")
        else:
            document = event_dict['fullDocument']
            mongo_insert(coll_instance, document)
            print("insert success")
    elif operate == 'update':
        desc = event_dict['updateDescription']
        update = {}
        if desc.get('updatedFields'):
            update['$set'] = desc['updatedFields']
        if desc.get('removedFields'):
            # Fields removed by the update must be unset on the target too.
            update['$unset'] = {field: '' for field in desc['removedFields']}
        mongo_update(coll_instance, document_key, update)
        print("update success")
    elif operate == 'delete':
        result = mongo_find(coll_instance, document_key)
        if result:
            # For delete operations, fullDocument is omitted because the
            # document no longer exists, so delete by documentKey.
            mongo_delete(coll_instance, document_key)
            print("delete success")
        else:
            print(document_key, "is already deleted")
    elif operate == 'replace':
        # A replace is applied as delete + insert with the same documentKey.
        mongo_delete(coll_instance, document_key)
        document_insert = event_dict['fullDocument']
        mongo_insert(coll_instance, document_insert)
        print("replace success")
    else:
        print("Unhandled operation type, event data: %s" % event_dict)

def open_change_stream():
    client = get_client()
    db = client.get_database(listen_db)
    cursor = db.watch(start_at_operation_time=Timestamp(1623730169, 1))
    for data in cursor:
        mongo_operator(data)

if __name__ == '__main__':
    open_change_stream()
```
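A production version of the script would also survive transient errors by reopening the stream from the last seen resume token. A minimal, dependency-free sketch of that loop (names and retry policy are ours; a real deployment would persist the token durably):

```python
def run_with_resume(open_stream, handle, max_retries=3):
    """Tail a change stream, reopening from the last resume token on error.

    open_stream(token) must return an iterable of change events (each
    carrying an '_id' resume token), e.g. a closure over db.watch with
    resume_after=token; handle(event) processes a single event.
    """
    token = None
    retries = 0
    while retries <= max_retries:
        try:
            for event in open_stream(token):
                handle(event)
                token = event['_id']  # remember where we got to
            return token  # cursor closed cleanly
        except Exception:
            retries += 1  # transient failure: reopen from the last token
    return token
```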
# Reference documents
1. MongoDB official documentation, ChangeStream module: docs.mongodb.com/manual/chan…
2. Python getting started: docs.python.org/zh-cn/3.11/…