人人做全免费网站,上饶高端网站建设,泰安网站建设哪家专业,徐州建立网站如何保证消息不丢失?——使用rabbitmq的死信队列#xff01;
1、什么是死信
在 RabbitMQ 中充当主角的就是消息#xff0c;在不同场景下#xff0c;消息会有不同地表现。 死信就是消息在特定场景下的一种表现形式#xff0c;这些场景包括#xff1a; 消息被拒绝访问
1、什么是死信
在 RabbitMQ 中充当主角的就是消息在不同场景下消息会有不同地表现。 死信就是消息在特定场景下的一种表现形式这些场景包括 消息被拒绝访问即 RabbitMQ返回 basicNack 的信号时 或者拒绝basicReject 消费者发生异常超过重试次数 。 其实spring框架调用的就是 basicNack 消息的Expiration 过期时长或队列TTL过期时间。 消息队列达到最大容量 上述场景经常产生死信即消息在这些场景中时被称为死信。
2、什么是死信队列
死信队列就是用于储存死信的消息队列在死信队列中有且只有死信构成不会存在其余类型的消息。 死信队列在 RabbitMQ 中并不会单独存在往往死信队列都会绑定这一个普通的业务消息队列当所绑定的消息队列中有消息变成死信了那么这个消息就会重新被交换机路由到指定的死信队列中去我们可以通过对这个死信队列进行监听从而手动的去对这一消息进行补偿。 人工干预
3、那么我们到底如何来使用死信队列呢
死信队列基本使用只需要在声明业务队列的时候绑定指定的死信交换机和RoutingKey即可。
生产者
/** Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.**/
package com.fpl.provider;import com.fpl.model.OrderingOk;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** pProject: spring-rabbitmq - DeadProvider/p* pPowered by fpl1116 On 2024-04-09 11:35:12/p* p描述p** author penglei* version 1.0* since 1.8*/
Service
public class DeadProvider {Autowiredprivate RabbitTemplate rabbitTemplate;public void send(OrderingOk orderingOk) {rabbitTemplate.convertAndSend(Direct_E01, RK01, orderingOk,new MessagePostProcessor(){Overridepublic Message postProcessMessage(Message message) throws AmqpException {int id orderingOk.getId();int expiration 0;if(id 1){expiration 50*1000;}else if(id 2){expiration 40*1000;}else if(id 3){expiration 30*1000;}else if(id 4){expiration 20*1000;}else if(id 5){expiration 10*1000;}//为每个消息设置过期时长但是有可能造成最前面的一个消息未过期一直阻塞后面的消息不能被消费message.getMessageProperties().setExpiration(String.valueOf(expiration));return message;}});}
}
消费者
/** Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.**/
package com.fpl.consumers;import com.fpl.model.OrderingOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;/*** pProject: spring-rabbitmq - DeadConsumer/p* pPowered by fpl1116 On 2024-04-09 11:32:59/p* p描述p** author penglei* version 1.0* since 1.8*/
//Configuration
Slf4j
public class DeadConsumer {//死信交换机Beanpublic DirectExchange deadExchange(){return ExchangeBuilder.directExchange(Dead_E01).build();}//死信队列Beanpublic Queue deadQueue1(){return QueueBuilder.durable(Dead_Q01).build();}//死信交换机与死信队列的绑定Beanpublic Binding deadBinding1(Queue deadQueue1,DirectExchange deadExchange){return BindingBuilder.bind(deadQueue1).to(deadExchange).with(RK_DEAD);}//业务队列Beanpublic Queue queue1(){return QueueBuilder.durable(Direct_Q01).deadLetterExchange(Dead_E01).deadLetterRoutingKey(RK_DEAD)//.ttl(10*1000) //该属性是队列的属性设置消息的过期时间消息在队列里面停留时间n毫秒后就会把这个消息投递到死信交换机针对的是所有的消息//.maxLength(20) //设置队列存放消息的最大个数x-max-length属性值当队列里面消息超过20会把队列之前的消息依次放进死信队列.build();}//业务交换机Beanpublic DirectExchange exchange(){return ExchangeBuilder.directExchange(Direct_E01).build();}//业务交换机与队列的绑定Beanpublic Binding binding1(Queue queue1,DirectExchange exchange){return BindingBuilder.bind(queue1).to(exchange).with(RK01);}// RabbitListener(queues Direct_Q01)
// public void receiveMessage(OrderingOk msg,Message message, Channel channel) throws IOException {
//
// long deliveryTag message.getMessageProperties().getDeliveryTag();
//
// System.out.println(消费者1 收到消息 msg tag:deliveryTag);
//
// channel.basicReject(deliveryTag, false);
// try {
// // 处理消息...
// int i 5/0;
// // 如果处理成功手动发送ack确认 ,Yes
// channel.basicAck(deliveryTag, false);
// } catch (Exception e) {
// // 处理失败可以选择重试或拒绝消息basicNack或basicReject NO
// channel.basicNack(deliveryTag, false, false); // 并重新入队
//
// }
}//}
测试
Testvoid test4() throws IOException {for (int i 1; i 5;i){OrderingOk orderingOk OrderingOk.builder().id(i).name(fpl i).build();deadProvider.send(orderingOk);System.out.println(发送成功:i);}System.in.read();}