Posted on 2020-01-21 14:28
魔のkyo 阅读(372)
评论(0) 编辑 收藏 引用 所属分类:
Python 、
Programming
import threading
import time
import queue
from dataclasses import dataclass, field
from typing import Any
@dataclass(order=True)
class ScheduledItem:
    """A one-shot task stored in the driver's priority queue.

    Instances are ordered by ``time_sec`` only; ``cb`` is excluded from
    comparisons so that two tasks due at the same time never try to
    compare their callbacks.
    """

    # Time (in seconds) at which the task becomes due.
    time_sec: float
    # Callable to invoke once the task is due.
    cb: Any = field(compare=False)
class Driver:
    """Event driver combining one-shot timers, repeating timers and pub/sub.

    Work is registered through ``schedule``, ``schedule_every`` and
    ``send_async``; it is executed by whoever calls ``run``, typically
    once per iteration of a main loop. ``send`` delivers immediately in
    the caller.
    """

    def __init__(self):
        # One-shot tasks (ScheduledItem), ordered by due time.
        self.scheduled = queue.PriorityQueue()
        self.scheduled_every_lock = threading.Lock()
        # Repeating tasks: dicts with "next_sec", "interval" and "cb" keys.
        self.scheduled_every = []
        self.callbacks_lock = threading.Lock()
        # Maps a topic (or message type) to the set of receiver callbacks.
        self.callbacks = {}
        # Pending (obj, topic) pairs queued by send_async().
        self.async_queue = queue.Queue()
        # Driver clock; refreshed from time.time() on every run().
        self.epoch_sec = time.time()
        # Clock value of the previous run(); None until run() is called.
        self.last_epoch = None

    def get_epoch(self):
        """Return the driver's current time in seconds.

        This is the value sampled at construction or by the latest
        ``run`` call, not a fresh reading of the system clock.
        """
        return self.epoch_sec

    def run(self, wait_sync_interval=0):
        """Execute one pass of the main logic; meant for a main loop.

        If fewer than ``wait_sync_interval`` seconds have elapsed since
        the previous pass, sleep for the remainder first. Then deliver
        queued async messages, due one-shot tasks and due repeating
        tasks, in that order.
        """
        self.epoch_sec = time.time()
        if self.last_epoch is not None:
            elapsed = self.epoch_sec - self.last_epoch
            if elapsed < wait_sync_interval:
                time.sleep(wait_sync_interval - elapsed)
                # Re-sample the clock so callbacks see the post-sleep time.
                self.epoch_sec = time.time()
        self.last_epoch = self.epoch_sec
        self._do_async()
        self._do_schedule()
        self._do_schedule_every()

    def schedule(self, cb, time_sec):
        """Schedule a one-shot task.

        ``cb`` is called with the driver time during the first ``run``
        whose clock is at or past ``time_sec``. ``time_sec`` is compared
        directly with the driver clock, i.e. it is an absolute
        ``time.time()``-style value, not a delay.
        """
        self.scheduled.put_nowait(ScheduledItem(time_sec, cb))

    def schedule_every(self, cb, interval_sec):
        """Schedule a repeating task.

        ``cb`` is called with the driver time once for every
        ``interval_sec`` seconds that elapse, starting ``interval_sec``
        after the current driver time.

        Raises:
            ValueError: if ``interval_sec`` is not a positive number. A
                non-positive interval would make the catch-up loop in
                ``_do_schedule_every`` never terminate.
        """
        if not interval_sec > 0:
            raise ValueError(
                "interval_sec must be positive, got %r" % (interval_sec,)
            )
        with self.scheduled_every_lock:
            self.scheduled_every.append(
                {
                    "next_sec": self.epoch_sec + interval_sec,
                    "interval": interval_sec,
                    "cb": cb,
                }
            )

    def add_receiver(self, topic_or_type, cb):
        """Register ``cb`` as a receiver for ``topic_or_type``.

        Returns ``cb`` unchanged.
        """
        # "with" guarantees the lock is released even if the topic is
        # unhashable and the dict lookup raises TypeError.
        with self.callbacks_lock:
            self.callbacks.setdefault(topic_or_type, set()).add(cb)
        return cb

    def remove_receiver(self, topic_or_type, cb):
        """Unregister ``cb`` from ``topic_or_type``; unknown pairs are ignored."""
        with self.callbacks_lock:
            receivers = self.callbacks.get(topic_or_type)
            if receivers is not None:
                receivers.discard(cb)

    def send(self, obj, topic=None):
        """Deliver ``obj`` synchronously to every receiver of ``topic``.

        When ``topic`` is None the message is routed by ``type(obj)``.
        """
        if topic is None:
            topic = type(obj)
        # Snapshot the receivers under the lock, then call them outside it
        # so a receiver may add or remove receivers without deadlocking.
        with self.callbacks_lock:
            receivers = list(self.callbacks.get(topic, ()))
        for cb in receivers:
            cb(obj)

    def send_async(self, obj, topic=None):
        """Queue ``obj`` for delivery during the next ``run``."""
        self.async_queue.put_nowait((obj, topic))

    def _do_async(self):
        """Deliver every message queued by ``send_async``."""
        while True:
            try:
                obj, topic = self.async_queue.get_nowait()
            except queue.Empty:
                return
            self.send(obj, topic)

    def _do_schedule(self):
        """Run every one-shot task whose due time has been reached."""
        while True:
            try:
                item = self.scheduled.get_nowait()
            except queue.Empty:
                return
            if item.time_sec > self.epoch_sec:
                # The earliest task is not due yet, so none of the later
                # ones are either; put it back and stop.
                self.scheduled.put_nowait(item)
                return
            item.cb(self.epoch_sec)

    def _do_schedule_every(self):
        """Run repeating tasks, once per interval elapsed since last time."""
        due = []
        with self.scheduled_every_lock:
            for task in self.scheduled_every:
                # Catch up: one call for each interval that has passed.
                while self.epoch_sec >= task["next_sec"]:
                    due.append(task["cb"])
                    task["next_sec"] += task["interval"]
        # Invoke outside the lock so a callback may call schedule_every().
        for cb in due:
            cb(self.epoch_sec)
def bind(func, *args, **kw):
    """Return a callable that invokes ``func`` with pre-bound arguments.

    Bound positional arguments come first, followed by the positional
    arguments given at call time. Keyword arguments given at call time
    are merged over the bound ones, so a call-time keyword overrides a
    bound keyword of the same name.
    """
    def bound(*_args, **_kw):
        # Merge into one dict so a repeated keyword overrides instead of
        # raising "got multiple values for keyword argument".
        return func(*args, *_args, **{**kw, **_kw})

    return bound
# Module-level Driver instance, created when this module is imported.
driver = Driver()