posts - 225, comments - 62, trackbacks - 0, articles - 0
   :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理

基于Redis的分布式Mutex, ReadWriteLock

Posted on 2020-01-15 13:39 魔のkyo 阅读(561) 评论(0)  编辑 收藏 引用 所属分类: Python
# -*- coding: utf-8 -*-
from redis import ConnectionPool
import redis
import time
import uuid

POOL = ConnectionPool(host='127.0.0.1', max_connections=100)

class Redis(redis.Redis):
    def __init__(self, host=None):
        global POOL
        if POOL.connection_kwargs["host"] != host:
            POOL = ConnectionPool(host=host, max_connections=100)
        super().__init__(connection_pool=POOL)

# 互斥锁
class Mutex:
    def __init__(self, name, server="127.0.0.1"):
        self.name = name
        self.key_name = "MUTEX_" + name
        self.id = uuid.uuid4().hex
        self.redis = Redis(host=server)

    def acquire(self, blocking=True, ex=30):
        r = self.redis.set(self.key_name, self.id, ex=ex, nx=True)
        if blocking:
            while not r:
                time.sleep(0.01)
                r = self.redis.set(self.key_name, self.id, ex=ex, nx=True)
        return r

    def release(self):
        if self.acquired():
            self.redis.delete(self.key_name)

    def acquired(self):
        r = self.redis.get(self.key_name)
        return r != None and r.decode() == str(self.id)

    def __enter__(self):
        self.acquire()

    def __exit__(self, exc_type, exc_value, exc_trackback):
        self.release()
        if exc_value != None:
            raise exc_value


# 读写锁
class ReadWriteLock:
    def __init__(self, name, server="127.0.0.1"):
        self.name = name
        self.server = server
        self.rlock_name = "RLOCK_" + name
        self.wlock_name = "WLOCK_" + name
        self.id = uuid.uuid4().hex
        self.redis = Redis(host=server)
        self.lock_type = None

    def read_lock(self, blocking=True, ex=30):
        mutex = Mutex(self.name, self.server)
        try:
            mutex.acquire()
            wlock_locked = self.redis.get(self.wlock_name)
            if wlock_locked:
                if blocking:
                    while wlock_locked:
                        mutex.release()
                        time.sleep(0.05)
                        mutex.acquire()
                        wlock_locked = self.redis.get(self.wlock_name)
                else:
                    return False
            
            pipeline = self.redis.pipeline()
            now = time.time()
            pipeline.zremrangebyscore(self.rlock_name, 0, int((now-ex)*1000) )
            pipeline.zadd(self.rlock_name, {self.id: int(now*1000)} )
            pipeline.expire(self.rlock_name, ex)
            pipeline.execute()
            self.lock_type = 'R'
            return True
        except Exception as e:
            raise
        finally:
            mutex.release()
    
    def write_lock(self, blocking=True, ex=30):
        mutex = Mutex(self.name, self.server)
        try:
            mutex.acquire()
            self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-ex)*1000) )
            r = self.redis.zcard(self.rlock_name)
            if r:
                if blocking:
                    while r:
                        mutex.release()
                        time.sleep(0.05)
                        mutex.acquire()
                        self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-ex)*1000) )
                        r = self.redis.zcard(self.rlock_name)
                else:
                    return False
            r = self.redis.set(self.wlock_name, self.id, ex=ex, nx=True)
            if blocking:
                while not r:
                    mutex.release()
                    time.sleep(0.05)
                    mutex.acquire()
                    r = self.redis.set(self.wlock_name, self.id, ex=ex, nx=True)
            if r:
                self.lock_type = 'W'
            return r
        except Exception as e:
            raise
        finally:
            mutex.release()

    def unlock(self):
        if self.lock_type == 'R':
            self.redis.zrem(self.rlock_name, self.id)
            self.lock_type = None
        elif self.lock_type == 'W':
            r = self.redis.get(self.wlock_name)
            if r != None and r.decode() == str(self.id):
                self.redis.delete(self.wlock_name)
            self.lock_type = None
考虑了锁住后崩溃,解决方案是超时后自动结束。
考虑了未崩溃但是超时(这种情况首先考虑默认超时时间是否应该调整,或者程序是否需要调整锁住的代码分多次锁?)。
在这种异常情况发生时,可能产生一边释放了锁但还在访问,另一边加上了锁,记住这时异常情况,但我们要保证即使它发生了也尽量能正常工作下去,
对于确实存在访问冲突的那么是没办法的,该异常就异常好了,还有种情况是虽然加了锁,但是并没有访问冲突,其实程序可以正常下去,但是这里会发生什么呢?
对于A线程,手动加锁----------超时自动解锁|------------------------手动解锁|
对于B线程,-------------手动加锁-----------|(等到此加锁成功)------------|---------手动解锁
对于C线程,---------------------------------|------------手动加锁------------|(等到此加锁成功)
A线程因为超时自动解锁后虽然和B线程没有发生访问冲突,但是它解了B线程的锁,导致C线程加锁成功,而B线程实际还没解锁,这又制造了潜在的B线程和C线程的访问冲突。
所以手动解锁时应该判断下当前的锁是否是自己加的。
支持阻塞和非阻塞加锁,默认阻塞,非阻塞用法
if lock.acquire(blocking=False):
    pass
判断加锁成功在做一些事,一些其他语言里命名为tryLock方法。
只有注册用户登录后才能发表评论。