Python rq 轻量级的任务队列
写于2020年06月27日

Python rq 轻量级的任务队列

rq是类celery的一个轻量级的任务队列,但是任务存储后端没有celery丰富,只支持redis,也依赖于redis,不过简单好用。

安装

1
$ pip install rq

Queue 队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from rq import Queue
from redis import Redis

def foo(name):
print(f"hello {name}")

# 创建一个redis连接
host = "xxx.xxx.x.xxx"
port = 6379
db = 0
redis_conn = Reids(host=host, port=port, db=db)
# 创建一个队列
q = Queue("myqueue", connection=redis_conn)

# 任务入队 传递函数引用
# result_ttl 任务结果存储过期时间 默认500秒 -1 永不过期
# ttl 任务如果在30秒内没有被执行 则会被丢弃
# fail_ttl 失败任务过期时间 默认1年
# job_timeou 任务执行超时时间 默认180秒
job = q.enqueue(foo, "world", result_ttl=600, ttl=30, fail_ttl=600, job_timeou=600)
# 如果当前进程无法访问到工作进程函数引用时,可以选择传递字符串
# job = q.enqueue(“mypackage.mymodule.foo”, "world")

# enqueue函数除了可以接收函数引用和字符串引用,也可以直接接收Job实例
# from rq.job import Job
# job = Job.create(foo, "world", id="job_id")

# 获取队列长度
print(len(q))

# 获取队列所有任务id
print(q.job_ids)

# 获取队列所有任务实例
print(q.jobs)

# 获取队列指定ID的任务实例
job = q.fetch_job("job_id")

# 清空队列 将会删除队列内所有任务
q.empty()

# 删除队列 delete_jobs参数将会删除所有任务
# 删除后队列将不可用 可以通过enqueue将任务重新入队创建
q.delete(delete_jobs=True)

Job 任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from rq import get_current_job
# 任务除了可以通过enqueue入队,也可以使用类似于celery的task装饰器
from rq.decorators import job

host = "xxx.xxx.x.xxx"
port = 6379
db = 0
redis_conn = Reids(host=host, port=port, db=db)

@job("myqueue", connection=redis_conn)
def foo(name):
return f"hello, {name}"

# 或者
# q = Queue("myqueue", connection=redis_conn)
# @job(q)
# def foo(name):
# return f"hello, {name}"

# 执行任务
job = foo.delay("world")

# 或者直接从redis检索
# job = Job.fetch("job_id", connection=redis_conn)
# 也可以批量获取
# jobs = Job.fetch_many(["job_id1", "job_id2"], connection=redis_conn)

# 获取任务id
print(job.id)

# 获取任务状态 可能的值有 queued, started, deferred, finished, failed
print(job.get_status())

# 任务函数名
print(job.func_name)

# 任务函数位置参数
print(job.args)

# 任务函数关键字参数
print(job.kwargs)

# 获取返回值 完成前将返回None 保存时间为500秒(可以通过result_ttl设置保存时间)
print(job.result)

# 获取异常信息(任务没有成功的情况下)
print(job.exc_info)

# 可以在函数内部获取当前任务实例
def bar():
job = get_current_job()
# 通过meta属性可以存储自定义数据
job.meta["name"] = "world"
job.save_meta()
return "Bar"

# 任务依赖 job2将会在job1完成时入队
def job1():
return "job1 ok"

def job2():
return "job2 ok"

job_1 = q.enqueue(job1)
q.enqueue(job2, depends_on=job_1)

# 任务失败将默认放入FailedJobRegistry
from rq import Worker
def test():
print(1/0)

test_job = q.enqueue(test)
worker = Worker([queue])
worker.work(burst=True)

print(test_job.is_failed) # True

registry = queue.failed_job_registry
assert len(registry) == 1
# 重新入队
registry.requeue(test_job)
assert len(registry) == 0
assert queue.count == 1
# 或者通过CLI工具重新入队
# 通过任务id指定失败任务入队
# rq requeue --queue queue_name -u redis://localhost:6379 foo_job_id bar_job_id
# 全部失败任务重新入队
# rq requeue --queue queue_name -u redis://localhost:6379 --all

Worker 工作者

1
2
// 启动一个工作者
$ rq worker queue_name

rq工作者每次只处理一个任务,如果要同时执行多个任务,可以开启多个工作者

1
2
3
4
// 以Burst模式启动
// 默认启动方式在所有任务完成时将会阻塞等待新任务到来
// burst参数可以在所有任务完成时退出当前工作者
$ rq -burst worker queue_name

其他参数

  • --url or -u: redis连接地址 (e.g rq worker --url redis://:[email protected]:1234/9 or rq worker --url unix:///var/run/redis/redis.sock)
  • --path or -P: 支持多个导入路径 (e.g rq worker --path foo --path bar)
  • --config or -c: RQ配置文件模块路径
  • --results-ttl: 任务结果存储秒数(默认为500秒)
  • --worker-class or -w: RQ Worker类 (e.g rq worker --worker-class ‘foo.bar.MyWorker’)
  • --job-class or -j: RQ Job类
  • --queue-class: RQ Queue类
  • --connection-class: redis连接类(默认为redis.StrictRedis)
  • --log-format: worker打印日志格式(默认为’%(asctime)s %(message)s’)
  • --date-format: worker打印日志日期格式(默认为’%H:%M:%S’)
  • --disable-job-desc-logging: 关闭任务描述日志
  • --max-jobs: 最大执行任务数

性能优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 如果你的任务都依赖于同一模块,则每次运行时worker都会fork子进程重新导入
# 这种方式不会发生内存泄露,但是速度很慢,你可以写一个辅助脚本预加载你需要导入的模块
import sys
from rq import Connection, Worker

# 预加载模块
import preload_lib

with Connection():
# 从命令行传递队列名称 类似rq worker queue_name
qs = sys.argv[1:] or ["default"]

w = Worker(qs)
w.work()

检索工作者信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from redis import Redis
from rq import Queue, Worker

# 返回此连接中注册的所有工作者
redis_conn = Redis()
workers = Worker.all(connection=redis_conn)

# 返回此队列中注册的所有工作者
queue = Queue('queue_name')
workers = Worker.all(queue=queue)

# 返回工作者数量
print(Worker.count(connection=redis_conn))
print(Worker.count(queue=queue))

worker = workers[0]

# 工作者名称
print(worker.name)

# 工作者当前主机名
print(worker.hostname)

# 工作者进程PID
print(worker.pid)

# 工作者当前监听队列
print(worker.queues)

# 工作者当前状态 suspended, started, busy, idle
print(worker.state)

# 工作者当前执行任务实例
print(worker.current_job)

# 工作者最后一次心跳
print(worker.last_heartbeat)

# 工作者创建时间
print(worker.birth_date)

# 工作者成功完成任务数
print(worker.successful_job_count)

# 工作者失败任务数
print(worker.failed_job_count)

# 工作者执行任务花费的时间(秒)
print(worker.total_working_time)

使用配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
REDIS_URL = 'redis://localhost:6379/1'
# You can also specify the Redis DB to use
# REDIS_HOST = 'redis.example.com'
# REDIS_PORT = 6380
# REDIS_DB = 3
# REDIS_PASSWORD = 'very secret'

# Queues to listen on
QUEUES = ['high', 'default', 'low']

# If you're using Sentry to collect your runtime exceptions, you can use this
# to configure RQ for it in a single step
# The 'sync+' prefix is required for raven: https://github.com/nvie/rq/issues/350#issuecomment-43592410
SENTRY_DSN = 'sync+http://public:[email protected]/1'

# If you want custom worker name
# NAME = 'worker-1024'
1
$ rq worker -c settings

无工作者模式

1
2
3
4
5
6
7
8
9
10
11
12
# is_async=False开启无工作者模式,代码将会以同步方式运行,可以用作测试
# 但redis连接是必须要传的,用以存储任务完成的相关状态
from redis import Redis
from rq import Queue, Worker

def foo():
print("foo")

redis_conn = Redis()
q = Queue("queue_name", is_async=False, connection=redis_conn)
job = q.enqueue(foo)
print(job.result)

Scheduler 调度器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from datetime import datetime, timedelta
from rq import Queue
from rq.registry import ScheduledJobRegistry
from redis import Redis

def foo():
print("hi")

q = Queue(name="default", connection=Redis())

# 任务将在本地时区2020年6月24日9点57分执行
job = q.enqueue(datetime(2020, 6, 24, 9, 57), foo)

# 任务将在10秒内执行
job = q.enqueue(timedelta(seconds=10), foo)

# 计划执行的任务不会放入队列,而是存储在ScheduledJobRegistry中
print(job in q) # False

registry = ScheduledJobRegistry(queue=queue)
print(job in registry) # True

运行调度器

1
$ rq worker --with-scheduler

通过代码运行

1
2
3
4
5
6
7
8
from rq import Worker, Queue
from redis import Redis

redis = Redis()

queue = Queue(connection=redis)
worker = Worker(queues=[queue], connection=redis)
worker.work(with_scheduler=True)

Connections 连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 单redis连接
# 已弃用 不要用在开发环境
from rq import use_connection
# 会使用一个本地redis连接(全局)
use_connection()
# or
# redis = Redis('my.host.org', 6789, password='secret')
# use_connection(redis)

# 多redis连接
from rq import Queue
from redis import Redis

conn1 = Redis('localhost', 6379)
conn2 = Redis('remote.host.org', 9836)

q1 = Queue('foo', connection=conn1)
q2 = Queue('bar', connection=conn2)

# or
# 将使用上下文内的redis连接
# with Connection(Redis('localhost', 6379)):
# q1 = Queue('foo')
# with Connection(Redis('remote.host.org', 9836)):
# q2 = Queue('bar')
# q3 = Queue('qux')

# 单元测试可以使用push_connection()和pop_connection()
import unittest
from rq import push_connection, pop_connection

class MyTest(unittest.TestCase):
def setUp(self):
push_connection(Redis())

def tearDown(self):
pop_connection()

def test_foo(self):
q = Queue()

Job Registries 任务注册表

每个队列都会维护一组任务注册表

  • StartedJobRegistry 保存当前正在执行的任务。任务在执行前添加,完成(成功或失败)后被删除。
  • FinishedJobRegistry 保存成功执行的任务。
  • FailedJobRegistry 保存完成但失败的任务。
  • DeferredJobRegistry 保存延迟任务(依赖任务)。
  • ScheduledJobRegistry 保存计划任务。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    from redis import Redis
    from rq import Queue
    from rq.registry import StartedJobRegistry

    def foo():
    print("foo")

    redis = Redis()
    queue = Queue("queue_name", connection=redis)
    job = queue.enqueue(foo)

    # 根据队列实例获取队列StartedJobRegistry
    registry1 = StartedJobRegistry(queue=queue)

    # 根据队列名称和连接获取队列StartedJobRegistry
    registry2 = StartedJobRegistry(name="queue_name", connection=redis)

    # 获取队列实例
    print(registry1.get_queue())

    # 获取注册表任务数
    print(registry1.count)

    # 获取注册表所有任务ID
    print(registry1.get_job_ids())

    # 判断任务是否存在注册表中
    # 实例判断
    print(job in registry1)
    # id判断
    print(job.id in registry1)

    # 也可以直接从队列中访问注册表
    print(queue.started_job_registry) # 返回StartedJobRegistry
    print(queue.deferred_job_registry) # 返回DeferredJobRegistry
    print(queue.finished_job_registry) # 返回FinishedJobRegistry
    print(queue.failed_job_registry) # 返回FailedJobRegistry
    print(queue.scheduled_job_registry) # 返回ScheduledJobRegistry

    # 删除作业
    from rq.registry import FailedJobRegistry
    faild_registry = FailedJobRegistry(queue=queue)

    for job_id in faild_registry.get_job_ids():
    faild_registry.remove(job_id)
    # 删除作业并从队列中删除
    # faild_registry.remove(job_id, delete_job=True)

Serialize 序列化

1
2
3
4
5
6
7
8
9
10
11
# 自定义序列化器至少具有loads和dumps函数 
# 默认的序列化器为pickle
import json
from rq import Job, Queue, Worker

# 自定义任务序列化器
job = Job(connection=connection, serializer=json)
# 自定义队列序列化器
queue = Queue(connection=connection, serializer=json)
# 自定义工作者序列化器
worker = Worker(queue, serializer=json)

Monitoring 监控

使用RQ dashboard

RQ dashboard

1
2
$ pip install rq-dashboard
$ rq-dashboard

使用CLI工具

rq info 命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
$ rq info -u redis://127.0.0.1:6379/1 
cost | 0
mailbox | 0
2 queues, 0 jobs total

8e006dec6d0146f392ab624af6cbb738 (root 21767): idle cost
a2a1ecde7c934e3ba6751bffc499e4f4 (root 44482): idle mailbox
2 workers, 2 queues

Updated: 2020-06-24 10:58:51.994777
```
指定队列名称
```bash
$ rq info cost -u redis://127.0.0.1:6379/1
cost | 0
1 queues, 0 jobs total

8e006dec6d0146f392ab624af6cbb738 (root 21767): idle cost
1 workers, 1 queues

Updated: 2020-06-24 10:59:06.786965

按队列分组

1
2
3
4
5
6
7
8
9
10
$ rq info -R -u redis://127.0.0.1:6379/1 
cost | 0
mailbox | 0
2 queues, 0 jobs total

cost: 8e006dec6d0146f392ab624af6cbb738 (idle)
mailbox: a2a1ecde7c934e3ba6751bffc499e4f4 (idle)
2 workers, 2 queues

Updated: 2020-06-24 11:02:58.784870

间隔轮询(默认rq info命令打印一次信息就会退出,加上interval参数将会每隔N秒刷新一次屏幕)

1
$ rq info --interval 1

Exceptions 异常处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 默认异常处理机制是存入FailedJobRegistry
# 可以自定义异常处理程序
from redis import Redis
from rq import Queue, Worker
from exception_handlers import foo_handler, bar_handler

# rq异常处理为链式处理
# 当其中一个异常处理程序返回False时,后面的异常处理程序将不会执行
# 没有返回值时,将被解释为True,继续执行后面的异常处理程序
def my_handler(job, exc_type, exc_value, traceback):
pass

q = Queue(connection=Redis())
w = Worker([q], exception_handlers=[foo_handler, bar_handler, my_handler])

单元测试

👉

Flask中使用RQ添加上下文

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from redis import Redis
from rq import Queue, Job
from flask.cli import ScriptInfo

class FlaskRqJob(Job):
'''
自定义Job类 添加flask上下文
需要设置FLASK_APP环境变量
'''
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.script_info = ScriptInfo()

def perform(self):
if current_app:
app = current_app
else:
app = self.script_info.load_app()
with app.app_context():
return super().perform()

q = Queue(name="queue_name", connection=Redis(), job_class=FlaskRqJob)

运行worker时也要指定相应job class,且需要指定flask app路径(设置FLASK_APP环境变量)

1
2
$ export FLASK_APP=app:app
$ rq worker -j package.module.FlaskRqJob