# -*- coding: utf-8 -*-
from redis import ConnectionPool
import redis
import time
import uuid
POOL = ConnectionPool(host='127.0.0.1', max_connections=100)
class Redis(redis.Redis):
def __init__(self, host=None):
global POOL
if POOL.connection_kwargs["host"] != host:
POOL = ConnectionPool(host=host, max_connections=100)
super().__init__(connection_pool=POOL)
# 互斥锁
class Mutex:
def __init__(self, name, server="127.0.0.1"):
self.name = name
self.key_name = "MUTEX_" + name
self.id = uuid.uuid4().hex
self.redis = Redis(host=server)
def acquire(self, blocking=True, ex=30):
r = self.redis.set(self.key_name, self.id, ex=ex, nx=True)
if blocking:
while not r:
time.sleep(0.01)
r = self.redis.set(self.key_name, self.id, ex=ex, nx=True)
return r
def release(self):
if self.acquired():
self.redis.delete(self.key_name)
def acquired(self):
r = self.redis.get(self.key_name)
return r != None and r.decode() == str(self.id)
def __enter__(self):
self.acquire()
def __exit__(self, exc_type, exc_value, exc_trackback):
self.release()
if exc_value != None:
raise exc_value
# 读写锁
class ReadWriteLock:
def __init__(self, name, server="127.0.0.1"):
self.name = name
self.server = server
self.rlock_name = "RLOCK_" + name
self.wlock_name = "WLOCK_" + name
self.id = uuid.uuid4().hex
self.redis = Redis(host=server)
self.lock_type = None
def read_lock(self, blocking=True, ex=30):
mutex = Mutex(self.name, self.server)
try:
mutex.acquire()
wlock_locked = self.redis.get(self.wlock_name)
if wlock_locked:
if blocking:
while wlock_locked:
mutex.release()
time.sleep(0.05)
mutex.acquire()
wlock_locked = self.redis.get(self.wlock_name)
else:
return False
pipeline = self.redis.pipeline()
now = time.time()
pipeline.zremrangebyscore(self.rlock_name, 0, int((now-ex)*1000) )
pipeline.zadd(self.rlock_name, {self.id: int(now*1000)} )
pipeline.expire(self.rlock_name, ex)
pipeline.execute()
self.lock_type = 'R'
return True
except Exception as e:
raise
finally:
mutex.release()
def write_lock(self, blocking=True, ex=30):
mutex = Mutex(self.name, self.server)
try:
mutex.acquire()
self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-ex)*1000) )
r = self.redis.zcard(self.rlock_name)
if r:
if blocking:
while r:
mutex.release()
time.sleep(0.05)
mutex.acquire()
self.redis.zremrangebyscore(self.rlock_name, 0, int((time.time()-ex)*1000) )
r = self.redis.zcard(self.rlock_name)
else:
return False
r = self.redis.set(self.wlock_name, self.id, ex=ex, nx=True)
if blocking:
while not r:
mutex.release()
time.sleep(0.05)
mutex.acquire()
r = self.redis.set(self.wlock_name, self.id, ex=ex, nx=True)
if r:
self.lock_type = 'W'
return r
except Exception as e:
raise
finally:
mutex.release()
def unlock(self):
if self.lock_type == 'R':
self.redis.zrem(self.rlock_name, self.id)
self.lock_type = None
elif self.lock_type == 'W':
r = self.redis.get(self.wlock_name)
if r != None and r.decode() == str(self.id):
self.redis.delete(self.wlock_name)
self.lock_type = None
考虑了锁住后崩溃,解决方案是超时后自动结束。
考虑了未崩溃但是超时(这种情况首先考虑默认超时时间是否应该调整,或者程序是否需要调整锁住的代码分多次锁?)。
在这种异常情况发生时,可能产生一边释放了锁但还在访问,另一边加上了锁,记住这时异常情况,但我们要保证即使它发生了也尽量能正常工作下去,
对于确实存在访问冲突的那么是没办法的,该异常就异常好了,还有种情况是虽然加了锁,但是并没有访问冲突,其实程序可以正常下去,但是这里会发生什么呢?
对于A线程,手动加锁----------超时自动解锁|------------------------手动解锁|
对于B线程,-------------手动加锁-----------|(等到此加锁成功)------------|---------手动解锁
对于C线程,---------------------------------|------------手动加锁------------|(等到此加锁成功)
A线程因为超时自动解锁后虽然和B线程没有发生访问冲突,但是它解了B线程的锁,导致C线程加锁成功,而B线程实际还没解锁,这又制造了潜在的B线程和C线程的访问冲突。
所以手动解锁时应该判断下当前的锁是否是自己加的。
支持阻塞和非阻塞加锁,默认阻塞,非阻塞用法
if lock.acquire(blocking=False):
pass
判断加锁成功在做一些事,一些其他语言里命名为tryLock方法。