I'm a working programmer, currently a team lead at a startup. Our team's main technology stack is Android, Python, Java, and Go.
0x00 Preface
Previous articles provided a brief introduction to the concept and implementation of Python coroutines. In order to gain a more comprehensive understanding of concurrent programming in Python, I also studied the concepts of Python threads and processes and the use of related techniques, resulting in this article.
0x01 Threads and Processes
When we open an application on a phone or PC, the operating system creates a process and starts executing its main thread. Each process has its own memory space and data structures. Threads are lightweight by comparison: multiple threads within the same process share that process's memory and data structures, which makes communication between threads easier.
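To make the shared-memory point concrete, here is a minimal sketch. The names `results` and `worker` are illustrative and not part of the original article. Three threads write into one dictionary that lives in the process's memory, and each records the process ID it runs in.

```python
import os
import threading

# One dictionary owned by the process; all threads see the same object.
results = {}


def worker(tag):
    # Each thread writes under its own key, recording the process ID it runs in.
    results[tag] = os.getpid()


threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread sees every write, and every thread reported the same PID.
print(sorted(results))                                       # [0, 1, 2]
print(all(pid == os.getpid() for pid in results.values()))   # True
```

No copying or message passing was needed: the threads and the main thread simply touched the same object.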
0x02 Concurrency using threads
Those familiar with Java programming will notice that the threading model in Python is very similar to Java's. For the rest of this article we'll focus on Python's threading module; the low-level _thread module is not recommended for beginners. (All code in this article uses Python 3.7.)
threading
To use threads we import the threading module, which wraps a number of high-level APIs on top of the _thread module (the low-level module mentioned above). The threading module should be the first choice for development.
Typically, there are two ways to create a thread: pass a callable object to the Thread constructor, or subclass Thread and override its run() method.
import threading
import time


def do_in_thread(arg):
    print('do in thread {}'.format(arg))
    time.sleep(2)


if __name__ == '__main__':
    start_time = time.time()
    t1 = threading.Thread(target=do_in_thread, args=(1,), name='t1')
    t2 = threading.Thread(target=do_in_thread, args=(2,), name='t2')
    t1.start()
    t2.start()
    # join() makes the main thread wait for the child thread to complete
    t1.join()
    t2.join()
    print("\nduration {} ".format(time.time() - start_time))

# do in thread 1
# do in thread 2
# duration 2.001628875732422
Threads can also be defined by subclassing the threading.Thread class:
import threading
import time


def do_in_thread(arg):
    print('do in thread {}'.format(arg))
    time.sleep(2)


class MyThread(threading.Thread):
    def __init__(self, arg):
        super().__init__()
        self.arg = arg

    def run(self):
        # run() is what executes in the new thread after start() is called
        start_time = time.time()
        do_in_thread(self.arg)
        print("duration {} ".format(time.time() - start_time))


if __name__ == '__main__':
    mt1 = MyThread(3)
    mt2 = MyThread(4)
    mt1.start()
    mt2.start()
    # join() makes the main thread wait for the child thread to complete
    mt1.join()
    mt2.join()

# Sample output (each thread prints its own duration line)
# do in thread 3
# do in thread 4
# duration 2.004937171936035
The join() method blocks the calling thread until the thread whose join() was called has finished.
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
When creating a thread, the name parameter sets the thread's name. target is the callable object that run() invokes. args is a tuple of positional arguments passed to target, such as the argument to do_in_thread(arg) above, and kwargs is a dictionary of keyword arguments passed to target. daemon marks the thread as a daemon thread when set to True. The Python program exits once only daemon threads are left, and any daemon threads still running are stopped abruptly at that point. A daemon thread that holds resources such as open files or database connections may therefore never get the chance to release them properly.
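The constructor parameters described above can be tried together in a small sketch. The function `greet` and the list `seen` are hypothetical names introduced only for this illustration.

```python
import threading

seen = []


def greet(greeting, name="world"):
    # Record the running thread's name, its daemon flag, and the message.
    current = threading.current_thread()
    seen.append((current.name, current.daemon, "{}, {}".format(greeting, name)))


t = threading.Thread(
    target=greet,
    name="greeter",              # thread name
    args=("hello",),             # positional arguments for target
    kwargs={"name": "python"},   # keyword arguments for target
    daemon=True,                 # mark the thread as a daemon thread
)
t.start()
t.join()  # wait explicitly so the daemon thread is not cut off at exit
print(seen)  # [('greeter', True, 'hello, python')]
```

Calling join() on a daemon thread is one way to make sure it finishes its work before the program exits.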
Thread pool
Creating and destroying a large number of threads in a program has a significant performance cost. A thread pool avoids this by reusing a fixed set of worker threads. Again, its API is very similar to Java's.
Executor
concurrent.futures.Executor
This is an abstract class that defines the interface to the thread pool.
- submit(fn, *args, **kwargs): schedules fn(*args, **kwargs) for execution and returns a Future object, through which you can obtain the result.
- map(func, *iterables, timeout=None, chunksize=1): similar to the built-in map(func, *iterables).
- shutdown(wait=True): shuts down the thread pool.
from concurrent.futures import ThreadPoolExecutor

# Use the max_workers parameter to set the maximum number of threads in the pool to 2
with ThreadPoolExecutor(max_workers=2) as executor:
    # Submit the tasks to the thread pool
    future = executor.submit(pow, 2, 31)  # compute 2^31
    future2 = executor.submit(pow, 1024, 2)
    # Get the result of the execution using the future
    print(future.result())
    print(future2.result())

# Execution result
# 2147483648
# 1048576
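The example above covers submit(); the map() method from the list can be sketched in the same way. The helper `square` is a made-up function for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def square(n):
    return n * n


with ThreadPoolExecutor(max_workers=2) as executor:
    # map() yields results in the order of the inputs,
    # regardless of which task finishes first.
    squares = list(executor.map(square, [1, 2, 3, 4]))
# Leaving the with block calls executor.shutdown(wait=True).

print(squares)  # [1, 4, 9, 16]
```

Using the executor as a context manager means shutdown() does not have to be called by hand.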
Synchronization
A race condition can occur when multiple threads are accessing or manipulating the same resource or memory. Python provides synchronization primitives such as locks, semaphores, conditions, and events to help implement synchronization mechanisms for threads.
Lock
A Lock has two states: locked and unlocked. It has two basic methods, acquire() and release(), and both operate atomically. When a thread acquires a Lock through acquire(), the Lock becomes locked, and any other thread calling acquire() must wait until the lock is released. When release() is called, the Lock becomes unlocked, and exactly one of the waiting threads acquires it.
import threading

share_mem_lock = 0
share_mem = 0
count = 1000000
locker = threading.Lock()


def add_in_thread_with_lock():
    global share_mem_lock
    for i in range(count):
        locker.acquire()
        share_mem_lock += 1
        locker.release()


def minus_in_thread_with_lock():
    global share_mem_lock
    for i in range(count):
        locker.acquire()
        share_mem_lock -= 1
        locker.release()


def add_in_thread():
    global share_mem
    for i in range(count):
        share_mem += 1


def minus_in_thread():
    global share_mem
    for i in range(count):
        share_mem -= 1


if __name__ == '__main__':
    t1 = threading.Thread(target=add_in_thread_with_lock)
    t2 = threading.Thread(target=minus_in_thread_with_lock)
    t3 = threading.Thread(target=add_in_thread)
    t4 = threading.Thread(target=minus_in_thread)
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t1.join()
    t2.join()
    t3.join()
    t4.join()
    print("share_mem_lock : ", share_mem_lock)
    print("share_mem : ", share_mem)

# Execution result
# share_mem_lock : 0
# share_mem : 51306
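The code that does not use a lock is likely to end up with a nonzero value, because += and -= are not atomic: two threads can read the same old value, and one of the updates is then lost. The code that uses the lock keeps the updates synchronized.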
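The two lock states can also be observed directly. The following sketch is an illustration rather than part of the original example: it uses `locked()` and a non-blocking `acquire(blocking=False)` from a single thread, then protects a counter with try/finally so the lock is released even if the guarded code raises. The names `counter` and `add_many` are made up for the example.

```python
import threading

lock = threading.Lock()
counter = 0

# The two states, observed from a single thread.
lock.acquire()
held = lock.locked()                       # True: the lock is now locked
second_try = lock.acquire(blocking=False)  # False: already held, do not wait
lock.release()
free = not lock.locked()                   # True: unlocked again
print(held, second_try, free)              # True False True


def add_many(n):
    global counter
    for _ in range(n):
        lock.acquire()
        try:
            counter += 1
        finally:
            lock.release()  # always release, even if the body raises


threads = [threading.Thread(target=add_many, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 40000
```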
RLock
RLock is a reentrant lock: the thread that holds it can acquire it again without blocking. It has three characteristics:
- Whoever acquires the lock must release it. If thread A acquires the lock, only thread A can release it.
- The same thread can acquire the lock multiple times by calling acquire() repeatedly.
- Every acquire() must be matched by a release(); the lock is actually released only after the last release().
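A short sketch may help illustrate these characteristics. The functions `deposit` and `deposit_twice` and the variable `balance` are hypothetical names chosen for this example. One function holds the RLock and calls another that acquires it again, which would deadlock with a plain Lock.

```python
import threading

rlock = threading.RLock()
balance = 0


def deposit(amount):
    global balance
    with rlock:              # acquired again by the thread that already owns it
        balance += amount


def deposit_twice(amount):
    with rlock:              # first acquisition
        deposit(amount)      # nested acquisition in the same thread
        deposit(amount)
    # the lock is fully released only here, after the outermost release


t = threading.Thread(target=deposit_twice, args=(50,))
t.start()
t.join()
print(balance)  # 100

# A thread that does not own the lock cannot release it.
not_owner_error = False
try:
    rlock.release()
except RuntimeError:
    not_owner_error = True
print(not_owner_error)  # True
```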
Condition
Condition is another synchronization primitive. A Condition is always associated with a lock (an RLock by default), and its acquire() and release() methods simply call the corresponding methods of that lock. The other commonly used methods are wait(), notify(), and notify_all(). wait() releases the lock and blocks until another thread wakes it with notify() or notify_all(); once awakened, it reacquires the lock and returns. notify() wakes up one of the waiting threads, and notify_all() wakes up all of them. Note that notify() and notify_all() do not release the lock; it is released only when release() is called. Let's look at a producer and consumer example from the Python Parallel Programming Cookbook.
from threading import Thread, Condition
import time

items = []
condition = Condition()


class consumer(Thread):
    def __init__(self):
        Thread.__init__(self)

    def consume(self):
        global condition
        global items
        # Acquire the lock
        condition.acquire()
        if len(items) == 0:
            # When items is empty, wait() releases the lock and blocks
            # until the producer calls notify()
            condition.wait()
            print("Consumer notify : no item to consume")
        # Start consuming
        items.pop()
        print("Consumer notify : consumed 1 item")
        print("Consumer notify : items to consume are " + str(len(items)))
        # Wake up the producer after consuming; notify() does not
        # release the lock, so release() must also be called
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 10):
            time.sleep(2)
            self.consume()


class producer(Thread):
    def __init__(self):
        Thread.__init__(self)

    def produce(self):
        global condition
        global items
        condition.acquire()
        if len(items) == 5:
            # When items is full, wait() releases the lock and blocks
            # until the consumer calls notify()
            condition.wait()
            print("Producer notify : items produced are " + str(len(items)))
            print("Producer notify : stop the production!!")
        # Start producing
        items.append(1)
        print("Producer notify : total items produced " + str(len(items)))
        # notify() does not release the lock, so release() must also be called
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 10):
            time.sleep(1)
            self.produce()


if __name__ == "__main__":
    producer = producer()
    consumer = consumer()
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
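A common variation on this pattern, sketched here under my own assumptions rather than taken from the cookbook, checks the condition in a while loop, so the predicate is re-tested after every wake-up, and uses the with statement instead of explicit acquire() and release() calls. The names `MAX_ITEMS`, `produce`, `consume`, and `consumed` are illustrative.

```python
import threading

MAX_ITEMS = 5
items = []
consumed = []
condition = threading.Condition()


def produce(n):
    for i in range(n):
        with condition:
            # Re-check the predicate after every wake-up.
            while len(items) == MAX_ITEMS:
                condition.wait()
            items.append(i)
            condition.notify_all()


def consume(n):
    for _ in range(n):
        with condition:
            while not items:
                condition.wait()
            consumed.append(items.pop(0))  # take the oldest item first
            condition.notify_all()


p = threading.Thread(target=produce, args=(20,))
c = threading.Thread(target=consume, args=(20,))
p.start()
c.start()
p.join()
c.join()
print(consumed == list(range(20)))  # True
```

With a single producer and a single consumer the if form also works, but the while form stays correct when more threads share the same condition.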
Semaphore
A semaphore maintains an internal counter. acquire() decrements the counter and release() increments it; the counter never drops below 0. When the counter is zero, acquire() blocks until another thread calls release(). Again, let's use a producer and consumer example.
# -*- coding: utf-8 -*-
"""Using a Semaphore to synchronize threads"""
import threading
import time
import random

# The optional argument sets the initial value of the internal counter
# (it defaults to 1). A negative value raises ValueError.
semaphore = threading.Semaphore(0)


def consumer():
    print("consumer is waiting.")
    # The counter was initialized to 0, so acquire() blocks at first
    semaphore.acquire()
    # Start consuming
    print("Consumer notify : consumed item number %s " % item)


def producer():
    global item
    time.sleep(2)
    # Create a random item
    item = random.randint(0, 1000)
    # Start producing
    print("producer notify : produced item number %s" % item)
    # Release the semaphore: the counter is incremented by 1, and the
    # waiting thread wakes up and returns from acquire()
    semaphore.release()


if __name__ == '__main__':
    for i in range(0, 5):
        t1 = threading.Thread(target=producer)
        t2 = threading.Thread(target=consumer)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    print("program terminated")
Semaphores are often used in scenarios where resource capacity is determined, such as database connection pooling.
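As a rough sketch of the fixed-capacity case, the code below imitates a connection pool with two slots. There is no real database here: `use_connection`, `MAX_CONNECTIONS`, and the short sleep standing in for a query are all assumptions made for illustration. A BoundedSemaphore is used so that releasing more times than acquiring would raise an error.

```python
import threading
import time

MAX_CONNECTIONS = 2
pool_slots = threading.BoundedSemaphore(MAX_CONNECTIONS)
state_lock = threading.Lock()
active = 0   # workers currently holding a slot
peak = 0     # highest value of active seen so far


def use_connection():
    global active, peak
    with pool_slots:  # blocks while MAX_CONNECTIONS workers hold a slot
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # stand-in for running a query
        with state_lock:
            active -= 1


workers = [threading.Thread(target=use_connection) for _ in range(6)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(peak <= MAX_CONNECTIONS)  # True
```

Six workers compete for the pool, but never more than two hold a slot at once.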
Event
Events are a very simple way for threads to communicate: one thread signals an event and other threads wait for it. An Event object maintains an internal boolean flag. set() sets the flag to True, clear() resets it to False, and wait() blocks until the flag becomes True.
Here is an example:
# -*- coding: utf-8 -*-
import time
from threading import Thread, Event
import random

items = []
event = Event()


class consumer(Thread):
    def __init__(self, items, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        while True:
            time.sleep(2)
            # Wait for the event
            self.event.wait()
            # Start consuming
            item = self.items.pop()
            print('Consumer notify : %d popped from list by %s' % (item, self.name))


class producer(Thread):
    def __init__(self, items, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        while True:
            time.sleep(2)
            # Start producing
            item = random.randint(0, 256)
            self.items.append(item)
            print('Producer notify : item N° %d appended to list by %s' % (item, self.name))
            print('Producer notify : event set by %s' % self.name)
            # Set the event to tell the consumer to consume
            self.event.set()
            print('Produce notify : event cleared by %s ' % self.name)
            # Reset the internal flag to False, so the consumer blocks
            # the next time it calls wait()
            self.event.clear()


if __name__ == '__main__':
    t1 = producer(items, event)
    t2 = consumer(items, event)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
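The flag semantics are easier to see in a smaller sketch that terminates. The names `ready`, `waiter`, and `received` are illustrative. Here the flag is left set until the waiter has finished; when set() is followed immediately by clear(), as in the loop above, a thread that calls wait() late can miss the signal.

```python
import threading

ready = threading.Event()
received = []


def waiter():
    # wait() returns True once the flag is set, or False if the timeout expires
    if ready.wait(timeout=5):
        received.append("go")


t = threading.Thread(target=waiter)
t.start()
print(ready.is_set())            # False: nobody has called set() yet
ready.set()                      # wake every thread blocked in wait()
t.join()
print(ready.is_set(), received)  # True ['go']

ready.clear()                    # reset the flag to False
timed_out = not ready.wait(timeout=0.01)
print(timed_out)                 # True: the flag stayed False until the timeout
```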
Timer
Timer is a subclass of Thread that runs a function after a given interval. start() starts the timer, and cancel() cancels it if it has not fired yet.
from threading import Timer


def hello():
    print("hello, world")


t = Timer(3.0, hello)
t.start()  # prints "hello, world" after 3 seconds
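cancel() can be sketched the same way. The function `remember` and the list `fired` are hypothetical names: two timers are started, one is cancelled while it is still waiting, and only the other one fires.

```python
import threading

fired = []


def remember(label):
    fired.append(label)


# args are passed to the function when the timer fires
quick = threading.Timer(0.1, remember, args=("quick",))
slow = threading.Timer(5.0, remember, args=("slow",))
quick.start()
slow.start()

slow.cancel()  # only has an effect while the timer is still waiting
quick.join()   # Timer is a Thread, so join() waits for it to finish
slow.join()    # returns promptly because the timer was cancelled
print(fired)   # ['quick']
```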
The with statement
Lock, RLock, Condition, and Semaphore can all be used with the with statement. These objects all provide acquire() and release() methods and implement the context management protocol.
with some_lock:
    # do something...
is equivalent to:
some_lock.acquire()
try:
    # do something...
finally:
    some_lock.release()
0x03 Summary
This article introduced the use of threads in Python: the common uses of the threading module's Thread objects and of thread pool executors, along with the synchronization primitives Lock, RLock, Condition, Semaphore, and Event, and the Timer API.
0x04 References
- python-parallel-programmning-cookbook.readthedocs.io
- docs.python.org/3/library/t…
- docs.python.org/3.7/library…
- docs.python.org/3/glossary….
- docs.python.org/3/library/c…