当前位置: 首页 > news >正文

python中使用高并发分布式队列库celery的那些坑 - 指南

python中使用高并发分布式队列库celery的那些坑 - 指南

Celery 是一个用于 分布式任务队列 的 Python 库,常用于处理异步任务(即任务不需要立即执行,后台慢慢做),尤其适合执行定时任务或耗时操作。


? 简单理解

Celery 就是让你把“任务”扔到后台执行,而不是阻塞当前程序。


?️ 核心功能

功能说明
异步任务执行比如发邮件、处理图片、生成报告等不需要立即完成的操作。
分布式任务调度可以运行在多台服务器上,实现任务负载均衡。
定时任务(周期任务)类似 crontab,可设置任务定时执行(如每天 8 点发日报)。
任务重试机制失败任务可以自动重试,适用于网络波动等场景。
与Django/Flask集成非常适合与这些 Web 框架配合使用,将长耗时任务下放到 Celery。

? 工作机制

Celery 一般由以下几部分组成:

  1. Producer(生产者):你写的代码,会将任务“发”出去。
  2. Broker(中间人):任务先存放在消息队列(如 Redis、RabbitMQ)中。
  3. Worker(工人):后台运行的进程,专门“接收”和“执行”这些任务。
  4. Result Backend(结果后端):可选,记录任务结果,如执行成功或失败。

? 示例代码(使用 Redis 作为 broker)

# tasks.py
from celery import Celery
app = Celery('mytasks'
, broker='redis://localhost:6379/0'
)
@app.task
def add(x, y):
return x + y

运行方式:

celery -A tasks worker --loglevel=info

调用方式(异步执行):

add.delay(3
, 5
) # 返回一个异步结果对象

? 常见搭配


如果你正在开发一个 需要做“异步处理”或“后台任务”的系统,Celery 是 Python 中的主流选择之一。但是该库看似简单,却隐藏着无数坑,本文就带大家了解一下我在使用过程中遇到的那些坑。

? 我的环境

?第一个问题

执行命令:

celery -A main_async:celery_app worker --loglevel=info

报错:

[2025-05-29 19:40:22,107: INFO/MainProcess] Task main_async.background_content_similarity[4c84e1c8-6a13-4241-8e62-04e17b3884cb] received
[2025-05-29 19:40:22,142: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)'
)
billiard.einfo.RemoteTraceback:
"""
Traceback (most recent call last):
File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\billiard\pool.py", line 362,
in workloop
result = (True, prepare_result(fun(*args, **kwargs))
)
^^^^^^^^^^^^^^^^^^^^
File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\celery\app\trace.py", line 640,
in fast_trace_task
tasks, accept, hostname = _loc
^^^^^^^^^^^^^^^^^^^^^^^
ValueError: not enough values to unpack (expected 3, got 0
)
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\billiard\pool.py", line 362,
in workloop
result = (True, prepare_result(fun(*args, **kwargs))
)
^^^^^^^^^^^^^^^^^^^^
File "D:\ProgramData\Anaconda3\envs\gj_ai_new\Lib\site-packages\celery\app\trace.py", line 640,
in fast_trace_task
tasks, accept, hostname = _loc
^^^^^^^^^^^^^^^^^^^^^^^
ValueError: not enough values to unpack (expected 3, got 0
)

该问题是由于celery的默认并发网络编程线程库引起的,换成eventlet可以解决问题,只需修改启动命令即可,如下:

celery -A main_async:celery_app worker --loglevel=info -P eventlet

?第二个问题

第二个问题是日志问题,报错类似如下所示:

'LoggingProxy' object has no attribute 'encoding'"
原因分析

Celery 在启动 worker 时,默认会将标准输出和标准错误重定向到其日志系统中。这意味着 sys.stdout 和 sys.stderr 被替换为 LoggingProxy 对象。然而,某些库或代码可能期望这些对象具有标准文件对象的属性,如 encoding 或 fileno,从而导致 AttributeError。

此时只需要将worker_redirect_stdouts参数设置为False即可解决问题,代码如下:

# Celery 配置
celery_app.conf.update(
task_serializer="json"
,
accept_content=["json"]
,
result_serializer="json"
,
timezone="Asia/Shanghai"
,
enable_utc=True
,
include=["main_async"]
, # 显式指定任务模块
task_track_started=True
, # 跟踪任务开始状态
task_ignore_result=False
, # 保存任务结果
task_store_errors_even_if_ignored=True
, # 存储错误
worker_redirect_stdouts = False # 禁止将stdout和stderr重定向到当前记录器。
)
http://www.sczhlp.com/news/166311/

相关文章:

  • 深圳工程网站建设wordpress微信支付模板
  • 太原网站制作哪里便宜海尔网站建设的目标是什么
  • 自己做的网站不备案不能访问吗如何用dede做带下单的网站
  • 怎么申请域名建立网站网络营销网站建设论文
  • 做网站被骗了怎么办广州网站建设广州
  • wordpress微商城模板seo外链工具软件
  • 网站风格怎么写做网站什么服务器好
  • 网站线上推广方式网站建设的落地页
  • 用jsp做婚纱网站的流程网站空间注册
  • 大新网站制作滑县网站建设哪家专业
  • 关于江西建设监督网网站迁移视频 收费 网站怎么做
  • 做网上夫妻去哪个网站wordpress 图标插件
  • 中山网站搜索引擎优化西安市做网站的
  • 深圳福田区网站建设广州天河区网站建设
  • 怎么做网站内部链接的优化做网站开发服务商
  • 长沙市网站推广多少钱详情页设计怎么收费
  • w9y6新域名低价自适应网站建设优化建站
  • 网站可以自己做吗wordpress升级文章编辑器
  • Codeforces Round 1040 (Div. 1)
  • 在AI技术唾手可得的时代,挖掘新需求成为核心竞争力——某知名计算机控制AI框架需求洞察
  • 天津行业网站建设邢台市招生考试院
  • 乐平市网站建设东莞高端模板建站
  • 国外免费网站域名服务器入口小抖音小程序入口
  • 福州网站推广公司桂林人网
  • 呼市做无痛人流z首大网站pc网站 手机网站 微信
  • 徐州建站服务多店铺商城系统
  • 深圳集团网站建设简单的招聘网站怎么做
  • 网站图片规格南宁seo怎么做优化团队
  • 网站已有备案了 现在换空间商还用备案么千博企业网站管理系统 后台拿shell
  • 大连网站备案广告投放平台投放