00steven
00steven
5月前 · 25 人阅读

最近工作中有这么一种需求,需要定时将三种任务(假设任务为:A、B、C)分配到10台Windows Server中执行,而且这三种任务中还分有优先级的(为了简单就以每种任务分三种优先级为例吧)。很容易想到这不就是做一个异步调度嘛,找一个有优先级的消息队列就应该可以搞定了。可以后来发现目前Python这边的消息队列竟然主流不支持Windows,如:RQ、高版本的Celery,还有优先级支持也不是很好,于是乎打算自己造一个。
看了一些相关的博客,发现可以用Redis的list结构做队列,对于优先级的支持呢目前我是打算采用这种方式:

每一种任务每一种优先级都单独放一个队列存储(那么三种任务并且每种任务三个优先级别的话就需要9个Redis队列)。

上代码前先简单说明一下实现流程,其实主要就两个模块:入队、出队,说清楚这两块就OK了。

  1. 入队时,定时任务将A、B、C任务以及它们的优先级别传过来,接着我们对其进行判断,看各些任务进那些队列中(也就是各些任务在Redis队列中的键是什么)。我目前采用这么一种键的组合方式:任务类型-优先级(taskType-level),比如:A类型任务中优先级为1的任务最后进入的Redis队列的键为:A-1,那么优先级为100的B类型任务在Redis队列中的键也就为:B-100。简单弄了一张图,凑合着看吧。
  1. 到出队了,出队这边其实挺简单,第一种是:如果该Redis的DB下只有我们的任务,那么我们把所有的键取出来即可,取出来后可以对键按优先级排列(像SQL:order by level),或按任务类型和优先级排列(像SQL:order by taskType, level),排列后得到一个键的列表,再根据这个键的列表去pop任务即可。第二种是:我们可以配置某台客户端可执行的任务的类型,比如其中一台电脑我只想让它跑A类型任务。那我只给它配置A,这样让它去模式匹配Redis中的键(A-[0-9]*),这样取出来的就是A类型的所有优先级的任务了,如果想让它跑A、B任务就可以循环匹配嘛。
    我也不知道有没有讲清楚这个流程,看代码吧(代码写得丑,萌新请各位大大多指教)
    https://github.com/wikizero/MyScripts/blob/master/forWork/MyRedisQueue.py
# coding:utf-8
import redis
import re
import json
import time
from itertools import chain
from datetime import datetime, date


class ExpandJsonEncoder(json.JSONEncoder):
    '''
        采用json方式序列化传入的任务参数,而原生的json.dumps()方法不支持datetime、date,这里做了扩展
    '''

    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        elif isinstance(obj, date):
            return obj.strftime('%Y-%m-%d')
        else:
            return json.JSONEncoder.default(self, obj)


class MyRedisQueue:

    def __init__(self):
        self.redis_connect = redis.Redis()

    def get_len(self, key):
        keys = self.get_keys(key)
        # 每个键的任务数量
        key_len = [(k, self.redis_connect.llen(k)) for k in keys]
        # 所有键的任务数量
        task_len = sum(dict(key_len).values())
        return task_len, key_len

    def get_keys(self, key):
        # Redis的键支持模式匹配
        keys = self.redis_connect.keys(key + '-[0-9]*')
        # 按优先级将键降序排序
        keys = sorted(keys, key=lambda x: int(x.split('-')[-1]), reverse=True)
        return keys

    def push_task(self, key, tasks, level=1):
        '''
        双端队列,左边推进任务
        :param level: 优先级(int类型),数值越大优先级越高,默认1
        :return: 任务队列任务数量
        '''
        # 重新定义优先队列的key
        new_key = key + '-' + str(level)
        # 序列化任务参数
        tasks = [json.dumps(t, cls=ExpandJsonEncoder) for t in tasks]

        print 'RedisQueue info > the number of push tasks:', len(tasks)

        if not tasks:
            return self.get_len(key)

        self.redis_connect.lpush(new_key, *tasks)
        return self.get_len(key)

    def pop_task(self, keys=None, priority=False):
        '''
        双端队列 右边弹出任务
        :param keys: 键列表,默认为None(将获取所有任务的keys)
        :return:
        '''
        while True:
            # 避免在while循环中修改参数,将keys参数赋值到临时变量
            temp_keys = keys

            # 不指定keys,将获取所有任务
            if not keys:
                temp_keys = self.redis_connect.keys()
                temp_keys = list(set([re.sub('-d+$', '', k) for k in temp_keys if re.findall('w+-d+$', k)]))

            # 根据key作为关键字获取所有的键
            all_keys = list(chain(*[self.get_keys(k) for k in temp_keys]))

            # 屏蔽任务差异性,只按优先级高到低弹出任务
            if priority:
                all_keys = sorted(all_keys, key=lambda x: int(x.split('-')[-1]), reverse=True)

            if all_keys:
                task_key, task = self.redis_connect.brpop(all_keys)
                return task_key, json.loads(task)
            time.sleep(2)


if __name__ == '__main__':
    mrq = MyRedisQueue()

    # 把任务推入redis 队列
    # lst = [i for i in xrange(0, 40)]
    # print mrq.push_task('C', lst, level=4)

    # 从redis queue取出任务
    # while True:
    #     task_type, task = mrq.pop_task(keys=['A', 'B', 'C', 'D', 'E'], priority=True)
    #     print task_type, task
    #     time.sleep(1)

    # 查看任务数量以及优先级情况
    # count, key_len = mrq.get_len('task')
    # print key_len


收藏 0
关键词: keys key 任务 self 优先级 task
评论