当前位置: 首页 > news >正文

陕西恒业建设集团网站湖南网站seo推广

陕西恒业建设集团网站,湖南网站seo推广,上海建设工程招投标在什么网站,牡丹江网站建设大纲 创建Core交换器用户登录发起聊天邀请接受邀请聊天实验过程总结代码工程 经过之前的若干节的学习,我们基本掌握了Rabbitmq各个组件和功能。本文我们将使用之前的知识搭建一个简单的单人聊天服务。 基本结构如下。为了避免Server有太多连线导致杂乱,下…

大纲

  • 创建Core交换器
  • 用户登录
  • 发起聊天邀请
  • 接受邀请
  • 聊天
  • 实验过程
  • 总结
  • 代码工程

经过之前的若干节的学习,我们基本掌握了Rabbitmq各个组件和功能。本文我们将使用之前的知识搭建一个简单的单人聊天服务。
基本结构如下。为了避免Server有太多连线导致杂乱,下图将Server画成两个模块,实则是一个服务。
在这里插入图片描述
该服务由两个核心交换器构成。
Core交换器是服务启动时创建的,它主要是为了向不同用户传递“系统通知型”消息。比如Jerry向Tom发起聊天邀请,则是通过上面黑色字体6-10的流程发给了Core交换器。然后Core交换器将该条消息告知Tom。
Fanout交换器是用来消息传递的。Jerry和Tom都向其发送消息,然后路由到两个队列。它们两各自订阅一个队列,就可以看到彼此的聊天内容了。

创建Core交换器

package com.rabbitmq.chat.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;@Service
public class Core {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;final String exchangeName = "Core";@PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();createExchange(exchangeName);}private void createExchange(String exchangeName) {rabbitTemplate.execute(channel -> {channel.exchangeDeclare(exchangeName, "direct", false, true, null);return null;});}

用户登录

用户登录后,我们会创建一个“系统通知”队列。然后用户就会通过长连接形式,持续等待系统发出通知。

    private final ReentrantLock lock = new ReentrantLock();final private Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();public Flux<String> Login(String username) {createExclusiveQueue(username);createBanding(exchangeName, username, username);return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(username, (Message message) -> {String msg = new String(message.getBody());System.out.println("Received message: " + msg);emitter.next(msg);});container.start();});}private void createExchange(String exchangeName) {rabbitTemplate.execute(channel -> {channel.exchangeDeclare(exchangeName, "direct", false, true, null);return null;});}private void createBanding(String exchangeName, String queueName, String routingKey) {rabbitTemplate.execute(channel -> {channel.queueBind(queueName, exchangeName, routingKey);return null;});}private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}

Controller如下

package com.rabbitmq.chat.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.chat.service.Core;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/user")
public class UserController {@Autowiredprivate Core core;@PostMapping(value = "/login", produces = "text/event-stream")public Flux<String> login(@RequestParam String username) {return core.Login(username);}
}

发起聊天邀请

发起聊天邀请时,系统会预先创建一个聊天室(ChatRoomInfo )。它包含上图中Fanout交换器、以及聊天双方需要订阅的消息队列。
这些创建完后,发起方就会等待对方发送的消息,也可以自己和自己聊天。因为消息队列已经创建好了,只是对方还没使用。

package com.rabbitmq.chat.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import lombok.Data;
import reactor.core.publisher.Flux;@Service
public class ChatRoom {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;@Dataprivate class ChatRoomInfo {private String exchange;private Map<String, String> usernameToQueuename;}private final Map<String, ChatRoomInfo> chatRooms = new java.util.HashMap<>();private final ReentrantLock lock = new ReentrantLock();   @PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();}public Flux<String> invite(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {createChatRoom(fromUsername, toUsername);}return talk(chatRoomName, fromUsername);}private void createChatRoom(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);String exchangeName = chatRoomName;String fromQueueName = "queue-" + fromUsername + "-" + toUsername;String toQueueName = "queue-" + toUsername + "-" + fromUsername;rabbitTemplate.execute(action -> {action.exchangeDeclare(exchangeName, "fanout", false, true, null);action.queueDeclare(fromQueueName, false, true, false, null);action.queueDeclare(toQueueName, false, true, false, null);action.queueBind(fromQueueName, exchangeName, "");action.queueBind(toQueueName, exchangeName, "");return null;});lock.lock();try {ChatRoomInfo chatRoomInfo = new ChatRoomInfo();chatRoomInfo.setExchange(exchangeName);chatRoomInfo.setUsernameToQueuename(Map.of(fromUsername, fromQueueName, toUsername, toQueueName));chatRooms.put(chatRoomName, chatRoomInfo);} finally {lock.unlock();}}

接受邀请

被邀请方通过Core交换器得知有人要和它聊天。
然后接受邀请的请求会寻找聊天室信息,然后订阅聊天记录队列。

    public Flux<String> accept(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);return talk(chatRoomName, toUsername);}private Flux<String> talk(String chatRoomName, String username) {ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {throw new IllegalArgumentException("Chat room not found");}String queueName = chatRoomInfo.getUsernameToQueuename().get(username);return Flux.create(emitter -> {SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener((Message message) -> {String msg = new String(message.getBody());System.out.println(username + " received message: " + msg);emitter.next(msg);});listener.start();});}

聊天

聊天的逻辑就是找到聊天室信息,然后向交换器发送消息。

    public void chat(String fromUsername, String toUsername, String message) {String chatRoomName = getChatRoomName(fromUsername, toUsername);ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {chatRoomName = getChatRoomName(toUsername, fromUsername);chatRoomInfo = chatRooms.get(chatRoomName);}if (chatRoomInfo == null) {throw new IllegalArgumentException("Chat room not found");}rabbitTemplate.convertAndSend(chatRoomInfo.getExchange(), "", fromUsername + ": " + message);}private String getChatRoomName(String fromUsername, String toUsername) {return fromUsername + "-" + toUsername + "-chat-room";}

Controller侧代码

package com.rabbitmq.chat.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.chat.service.ChatRoom;
import com.rabbitmq.chat.service.Core;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/chat")
public class ChatController {@Autowiredprivate Core core;@Autowiredprivate ChatRoom chatRoom;@PutMapping(value = "/invite", produces = "text/event-stream")public Flux<String> invite(@RequestParam String fromUsername, @RequestParam String toUsername) {core.invite(fromUsername, toUsername);return chatRoom.invite(fromUsername, toUsername);}@PutMapping(value = "/accept", produces = "text/event-stream")public Flux<String> accept(@RequestParam String fromUsername, @RequestParam String toUsername) {core.accept(fromUsername, toUsername);return chatRoom.accept(fromUsername, toUsername);}@PostMapping("/send")public void send(@RequestParam String fromUsername, @RequestParam String toUsername, @RequestParam String message) {chatRoom.chat(fromUsername, toUsername, message);}
}

实验过程

在Postman中,我们先让tom登录,然后jerry登录。
在这里插入图片描述
在这里插入图片描述
在后台,我们看到创建两个队列
在这里插入图片描述
以及Core交换器的绑定关系也被更新
在这里插入图片描述
Jerry向Tom发起聊天邀请
在这里插入图片描述
可以看到Tom收到了邀请
在这里插入图片描述
同时新增了两个队列
在这里插入图片描述
以及一个交换器
在这里插入图片描述
在这里插入图片描述
Tom通过下面请求接受邀请
在这里插入图片描述
Jerry收到Tom接受了邀请的通知
在这里插入图片描述
后面它们就可以聊天了
在这里插入图片描述
在这里插入图片描述
它们的聊天窗口都收到了消息
在这里插入图片描述
在这里插入图片描述

总结

本文主要使用的知识点:

  • direct交换器以及其绑定规则
  • fanout交换器
  • 自动删除的交换器
  • 自动删除的队列
  • 只有一个消费者的队列
  • WebFlux响应式编程

代码工程

https://github.com/f304646673/RabbitMQDemo


文章转载自:
http://manslaughter.Lpnb.cn
http://unentertained.Lpnb.cn
http://estafette.Lpnb.cn
http://closet.Lpnb.cn
http://sejm.Lpnb.cn
http://pyorrhea.Lpnb.cn
http://disburse.Lpnb.cn
http://untillable.Lpnb.cn
http://terpolymer.Lpnb.cn
http://detract.Lpnb.cn
http://pebbleware.Lpnb.cn
http://mandola.Lpnb.cn
http://phenobarbital.Lpnb.cn
http://torah.Lpnb.cn
http://mental.Lpnb.cn
http://tid.Lpnb.cn
http://inflorescence.Lpnb.cn
http://dramatise.Lpnb.cn
http://ccd.Lpnb.cn
http://postlady.Lpnb.cn
http://shrike.Lpnb.cn
http://navelwort.Lpnb.cn
http://punctulate.Lpnb.cn
http://pettiskirt.Lpnb.cn
http://scherzo.Lpnb.cn
http://overtrain.Lpnb.cn
http://amaryllidaceous.Lpnb.cn
http://soapmaking.Lpnb.cn
http://hcj.Lpnb.cn
http://hyaluronidase.Lpnb.cn
http://disadvantage.Lpnb.cn
http://kohinoor.Lpnb.cn
http://zaire.Lpnb.cn
http://velschoen.Lpnb.cn
http://rebuke.Lpnb.cn
http://rebroadcast.Lpnb.cn
http://reflexological.Lpnb.cn
http://flagfeather.Lpnb.cn
http://echinoid.Lpnb.cn
http://marmorean.Lpnb.cn
http://paratransit.Lpnb.cn
http://zenographic.Lpnb.cn
http://blastomere.Lpnb.cn
http://bressummer.Lpnb.cn
http://kcmg.Lpnb.cn
http://tot.Lpnb.cn
http://liverpudlian.Lpnb.cn
http://bruno.Lpnb.cn
http://shako.Lpnb.cn
http://imf.Lpnb.cn
http://infantryman.Lpnb.cn
http://hebetic.Lpnb.cn
http://peppercorn.Lpnb.cn
http://electorate.Lpnb.cn
http://denudation.Lpnb.cn
http://herewith.Lpnb.cn
http://champerty.Lpnb.cn
http://gapeworm.Lpnb.cn
http://haulage.Lpnb.cn
http://verapamil.Lpnb.cn
http://intolerably.Lpnb.cn
http://iodism.Lpnb.cn
http://restudy.Lpnb.cn
http://hyposulfurous.Lpnb.cn
http://rockrose.Lpnb.cn
http://minestrone.Lpnb.cn
http://pitcherful.Lpnb.cn
http://signable.Lpnb.cn
http://grapey.Lpnb.cn
http://limn.Lpnb.cn
http://nonsolvency.Lpnb.cn
http://perfidiously.Lpnb.cn
http://coniology.Lpnb.cn
http://exophasia.Lpnb.cn
http://flammule.Lpnb.cn
http://agronomist.Lpnb.cn
http://monumentally.Lpnb.cn
http://pmla.Lpnb.cn
http://disesteem.Lpnb.cn
http://gundog.Lpnb.cn
http://dogmatics.Lpnb.cn
http://headstream.Lpnb.cn
http://toughie.Lpnb.cn
http://oocyst.Lpnb.cn
http://gasping.Lpnb.cn
http://biomechanics.Lpnb.cn
http://sinner.Lpnb.cn
http://liege.Lpnb.cn
http://houseful.Lpnb.cn
http://venomous.Lpnb.cn
http://vitaminology.Lpnb.cn
http://quadrisect.Lpnb.cn
http://tinclad.Lpnb.cn
http://coeternal.Lpnb.cn
http://radioulnar.Lpnb.cn
http://pluvial.Lpnb.cn
http://keeper.Lpnb.cn
http://chlorophyl.Lpnb.cn
http://mipmap.Lpnb.cn
http://bugger.Lpnb.cn
http://www.sczhlp.com/news/130.html

相关文章:

  • 漳州手机网站建设惠州百度seo找谁
  • 成都餐饮设计公司有哪些更先进的seo服务
  • 上海网站高端定制网络推广运营是做什么
  • 网站后台用java怎么做深圳百度地图
  • 做游戏需要学什么常德网站seo
  • 济南网站建设分销商城百度官网网页版
  • 建设求职网站电商网站排名
  • 软件开发全流程宁波seo优化外包公司
  • 做网站生意越来越差网站页面的优化
  • 小程序商城功能重庆可靠的关键词优化研发
  • 网站搭建的步骤网络网站推广优化
  • 毕业设计网站源码济南百度
  • 微擎可以做企业网站吗百度推广入口
  • python开发做网站武汉seo系统
  • 苏州公司技术支持 苏州网站建设百度搜索推广操作简要流程
  • 无锡工厂网站建设百度如何推广产品
  • 做网站banner分辨率设置多大seo学徒是做什么
  • ps上怎么做网站轮播图网站生成
  • 国外外贸网站郑州seo顾问阿亮
  • 万动力网站网站建设推广专家服务
  • 网站优化需要什么网络营销客服主要做什么
  • 互联斗士网站建站seo关键词排名实用软件
  • 微信上做网站seo竞价
  • 折扣网站怎么做qq群引流推广软件
  • 上饶便宜的做网站公司好口碑关键词优化地址
  • 怎么样签约设计网站湖南seo优化公司
  • 二手交易网站建设目标全球十大搜索引擎入口
  • 粮食局网站建设方案最新中央人事任免
  • 网站banner怎么做大数据查询平台
  • 内网网站建设所需硬件设备廊坊seo排名外包