当前位置: 首页 > news >正文

Netty客户端

// 原包名保持不变
package pro.nbbt.xulian.business.rtk.proxy.socket;

import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.dubbo.config.annotation.Service;
import io.grpc.netty.shaded.io.netty.handler.timeout.IdleState;
import io.grpc.netty.shaded.io.netty.handler.timeout.IdleStateEvent;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import pro.nbbt.xulian.business.mower.api.dto.rtk.SupplierAccountDetail;
import pro.nbbt.xulian.business.mower.api.dubbo.RtkService;
import pro.nbbt.xulian.business.mower.api.entity.rtk.FenceDeviceCache;
import pro.nbbt.xulian.business.mower.api.rocketmq.RocketMqConstants;
import pro.nbbt.xulian.business.mower.api.rocketmq.RocketmqRtkMessage;
import pro.nbbt.xulian.business.rtk.proxy.utils.mqtt.RtkCommonMqttUtil;
import pro.nbbt.xulian.common.core.util.SpringContextHolder;
import pro.nbbt.xulian.common.core.util.tool.ToolUtil;
import pro.nbbt.xulian.common.util.redis.RedisUtils;
import pro.nbbt.xulian.common.util.redis.RtkRedisKeyCons;

import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;

@Service(group = "prod", version = "1.0.0")
@Slf4j
public class DubboRtkClientManager implements RtkService {

@Resource
RedisUtils redisUtils;private final Map<Long, Channel> clientMap = new ConcurrentHashMap<>();
private final NioEventLoopGroup group = new NioEventLoopGroup();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
private final Set<Long> localLockSet = ConcurrentHashMap.newKeySet();
private final ExecutorService executor = Executors.newFixedThreadPool(8);@Value("${spring.cloud.nacos.discovery.ip:}")
private String nacosIp;public DubboRtkClientManager() {scheduler.scheduleAtFixedRate(this::checkInactiveConnections, 30, 30, TimeUnit.SECONDS);
}private void checkInactiveConnections() {Set<Long> groupIds = clientMap.keySet();log.info("定时任务本次检测连接数量:{}", groupIds.size());for (Long groupId : groupIds) {executor.submit(() -> {try {String key = RtkRedisKeyCons.RTK_CONNECT_ACTIVE + groupId  +  "::" + nacosIp;Object val = redisUtils.get(key);if (val == null) {log.warn("[{}] Redis未检测到活跃心跳,准备断开连接", groupId);disconnect(groupId);}} catch (Exception e) {log.error("[{}] 定时检查连接异常: {}", groupId, e.getMessage(), e);}});}
}@Override
public void connectAndSend(Long groupId, String host, int port, String mountPoint, String username, String password, String gga) {String key = RtkRedisKeyCons.RTK_CONNECT_ACTIVE + groupId  +  "::" + nacosIp;redisUtils.set(key, 1, 80, TimeUnit.SECONDS);if (clientMap.containsKey(groupId)) {if (ToolUtil.isNotEmpty(gga)) {sendGga(groupId, gga);}return;}if (!localLockSet.add(groupId)) {log.info("观察锁本地锁状态,加锁失败,开始return:{}",groupId);return;}log.info("观察锁本地锁状态,加锁成功,开始建立建立:{}",groupId);Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new RtkClientHandler(groupId, mountPoint, username, password));ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(0, 1, 0, TimeUnit.MINUTES));ch.pipeline().addLast("closeIdleStateHandler", new ChannelInboundHandlerAdapter() {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE || event.state() == IdleState.WRITER_IDLE) {RocketMQTemplate rocketMQTemplate = SpringContextHolder.getBean(RocketMQTemplate.class);RocketmqRtkMessage rocketmqRtkMessage = RocketmqRtkMessage.message(groupId, "No activity for 3 minutes, closing the connection.");rocketMQTemplate.convertAndSend(RocketMqConstants.topic.MQTT_RTK_ERR, rocketmqRtkMessage);ctx.close();}}}});}});bootstrap.connect(new InetSocketAddress(host, port)).addListener((ChannelFutureListener) future -> {try {log.info("开始存入通道, 当前存在通道数:{}",clientMap.keySet().size());if (future.isSuccess()) {log.info("对应的通道:{} 状态:{}", groupId, future.channel().isActive());Channel oldChannel = clientMap.get(groupId);if (oldChannel != null) {log.info("通道已经存在:{}", groupId);if (oldChannel.isActive()) {log.info("旧通道是活跃的关闭新通道:{}", groupId);future.channel().close();} else {log.info("旧通道非活跃的,关闭旧通道,添加新通道:{}", groupId);oldChannel.close();clientMap.put(groupId, future.channel());}} else {clientMap.put(groupId, future.channel());log.info("通道建立成功 [{}] 当前连接数: {},对应的账号:{}", groupId, clientMap.size(), username);}log.info("通道:{}对应的gga信息:{}", groupId, gga);sendGga(groupId, gga);} else {RocketMQTemplate rocketMQTemplate = SpringContextHolder.getBean(RocketMQTemplate.class);RocketmqRtkMessage rocketmqRtkMessage = RocketmqRtkMessage.message(groupId, "[" + groupId + "] Connection failed: " + future.cause().getMessage());rocketMQTemplate.convertAndSend(RocketMqConstants.topic.MQTT_RTK_ERR, rocketmqRtkMessage);return;}}finally {localLockSet.remove(groupId);log.info("观察锁本地锁状态,释放锁成功:{}",groupId);}});/*if (ToolUtil.isNotEmpty(gga)) {try {Thread.sleep(500);} catch (InterruptedException ignored) {}sendGga(groupId, gga);}*/
}public void sendGga(Long groupId, String gga) {Channel channel = clientMap.get(groupId);if (channel != null && channel.isActive()) {String msg = gga.endsWith("\r\n") ? gga : gga + "\r\n";ByteBuf buf = channel.alloc().buffer();buf.writeBytes(msg.getBytes(StandardCharsets.UTF_8));channel.writeAndFlush(buf);log.info("[" + groupId + "] Sent GGA: " + msg.trim());} else {disconnect(groupId);log.error("[" + groupId + "] 通道无效,无法发送GGA. channel={}, active={}", channel, (channel != null && channel.isActive()));}
}@Override
public void disconnect(Long groupId) {Channel channel = clientMap.remove(groupId);if (channel != null) {channel.close().addListener((ChannelFutureListener) future -> {if (future.isSuccess()) {log.info("[{}] 通道关闭成功,当前连接数:{}", groupId, clientMap.size());} else {log.warn("[{}] 通道关闭失败:{}", groupId, future.cause().getMessage());}});}
}public void shutdown() {clientMap.values().forEach(Channel::close);group.shutdownGracefully();scheduler.shutdown();
}private static class RtkClientHandler extends SimpleChannelInboundHandler<ByteBuf> {private final Long groupId;private final String mountPoint;private final String username;private final String password;public RtkClientHandler(Long groupId, String mountPoint, String username, String password) {this.groupId = groupId;this.mountPoint = mountPoint;this.username = username;this.password = password;}@Overridepublic void channelActive(ChannelHandlerContext ctx) {String auth = Base64.getEncoder().encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8));String request = "GET /" + mountPoint + " HTTP/1.1\r\n" +"Host: localhost\r\n" +"User-Agent: NettyRtkClient\r\n" +"Authorization: Basic " + auth + "\r\n\r\n";ctx.writeAndFlush(ctx.alloc().buffer().writeBytes(request.getBytes(StandardCharsets.UTF_8)));}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {byte[] bytes = new byte[msg.readableBytes()];msg.readBytes(bytes);log.info("接收到通道消息:{}",groupId);RedisUtils redisUtils = SpringUtil.getBean(RedisUtils.class);Map<String, Object> entries = redisUtils.hmget(RtkRedisKeyCons.FENCE_DEVICE + groupId);for (Object value : entries.values()) {if (value instanceof FenceDeviceCache) {FenceDeviceCache fenceDeviceCache = (FenceDeviceCache) value;if (redisUtils.setNx(RtkRedisKeyCons.FREQUENCY + fenceDeviceCache.getSn(), fenceDeviceCache.getFrequency())) {RtkCommonMqttUtil.sendRtk(bytes, fenceDeviceCache.getSn());}}}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("[" + groupId + "] Error: {}", cause.getMessage(), cause);ctx.close();}
}

}

package pro.nbbt.xulian.business.rtk.rocketmq;

import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import pro.nbbt.xulian.business.common.util.common.CommonUtils;
import pro.nbbt.xulian.business.common.util.toolkit.ThreadPoolUtils;
import pro.nbbt.xulian.business.mower.api.dto.rtk.SupplierAccountDetail;
import pro.nbbt.xulian.business.mower.api.dubbo.RtkService;
import pro.nbbt.xulian.business.mower.api.entity.rtk.Fence;
import pro.nbbt.xulian.business.mower.api.entity.rtk.FenceDevice;
import pro.nbbt.xulian.business.mower.api.entity.rtk.FenceDeviceCache;
import pro.nbbt.xulian.business.mower.api.entity.rtk.FenceDeviceEntity;
import pro.nbbt.xulian.business.mower.api.rocketmq.CommonMqMessage;
import pro.nbbt.xulian.business.mower.api.rocketmq.CommonStringMessage;
import pro.nbbt.xulian.business.mower.api.rocketmq.RocketMqConstants;
import pro.nbbt.xulian.business.mower.api.wireless.entity.common.WirelessDevice;
import pro.nbbt.xulian.business.mower.api.wireless.entity.common.WirelessDeviceSetting;
import pro.nbbt.xulian.business.rtk.service.IFenceDeviceService;
import pro.nbbt.xulian.business.rtk.service.IFenceService;
import pro.nbbt.xulian.business.rtk.service.ISupplierAccountService;
import pro.nbbt.xulian.business.rtk.utils.mqtt.RtkCommonMqttUtil;
import pro.nbbt.xulian.common.core.util.tool.ToolUtil;
import pro.nbbt.xulian.common.util.redis.RedisKeyCons;
import pro.nbbt.xulian.common.util.redis.RedisUtils;
import pro.nbbt.xulian.common.util.redis.RtkRedisKeyCons;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.ExecutorService;

@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMqConstants.topic.MQTT_RTK_SET, consumerGroup = RocketMqConstants.ROCKET_GROUP + RocketMqConstants.topic.MQTT_RTK_SET)
public class RtkSetMessageListener implements RocketMQListener {

private static final ExecutorService executorService = ThreadPoolUtils.newCpuCachedThreadPool();@Resource
RedisUtils redisUtils;@Reference(version = "1.0.0",group = "prod",injvm = false,loadbalance = "consistenthash",timeout = 2000,parameters = {"hash.arguments", "0"}
)
private RtkService rtkService;@Resource
IFenceService fenceService;
@Resource
IFenceDeviceService fenceDeviceService;
@Resource
ISupplierAccountService supplierAccountService;@Override
public void onMessage(CommonMqMessage commonMqMessage) {executorService.execute(() -> {try {long currentTimeMillis1 = System.currentTimeMillis();String topic = commonMqMessage.getTopic();String content = commonMqMessage.getPayload();String sn = CommonUtils.subString(topic, "/rtk/", "/cmd");if (ToolUtil.isEmpty(sn)) {return;}JSONObject contentJson = JSONObject.parseObject(content);if (!redisUtils.setNx(RtkRedisKeyCons.RTK_REGISTER_REPEAT + sn, 5)) {log.error("5s内不允许重复注册处理:{}", sn);contentJson.put("code", -1);RtkCommonMqttUtil.sendRtk(contentJson.toJSONString(), sn);return;}if (!contentJson.containsKey("data")) {contentJson.put("code", -1);RtkCommonMqttUtil.sendRtk(contentJson.toJSONString(), sn);return;}long currentTimeMillis2 = System.currentTimeMillis();FenceDeviceEntity fenceDeviceEntity = fenceDeviceService.saveAndUpdate(sn, contentJson);SupplierAccountDetail supplierAccountDetail = supplierAccountService.selectSupplierAccount(fenceDeviceEntity.getFenceId(), fenceDeviceEntity.getIsNative(), sn);if (supplierAccountDetail == null) {log.error("注册未获取到账号:{}", sn);contentJson.put("code", -1);RtkCommonMqttUtil.sendRtk(contentJson.toJSONString(), sn);return;}long currentTimeMillis3 = System.currentTimeMillis();rtkService.connectAndSend(fenceDeviceEntity.getFenceId(), supplierAccountDetail.getDomain(), supplierAccountDetail.getPort(), supplierAccountDetail.getMountPoint(),supplierAccountDetail.getUserName(), supplierAccountDetail.getPassword(), "");long currentTimeMillis4 = System.currentTimeMillis();log.debug("json处理,耗时:{}", currentTimeMillis2 - currentTimeMillis1);log.debug("注册和选账号,耗时:{}", currentTimeMillis3 - currentTimeMillis2);log.debug("调用dubbo,耗时:{}", currentTimeMillis4 - currentTimeMillis3);//执行成功后告知rtkcontentJson.put("code", 0);RtkCommonMqttUtil.sendRtk(contentJson.toJSONString(), sn);} catch (Exception e) {log.error("处理rtk注册消息异常 {}", e.getMessage(), e);e.printStackTrace();}});
}

}

http://www.sczhlp.com/news/1463/

相关文章:

  • IACheck助力智能家电安全检测报告的精准性
  • 认知无线电合作感知,合作下的检测和虚警概率关系以及最有门限选择
  • 谷歌插件沉浸式翻译翻译本地文件
  • 结构化数据自动生成文本技术解析
  • [Record] Ynoi2018Ynoi2019 大分块系列
  • AI 赋能的故障排除:技术趋势与实践
  • vim E575: viminfo: Illegal starting char in line 的解决方案
  • 剑指offer-17、树的⼦结构
  • 2025年:是时候重新认识System.Text.Json了
  • 阿萨QSDFG - kkksc03
  • HTML学习地址 - kkksc03
  • 我天,IntelliJ IDEA 要免费使用了?
  • Java入门:解释型和解释型
  • 【数据库基石】聚簇索引 vs 非聚簇索引:结构图解、性能差异与最佳实践
  • VMware ESXi 8.0U3g macOS Unlocker OEM BIOS 2.7 集成网卡驱动和 NVMe 驱动 (集成驱动版)
  • 搭建imx6ull环境--tftp加载镜像,nfs挂载根文件系统
  • VMware ESXi 8.0U3g macOS Unlocker OEM BIOS 2.7 标准版和厂商定制版
  • 大概是……北京游记?
  • 探索Docker容器化技术
  • 加密货币硬件钱包安全使用的10条黄金法则
  • Splunk Enterprise 10.0.0 (macOS, Linux, Windows) - 搜索、分析和可视化,数据全面洞察平台
  • 数据库查询通信开销降低97%的新方法
  • 开源智能体框架
  • 2025 WAIC世界人工智能大会 - 汽车智能/自动驾驶分会场大佬们都分享了些什么?
  • 面板级封装(PLP)2025年技术、市场和供应链全览
  • 砺算科技GPU背后的故事
  • Qt/C++开发监控GB28181系统/录像回放/切换播放进度立即跳转/支持8倍速播放/倍速和跳转进度无缝切换
  • 失业潮下,究竟谁在不停拿offer?(转发猎头文章)
  • 读用数据说服:如何设计、呈现和捍卫你的数据09读后总结与感想兼导读
  • webapi第二天