一个商城网站开发周期,重庆中小企业建站价格,怎么看网站蜘蛛,uc官方网站开发中心1. 项目问题分析
现在项目中有三个独立的微服务#xff1a;
商品微服务#xff1a;原始数据保存在 MySQL 中#xff0c;从 MySQL 中增删改查商品数据。搜索微服务#xff1a;原始数据保存在 ES 的索引库中#xff0c;从 ES 中查询商品数据。商品详情微服务#xff1a;做…1. 项目问题分析
现在项目中有三个独立的微服务
商品微服务原始数据保存在 MySQL 中从 MySQL 中增删改查商品数据。搜索微服务原始数据保存在 ES 的索引库中从 ES 中查询商品数据。商品详情微服务做了页面静态化静态页面的商品数据不会随着数据库发生变化。
思考上面问题的同时我们会想起一件事情其实商品数据如果发生了增、删、改不仅仅静态页面需要处理我们的索引库数据也需要同步这又该如何解决
因为商品新增后需要上架用户才能看到商品修改需要先下架然后修改再上架。因此上述问题可以统一的设计成这样的逻辑处理
商品上架 生成静态页新增索引库数据商品下架 删除静态页删除索引库数据
这样既可保证数据库商品与索引库、静态页三者之间的数据同步。
那么如何实现上述逻辑呢
先看两种解决方案
方案1在商品微服务的上下架业务后加入修改索引库数据及静态页面的代码方案2搜索服务和静态页服务对外提供操作索引库和静态页接口商品微服务在商品上下架后调用接口。
以上两种方式都有同一个严重问题就是代码耦合后台服务中需要嵌入搜索和商品页面服务违背了微服务的独立原则而且严重违背了开闭原则。
所以我们会通过另外一种方式来解决这个问题消息队列
解决方案架构图 MQ三大功能异步化业务解耦合提高系统吞吐量流量削峰。
2.数据同步RabbitMQ回顾
1什么是消息队列
消息队列即MQMessage Queue。 消息队列是典型的生产者、消费者模型。生产者不断向消息队列中生产消息消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的而且只关心消息的发送和接收没有业务逻辑的侵入这样就实现了生产者和消费者的解耦。
结合前面所说的问题
商品服务对商品上下架以后无需去操作索引库或静态页面只是发送一条消息也不关心消息被谁接收。搜索服务和静态页面服务接收消息分别去处理索引库和静态页面。
如果以后有其它系统也依赖商品服务的数据同样监听消息即可商品服务无需任何代码修改
2AMQP和JMS
MQ是消息通信的模型并非具体实现。现在实现MQ的有两种主流方式AMQP、JMS。 两者间的区别和联系
JMS是定义了统一的接口来对消息操作进行统一AMQP是通过规定协议来统一数据交互的格式。JMS限定了必须使用Java语言AMQP只是协议不规定实现方式因此是跨语言的。JMS规定了两种消息模型分别是点对点和发布订阅两种而AMQP的消息模型更加丰富。
3常见MQ产品 ActiveMQ基于JMS Apache RabbitMQ基于AMQP协议erlang语言开发稳定性好 RocketMQ基于JMS阿里巴巴产品目前交由Apache基金会 RabbitMQ vs RocketMQ Queue Queue Exchange Topic Router Tag Kafka分布式消息系统高吞吐量
4RabbitMQ
RabbitMQ是基于AMQP的一款消息管理系统
官网 http://www.rabbitmq.com/
官方教程http://www.rabbitmq.com/getstarted.html RabbitMQ基于Erlang语言开发 5RabbitMQ下载
官网下载地址http://www.rabbitmq.com/download.html
6RabbitMQ安装
安装这里就忽略了。
7RabbitMQ五种模型(*)
RabbitMQ提供了6种消息模型但是第6种其实是RPC并不是MQ因此不予学习。那么也就剩下5种。
但是其实3、4、5这三种都属于订阅模型只不过进行路由的方式不同。 8RabbitMQ基本消息模型
说明
官方文档说明 RabbitMQ是一个消息的代理者Message Broker它接收消息并且传递消息。 你可以认为它是一个邮局当你投递邮件到一个邮箱你很肯定邮递员会终究会将邮件递交给你的收件人。与此类似RabbitMQ 可以是一个邮箱、邮局、同时还有邮递员。 不同之处在于RabbitMQ不是传递纸质邮件而是二进制的数据 基本消息模型图 在上图的模型中有以下概念
P生产者也就是要发送消息的程序C消费者消息的接受者会一直等待消息到来。queue消息队列图中红色部分。类似一个邮箱可以缓存消息生产者向其中投递消息消费者从其中取出消息。
生产者
连接工具类
public class ConnectionUtil {/*** 建立与RabbitMQ的连接* return* throws Exception*/public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory new ConnectionFactory();//设置服务地址factory.setHost(192.168.56.101);//端口factory.setPort(5672);//设置账号信息用户名、密码、vhostfactory.setVirtualHost(/leyou);factory.setUsername(leyou);factory.setPassword(leyou);// 通过工程获取连接Connection connection factory.newConnection();return connection;}
}
生产者发送消息
public class Send {private final static String QUEUE_NAME simple_queue;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 从连接中创建通道使用通道才能完成消息相关的操作Channel channel connection.createChannel();// 声明创建队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message Hello World!;// 向指定的队列中发送消息channel.basicPublish(, QUEUE_NAME, null, message.getBytes());System.out.println( [x] Sent message );//关闭通道和连接channel.close();connection.close();}
}
控制台
web控制台查看消息
进入队列页面可以看到新建了一个队列simple_queue 点击队列名称进入详情页可以查看消息 在控制台查看消息并不会将消息消费所以消息还在。
消费者获取消息
public class Recv {private final static String QUEUE_NAME simple_queue;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 创建通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [x] received : msg !);}};// 监听队列第二个参数是否自动进行消息确认。channel.basicConsume(QUEUE_NAME, true, consumer);}
}
控制台 这个时候队列中的消息就没了 消费者的消息确认机制
通过刚才的案例可以看出消息一旦被消费者接收队列中的消息就会被删除。
那么问题来了RabbitMQ怎么知道消息被接收了呢
这就要通过消息确认机制Acknowlege来实现了。当消费者获取消息后会向RabbitMQ发送回执ACK告知消息已经被接收。不过这种回执ACK分两种情况
自动ACK消息一旦被接收消费者自动发送ACK手动ACK消息接收后不会发送ACK需要手动调用
大家觉得哪种更好呢
这需要看消息的重要性
如果消息不太重要丢失也没有影响那么自动ACK会比较方便如果消息非常重要不容丢失。那么最好在消费完成后手动ACK否则接收消息后就自动ACKRabbitMQ就会把消息从队列中删除。如果此时消费者宕机那么消息就丢失了。
我们之前的测试都是自动ACK的如果要手动ACK需要改动我们的代码
public class Recv2 {private final static String QUEUE_NAME simple_queue;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 创建通道final Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [x] received : msg !);// 手动进行ACKchannel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列第二个参数false手动进行ACKchannel.basicConsume(QUEUE_NAME, false, consumer);}
}
注意到最后一行代码
channel.basicConsume(QUEUE_NAME, false, consumer);
如果第二个参数为true则会自动进行ACK如果为false则需要手动ACK。方法的声明 9RabbitMQ的work消息模型
说明 在刚才的基本模型中一个生产者一个消费者生产的消息直接被消费者消费。比较简单。
Work queues也被称为Task queues任务模型。
当消息处理比较耗时的时候可能生产消息的速度会远远大于消息的消费速度。长此以往消息就会堆积越来越多无法及时处理。此时就可以使用work 模型让多个消费者绑定到一个队列共同消费队列中的消息。队列中的消息一旦消费就会消失因此任务是不会被重复执行的。 角色
P生产者任务的发布者C1消费者领取任务并且完成任务假设完成速度较慢C2消费者2领取任务并完成任务假设完成速度快
生产者
生产者与案例1中的几乎一样
public class Send {private final static String QUEUE_NAME test_work_queue;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 循环发布任务for (int i 0; i 50; i) {// 消息内容String message task .. i;channel.basicPublish(, QUEUE_NAME, null, message.getBytes());System.out.println( [x] Sent message );Thread.sleep(i * 2);}// 关闭通道和连接channel.close();connection.close();}
}
不过这里我们是循环发送50条消息。
消费者1 消费者2 与消费者1基本类似就是没有设置消费耗时时间。
这里是模拟有些消费者快有些比较慢。
接下来两个消费者一同启动然后发送50条消息 可以发现两个消费者各自消费了25条消息而且各不相同这就实现了任务的分发。
能者多劳
刚才的实现有问题吗
消费者1比消费者2的效率要低一次任务的耗时较长然而两人最终消费的消息数量是一样的消费者2大量时间处于空闲状态消费者1一直忙碌
现在的状态属于是把任务平均分配正确的做法应该是消费越快的人消费的越多。
怎么实现呢
我们可以修改设置让消费者同一时间只接收一条消息这样处理完成之前就不会接收更多消息就可以让处理快的人接收更多消息 再次测试 10RabbitMQ订阅模型分类
订阅模型示意图 前面2个案例中只有3个角色
P生产者也就是要发送消息的程序C消费者消息的接受者会一直等待消息到来。queue消息队列图中红色部分。类似一个邮箱可以缓存消息生产者向其中投递消息消费者从其中取出消息。
而在订阅模型中多了一个exchange角色而且过程略有变化
P生产者也就是要发送消息的程序但是不再发送到队列中而是发给X交换机C消费者消息的接受者会一直等待消息到来。Queue消息队列接收消息、缓存消息。Exchange交换机图中的X。一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。Exchange有以下3种类型
Fanout广播将消息交给所有绑定到交换机的队列Direct定向把消息交给符合指定routing key 的队列Topic通配符把消息交给符合routing pattern路由模式 的队列
Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与Exchange绑定或者没有符合路由规则的队列那么消息会丢失
11RabbitMQ订阅模型-Fanout
Fanout也称为广播。
流程说明
流程图 在广播模式下消息发送流程是这样的
1 可以有多个消费者 2 每个消费者有自己的queue队列 3 每个队列都要绑定到Exchange交换机 4 生产者发送的消息只能发送到交换机交换机来决定要发给哪个队列生产者无法决定。 5 交换机把消息发送给绑定过的所有队列 6 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
生产者
两个变化
1 声明Exchange不再声明Queue2 发送消息到Exchange不再发送到Queue public class Send {private final static String EXCHANGE_NAME fanout_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明exchange指定类型为fanoutchannel.exchangeDeclare(EXCHANGE_NAME, fanout);// 消息内容String message Hello everyone;// 发布消息到Exchangechannel.basicPublish(EXCHANGE_NAME, , null, message.getBytes());System.out.println( [生产者] Sent message );channel.close();connection.close();}
} 消费者1
public class Recv {private final static String QUEUE_NAME fanout_exchange_queue_1;private final static String EXCHANGE_NAME fanout_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, );// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者1] received : msg !);}};// 监听队列自动返回完成channel.basicConsume(QUEUE_NAME, true, consumer);}
}
要注意代码中队列需要和交换机绑定
消费者2
public class Recv2 {private final static String QUEUE_NAME fanout_exchange_queue_2;private final static String EXCHANGE_NAME fanout_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, );// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者2] received : msg !);}};// 监听队列手动返回完成channel.basicConsume(QUEUE_NAME, true, consumer);}
}
测试
我们运行两个消费者然后发送1条消息 12RabbitMQ订阅模型-Direct
说明
在Fanout模式中一条消息会被所有订阅的队列都消费。但是在某些场景下我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下
队列与交换机的绑定不能是任意绑定了而是要指定一个RoutingKey路由key消息的发送方在 向 Exchange发送消息时也必须指定消息的 RoutingKey。Exchange不再把消息交给每一个绑定的队列而是根据消息的Routing Key进行判断只有队列的Routingkey与消息的 Routing key完全一致才会接收到消息
流程图 图解
P生产者向Exchange发送消息发送消息时会指定一个routing key。XExchange交换机接收生产者的消息然后把消息递交给 与routing key完全匹配的队列C1消费者其所在队列指定了需要routing key 为 error 的消息C2消费者其所在队列指定了需要routing key 为 info、error、warning 的消息
生产者
此处我们模拟商品的增删改发送消息的RoutingKey分别是insert、update、delete
public class Send {private final static String EXCHANGE_NAME direct_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明exchange指定类型为directchannel.exchangeDeclare(EXCHANGE_NAME, direct);// 消息内容String message 商品新增了 id 1001;// 发送消息并且指定routing key 为insert ,代表新增商品channel.basicPublish(EXCHANGE_NAME, insert, null, message.getBytes());System.out.println( [商品服务] Sent message );channel.close();connection.close();}
}
消费者1
我们此处假设消费者1只接收两种类型的消息更新商品和删除商品。
public class Recv {private final static String QUEUE_NAME direct_exchange_queue_1;private final static String EXCHANGE_NAME direct_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机同时指定需要订阅的routing key。假设此处需要update和delete消息channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, update);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, delete);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者1] received : msg !);}};// 监听队列自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}
消费者2
我们此处假设消费者2接收所有类型的消息新增商品更新商品和删除商品。
public class Recv2 {private final static String QUEUE_NAME direct_exchange_queue_2;private final static String EXCHANGE_NAME direct_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机同时指定需要订阅的routing key。订阅 insert、update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, insert);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, update);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, delete);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者2] received : msg !);}};// 监听队列自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}
测试
我们分别发送增、删、改的RoutingKey发现结果 13RabbitMQ订阅模型-Topic
说明
Topic类型的Exchange与Direct相比都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符
Routingkey 一般都是有一个或多个单词组成多个单词之间以”.”分割例如 item.insert
通配符规则
#匹配一个或多个词
*匹配不多不少恰好1个词
举例
item.#能够匹配item.spu.insert 或者 item.spu
item.*只能匹配item.spu
图示 解释
红色Queue绑定的是usa.# 因此凡是以 usa.开头的routing key 都会被匹配到黄色Queue绑定的是#.news 因此凡是以 .news结尾的 routing key 都会被匹配
生产者
使用topic类型的Exchange发送消息的routing key有3种 item.isnert、item.update、item.delete
public class Send {private final static String EXCHANGE_NAME topic_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明exchange指定类型为topicchannel.exchangeDeclare(EXCHANGE_NAME, topic);// 消息内容String message 新增商品 : id 1001;// 发送消息并且指定routing key 为insert ,代表新增商品channel.basicPublish(EXCHANGE_NAME, item.insert, null, message.getBytes());System.out.println( [商品服务] Sent message );channel.close();connection.close();}
}
消费者1
我们此处假设消费者1只接收两种类型的消息更新商品和删除商品
public class Recv {private final static String QUEUE_NAME topic_exchange_queue_1;private final static String EXCHANGE_NAME topic_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机同时指定需要订阅的routing key。需要 update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, item.update);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, item.delete);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者1] received : msg !);}};// 监听队列自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}
消费者2
我们此处假设消费者2接收所有类型的消息新增商品更新商品和删除商品。
/*** 消费者2*/
public class Recv2 {private final static String QUEUE_NAME topic_exchange_queue_2;private final static String EXCHANGE_NAME topic_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机同时指定需要订阅的routing key。订阅 insert、update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, item.*);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者2] received : msg !);}};// 监听队列自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}
14RabbitMQ持久化
如何避免消息丢失
1 消费者的ACK机制。可以防止消费者丢失消息。
2 但是如果在消费者消费之前MQ就宕机了消息就没了。
所以我们需要将消息持久化到硬盘以防服务宕机。
要将消息持久化前提是队列、Exchange都持久化
交换机持久化 队列持久化 消息持久化 3.数据同步创建rabbitMQ用户并授权
1 创建用户
在这里插入图片描述
效果如下 2 创建虚拟机 效果如下 3 给leyouxxx用户授权 效果如下 4 切换到leyouxxx用户 4.数据同步SpringAMQP的使用入门
1简介
Sprin有很多不同的项目其中就有对AMQP的支持 Spring AMQP的页面Spring AMQP 注意这里一段描述 Spring-amqp是对AMQP协议的抽象实现而spring-rabbit 是对协议的具体实现也是目前的唯一实现。底层使用的就是RabbitMQ。
2依赖和配置
添加AMQP的启动器
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.ithiema/groupIdartifactIdspring-boot-rabbitmq/artifactIdversion1.0-SNAPSHOT/versionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.1.6.RELEASE/versionrelativePath//parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependency/dependencies
/project
在application.yml中添加RabbitMQ地址
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /leyouusername: leyoupassword: leyou编写SpringBoot启动类
package cn.itcast;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/****/
SpringBootApplication
public class RabbitMQApplication {public static void main(String[] args) {SpringApplication.run(RabbitMQApplication.class,args);}
}
3监听者
在SpringAmqp中对消息的消费者进行了封装和抽象一个普通的JavaBean中的普通方法只要通过简单的注解就可以成为一个消费者。
package com.itheima.consumer;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消费方*/
Component
public class MQListener {/*** 接收消费的方法* value: 绑定队列信息* exchange: 绑定交换机信息* key 绑定路由信息*/RabbitListener(bindings QueueBinding(value Queue(name ly_test_queue),exchange Exchange(name ly_test_exchange,type ExchangeTypes.TOPIC),key user.#))public void retrireMsg(String msg){System.out.println(接收的消息msg);}
}
Componet类上的注解注册到Spring容器 RabbitListener方法上的注解声明这个方法是一个消费者方法需要指定下面的属性 bindings指定绑定关系可以有多个。值是QueueBinding的数组。QueueBinding包含下面属性 value这个消费者关联的队列。值是Queue代表一个队列 exchange队列所绑定的交换机值是Exchange类型 key队列和交换机绑定的RoutingKey
类似listen这样的方法在一个类中可以写多个就代表多个消费者。
4消息发送AmqpTemplate
Spring最擅长的事情就是封装把他人的框架进行封装和整合。
Spring为AMQP提供了统一的消息处理模板AmqpTemplate非常方便的发送消息其发送方法 红框圈起来的是比较常用的3个方法分别是
指定交换机、RoutingKey和消息体指定消息指定RoutingKey和消息会向默认的交换机发送消息
5测试代码
package com.itheima;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;RunWith(SpringRunner.class)
SpringBootTest(classes RabbitMQApplication.class)
public class MQTest {Autowiredprivate AmqpTemplate amqpTemplate;Testpublic void testSendMs(){amqpTemplate.convertAndSend(ly_test_exchange,user.insert,测试MQ是否可以发送消息);try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}
}
运行后查看日志