深圳高端网站定制建设,东莞推广seo关键词排名优化,西部数码 wordpress,济南防疫最新动态目录
数据导入
MQ的常见问题
消息可靠性问题
生产者确认机制
SpringAMQP实现生产者确认
消息持久化
消费者消息确认
失败重试机制
消费者失败消息处理策略
死信交换机
TTL
延时队列
安装插件
SpringAMQP使用插件
消息堆积问题
惰性队列
MQ的高可用
普通集群 …目录
数据导入
MQ的常见问题
消息可靠性问题
生产者确认机制
SpringAMQP实现生产者确认
消息持久化
消费者消息确认
失败重试机制
消费者失败消息处理策略
死信交换机
TTL
延时队列
安装插件
SpringAMQP使用插件
消息堆积问题
惰性队列
MQ的高可用
普通集群
获取Cookie
准备配置文件
创建实例文件夹
启动集群
测试创建队列
镜像集群
精确模式
all模式
nodes模式
测试
仲裁队列
使用AMQP实现仲裁队列 数据导入
资料下载地址day05MQ高级
MQ的常见问题
消息可靠性如何确保消息至少被消费一次延迟消息问题如何实现消息的延迟投递消息堆积问题如何解决数百万消息堆积无法及时消费的问题高可用问题如何避免单点的MQ故障而导致的不可用问题
消息可靠性问题
消息丢失的三大类
发送时丢失 生产者发送的消息未送达到exchange消息到达exchange后未到达queueMQ宕机queue将消息丢失consumer接收到消息后未消费就宕机
生产者确认机制
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后会返回一个结果给发送者表示消息是否处理成功。结果有两种请求:
publisher-confirm发送者确认 消息成功投递到交换机返回ack。消息未投递到交换机返回nack。publisher-return发送者回执 消息投递到交换机了但是没有路由到队列。返回ACK及路由失败原因。 需要注意的是确认机制发送消息时需要给每个消息设置一个全局唯一id以区分不同消息避免ack冲突。
SpringAMQP实现生产者确认 在publisher模块中配置如下内容 spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true
publish-confirm-type开启publisher-confirm这里支持两种类型 simple同步等待confirm结果直到超时correlated异步回调定义ConfirmCallbackMQ返回结果时会回调这个ConfirmCallbackpublish-returns开启publish-return功能同样是基于callback机制不过是定义ReturnCallbacktemplate.mandatory定义消息路由失败时的策略。 true则调用ReturnCallback;false则直接丢弃消点 在生产者模块中配置全局ReturnCallback一个RabbitTemplate只能配置一个ReturnCallback Slf4j
Configuration
public class CommonConfig implements ApplicationContextAware {Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate template applicationContext.getBean(RabbitTemplate.class);template.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) - {log.info(消息发送失败,应答码{},原因{},交换机{},路由键{},消息{},replyCode,replyText,exchange,routingKey,message.toString());}));}
}进行测试 Slf4j
RunWith(SpringRunner.class)
SpringBootTest
public class SpringAmqpTest {Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSendMessage2SimpleQueue() throws InterruptedException {String routingKey simple;String message hello, spring amqp!;//准备消息idCorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());correlationData.getFuture().addCallback(result-{if (result.isAck()){log.debug(消息发送成功,ID:{},correlationData.getId());}else {log.error(消息发送失败ID:{},原因{},correlationData.getId(),result.getReason());}},ex-{log.error(消息发送异常ID:{},原因:{},correlationData.getId(),ex.getMessage());});rabbitTemplate.convertAndSend(amq.topic, routingKey, message,correlationData);}
} 运行观察控制台 测试一种路由失败的情况这种情况可以正常发送到交换机但是不能发送到Queue 消息持久化
MQ默认是内存存储当服务重启后数据就会丢失。因此我们需要对交换机与队列进行持久化操作。在消费者模块添加如下代码
Configuration
public class CommonConfig {Beanpublic DirectExchange directExchange(){/*** name:交换机名称* durable:是否持久化* autoDelete:当没有队列绑定时是否删除*/return new DirectExchange(direct.exchange,true,false);}Beanpublic Queue simpleQueue(){/*** 使用Builder创建持久化队列* 使用 new Queue名称创建也可以默认就是持久化的*/return QueueBuilder.durable(simple.queue).build();}
}
启动消费者就可以看到交换机与队列被持久到磁盘中但需要注意的时消息并没有持久化当重启服务器消息还是会丢失。之前我们发送的消息是String类型现在我们使用AMQP的Message对消息进行持久化。 Testpublic void testDurableMessage() throws Exception {Message msg MessageBuilder.withBody(hello spring.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();rabbitTemplate.convertAndSend(simple.queue,msg);}
消费者消息确认
RabbitMQ支持消费者确认机制即消费者处理消息后可以向MQ发送ack回执MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式
manual手动ack需要在业务代码结束后调用api发送ack。业务处理成功后调用channel.basicAck()手动签收如果出现异常则调用channle.basicNack()方法。auto自动ack由spring监测listener代码是否出现异常没有异常则返回ack抛出异常则返回nack。none关闭ackMQ假定消费者获取消息后会成功处理因此消息投递后立即被删除。 manual模式对代码有一定入侵需要添加发送ack的代码。因此不推荐使用 auto模式是通过Spring的AOP机制来对消息进行自动确认。推荐使用 none模式不对消息进行确认不使用 在消费者模块的配置文件中配置如下内容
spring:rabbitmq:listener:simple:acknowledge-mode: auto 进行测试在监听器处添加错误代码 Component
public class SpringRabbitListener {RabbitListener(queues simple.queue)public void listenSimpleQueue(String msg) {System.out.println(消费者接收到simple.queue的消息【 msg 】);System.out.println(1/0);}
} 进行debug观察Rabbit控制台 对断点放行会发现控制台抛出错误后立即再进入断点那么就可以确定MQ会再次投递失败的消息。取消断点放行会发现控制台无休止进行打印错误这种处理方式并不友好因此我们可以自定义失败重试机制。
失败重试机制
当消费者消费消息抛出异常后会将消息投递给MQ。而MQ又会立即投递给消费者。这样循环往复会导致MQ的消息处理飙升带来不必要的压力。因此我们可以采用Spring的重试机制在本地重试不返回ack也不返回nack来避免这种情况。
消费者模块的配置文件添加如下内容
spring:rabbitmq:listener:simple:retry:enabled: true #开启消费者失败重试initial-interval: 1000 #初始的失败等待时长为1smultiplier: 2 #下次失败的等待时长倍数下次灯带时长 multiplier * last-intervalmax-attempts: 3 #最大重试次数stateless: true # true无状态false有状态如果业务中包含事务这里改为false 接下来进行测试 首先是重试时间分别为12对应着配置中的1s与1s*2如果还有下次重试次数那么重试时间就是1s*2*2。其次是在RabbitMQ中找不到这条错误的消息了。具体原因如下 消费者失败消息处理策略
在开启重试模式后重试次数耗尽如果消息依然失败则需要有MessageRecoverer接口来处理它包含三种不同的实现
RejectAndDontRequeueRecoverer重试耗尽后直接reject丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer重试耗尽后返回nack消息重新入队RepublishMessageRecoverer重试耗尽后将失败消息投递到指定的交换机。流程图如下 添加一个新的Config Configuration
public class ErrorMessageConfig {Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange(error.exchange);}Beanpublic Queue errorQueue(){return new Queue(error.queue,true);}Beanpublic Binding errorBinging(){return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with(error);}Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,error.exchange,error);}
} 重启发送一条消息测试 观察Rabbit的控制台 死信交换机
当一个队列中的消息满足下列情况之一时可以成为死信 (dead letter)
消费者使用basic.reject或 basic.nack声明消费失败并且消息的requeue参数设置为false。消息是一个过期消息超时无人消费。要投递的队列消息堆积满了最早的消息可能成为死信。
如果该队列配置了dead-letter-exchange属性指定了一个交换机那么队列中的死信就会投递到这个交换机中而这个交换机称为死信交换机 (Dead Letter Exchange简称DLX) 与RepublishRecoverer的区别在于该种方式是通过MQ进行转发而RepublishRecoverer是通过消费者进行转发。如果只是保存失败的消息那么推荐使用RepublishRecoverer。
TTL
TTLtime to live超时时间分为两种情况
消息本身设置了超时时间消息所在的队列设置了超时时间
当消息到达存活时间后还没有被消费会被自动清除。如果同时设置了消息过期时间和队列过期时间以时间短的为准队列过期会将所有消息移除如果一个已经过期的消息不在队列顶端时并不会立即移除一旦它到了队列顶端则会进行判断是否移除。
延时队列
我们可以通过TTL来实现一个延时队列对消息设置过期时间存放在ttl.queue但是没有消费者监听该队列等到过期之后放入死信队列而消费者监听死信队列对过期消息进行消费从而实现延时队列。具体流程如下 接下来实现延时队列 编写ttl部分 Slf4j
Configuration
public class TTLMessageConfig {Beanpublic Queue ttlQueue(){return QueueBuilder.durable(ttl.queue).ttl(10000)//超时时间.deadLetterExchange(dl.exchange)//指定死信队列.deadLetterRoutingKey(dl)//死信队列的路由key.build();}Beanpublic DirectExchange ttlExchange(){return new DirectExchange(ttl.exchange);}Beanpublic Binding simpleBinding(){return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with(ttl);}
} 编写消费者方的监听 RabbitListener(bindings QueueBinding(value Queue(name dl.queue,durable true),exchange Exchange(name dl.exchange),key dl))public void listenDlQueue(String msg){log.info(消费者接收到了延时消息{},msg);} 编写测试方法 Testpublic void testTTLMessage() throws Exception {Message msg MessageBuilder.withBody(hello TTL.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();rabbitTemplate.convertAndSend(ttl.exchange,ttl,msg);log.info(消息成功发送);} 至此实现了延时处理消息。
但是通过死信队列来实现延迟队列的做法有点麻烦我们可以使用RabbitMQ的原生插件DelayExchange来实现这个功能。
安装插件
基于Linux下载插件文档Scheduling Messages with RabbitMQ | RabbitMQ - Blog
而我们是基于Docker下载插件下载地址为Community Plugins — RabbitMQ 需要注意RabbitMQ与插件的版本对应关系即可。下载好之后将其拖入mq的数据卷中。接下来安装插件如要进入docker容器中
#进入容器
docker exec -it mq bash
#开启插件功能
rabbitmq-plugins enable rabbitmq_delayed_message_exchange 这样就安装完毕了。
DelayExchange插件的原理是对官方原生的Exchange做了功能的升级
将DelayExchange接受到的消息暂存在内存中(官方的Exchange是无法存储消息的)在DelayExchange中计时超时后才投递消息到队列中
DelayExchange的声明是在RabbitMQ的控制台中 其次消息的延迟时间也需要在Exchange中指定 控制台的使用方法肯定不符合开发中使用因此我们接下来使用代码使用该插件
SpringAMQP使用插件
声明延迟队列交换机
RabbitListener(bindings QueueBinding(value Queue(value delay.queue,durable true),exchange Exchange(name delay.exchange,delayed true),key delay
))
public void listenDelayQueue(String msg){log.info(接收到延迟消息{},msg);
} 编写测试类 Testpublic void testDelayMessage() throws Exception {Message msg MessageBuilder.withBody(hello delay.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setHeader(x-delay,5000).build();CorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(delay.exchange,delay,msg,correlationData);log.info(消息成功发送);} 因此我们需要修改发送方的判断逻辑。判断是否存在receiveDelay值 Slf4j
Configuration
public class CommonConfig implements ApplicationContextAware {Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate template applicationContext.getBean(RabbitTemplate.class);template.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) - {//如果是延迟消息则直接返回if (message.getMessageProperties().getReceivedDelay()0) {return;}log.info(消息发送失败,应答码{},原因{},交换机{},路由键{},消息{},replyCode,replyText,exchange,routingKey,message.toString());}));}
}
消息堆积问题
当生产者发送消息的速度超过了消费者处理消息的速度就会导致队列中的消息堆积直到队列存储消息达到上限。最早接收到的消息可能就会成为死信会被丢弃这就是消息堆积问题。
解决消息堆积有三种种思路:
增加更多消费者提高消费速度在消费者内开启线程池加快消息处理速度扩大队列容积提高堆积上限
前两种情况是解决消息堆积问题后一种是环节消息堆积问题。开启线程池也需要看情况对于处理时间短的消息会因为频繁的CPU上下文切换导致CPU占用内存因此开启线程池适用于处理消息时间长的情况。
惰性队列
从RabbitMQ的3.6.0版本开始就增加了Lazy Queues的概念也就是惰性队列。惰性队列的特征如下
接收到消息后直接存入磁盘而非内存消费者要消费消息时才会从磁盘中读取并加载到内存支持数百万条的消息存储
之所以引用惰性队列就是为了提高消息的堆积能力传统的RabbitMQ的消息默认是存储在内存当中当并发量高的时候容易造成消息堆积当占用内存百分之40时MQ会暂时停止生产者的消息投递将一部分消息保存在磁盘中从到导致暂时的不可用状态MQ的性能也就忽高忽低。而惰性队列是直接保存在磁盘当中保证了MQ的稳定性但损耗了性能。 使用AMQP实现惰性队列的声明 //基于注解
RabbitListener(queuesToDeclare Queue(name lazy.queue,durable true,arguments Argument(name x-queue-mode,value lazy))
)
public void listenLazyQueue(String msg){log.info(接收到延迟消息{},msg);
}//基于Bean
Configuration
public class LazyConfig {Beanpublic Queue LazyQueue(){return QueueBuilder.durable(lazy.queue).lazy().build();}
}
MQ的高可用
同其他中间件解决高可用的方法一样那就是搭建集群。
RabbitMQ的是基于Erlang语言编写而Erlang又是一个面向并发的语言天然支持集群模式。RabbitMQ的集群有两种模式
普通集群是一种分布式集群将队列分散到集群的各个节点从而提高整个集群的并发能力。镜像集群是一种主从集群普通集群的基础上添加了主从备份功能提高集群的数据可用性。
镜像集群虽然支持主从但主从同步并不是强一致的,在同步期间可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后推出了新的功能:仲裁队列来代替镜像集群底层采用Raft协议确保主从的数据一致性。
普通集群
普通集群或者叫标准集群 (classic cluster)具备下列特征
会在集群的各个节点间共享部分数据包括:交换机、队列元信息。不包含队列中的消息。当访问集群某节点时如果队列不在该节点会从数据所在节点传递到当前节点并返回。队列所在节点宕机队列中的消息就会丢失。 现在开始搭建集群
获取Cookie
集群模式中的每个RabbitMQ节点使用Cookie来确定它们是否被允许相互通信。
要使两个节点能够通信它们必须具有相同的共享秘密称为Erlang cookie。Cookie只是一串最多255个字符的字母数字字符。
每个集群节点必须具有相同的Cookie。实例之间也需要它来相互通信。
从一个启动的MQ实例获取Cookie
docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie 获取内容为XSLBYZHRKDOLAMZYTNML 准备配置文件 在/tmp目录新建一个配置文件 rabbitmq.conf cd /tmp
# 创建文件
touch rabbitmq.conf 文件内容如下 loopback_users.guest false #禁用guest用户访问
listeners.tcp.default 5672 #访问端口
cluster_formation.peer_discovery_backend rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 rabbitmq1 #节点名称
cluster_formation.classic_config.nodes.2 rabbitmq2
cluster_formation.classic_config.nodes.3 rabbitmq3 接下来再创建一个配置文件用来存放Cookie信息 # 创建文件
touch .erlang.cookie
# 写入cookie
echo XSLBYZHRKDOLAMZYTNML .erlang.cookie
# 修改cookie文件的权限为只读不允许其他人修改
chmod 600 .erlang.cookie
创建实例文件夹
mkdir mq1 mq2 mq3
# 将配置文件拷贝到其他文件夹
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3
启动集群 准备一个docker网络 docker network create mq-net 启动容器 # 启动第一个容器
docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USERadmin \
-e RABBITMQ_DEFAULT_PASSadmin \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:tag
# 启动第二个容器
docker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USERadmin \
-e RABBITMQ_DEFAULT_PASSadmin \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3.8-management
# 启动第三个容器
docker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USERadmin \
-e RABBITMQ_DEFAULT_PASSadmin \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:tag 访问8081端口 测试创建队列
在8081中创建一个队列在8082与8083中查看是否可以查看该队列信息 可以正常看到队列信息。
镜像集群
镜像集群本质是主从模式具备下面的特征
交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。创建队列的节点被称为该队列的主节点备份到的其它节点叫做该队列的镜像节点。一个队列的主节点可能是另一个队列的镜像节点。所有操作都是主节点完成然后同步给镜像节点。主节点宕机后镜像节点会被当做主节点 看图解Rabbit的镜像集群类似于ES的数据分片。 镜像集群有三个模式 ha-mode ha-params 效果 准确模式 exactly 队列的副本量 count 集群中队列副本主服务器和镜像服务器之和的数量。count如果为1意味着单个副本即队列主节点。count值为2表示2个副本1个队列主和1个队列镜像。换句话说count 镜像数量 1。如果群集中的节点数少于count则该队列将镜像到所有节点。如果有集群总数大于count1并且包含镜像的节点出现故障则将在另一个节点上创建一个新的镜像。 all (none) 队列在群集中的所有节点之间进行镜像。队列将镜像到任何新加入的节点。镜像到所有节点将对所有群集节点施加额外的压力包括网络I / O磁盘I / O和磁盘空间使用情况。推荐使用exactly设置副本数为N / 2 1。 nodes node names 指定队列创建到哪些节点如果指定的节点全部不存在则会出现异常。如果指定的节点在集群中存在但是暂时不可用会创建节点到当前客户端连接到的节点。
精确模式
配置模式的命令
rabbitmqctl set_policy ha-two ^two\. {ha-mode:exactly,ha-params:2,ha-sync-mode:automatic}
rabbitmqctl set_policy固定写法ha-two策略名称自定义^two\.匹配队列的正则表达式符合命名规则的队列才生效这里是任何以two.开头的队列名称{ha-mode:exactly,ha-params:2,ha-sync-mode:automatic}: 策略内容 ha-mode:exactly策略模式此处是exactly模式指定副本数量ha-params:2策略参数这里是2就是副本数量为21主1镜像ha-sync-mode:automatic同步策略默认是manual即新加入的镜像节点不会同步旧的消息。如果设置为automatic则新加入的镜像节点会把主节点中所有消息都同步会带来额外的网络开销
all模式
rabbitmqctl set_policy ha-all ^all\. {ha-mode:all}
ha-all策略名称自定义^all\.匹配所有以all.开头的队列名{ha-mode:all}策略内容 ha-mode:all策略模式此处是all模式即所有节点都会称为镜像节点
nodes模式
rabbitmqctl set_policy ha-nodes ^nodes\. {ha-mode:nodes,ha-params:[rabbitnodeA, rabbitnodeB]}
rabbitmqctl set_policy固定写法ha-nodes策略名称自定义^nodes\.匹配队列的正则表达式符合命名规则的队列才生效这里是任何以nodes.开头的队列名称{ha-mode:nodes,ha-params:[rabbitnodeA, rabbitnodeB]}: 策略内容 ha-mode:nodes策略模式此处是nodes模式ha-params:[rabbitmq1, rabbitmq2]策略参数这里指定副本所在节点名称
测试 进入mq1容器 接下来宕机mq1观察队列变化 随后再次启动mq1two.queue队列也和mq1没有关系了。
仲裁队列
仲裁队列仲裁队列是3.8版本以后才有的新功能用来替代镜像队列具备下列特征
与镜像队列一样都是主从模式支持主从数据同步(默认的count为5)使用非常简单没有复杂的配置主从同步基于Raft协议强一致 使用AMQP实现仲裁队列 修改配置文件配置节点信息 spring:rabbitmqaddresses: 192.168.116.131:8071,192.168.116.131:8072,192.168.116.131:8073username: adminpassword: adminvirtual-host: / 创建队列 Configuration
public class QuorumConfig {Beanpublic Queue quorumQueue(){return QueueBuilder.durable(quorum.queue2).quorum().build();}
}
启动消费者就可以看到已经创建出quorum.queue2队列了