Using multiple threads and a connection pool to insert data into MySQL
What this post covers:
- Connection pooling: a pool keeps multiple database connections open at once, so different connections can query and insert into different tables at the same time. This is the foundation for operating a database from multiple threads.
- Multithreading: with one connection per thread, multiple batches of data can be written to the database in parallel.
- cur.executemany: batch execution of an INSERT over many rows, which is far more efficient than one execute() call per row.
1. Main modules
DBUtils: a suite of Python modules that provides pooled, thread-safe database connections for multi-threaded applications.
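Note that the import path depends on the installed DBUtils release; a quick sketch of both spellings (which one applies depends on your version):

import pymysql
# DBUtils 1.x, as used in this post:
from DBUtils.PooledDB import PooledDB
# DBUtils 2.x renamed its modules to lowercase:
# from dbutils.pooled_db import PooledDB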
2. Create a connection pool
Key parameters of PooledDB:
- mincached: minimum number of idle connections; if fewer connections are idle, the pool creates new ones.
- maxcached: maximum number of idle connections; if more connections are idle, the pool closes the surplus.
- maxconnections: maximum total number of connections.
- blocking: if True, a request waits until the number of open connections drops below the maximum; if False, an error is raised instead.
def mysql_connection():
    maxconnections = 15  # maximum number of connections
    pool = PooledDB(
        pymysql,                        # DB-API 2 module used to create connections
        maxconnections=maxconnections,  # pass by keyword: the 2nd positional argument is mincached
        host='localhost',
        user='root',
        port=3306,
        passwd='123456',
        db='test_DB',
        use_unicode=True)
    return pool

# Usage
pool = mysql_connection()
con = pool.connection()
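For completeness, here is a sketch of the same pool with the idle-connection and blocking parameters from the list above; the values are illustrative assumptions, not from the original post:

pool = PooledDB(
    pymysql,
    mincached=2,        # keep at least 2 idle connections ready
    maxcached=5,        # close idle connections beyond 5
    maxconnections=15,  # hard cap on simultaneous connections
    blocking=True,      # wait for a free connection instead of raising an error
    host='localhost',
    user='root',
    port=3306,
    passwd='123456',
    db='test_DB',
    use_unicode=True)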
3. Data preprocessing
Four dummy data files were prepared for testing, with 100,000, 500,000, 1,000,000 and 5,000,000 rows respectively. The target table test has four fields: sku, fnsku, asin and shopid.
Data processing approach:
- Each line of the file holds one record; fields are separated by the tab character "\t" and each field is wrapped in double quotes.
- The file is read in binary mode, so each line arrives as bytes and must be decoded.
- The result is a nested list, so that the multi-threaded loop can process 100,000 rows per task. Format: [[(a, b, c, d), (a, b, c, d), ...], [(a, b, c, d), (a, b, c, d), ...], ...]
import re
import time

st = time.time()
with open("10w.txt", "rb") as f:
    data = []
    for line in f:
        # decode the bytes, then strip all whitespace (tabs and the newline)
        line = re.sub(r"\s", "", str(line, encoding="utf-8"))
        # drop the outer quotes and split on the back-to-back quote pairs
        line = tuple(line[1:-1].split('""'))
        data.append(line)

n = 100000  # split into nested lists of at most 100,000 rows each
result = [data[i:i + n] for i in range(0, len(data), n)]
print("100,000 rows of data, time: {}".format(round(time.time() - st, 3)))

# 100,000 rows of data, time: 0.374
# 500,000 rows of data, time: 1.848
# 1,000,000 rows of data, time: 3.725
# 5,000,000 rows of data, time: 18.493
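To make the string handling concrete, here is how one raw line would be transformed (the sample line is hypothetical):

raw = b'"A001"\t"B002"\t"C003"\t"D004"\n'
line = re.sub(r"\s", "", str(raw, encoding="utf-8"))  # '"A001""B002""C003""D004"'
fields = tuple(line[1:-1].split('""'))                # ('A001', 'B002', 'C003', 'D004')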
4. Threaded tasks
Each call to the insert function takes a connection from the pool and closes (returns) it when done. executemany batches the rows into a single operation with one commit per batch, which reduces the number of commits and improves efficiency; pymysql's executemany also rewrites a simple INSERT into a multi-row statement where it can.
def mysql_insert(*args):
    con = pool.connection()
    cur = con.cursor()
    sql = "INSERT INTO test(sku, fnsku, asin, shopid) VALUES(%s, %s, %s, %s)"
    try:
        cur.executemany(sql, *args)
        con.commit()
    except Exception as e:
        con.rollback()  # roll back the transaction on failure
        print("SQL execution error, cause:", e)
    finally:
        cur.close()
        con.close()
5. Start multithreading
How the code works:
- Set a maximum queue size. It must be smaller than the pool's maximum number of connections, otherwise the thread tasks cannot all get a connection and pymysql.err.OperationalError: (1040, 'Too many connections') is raised.
- Loop over the preprocessed nested list, adding one task per batch to the queue.
- Once the queue is full, or the current batch is the last one, start the queued threads and wait for them all; repeat until no data remains.
def task():
    q = Queue(maxsize=10)  # maximum queue size, i.e. threads per round
    # data: the preprocessed nested list
    while data:
        content = data.pop()
        t = threading.Thread(target=mysql_insert, args=(content,))
        q.put(t)
        if q.full() or len(data) == 0:
            thread_list = []
            while not q.empty():
                t = q.get()
                thread_list.append(t)
                t.start()
            for t in thread_list:
                t.join()
6. Complete example
import pymysql
import threading
import re
import time
from queue import Queue
from DBUtils.PooledDB import PooledDB
class ThreadInsert(object):
    """Insert data into MySQL with multiple threads."""
    def __init__(self):
        start_time = time.time()
        self.pool = self.mysql_connection()
        self.data = self.getData()
        self.mysql_delete()
        self.task()
        print("========= Data inserted, total time: {}s =========".format(round(time.time() - start_time, 3)))

    def mysql_connection(self):
        maxconnections = 15  # maximum number of connections
        pool = PooledDB(
            pymysql,                        # DB-API 2 module used to create connections
            maxconnections=maxconnections,  # pass by keyword: the 2nd positional argument is mincached
            host='localhost',
            user='root',
            port=3306,
            passwd='123456',
            db='test_DB',
            use_unicode=True)
        return pool

    def getData(self):
        st = time.time()
        with open("10w.txt", "rb") as f:
            data = []
            for line in f:
                line = re.sub(r"\s", "", str(line, encoding="utf-8"))
                line = tuple(line[1:-1].split('""'))
                data.append(line)
        n = 100000  # split into nested lists of at most 100,000 rows each
        result = [data[i:i + n] for i in range(0, len(data), n)]
        print("Total {} groups of data, {} elements per group. ==>> Time: {}s".format(len(result), n, round(time.time() - st, 3)))
        return result

    def mysql_delete(self):
        st = time.time()
        con = self.pool.connection()
        cur = con.cursor()
        sql = "TRUNCATE TABLE test"
        cur.execute(sql)
        con.commit()
        cur.close()
        con.close()
        print("Cleared the original data. ==>> Time: {}s".format(round(time.time() - st, 3)))

    def mysql_insert(self, *args):
        con = self.pool.connection()
        cur = con.cursor()
        sql = "INSERT INTO test(sku, fnsku, asin, shopid) VALUES(%s, %s, %s, %s)"
        try:
            cur.executemany(sql, *args)
            con.commit()
        except Exception as e:
            con.rollback()  # roll back the transaction on failure
            print("SQL execution error, cause:", e)
        finally:
            cur.close()
            con.close()

    def task(self):
        q = Queue(maxsize=10)  # maximum queue size, i.e. threads per round
        st = time.time()
        while self.data:
            content = self.data.pop()
            t = threading.Thread(target=self.mysql_insert, args=(content,))
            q.put(t)
            if q.full() or len(self.data) == 0:
                thread_list = []
                while not q.empty():
                    t = q.get()
                    thread_list.append(t)
                    t.start()
                for t in thread_list:
                    t.join()
        print("Data insertion complete. ==>> Time: {}s".format(round(time.time() - st, 3)))
if __name__ == '__main__':
ThreadInsert()
Insert data comparison
- 10w rows: 1 group of 100,000; read 0.374s, truncate 0.031s, insert 2.499s; total 3.092s
- 50w rows: 5 groups of 100,000; read 1.745s, truncate 0.0s, insert 16.129s; total 17.969s
- 100w rows: 10 groups of 100,000; read 3.858s, truncate 0.028s, insert 41.269s; total 45.257s
- 500w rows: 50 groups of 100,000; read 19.478s, truncate 0.016s, insert 317.346s; total 337.053s
7. Think/summarize
Thinking: multi-threaded + queue can basically meet the needs of daily work, but there is still insufficient to think carefully; In this example, 10 thread tasks are executed at a time. The queue can be added only after the 10 tasks are completed, which will cause the queue to be idle. If the remaining 1 task is not completed, the idle number is 9, and the resources and time in the task are wasted; Is it possible to keep the queue full and refill it every time a task is completed?