您的位置 首页 知识

使用Python实现一个优雅的异步定时器 用python做

使用Python实现一个优雅的异步定时器 用python做

目录
  • 需求背景
  • 代码
  • 1. 单例事件循环的实现
  • 2. 事件循环的运行与关闭
  • 3. 定时器核心逻辑
  • 4. 启动与停止
  • 5. 使用示例
  • 设计亮点
  • 适用场景
  • 拓展资料

需求背景

定时器的核心功能是能够周期性地触发回调函数,同时需要支持启动、停止以及情形检查等操作。在多线程或异步编程场景中,希望定时器能够:

  1. 支持异步操作,避免阻塞主线程;
  2. 单例化事件循环,节省资源;
  3. 优雅地管理定时器的生活周期;
  4. 提供简单的接口,易于使用。

为此,设计了一个Timer类,结合asynciothreading,实现了一个高效的定时器。

代码

完整代码

import asyncioimport threadingimport timeimport sysclass Timer: _loop = None _thread = None _lock = threading.Lock() _running_timers = 0 @classmethod def _ensure_loop(cls): with cls._lock: if cls._loop is None or not cls._thread or not cls._thread.is_alive(): cls._loop = asyncio.new_event_loop() cls._thread = threading.Thread( target=cls._run_loop, args=(cls._loop,), daemon=True ) cls._thread.start() @classmethod def _run_loop(cls, loop): asyncio.set_event_loop(loop) try: loop.run_forever() except Exception as e: print(f”事件循环异常: e}”) finally: loop.close() @classmethod def _shutdown(cls): with cls._lock: if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running(): cls._loop.call_soon_threadsafe(cls._loop.stop) 不使用 join,由于守护线程会在主线程退出时自动结束 def __init__(self): self.is_running = False self._stop_event = asyncio.Event() self._task = None async def _timer_loop(self, interval, callback): try: while not self._stop_event.is_set(): await asyncio.sleep(interval) if not self._stop_event.is_set(): await asyncio.get_event_loop().run_in_executor(None, callback) except asyncio.CancelledError: pass 正常取消时忽略 except Exception as e: print(f”定时器循环异常: e}”) finally: self.is_running = False Timer._running_timers -= 1 Timer._shutdown() def start(self, interval, callback): if not self.is_running: Timer._ensure_loop() self.is_running = True self._stop_event.clear() self._task = asyncio.run_coroutine_threadsafe( self._timer_loop(interval, callback), Timer._loop ) Timer._running_timers += 1 print(f”定时器已启动,每interval}秒执行一次”) def stop(self): if self.is_running: self._stop_event.set() if self._task: Timer._loop.call_soon_threadsafe(self._task.cancel) self.is_running = False print(“定时器已停止”) def is_active(self): return self.is_running 使用示例def callback1(): print(f”回调1触发: time.strftime(‘%H:%M:%S’)}”)def callback2(): print(f”回调2触发: time.strftime(‘%H:%M:%S’)}”)if __name__ == “__main__”: timer1 = Timer() timer2 = Timer() timer1.start(2, callback1) timer2.start(3, callback2) try: time.sleep(100) timer1.stop() time.sleep(2) timer2.stop() except KeyboardInterrupt: timer1.stop() timer2.stop() finally: 确保在程序退出时清理 Timer._shutdown()

1. 单例事件循环的实现

为了避免每个定时器都创建一个独立的事件循环,在Timer类中使用了类变量和类技巧来管理全局唯一的事件循环:

class Timer: _loop = None _thread = None _lock = threading.Lock() _running_timers = 0 @classmethod def _ensure_loop(cls): with cls._lock: if cls._loop is None or not cls._thread or not cls._thread.is_alive(): cls._loop = asyncio.new_event_loop() cls._thread = threading.Thread( target=cls._run_loop, args=(cls._loop,), daemon=True ) cls._thread.start()

  • _loop:存储全局的asyncio事件循环。
  • _thread:将事件循环运行在一个独立的守护线程中,避免阻塞主线程。
  • _lock:线程锁,确保在多线程环境中创建事件循环时的线程安全。
  • _ensure_loop:在需要时创建或重用事件循环,确保只有一个全局循环。

守护线程(daemon=True)的设计使得程序退出时无需显式关闭线程,简化了资源清理。

2. 事件循环的运行与关闭

事件循环的运行逻辑封装在_run_loop中:

@classmethoddef _run_loop(cls, loop): asyncio.set_event_loop(loop) try: loop.run_forever() except Exception as e: print(f”事件循环异常: e}”) finally: loop.close()

  • run_forever:让事件循环持续运行,直到被外部停止。
  • 异常处理:捕获可能的错误并打印,便于调试。
  • finally:确保循环关闭时资源被正确释放。

关闭逻辑则由_shutdown技巧控制:

@classmethoddef _shutdown(cls): with cls._lock: if cls._running_timers == 0 and cls._loop is not None and cls._loop.is_running(): cls._loop.call_soon_threadsafe(cls._loop.stop)

当所有定时器都停止时(_running_timers == 0),事件循环会被安全停止。

3. 定时器核心逻辑

每个Timer实例负责管理一个独立的定时任务:

def __init__(self): self.is_running = False self._stop_event = asyncio.Event() self._task = Noneasync def _timer_loop(self, interval, callback): try: while not self._stop_event.is_set(): await asyncio.sleep(interval) if not self._stop_event.is_set(): await asyncio.get_event_loop().run_in_executor(None, callback) except asyncio.CancelledError: pass 正常取消时忽略 finally: self.is_running = False Timer._running_timers -= 1 Timer._shutdown()

  • _stop_event:一个asyncio.Event对象,用于控制定时器的停止。
  • _timer_loop:异步协程,每隔interval秒执行一次回调函数callback
  • run_in_executor:将回调函数运行在默认的线程池中,避免阻塞事件循环。

4. 启动与停止

启动和停止技巧是用户的主要接口:

def start(self, interval, callback): if not self.is_running: Timer._ensure_loop() self.is_running = True self._stop_event.clear() self._task = asyncio.run_coroutine_threadsafe( self._timer_loop(interval, callback), Timer._loop ) Timer._running_timers += 1def stop(self): if self.is_running: self._stop_event.set() if self._task: Timer._loop.call_soon_threadsafe(self._task.cancel) self.is_running = False

  • start:启动定时器,确保事件循环可用,并记录运行中的定时器数量。
  • stop:通过设置_stop_event并取消任务来停止定时器。

5. 使用示例

下面内容一个简单的使用示例:

def callback1(): print(f”回调1触发: time.strftime(‘%H:%M:%S’)}”)def callback2(): print(f”回调2触发: time.strftime(‘%H:%M:%S’)}”)timer1 = Timer()timer2 = Timer()timer1.start(2, callback1) 每2秒触发一次timer2.start(3, callback2) 每3秒触发一次time.sleep(10) 运行10秒timer1.stop()timer2.stop()

输出可能如下:

回调1触发: 14:30:02回调2触发: 14:30:03回调1触发: 14:30:04回调1触发: 14:30:06回调2触发: 14:30:06…

设计亮点

  1. 异步与多线程结合:通过asynciothreading,实现了非阻塞的定时器,适合高并发场景。
  2. 资源高效利用:全局唯一的事件循环避免了重复创建的开销。
  3. 优雅的生活周期管理:守护线程和自动关闭机制简化了资源清理。
  4. 线程安全:使用锁机制确保多线程环境下的稳定性。

适用场景

  • 周期性任务调度,如数据刷新、情形检查。
  • 后台服务中的定时监控。
  • 游戏或实时应用中的计时器需求。

拓展资料

这个异步定时器实现结合了 Python 的异步编程和多线程特性,提供了一个轻量、灵活的解决方案。无论是简单的脚本还是复杂的后台服务,它都能胜任。如果你需要一个可靠的定时器,不妨试试这个实现,或者根据需求进一步优化它!

以上就是使用Python实现一个优雅的异步定时器的详细内容,更多关于Python异步定时器的资料请关注风君子博客其它相关文章!

无论兄弟们可能感兴趣的文章:

  • pythonPyQt5事件机制和定时器原理分析及用法详解
  • Python中定时器用法详解之Timer定时器和schedule库
  • python中定时器的高质量使用方式详解
  • python?time模块定时器由浅入深应用实例
  • 详解Python定时器Timer的使用及示例

返回顶部