Background:

There are more than 100 campus sites, each with its own SQLite database, and every one of those databases needs to be backed up to Alibaba Cloud OSS. The campus directories are named demo1, demo2, ... demo127:
  • /www
    • /wwwroot
      • /demo1
        • a.file
        • b.file
        • c.file
        • db.sqlite3
        • .
      • /demo2
        • a.file
        • b.file
        • c.file
        • db.sqlite3
        • .
      • .

I have two versions of the code: one single-threaded and one multithreaded. But I want to get concurrency from coroutines using yield/yield from. The single-threaded version takes about 29 seconds, the multithreaded version about 9 seconds, and the yield version I wrote myself takes about 31 seconds, so I might as well not have bothered. The code for all three versions follows.

Requirements:

I'd like someone to help me optimize the coroutine code. There are two I/O points: local file reads/writes, and the Alibaba OSS upload ems_oss.put_object(remote_file, data). I hope both can be handled.
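
Before optimizing, it is probably worth measuring how the ~29 seconds split between those two I/O points. A minimal stopwatch sketch (`timed` is a helper name I'm introducing here, not part of the scripts below):

import time
from contextlib import contextmanager

@contextmanager
def timed(label):
    # crude stopwatch: print how long the wrapped block took
    start = time.time()
    yield
    print('{}: {:.3f}s'.format(label, time.time() - start))

# inside upload_db, wrap each I/O point separately:
#     with timed('read ' + db_path):
#         data = f.read()
#     with timed('check+put ' + remote_file):
#         if not ems_oss.object_exists(remote_file):
#             ems_oss.put_object(remote_file, data)

If almost all of the time turns out to be in the network calls, the local reads need no separate treatment and only the uploads have to overlap.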

# multithreaded
import os
import oss2
from datetime import datetime
from dateutil.relativedelta import relativedelta
from concurrent.futures import ThreadPoolExecutor, as_completed


def ems_oss2():
    auth = oss2.Auth('aTAasINNBhdfZHLfnD', 'AqOQaeLfaAoAsavyPGahgdptra37')  # fake credentials
    bucket = oss2.Bucket(auth, 'http://oss-country-area-internal.aliyuncs.com', 'bucket_name')  # not the real endpoint
    return bucket


def upload_db(second_dir, db_path):
    remote_file = 'old_sqlite/{}/{}/{}/{}.sqlite3'.format(
                        datetime.now().strftime("%Y-%m"),
                        datetime.now().strftime("%d"),
                        datetime.now().strftime("%H"), 
                        second_dir)
    ems_oss = ems_oss2()
    with open(db_path, 'rb') as f:
        data = f.read()
    if not ems_oss.object_exists(remote_file):
        ems_oss.put_object(remote_file, data)

def delete_db():
    old_data = datetime.now() - relativedelta(weeks=3)
    remote_file = 'old_sqlite/{}/{}/{}/'.format(
                        old_data.strftime("%Y-%m"),
                        old_data.strftime("%d"),
                        old_data.strftime("%H"))
    ems_oss = ems_oss2()
    key_list = [i.key for i in oss2.ObjectIteratorV2(ems_oss, prefix=remote_file)]
    if key_list:
        ems_oss.batch_delete_objects(key_list)

if __name__ == "__main__":
    import time
    time_start = time.time()
    BASE_PATH = '/www/wwwroot'
    first_dir = os.listdir(BASE_PATH)
    count = 0
    all_task = []
    executor = ThreadPoolExecutor(max_workers=20)
    for _ in first_dir:
        second_dir = os.path.join(BASE_PATH, _)
        if os.path.isdir(second_dir):
            # os.chdir(second_dir)
            db_path = os.path.join(second_dir, "db.sqlite3")
            if os.path.exists(db_path):
                print('count= '+str(count), second_dir)
                count += 1
                all_task.append(executor.submit(upload_db, _, db_path))
            else:
                print('no db.sqlite3')
            # os.chdir(BASE_PATH)
    print(all_task)
    print(executor)
    for future in as_completed(all_task):
        data = future.result()
        print("{} is finshed".format(data))
    delete_db()
    time_end = time.time()
    print("Total time {}:{}".format((time_end-time_start)//60, (time_end-time_start)%60))

The multithreaded version generally finishes in about 9 seconds.
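
One thing that might trim the threaded version further: upload_db builds a brand-new oss2.Bucket (and with it a new HTTP session) for every single file. A sketch of caching one bucket per worker thread with threading.local (an assumption on my part that this helps, not something I have benchmarked; the credentials are the same fakes as above):

import threading
import oss2

_tls = threading.local()

def get_bucket():
    # reuse one Bucket (and its connection pool) per worker thread
    # instead of constructing a fresh one for every uploaded file
    if not hasattr(_tls, 'bucket'):
        auth = oss2.Auth('aTAasINNBhdfZHLfnD', 'AqOQaeLfaAoAsavyPGahgdptra37')  # fake credentials
        _tls.bucket = oss2.Bucket(auth, 'http://oss-country-area-internal.aliyuncs.com', 'bucket_name')  # not the real endpoint
    return _tls.bucket

upload_db would then call get_bucket() instead of ems_oss2().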

# single thread
import os
import oss2
from datetime import datetime
from dateutil.relativedelta import relativedelta


def ems_oss2():
    auth = oss2.Auth('aTAasINNBhdfZHLfnD', 'AqOQaeLfaAoAsavyPGahgdptra37')  # fake credentials
    bucket = oss2.Bucket(auth, 'http://oss-country-area-internal.aliyuncs.com', 'bucket_name')  # not the real endpoint
    return bucket



def upload_db(second_dir, db_path):
    remote_file = 'old_sqlite/{}/{}/{}/{}.sqlite3'.format(
                        datetime.now().strftime("%Y-%m"),
                        datetime.now().strftime("%d"),
                        datetime.now().strftime("%H"), 
                        second_dir)
    ems_oss = ems_oss2()
    with open(db_path, 'rb') as f:
        data = f.read()
    if not ems_oss.object_exists(remote_file):
        ems_oss.put_object(remote_file, data)

def delete_db():
    old_data = datetime.now() - relativedelta(weeks=3)
    remote_file = 'old_sqlite/{}/{}/{}/'.format(
                        old_data.strftime("%Y-%m"),
                        old_data.strftime("%d"),
                        old_data.strftime("%H"))
    ems_oss = ems_oss2()
    key_list = [i.key for i in oss2.ObjectIteratorV2(ems_oss, prefix=remote_file)]
    if key_list:
        ems_oss.batch_delete_objects(key_list)

if __name__ == "__main__":
    import time
    time_start = time.time()
    BASE_PATH = '/www/wwwroot'
    first_dir = os.listdir(BASE_PATH)
    count = 0
    for _ in first_dir:
        second_dir = os.path.join(BASE_PATH, _)
        if os.path.isdir(second_dir):
            # os.chdir(second_dir)
            db_path = os.path.join(second_dir, "db.sqlite3")
            if os.path.exists(db_path):
                print('count= '+str(count), second_dir)
                count += 1
                upload_db(_, db_path)
            else:
                print('no db.sqlite3')
            # os.chdir(BASE_PATH)
    delete_db()
    time_end = time.time()
    print("Total time {}:{}".format((time_end-time_start)//60, (time_end-time_start)%60))


The single-threaded version averages about 29 seconds.
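
A side note on memory rather than speed: upload_db reads every database fully into RAM before uploading. oss2 also provides Bucket.put_object_from_file, which uploads straight from a path, so the explicit read can be dropped. A sketch keeping the same key layout:

def upload_db(second_dir, db_path):
    remote_file = 'old_sqlite/{}/{}/{}/{}.sqlite3'.format(
                        datetime.now().strftime("%Y-%m"),
                        datetime.now().strftime("%d"),
                        datetime.now().strftime("%H"),
                        second_dir)
    ems_oss = ems_oss2()
    if not ems_oss.object_exists(remote_file):
        # hand oss2 the path and let it read from disk itself
        ems_oss.put_object_from_file(remote_file, db_path)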

# coroutine (yield / yield from)
import os
import oss2
from datetime import datetime
from dateutil.relativedelta import relativedelta


def ems_oss2():
    auth = oss2.Auth('aTAasINNBhdfZHLfnD', 'AqOQaeLfaAoAsavyPGahgdptra37')  # fake credentials
    bucket = oss2.Bucket(auth, 'http://oss-country-area-internal.aliyuncs.com', 'bucket_name')  # not the real endpoint
    return bucket


def yield_read_db(db_path):
    with open(db_path, 'rb') as f:
        yield f.read()

def yield_upload_db(second_dir, db_path):
    remote_file = 'old_sqlite/{}/{}/{}/{}.sqlite3'.format(
                        datetime.now().strftime("%Y-%m"),
                        datetime.now().strftime("%d"),
                        datetime.now().strftime("%H"),
                        second_dir)

    ems_oss = ems_oss2()
    data = yield
    if not ems_oss.object_exists(remote_file):
        if not data:
            print(remote_file, "No data received.")
        ems_oss.put_object(remote_file, data)

def delete_db():
    old_data = datetime.now() - relativedelta(weeks=3)
    remote_file = 'old_sqlite/{}/{}/{}/'.format(
                        old_data.strftime("%Y-%m"),
                        old_data.strftime("%d"),
                        old_data.strftime("%H"))
    ems_oss = ems_oss2()
    key_list = [i.key for i in oss2.ObjectIteratorV2(ems_oss, prefix=remote_file)]
    if key_list:
        ems_oss.batch_delete_objects(key_list)



if __name__ == "__main__":
    import time
    time_start = time.time()
    BASE_PATH = '/www/wwwroot'
    first_dir = os.listdir(BASE_PATH)
    count = 0
    db_path_list = []
    for _ in first_dir:
        # collect a (dir_name, db_path) pair for every campus that has a db.sqlite3
        second_dir = os.path.join(BASE_PATH, _)
        if os.path.isdir(second_dir):
            # os.chdir(second_dir)
            db_path = os.path.join(second_dir, "db.sqlite3")
            if os.path.exists(db_path):
                print('count= '+str(count), second_dir)
                count += 1
                db_path_list.append((_, db_path))
            else:
                print('no db.sqlite3')
            # os.chdir(BASE_PATH)
    db_path_list_length = len(db_path_list)
    for i in range(db_path_list_length // 10 + 1):  # process in batches of 10
        # build a batch of up to 10 reader generators (the de facto producers)
        once_yield_list = [yield_read_db(db_path_list[_ + 10*i][1])
                           for _ in range(10) if _ + 10*i < db_path_list_length]
        # build a batch of up to 10 upload generators (the de facto consumers)
        once_upload_list = [yield_upload_db(db_path_list[_ + 10*i][0], db_path_list[_ + 10*i][1])
                            for _ in range(10) if _ + 10*i < db_path_list_length]
        # run every reader up to its yield, collecting the file contents
        data_list = [gen.send(None) for gen in once_yield_list]
        # prime every consumer up to its `data = yield` line
        [gen.send(None) for gen in once_upload_list]
        for upload_gen, data in zip(once_upload_list, data_list):
            try:
                upload_gen.send(data)  # hand the data over; the consumer uploads it
            except StopIteration:
                pass
        # [once_upload_list[j].send(data_list[j]) for j in range(len(once_yield_list))]
        # alternative: drive each producer/consumer pair one at a time
        # for j in range(len(once_yield_list)):
        #     data = once_yield_list[j].send(None)
        #     if data:
        #         once_upload_list[j].send(None)
        #         try:
        #             once_upload_list[j].send(data)
        #         except StopIteration:
        #             pass


    time_end = time.time()
    print("Total time {}:{}".format((time_end-time_start)//60, (time_end-time_start)%60))


This garbage coroutine code of mine comes in at about 31 seconds, slower than even the single-threaded version.
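
That result is expected rather than bad luck: yield/yield from on their own never run anything in parallel. The one OS thread still sits inside each blocking f.read() and put_object() call, and .send() only passes control between generators in between those calls, so the batches of ten still upload strictly one after another; ~31 seconds is just the single-threaded time plus generator overhead. Coroutines only pay off with an event loop and non-blocking I/O underneath, and since oss2 (as far as I know) exposes only a blocking API, an asyncio version ends up delegating the blocking calls to a thread pool anyway. A minimal sketch (Python 3.7+), reusing upload_db from the threaded version:

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def main(db_path_list):
    # the event loop keeps up to 20 uploads in flight by pushing the
    # blocking upload_db onto a thread pool via run_in_executor
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=20) as pool:
        tasks = [loop.run_in_executor(pool, upload_db, name, path)
                 for name, path in db_path_list]
        await asyncio.gather(*tasks)

# asyncio.run(main(db_path_list))

Functionally this is the threaded version with an event loop in front, so the ~9 second figure is roughly the ceiling; getting below it would require an async HTTP client talking to OSS directly, which is likely not worth the trouble for a backup job.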