当前位置: 首页 > news >正文

武清网站建设做公司官网步骤

武清网站建设,做公司官网步骤,php如何自学做网站,新开传奇手游新服网#x1f34a; Java学习#xff1a;Java从入门到精通总结 #x1f34a; 深入浅出RocketMQ设计思想#xff1a;深入浅出RocketMQ设计思想 #x1f34a; 绝对不一样的职场干货#xff1a;大厂最佳实践经验指南 #x1f4c6; 最近更新#xff1a;2023年2月10日 #x… Java学习Java从入门到精通总结 深入浅出RocketMQ设计思想深入浅出RocketMQ设计思想 绝对不一样的职场干货大厂最佳实践经验指南 最近更新2023年2月10日 个人简介通信工程本硕 for NJU、Java程序员。做过科研paper发过专利优秀的程序员不应该只是CRUD 点赞 收藏 ⭐留言 都是我最大的动力 文章目录消息处理流程消息存储目录结构SendMessage源码processRequestsendMessage消息处理流程 SendMessageProcessor处理类接收到消息DefaultMessageStore实例将消息变成IndexFile、ConsumeQueue和CommitLog对象上述对象转成内存映射对象后进行落盘 消息存储目录结构 RocketMQ的文件存储在store文件夹里里面包含commitlogconfigconsumerqueueindex文件夹和abortcheckpoint两个文件。 文件夹 commitlog存储写入到commitLog的消息内容config存储配置信息consumerqueue存储消费者队列信息index存储消息队列的索引文件 文件 abort标记RocketMQ是否正常退出checkpoint存储commitlogconfigconsumerqueueindex文件的刷盘时间 ├── abort ├── checkpoint ├── commitlog │ ├── 00000000000000000000 │ ├── 00000000001073741824 ├── config │ ├── consumerFilter.json │ ├── consumerOffset.json │ ├── delayOffset.json │ ├── subscriptionGroup.json │ ├── topics.json ├── consumequeue │ ├── TopicA │ ├── TopicB │ ├── TopicC ├── index │ ├── 00000000000000000000 │ ├── 00000000001073741824RocketMQ内有专门对应磁盘上存储文件的封装类 CommitLog对应commitlog文件ConsumeQueue对应consumerqueue文件IndexFile对应index文件MappedFile直接内存映射业务的封装类通过操作该类实例可以把消息写入内存映射缓冲区或将消息刷盘MappedFileQueue连续物理存储的封装类可以通过offset快速定位消息所在的MappedFileMappedFileBuff堆外内存 SendMessage源码 SendMessageProcessor是接收消息的一个钩子函数该类的对象将会处理发送到Broker的消息 processRequest 主要流程已在代码片注释中给出 public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {private ListConsumeMessageHook consumeMessageHookList;public SendMessageProcessor(final BrokerController brokerController) {super(brokerController);}public CompletableFutureRemotingCommand asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 解析请求体SendMessageRequestHeader requestHeader parseRequestHeader(request);if (requestHeader null) {return CompletableFuture.completedFuture(null);}// 建立消息上下文mqtraceContext buildMsgContext(ctx, requestHeader);// 发送消息前的逻辑this.executeSendMessageHookBefore(ctx, request, mqtraceContext);if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}}/*** ......**/}根据源码可以看出首先解析发送消息的请求SendMessageRequestHeader然后调用asyncSend(Batch)Message方法进行消息的发送。 该类提供了发送或接收消息的钩子函数如果发送消息则调用sendMessage方法如果是接收消息则调用pullMessage拉取消息的方法。 sendMessage 消息发送给Broker服务器时调用的是sendMessage方法接收并存储消息主要流程已在代码片注释中给出 private CompletableFutureRemotingCommand asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 初始化响应final RemotingCommand response preSend(ctx, request, requestHeader);// 构建响应头final SendMessageResponseHeader responseHeader (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() ! -1) {return CompletableFuture.completedFuture(response);}final byte[] body request.getBody();int queueIdInt requestHeader.getQueueId();TopicConfig topicConfig this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt 0) {queueIdInt randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}// 设置消息体数据msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());MapString, String origProps MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() null ? 0 : requestHeader.getReconsumeTimes());// 获取Broker集群名称String clusterName this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);// 同步等待消息存储成功if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {String waitStoreMsgOKValue origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);} else {// 异步msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));}CompletableFuturePutMessageResult putMessageResult null;String transFlag origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (transFlag ! null Boolean.parseBoolean(transFlag)) {// Broker拒绝接收消息if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark(the broker[ this.brokerController.getBrokerConfig().getBrokerIP1() ] sending transaction message is forbidden);return CompletableFuture.completedFuture(response);}putMessageResult this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {putMessageResult this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}
http://www.sczhlp.com/news/189352/

相关文章:

  • 做网站选哪家360元网站建设
  • 微网站开发微网站建设域名注册成功怎么做网站
  • 开发型网站报价方法ueditor 插件 wordpress
  • 给人家做网站服务器自己搭吗网站查询站长工具
  • 现在外国有哪个网站可以做卖东西宁波做网站哪家公司好
  • 网站兼职做计划赚小钱宁波优化网站排名价格表
  • 使用 私有云 做视频网站wordpress运行死慢
  • .net做网站的吗ppt模板免费背景
  • 网站制作评价标准网站建设毕业设计怎么做
  • 做网站费用多少钱企业邮箱登录入口和注册申请
  • 深入解析:2025年真实手机牧场CC攻击破防游戏盾?四维防御体系全面升级!
  • 机器人技术在现实世界中的挑战与创新
  • Motorola和Inter的区别
  • ROS2之TF
  • 美食网站建设设计方案合肥网站建设报价
  • 苏州网推广网站建设网站建设建设意见
  • 最超值的郑州网站建设12306网站开发时间
  • 云智网站建设公司微信小游戏怎么开发
  • wordpress网站之间互联北京建设主管部门官方网站
  • 学校网站建设有限公司免费cad图纸下载网站
  • VIP视频自助网站建设网站平台方案设计
  • 网站建设客户需求表 文库空中乘务专业简历制作
  • 网站开发要学什么北京app软件开发
  • 全屏网站网址可以做代销的网站都有哪些
  • 国内做交互网站wordpress时间中文
  • 西部虚拟主机网站后台不能访问dw手机网站建设
  • 网站建设 信息化程度内部建设网站需要什么条件
  • 做封面图什么网站wordpress 浏览缓慢
  • 青岛个人接网站建设推荐邯郸网站建设
  • 厦门哪里有做网站微信怎么创建自己的小程序