电子商务与网站平台建设的关系,个人网站怎么做推广,天津开发区建设工程管理中心网站,域名空间网站建设前言
消息从生产者发送到exchange, 再到 queue, 再到消费者。这个过程中有哪些有消息丢失的可能性呢#xff1f;
发送时丢失#xff1a; 生产者发送的消息未送达 exchange消息到达 exchange 后未到达 queue MQ 宕机#xff0c;queue将消息丢失consumer 接收到消息后未消费…前言
消息从生产者发送到exchange, 再到 queue, 再到消费者。这个过程中有哪些有消息丢失的可能性呢
发送时丢失 生产者发送的消息未送达 exchange消息到达 exchange 后未到达 queue MQ 宕机queue将消息丢失consumer 接收到消息后未消费就宕机 消息可靠性问题及其对应的解决方案:
场景publisher发送时丢失MQ消息丢失consumer消费问题解决方案生产者确认机制消息持久化消费者消息确认失败重试机制
下面我们先说一下publisher 发送时丢失的问题应该如何处理
生产者确认机制的理论说明
RabbitMQ 提供了 publisher confirm 机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后, 会返回一个结果给发送者表示消息是否处理成功。结果有两种请求
publish-confirm, 发送者确认 消息成功投递到交换机返回ack消息未投递到交换机返回nack publish-return, 发送回执 消息投递到交换机但是没有路由到队列返回ACK, 及路由失败原因 注意: 确认机制发送消息时, 需要给每个消息设置一个全局唯一 id, 以区分不同消息避免ack 冲突 代码实现
下面基于SpringAMQP 实现的生产者确认机制
在 publisher 服务的 application,yml 中添加以下配置
spring:rabbitmq:publisher-confirm-type: correlated # 开启异步回调publisher-returns: truetemplate:mandatory: true配置说明:
publish-confirm-type: 开启 publisher-confirm, 这里支持两种类型 simple: 同步等待 confirm 结果, 直到超时correlated: 异步回调, 定义ConfirmCallback, MQ 返回结果时会回调这个ConfirmCallback publish-returns: 开启 publish-return 功能同样是基于 callback 机制不过是定义 ReturnCallbcaktemplate.mandatory: 定义消息路由失败时的策略。true, 则调用ReturnCallback, false: 则直接丢弃消息 ConfirmCallBack是基于每条消息设置的所以需要一个全局唯一id 进行区分。 ReturenCallbcak 则是基于每个RabbitTemplate操作实例是一种全局性的回调。 由于每个 RabbitTemplate 只能配置一个 ReturnCallback, 因此需要在项目启动过程中配置: (这里可以实现ApplicationContextAware,它可以在SpringIOC 容器初始化的时候进行一些全局性回调的操作)
Slf4j
Configuration
public class CommonConfig implements ApplicationContextAware {Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取 RabbitTemplate对象RabbitTemplate rabbitTemplate applicationContext.getBean(RabbitTemplate.class);// 配置 ReturnCallbackrabbitTemplate.setReturnCallback((message, replayCode, replayText,exchange, routingKey) - {// 记录日志log.error(消息发送到队列失败, 响应码:{}, 失败原因:{},交换机:{}, 路由key:{},消息:{},,replayCode, replayText, exchange, routingKey, message.toString());// 如果有需要的话,重发消息});}}
} 为每条发送的消息指定消息 ID, 并编写对应的 ConfirmCallback
public void testSendMessage2SimpleQueue() throws InterruptedException {// 1. 准备消息String message hello, spring amqp!;// 2. 准备CorrelationData// 2.1 消息idCorrelationData correlationData newCorrelationData(UUID.randomUUID().toString());// 2.2 准备 ConfirmCallbackcorrelationData.getFuture().addCallback(confirm - {// 判断结果if(confirm.isAck()){// ACKlog.debug(消息成功投递到交换机!消息ID:{},correlationData.getId());}else {// NACKlog.error(消息投递到交换机失败!消息ID:{}, correlationData.getId());// 重发消息}}, throwable - {// 记录日志log.error(消息发送失败, throwable);// 重发消息});// 3.发送消息rabbitTemplate.convertAndSend(amq.topic, asimple.test, message, correlationData);
}总结
SpringAMQP 中处理消息确认的几种情况
publisher-confirm: 消息发送到 exchange, 返回 ack消息发送失败没有到达交换机返回 nack消息发送过程中出现异常没有收到回执 消息成功发送到 exchange, 但没有路由到 queue, 调用 ReturnCallback