视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
Python使用Beanstalkd做异步任务处理的方法
2020-11-27 14:12:43 责编:小采
文档
 这篇文章主要介绍了Python使用 Beanstalkd 做异步任务处理的方法,现在分享给大家,也给大家做个参考。一起过来看看吧

使用 Beanstalkd 作为消息队列服务,然后结合 Python 的装饰器语法实现一个简单的异步任务处理工具.

最终效果

定义任务:

from xxxxx.job_queue import JobQueue

queue = JobQueue()

@queue.task('task_tube_one')
def task_one(arg1, arg2, arg3):
 # do task

提交任务:

task_one.put(arg1="a", arg2="b", arg3="c")

然后就可以由后台的 work 线程去执行这些任务了。

实现过程

1、了解 Beanstalk Server

Beanstalk is a simple, fast work queue. https://github.com/kr/beanstalkd

Beanstalk 是一个 C 语言实现的消息队列服务。 它提供了通用的接口,最初设计的目的是通过异步运行耗时的任务来减少大量Web应用程序中的页面延迟。针对不同的语言,有不同的 Beanstalkd Client 实现。 Python 里就有 beanstalkc 等。我就是利用 beanstalkc 来作为与 beanstalkd server 通信的工具。

2、任务异步执行实现原理

beanstalkd 只能进行字符串的任务调度。为了让程序支持提交函数和参数,然后由woker执行函数并携带参数。需要一个中间层来将函数与传递的参数注册。

实现主要包括3个部分:

Subscriber: 负责将函数注册到 beanstalk 的一个tube上,实现很简单,注册函数名和函数本身的对应关系。(也就意味着同一个分组(tube)下不能有相同函数名存在)。数据存储在类变量里。

class Subscriber(object):
 FUN_MAP = defaultdict(dict)

 def __init__(self, func, tube):
 logger.info('register func:{} to tube:{}.'.format(func.__name__, tube))
 Subscriber.FUN_MAP[tube][func.__name__] = func

JobQueue: 方便将一个普通函数转换为具有 Putter 能力的装饰器

class JobQueue(object):
 @classmethod
 def task(cls, tube):
 def wrapper(func):
 Subscriber(func, tube)
 return Putter(func, tube)

 return wrapper

Putter: 将函数名、函数参数、指定的分组组合为一个对象,然后 json 序列化为字符串,最后通过 beanstalkc 推送到beanstalkd 队列。

class Putter(object):
 def __init__(self, func, tube):
 self.func = func
 self.tube = tube

 # 直接调用返回
 def __call__(self, *args, **kwargs):
 return self.func(*args, **kwargs)

 # 推给离线队列
 def put(self, **kwargs):
 args = {
 'func_name': self.func.__name__,
 'tube': self.tube,
 'kwargs': kwargs
 }
 logger.info('put job:{} to queue'.format(args))
 beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
 try:
 beanstalk.use(self.tube)
 job_id = beanstalk.put(json.dumps(args))
 return job_id
 finally:
 beanstalk.close()

Worker: 从 beanstalkd 队列中取出字符串,然后通过 json.loads 反序列化为对象,获得 函数名、参数和tube。最后从 Subscriber 中获得 函数名对应的函数代码,然后传递参数执行函数。

class Worker(object):
 worker_id = 0

 def __init__(self, tubes):
 self.beanstalk = beanstalkc.Connection(host=BEANSTALK_CONFIG['host'], port=BEANSTALK_CONFIG['port'])
 self.tubes = tubes
 self.reserve_timeout = 20
 self.timeout_limit = 1000
 self.kick_period = 600
 self.signal_shutdown = False
 self.release_delay = 0
 self.age = 0
 self.signal_shutdown = False
 signal.signal(signal.SIGTERM, lambda signum, frame: self.graceful_shutdown())
 Worker.worker_id += 1
 import_module_by_str('pear.web.controllers.controller_crawler')

 def subscribe(self):
 if isinstance(self.tubes, list):
 for tube in self.tubes:
 if tube not in Subscriber.FUN_MAP.keys():
 logger.error('tube:{} not register!'.format(tube))
 continue
 self.beanstalk.watch(tube)
 else:
 if self.tubes not in Subscriber.FUN_MAP.keys():
 logger.error('tube:{} not register!'.format(self.tubes))
 return
 self.beanstalk.watch(self.tubes)

 def run(self):
 self.subscribe()
 while True:
 if self.signal_shutdown:
 break
 if self.signal_shutdown:
 logger.info("graceful shutdown")
 break
 job = self.beanstalk.reserve(timeout=self.reserve_timeout) # 阻塞获取任务,最长等待 timeout
 if not job:
 continue
 try:
 self.on_job(job)
 self.delete_job(job)
 except beanstalkc.CommandFailed as e:
 logger.warning(e, exc_info=1)
 except Exception as e:
 logger.error(e)
 kicks = job.stats()['kicks']
 if kicks < 3:
 self.bury_job(job)
 else:
 message = json.loads(job.body)
 logger.error("Kicks reach max. Delete the job", extra={'body': message})
 self.delete_job(job)

 @classmethod
 def on_job(cls, job):
 start = time.time()
 msg = json.loads(job.body)
 logger.info(msg)
 tube = msg.get('tube')
 func_name = msg.get('func_name')
 try:
 func = Subscriber.FUN_MAP[tube][func_name]
 kwargs = msg.get('kwargs')
 func(**kwargs)
 logger.info(u'{}-{}'.format(func, kwargs))
 except Exception as e:
 logger.error(e.message, exc_info=True)
 cost = time.time() - start
 logger.info('{} cost {}s'.format(func_name, cost))

 @classmethod
 def delete_job(cls, job):
 try:
 job.delete()
 except beanstalkc.CommandFailed as e:
 logger.warning(e, exc_info=1)

 @classmethod
 def bury_job(cls, job):
 try:
 job.bury()
 except beanstalkc.CommandFailed as e:
 logger.warning(e, exc_info=1)

 def graceful_shutdown(self):
 self.signal_shutdown = True

写上面代码的时候,发现一个问题:

通过 Subscriber 注册函数名和函数本身的对应关系,是在一个Python解释器,也就是在一个进程里运行的,而 Worker 又是异步在另外的进程运行,怎么样才能让 Worker 也能拿到和 Putter 一样的 Subscriber。最后发现通过 Python 的装饰器机制可以解决这个问题。

就是这句解决了 Subscriber 的问题

import_module_by_str('pear.web.controllers.controller_crawler')
# import_module_by_str 的实现
def import_module_by_str(module_name):
 if isinstance(module_name, unicode):
 module_name = str(module_name)
 __import__(module_name)

执行 import_module_by_str 时, 会调用 __import__ 动态加载类和函数。将使用了 JobQueue 的函数所在模块加载到内存之后。当 运行 Woker 时,Python 解释器就会先执行 @修饰的装饰器代码,也就会把 Subscriber 中的对应关系加载到内存。

实际使用可以看 https://github.com/jiyangg/Pear/blob/master/pear/jobs/job_queue.py

下载本文
显示全文
专题