视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
Python开发之多个定时任务在单线程下执行的实例分析
2020-11-27 14:13:44 责编:小采
文档


单线程多定时任务

1、初始版本:

思路:定时器,说白了就是延时执行指定的程序,目前自己重构python里面的定时器不太现实,能力达不到,所以延时操作时还得用到系统定时器,不过我们可以改一下规则;把所有要进行定时操作的程序添加到特定列表中,把列表中定时时间最短程序拿出来,进行threading.Timer(time,callback)绑定,等时间超时触发自定义的callback,执行刚刚列表取出的程序;然后把时间更新,再次把列表中时间最短的程序拿出了,继续threading.Timer绑定,不断的迭代循环;当有新的定时任务加入到列表时,把当前的threading.Timer绑定取消,更新列表中的时间,再次取出最短时间,进行threading.Timer绑定......

代码:

import threading
import time

class Timer():
 '''单线程下的定时器'''

 def __init__(self):
 self.queues = []
 self.timer = None
 self.last_time = time.time()

 def start(self):
 item = self.get()
 if item:
 self.timer = threading.Timer(item[0],self.execute)
 self.timer.start()

 def add(self,item):
 print('add',item)
 self.flush_time()
 self.queues.append(item)
 self.queues.sort(key=lambda x:x[0])

 if self.timer:
 self.timer.cancel()
 self.timer = None
 self.start()

 def get(self):
 item = None
 if len(self.queues) > 0:
 item = self.queues[0]
 return item

 def pop(self):
 item = None
 if len(self.queues) > 0:
 item = self.queues.pop(0)
 return item

 def flush_time(self):
 curr_time = time.time()
 for i in self.queues:
 i[0] = i[0] - (curr_time - self.last_time)
 self.last_time = curr_time

 def execute(self):
 # if self.timer:
 # self.timer.cancel()
 # self.timer = None
 item = self.pop()
 self.flush_time()
 if item:
 callback = item[1]
 args = item[0]
 callback(args)
 self.start()

执行及输出:

if __name__ == '__main__': # 检测线程数
 def func(): while True: print(threading.active_count())
 time.sleep(1)
 
 f1 = threading.Thread(target=func)
 f1.start() 
 import logging
 logging.basicConfig(level=logging.INFO,format="%(asctime)s %(message)s", datefmt="%m/%d/%Y %H:%M:%S [%A]") def func1(*args):
 logging.info('func1 %s'%args) # time.sleep(5)
 
 def func2(*args):
 logging.info('func2 %s' % args) # time.sleep(5)
 def func3(*args):
 logging.info('func3 %s' % args) # time.sleep(5)
 
 def func4(*args):
 logging.info('func4 %s' % args) # time.sleep(5)
 
 def func5(*args):
 logging.info('func5 %s' % args) # time.sleep(5)
 
 
 # 测试
 t1 = Timer()
 logging.info('start')
 t1.add([5,func1])
 time.sleep(0.5)
 t1.add([4,func2])
 time.sleep(0.5)
 t1.add([3,func3])
 time.sleep(0.5)
 t1.add([2,func4])
 time.sleep(0.5)
 t1.add([1,func5])
 time.sleep(5)
 t1.add([1,func1])
 t1.add([2,func2])
 t1.add([3,func3])
 t1.add([4,func4])
 t1.add([5,func5]) 
 # 
输出 # 2 # 07/27/2017 10:36:47 [Thursday] start # add [5, <function func1 at 0x000000D79FC77E18>] # add [4, <function func2 at 0x000000D79FCA8488>] # 3 # add [3, <function func3 at 0x000000D79FCA8510>] # add [2, <function func4 at 0x000000D79FCA8598>] # 3 # add [1, <function func5 at 0x000000D79FCA8620>] # 3 # 07/27/2017 10:36:50 [Thursday] func5 1 # 07/27/2017 10:36:51 [Thursday] func4 0.498349666595459 # 3 # 07/27/2017 10:36:51 [Thursday] func3 0.49782633781433105 # 07/27/2017 10:36:52 [Thursday] func2 0.49848270416259766 # 3 # 07/27/2017 10:36:52 [Thursday] func1 0.48449039459228516 # 2 # 2 # add [1, <function func1 at 0x000000D79FC77E18>] # add [2, <function func2 at 0x000000D79FCA8488>] # add [3, <function func3 at 0x000000D79FCA8510>] # add [4, <function func4 at 0x000000D79FCA8598>] # add [5, <function func5 at 0x000000D79FCA8620>] # 3 # 07/27/2017 10:36:55 [Thursday] func1 0.9990766048431396 # 3 # 07/27/2017 10:36:56 [Thursday] func2 0.9988017082214355 # 3 # 07/27/2017 10:36:57 [Thursday] func3 0.99928879737854 # 07/27/2017 10:36:58 [Thursday] func4 0.9991350173950195 # 3 # 3 # 07/27/2017 10:36:59 [Thursday] func5 0.9988160133361816

执行代码

注:查看代码输出,所有的定时器都按照标定的时间依次执行,非常完美,一切看起来很美好,只是看起来,呵呵哒,当你把func里面的time.sleep(5)启用后,线程数蹭蹭的上来了;原因是上个定时器callback还是执行中,下个定时器已经启动了,这时就又新增了一个线程,哎,失败

2、修订版本

思路:利用生成者消费者模型,用到threading.Condition条件变量;强制永远启用的是一个Timer!

代码:

import time
import threading
import logging

class NewTimer(threading.Thread):
 '''单线程下的定时器'''
 def __init__(self):
 super().__init__()
 self.queues = []
 self.timer = None
 self.cond = threading.Condition()

 def run(self):
 while True:
 # print('NewTimer',self.queues)
 self.cond.acquire()
 item = self.get()
 callback = None
 if not item:
 logging.info('NewTimer wait')
 self.cond.wait()
 elif item[0] <= time.time():
 new_item = self.pop()
 callback = new_item[1]
 else:
 logging.info('NewTimer start sys timer and wait')
 self.timer = threading.Timer(item[0]-time.time(),self.execute)
 self.timer.start()
 self.cond.wait()
 self.cond.release()

 if callback:
 callback(item[0])

 def add(self, item):
 # print('add', item)
 self.cond.acquire()
 item[0] = item[0] + time.time()
 self.queues.append(item)
 self.queues.sort(key=lambda x: x[0])
 logging.info('NewTimer add notify')
 if self.timer:
 self.timer.cancel()
 self.timer = None
 self.cond.notify()
 self.cond.release()

 def pop(self):
 item = None
 if len(self.queues) > 0:
 item = self.queues.pop(0)
 return item

 def get(self):
 item = None
 if len(self.queues) > 0:
 item = self.queues[0]
 return item

 def execute(self):
 logging.info('NewTimer execute notify')
 self.cond.acquire()
 self.cond.notify()
 self.cond.release()

执行及输出:

if __name__ == '__main__': def func(): while True: print(threading.active_count())
 time.sleep(1)

 f1 = threading.Thread(target=func)
 f1.start()
 logging.basicConfig(level=logging.INFO,format="%(asctime)s %(message)s", datefmt="%m/%d/%Y %H:%M:%S [%A]")

 newtimer = NewTimer()
 newtimer.start() def func1(*args):
 logging.info('func1 %s'%args)
 time.sleep(5) def func2(*args):
 logging.info('func2 %s' % args)
 time.sleep(5) def func3(*args):
 logging.info('func3 %s' % args)
 time.sleep(5) def func4(*args):
 logging.info('func4 %s' % args)
 time.sleep(5) def func5(*args):
 logging.info('func5 %s' % args)
 time.sleep(5)

 newtimer.add([5,func1])
 newtimer.add([4,func2])
 newtimer.add([3,func3])
 newtimer.add([2,func4])
 newtimer.add([1,func5])
 time.sleep(1)
 newtimer.add([1,func1])
 newtimer.add([2,func2])
 newtimer.add([3,func3])
 newtimer.add([4,func4])
 newtimer.add([5,func5])# 
输出# 2# 07/27/2017 11:26:19 [Thursday] NewTimer wait# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer add notify# 07/27/2017 11:26:19 [Thursday] NewTimer start sys timer and wait# 07/27/2017 11:26:20 [Thursday] NewTimer execute notify# 4# 07/27/2017 11:26:20 [Thursday] func5 1501125980.2175007# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 07/27/2017 11:26:20 [Thursday] NewTimer add notify# 3# 3# 3# 3# 3# 07/27/2017 11:26:25 [Thursday] func4 1501125981.2175007# 3# 3# 3# 3# 07/27/2017 11:26:30 [Thursday] func1 1501125981.218279# 3# 3# 3# 3# 3# 3# 07/27/2017 11:26:35 [Thursday] func3 1501125982.2175007# 3# 3# 3# 3# 07/27/2017 11:26:40 [Thursday] func2 1501125982.218279# 3# 3# 3# 3# 3# 07/27/2017 11:26:45 [Thursday] func2 1501125983.2175007# 3# 3# 3# 3# 3# 07/27/2017 11:26:50 [Thursday] func3 1501125983.218279# 3# 3# 3# 3# 3# 07/27/2017 11:26:55 [Thursday] func1 1501125984.2175007# 3# 3# 3# 3# 3# 07/27/2017 11:27:00 [Thursday] func4 1501125984.218279# 3# 3# 3# 3# 3# 07/27/2017 11:27:05 [Thursday] func5 1501125985.218279# 3# 3# 3# 3# 3# 07/27/2017 11:27:10 [Thursday] NewTimer wait

输出

注:这次无论如何测试线程数也不会蹭蹭的上涨,同时可以实现多定时器任务要求;缺点:用到了两线程,没有用到单线程去实现,第二时间精准度问题,需要等待上个定时程序执行完毕,程序才能继续运行

下载本文
显示全文
专题