
Multithreading in Python is not true parallel execution (the interpreter's global lock serializes threads), so in most cases you need multiple processes to make full use of multi-core CPU resources. Python provides a very convenient multiprocess package, multiprocessing: you only need to define a function and Python does everything else, making it easy to convert a single-process program to concurrent execution. multiprocessing supports child processes, communication and sharing of data, and various forms of synchronization, and provides components such as Process, Queue, Pipe, and Lock.

 


1. Process

A process is created with the Process class: Process([group [, target [, name [, args [, kwargs]]]]]). Here target is the callable to run, args is the tuple of positional arguments for the callable, kwargs is a dictionary of keyword arguments, name is an alias for the process, and group is essentially unused (it should always be None). Methods: is_alive(), join([timeout]), run(), start(), terminate(). A Process is launched by calling start().

Properties: authkey, daemon (must be set before start()), exitcode (None while the process is running; -N means it was terminated by signal N), name, and pid. A daemon process is terminated automatically when its parent process exits, and it cannot create child processes of its own.

 

Example 1.1: Create a function and run it in a single process

import multiprocessing
import time

def worker(interval):
    n = 5
    while n > 0:
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)
        n -= 1

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print "p.pid:", p.pid
    print "p.name:", p.name
    print "p.is_alive:", p.is_alive()Copy the code

Result:

p.pid: 8736
p.name: Process-1
p.is_alive: True
The time is Tue Apr 21 20:55:12 2015
The time is Tue Apr 21 20:55:15 2015
The time is Tue Apr 21 20:55:18 2015
The time is Tue Apr 21 20:55:21 2015
The time is Tue Apr 21 20:55:24 2015

 

Example 1.2: Create functions and run them in multiple processes

import multiprocessing
import time

def worker_1(interval):
    print("worker_1")
    time.sleep(interval)
    print("end worker_1")

def worker_2(interval):
    print("worker_2")
    time.sleep(interval)
    print("end worker_2")

def worker_3(interval):
    print("worker_3")
    time.sleep(interval)
    print("end worker_3")

if __name__ == "__main__":
    p1 = multiprocessing.Process(target = worker_1, args = (2,))
    p2 = multiprocessing.Process(target = worker_2, args = (3,))
    p3 = multiprocessing.Process(target = worker_3, args = (4,))

    p1.start()
    p2.start()
    p3.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print("END!!!!!!!!!!!!!!!!!")

Result:

The number of CPU is:4
child   p.name:Process-3    p.id7992
child   p.name:Process-2    p.id4204
child   p.name:Process-1    p.id6380
END!!!!!!!!!!!!!!!!!
worker_1
worker_3
worker_2
end worker_1
end worker_2
end worker_3

 

Example 1.3: Define a process as a class

import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1

if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()

Note: run() is invoked automatically when p.start() is called.

Result:

the time is Tue Apr 21 20:31:30 2015
the time is Tue Apr 21 20:31:33 2015
the time is Tue Apr 21 20:31:36 2015
the time is Tue Apr 21 20:31:39 2015
the time is Tue Apr 21 20:31:42 2015

 

Example 1.4: Comparing daemon behavior

#1.4-1 Without the daemon attribute

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print "end!"Copy the code

Result:

end!
work start:Tue Apr 21 21:29:10 2015
work end:Tue Apr 21 21:29:13 2015

#1.4-2 With the daemon attribute

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    print "end!"Copy the code

Result:

end!

Note: because the child process has its daemon attribute set, it is killed as soon as the main process ends, so the worker's output never appears.

#1.4-3 Using join() to wait for the daemon process to finish

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    p.join()
    print "end!"Copy the code

Result:

work start:Tue Apr 21 22:16:32 2015
work end:Tue Apr 21 22:16:35 2015
end!

 


2. Lock

Lock can be used to avoid access conflicts when multiple processes need to access a shared resource.

import multiprocessing
import sys

def worker_with(lock, f):
    with lock:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lockd acquired via with\n")
            n -= 1
        fs.close()
        
def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()
    
if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target = worker_with, args=(lock, f))
    nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print "end"Copy the code

Result (contents of the output file file.txt):

Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly

 


3. Semaphore

Semaphore is used to control the number of accesses to shared resources, such as the maximum number of connections in the pool.

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire")
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n")
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target = worker, args=(s, i*2))
        p.start()

Result:

Process-1acquire
Process-1release

Process-2acquire
Process-3acquire
Process-2release

Process-5acquire
Process-3release

Process-4acquire
Process-5release

Process-4release

 


4. Event

Event is used to implement synchronous communication between processes.

import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->" + str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = "block",
            target = wait_for_event,
            args = (e,))

    w2 = multiprocessing.Process(name = "non-block",
            target = wait_for_event_timeout,
            args = (e, 2))
    w1.start()
    w2.start()

    time.sleep(3)

    e.set()
    print("main: event is set")Copy the code

Result:

wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set->False
main: event is set
wait_for_event: e.is_set()->True

 


5. Queue

Queue is a process-safe FIFO queue for exchanging data between processes: put() adds an item and get() removes one, and both can either block or raise an exception (queue.Full / queue.Empty) when asked not to block.

import multiprocessing
import queue   # provides the Empty/Full exceptions raised by multiprocessing.Queue

def writer_proc(q):
    try:
        q.put(1, block = False)
    except queue.Full:
        pass

def reader_proc(q):
    try:
        print(q.get(block = False))
    except queue.Empty:
        pass

if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))
    writer.start()

    reader = multiprocessing.Process(target=reader_proc, args=(q,))
    reader.start()

    reader.join()
    writer.join()

Result:

1

 


6. Pipe

Pipe() returns a pair of connected Connection objects, one for each end of the pipe. Each end can send() and recv() Python objects; by default the pipe is duplex.

import multiprocessing
import time

def proc1(pipe):
    while True:
        for i in range(10000):
            print("send: %s" % i)
            pipe.send(i)
            time.sleep(1)

def proc2(pipe):
    while True:
        print("proc2 rev:", pipe.recv())
        time.sleep(1)

def proc3(pipe):
    while True:
        print("PROC3 rev:", pipe.recv())
        time.sleep(1)

if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))

    p1.start()
    p2.start()
    #p3.start()

    p1.join()
    p2.join()
    #p3.join()

Result: the program keeps printing alternating "send: n" and "proc2 rev: n" lines until it is interrupted.

 


7. Pool

When using Python for system administration, especially when operating on many files and directories at once or remotely controlling multiple hosts, parallelism can save a lot of time. When the number of objects to operate on is small, you can use Process from multiprocessing directly to create the processes dynamically; a dozen or so processes are fine. With hundreds or thousands of objects, however, manually limiting the number of processes becomes too tedious, and a process pool is the answer. Pool provides a user-specified number of processes. When a new request is submitted and the pool is not full, a new process is created to execute it; if the pool has already reached its maximum size, the request waits until a process in the pool finishes and becomes free to take it.

 

Example 7.1: Using a process pool (non-blocking)

import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in range(4):
        msg = "hello %d" % i
        pool.apply_async(func, (msg, ))   # non-blocking: returns immediately

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   # join() must come after close() (or terminate())
    print("Sub-process(es) done.")

Result of one run:

mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0
msg: hello 1
msg: hello 2
end
msg: hello 3
end
end
end
Sub-process(es) done.

Function explanation:

  • apply_async(func[, args[, kwds[, callback]]]) is non-blocking; apply(func[, args[, kwds]]) is blocking.
  • close() closes the pool so that it accepts no new tasks.
  • terminate() ends the worker processes immediately, without finishing outstanding tasks.
  • join() makes the main process block until the child processes have exited; it must be called after close() or terminate().

Execution notes: the pool is created with 3 processes, while range(4) submits 4 tasks in succession. The first three tasks start immediately; only when one of them finishes does a worker become free to run task 3, which is why "msg: hello 3" appears after the first "end". Because apply_async is non-blocking, the main process does not wait for the workers: right after the loop it prints the "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~" line (interleaved with the first worker's output in the run above), and then waits for every process to finish at pool.join().

 

Example 7.2: Using a process pool (blocking)

import multiprocessing
import time

def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in range(4):
        msg = "hello %d" % i
        pool.apply(func, (msg, ))   # blocking: waits for each task to finish

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   # join() must come after close() (or terminate())
    print("Sub-process(es) done.")

Result:

msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

  

Example 7.3: Using a process pool and collecting the results

import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"
    return "done" + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in xrange(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print ":::", res.get()
    print "Sub-process(es) done."Copy the code

Result of one run:

msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.

 

Example 7.4: Running several different tasks in one process pool

import multiprocessing
import os, time, random

def Lee():
    print("\nRun task Lee-%s" % os.getpid())
    start = time.time()
    time.sleep(random.random() * 10)
    end = time.time()
    print('Task Lee, runs %0.2f seconds.' % (end - start))

def Marlon():
    print("\nRun task Marlon-%s" % os.getpid())
    start = time.time()
    time.sleep(random.random() * 40)
    end = time.time()
    print('Task Marlon runs %0.2f seconds.' % (end - start))

def Allen():
    print("\nRun task Allen-%s" % os.getpid())
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print('Task Allen runs %0.2f seconds.' % (end - start))

def Frank():
    print("\nRun task Frank-%s" % os.getpid())
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print('Task Frank runs %0.2f seconds.' % (end - start))

if __name__ == "__main__":
    function_list = [Lee, Marlon, Allen, Frank]
    print("parent process %s" % os.getpid())

    pool = multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)   # submit each task; an idle worker runs it

    print('Waiting for all subprocesses done...')
    pool.close()
    pool.join()   # join() must be called after close()
    print('All subprocesses done.')

Result of one run:

parent process 7704
Waiting for all subprocesses done...

Run task Lee-6948

Run task Marlon-2896

Run task Allen-7304

Run task Frank-3052
Task Lee, runs 1.59 seconds.
Task Marlon runs 8.48 seconds.
Task Frank runs 15.68 seconds.
Task Allen runs 18.08 seconds.
All subprocesses done.