Table of contents

  • Business background
  • The scheme
    • Smooth data migration scheme
  • Migration phase
    • Migration optimization
  • Analysis

As long as I still have a hair left on my head, I can keep working hard 🐶

Business background

Our company's voting system statistics used to be stored in HBase, with roughly 400 million rows of historical data. The director decided that the HBase data should now be migrated to MongoDB, keeping only the last two years of data online and backing the rest up to disk. The requirements were:

  1. Don’t lose any data
  2. Migrate smoothly
  3. No downtime

So, as a newly graduated young man full of passion and enthusiasm, the first thing I did after the morning meeting was search Baidu for a migration plan:

Data Migration scheme

After clicking through the first few results, my hands trembled a little on the keyboard, leaving only two lines of clear tears. Then I silently opened:

That’s the drama of life

The scheme

So my boss and I happily analyzed the requirements: mainly migrating voting statistics, daily voting records, daily activity voting records, per-user voting records, and so on. In the end we determined that about 300 million rows were concentrated in two tables. My boss would migrate those two large tables, and I would migrate the other seven tables, which held nearly 100 million rows in total.

Since the migration could not affect online users, the plan I finally settled on was double writing to the new and old databases plus offline data synchronization and verification, which boils down to online double write + offline check.
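The offline check half of that plan is not shown later in the article, but conceptually it just compares what ended up in MongoDB against the totals on the source side. Purely as an illustration, here is a minimal Python sketch that leans on the project's vote_service module (which appears in the migration script further down) and a pymongo client; the collection name, field names, and the use of each vote item's score as the authoritative total are all assumptions, not the author's actual check code.

# Minimal offline-check sketch (assumptions: pymongo client, collection/field names,
# and vote_item['score'] as the authoritative total -- not the author's actual check code).
from pymongo import MongoClient

from core_service import vote_service  # same service module used by the migration script

mongo_db = MongoClient("mongodb://localhost:27017")["vote"]

def check_activity(activity_id):
    """Return (vote_item_id, expected, migrated) tuples for every item that does not match."""
    mismatches = []
    for vote_item in vote_service.list_vote_items(str(activity_id)):
        # Sum this vote item's migrated hourly records in MongoDB
        pipeline = [
            {"$match": {"voteItemId": vote_item["_id"]}},
            {"$group": {"_id": None, "total": {"$sum": "$voteCount"}}},
        ]
        result = list(mongo_db["hourly_vote_records"].aggregate(pipeline))
        migrated_total = result[0]["total"] if result else 0
        if migrated_total != vote_item.get("score", 0):
            mismatches.append((vote_item["_id"], vote_item.get("score"), migrated_total))
    return mismatches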

Five of my tables held relatively little data, roughly 100,000 rows in total, so I started with them to warm up. Because these tables are so small, I did a full-table migration: query all the data out of HBase and pour it straight into MongoDB.

Here is my data migration solution:

Smooth data migration scheme

A double-write policy is adopted for new data while the system stays online:

  1. In the first release, every online “add, delete, modify” operation writes to both databases: next to the original code that only touches HBase, add the corresponding MongoDB write logic (a rough sketch follows this list);
  2. Write Python scripts and interfaces for data migration, to move the historical data from HBase to MongoDB;
  3. Next to every HBase query, add the equivalent query against MongoDB.
  4. Once the migration is complete and verified online, remove all the HBase logic and query MongoDB only.
  5. For idempotency, all migration code overwrites vote counts rather than incrementing them, so a migration can be re-run safely.
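The double write itself lives in the Java service, right next to the existing HBase calls, so it is not reproduced in this article. Purely to illustrate the idea from point 1, here is a rough Python sketch using the happybase and pymongo clients; the clients, table name, column family, and field names are all assumptions, not the real service code.

# Rough dual-write sketch (assumptions: happybase/pymongo clients, table and field names;
# the real service does this in Java next to its existing HBase writes).
import happybase
from pymongo import MongoClient

hbase_conn = happybase.Connection("hbase-host")
mongo_db = MongoClient("mongodb://localhost:27017")["vote"]

def record_vote(activity_id, vote_item_id, row_key):
    # old path: bump the HBase counter exactly as before
    hbase_conn.table("total_activity_vote_records").counter_inc(row_key, b"cf:voteCount")
    # new path: apply the same vote to MongoDB in the same code path,
    # so votes cast during the migration window are not lost
    mongo_db["total_activity_vote_records"].update_one(
        {"activityId": activity_id, "voteItemId": vote_item_id},
        {"$inc": {"voteCount": 1}},
        upsert=True,
    )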

Migration phase

With the plan settled, everything seemed to proceed on schedule in an orderly way:

  1. Create the MongoDB collections and set up proper indexes;
  2. Release the double-write code, so every vote write is also recorded in MongoDB and no vote records are lost during the migration;
  3. Start with the tables that have little data to migrate: query everything out of HBase and dump it into MongoDB, as in the code below.
// query HBase: scan all rows between the two row keys and map each row to a VoteRecordable
public List<VoteRecordable> listVoteRecord(VoteRecordable begin, VoteRecordable end) {
    byte[] startRow = begin.getRowKey();
    byte[] endRow = end.getRowKey();
    Scan scan = new Scan(startRow, endRow);
    return this.hbaseTemplate.find(begin.getTableName(), scan, this.getRowMapper(begin.getClass()));
}

// write into MongoDB: upsert each record so that re-running the migration overwrites instead of duplicating
public void batchInsertTotalActivityVoteRecordDoc(List<TotalActivityVoteRecordDoc> totalActivityVoteRecordDocs) {
    BulkOperations operations = mongoTemplate.bulkOps(BulkOperations.BulkMode.ORDERED, TotalActivityVoteRecordDoc.class);
    List<Pair<Query, Update>> upsertList = new ArrayList<>(totalActivityVoteRecordDocs.size());
    totalActivityVoteRecordDocs.forEach(data -> {
        Query query = new Query(Criteria.where("activityId").is(data.getActivityId()));
        Update update = new Update();
        update.set("voteCount", data.getVoteCount());
        Pair<Query, Update> upsertPair = Pair.of(query, update);
        upsertList.add(upsertPair);
    });

    operations.upsert(upsertList);
    operations.execute();
}


A scan query can return a large amount of data, so when a client initiates a Scan request it does not load everything locally at once but splits the work into multiple RPC requests. When the data volume is small you can scan happily without worrying about the cost; when it is large, a scan:

  1. Severely consumes network bandwidth and affects other services.
  2. Can OOM the local client.
  3. Can overwhelm HBase when requests are too many or too concentrated, because my requests use scan mode rather than rowKey equivalent queries (an equivalent query would require splicing concrete activity IDs or vote IDs into the row key; see the sketch below).
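The project talks to HBase through Spring's HbaseTemplate, but the difference between the two access patterns is easy to sketch in Python with the happybase client (an assumption, as are the table name and the row-key format): a scan streams whole key ranges in chunked RPCs, while an equivalent query only fetches the row keys you can splice together from known IDs.

# Scan vs. rowkey "equivalent query", sketched with happybase (an assumption; the real
# HBase access in this project goes through Java's HbaseTemplate).
import happybase

table = happybase.Connection("hbase-host").table("hourly_vote_records")

def scan_range(start_key, end_key):
    # Streams every row between the two keys; batch_size bounds how much each RPC carries,
    # but a wide range still hammers bandwidth, client memory, and the region servers.
    return list(table.scan(row_start=start_key, row_stop=end_key, batch_size=500))

def get_exact(vote_item_id, hours):
    # If exact row keys can be spliced (e.g. voteItemId + hour), a batched get
    # touches only the rows that are actually needed.
    row_keys = ["%s_%s" % (vote_item_id, hour) for hour in hours]
    return table.rows(row_keys)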

The five small tables migrated happily, but karma arrived when I moved on to the big tables: first, the queries were slow; second, the query-and-insert process created lots of new objects and cheerfully OOMed. So this approach was only good for finishing the small tables first; the big tables needed another way out.

Migration optimization

Optimization ideas:

  1. Scan is a dangerous word, so switch to equivalent queries; I do have the IDs needed to splice exact row keys;
  2. Queries are slow, so speed them up with multiple threads;
  3. Running the migration locally delays everything, so run it on a server: more memory, and access over the intranet is fast too;
  4. If an interface request fails during migration, retry it.

So I wrote the query and insert logic in Java:

@ResponseBody
public void insertHourlyVoteRecordDoc2(@RequestParam(value = "voteItemId") String voteItemId,
                                       @RequestParam(value = "beginDate") String beginDate,
                                       @RequestParam(value = "endDate") String endDate) {
    // build the start and end row-key templates for the query
    VoteRecordable begin = new HourlyVoteRecord();
    begin.setTraceId(voteItemId);
    begin.setDate(beginDate);

    VoteRecordable end = new HourlyVoteRecord();
    end.setTraceId(voteItemId);
    end.setDate(endDate);

    // query HBase for this vote item's hourly records in the date range
    List<VoteRecordable> voteRecordables = voteItemStatService.listVoteRecords(begin, end);
    System.out.println("insertHourlyVoteRecordDoc voteRecordables size: " + voteRecordables.size());
    if (CollectionUtils.isEmpty(voteRecordables)) {
        return;
    }

    // convert the HBase records into MongoDB documents
    List<HourlyVoteRecordDoc> hourlyVoteRecordDocs = new ArrayList<>(voteRecordables.size());
    for (VoteRecordable voteRecordable1 : voteRecordables) {
        HourlyVoteRecord hourlyVoteRecord = (HourlyVoteRecord) voteRecordable1;
        HourlyVoteRecordDoc hourlyVoteRecordDoc = new HourlyVoteRecordDoc();
        hourlyVoteRecordDoc.setVoteItemId(new ObjectId(hourlyVoteRecord.getVoteItemId()));
        hourlyVoteRecordDoc.setVoteCount(hourlyVoteRecord.getVoteCount());
        hourlyVoteRecordDoc.setHour(hourlyVoteRecord.getHour());
        hourlyVoteRecordDocs.add(hourlyVoteRecordDoc);
    }

    // batch upsert into MongoDB
    voteRecordService.batchInsertHourlyVoteRecordDoc(hourlyVoteRecordDocs);
}


Then I wrote a Python script to fire the migration requests concurrently from multiple threads:

"""

The migration

DailyVoteRecordDoc

HourlyVoteRecord

HourlyActivtiyVoteRecord

"
""



import threading
import time
from datetime import datetime, timedelta

from core_service import vote_service
from scripts.biz.vote.migrate_user_vote_records import MigrateLimitException





class MyThread(threading.Thread):
    def __init__(self, thread_id, name, archive_activities):
        threading.Thread.__init__(self)
        self.threadID = thread_id
        self.name = name
        self.archive_activities = archive_activities

    def run(self):
        print("Starting " + self.name)
        main(self.archive_activities)
        print("exiting " + self.name)





def main(archive_activities):
    """
    Migrate voting records.
    :param archive_activities:
    :return:
    """
    final_date_end_timestamp = 1595433600

    for activity in archive_activities:
        date_start = time.strftime('%Y-%m-%d', time.localtime(int(activity['dateStart'] / 1000)))
        hour_start = time.strftime('%Y-%m-%d-%H', time.localtime(int(activity['dateStart'] / 1000)))

        date_end_timestamp = int(activity['dateEnd'] / 1000 + 24 * 60 * 60)
        if final_date_end_timestamp < date_end_timestamp:
            date_end_timestamp = final_date_end_timestamp

        date_end = time.strftime('%Y-%m-%d', time.localtime(date_end_timestamp))
        hour_end = time.strftime('%Y-%m-%d-%H', time.localtime(date_end_timestamp))

        # migrate hourly_activity_vote_records
        vote_service.batch_insert_hourly_activity_vote_records(activity.get('_id'), hour_start, hour_end)
        print(activity)

        vote_items = vote_service.list_vote_items(str(activity['_id']))
        try:
            for vote_item in vote_items:
                # print("user_vote_record: activityId voteItemId score", activity.get('_id'), vote_item.get('_id'))
                # print(vote_item)
                if vote_item['score'] != 0:
                    # migrate daily_vote_records
                    vote_service.batch_insert_daily_vote_records(vote_item.get('_id'), date_start, date_end)
                    # migrate hourly_vote_records
                    print("Start and end hour", hour_start, hour_end)
                    print("Start and end date", date_start, date_end)
                    vote_service.batch_insert_hourly_vote_records(vote_item.get('_id'), hour_start, hour_end)
        except MigrateLimitException:
            print("migrate limit error")







if __name__ == "__main__":
    # Call the Java interface to query all archived activities
    archive_activities_local = vote_service.archive_activities_v3()
    # Enable multithreading
    i = 1
    available_activities = []
    threads = []

    for index, local_activity in enumerate(archive_activities_local):
        # Start a new thread every 700 activities
        if index == i * 700:
            thread = MyThread(i, "Thread-" + str(i), available_activities)
            thread.start()
            threads.append(thread)
            i += 1
            available_activities = []
        available_activities.append(local_activity)

    # Put the remaining activities on one last thread
    if available_activities:
        thread = MyThread(i, "Thread-" + str(i), available_activities)
        thread.start()
        threads.append(thread)

    for t in threads:
        t.join()



Retry the request up to three times if it fails:

@retry(3)
def batch_insert_hourly_activity_vote_records(activityId, beginDate, endDate):
    """
    Batch insert hourly activity voting records.
    :param activityId:
    :param beginDate:
    :param endDate:
    :return:
    """
    url = "http://xxx/vote/api/internal/dbMigration/insertHourlyActivityVoteRecordDoc?" \
          "activityId={activityId}&beginDate={beginDate}&endDate={endDate}".format(activityId=activityId, beginDate=beginDate, endDate=endDate)

    r = requests.session().get(url)



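The @retry(3) decorator used above is not defined in the article; it may come from a library, but a minimal hand-rolled version (an assumption about how it could look) would be something like:

# Hand-rolled retry decorator (an assumption; the article does not show the real @retry).
import functools
import time

def retry(times, delay=1):
    """Retry the wrapped function up to `times` times, sleeping `delay` seconds between attempts."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(1, times + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    print("attempt %d/%d failed: %s" % (attempt, times, e))
                    time.sleep(delay)
            raise last_error
        return wrapper
    return decorator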

Finally I put it on the server and it ran for about two hours. My boss had migrated his 300 million rows in "a day"; I assumed he meant a 12-hour working day (it was actually 24 hours), figured my data was less than a third of his so three hours should be about right, and ran it locally first. It took a whole day.

That’s all it takes.

Analysis

However, the process was not as smooth and easy as I make it sound. The architect gave me five days and I overran by two. I lost time in the following ways:

  1. Personal reasons. One day a tooth chipped and I took half a day off; the pain nearly knocked my soul out of my body and I had no mind for work.
  2. The early migration went smoothly, but because I had no data migration experience, the later migration of the big tables ran into many accidents.
  3. For the big-table migration I did not log progress in a way that handled interruption, so at first I did not dare to interrupt a run, and any bug in the code logic could be fatal (a minimal checkpoint sketch follows this list).
  4. One day I finished writing the logic and ran the migration locally; it lasted six hours and seriously got in the way of my other work.
  5. In the offline data review there were seven tables to reconcile, which left me a little dizzy, and three large tables would not match up. At first I did not think of the gift-vote "brushing" that exists in the business, so I kept assuming the migration itself was broken and searched in vain for the logical loophole.
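On point 3, the cheapest fix would have been to record a checkpoint after each migrated activity so that an interrupted run can resume where it left off. The original script did not do this; a minimal sketch of the idea (the file name and wiring are assumptions) might look like:

# Minimal resumable-migration sketch (an assumption; the original script had no checkpointing).
import os

CHECKPOINT_FILE = "migrated_activities.log"

def load_checkpoint():
    """Return the set of activity ids already migrated by earlier runs."""
    if not os.path.exists(CHECKPOINT_FILE):
        return set()
    with open(CHECKPOINT_FILE) as f:
        return {line.strip() for line in f if line.strip()}

def mark_done(activity_id):
    """Append a migrated activity id so an interrupted run can skip it next time."""
    with open(CHECKPOINT_FILE, "a") as f:
        f.write(str(activity_id) + "\n")

def migrate_all(activities, migrate_one):
    done = load_checkpoint()
    for activity in activities:
        activity_id = str(activity["_id"])
        if activity_id in done:
            continue  # already migrated in a previous (interrupted) run
        migrate_one(activity)  # e.g. the body of main() above, for one activity
        mark_done(activity_id)

If this were driven from multiple threads like the script above, the appends to the checkpoint file would also need a lock.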

Having said all that, being a noob is the original sin. Keep studying hard; as long as I still have a hair left, I will not give up learning!!

Criticism and corrections are welcome. My public account, "Forrest gump's code road", welcomes you to grow together with this newbie author. If you found this helpful, a like or a share would be a great encouragement, thank you very much ~

Follow me and let's grow together