网站建设 碧辉腾乐,网站建设做什么好,个体户门头图片,如何添加网站图标目录
一、消费消息的规则
二、消费消息的具体实现方法
#x1f345; 1、编写消费者类#xff08;ConsumerEnv#xff09;
#x1f345; 2、编写Consumer函数式接口#xff08;回调函数#xff09;
#x1f345; 3、编写ConsumeerManager类 #x1f384;定义成员变…目录
一、消费消息的规则
二、消费消息的具体实现方法 1、编写消费者类ConsumerEnv 2、编写Consumer函数式接口回调函数 3、编写ConsumeerManager类 定义成员变量 notifyConsume()方法 添加构造方法 addConsumer()方法 完善consumeMessage()方法 4、完成VirtualHost类编写 basicConsume()方法编写 编写basicAck类手动应答 三、测试VirtualHost 1、准备工作和收尾工作 2、测试交换机的创建和删除
3、测试队列的创建和删除 4、测试绑定的创建和删除 5、测试发布消息
6、测试消费消息 先订阅队列再发送消息 先发送消息再订阅队列 测试basicAck 一、消费消息的规则
前面主要讲了basicPublish发布消息这一块同时写了Router类实现了bindingKey和routingKey的命名规则和匹配规则主要就是讲的是生产消息。
那么接下来就实现消费者消费消息。 推送给消费者消息的基本思路 1、broker server管理者哪些消费者 2、收到了对应的消息把消息推送给消费者 已知一个broker server中是包含了很多个队列的 消费者调用basicConsume就是订阅某个队列的消息 1、消费者是以队列的维度订阅消息 2、一个队列可以有多个消费者 此处只需要约定消费者如何消费即可。
这里使用“轮询”的方式消费消息轮询举例子如上图有123三个消费者让他们分别轮流消费一条消息依次轮流来一次消费一个。 具体实现 1、定义一个类描述一个消费者 2、然后给每个队列对象MSGQueue对象加上属性相当于一个List包含若干个消费者对象。 二、消费消息的具体实现方法
在VirtualHost类中实现一个订阅消息的方法basicConsume() 添加一个队列的订阅者当队列收到消息以后就要把消息推送给对应的订阅者。 consumerTag消费者的身份标识 aotoAck消息被消费完成后应答的方式为true自动应答为false就手动应答。 Consumer一个回调函数也就是一个函数式接口lambda函数底层实现这样在后面调用basicConsume的时候并且传实参的时候就可以写作lambda样子 1、编写消费者类ConsumerEnv /** 表示一个消费者* */
Data
public class ConsumerEnv {private String consumerTag; //消费者身份标识private String queueName;private boolean autoAck;
// 通过回调处理收到的消息private Consumer consumer;
}
然后再MSGQueue.java类中进行相应的扩充。 private ListConsumerEnv consumerEnvList new ArrayList();
// 记录取到了第几个消费者方便实现轮询策略
// AtomicInteger是一个原子性类型因为consumerSeq再消费信息的时候会被修改
// 如果使用int可能造成线程不安全于是这里就使用AtomicIntegerpublic AtomicInteger consumerSeq new AtomicInteger();
// 添加一个新的订阅者public void addConsumerEnv(ConsumerEnv consumerEnv){synchronized (this){consumerEnvList.add(consumerEnv);}}
// 挑选一个订阅者处理当前的消息轮询public ConsumerEnv chooseConsumer(){if (consumerEnvList.size() 0){
// 该队列没有人订阅return null;}
// 计算当前要取的元素的下标int index consumerSeq.get() % consumerEnvList.size();
// getAndIncrement()先获取当前值再加1。相当于 getAndAdd(1).consumerSeq.getAndIncrement(); //进行自增return consumerEnvList.get(index);} 2、编写Consumer函数式接口回调函数 创建一个Consumer接口。 /*
* 只是一个函数式接口
* 收到消息之后要处理消息时调用的方法
* */
FunctionalInterface
public interface Consumer {
// 处理投递
// 每次服务器收到消息之后调用消息通过这个方法把消息推送给对应的消费者void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException;
} 3、编写ConsumeerManager类
这个类主要就是用来实现消费者消费消息的核心逻辑。主要有以下几块。 消费消息就是让线程池执行对应消费者中的回调函数。在调用回调函数的时候就把消息的内容通过参数传进去。消费者在最初订阅消息的时候就把回调注册给broker server。回调函数的内容时消费者确定的取决于消费者的业务逻辑。 扫描线程能够感知到哪个队列里面收到了新的消息扫描线程会取出该消息找出对应的消费者将该内容打包成一个任务丢给线程池去调用 为什么需要线程池 一些消费者给出的回调函数处理起来可能会比较耗时如果只有一个扫描线程那么可能会导致处理不及时导致队列中消息越来越多。所以这里引入的扫描线程就轻量的取消息和获取回调而线程池就用来执行处理的回调函数。 扫描线程如何明白哪个队列中有了新消息 引入一个阻塞队列。该队列中的元素是有消息的队列的名字哪一个队列有消息了就把队列名放到该阻塞队列中。扫描线程就可以从阻塞队列中获取到新增消息的队列的名字。 如何保证消息不被丢失 使用消息确认ACK。在消息确认就是为了避免消费者的回调方法在执行过程中出错导致消息丢失这种情况。 为了保证消息不丢失 1在真正执行回调之前把该消息放到“待确认集合”中也就是前面MemoryDataCenter中的queueMessageWaitAckMap集合中 2执行回调 3当前消费者采取的是autoAck true也就是回调执行完毕不抛异常就算消费成功消费成功以后删除消息硬盘内存哈希表待确认集合 4当前消息采取的是autoAck false手动应答。也就是消费者这边在回调方法内部显示调用basicAck这个核心API。 定义成员变量
也就是上面提到过的阻塞队列扫描线程线程池。
public class ConsumerManager {
// 持有上层VirtualHostprivate VirtualHost parent;
// 指定一个线程池负责去执行具体的回调任务private ExecutorService workerPool Executors.newFixedThreadPool(4);
// 引入一个阻塞队列存放队列名的private BlockingQueueString tokenQueue new LinkedBlockingDeque();
// 扫描线程private Thread scannerThread null;} notifyConsume()方法
这个方法主要就是为了通知什么时候消费这里主要就是在发送消息的时候通知消费将含有该消息的队列名放在阻塞队列中
// 通知消费
// 调用时机发送消息的时候就调用sendMessagepublic void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}
所以我们就需要在前面VirtualHost类中的sendMessage方法中再调用一个通知消费的方法
异常大家自己向上抛一下。
// 通知消费者进行消费consumerManager.notifyConsume(queue.getName()); 添加构造方法
添加构造方法构造一个线程编写从队列中取出消息的过程
其中的consumeMessage(queue)是消费消息的具体实现方法先列在这里不实现 public ConsumerManager(VirtualHost p){parent p;scannerThread new Thread(()-{
// 持续运行while (true){try {
// 1、从阻塞队列中拿到队列名String queueName tokenQueue.take();
// 2、根据队列名找到队列MSGQueue queue parent.getMemoryDataCenter().getQueue(queueName);if (queue null){throw new MqException([ConsumerManager]取出令牌后发现该队列名不存在queuName queueName);}
// 3、从队列中消费一个消息synchronized (queue){consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});
// 把线程设为后台线程scannerThread.setDaemon(true);scannerThread.start();}private void consumeMessage(MSGQueue queue) {//TODO
} addConsumer()方法
该方法主要是为了新增一个Consumer对象到指定的队列中。
// 新增一个Consumer对象到指定的队列中public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
// 找到对应的队列MSGQueue queue parent.getMemoryDataCenter().getQueue(queueName);if (queue null){throw new MqException([ConsumerManager]队列不存在queueName queueName);}ConsumerEnv consumerEnv new ConsumerEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue){queue.addConsumerEnv(consumerEnv);
// 如果当前队列中已经有了一些消息需要立即消费掉int n parent.getMemoryDataCenter().getMessageCount(queueName);for (int i 0; i n; i) {
// 调用一次就消费一条消息consumeMessage(queue);}}}完善consumeMessage()方法
这个方法前面只列了一下没有实现这里具体实现一下。 主要有以下几步 1按照轮询的方式找出一个消费者 2从队列中取出一个消息 3把消息丢给回调函数给线程池处理。 a. 把消息放到待确认集合中 b. 真正的执行回调操作 c. 如果是自动应答直接删除消息手动应答先不处理交给后续消费者调用 basicAck()。 private void consumeMessage(MSGQueue queue) {
// 1、按照轮询的方式找出一个消费者来ConsumerEnv luckyDog queue.chooseConsumer();if (luckyDog null){
// 当前没有消费者暂时不消费return;}
// 2、从队列中取出一个消息
// pollMessage是为了从队列中取出消息Message message parent.getMemoryDataCenter().pollMessage(queue.getName());if (message null) {
// 当前队列没有消息return;}
// 3、把消息丢给回调函数中给线程池处理workerPool.submit(() - {try {
// 1、把消息放到待确认集合中parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);
// 2、真正执行回调操作luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());
// 3、如果当前是自动应答就可以直接删除消息
// 如果是手动应答就需要调用basicAck()if (luckyDog.isAutoAck()){
// 1.删除硬盘先看是不是持久化消息if (message.getDeliverMode() 2){parent.getDiskDataCenter().deleteMessage(queue,message);}
// 2、待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(),message.getMessageId());
// 3、删除内存中消息中心的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println([ConsumerManager]消息被成功消费queueName queue.getName());}} catch (Exception e) {e.printStackTrace();}});} 4、完成VirtualHost类编写 basicConsume()方法编写
该方法主要作用是订阅消息(消费消息)。在VirtualHost中实现。其中调用了ConsumerManager中的方法。
首先在VirtualHost添加consumerManager的实例。 private ConsumerManager consumerManager new ConsumerManager(this);然后写订阅消的方法。
// 订阅消息public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer){
// 构造一个ConsumerEnv对象也就是消费者对象把对应的队列找到然后将Consumer对象添加到该队列中。queueName virtualHostName queueName;try {consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);System.out.println([VirtualHost]basicConsume成功 queueName queueName);return true;} catch (Exception e){System.out.println([VirtualHost]basicConsume失败 queueName queueName);e.printStackTrace();return false;}} 编写basicAck类手动应答
public boolean basicAck(String queueName,String messageId){queueName virtualHostName queueName;try{
// 1、获取到消息和队列Message message memoryDataCenter.getMessage(messageId);if (message null){throw new MqException([VirtualHost] 消息不存在messgeId messageId);}MSGQueue queue memoryDataCenter.getQueue(queueName);if (queue null){throw new MqException([VirtualHost] 要确认的队列不存在queueName queueName);}
// 2、删除硬盘上的数据if (message.getDeliverMode() 2){diskDataCenter.deleteMessage(queue,message);}
// 3、、删除内存中的数据memoryDataCenter.removeMessage(messageId);
// 4、删除待确认集合中的数据memoryDataCenter.removeMessageWaitAck(queueName,messageId);System.out.println([VirtualHost]basicAck成功消息被成功确认queueName queueName);return true;
//}catch (Exception e){System.out.println([VirtualHost]basicAck失败消息被成功失败queueName queueName);e.printStackTrace();return false;}}
到这里我们的虚拟主机VirtualHost类就算全部写完了。 三、测试VirtualHost 1、准备工作和收尾工作
SpringBootTest
public class VirtualHostTests {private VirtualHost virtualHost null;BeforeEachpublic void setUp(){TigerMqApplication.context SpringApplication.run(TigerMqApplication.class);virtualHost new VirtualHost(default);}public void tearDown() throws IOException {TigerMqApplication.context.close();virtualHost null;
// 把硬盘目录删除File dataDir new File(./data);FileUtils.deleteDirectory(dataDir);}
} 2、测试交换机的创建和删除
// 测试创建和删除交换机Testpublic void testExchange(){boolean ok virtualHost.exchangeDeclare(testExchange, ExchangeType.DIRECT,true);Assertions.assertTrue(ok);ok virtualHost.exchangeDelete(testExchange);Assertions.assertTrue(ok);}3、测试队列的创建和删除
//测试创建队列和删除队列Testpublic void testQueue(){boolean ok virtualHost.queueDeclare(testQueue, true);Assertions.assertTrue(ok);ok virtualHost.queueDelete(testQueue);Assertions.assertTrue(ok);} 4、测试绑定的创建和删除
// 测试创建绑定和删除绑定Testpublic void testQueueBind(){boolean ok virtualHost.exchangeDeclare(testExchange, ExchangeType.DIRECT,true);Assertions.assertTrue(ok);ok virtualHost.queueDeclare(testQueue, true);Assertions.assertTrue(ok);ok virtualHost.queueBind(testQueue,testExchange,testBindingKey);Assertions.assertTrue(ok);ok virtualHost.queueUnbind(testQueue,testExchange);Assertions.assertTrue(ok);} 5、测试发布消息
// 测试发布消息
Test
public void testBasicPublish() {boolean ok virtualHost.queueDeclare(testQueue, true);Assertions.assertTrue(ok);ok virtualHost.exchangeDeclare(testExchange, ExchangeType.DIRECT,true);Assertions.assertTrue(ok);ok virtualHost.basicPublish(testExchange, testQueue, null,hello.getBytes());Assertions.assertTrue(ok);
}
6、测试消费消息 先订阅队列再发送消息 // 消费消息// 先订阅队列, 后发送消息Testpublic void testBasicConsume1() throws InterruptedException {boolean ok virtualHost.queueDeclare(testQueue, true);Assertions.assertTrue(ok);ok virtualHost.exchangeDeclare(testExchange, ExchangeType.DIRECT,true);Assertions.assertTrue(ok);// 先订阅队列ok virtualHost.basicConsume(testConsumerTag, testQueue, true, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println(messageId basicProperties.getMessageId());System.out.println(body new String(body, 0, body.length));Assertions.assertEquals(testQueue, basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals(hello.getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);// 再发送消息ok virtualHost.basicPublish(testExchange, testQueue, null,hello.getBytes());Assertions.assertTrue(ok);}打印的日志如下
[DataBaseManger]创建表完成
[DataBaseManger]创建初始数据已经完成
[DataBaseManger]数据库初始化完成
[MemoryDataCenter]队列删除成功queueName defaulttestQueue
[VirtualHost]队列创建成功queueName defaulttestQueue
[MemoryDataCenter]新交换机添加成功exchangeName defaulttestExchange
[VirtualHost] 交换机创建完成exchangeName defaulttestExchange
[VirtualHost]basicConsume成功 queueName defaulttestQueue
[MemoryDataCenter]新消息添加成功messageId M-a500879e-5461-4550-8d56-5bef00571ab3
[MemoryDataCenter]消息被投递到到队列中! messageId M-a500879e-5461-4550-8d56-5bef00571ab3
[MemoryDataCenter]消息从队列中取出!messageId M-a500879e-5461-4550-8d56-5bef00571ab3
[MemoryDataCenter]消息进入待确认队列!messageId M-a500879e-5461-4550-8d56-5bef00571ab3
messageIdM-a500879e-5461-4550-8d56-5bef00571ab3
bodyhello
[MemoryDataCenter]消息从待确认队列删除!messageId M-a500879e-5461-4550-8d56-5bef00571ab3
[MemoryDataCenter]消息被移除messageId M-a500879e-5461-4550-8d56-5bef00571ab3
[ConsumerManager]消费被成功消费queueName defaulttestQueue先发送消息再订阅队列 Testpublic void testBasicConsume2() throws InterruptedException {boolean ok virtualHost.queueDeclare(testQueue, true);Assertions.assertTrue(ok);ok virtualHost.exchangeDeclare(testExchange, ExchangeType.DIRECT,true);Assertions.assertTrue(ok);// 先发送消息ok virtualHost.basicPublish(testExchange, testQueue, null,hello.getBytes());Assertions.assertTrue(ok);Thread.sleep(500);// 再订阅队列ok virtualHost.basicConsume(testConsumerTag, testQueue, true, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println(messageId basicProperties.getMessageId());System.out.println(body new String(body, 0, body.length));Assertions.assertEquals(testQueue, basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals(hello.getBytes(), body);}});Assertions.assertTrue(ok);}
[MessageFileManager]恢复Message数据完成
[VirtualHost]队列已经存在queueName defaulttestQueue
[VirtualHost]交换机已经存在exchangeName defaulttestExchange
[MemoryDataCenter]新消息添加成功messageId M-b765df78-6997-4ce2-b87e-a6d37e3ee3c8
[MemoryDataCenter]消息被投递到到队列中! messageId M-b765df78-6997-4ce2-b87e-a6d37e3ee3c8
[MemoryDataCenter]消息从队列中取出!messageId M-b765df78-6997-4ce2-b87e-a6d37e3ee3c8
[VirtualHost]basicConsume成功 queueName defaulttestQueue
[MemoryDataCenter]消息进入待确认队列!messageId M-b765df78-6997-4ce2-b87e-a6d37e3ee3c8
messageIdM-b765df78-6997-4ce2-b87e-a6d37e3ee3c8
bodyhello
[MemoryDataCenter]消息从待确认队列删除!messageId M-b765df78-6997-4ce2-b87e-a6d37e3ee3c8
[MemoryDataCenter]消息被移除messageId M-b765df78-6997-4ce2-b87e-a6d37e3ee3c8
[ConsumerManager]消费被成功消费queueName defaulttestQueue测试basicAck
Testpublic void testBasicAck() throws InterruptedException {boolean ok virtualHost.queueDeclare(testQueue, true);Assertions.assertTrue(ok);ok virtualHost.exchangeDeclare(testExchange, ExchangeType.DIRECT,true);Assertions.assertTrue(ok);// 先发送消息ok virtualHost.basicPublish(testExchange, testQueue, null,hello.getBytes());Assertions.assertTrue(ok);// 再订阅队列 [把 autoAck 改成 false]ok virtualHost.basicConsume(testConsumerTag, testQueue, false, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println(messageId basicProperties.getMessageId());System.out.println(body new String(body, 0, body.length));Assertions.assertEquals(testQueue, basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals(hello.getBytes(), body);// [新增手动调用 basicAck]boolean ok virtualHost.basicAck(testQueue, basicProperties.getMessageId());Assertions.assertTrue(ok);}});Assertions.assertTrue(ok);Thread.sleep(500);}
[DataBaseManger]创建表完成
[DataBaseManger]创建初始数据已经完成
[DataBaseManger]数据库初始化完成
[MemoryDataCenter]队列删除成功queueName defaulttestQueue
[VirtualHost]队列创建成功queueName defaulttestQueue
[MemoryDataCenter]新交换机添加成功exchangeName defaulttestExchange
[VirtualHost] 交换机创建完成exchangeName defaulttestExchange
[MemoryDataCenter]新消息添加成功messageId M-72d857bf-fea8-4cf3-a94b-2c87c5226107
[MemoryDataCenter]消息被投递到到队列中! messageId M-72d857bf-fea8-4cf3-a94b-2c87c5226107
[MemoryDataCenter]消息从队列中取出!messageId M-72d857bf-fea8-4cf3-a94b-2c87c5226107
[VirtualHost]basicConsume成功 queueName defaulttestQueue
[MemoryDataCenter]消息进入待确认队列!messageId M-72d857bf-fea8-4cf3-a94b-2c87c5226107
messageIdM-72d857bf-fea8-4cf3-a94b-2c87c5226107
bodyhello
[MemoryDataCenter]消息被移除messageId M-72d857bf-fea8-4cf3-a94b-2c87c5226107
[MemoryDataCenter]消息从待确认队列删除!messageId M-72d857bf-fea8-4cf3-a94b-2c87c5226107
[VirtualHost]basicAck成功消息被成功确认queueName defaulttestQueue