个人网站对主机有什么要求,怎么做网页挣钱,奥美广告公司简介,wordpress域名二级目录如何跳转目录 一、初始Rabbitmq1、什么是Rabbitmq#xff0c;它的概述是什么#xff1f;2、RabbitMQ的应用场景3、RabbitMQ主要组件4、RabbitMQ 的优点5、与其他消息队列性能比较 二、RabbitMQ环境安装初始化三、SpringAMQPRabbitMQ实战入门#xff08;基本API#xff09;1、实战入… 目录 一、初始Rabbitmq1、什么是Rabbitmq它的概述是什么2、RabbitMQ的应用场景3、RabbitMQ主要组件4、RabbitMQ 的优点5、与其他消息队列性能比较 二、RabbitMQ环境安装初始化三、SpringAMQPRabbitMQ实战入门基本API1、实战入门Java API)一、WorkQueues模型二、交换机类型1、Fanout交换机2、Direct交换机3、Topic交换机 三、基于注解声明 一、初始Rabbitmq
1、什么是Rabbitmq它的概述是什么 RabbitMQ 是一个开源的消息队列中间件作为消息代理Message Broker实现它负责在不同的应用程序、服务或组件之间传递消息。RabbitMQ 实现了 AMQPAdvanced Message Queuing Protocol协议支持可靠的消息传递确保消息能够在系统中被可靠、顺序地传递并支持异步和解耦合的通信方式。 其中消息传递的基本模型可以分为 点对点Point-to-Point 和 发布/订阅Publish/Subscribe 两种类型。 2、RabbitMQ的应用场景 异步处理 RabbitMQ 非常适合需要异步处理的场景。例如在电商网站中订单生成后需要异步处理支付、发货等任务生产者将消息发送到 RabbitMQ消费者异步处理这些任务。 分布式系统通信解耦 RabbitMQ 可以用作分布式系统中各个服务之间的通信媒介服务之间通过消息队列进行解耦避免直接调用和依赖。 事件驱动架构EDA 在事件驱动架构中RabbitMQ 可以作为事件总线消费者根据事件类型处理不同的业务逻辑。 日志收集与分析 RabbitMQ 可以用于收集和转发日志信息将日志从不同的系统组件传递到中央日志处理系统。
3、RabbitMQ主要组件 publisher生产者也就是发送消息的一方 consumer消费者也就是消费消息的一方 queue队列存储消息。生产者投递的消息会暂存在消息队列中等待消费者处理 exchange交换机负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。 Binding绑定交换机和队列之间的联系定义了交换机将消息路由到队列的规则。 Message消息在生产者和消费者之间传递的数据通常包括消息体和一些元数据如路由键、消息头等。
4、RabbitMQ 的优点
解耦合生产者和消费者之间不直接交互它们通过队列进行消息交换从而解耦应用程序的各个组件。异步处理RabbitMQ 支持异步消息传递允许系统在接收请求时立即响应并在后台处理任务。可靠性支持消息持久化、消息确认等功能确保消息不会丢失。扩展性通过集群和镜像队列RabbitMQ 可以扩展到多个节点提供高可用性和负载均衡。多协议支持虽然 RabbitMQ 基于 AMQP但它还可以通过插件支持其他协议如 MQTT、STOMP 等。
5、与其他消息队列性能比较 追求可用性Kafka、 RocketMQ 、RabbitMQ追求可靠性RabbitMQ、RocketMQ追求吞吐能力RocketMQ、Kafka追求消息低延迟RabbitMQ、Kafka 据统计目前国内消息队列使用最多的还是RabbitMQ再加上其各方面都比较均衡稳定性也好因此我们课堂上选择RabbitMQ来学习。 二、RabbitMQ环境安装初始化
RabbitMQ详情安装教程地址教程安装地址
三、SpringAMQPRabbitMQ实战入门基本API
将来我们开发业务功能的时候肯定不会在控制台收发消息而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。 但是RabbitMQ官方提供的Java客户端编码相对复杂一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具SpringAMQP。并且还基于SpringBoot对其实现了自动装配使用起来非常方便。
Spring AMQP 的官方地址 Spring AMQP SpringAMQP提供了三个功能
自动声明队列、交换机及其绑定关系基于注解的监听器模式异步接收消息封装了RabbitTemplate工具用于发送消息
1、实战入门Java API)
导入依赖
!--AMQP依赖包含RabbitMQ--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency一、WorkQueues模型
Work queues任务模型。简单来说就是让多个消费者绑定到一个队列共同消费队列中的消息.
当消息处理比较耗时的时候可能生产消息的速度会远远大于消息的消费速度。长此以往消息就会堆积越来越多无法及时处理。 此时就可以使用work 模型多个消费者共同处理消息处理消息处理的速度就能大大提高了。
1)声明队列
Configuration
public class FanoutConfig {/*** 第1个队列*/Beanpublic Queue fanoutQueue1(){return new Queue(simple.queue);}
}2消息发送 这次我们往队列中循环发送模拟出一个大量消息堆积的队列。
/*** workQueue* 向队列中不停发送消息模拟消息堆积。*/
Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName simple.queue;// 消息String message hello, message_;for (int i 0; i 50; i) {// 发送消息每20毫秒发送一次相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message i);Thread.sleep(20);}
}3) 消息接收 要模拟多个消费者绑定同一个队列我们在consumer服务的SpringRabbitListener中添加2个新的方法其中**RabbitListener**用来监听队列:
RabbitListener(queues work.queue)
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println(消费者1接收到消息【 msg 】 LocalTime.now());Thread.sleep(20);
}RabbitListener(queues work.queue)
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println(消费者2........接收到消息【 msg 】 LocalTime.now());Thread.sleep(200);
}注意到这两消费者都设置了Thead.sleep模拟任务耗时
消费者1 sleep了20毫秒相当于每秒钟处理50个消息
消费者2 sleep了200毫秒相当于每秒处理5个消息
4)测试 启动ConsumerApplication后在执行publisher服务中刚刚编写的发送测试方法testWorkQueue。 最终结果如下
消费者1和消费者2竟然每人消费了25条消息
消费者1很快完成了自己的25条消息
消费者2却在缓慢的处理自己的25条消息。
也就是说消息是平均分配给每个消费者并没有考虑到消费者的处理能力。**导致1个消费者空闲另一个消费者忙的不可开交。**没有充分利用每一个消费者的能力最终消息处理的耗时远远超过了1秒。这样显然是有问题的。
5)能者多劳 在spring中有一个简单的配置可以解决这个问题。我们修改consumer服务的application.yml文件添加配置
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息处理完成才能获取下一个消息在配置中prefetch: 1表示每个消费者每次只能从队列中预取1个消息消费完就能拿下一次不需要等轮询(RabbitMQ默认是轮询)。它可以帮助保证每个消息在被消费者处理时都能得到较为均匀的分配避免某个消费者处理速度慢而导致其他消费者空闲的情况。如果不配置的话那么RabbitMQ采用的就是一个公平轮询的方式将消息依次发给一个消费等他消费完了再发下一个给另外的消费者
二、交换机类型
之前的WorkQueues模型并没有交换机引入交换机后消息发送和接收的模式就会有很大的变化模型如下所示 Publisher生产者不再发送消息到队列中而是发给交换机 Exchange交换机一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange选的类型。 Queue消息队列接收消息、缓存消息。不过队列一定要与交换机绑定。 Consumer消费者订阅队列。
1、Fanout交换机
Fanout,在MQ中叫广播在广播的模式下消息发送的流程如下图所示 其主要特点
1 可以有多个队列
2 每个队列都可以绑定到Exchange交换机
3 生产者发送的消息只能发送到交换机
4 交换机把消息发送给绑定过的所有队列
5 订阅队列的消费者都能拿到消息
案例演示 1声明交换机和对应的队列并进行绑定 /*** 声明交换机*/Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(xuyuan.fanout);}/*** 第1个队列*/Beanpublic Queue fanoutQueue1(){return new Queue(fanout.queue1);}/*** 第2个队列*/Beanpublic Queue fanoutQueue2(){return new Queue(fanout.queue2);}
/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}2生产者发送消息
Test
public void testFanoutExchange() {// 交换机名称String exchangeName xuyuan.fanout;// 消息String message hello, xuyuan ,nihao!;rabbitTemplate.convertAndSend(exchangeName, , message);
}3消费者接收消息
RabbitListener(queues fanout.queue1)
public void listenFanoutQueue1(String msg) {System.out.println(消费者1接收到Fanout消息【 msg 】);
}
RabbitListener(queues fanout.queue2)
public void listenFanoutQueue2(String msg) {System.out.println(消费者2接收到Fanout消息【 msg 】);
}分析总结 交换机的作用主要是
接收对应Publisher发送的消息将消息按照规则路由道与之绑定的队列不能缓存消息路由失败消息丢失FanoutExchange会将消息路由到每个绑定的队列。
2、Direct交换机
场景在Fanout模式中一条消息会被所有订阅的队列都消费。但是在某些场景下我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。 在Direct模型下
队列与交换机的绑定不能是任意绑定的需要指定一个Routingkey路由key消息的发送方在向Exchange发送消息时也必须指定消息的RoutingKey。Exchange不在把消息发送给每一个绑定的队列而是根据消息的RoutingKey来发送到指定的队列中去。然后消费者监听对应的队列得到消息。
案例如下
Configuration
public class DirectConfig {/*** 声明交换机* return Direct类型交换机*/Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange(hmall.direct).build();}/*** 第1个队列*/Beanpublic Queue directQueue1(){return new Queue(direct.queue1);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with(red);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with(blue);}/*** 第2个队列*/Beanpublic Queue directQueue2(){return new Queue(direct.queue2);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with(red);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with(yellow);}
}3、Topic交换机
分析 使用Direct可以根据对应的RoutingKey路由到指定的队列但是对于多元组就比较麻烦只能一个一个绑定对应的RoutingKey此时Topic交换机就派上用场了可以同时划分一个子组一个消息可以根据一个组别的队列进行投递就需要用到Topic交换机 Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符 BindingKey 一般都是有一个或多个单词组成多个单词之间以“ .”分割例如 item.insert
通配符规则
#匹配一个或多个词
*只匹配1个词 举例子 xuyuan.# :它能够匹配xuyuan.com 或者xuyuan.xxx.xx等等 xuyuan.* 只能匹配xuyuan.aa 或者xuyuan.xx 1初始化
Configuration
public class Topic {/*** 声明交换机* return Topic类型交换机*/Beanpublic TopicExchange topicExchange(){return ExchangeBuilder.topicExchange(xuyuan.topic).build();}/*** 第1个队列*/Beanpublic Queue topicQueue1(){return new Queue(topic.queue1);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1WithRed(Queue topicQueue1, TopicExchange topicExchange){return BindingBuilder.bind(topicQueue1).to(topicExchange).with(xuyuan.new);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1WithBlue(Queue topicQueue1, TopicExchange topicExchange){return BindingBuilder.bind(topicQueue1).to(topicExchange).with(xuyuan.#);}/*** 第2个队列*/Beanpublic Queue topicQueue2(){return new Queue(topic.queue2);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2WithRed(Queue topicQueue2, TopicExchange topicExchange){return BindingBuilder.bind(topicQueue2).to(topicExchange).with(ooyl.*);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2WithYellow(Queue topicQueue2, TopicExchange topicExchange){return BindingBuilder.bind(topicQueue2).to(topicExchange).with(ooyl.#);}
}2消息发送
/*** topicExchange*/
Test
public void testSendTopicExchange() {// 交换机名称String exchangeName hmall.topic;// 消息String message 许苑2上某个人;// 发送消息rabbitTemplate.convertAndSend(exchangeName, xuyuan.news, message);
}3消息接收
RabbitListener(queues topic.queue1)
public void listenTopicQueue1(String msg){System.out.println(消费者1接收到topic.queue1的消息【 msg 】);
}
RabbitListener(queues topic.queue2)
public void listenTopicQueue2(String msg){System.out.println(消费者2接收到topic.queue2的消息【 msg 】);
}总结 下Direct交换机与Topic交换机的差异如下
Topic交换机接收的消息RoutingKey必须是多个单词以 . 分割
Topic交换机与队列绑定时的bindingKey可以指定通配符
#代表0个或多个词
*代表1个词
三、基于注解声明
基于Bean的方式声明队列和交换机比较麻烦Spring还提供了基于注解方式来声明。
例如我们同样声明Direct模式的交换机和队列
RabbitListener(bindings QueueBinding(value Queue(name direct.queue1),exchange Exchange(name hmall.direct, type ExchangeTypes.DIRECT),key {red, blue}
))
public void listenDirectQueue1(String msg){System.out.println(消费者1接收到direct.queue1的消息【 msg 】);
}RabbitListener(bindings QueueBinding(value Queue(name direct.queue2),exchange Exchange(name hmall.direct, type ExchangeTypes.DIRECT),key {red, yellow}
))
public void listenDirectQueue2(String msg){System.out.println(消费者2接收到direct.queue2的消息【 msg 】);
}是不是简单多了。 再试试Topic模式
RabbitListener(bindings QueueBinding(value Queue(name topic.queue1),exchange Exchange(name hmall.topic, type ExchangeTypes.TOPIC),key china.#
))
public void listenTopicQueue1(String msg){System.out.println(消费者1接收到topic.queue1的消息【 msg 】);
}RabbitListener(bindings QueueBinding(value Queue(name topic.queue2),exchange Exchange(name hmall.topic, type ExchangeTypes.TOPIC),key #.news
))
public void listenTopicQueue2(String msg){System.out.println(消费者2接收到topic.queue2的消息【 msg 】);
}