This article is participating in Python Theme Month. See the link for details

Locking operation is non-atomic

import threading, time, redis
from redis import StrictRedis
# Method 1: lock operation is non-atomic
# Failed cases

class RedisLock(object) :
    def __init__(self, redis_conn) :
        self.redis_conn = redis_conn

    def get_lock_key(self, key) :
        lock_key = 'lock_%s' % key
        return lock_key

    def get_lock(self, key) :
        "" :param key: distributed lock Key :return: Get_lock obtains the lock in a loop. Only those who have obtained the lock can exit the loop. All threads that need to perform operation A must first obtain a specified key in Redis. If the key obtained by one thread is null, the key is successfully obtained, and key=1 is set to perform operation A. All other threads will fail to obtain the key again and the thread that obtained the key is null will perform operation A. Delete key=1 so that the next thread can acquire the lock. However, there is a loophole in this method: after the first thread obtains the key, the second thread obtains the "" before it can put the key=1.
        lock_key = self.get_lock_key(key)
        while True:
            value = self.redis_conn.get(lock_key)
            if not value:
                self.redis_conn.set(lock_key, 1)
                return True
            time.sleep(0.01)

    def del_lock(self, key) :
        lock_key = self.get_lock_key(key)
        return self.redis_conn.delete(lock_key)

def increase_data(redis_conn, lock, key) :
    lock_value = lock.get_lock(key)  # acquiring a lock
    value = redis_conn.get(key)  # Fetch data
    time.sleep(0.1)
    if value:
        value = int(value) + 1
    else:
        value = 0
    redis_conn.set(key, value)
    thread_name = threading.current_thread().name
    print(thread_name, value)
    lock.del_lock(key)  # releases the lock


# # the main program
if __name__ == "__main__":
    pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=8)
    redis = StrictRedis(connection_pool=pool)
    lock = RedisLock(redis)
    key = 'test_key'
    thread_count = 10
    redis.delete(key)
    for i in range(thread_count):
        Start 10 threads to call increase_data
        The lock passed in is like a lock tool. Anyone can use the lock tool to obtain a lock in Redis, but the number of locks is unique
        thread = threading.Thread(target=increase_data, args=(redis, lock, key))
        thread.start()
    "" Actual output result thread-1 0 thread-8 1 thread-6 thread-7 1 1 thread-9 1 thread-2thread-4 2 2 thread-3 2 thread-10 2 thread-5 2 "" "
Copy the code

Use setNx atomic locking

import threading, time, redis
from redis import StrictRedis


class RedisLock(object) :
    def __init__(self, redis_conn) :
        self.redis_conn = redis_conn

    def get_lock_key(self, key) :
        lock_key = 'lock_%s' % key
        return lock_key

    def get_lock(self, key) :
        """ :param key: :return: In view of the above version because the command is not atomic operation caused by two or more threads to acquire the lock at the same time, this version is changed to use redis setnx command to query and set the lock operation setnx is set if not exists, Value is set when the key does not exist and returns 1. If the key already exists, nothing is done and 0 is returned. Setnx takes only one step to acquire the lock and set the lock. This atomicity is the key to success. Yes, but there is a problem. If one thread throws an exception when operating on A, all the other threads waiting to acquire the lock will fall into an infinite loop, causing a deadlock.
        lock_key = self.get_lock_key(key)
        while True:
            value = self.redis_conn.setnx(lock_key, 1)
            if value:
                return True
            time.sleep(0.01)


    def del_lock(self, key) :
        lock_key = self.get_lock_key(key)
        return self.redis_conn.delete(lock_key)



def increase_data(redis_conn, lock, key) :
    lock_value = lock.get_lock(key)  # acquiring a lock
    value = redis_conn.get(key)  # Fetch data
    time.sleep(0.1)
    if value:
        value = int(value) + 1
    else:
        value = 0
    redis_conn.set(key, value)
    thread_name = threading.current_thread().name
    print(thread_name, value)
    # Simulate deadlock
    # if thread_name == "Thread-2":
    # print("thread-2 crash ...." )
    # import sys
    # sys.exit(1)
    lock.del_lock(key)  # releases the lock


# # the main program
if __name__ == "__main__":
    pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=8)
    redis = StrictRedis(connection_pool=pool)
    lock = RedisLock(redis)
    key = 'test_key'
    thread_count = 10
    redis.delete(key)
    for i in range(thread_count):
        thread = threading.Thread(target=increase_data, args=(redis, lock, key))
        thread.start()


Copy the code

To solve the deadlock

import threading, time, redis
from redis import StrictRedis


class RedisLock(object) :
    def __init__(self, redis_conn) :
        self.redis_conn = redis_conn

    def get_lock_key(self, key) :
        lock_key = 'lock_%s' % key
        return lock_key

    def get_lock(self, key, timeout=1) :
        """ :param key: :param timeout: :return: According to the TTL extension data of REDis, redis has two types of failure mechanisms: 1 passive failure: when the client proactively obtains the TTL, it determines whether the TTL is invalid (timeout). 2 Active failure: Redis has a scheduled task, which is executed 10 times per second. It randomly obtains 20 keys from the set invalidation mechanism, deletes expired keys, and checks whether more than 1/4 of the keys are invalid. If the first step is not performed, it obtains 20 keys again. Deadlocks are solved but there are still problems with the locking mechanism: After thread 1 exits the deadlock, thread 2 immediately acquires the lock, but thread 1 still has a step to delete the lock, which will delete the lock that thread 2 just put in, so that the data will be confused. You can also add a process PID, machine IP, or timestamp to make sure that it's absolutely unique and then you can set it in the lock delete operation that the set lock and the lock delete must be executed in the same thread.
        lock_key = self.get_lock_key(key)
        while True:
            value = self.redis_conn.set(lock_key, 1, nx=True, ex=timeout)
            if value:
                break
            time.sleep(0.1)



    def del_lock(self, key) :
        lock_key = self.get_lock_key(key)
        return self.redis_conn.delete(lock_key)



def increase_data(redis_conn, lock, key) :
    lock_value = lock.get_lock(key)  # acquiring a lock
    value = redis_conn.get(key)  # Fetch data
    time.sleep(2.5)  # Simulate some time-consuming operation in a real situation that takes longer than the lock expires
    time.sleep(0.1)
    if value:
        value = int(value) + 1
    else:
        value = 0
    redis_conn.set(key, value)
    thread_name = threading.current_thread().name
    print(thread_name, value)
    # Simulate deadlock
    if thread_name == "Thread-2":
        print("thread-2 crash ....")
        import sys
        sys.exit(1)
    lock.del_lock(key)  # releases the lock


# # the main program
if __name__ == "__main__":
    pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=8)
    redis = StrictRedis(connection_pool=pool)
    lock = RedisLock(redis)
    key = 'test_key'
    thread_count = 10
    redis.delete(key)
    for i in range(thread_count):
        thread = threading.Thread(target=increase_data, args=(redis, lock, key))
        thread.start()


Copy the code

The thread uniqueness is used to solve the error deletion lock

import os
import socket
import threading, time, redis
from redis import StrictRedis


class RedisLock(object) :
    def __init__(self, redis_conn) :
        self.redis_conn = redis_conn
        Get the machine IP
        self.ip = socket.gethostbyname(socket.gethostname())
        Get process PID
        self.pid = os.getpid()

    def get_lock_key(self, key) :
        lock_key = 'lock_%s' % key
        return lock_key

    def gen_unique_value(self) :
        thread_name = threading.current_thread().name
        time_now = time.time()
        Machine IP thread PID process name timestamp to establish uniqueness
        unique_value = "{0} - {1} {2} - {3}".format(self.ip, self.pid, thread_name, time_now)
        return unique_value

    def get_lock(self, key, timeout=4) :
        """ :param key: :param timeout: :return: After acquiring the lock, it will return the unique identifier (gen_unique_value). When deleting the lock, it will delete the lock by judging whether the unique identifier is equal or not. The execution failed, and the error is even bigger than before. It's about to delete the key, thread A's key expires, thread B grabs the lock, writes to it, and then thread A deletes the lock that thread B wrote to it and basically the delete operation is not atomic, there's a slot that can be inserted in the lock although this can be avoided by setting a reasonable timeout period for the lock, For example, when setting a lock with a timeout longer than the thread execution time, we assume that the Redis server is a single cluster and highly available, ignoring the following issues: If the redis master node fails at some time and a slave node becomes the master node in the cluster, the lock on the original master node may not be synchronized to the slave node in time, resulting in other threads acquiring the lock at the same time. For this problem, you can refer to the official Redis redLock algorithm, but unfortunately, this algorithm does not solve the lock expiration problem well. "" "
        lock_key = self.get_lock_key(key)
        unique_value = self.gen_unique_value()
        print("unique value %s" % unique_value)
        while True:
            value = self.redis_conn.set(lock_key, 1, nx=True, ex=timeout)
            if value:
                return unique_value
            else:
                thread_name = threading.current_thread().name
            time.sleep(0.1)

    def del_lock(self, key, value) :
        Key: lock :param value: process information :return: """
        lock_key = self.get_lock_key(key)
        old_lock_value = self.redis_conn.get(lock_key)
        if old_lock_value == value:
            return self.redis_conn.delete(lock_key)



def increase_data(redis_conn, lock, key) :
    lock_value = lock.get_lock(key) # acquiring a lock
    value = redis_conn.get(key) # Fetch data
    time.sleep(2.5) # Simulate some time-consuming operation in a real situation that takes longer than the lock expires
    if value:
        value = int(value) + 1
    else:
        value = 0
    redis_conn.set(key, value)
    thread_name = threading.current_thread().name
    print(thread_name, value)
    if thread_name == "Thread-2":
        print("thread-2 crash ....")
        import sys
        sys.exit(1)
    lock.del_lock(key, lock_value) # releases the lock

# # the main program
if __name__ == "__main__":
    pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=8)
    redis = StrictRedis(connection_pool=pool)
    lock = RedisLock(redis)
    key = 'test_key'
    thread_count = 10
    redis.delete(key)
    for i in range(thread_count):
        thread = threading.Thread(target=increase_data, args=(redis, lock, key))
        thread.start()

    "" unique value 10.0.0.13-52504 thread-2-1621907713.510651 "" unique value 10.0.0.13-52504 thread-2-1621907713.510651 Unique value 10.0.0.13-52504- thread-3-1621907713.510732 unique value 10.0.0.13-52504- thread-4-1621907713.510732 Value 10.0.0.13-52504- thread-5-1621907713.511217 unique value 10.0.0.13-52504- thread-6-1621907713.511217 unique value 10.0.0.13-52504- thread-6-1621907713.511217 unique value 10.0.0.13-52504- thread-6-1621907713.511217 unique value 10.0.0.13-52504- thread-7-1621907713.5151 unique value 10.0.0.13-52504- thread-8-1621907713.5151 unique value 10.0.0.13-52504- thread-8-1621907713.5151 unique value 10.0.0.13-52504 thread-9-1621907713.512355 unique value 10.0.0.13-52504 thread-10-1621907713.512751 thread-1 0 thread-6 0 Thread-3 0 Thread-4 1 Thread-9 1 Thread-7 1 Thread-8 2 Thread-10 2 Thread-2 2 thread-2 crash .... Thread-5 3 """
Copy the code