Part one: Declare the new process in __main__

For example:

from multiprocessing import Pool

def f(x):
    return x * x

pool = Pool(processes=4)
r = pool.map(f, range(100))
pool.close()
pool.join()

Run in Spyder, this simply hangs with no response; run in a shell window, it reports an error like the following:

Process SpawnPoolWorker-15:
Traceback (most recent call last):
  File "C:\Anaconda3\lib\multiprocessing\process.py", line 254, in _bootstrap
    self.run()
  File "C:\Anaconda3\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker
    task = get()
  File "C:\Anaconda3\lib\multiprocessing\queues.py", line 357, in get
    return ForkingPickler.loads(res)
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>

Solution:

Multiprocessing works a little differently on Windows than on Linux:

  • On Linux, child processes are created with fork: the child inherits a copy of the parent's memory, so any global variable, including dynamically defined functions, is available.
  • On Windows, a brand-new Python process is spawned and the module is re-imported, so all global variables are re-initialized; anything generated or modified dynamically during the run is not available in the child. (On Python 3.4+ you can reproduce this behavior on Linux for testing; see the sketch below.)
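
A minimal sketch of forcing the Windows-style start method on Linux (multiprocessing.set_start_method exists from Python 3.4 on; this snippet is an illustration added here, not part of the original example):

from multiprocessing import Pool, set_start_method

def f(x):
    return x * x

if __name__ == "__main__":
    set_start_method("spawn")   # Windows default; "fork" is the Linux default
    pool = Pool(processes=4)
    print(pool.map(f, range(10)))
    pool.close()
    pool.join()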

Internally, multiprocessing uses pickling to pass the map arguments, including the function itself, to the other processes. When a function or class is pickled, the pickle records it as module name plus function/class name. If the Python process on the other end cannot find the corresponding function or class in the corresponding module, unpickling fails and an error occurs.

When you create a function in an interactive console, the function is added dynamically to the __main__ module; it does not exist in the freshly restarted child process, so the lookup fails and you get the error above.
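
A minimal sketch of this pickle-by-reference behavior, using only the standard pickle module (run it as a script so that f really lives in an importable __main__):

import pickle

def f(x):
    return x * x

# Functions are pickled by reference: the payload records only the module
# and name ("__main__", "f"), not the function's code. Whoever unpickles
# it must be able to find f in its own __main__ module.
payload = pickle.dumps(f)
g = pickle.loads(payload)  # succeeds here because __main__.f exists
print(g(3))                # 9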

You run into a different problem when running in a standalone Python file instead of a console: if the code that calls multiprocessing is not protected, it is re-executed when the module is loaded by each newly spawned process, which creates yet another multiprocessing pool, and so on without end.

The solution to this problem is to always put the code that actually starts the work inside the protected block: if __name__ == '__main__':
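
For completeness, here is the example from the beginning of this article with the guard applied (the same code, just wrapped):

from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    # only the parent process executes this block; a spawned child
    # re-imports the module but skips it, so no recursive pool creation
    pool = Pool(processes=4)
    r = pool.map(f, range(100))
    pool.close()
    pool.join()
    print(r[:5])  # [0, 1, 4, 9, 16]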

Part two:

Additional knowledge: exception handling in the multiprocessing Pool

While developing a multiprocess program, a child process executed a function that used mysql-connector-python to connect to the database.

Due to a programming mistake, not all exceptions were caught inside that function. As a result, an exception object was thrown straight into the Pool, causing the entire Pool to hang:

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 812, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 765, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 376, in _handle_results
    task = get()
  File "/usr/lib/python2.7/site-packages/mysql/connector/errors.py", line 194, in __init__
    'msg': self.msg.encode('utf8') if PY2 else self.msg
AttributeError: ("'int' object has no attribute 'encode'", <class 'mysql.connector.errors.Error'>, (2055, "2055: Lost Connection to MySQL '192.169.36.189:3306', system error: timed out", None))

Starting from this problem, this article analyzes the source code of multiprocessing.Pool and mysql-connector-python to locate the exact cause of the error. The solution itself is simple: do not throw exceptions into the Pool.

Environment where the problem occurs

Python: the 2.7.5 shipped with CentOS 7.3, or the latest Python 2.7.14.

mysql-connector: version 2.0 or higher; the latest version can be downloaded from the official website.

The code where the problem occurred can be simplified as follows:

from multiprocessing import Pool, log_to_stderr
import logging
import mysql.connector

# open multiprocessing lib log
log_to_stderr(level=logging.DEBUG)

def func():
    raise mysql.connector.Error("demo test", 100)

if __name__ == "__main__":
    p = Pool(3)
    res = p.apply_async(func)
    res.get()

To solve the problem, add a try/except inside func. But if you are wondering why an AttributeError occurs at all, read on.
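
A minimal sketch of that fix; safe_func and do_work are illustrative names, not part of any library, and the (ok, error) tuple is just one convention for shipping failures back as data:

def do_work():
    # stand-in for the real work, e.g. the mysql.connector calls
    raise RuntimeError("simulated mysql failure")

def safe_func():
    try:
        result = do_work()
        return (True, result)
    except Exception as e:
        # str(e) is always picklable, unlike some exception instances,
        # so nothing dangerous ever enters the Pool's pipes
        return (False, str(e))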

The implementation of multiprocessing.Pool

multiprocessing.Pool is implemented roughly as follows.

When we execute the statements below, the main process creates three helper threads, _handle_workers, _handle_results and _handle_tasks, and creates Pool(n) worker child processes. Communication between the main process and the workers goes through internally defined queues, which are actually pipes: _taskqueue, _inqueue and _outqueue.

p = Pool(3)
res = p.apply_async(func)
res.get()
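A hedged way to peek at those pieces; the attribute names below are CPython implementation details of multiprocessing/pool.py, not public API, and may differ between versions:

from multiprocessing import Pool

if __name__ == "__main__":
    p = Pool(3)
    print(p._taskqueue)             # plain queue: user thread -> _handle_tasks
    print(p._inqueue, p._outqueue)  # SimpleQueues (pipes) to and from workers
    print(p._worker_handler, p._task_handler, p._result_handler)  # the 3 threads
    p.close()
    p.join()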

The purpose of these three child threads is:

  1. The _handle_workers thread manages the worker processes, keeping the Pool at n live workers.

  2. The _handle_tasks thread passes the user's tasks (job id, handler func, arguments, etc.) into _inqueue. The worker processes compete to take a task, run the function, put the result into _outqueue, and then go back to listening on the task queue. It is really a classic producer-consumer setup.

  3. The _handle_results thread listens on _outqueue, looks up the corresponding job in the _cache dictionary, stores the result in the matching *Result object, and releases that job's semaphore to signal that the job has finished. After that, the *Result.get() method can return the result of the execution.

When we call p.apply_async or p.map, an AsyncResult or MapResult object is created and the task is placed on _taskqueue; calling the *Result.get() method blocks until a worker child process has executed the task, then returns the execution result.

Now that we know the multiprocessing.Pool implementation logic, let's explore how a Pool worker behaves when func throws an exception. The following code is a simplified version of the core loop of the Pool worker child process:

def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    ...
    while xxx:
        try:
            task = get()
        except:
            ...
        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        ...
        try:
            put((job, i, result))
        except Exception, e:
            ...

As the code shows, when func raises, the worker catches the exception, wraps the exception object into the result tuple, puts it into _outqueue, and waits for the next task. In other words, the worker itself handles exceptions just fine.

Let's look at how the _handle_results thread handles the result sent back by the worker:

@staticmethod
def _handle_results(outqueue, get, cache):
    while 1:
        try:
            task = get()
        except (IOError, EOFError):
            return
        ...

The code above is the main loop of _handle_results. As you can see, it only handles IOError and EOFError, which means that if anything else is raised inside get(), the _handle_results thread simply exits (and that is exactly what happens here). Once _handle_results has exited, nothing ever releases the semaphore of the *Result objects in _cache, so the user's execution flow stays in wait forever. The net result is that the user's main process is stuck in get(), and the program hangs.
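
One defensive measure on the caller's side (a sketch, not a fix for the underlying bug): AsyncResult.get() accepts a timeout, which at least turns an indefinite hang into a multiprocessing.TimeoutError you can react to:

import multiprocessing
from multiprocessing import Pool

def func():
    # stand-in for the real work; imagine it raises the problematic
    # mysql.connector.Error that kills _handle_results on unpickling
    raise RuntimeError("boom")

if __name__ == "__main__":
    p = Pool(3)
    res = p.apply_async(func)
    try:
        res.get(timeout=10)  # never wait forever on a dead result handler
    except multiprocessing.TimeoutError:
        # _handle_results may have died: clean up instead of hanging
        p.terminate()
        p.join()
    except Exception as e:
        # the normal case: the worker shipped the exception back intact
        print("worker failed: %r" % (e,))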

To confirm this, turn on the multiprocessing library's log (log_to_stderr(level=logging.DEBUG)), modify the _handle_results code in multiprocessing/pool.py to add an except Exception branch, and rerun the exception code from the beginning of this part:

# multiprocessing: pool.py
class Pool(object):
    @staticmethod
    def _handle_results(outqueue, get, cache):
        while 1:
            try:
                task = get()
            except (IOError, EOFError):
                return
            except Exception:
                debug("handle_result not catch Exceptions.")
                return
            ...

The console does print "handle_result not catch Exceptions.", which confirms that _handle_results does not catch all exceptions; the exception really is raised by task = get().

So what does the _outqueue.get() method actually do? Digging deeper into the source, the get() method boils down to the read side of an os.pipe, with some wrapping on top. Its internal implementation is roughly as follows:

def Pipe(duplex=True):
    ...
    fd1, fd2 = os.pipe()
    c1 = _multiprocessing.Connection(fd1, writable=False)   # get
    c2 = _multiprocessing.Connection(fd2, readable=False)   # put
    return c1, c2

_multiprocessing.Connection is implemented internally in C; we will not go deeper, or this would get more and more complicated. Internally it uses the pickle library: object instances are pickled on put() and unpickled on get(), regenerating the instance object. See the Python documentation for more information about pickling, including the conditions under which an object can be pickled and the methods called when unpickling. Either way, our instance fails during the get(), i.e. unpickle, step:

'msg': self.msg.encode('utf8') if PY2 else self.msg

AttributeError: 'int' object has no attribute 'encode'

The error log above shows that the msg parameter was given an int during reconstruction. That is, during the unpickle phase, MySQL's Error class was re-instantiated through its __init__() method, but with the wrong arguments. To verify this, I simplified MySQL Error's __init__() and finally narrowed it down to the assignment of self.args: Exception and its subclasses call __init__() when unpickled, passing self.args as the argument list.
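
You can observe this mechanism directly with the standard pickle protocol; BaseException.__reduce__ returns the class together with self.args, and unpickling calls the class with those args (a minimal sketch):

import pickle

e = Exception("a", "b")
print(e.__reduce__())                # (the Exception class, ('a', 'b'))
e2 = pickle.loads(pickle.dumps(e))   # effectively Exception(*e.args)
print(e2.args)                       # ('a', 'b')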

The problem can be easily verified with the following code:

from multiprocessing import Pipe

class DemoError(Exception):

    def __init__(self, msg, errno):
        print "msg: %s, errno: %s" % (msg, errno)
        self.args = ("aa", "bb")

def func():
    raise DemoError("demo test", 100)

r, w = Pipe(duplex=False)
try:
    result = (True, func())
except Exception, e:
    result = (False, e)

print "send result"
w.send(result)
print "get result"
res = r.recv()
print "finished."

On the recv() call the log prints "msg: aa, errno: bb", showing that when receiving, the Exception machinery passes self.args as the arguments to __init__(). MySQL's Error class overwrites self.args in an order that does not match __init__'s parameters, which is what makes the encode() call on msg blow up. The MySQL Error implementation, simplified:

class Error(Exception):
    def __init__(self, msg=None, errno=None, values=None, sqlstate=None):
        super(Error, self).__init__()
        ...
        if self.msg and self.errno != -1:
            fields = {
                'errno': self.errno,
                'msg': self.msg.encode('utf-8') if PY2 else self.msg
            }
        ...
        self.args = (self.errno, self._full_msg, self.sqlstate)

As you can see, self.args in MySQL's Error, (errno, full_msg, sqlstate), is not in the same order as __init__(msg, errno, values, sqlstate), so on unpickling the first element of self.args, the int errno, is passed in as msg, causing the AttributeError. As for self.args itself: it is defined on the Exception base class and is normally what the __str__ and __repr__ methods print; the official Python documentation does not recommend overwriting it.
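
A minimal sketch of the safe pattern (SafeError is a hypothetical class, not mysql-connector code): keep self.args in the same order as __init__'s parameters, so the instance survives a pickle round trip:

import pickle

class SafeError(Exception):
    def __init__(self, msg=None, errno=None):
        super(SafeError, self).__init__()
        self.msg = msg
        self.errno = errno
        # same order as __init__'s parameters: unpickling effectively
        # calls SafeError(*self.args) and rebuilds the instance correctly
        self.args = (msg, errno)

e = pickle.loads(pickle.dumps(SafeError("lost connection", 2055)))
print("msg=%s errno=%s" % (e.msg, e.errno))  # msg=lost connection errno=2055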

Conclusion

Well, with all that said: by tracing this problem we have basically worked out the implementation of the multiprocessing.Pool library. Honestly, it is hard to say whose bug this is; the two sides combine to produce it. In any case, use the multiprocessing library with caution, especially around its pipes: it is best not to let exceptions escape into multiprocessing at all, so handle every exception inside func. And if you define your own exception classes, make sure the order of self.args matches that of __init__(). (subprocess's SubprocessError, by the way, is defined differently from a plain Exception, so a similar issue may exist between the multiprocessing and subprocess libraries.)