As a small programmer in my programming field, I am currently working as team lead in an entrepreneurial team. The technology stack involves Android, Python, Java and Go, which is also the main technology stack of our team.

0 x00 preface

Previous articles provided a brief introduction to the concept and implementation of Python coroutines. In order to gain a more comprehensive understanding of concurrent programming in Python, I also studied the concepts of Python threads and processes and the use of related techniques, resulting in this article.

0x01 Threads and Processes

When we open an application on a phone or PC, the operating system creates a process instance and starts executing the main thread of the process, which has its own memory space and data structure. Threads are lightweight processes. Multiple threads share memory and data structures within the same process, making communication between threads easier.

0x02 Concurrency using threads

Those familiar with Java programming will notice that the threading model in Python is very similar to Java. For the rest of this article we’ll focus on using Python’s threading package. Thread is not recommended for beginners with low-level API modules. All code in this article will use Python 3.7.)

threading

To use threads we import the Threading package, which wraps a number of high-level apis on top of the _Thread package (the low-level Thread module mentioned above). The threading package should be the first choice for development.

Typically, there are two ways to build a Thread: pass a Callable object through the Thread constructor, or inherit the Thread class and override the Run method.

import threading

import time


def do_in_thread(arg):
    print('do in thread {}'.format(arg))
    time.sleep(2)


if __name__ == '__main__':
    start_time = time.time()
    
    t1 = threading.Thread(target=do_in_thread, args=(1,), name='t1')
    t2 = threading.Thread(target=do_in_thread, args=(2,), name='t2')

    t1.start()
    t2.start()
    
    The # join method makes the main thread wait for the child thread to complete
    t1.join()
    t2.join()
    print("\nduration {} ".format(time.time() - start_time))
    
# do in thread 1
# do in thread 2
# duration 2.001628875732422 

Copy the code

Threads can also be defined by inheriting from the threading.thread class

import threading

import time


def do_in_thread(arg):
    print('do in thread {}'.format(arg))
    time.sleep(2)

    
class MyThread(threading.Thread):

    def __init__(self, arg):
        super().__init__()
        self.arg = arg

    def run(self):
        start_time = time.time()
        do_in_thread(self.arg)
        print("duration {} ".format(time.time() - start_time))


def start_thread_2(a):
    start_time = time.time()

    print("duration {} ".format(time.time() - start_time))


if __name__ == '__main__':
    mt1 = MyThread(3)
    mt2 = MyThread(4)

    mt1.start()
    mt2.start()
    
    The # join method makes the main thread wait for the child thread to complete
    mt1.join()
    mt2.join() 
    
    
# do in thread 3
# do in thread 4
# duration 2.004937171936035 
Copy the code

The join method makes the calling thread wait for it to complete.

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
Copy the code

When defining a thread, you can set the thread name by specifying the constructor’s name parameter. Target is used to specify the Callable object that will be called in the run method. Args Sets the parameters to which the target object is called, of type tuple (), such as the parameters to the do_in_thread(arg) method above. Kwargs is a dictionary-type parameter that is also used for the target object. The daemon sets the identity of the daemon thread. If set to True, the thread is the daemon thread. If the main thread ends, the daemon thread is killed immediately. Therefore, when there is a resource operation in the daemon thread, such as opening files, databases, etc., the release of resources can be an error.

The thread pool

If a large number of threads are created and destroyed in the program, the performance impact is significant. We can use thread pools. Again, its API is very similar to Java.

Executor
concurrent.futures.Executor
Copy the code

This is an abstract class that defines the interface to the thread pool.

  • submit(fn, *args, **kwargs)

    Execute fn(args,kwargs) and return onefutureObject, throughfutureYou can obtain the execution result
  • map(func, *iterables, timeout=None, chunksize=1)

    This method is related tomap(func,*iterables)similar
  • shutdown(wait=True)

    Closing the thread pool
from concurrent.futures import ThreadPoolExecutor
# use the max_workers parameter to set the maximum number of threads in the thread pool to 2
with ThreadPoolExecutor(max_workers=2) as executor:
    Submit the task to the thread pool
    future = executor.submit(pow, 2.31) # 计算2^31
    future2 = executor.submit(pow, 1024.2)
    Get the result of the execution using the future
    print(future.result())
    print(future2.result())

# Execution result
# 2147483648
# 1048576
Copy the code
synchronous

A race condition can occur when multiple threads are accessing or manipulating the same resource or memory. Python provides synchronization primitives such as locks, semaphores, conditions, and events to help implement synchronization mechanisms for threads.

Lock

Lock has two states: Locked and unlocked. It has two basic methods: acquire() and release(), and both operate atomically. If a thread acquires a Lock through acquire(), the Lock becomes locked, while another thread can only wait until acquire() is released. The Lock state becomes unlocked when the thread calls release(), and only one of the other waiting threads will acquire the Lock.

import threading

share_mem_lock = 0
share_mem = 0
count = 1000000

locker = threading.Lock()


def add_in_thread_with_lock(a):
    global share_mem_lock
    for i in range(count):
        locker.acquire()
        share_mem_lock += 1
        locker.release()


def minus_in_thread_with_lock(a):
    global share_mem_lock
    for i in range(count):
        locker.acquire()
        share_mem_lock -= 1
        locker.release()


def add_in_thread(a):
    global share_mem

    for i in range(count):
        share_mem += 1


def minus_in_thread(a):
    global share_mem

    for i in range(count):
        share_mem -= 1


if __name__ == '__main__':
    t1 = threading.Thread(target=add_in_thread_with_lock)
    t2 = threading.Thread(target=minus_in_thread_with_lock)

    t3 = threading.Thread(target=add_in_thread)
    t4 = threading.Thread(target=minus_in_thread)

    t1.start()
    t2.start()

    t3.start()
    t4.start()

    t1.join()
    t2.join()

    t3.join()
    t4.join()

    print("share_mem_lock : ", share_mem_lock)
    print("share_mem : ", share_mem)

# Execution result
# share_mem_lock : 0
# share_mem : 51306
Copy the code

Code that does not use locking is likely to end up with a value that is not zero. Locking code ensures synchronization.

RLock

A Reentrant Lock is a Reentrant Lock that can be entered repeatedly. It has three characteristics:

  • Whoever takes the lock will release it. If thread A acquires the lock, only thread A can release the lock
  • The lock can be acquired multiple times by the same thread. You can callacquireMany times
  • acquireHow many times is thatreleaseHow many times, and one last timereleaseBefore the lock is released.
Condition

Conditions are another synchronization primitive mechanism. Acquire () and release() are RLock methods. Other commonly used APIS for Condition are the wait(), notify(), and notify_all() methods. Wait () releases the lock and blocks until another thread wakes up with notify() or notify_all(). The wait() method returns when it recaptures the lock. Notify () wakes up one of the waiting threads, and notify_all() wakes up all waiting threads. Note that the lock is not released after notify() or notify_all() is executed, but only after release() is called. Let’s take a look at a producer and consumer example from the Python Parallel Programming Manual

from threading import Thread, Condition
import time

items = []
condition = Condition()


class consumer(Thread):

    def __init__(self):
        Thread.__init__(self)

    def consume(self):
        global condition
        global items
        # acquiring a lock
        condition.acquire()
        if len(items) == 0:
            # When items is empty, release the lock and wait for the producer notify
            condition.wait()
            print("Consumer notify : no item to consume")
        # Start spending
        items.pop()
        print("Consumer notify : consumed 1 item")
        print("Consumer notify : items to consume are " + str(len(items)))
        Notify wakes up the producer after consumption because notify does not release the lock, so release is also called
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0.10):
            time.sleep(2)
            self.consume()


class producer(Thread):

    def __init__(self):
        Thread.__init__(self)

    def produce(self):
        global condition
        global items
        condition.acquire()
        if len(items) == 5:
            If the items count is full, execute wait, release the lock, and wait for consumer notify
            condition.wait()
            print("Producer notify : items producted are " + str(len(items)))
            print("Producer notify : stop the production!!")
        # Start production
        items.append(1)
        print("Producer notify : total items producted " + str(len(items)))
        Release release is also performed because notify does not release the lock.
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0.10):
            time.sleep(1)
            self.produce()


if __name__ == "__main__":
    producer = producer()
    consumer = consumer()
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()

Copy the code
Semaphore

The semaphore maintains a counter internally. Acquire () decreases the count, release() increases it, and the counter is never less than 0. When the counter equals zero, the acquire() method waits for another thread to call Release (). Again, let’s use the example of producer versus consumer

# -*- coding: utf-8 -*-

"""Using a Semaphore to synchronize threads"""
import threading
import time
import random

The internal count is set to 0 by default.
# If set to negative, ValueError is raised
semaphore = threading.Semaphore(0)


def consumer(a):
    print("consumer is waiting.")
    The internal count is set to 0 at initialization, so it is initially in wait state
    semaphore.acquire()
    # Start spending
    print("Consumer notify : consumed item number %s " % item)


def producer(a):
    global item
    time.sleep(2)
    # create a random item
    item = random.randint(0.1000)
    # Start production
    print("producer notify : produced item number %s" % item)
    # Release semaphore, internal counter +1. When the waiting thread finds that the counter is greater than 0, it wakes up and returns from the Acquire method
    semaphore.release()


if __name__ == '__main__':
    for i in range(0.5) : t1 = threading.Thread(target=producer) t2 = threading.Thread(target=consumer) t1.start() t2.start() t1.join() t2.join() print("program terminated")

Copy the code

Semaphores are often used in scenarios where resource capacity is determined, such as database connection pooling.

Event

The way events communicate between threads is very simple. One thread sends an event and another thread waits to receive it. The Event object maintains a bool variable flag. Set this variable to True using the set() method and flag to False using the clear() method. The wait() method waits until flag changes to True

Combined with the example

# -*- coding: utf-8 -*-

import time
from threading import Thread, Event
import random

items = []
event = Event()

class consumer(Thread):
    def __init__(self, items, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        while True:
            time.sleep(2)
            # wait for events
            self.event.wait()
            # Start spending
            item = self.items.pop()
            print('Consumer notify : %d popped from list by %s' % (item, self.name))


class producer(Thread):
    def __init__(self, integers, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        global item
        while True:
            time.sleep(2)
            # Start production
            item = random.randint(0.256)
            self.items.append(item)
            print('Producer notify : item N° %d appended to list by %s' % (item, self.name))
            print('Producer notify : event set by %s' % self.name)
            # Send events to inform consumers of consumption
            self.event.set()
            print('Produce notify : event cleared by %s ' % self.name)
            Set the event internal variable to False, and the consumer thread blocks when it calls wait()
            self.event.clear()


if __name__ == '__main__':
    t1 = producer(items, event)
    t2 = consumer(items, event)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

Copy the code
Timer

Timer Timer is a subclass of Thread. Used to process scheduled tasks. Start () is used to start the timer and cancel() is used to cancel the timer.

from threading import Timer

def hello(a):
    print("hello, world")

t = Timer(3.0, hello)
t.start()  Print "Hello, world" after 3 seconds
Copy the code
With the grammar

Lock, RLock, Condition, and Semaphore can use the with syntax. These objects all implement acquire() and release() methods, and all implement context management protocols.

with some_lock:
    # do something...
Copy the code

Is equivalent to

some_lock.acquire()
try:
    # do something...
finally:
    some_lock.release()
Copy the code

0 x03 summary

This article introduces the use of threads in Python. The threading module provides an overview of common uses of Thread objects and Thread pool executors. You also learned about the use of the synchronization primitives Lock, RLock, Condition, Semaphore, Event and Timer apis for threads.

0 x04 reference

  • python-parallel-programmning-cookbook.readthedocs.io
  • Docs.python.org/3/library/t…
  • Docs.python.org/3.7/library…
  • Docs.python.org/3/glossary….
  • Docs.python.org/3/library/c…