If you work in data science / AI / machine learning, you have probably struggled with the time it takes to load and dump large amounts of data: IO is usually the most time-consuming step. People often joke about how little room there is to optimize Python, but the truth is we can do better. Hopefully this article will be of some help to your programs.
This article limits itself to the IO problems of large array (tensor, matrix) data objects in data science, represented by numpy.ndarray. The solution is parallel writes/reads based on multithreading/multiprocessing. Unlike network IO or ordinary IO with small data volumes, large matrix objects in data science are usually accompanied by slicing and similar operations whose memory behavior (copying, moving, and so on) is not obvious, so they easily fall into redundant memory usage that hurts IO efficiency. This article explores the following topics:
- Parallel read/write methods based on multithreading/multiprocessing, with a performance comparison
- Watching out for redundant memory copies in parallel IO
- A summary of best practices
IO scenario
The IO scenario discussed in this article is simple: load large data from disk, process it, and store the results. This situation is common in machine learning frameworks, where loading and dumping data are the most basic problems to solve. Some of the principles and techniques discussed below also appear in the IO interfaces of PyTorch, TensorFlow, and others.
In the data science scenario, the efficiency of read and write can be optimized from the following aspects:
- Start from the file encoding format: adopt binary encodings such as pkl to speed up reading and writing
- Optimize at the read/write interface level, e.g. Direct IO / zero-copy
- Read and write in parallel, block by block and batch by batch, which suits relatively independent data
The first method is simple to apply, but the resulting encoding is not easily shared with other languages/tools. The second is a bit heavyweight for Python: its IO interface is not as explicit as in static languages, and although os.open() with low-level flags can be used directly, optimizations like Direct IO [4] or mmap add design cost. The third method involves multithreading/multiprocessing, but requires no communication or synchronization between workers, so it is relatively simple to put into practice.
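As a quick illustration of the first point, here is a minimal sketch (file names and sizes are made up for illustration) comparing a text dump with binary dumps of the same array; the binary variants are typically much faster to write and to load back:

import pickle
import numpy as np

arr = np.random.rand(1_000_000, 10)

# Text encoding: human-readable, but slow and large on disk
np.savetxt("arr.csv", arr, delimiter='\t')

# Binary encodings: NumPy's own .npy format, or a pickle dump
np.save("arr.npy", arr)
with open("arr.pkl", "wb") as f:
    pickle.dump(arr, f, protocol=pickle.HIGHEST_PROTOCOL)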
Multi-threaded/multi-process parallel reading and writing
Basic parallel logic
The logic of parallel reads and writes with multiple processes is straightforward: the main overhead is the operating system's management of the processes. For multithreading (in CPython), the theoretical basis for parallel IO deserves a reminder; the figure in [1] shows how the GIL is handled in thread IO scenarios.
That figure also shows that the main cost of multithreading is the sum of each thread's run phase plus the operating system's thread-management overhead.
A few points about CPython multithreading are still worth noting:
- On Linux the threads are full POSIX threads, which means the scheduling model is still a 1:1 user-to-kernel mapping
- CPython threads share the interpreter's global variables by default
- A thread releases the GIL around the underlying blocking IO system call and reacquires it afterwards (see the small sketch after this list)
- Thread scheduling and communication rely on semaphores, condition variables, and similar primitives
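A rough sanity check of the GIL point (a toy sketch; the exact numbers depend on the machine): IO-bound work can overlap across threads because the GIL is dropped around the blocking system calls, while pure-Python CPU-bound work cannot:

import time
from threading import Thread

def cpu_task(idx=0, n=5_000_000):
    s = 0
    for i in range(n):                    # pure-Python loop: the GIL is held the whole time
        s += i

def io_task(idx=0, size=50 * 1024 * 1024):
    with open("/tmp/gil_demo_%d.bin" % idx, "wb") as f:
        f.write(b"\0" * size)             # the GIL is released around the underlying write

for task in (cpu_task, io_task):
    start = time.time()
    threads = [Thread(target=task, args=(i,)) for i in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(task.__name__, "with 4 threads:", time.time() - start)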
Standard library interface evaluation
We designed a small experiment to test the efficiency of parallel writes through the multithreading/multiprocessing interfaces provided by the CPython standard library:
import os
import numpy as np
import time
from multiprocessing import Process
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import Thread
from memory_profiler import profile

# Time calculator
class Benchmark:
    def __init__(self, text):
        self.text = text

    def __enter__(self):
        self.start = time.time()

    def __exit__(self, *args):
        self.end = time.time()
        print("%s: consume: %s" % (self.text, self.end - self.start))

# Base task: write one chunk to its own csv file
def store_task(data: np.ndarray, output, index):
    fname = "%s_worker_%s.csv" % (output, index)
    np.savetxt(fname, data, delimiter='\t')

# Main data source
worker_num = os.cpu_count()
big_data = np.random.rand(1000000, 10)
task_num = big_data.shape[0] // worker_num

# 1. multiprocessing.Process
@profile
def loop_mp():
    pool = []
    for i in range(worker_num):
        start = i * task_num
        end = (i + 1) * task_num
        p = Process(target=store_task, args=(big_data[start: end], 'testdata/', i))
        p.start()
        pool.append(p)
    for p in pool:
        p.join()

# 2. threading.Thread
@profile
def mt_thread():
    pool = []
    for i in range(worker_num):
        start = i * task_num
        end = (i + 1) * task_num
        t = Thread(target=store_task, args=(big_data[start: end], 'testdata/thread', i))
        t.start()
        pool.append(t)
    for t in pool:
        t.join()

# 3. multiprocessing.Pool
@profile
def mp_pool():
    with Pool(processes=worker_num) as pool:
        tasks = []
        for i in range(worker_num):
            start = i * task_num
            end = (i + 1) * task_num
            tasks.append(
                pool.apply_async(store_task, (big_data[start: end], 'testdata/mp_pool', i)))
        pool.close()
        pool.join()

# 4. ProcessPoolExecutor
@profile
def loop_pool():
    with ProcessPoolExecutor(max_workers=worker_num) as exe:
        for i in range(worker_num):
            start = i * task_num
            end = (i + 1) * task_num
            exe.submit(store_task, big_data[start: end], 'testdata/pool', i)

# 5. ThreadPoolExecutor
def loop_thread():
    with ThreadPoolExecutor(max_workers=worker_num) as exe:
        for i in range(worker_num):
            start = i * task_num
            end = (i + 1) * task_num
            exe.submit(store_task, big_data[start: end], 'testdata/pool_thread', i)

# 6. Direct single-process write
@profile
def direct():
    store_task(big_data, 'testdata/all', 0)

if __name__ == '__main__':
    with Benchmark("loop mp"):
        loop_mp()
    with Benchmark("mt thread"):
        mt_thread()
    with Benchmark("mp pool"):
        mp_pool()
    with Benchmark("loop pool"):
        loop_pool()
    with Benchmark("direct"):
        direct()
    with Benchmark("Thread"):
        loop_thread()
Let's analyze the efficiency of each interface in terms of time and memory (test environment: macOS, 2.2 GHz quad-core Intel Core i7):
Interface | Time consumed | Memory behavior
---|---|---
multiprocessing.Process | 5.14 s | p.start() incurs extra overhead: the sliced arguments are copied
threading.Thread | 10.34 s | No extra copy
multiprocessing.Pool | 4.18 s | Pool() construction overhead; arguments are not copied
ProcessPoolExecutor | 3.69 s | Arguments are not copied
ThreadPoolExecutor | 10.82 s | No extra copy
direct | 22.04 s | No extra copy
Time cost analysis
Intuitively, the multiprocessing interfaces give roughly a 4-4.5x speedup, while multithreading roughly halves the time. Why multithreading is slower than multiprocessing here is complicated: in principle thread switching is cheaper than process switching, but in this case the threads also have to coordinate with each other on scheduling, while the processes run fully independently. Interested readers can also compare asyncio tasks based on IO multiplexing; the drawback is that it is hard to find a suitable non-blocking read/write interface for this kind of file IO.
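For reference, one way to experiment with asyncio on this workload (a sketch only, not part of the benchmark above) is to wrap the blocking np.savetxt in asyncio.to_thread (Python 3.9+), which simply delegates to a thread pool and therefore behaves much like the ThreadPoolExecutor case:

import asyncio
import numpy as np

async def store_chunk(data, fname):
    # np.savetxt has no non-blocking variant, so offload it to the default thread pool
    await asyncio.to_thread(np.savetxt, fname, data, delimiter='\t')

async def store_all(big_data, worker_num, task_num):
    await asyncio.gather(*(
        store_chunk(big_data[i * task_num:(i + 1) * task_num],
                    "testdata/asyncio_worker_%s.csv" % i)
        for i in range(worker_num)
    ))

# asyncio.run(store_all(big_data, worker_num, task_num))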
It is also worth noting that the two flavors of multiprocessing differ considerably: looping over bare Process objects is noticeably slower than the process pools, probably because of the overhead of copying data. The next section discusses how pooling avoids copying the data.
Memory overhead analysis
Because of CPython's dynamic object model, threading and multiprocessing cannot show explicitly whether data gets copied. In principle, Thread() does not need to copy the data while Process() does. The table above, however, suggests that the two pool-based interfaces, multiprocessing.Pool and ProcessPoolExecutor, do not copy the data.
The @profile decorator in the code comes from memory_profiler, a third-party memory-analysis library, but its output alone is not conclusive. The result for the Process version is:
Line #    Mem usage    Increment  Occurrences   Line Contents
============================================================
    29    101.3 MiB    101.3 MiB           1   @profile
    30                                          def loop_mp():
    31    101.3 MiB      0.0 MiB           1       pool = []
    32    120.6 MiB      0.0 MiB           9       for i in range(worker_num):
    33    120.6 MiB      0.0 MiB           8           start = i * task_num
    34    120.6 MiB      0.0 MiB           8           end = (i+1) * task_num
    35    120.6 MiB      0.0 MiB           8           p = Process(target=store_task, args=(big_data[start: end], 'testdata/', i))
    36    120.6 MiB     19.3 MiB           8           p.start()
    37    120.6 MiB      0.0 MiB           8           pool.append(p)
    38    120.6 MiB      0.0 MiB           9       for p in pool:
    39    120.6 MiB      0.0 MiB           8           p.join()
The 19.3 MiB increment at p.start() suggests that the big_data[start: end] slices passed as arguments are copied when the child processes start. This is quite different from the fork system call, where CLONE flags are passed in explicitly to stipulate how data is shared or copied between child and parent. Now look at ProcessPoolExecutor:
Line #    Mem usage    Increment  Occurrences   Line Contents
============================================================
    68    121.1 MiB    121.1 MiB           1   @profile
    69                                          def loop_pool():
    70    121.1 MiB      0.0 MiB           1       with ProcessPoolExecutor(max_workers=worker_num) as exe:
    71    121.2 MiB     -0.0 MiB           9           for i in range(worker_num):
    72    121.2 MiB      0.0 MiB           8               start = i * task_num
    73    121.2 MiB      0.0 MiB           8               end = (i+1) * task_num
    74    121.2 MiB      0.1 MiB           8               exe.submit(store_task, big_data[start: end], 'testdata/pool', i)
It looks as if no copy happens here, but is that really the case? Since exe.submit does not directly trigger the construction of a Process(), we have to dig into how the pool works to answer this.
Plenty of Pythonistas have dissected the CPython source. From reference [2], the internal structure of ProcessPoolExecutor is:
|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +----------- +     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+
Does this structure look familiar? It is very similar to the thread pool built on a pthread pool in the earlier article [C++ reinventing the wheel]:
- Use queues to maintain tasks
- The pool is created with idle worker processes
- A dedicated management thread manages and monitors the Pool
Whether a data copy happens therefore comes down to whether queue.put() and queue.get() copy their payload. multiprocessing.Queue is an important interface for inter-process communication (see [5] on sharing state between processes); for large ndarray objects, whether this transfer copies the data matters enormously.
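A minimal hand-rolled sketch of the same queue-based pattern (this is not the actual Pool implementation; file names and sizes are illustrative): the parent puts index/slice pairs on a task queue and worker processes pull them off and write them out:

from multiprocessing import Process, Queue
import numpy as np

def writer(task_q):
    while True:
        item = task_q.get()
        if item is None:              # poison pill: no more work
            break
        index, chunk = item
        np.savetxt("testdata/queue_worker_%s.csv" % index, chunk, delimiter='\t')

if __name__ == "__main__":
    big_data = np.random.rand(100_000, 10)
    worker_num = 4
    task_num = big_data.shape[0] // worker_num

    task_q = Queue()
    workers = [Process(target=writer, args=(task_q,)) for _ in range(worker_num)]
    for w in workers:
        w.start()
    for i in range(worker_num):
        task_q.put((i, big_data[i * task_num:(i + 1) * task_num]))
    for _ in workers:
        task_q.put(None)              # one pill per worker
    for w in workers:
        w.join()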
Copying of ndarray objects
Everything is an object in the Python world. — a popular saying among Pythonistas
With enterprise-scale data it is often not easy to pinpoint why a Python program uses so much memory (or GPU memory). Dynamic reference typing plus garbage collection makes memory management in Python easier, but unnecessary data copies should still be avoided.
Slicing and concatenation
Slicing and concatenation are common operations in vector/matrix/tensor libraries such as NumPy, and it is not always easy to tell whether a copy happens underneath:
import numpy as np

A = np.random.rand(1 << 8, 1 << 8)
B = A[:16]                           # a view: B shares A's buffer
del A                                # does not free A's memory, because B still references it
print(A)                             # NameError: the name A is gone, but its memory is still alive
C = np.random.rand(1 << 4, 1 << 8)
D = np.concatenate([B, C], axis=1)   # D is a fresh copy of B's and C's data
For concatenate, the memory layout dictates that a copy has to occur [6]:
00   04   08   0C   10   14   18   1C   20   24   28   2C
|    |    |    |    |    |    |    |    |    |    |    |
[data1   ][foo ][data2   ][bar ][concat(data1, data2)    ]
data1 and data2 live in different memory regions, so concatenating them can only be done by writing into a newly allocated region.
The order parameter (order='C' or 'F') determines whether the array is laid out row-major or column-major [7]. Another way to judge is to check whether the indexing can be reduced to the form slice(start, stop, stride): basic slices such as [:] (i.e. slice(None, None, None)) return views, while indexing that cannot be expressed this way returns a copy [8].
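A quick way to check this in practice is NumPy's own introspection (a small sketch):

import numpy as np

A = np.random.rand(1 << 8, 1 << 8)

view = A[:16]                              # basic slice -> view
fancy = A[[0, 1, 2]]                       # advanced (fancy) indexing -> copy
cat = np.concatenate([A[:16], A[16:32]])   # concatenate always allocates new memory

print(view.base is A)                      # True: shares A's buffer
print(np.shares_memory(A, view))           # True
print(np.shares_memory(A, fancy))          # False
print(np.shares_memory(A, cat))            # False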
None of this matters much when the data is small, but when the data size approaches the memory limit, slicing of large ndarrays must be handled carefully.
Replication at process creation time
We want to slice the data and pass the slices to child processes without copying, so that the processes share the large ndarray. As shown above, Process(target=func, args=(ndarray[start:end],)) copies the ndarray. The technique used here is the shared memory support in multiprocessing.
shared_memory was added to multiprocessing in Python 3.8 to provide a unified, easy-to-use interface over the earlier ways of sharing memory. Let's use it to modify the code from the previous section:
from multiprocessing import shared_memory

def store_task_sha_v2(start, end, output, index, sha_name, shape, dtype):
    fname = "%s_worker_%s.csv" % (output, index)
    # Attach to the existing shared memory block by name and rebuild an ndarray view on it
    exist_sham = shared_memory.SharedMemory(name=sha_name)
    data = np.ndarray(shape, dtype=dtype, buffer=exist_sham.buf)
    print(sha_name, data.shape, index)
    np.savetxt(fname, data[start: end], delimiter='\t')
    del data
    exist_sham.close()

@profile
def mp_pool_sha():
    # Allocate a shared buffer and map a local ndarray onto it
    shm = shared_memory.SharedMemory(create=True, size=big_data.nbytes)
    b = np.ndarray(big_data.shape, dtype=big_data.dtype, buffer=shm.buf)
    b[:] = big_data[:]
    print(b.shape)
    with ProcessPoolExecutor(max_workers=worker_num) as pool:
        tasks = []
        for i in range(worker_num):
            start = i * task_num
            end = (i + 1) * task_num
            tasks.append(
                pool.submit(store_task_sha_v2,
                            start, end, 'testdata/mp_pool_sha', i,
                            shm.name, b.shape, b.dtype))
        for t in tasks:
            # Note! Catch the exception here, as ProcessPoolExecutor recommends!
            try:
                print(t.result())
            except Exception as e:
                print(f'{e}')
    del b
    shm.close()
    shm.unlink()
The code looks longer, but the logic is simple: allocate a shared buffer -> map a local ndarray onto it -> copy the data into the shared buffer -> read and write it from other processes -> close and unlink the buffer. shared_memory also has the advantage that a local variable can be promoted to shared memory at any time.
Summary of best practices
Parallel file reading into an ndarray
Suppose first that your training data is large and needs stream processing (streaming training). In that case, load it directly with modules such as PyTorch's torch.utils.data (Dataset/DataLoader), which already encapsulate parallel streaming.
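For example, a minimal PyTorch-style sketch (assuming the data has been pre-split into .npy shards; the class and file names are illustrative):

import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

class ShardDataset(Dataset):
    # Illustrative dataset: each item is one preprocessed .npy shard
    def __init__(self, shard_files):
        self.shard_files = shard_files

    def __len__(self):
        return len(self.shard_files)

    def __getitem__(self, idx):
        return torch.from_numpy(np.load(self.shard_files[idx]))

# num_workers starts worker processes that load shards in parallel, as a stream
loader = DataLoader(ShardDataset(["shard_0.npy", "shard_1.npy"]),
                    batch_size=1, num_workers=2)
for batch in loader:
    pass  # training step goes here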
If instead you need to load everything into RAM at once (for example for a KNN algorithm), you can read the file in parallel by blocks:
from multiprocessing import Manager, Process
import numpy as np

def parallelize_load(emb_file, total_num, worker_num):
    """Load an embedding file in parallel.

    @emb_file: source filename
    @total_num: total number of lines
    @worker_num: number of worker processes
    return: np.ndarray
    """
    # Note: the nested worker function requires the 'fork' start method
    def load_from_txt(emb, index, start, n_rows, arr_list):
        data = np.loadtxt(emb, skiprows=start, max_rows=n_rows)
        arr_list.append((index, data))

    worker_load_num = total_num // worker_num
    pool = []
    with Manager() as manager:
        arr_list = manager.list([])
        for index in range(worker_num):
            s = index * worker_load_num
            if index != worker_num - 1:
                e = worker_load_num
            else:
                e = total_num - (worker_load_num * index)
            p = Process(target=load_from_txt, args=(emb_file, index, s, e, arr_list))
            pool.append(p)
            p.start()
        for p in pool:
            p.join()
        # Workers finish in arbitrary order, so stitch the chunks back together by index
        arr = np.concatenate([chunk for _, chunk in sorted(arr_list, key=lambda x: x[0])])
    return arr

source_total_num = sum(1 for line in open("source_big_file", "rb"))
source_emb_data = parallelize_load("source_big_file", source_total_num, worker_num)
This gives roughly a worker_num-times speedup.
Parallel writing practice
- Avoid slicing and concatenating large ndarray objects.
- Avoid for-loops; prefer vectorized matrix operations.
- Multi-process file writing is more efficient and the logic is simpler, but always take care not to copy data between processes.
- Prefer the IO interfaces of third-party libraries, such as np.savetxt and df.to_csv; they are usually optimized for exception handling, chunked writes, and so on.
- When writing strings, concatenate them with '\t'.join(...) as much as possible rather than looping with for ele in lst: fp.write("%s\t%s\n" % ele) (see the sketch below).
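A small sketch of that last point (the file names and data are illustrative):

rows = [("id_%d" % i, i * 0.5) for i in range(1_000_000)]

# Slow: one formatting + write call per element
with open("out_slow.tsv", "w") as fp:
    for ele in rows:
        fp.write("%s\t%s\n" % ele)

# Faster: build the whole buffer with join, then write once
with open("out_fast.tsv", "w") as fp:
    fp.write("\n".join("%s\t%s" % ele for ele in rows))
    fp.write("\n")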
More work
The discussion in this article is limited to host RAM and disk. For GPU memory, which is just as common nowadays, the interfaces of Python's many third-party libraries hide the allocation-scheduling-communication-destruction lifecycle, which makes this kind of tuning painful. Best practices for GPU memory deserve a follow-up.
Finally, this article may surprise you, because optimizing Python is often seen as a thankless task. But I have to say that in my current job these techniques solved many problems in existing programs running under constrained resources. Of course, mainstream machine learning pipelines today are stream-based, and it is rare to have to hand-roll local reads and writes.
- [1] Understanding GIL
- [2] Lib/concurrent/futures/process.py
- [3] Python path
- [4] Direct IO in Python
- [5] Python Doc: 17.2.1.5. Sharing state between processes
- [6] In-place numpy array concatenation? # 13279
- [7] Numpy: views vs copy by slicing
- [8] Views versus copies in NumPy