网站建设与网页设计考试题,上海网页制作报价,建设企业网站的具体步骤,个人网页设计概述目录
一、死信队列(延迟队列) 概念讲解
二、确认消息#xff08;局部方法处理消息#xff09;
三、代码实战
1.编写生产者代码#xff0c;配置消息、直连交换机、路由键 1.1代码解析#xff1a;
2.配置消费者接受类接受直连交换机的路由键
2.1. String msg#xff…目录
一、死信队列(延迟队列) 概念讲解
二、确认消息局部方法处理消息
三、代码实战
1.编写生产者代码配置消息、直连交换机、路由键 1.1代码解析
2.配置消费者接受类接受直连交换机的路由键
2.1. String msgChannel channel Header(AmqpHeaders.DELIVERY_TAG) long tag 方法参数解析 2.2.channel.basicAck(tag,true); 代码解析
2.3.channel.basicReject(tag,true); 代码解析 3.对死信交换机进行测试 一、死信队列(延迟队列) 概念讲解 死信在官网中对应的单词为 “Dead Letter”, 它是 RabbitMQ 的一种消息机制。 一般来说生产者将消息投递到 broker 或者直接到 queue 里了 consumer 从 queue 取出消息进行消费如果它一直无法消费某条数据那么可以把这条消息放入死信队列里面。等待 条件满足了再从死信队列中取出来再次消费从而避免消息丢失。 死信消息来源 消息 TTL 过期 队列满了无法再次添加数据 消息被拒绝reject 或 nack并且 requeue false
二、确认消息局部方法处理消息 默认情况下消息消费者是自动 ack 确认消息的如果要手动 ack确认则需要修改确认模式为 manual
spring:rabbitmq:listener:simple:acknowledge-mode: manual
三、代码实战 1.编写生产者代码配置消息、直连交换机、路由键 Beanpublic Queue queueA() {//正常MapString, Object config new HashMap();//message在该队列queue的存活时间最大为10秒config.put(x-message-ttl, 10000);//x-dead-letter-exchange参数是设置该队列的死信交换器DLXconfig.put(x-dead-letter-exchange, ExchangeB);//x-dead-letter-routing-key参数是给这个DLX指定路由键config.put(x-dead-letter-routing-key, bb);return new Queue(queueA,true,false,false,config);}Beanpublic DirectExchange ExchangeA(){return new DirectExchange(ExchangeA);}Beanpublic Binding bindingA(){return BindingBuilder.bind(queueA()).to(ExchangeA()).with(aa);}Beanpublic Queue queueB() {return new Queue(queueB);}Beanpublic DirectExchange ExchangeB(){return new DirectExchange(ExchangeB);}Beanpublic Binding bindingB(){return BindingBuilder.bind(queueB()).to(ExchangeB()).with(bb);} 1.1代码解析 MapString, Object config new HashMap(); //message在该队列queue的存活时间最大为10秒 config.put(x-message-ttl, 10000); //x-dead-letter-exchange参数是设置该队列的死信交换器DLX config.put(x-dead-letter-exchange, ExchangeB); //x-dead-letter-routing-key参数是给这个DLX指定路由键 config.put(x-dead-letter-routing-key, bb); return new Queue(queueA,true,false,false,config);
首先我们需要知道死信产生的原因:
消息过期 消息在队列中等待的时间超过了指定的过期时间。消息被拒绝 消费者拒绝消费消息并且消息被标记为不可重新投递。队列满 队列达到最大容量无法再接收新的消息。 上述代码的作用是让消息等待10秒如果10秒内没有进入消费者或者没有被操作那么它就会进入死信我们就会将它放入消息B也就是当作死信就行处理。 return new Queue(queueA,true,false,false,config); 中参数的作用
true: 持久化表示队列在消息代理例如 RabbitMQ重启后仍然存在。false: 非独占表示该队列不会被其他连接独占使用。false: 不自动删除表示即使没有消费者连接队列也不会被自动删除。config: 包含了上述配置的 Map 对象将这些配置应用到队列上。 总体来说这段代码创建了一个具有消息过期和死信队列功能的队列 queueA并配置了过期消息发送到名为 ExchangeB 的交换器并指定了死信的路由键为 bb。这样的配置在处理消息的时候能够更加灵活并且对于消息的生命周期有了额外的控制。 2.配置消费者接受类接受直连交换机的路由键
queueA
Component
SuppressWarnings(all)
Slf4j
RabbitListener(queues queueA)
public class ReceiverQA {RabbitHandler//手动确认消息public void process(String msg, Channel channel, Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{log.warn(QA接收到: msg);//channel.basicAck(tag,true);//拒绝 ture是否重新入队 会一直 循环 false 会直接变成死信channel.basicReject(tag,true);Thread.sleep(1000);}
}
2.1. String msgChannel channel Header(AmqpHeaders.DELIVERY_TAG) long tag 方法参数解析 Channel channel Channel 对象表示与消息代理例如 RabbitMQ建立的通信通道。该通道提供了与消息代理进行交互的方法如确认消息、拒绝消息等。在这个方法中channel 参数用于与消息代理进行通信。 Header(AmqpHeaders.DELIVERY_TAG) long tag Header 注解用于提取消息头中的信息。在这里通过 AmqpHeaders.DELIVERY_TAG 提取了消息的传送标签delivery tag。long tag 表示消息的唯一标识符。它是一个标识消息的数字通常在消息代理传递消息给消费者时分配。这个标识符可以用于在确认、拒绝或重新排队消息时指定特定的消息。 总体来说这个方法是一个消息处理方法其中 message 参数表示消息的内容channel 参数用于与消息代理进行通信而 tag 参数表示消息的唯一标识符用于在消息确认、拒绝等操作中指定特定的消息。这样的方法通常用于处理从消息队列中接收到的消息。 2.2.channel.basicAck(tag,true); 代码解析 channel.basicAck(tag,true); 用于确认acknowledge消息的方法通常在消费者成功处理消息后调用。以下是该方法的作用和参数含义 channel 这是表示与消息代理建立的通信通道的对象。在很多消息队列系统中通常通过该通道进行消息的发布、消费等操作。 basicAck 这是确认消息的方法。它告诉消息代理消费者已经成功处理了某个特定的消息。 tag 这是消息的唯一标识符或者句柄。在消费者接收到消息时消息会被分配一个唯一的标识符这个标识符用于确认或拒绝特定的消息。 true 这是一个布尔值通常表示确认多个消息。如果设置为 true则表示确认到指定 tag 及其之前的所有消息。如果设置为 false则仅确认指定的消息。 2.3.channel.basicReject(tag,true); 代码解析 channel.basicReject(tag,true);是在使用 AMQPAdvanced Message Queuing Protocol中的通道channel对象时用于拒绝一条消息的方法。让我们解释这个方法及其参数的意义 tag 参数 tag 表示消息的唯一标识符通常是通过消费者获取的消息标签。它用于标识要拒绝的特定消息。 multiple 参数 multiple 是一个布尔值用于指定是拒绝单个消息还是多个消息。如果 multiple 为 false则表示仅拒绝标记为 tag 的单个消息。如果 multiple 为 true则表示拒绝所有比 tag 小的、未被确认的消息。在这里true 表示拒绝所有比 tag 小的未确认消息。 作用 channel.basicReject(tag, true) 的作用是拒绝一条或多条消息。当消费者无法处理接收到的消息时可以使用该方法将消息标记为拒绝并根据需要将其重新排队或进入死信队列。 通过将 multiple 设置为 true可以一次性拒绝多条消息这对于批量处理消息的情况很有用。 拒绝消息会触发相应的处理机制例如将消息重新排队或将其发送到死信交换器具体取决于消息代理的配置。 总体来说channel.basicReject(tag, true) 是用于拒绝一条或多条消息的方法其中 tag 表示消息的标签而 multiple 决定了是拒绝单个消息还是多个消息。 queueB
Component
SuppressWarnings(all)
Slf4j
RabbitListener(queues queueB)
public class ReceiverQB {RabbitHandlerpublic void process(String msg) {log.warn(QB接收到: msg);//去数据库做修改//更改数据库的订单状态为取消//update order set status-1 where id#{id} and status1}
} 3.对死信交换机进行测试 RequestMapping(/send6)public String send6(){template.convertAndSend(ExchangeA,aa,43249849234);return ;}
最后页面网址访问方法/send6即可