Column address: a Python module per week

multiprocessing is a standard-library module in Python for writing multi-process programs. Its multiprocessing.dummy submodule exposes the same API backed by threads instead of processes.

Basics

A process can be created with the multiprocessing.Process class, which is used the same way as a Thread object and has the same start(), run(), join(), and related methods. The Process class is suitable for creating individual processes, and can be combined with multiprocessing.Queue when data needs to be shared. To control the number of processes, use the process Pool class instead.

Introducing Process:

Construction method:

  • Process([group [, target [, name [, args [, kwargs]]]]])
  • group: thread group, not yet implemented; according to the library reference it must always be None;
  • target: the callable to execute;
  • name: the process name;
  • args/kwargs: the positional and keyword arguments passed to the target.

Instance methods:

  • is_alive(): returns whether the process is running.
  • join([timeout]): blocks the calling process until the process whose join() method is called terminates, or until the optional timeout expires.
  • start(): starts the process, making it ready for CPU scheduling.
  • run(): called by start(); the default run() invokes the target passed to the constructor, if any. Subclasses can override it.
  • terminate(): immediately stops the worker process, whether or not its task is complete.

Properties:

  • authkey
  • daemon: the same as setDaemon for threads (a child marked as a daemon is terminated when its parent process exits).
  • exitcode: None while the process is running, or -N if it was terminated by signal N.
  • name: the process name.
  • pid: the process ID.
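
To make the attribute list above concrete, here is a minimal sketch that records those attributes before a process starts and after it has been joined. The snapshot() helper and the process name 'demo' are illustrative assumptions, not part of multiprocessing.

```python
import multiprocessing


def noop():
    """Target that does nothing and returns immediately."""


def snapshot(p):
    """Collect the attributes described above into a dict (hypothetical helper)."""
    return {
        'name': p.name,
        'pid': p.pid,
        'daemon': p.daemon,
        'alive': p.is_alive(),
        'exitcode': p.exitcode,
    }


if __name__ == '__main__':
    p = multiprocessing.Process(target=noop, name='demo')
    # Before start(): pid and exitcode are None and the process is not alive.
    print('before start:', snapshot(p))
    p.start()
    p.join()
    # After join(): pid is set, the process is no longer alive, exitcode is 0.
    print('after join  :', snapshot(p))
```

Reading the attributes of a Process that has not been started is safe; only join() and terminate() require start() to have been called first.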

Here’s a simple example:

import multiprocessing


def worker():
    """worker function"""
    print('Worker')


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

# output
# Worker
# Worker
# Worker
# Worker
# Worker        

The output is "Worker" printed five times. It is not possible to tell which line came from which process, because each process competes for access to the output stream and the order depends on how they are scheduled.

To tell the processes apart, pass arguments to them. Unlike with threading, the arguments passed to a multiprocessing Process must be serializable with pickle. Take a look at the code:

import multiprocessing


def worker(num):
    """thread worker function"""
    print('Worker:', num)


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()
        
# output
# Worker: 1
# Worker: 0
# Worker: 2
# Worker: 3
# Worker: 4
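
Whether the arguments are really pickled depends on the start method: with spawn or forkserver the Process object, including its target and args, is pickled and sent to the child, while with fork the child simply inherits them. A quick way to check an argument in advance is to try pickling it. The is_picklable() helper below is a hypothetical name used only for this sketch.

```python
import pickle
import threading


def is_picklable(obj):
    """Return True if pickle can serialize obj (hypothetical helper)."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        # Different unpicklable objects raise different exception types.
        return False
    return True


if __name__ == '__main__':
    print(is_picklable((1, 'text', [1.5, None])))  # plain data
    print(is_picklable(lambda x: x))               # lambdas cannot be pickled
    print(is_picklable(threading.Lock()))          # neither can thread locks
```

Plain data such as numbers, strings, tuples, and lists passes the check; lambdas and thread locks do not, so they are poor choices for args.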

Importable target functions

One difference between the threading and multiprocessing examples is the extra protection for __main__ used in the multiprocessing examples. Because of the way new processes are started, the child process needs to be able to import the script containing the target function. Wrapping the main part of the application in a check for __main__ ensures that it is not run recursively in each child when the module is imported. Another approach is to import the target function from a separate script. For example, multiprocessing_import_main.py uses a worker function defined in a second module.

# multiprocessing_import_main.py 
import multiprocessing
import multiprocessing_import_worker

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(
            target=multiprocessing_import_worker.worker,
        )
        jobs.append(p)
        p.start()
        
# output
# Worker
# Worker
# Worker
# Worker
# Worker

The worker function is defined in multiprocessing_import_worker.py.

# multiprocessing_import_worker.py 
def worker():
    """worker function"""
    print('Worker')
    return

Determining the current process

Passing parameters to identify or name processes is cumbersome and unnecessary. Each Process instance has a name, whose default value can be changed when the Process is created. Named processes are useful for tracking them, especially in applications that run multiple types of processes at the same time.

import multiprocessing
import time


def worker():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(2)
    print(name, 'Exiting')


def my_service():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(3)
    print(name, 'Exiting')


if __name__ == '__main__':
    service = multiprocessing.Process(
        name='my_service',
        target=my_service,
    )
    worker_1 = multiprocessing.Process(
        name='worker 1',
        target=worker,
    )
    worker_2 = multiprocessing.Process(  # default name
        target=worker,
    )

    worker_1.start()
    worker_2.start()
    service.start()
    
# output
# worker 1 Starting
# worker 1 Exiting
# Process-3 Starting
# Process-3 Exiting
# my_service Starting
# my_service Exiting

Daemon processes

By default, the main program does not exit until all child processes exit. There are times when it is useful to start a background process running without preventing the main program from exiting, such as the task of generating a “heartbeat” for a monitoring tool.

Marking processes as daemons is as simple as setting the daemon property to True.

import multiprocessing
import time
import sys


def daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    time.sleep(2)
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()


def non_daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()
    
# output
# Starting: daemon 41838
# Starting: non-daemon 41841
# Exiting : non-daemon 41841

The output does not include the “Exiting” message from the daemon process, because all non-daemon processes (including the main program) exit before the daemon wakes up from its two-second sleep.

The daemon process is terminated automatically before the main program exits, which avoids leaving orphaned processes running. This can be verified by noting the process ID printed when the program runs, and then checking for that process with a command such as ps.

Waiting for processes

To wait until the process finishes its work and exits, use the join() method.

import multiprocessing
import time
import sys


def daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    time.sleep(2)
    print('Exiting :', name)


def non_daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    print('Exiting :', name)


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()

    d.join()
    n.join()
    
# output
# Starting: non-daemon
# Exiting : non-daemon
# Starting: daemon
# Exiting : daemon

Because the main process uses join() to wait for the daemon to exit, the “Exiting” message is printed this time.

By default, join() blocks indefinitely. You can also pass a timeout argument (a floating point number representing the number of seconds to wait for the process to become inactive). If the process does not complete within the timeout period, join() returns anyway.

import multiprocessing
import time
import sys


def daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    time.sleep(2)
    print('Exiting :', name)


def non_daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    print('Exiting :', name)


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    n.start()

    d.join(1)
    print('d.is_alive()', d.is_alive())
    n.join()
    
# output
# Starting: non-daemon
# Exiting : non-daemon
# d.is_alive() True

Since the timeout passed is less than the time the daemon sleeps, the process is still “active” after join() returns.

Terminating processes

Although it is better to use the “poison pill” method to signal a process that it should exit, it can be useful to kill a process forcibly if it appears hung or deadlocked. Calling terminate() on a process object kills the child process.

import multiprocessing
import time


def slow_worker():
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')


if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print('BEFORE:', p, p.is_alive())

    p.start()
    print('DURING:', p, p.is_alive())

    p.terminate()
    print('TERMINATED:', p, p.is_alive())

    p.join()
    print('JOINED:', p, p.is_alive())
    
# output
# BEFORE: <Process(Process-1, initial)> False
# DURING: <Process(Process-1, started)> True
# TERMINATED: <Process(Process-1, started)> True
# JOINED: <Process(Process-1, stopped[SIGTERM])> False

It is important to use join() on the process after terminating it to give the process management code time to update the object state to reflect the effect of termination.

Process exit status

The status code produced when the process exits can be accessed through the exitcode attribute. The allowed ranges are listed in the table below.

Exit code   Meaning
== 0        No error was produced
> 0         The process had an error, and exited with that code
< 0         The process was killed with a signal of -1 * exitcode

import multiprocessing
import sys
import time


def exit_error():
    sys.exit(1)


def exit_ok():
    return


def return_value():
    return 1


def raises():
    raise RuntimeError('There was an error!')


def terminated():
    time.sleep(3)
    time.sleep(3)


if __name__ == '__main__':
    jobs = []
    funcs = [
        exit_error,
        exit_ok,
        return_value,
        raises,
        terminated,
    ]
    for f in funcs:
        print('Starting process for', f.__name__)
        j = multiprocessing.Process(target=f, name=f.__name__)
        jobs.append(j)
        j.start()

    jobs[-1].terminate()

    for j in jobs:
        j.join()
        print('{:>15}.exitcode = {}'.format(j.name, j.exitcode))
        
# output
# Starting process for exit_error
# Starting process for exit_ok
# Starting process for return_value
# Starting process for raises
# Starting process for terminated
# Process raises:
# Traceback (most recent call last):
# File ".../lib/python3.6/multiprocessing/process.py", line 258,
# in _bootstrap
# self.run()
# File ".../lib/python3.6/multiprocessing/process.py", line 93,
# in run
# self._target(*self._args, **self._kwargs)
# File "multiprocessing_exitcode.py", line 28, in raises
# raise RuntimeError('There was an error!')
# RuntimeError: There was an error!
# exit_error.exitcode = 1
# exit_ok.exitcode = 0
# return_value.exitcode = 0
# raises.exitcode = 1
# terminated.exitcode = -15
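
The three exit-code ranges can be folded into a small helper that turns an exitcode into a readable message. This is only a sketch: describe_exitcode() and its wording are assumptions made for illustration, and the signal name is looked up with the standard signal.Signals enum.

```python
import signal


def describe_exitcode(code):
    """Translate a Process.exitcode value into a short description."""
    if code is None:
        return 'not finished'
    if code == 0:
        return 'no error'
    if code > 0:
        return 'failed with exit code {}'.format(code)
    # Negative values mean the process was killed by signal number -code.
    try:
        name = signal.Signals(-code).name
    except ValueError:
        name = 'signal {}'.format(-code)
    return 'killed by {}'.format(name)


if __name__ == '__main__':
    for code in (None, 0, 1, -15):
        print(code, '->', describe_exitcode(code))
```

With the codes from the run above, 1 is reported as a failure and -15 is reported as a kill by SIGTERM, which is the signal terminate() sends on Unix.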

Logging

When debugging concurrency issues, it can be useful to have access to the internals of the objects provided by multiprocessing. There is a convenient module-level function to enable logging, called log_to_stderr(). It sets up a logger object using logging and adds a handler so that log messages are sent to the standard error channel.

import multiprocessing
import logging
import sys


def worker():
    print('Doing some work')
    sys.stdout.flush()


if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.DEBUG)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
    
# output
# [INFO/Process-1] child process calling self.run()
# Doing some work
# [INFO/Process-1] process shutting down
# [DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
# [DEBUG/Process-1] running the remaining "atexit" finalizers
# [INFO/Process-1] process exiting with exitcode 0
# [INFO/MainProcess] process shutting down
# [DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
# [DEBUG/MainProcess] running the remaining "atexit" finalizers

By default, the logging level is set to NOTSET so that no messages are produced. Pass a different level to initialize the logger to the level of detail required.

To manipulate the logger directly (changing its level setting or adding handlers), use get_logger().

import multiprocessing
import logging
import sys


def worker():
    print('Doing some work')
    sys.stdout.flush()


if __name__ == '__main__':
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
    
# output
# [INFO/Process-1] child process calling self.run()
# Doing some work
# [INFO/Process-1] process shutting down
# [INFO/Process-1] process exiting with exitcode 0
# [INFO/MainProcess] process shutting down    
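
get_logger() can also be used without log_to_stderr(), by attaching a handler of your own. The sketch below assumes a hypothetical configure_mp_logging() helper and a format string chosen only for illustration; it adds a timestamp in front of the level and process name.

```python
import logging
import multiprocessing


def configure_mp_logging(level=logging.INFO):
    """Attach one stderr handler with a custom format to the multiprocessing logger."""
    logger = multiprocessing.get_logger()
    logger.setLevel(level)
    if not logger.handlers:  # avoid adding a second handler on repeated calls
        handler = logging.StreamHandler()  # StreamHandler writes to sys.stderr by default
        handler.setFormatter(logging.Formatter(
            '%(asctime)s [%(levelname)s/%(processName)s] %(message)s'))
        logger.addHandler(handler)
    return logger


def worker():
    print('Doing some work')


if __name__ == '__main__':
    configure_mp_logging(logging.INFO)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
```

When children are started with fork they inherit this configuration. With other start methods the child may need to call the helper itself, for example at the top of its target function.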

Subclassing Process

Although the simplest way to start a job in a separate process is to use Process and pass a target function, it is also possible to use a custom subclass.

import multiprocessing


class Worker(multiprocessing.Process):

    def run(self):
        print('In {}'.format(self.name))
        return


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()
        
# output
# In Worker-1
# In Worker-3
# In Worker-2
# In Worker-4
# In Worker-5

Derived classes should override run() to do their job.

Passing messages to processes

As with threads, a common use pattern for multiple processes is to divide a job up among several workers that run in parallel. Using multiple processes effectively usually requires some communication between them so that work can be divided and results aggregated. A simple way to communicate between processes is to use a Queue to pass messages. Any object that can be serialized with pickle can pass through a Queue.

import multiprocessing


class MyFancyClass:

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in {} for {}!'.format(proc_name, self.name))


def worker(q):
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()
    
# output
# Doing something fancy in Process-1 for Fancy Dan!

This short example just passes a single message to a single worker, and the main process waits for the worker to complete.

Let’s look at a more complex example that shows how to manage several workers consuming data from a JoinableQueue and passing results back to the parent process. The “poison pill” technique is used to stop the workers. After setting up the real tasks, the main program adds one “stop” value per worker to the task queue. When a worker encounters the special value, it breaks out of its processing loop. The main process uses the task queue’s join() method to wait for all of the tasks to finish before processing the results.

import multiprocessing
import time


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('{}: Exiting'.format(proc_name))
                self.task_queue.task_done()
                break
            print('{}: {}'.format(proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)


class Task:

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take time to do the work
        return '{self.a} * {self.b} = {product}'.format(
            self=self, product=self.a * self.b)

    def __str__(self):
        return '{self.a} * {self.b}'.format(self=self)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating {} consumers'.format(num_consumers))
    consumers = [
        Consumer(tasks, results)
        for i in range(num_consumers)
    ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1
        
# output
# Creating 8 consumers
# Consumer-1: 0 * 0
# Consumer-2: 1 * 1
# Consumer-3: 2 * 2
# Consumer-4: 3 * 3
# Consumer-5: 4 * 4
# Consumer-6: 5 * 5
# Consumer-7: 6 * 6
# Consumer-8: 7 * 7
# Consumer-3: 8 * 8
# Consumer-7: 9 * 9
# Consumer-4: Exiting
# Consumer-1: Exiting
# Consumer-2: Exiting
# Consumer-5: Exiting
# Consumer-6: Exiting
# Consumer-8: Exiting
# Consumer-7: Exiting
# Consumer-3: Exiting
# Result: 6 * 6 = 36
# Result: 2 * 2 = 4
# Result: 3 * 3 = 9
# Result: 0 * 0 = 0
# Result: 1 * 1 = 1
# Result: 7 * 7 = 49
# Result: 4 * 4 = 16
# Result: 5 * 5 = 25
# Result: 8 * 8 = 64
# Result: 9 * 9 = 81

Although jobs are queued in order, their execution is parallel, so there is no guarantee of their completion order.

Signaling between processes

The Event class provides a simple way to communicate state information between processes. An event can be toggled between set and unset states. Users of the event object can wait for it to change from unset to set, using an optional timeout value.

import multiprocessing
import time


def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print('wait_for_event: starting')
    e.wait()
    print('wait_for_event: e.is_set()->', e.is_set())


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print('wait_for_event_timeout: starting')
    e.wait(t)
    print('wait_for_event_timeout: e.is_set()->', e.is_set())


if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(
        name='block',
        target=wait_for_event,
        args=(e,),
    )
    w1.start()

    w2 = multiprocessing.Process(
        name='nonblock',
        target=wait_for_event_timeout,
        args=(e, 2),
    )
    w2.start()

    print('main: waiting before calling Event.set()')
    time.sleep(3)
    e.set()
    print('main: event is set')
    
# output
# main: waiting before calling Event.set()
# wait_for_event: starting
# wait_for_event_timeout: starting
# wait_for_event_timeout: e.is_set()-> False
# main: event is set
# wait_for_event: e.is_set()-> True

If wait() times out, no error is returned. The caller can check the status of the event using is_set().
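
Besides calling is_set() afterwards, the caller can use the return value of wait() itself, which is True if the event was set and False if the timeout expired first. A minimal sketch, with wait_and_report() as a hypothetical helper name:

```python
import multiprocessing


def wait_and_report(event, timeout):
    """Wait up to timeout seconds and say what happened."""
    if event.wait(timeout):
        return 'event was set'
    return 'timed out'


if __name__ == '__main__':
    e = multiprocessing.Event()
    print(wait_and_report(e, 0.1))  # nothing has set the event yet
    e.set()
    print(wait_and_report(e, 0.1))  # returns immediately once the event is set
```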

Controlling access to resources

In cases where multiple processes share a single resource, you can use locks to avoid access conflicts.

import multiprocessing
import sys


def worker_with(lock, stream):
    with lock:
        stream.write('Lock acquired via with\n')


def worker_no_with(lock, stream):
    lock.acquire()
    try:
        stream.write('Lock acquired directly\n')
    finally:
        lock.release()


if __name__ == '__main__':
    lock = multiprocessing.Lock()
    w = multiprocessing.Process(
        target=worker_with,
        args=(lock, sys.stdout),
    )
    nw = multiprocessing.Process(
        target=worker_no_with,
        args=(lock, sys.stdout),
    )

    w.start()
    nw.start()

    w.join()
    nw.join()

# output
# Lock acquired via with
# Lock acquired directly

In this example, messages printed to the console may be jumbled together if the two processes do not synchronize their access to the standard output stream with the lock.
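
A lock does not have to be acquired with an unconditional wait. acquire() also accepts a block flag and a timeout, which lets a worker skip the protected section instead of hanging. The try_locked() helper below is a hypothetical wrapper written for this sketch.

```python
import multiprocessing


def try_locked(lock, action, timeout=None):
    """Run action() while holding lock, without blocking forever.

    Returns (True, result) if the lock was obtained, otherwise (False, None).
    With timeout=None the attempt is non-blocking.
    """
    if timeout is None:
        acquired = lock.acquire(False)          # give up immediately if busy
    else:
        acquired = lock.acquire(True, timeout)  # wait at most timeout seconds
    if not acquired:
        return False, None
    try:
        return True, action()
    finally:
        lock.release()


if __name__ == '__main__':
    lock = multiprocessing.Lock()
    print(try_locked(lock, lambda: 'did the work'))
    lock.acquire()  # simulate another holder of the lock
    print(try_locked(lock, lambda: 'did the work', timeout=0.1))
    lock.release()
```

The second call fails because multiprocessing.Lock is not recursive: it cannot be acquired again while it is already held, even by the same process.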

Synchronizing operations

Condition objects can be used to synchronize parts of a workflow so that some run in parallel but others run sequentially, even if they are in separate processes.

import multiprocessing
import time


def stage_1(cond):
    """ perform first stage of work, then notify stage_2 to continue """
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        print('{} done and ready for stage 2'.format(name))
        cond.notify_all()


def stage_2(cond):
    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        cond.wait()
        print('{} running'.format(name))


if __name__ == '__main__':
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name='s1',
                                 target=stage_1,
                                 args=(condition,))
    s2_clients = [
        multiprocessing.Process(
            name='stage_2[{}]'.format(i),
            target=stage_2,
            args=(condition,),
        )
        for i in range(1, 3)
    ]
    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()

    s1.join()
    for c in s2_clients:
        c.join()
        
# output
# Starting stage_2[1]
# Starting stage_2[2]
# Starting s1
# s1 done and ready for stage 2
# stage_2[1] running
# stage_2[2] running

In this example, two processes run the second stage of the job in parallel, but only after the first stage is complete.

Controlling concurrent access to resources

Sometimes it is useful to allow multiple workers to access resources simultaneously, while still limiting the total number. For example, connection pooling might support a fixed number of concurrent connections, or a network application might support a fixed number of concurrent downloads. Semaphore is one way to manage these connections.

import random
import multiprocessing
import time


class ActivePool:

    def __init__(self):
        super(ActivePool, self).__init__()
        self.mgr = multiprocessing.Manager()
        self.active = self.mgr.list()
        self.lock = multiprocessing.Lock()

    def makeActive(self, name):
        with self.lock:
            self.active.append(name)

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)

    def __str__(self):
        with self.lock:
            return str(self.active)


def worker(s, pool):
    name = multiprocessing.current_process().name
    with s:
        pool.makeActive(name)
        print('Activating {} now running {}'.format(name, pool))
        time.sleep(random.random())
        pool.makeInactive(name)


if __name__ == '__main__':
    pool = ActivePool()
    s = multiprocessing.Semaphore(3)
    jobs = [
        multiprocessing.Process(
            target=worker,
            name=str(i),
            args=(s, pool),
        )
        for i in range(10)
    ]
    for j in jobs:
        j.start()

    while True:
        alive = 0
        for j in jobs:
            if j.is_alive():
                alive += 1
                j.join(timeout=0.1)
                print('Now running {}'.format(pool))
        if alive == 0:
            # all done
            break

# output            
# Activating 0 now running ['0', '1', '2']
# Activating 1 now running ['0', '1', '2']
# Activating 2 now running ['0', '1', '2']
# Now running ['0', '1', '2']
# Now running ['0', '1', '2']
# Now running ['0', '1', '2']
# Now running ['0', '1', '2']
# Activating 3 now running ['0', '1', '3']
# Activating 4 now running ['1', '3', '4']
# Activating 6 now running ['1', '4', '6']
# Now running ['1', '4', '6']
# Now running ['1', '4', '6']
# Activating 5 now running ['1', '4', '5']
# Now running ['1', '4', '5']
# Now running ['1', '4', '5']
# Now running ['1', '4', '5']
# Activating 8 now running ['4', '5', '8']
# Now running ['4', '5', '8']
# Now running ['4', '5', '8']
# Now running ['4', '5', '8']
# Now running ['4', '5', '8']
# Now running ['4', '5', '8']
# Activating 7 now running ['5', '8', '7']
# Now running ['5', '8', '7']
# Activating 9 now running ['8', '7', '9']
# Now running ['8', '7', '9']
# Now running ['8', '9']
# Now running ['8', '9']
# Now running ['9']
# Now running ['9']
# Now running ['9']
# Now running ['9']
# Now running []            

In this example, the ActivePool class is only used as a convenient way to keep track of the processes that are running at a given moment. The actual resource pool might assign a connection or other value to a newly active process and reclaim that value when the task is complete. Here, pool is only used to hold the name of the active process to show that only three are running concurrently.
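
As a rough illustration of what such a real resource pool could look like, the sketch below hands out actual values instead of just recording names. Everything in it is an assumption made for the example: the ResourcePool class, its acquire() and release() methods, and the 'conn-N' strings standing in for connections. A Semaphore limits how many resources are out at once and a Queue holds the free ones.

```python
import multiprocessing


class ResourcePool:
    """Hypothetical pool that lends out a fixed set of resources."""

    def __init__(self, resources):
        resources = list(resources)
        self._slots = multiprocessing.Semaphore(len(resources))
        self._free = multiprocessing.Queue()
        for r in resources:
            self._free.put(r)

    def acquire(self, timeout=None):
        """Return a free resource, or None if none frees up within timeout."""
        if not self._slots.acquire(True, timeout):
            return None
        # Holding a slot guarantees that a resource is, or soon will be, queued.
        return self._free.get()

    def release(self, resource):
        """Give a resource back so that another process can use it."""
        self._free.put(resource)
        self._slots.release()


if __name__ == '__main__':
    pool = ResourcePool(['conn-1', 'conn-2'])
    first = pool.acquire()
    second = pool.acquire()
    print('in use:', sorted([first, second]))
    print('third request:', pool.acquire(timeout=0.1))  # pool is exhausted
    pool.release(first)
    print('after release:', pool.acquire(timeout=1))
```

Like the lock and semaphore in the example above, a pool built this way would be handed to workers through the args of Process so that the children share the same underlying objects.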

Managing shared state

In the previous example, the list of active processes is maintained centrally in the ActivePool instance through a special type of list object created by a Manager. The Manager is responsible for coordinating shared information state between all of its users.

import multiprocessing
import pprint


def worker(d, key, value):
    d[key] = value


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    d = mgr.dict()
    jobs = [
        multiprocessing.Process(
            target=worker,
            args=(d, i, i * 2),
        )
        for i in range(10)
    ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print('Results:', d)
    
# output
# Results: {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18}

Because the dictionary is created through the manager, it is shared and updates are visible in all processes. Lists are also supported.

Shared namespace

In addition to dictionaries and lists, the Manager can also create shared namespaces.

import multiprocessing


def producer(ns, event):
    ns.value = 'This is the value'
    event.set()


def consumer(ns, event):
    try:
        print('Before event: {}'.format(ns.value))
    except Exception as err:
        print('Before event, error:', str(err))
    event.wait()
    print('After event:', ns.value)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()

# output
# Before event, error: 'Namespace' object has no attribute 'value'
# After event: This is the value

Any named value added to the Namespace is visible to all of the clients that receive the Namespace instance.

It is important to know that updates to mutable value content in namespaces are not automatically propagated.

import multiprocessing


def producer(ns, event):
    # DOES NOT UPDATE GLOBAL VALUE!
    ns.my_list.append('This is the value')
    event.set()


def consumer(ns, event):
    print('Before event:', ns.my_list)
    event.wait()
    print('After event :', ns.my_list)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    namespace.my_list = []

    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()
    
# output
# Before event: []
# After event : []

To update the list, attach it to the namespace object again.
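
A minimal single-process sketch of that fix is shown here. The demo() function is a hypothetical name, and a real program would do the update inside the producer process. Each read of ns.my_list returns a copy, so the copy is modified and then assigned back.

```python
import multiprocessing


def demo():
    """Return the list as seen through the namespace after each kind of update."""
    with multiprocessing.Manager() as mgr:
        ns = mgr.Namespace()
        ns.my_list = []

        # Appending in place only changes a local copy fetched from the manager.
        ns.my_list.append('lost')
        after_append = list(ns.my_list)

        # Fetch, modify, then assign back so that the manager stores the new value.
        items = ns.my_list
        items.append('kept')
        ns.my_list = items
        after_reassign = list(ns.my_list)
    return after_append, after_reassign


if __name__ == '__main__':
    print(demo())  # ([], ['kept'])
```

The assignment is what sends the new value to the manager, which is why the in-place append in the earlier example had no visible effect.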

Process pools

The Pool class can be used to manage a fixed number of workers for simple cases where the work to be done can be broken up and distributed between workers independently. The return values from the jobs are collected and returned as a list. The Pool arguments include the number of processes and a function to run when starting each worker process (called once per child).

import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print('Starting', multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print('Input :', inputs)

    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()  # wrap up current tasks

    print('Pool :', pool_outputs)
    
# output
# Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# Built-in: <map object at 0x1007b2be0>
# Starting ForkPoolWorker-3
# Starting ForkPoolWorker-4
# Starting ForkPoolWorker-5
# Starting ForkPoolWorker-6
# Starting ForkPoolWorker-1
# Starting ForkPoolWorker-7
# Starting ForkPoolWorker-2
# Starting ForkPoolWorker-8
# Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

The result of the map() method is functionally equivalent to the built-in map(), except that each task is run in parallel. Because the Pool processes its input in parallel, close() and join() can be used to synchronize the main process with the task process to ensure complete cleanup.

By default, Pool creates a fixed number of worker processes and passes jobs to them until there are no more jobs. Setting the maxtasksperchild parameter tells the Pool to restart a worker process after it has finished a given number of tasks, preventing long-running workers from consuming ever more system resources.

import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print('Starting', multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print('Input :', inputs)

    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
        maxtasksperchild=2,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()  # wrap up current tasks

    print('Pool :', pool_outputs)
    
# output
# Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# Built-in: <map object at 0x1007b21d0>
# Starting ForkPoolWorker-1
# Starting ForkPoolWorker-2
# Starting ForkPoolWorker-4
# Starting ForkPoolWorker-5
# Starting ForkPoolWorker-6
# Starting ForkPoolWorker-3
# Starting ForkPoolWorker-7
# Starting ForkPoolWorker-8
# Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

The Pool restarts workers once they have completed their allotted tasks, even if there is no more work. In this output, eight workers are created even though there are only 10 tasks, and each worker can complete at most two tasks before it is replaced.

Implementing MapReduce

The Pool class can be used to create a simple single-server implementation of MapReduce. While it doesn’t give you the full benefits of distributed processing, it does illustrate how easy it is to break down some problems into assignable units of work.

In MapReduce-based systems, input data is broken up into chunks for processing by different worker instances. Each chunk of input data is mapped to an intermediate state using a simple transformation. The intermediate data is then collected and partitioned by key so that all related values are together. Finally, the partitioned data is reduced to a result set.
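Before bringing a pool into it, the three phases can be sketched in a single process. The example below is only an illustration with made-up input: it groups words by length and counts how many words have each length, and the function names map_phase(), partition(), and reduce_phase() are chosen here for readability.

```python
import collections
import itertools


def map_phase(chunk):
    """Map one input chunk to a list of (key, value) pairs."""
    return [(len(word), word) for word in chunk.split()]


def partition(mapped_values):
    """Group values by key so that all related values end up together."""
    groups = collections.defaultdict(list)
    for key, value in mapped_values:
        groups[key].append(value)
    return groups


def reduce_phase(item):
    """Reduce one (key, values) group to a (key, count) result."""
    key, values = item
    return (key, len(values))


chunks = ['a bb cc', 'dd e fff']                    # input split into chunks
mapped = [map_phase(chunk) for chunk in chunks]     # one list per chunk
partitioned = partition(itertools.chain.from_iterable(mapped))
result = sorted(reduce_phase(item) for item in partitioned.items())
print(result)  # [(1, 2), (2, 3), (3, 1)]
```

The map and reduce steps touch one chunk or one key at a time, which is what makes them easy to hand to separate workers. Only the partition step needs to see all of the intermediate data.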

# multiprocessing_mapreduce.py
import collections
import itertools
import multiprocessing


class SimpleMapReduce:

    def __init__(self, map_func, reduce_func, num_workers=None):
        """
        map_func
          Function to map inputs to intermediate data. Takes as
          argument one input value and returns a sequence of
          (key, value) tuples to be reduced.

        reduce_func
          Function to reduce partitioned version of intermediate
          data to final output. Takes as argument a key as
          produced by map_func and a sequence of the values
          associated with that key.

        num_workers
          The number of workers to create in the pool. Defaults
          to the number of CPUs available on the current host.
        """
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(num_workers)

    def partition(self, mapped_values):
        """Organize the mapped values by their key.
        Returns an unsorted sequence of tuples with a key
        and a sequence of values.
        """
        partitioned_data = collections.defaultdict(list)
        for key, value in mapped_values:
            partitioned_data[key].append(value)
        return partitioned_data.items()

    def __call__(self, inputs, chunksize=1):
        """Process the inputs through the map and reduce functions
        given.

        inputs
          An iterable containing the input data to be processed.

        chunksize=1
          The portion of the input data to hand to each worker.
          This can be used to tune performance during the mapping
          phase.
        """
        map_responses = self.pool.map(
            self.map_func,
            inputs,
            chunksize=chunksize,
        )
        partitioned_data = self.partition(
            itertools.chain(*map_responses)
        )
        reduced_values = self.pool.map(
            self.reduce_func,
            partitioned_data,
        )
        return reduced_values

The following example script uses SimpleMapReduce to count the “words” in the reStructuredText source of this article, ignoring some of the markup.

# multiprocessing_wordcount.py 
import multiprocessing
import string

from multiprocessing_mapreduce import SimpleMapReduce


def file_to_words(filename):
    """Read a file and return a sequence of (word, occurrences) values."""
    STOP_WORDS = set([
        'a', 'an', 'and', 'are', 'as', 'be', 'by', 'for', 'if',
        'in', 'is', 'it', 'of', 'or', 'py', 'rst', 'that', 'the',
        'to', 'with',
    ])
    TR = str.maketrans({
        p: ' '
        for p in string.punctuation
    })

    print('{} reading {}'.format(
        multiprocessing.current_process().name, filename))
    output = []

    with open(filename, 'rt') as f:
        for line in f:
            # Skip comment lines.
            if line.lstrip().startswith('.. '):
                continue
            line = line.translate(TR)  # Strip punctuation
            for word in line.split():
                word = word.lower()
                if word.isalpha() and word not in STOP_WORDS:
                    output.append((word, 1))
    return output


def count_words(item):
    """Convert the partitioned data for a word to a tuple
    containing the word and the number of occurrences.
    """
    word, occurrences = item
    return (word, sum(occurrences))


if __name__ == '__main__':
    import operator
    import glob

    input_files = glob.glob('*.rst')

    mapper = SimpleMapReduce(file_to_words, count_words)
    word_counts = mapper(input_files)
    word_counts.sort(key=operator.itemgetter(1))
    word_counts.reverse()

    print('\nTOP 20 WORDS BY FREQUENCY\n')
    top20 = word_counts[:20]
    longest = max(len(word) for word, count in top20)
    for word, count in top20:
        print('{word:<{len}}: {count:5}'.format(
            len=longest + 1,
            word=word,
            count=count)
        )

The file_to_words() function converts each input file into a sequence of tuples containing a word and the number 1, representing a single occurrence. partition() divides the data using the word as the key, so the resulting structure consists of a key and a sequence of 1 values, one for each occurrence of the word. In the reduction phase, count_words() converts the partitioned data to a set of tuples containing a word and the count for that word.

$ python3 -u multiprocessing_wordcount.py

ForkPoolWorker-1 reading basics.rst
ForkPoolWorker-2 reading communication.rst
ForkPoolWorker-3 reading index.rst
ForkPoolWorker-4 reading mapreduce.rst

TOP 20 WORDS BY FREQUENCY

process         :    83
running         :    45
multiprocessing :    44
worker          :    40
starting        :    37
now             :    35
after           :    34
processes       :    31
start           :    29
header          :    27
pymotw          :    27
caption         :    27
end             :    27
daemon          :    22
can             :    22
exiting         :    21
forkpoolworker  :    21
consumer        :    20
main            :    18
event           :    16
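To make the intermediate shapes of the word count concrete, here is a small sketch that skips the files and the pool. The mapped list is hypothetical data standing in for what file_to_words() might return for two tiny files. The partition and reduce steps are then written out inline.

```python
import collections

# Hypothetical mapped output for two small files: one (word, 1) per occurrence.
mapped = [
    [('pool', 1), ('worker', 1), ('pool', 1)],
    [('worker', 1), ('pool', 1)],
]

# Partition: one key per word, holding one 1 for each occurrence.
partitioned = collections.defaultdict(list)
for pairs in mapped:
    for word, one in pairs:
        partitioned[word].append(one)

# Reduce: summing the 1s gives the count for each word.
reduced = sorted((word, sum(ones)) for word, ones in partitioned.items())

print(dict(partitioned))  # {'pool': [1, 1, 1], 'worker': [1, 1]}
print(reduced)            # [('pool', 3), ('worker', 2)]
```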

Related documents:

pymotw.com/3/multiproc…

thief.one/2016/11/23/…

Use www.dongwm.com/archives/…