当前位置: 首页 > news >正文

Redis实现消息队列

Redis 凭借其高性能、低延迟和丰富的数据结构,常被用来实现轻量级消息队列。

1、List实现简单队列

List 是 Redis 最基础的消息队列实现方式,利用其 有序、可重复 的特性,通过 LPUSH(生产者推送)和 BRPOP(消费者拉取)实现消息传递。

原理

  • 生产者:使用 LPUSH 将消息从列表左侧插入。
  • 消费者:使用 BRPOP 阻塞地从列表右侧取出消息。BRPOP 会在列表为空时自动阻塞连接,直到有新消息到来或超时,避免了消费者无效的轮询。

示例代码


import redis
import threading
import time# 连接 Redis
r = redis.Redis(host='localhost', port=6379, db=0)
QUEUE_KEY = 'message_queue'# 生产者
def producer():for i in range(5):message = f"消息{i}"r.lpush(QUEUE_KEY, message)print(f"生产者发送: {message}")time.sleep(1)  # 模拟生产延迟# 消费者
def consumer():while True:# 阻塞式获取消息,超时时间 0(永久等待)_, message = r.brpop(QUEUE_KEY, timeout=0)print(f"消费者接收: {message.decode()}")time.sleep(0.5)  # 模拟处理延迟# 启动生产者和消费者
if __name__ == "__main__":t1 = threading.Thread(target=producer)t2 = threading.Thread(target=consumer)t1.start()t2.start()t1.join()t2.join()

优点:
实现简单,性能高,利用 BRPOP 避免了轮询。

缺点:

  1. 无消息确认(ACK)机制:一旦 BRPOP 取出消息,消息就从队列中删除了。如果消费者在处理消息过程中崩溃,这条消息将永久丢失。
  2. 无重试机制:无法处理消费失败的消息。
  3. 单消费者:虽然多个消费者可以同时 BRPOP,但一条消息只会被一个消费者获取。这是优点也是缺点,取决于你的需求。

适用场景:仅适用于对消息可靠性要求极低的场景,比如丢一两条消息也无所谓。

2、Pub/Sub 实现广播队列

Pub/Sub(发布 - 订阅)是一种广播模式,生产者发布消息到指定 “频道”,所有订阅该频道的消费者都能收到消息。

原理

  • 生产者:向指定的频道(Channel)PUBLISH 消息。
  • 消费者:SUBSCRIBE 一个或多个频道,即可收到发布到该频道的所有消息。

示例代码


import redis
import time
import threadingr = redis.Redis(host='localhost', port=6379, db=0)
CHANNEL_NAME = 'my:news:channel'def publisher():""" 消息发布者 """for i in range(3):message = f"News Update #{i}"# 发布消息到频道r.publish(CHANNEL_NAME, message)print(f"Published: {message}")time.sleep(1)print("Publisher finished.")def subscriber(subscriber_id):""" 消息订阅者 """print(f"Subscriber {subscriber_id} started...")# 创建独立的连接用于订阅(重要!订阅会阻塞连接)pubsub = r.pubsub()pubsub.subscribe(CHANNEL_NAME) # 订阅频道# 监听消息for message in pubsub.listen():if message['type'] == 'message':data = message['data'].decode('utf-8')print(f"Subscriber {subscriber_id} received: {data}")# 启动一个发布者和三个订阅者
threading.Thread(target=publisher).start()
threading.Thread(target=subscriber, args=("S1",)).start()
threading.Thread(target=subscriber, args=("S2",)).start()
threading.Thread(target=subscriber, args=("S3",)).start()

优点:
高效的广播机制,实时性好。

缺点:

  1. 消息非持久化:Redis 不会保存 Pub/Sub 的消息。如果一个消费者在发布者发布消息时处于离线状态,它将永远错过这条消息。没有“消息堆积”的概念。
  2. 无消息回溯:无法重新消费历史消息。

适用场景:适用于实时通知、聊天应用、状态广播等场景,绝对不能用于需要保证消息必达的业务队列。

3、Streams 实现高级消息队列

Stream 是 Redis 5.0 新增的数据类型,专为消息队列设计,支持持久化消费确认分组消费等高级特性,接近专业 MQ(如 Kafka)的功能。

原理
持久化:消息写入后持久化到磁盘,Redis 重启不丢失。
消费确认:支持 XACK 确认机制,确保消息被处理。
分组消费:多个消费者可以组成一个组,来共同消费同一个 Stream。组保证了每条消息只会被组内的一个消费者处理,实现了负载均衡。
消息回溯:通过消息ID可回溯历史消息。


import redis
import threading
import timer = redis.Redis(host='localhost', port=6379, db=0)
STREAM_KEY = 'order_events'
GROUP_NAME = 'order_group'
CONSUMER1_NAME = 'consumer_1'
CONSUMER2_NAME = 'consumer_2'# 初始化消费组(第一次运行时创建)
def init_group():try:r.xgroup_create(STREAM_KEY, GROUP_NAME, id=0, mkstream=True)# 创建消费组(0 表示从第一条消息开始消费)print(f"消费组 {GROUP_NAME} 创建成功")except redis.exceptions.ResponseError as e:if "already exists" in str(e):print(f"消费组 {GROUP_NAME} 已存在")else:raise# 生产者:发送订单消息
def producer():for i in range(5):order_id = f"order_{i}"# 添加消息,* 表示自动生成消息ID(格式:时间戳-序列号)msg_id = r.xadd(STREAM_KEY, {'order_id': order_id,'status': 'created','amount': 100 + i*10})print(f"生产者发送消息: {msg_id.decode()} -> {order_id}")time.sleep(1)# 消费者1(属于消费组)
def consumer1():while True:# 从消费组获取未消费的消息(> 表示最新未消费)# BLOCK 0 表示阻塞等待,COUNT 1 表示一次取1条messages = r.xreadgroup(GROUP=GROUP_NAME,CONSUMER=CONSUMER1_NAME,STREAMS={STREAM_KEY: '>'},BLOCK=0,COUNT=1)if messages:_, msg_list = messages[0]msg_id, msg = msg_list[0]print(f"消费者1 接收: {msg_id.decode()} -> {msg[b'order_id'].decode()}")# 处理消息(模拟业务逻辑)time.sleep(0.5)# 确认消息已处理r.xack(STREAM_KEY, GROUP_NAME, msg_id)print(f"消费者1 确认: {msg_id.decode()}")# 消费者2(属于同一消费组,实现负载均衡)
def consumer2():while True:messages = r.xreadgroup(GROUP=GROUP_NAME,CONSUMER=CONSUMER2_NAME,STREAMS={STREAM_KEY: '>'},BLOCK=0,COUNT=1)if messages:_, msg_list = messages[0]msg_id, msg = msg_list[0]print(f"消费者2 接收: {msg_id.decode()} -> {msg[b'order_id'].decode()}")# 处理消息time.sleep(0.8)# 确认消息r.xack(STREAM_KEY, GROUP_NAME, msg_id)print(f"消费者2 确认: {msg_id.decode()}")if __name__ == "__main__":init_group()# 启动生产者和两个消费者t_producer = threading.Thread(target=producer)t_consumer1 = threading.Thread(target=consumer1)t_consumer2 = threading.Thread(target=consumer2)t_producer.start()t_consumer1.start()t_consumer2.start()t_producer.join()t_consumer1.join()t_consumer2.join()

优点:

  1. 支持持久化、消息确认、分组消费(负载均衡)。
  2. 可回溯历史消息,避免消息丢失。
  3. 接近专业 MQ 的功能,适合生产环境。

缺点:
实现相对复杂,Redis 版本需 ≥5.0。

适用场景:高可靠性要求的业务(如订单系统、支付通知)、需要负载均衡的多消费者场景。

http://www.sczhlp.com/news/71717/

相关文章:

  • asp模板网站修改公司网站进不去qq空间
  • 有哪些网站交互效果做的好的 l设计是做什么的
  • 专业做制作网站如何确定网站建设 栏目
  • 大连网站建设意动科技公司wordpress升级 没有ftp
  • c2c电商网站中小型网站建设效果
  • bfhk3_文章1
  • 自然语言处理工具开发与生产实践
  • 线程间共享数据
  • 腾讯云服务器如何安装 zsh oh-my-zsh(解决 github 连接问题)
  • 【运维自动化-标准运维】各类全局变量使用说明(上)
  • 东铁匠营网站建设公司wordpress主题打不开
  • 职教集团网站建设方案政协系统网站建设
  • 中裕隆建设有限公司网站平面设计短期培训班
  • 别人不能注册我的wordpress站wordpress怎么给图片添加超链接
  • 网站维护升级页面wordpress用户个人资料
  • php网站开发技术论文iis做网站跳转
  • app开发网站wordpress搭建工单
  • 双井网站建设网站建设规划书感受
  • 计划出行到行程追踪,实况窗助力航旅纵横为用户打造“好用易用”的出行体验
  • 开源能源管理系统:能源转型时代的民主化革命
  • POD Out of memory heap
  • 做网站设计怎么进企业广州网站推广找谁
  • 网站怎么做全站搜索网站做qq微信微博登录
  • 简历电商网站开发经验介绍微信红包封面开放平台
  • 公司门户网站建设方案网站开发与管理专业的就业信息
  • 湘潭企业网站建设 磐石网络简单个人网页设计模板
  • 一笔成形,秒绘标准图!Pen Kit重构“自然书写”体验
  • 水利建设专项收入在什么网站上申报河南关键词优化搜索
  • 大学做视频网站设计wordpress阅读类主题
  • 青岛正规公司网站建设公司wordpress收录p