当前位置: 首页 > news >正文

南充网站建设略奥网络阳江市问政平台留言

南充网站建设略奥网络,阳江市问政平台留言,济南网站设计公司推荐,图片生成器免费一、 背景 具体的中间件私有化背景在上文 SaaS 电商设计 (二) 私有化部署-缓存中间件适配 已有做相关介绍.这里具体讨论的场景是通过解析mysql binlog 来实现mysql到其他数据源的同步.具体比如:在电商的解决方案业务流中经常有 ES 的使用场景,用以解决一些复杂的查询和搜索商品…一、 背景 具体的中间件私有化背景在上文 SaaS 电商设计 (二) 私有化部署-缓存中间件适配 已有做相关介绍.这里具体讨论的场景是通过解析mysql binlog 来实现mysql到其他数据源的同步.具体比如:在电商的解决方案业务流中经常有 ES 的使用场景,用以解决一些复杂的查询和搜索商品的支持以及某些数据分析的场景.那就需要做到 mysql 数据库到 ES 的数据同步.在支持 mysql 到 ES 数据同步的过程中,常用的技术方案有这样几种. 二、 设计主体 2.1 N种方案 方案1: 业务代码成功应答后操作目标数据源写入(本文用ES举例) 如上第一种方案在业务代码操作数据库, 异步执行 ES 数据同步写入.如:完成商品后写入数据,异步线程开启执行写入 ES 索引录入. 方案2:业务代码成功应答后,发送MQ,利用MQ来保证 ES 写入的最终一致 在第一种的方案中写入 ES 步骤中可能出现ES 写入失败case. 在方案一基础上为了保证可靠性引入 MQ ,保证在ES操作时出现异常抖动能够通过重试来保证数据的最终一致性.在业务代码中实际操作数据库后发送 MQ ,这边消费 MQ 执行 ES 数据同步.如:完成商品写入数据,发送消息 MQ , MQ consumer 消费写入 ES 索引录入. 方案 3.通过binlog 来实现数据库监听,保证数据同步脱离业务代码控制 在大部分的场景下方案二完全能够满足业务诉求. 这样的一个方案在具体实施过程中存在两个点. 业务开发的同时需要同步关心数据的同步 在某种意义上来说,数据的同步并不是业务代码需要去关心的.业务代码永远关心的只是自身的逻辑实现,关注的是产品迭代过程中如何保证业务模型的可持续演进和领域资产沉淀.基于这个原则我的理解是需要把数据的同步从业务代码里进行剥离的. 散落在各个业务代码角落的维护成本 方案二的场景在很长时间的迭代过程中很可能就将出现这样的情况.商品添加进行商品的 ES 数据更新,门店添加进行门店的 ES 数据更新.诸如此类,长期迭代将得到大量的脚本代码,随着开发人员的更替,不断的迭代和开发.最终可能变成一座岌岌可危的高楼,开发人员小心翼翼的在原来的代码上继续裹上自己这版的裹脚布.维护性和成本指数上升. 基于此我们尝试着借助 binlog 的这样一个工具来完善第二个方案适应更多索引更新,更加复杂的同步场景.首先 binlog 的形式能够通过仅监听数据库的 binlog 的消息来做到不同数据表数据更新的收口,我们可以在消费消息的入口来定义一个处理的接口,通过表名来进行不同表消费逻辑的实现.很简单就可以做到.一石二鸟做到数据处理的收口以及逻辑代码关于数据同步逻辑的抽离. 方案4:完美终极方案(抽离技术细节的实现,做到binlog解析的接口和数据同步的接口化.) 对于第三种方式来说的话,接下来引入了第二个讨论的点. 私有化支持 就是在去做一些 SaaS 场景的私有化时,咱们再去做数据同步的时候不得不依赖 binlog ,那对于 binlog 的解析常见的工具也比较多.常见的开源的 canal ,各大厂里也有相应的工具,东厂的 DRC (前身binlake),福包厂的精卫.基于此在项目中不得不在这些不同的实现之上完成抽象.这样我们就能够在既支持到内部项目的数据监听,也能够完成项目实施私有化的场景部署.同步目标逻辑的不同支持 在上文中我们提到的最多也就是关于 ES 数据的同步,那其实在实际的开发场景可能面临的更多,比如在数据库更新后的准实时缓存刷新,数据库写入商品成功后关于商品新建成功的三方消息同步.等等.同样我们在这个基础实现了一个接口,用来方便具体的使用方来进行具体消息处理.完美. 2.2 方案4 coding落地 2.2.1 类图 核心步骤: step1:抽象MessageListener 实现 BinlogListener 完成 binlog 中间件解析发送的 MQ msg 得到反序列化的表数据.内含本次选取的反序列化类型.如:是canal 还是 DRC . step2:抽象 BinlogClientAdapter 完成反序列化和处理msg接口定义.具体可以有 CanalBinlogAdapter,DrcBinlogClientAdapter实现. step3:抽象BinlogDataHandler 完成具体表具体操作**(insert,delete,update,query)** 接口定义.具体在接入方进行实现MultiCloundBinLogDataHandler,这样在进行注入时得到具体的实现类,进行具体的实现操作.如:CategoryBinlogDataHandler. 2.2.2 核心实现 BinlogHandlerAdapter 完成 binlog client 接口定义. package com.baixiu.middleware.binlog.adapter;import com.baixiu.middleware.binlog.model.BinlogData; import com.baixiu.middleware.mq.model.CommonMessage;/*** binlog 适配器接口* 适配中间件list:canal,jingwei,drc等。* function1:完成不同中间件解析能力* function2:完成不同中间件handlerMsg能力* author baixiu* date 2023年12月11日*/ public interface BinlogHandlerAdapter {/*** 反序列MQMsg To binlogMsg* param mqMsg mqMsg* return*/BinlogData deserializationMQMsg(CommonMessage mqMsg);/*** 反序列MQMsg To binlogMsg* param mqMsg mqMsg* return*/void handleBinLogData(BinlogData binLogData) throws Exception;} CanalBinlogHandlerAdapter 完成 canal 解析 package com.baixiu.middleware.binlog.adapter;import com.alibaba.fastjson.JSON; import com.alibaba.otter.canal.protocol.FlatMessage; import com.baixiu.middleware.binlog.consts.CommonConsts; import com.baixiu.middleware.binlog.core.AbstractBinlogHandler; import com.baixiu.middleware.binlog.core.BinlogTableHandlerRouter; import com.baixiu.middleware.binlog.enums.CommonRowTypeEnum; import com.baixiu.middleware.binlog.model.BinlogData; import com.baixiu.middleware.binlog.model.BinlogDataToDiffModel; import com.baixiu.middleware.binlog.model.BinlogTableRowDiffModel; import com.baixiu.middleware.mq.model.CommonMessage; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ArrayUtils; import org.springframework.beans.factory.annotation.Autowired; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;/*** canal binlog handler adapter* 当property配置的clientTypecanal时进行注入bean* canal client 用以解析 mq -starter 发送过来的消费消息* author baixiu* date 创建时间 2023/12/11 8:39 PM*/ Slf4j public class CanalBinlogHandlerAdapter implements BinlogHandlerAdapter{Autowiredprivate BinlogTableHandlerRouter binlogTableHandlerRouter;Overridepublic BinlogData deserializationMQMsg(CommonMessage mqMsg) {FlatMessage flatMessage JSON.parseObject(mqMsg.getText(),FlatMessage.class);BinlogData binLogDatanew BinlogData ();if(flatMessage!null){binLogData.setBinlogDataObject(flatMessage);}return binLogData;}Overridepublic void handleBinLogData(BinlogData binLogData) throws Exception {if(binLogDatanull || binLogData.getBinlogDataObject()null){return;}FlatMessage flatMessage (FlatMessage) binLogData.getBinlogDataObject ();ListMapString, String rowDatas flatMessage.getData();ListMapString, String oldDatas flatMessage.getOld();String tableName flatMessage.getTable();AbstractBinlogHandler handler binlogTableHandlerRouter.ALL_TABLE_HANDLERS.get(tableName);for (int i 0; i rowDatas.size(); i) {MapString, String rowData rowDatas.get(i);MapString, String oldData new HashMap(i,0.75f);if (oldDatas ! null oldDatas.size() rowDatas.size()) {oldData oldDatas.get(i);}MapString, String fieldsMaps Maps.newHashMapWithExpectedSize(20);BinlogDataToDiffModel binlogDataToDiffModel transRowDataToAllBinlogData(handler, rowData, oldData, fieldsMaps, flatMessage.getType());switch (binlogDataToDiffModel.getCommonRowTypeEnum()) {case INSERT:log.info(Canal.handleMessage.binlogTransConfigToMap.INSERT.{}, JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));handler.insert(binlogDataToDiffModel.getAllFieldMaps(),binlogDataToDiffModel.getBinlogTableRowDiffModels());break;case UPDATE:log.info(Canal.handleMessage.binlogTransConfigToMap.UPDATE.{},JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));handler.update(binlogDataToDiffModel.getAllFieldMaps(),binlogDataToDiffModel.getBinlogTableRowDiffModels());break;case DELETE:MapString, String delMap getBeforeColumnsFromBinlogData(handler, oldData);log.info(Canal.handleMessage.binlogTransConfigToMap.DELETE);handler.delete(delMap);break;default:log.info(CanalBinlogClientAdapter.handleMessage.binlogTransConfigToMap.default.{},JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));break;}}}public static BinlogDataToDiffModel transRowDataToAllBinlogData(AbstractBinlogHandler binlogData, MapString, String afterColumns, MapString, String beforeColumns, MapString, String fieldsMap, String type) {try {String[] updateFields binlogData.getUpdateFields();String[] keyFields binlogData.getFields();ListBinlogTableRowDiffModel changeList new ArrayList ();for (String key : afterColumns.keySet()) {if (keyFields.length 1 ArrayUtils.contains(keyFields, CommonConsts.BINLOG_ALL_FIELDS)) {fieldsMap.put(key, afterColumns.get(key));} else if (ArrayUtils.contains(keyFields, key)) {fieldsMap.put(key, afterColumns.get(key));}if (beforeColumns ! null !beforeColumns.isEmpty() beforeColumns.get(key) ! null) {BinlogTableRowDiffModel bean new BinlogTableRowDiffModel();bean.setField(key);bean.setAfter(afterColumns.get(key));bean.setBefore(beforeColumns.get(key));if (updateFields.length 1 ArrayUtils.contains(updateFields,CommonConsts.BINLOG_ALL_FIELDS)) {changeList.add(bean);} else if (ArrayUtils.contains(updateFields, key)) {changeList.add(bean);}}}BinlogDataToDiffModel data new BinlogDataToDiffModel(changeList, fieldsMap, CommonRowTypeEnum.transType(type));log.info(transRowDataToAllBinlogData.changeList:{}.fieldsMap{}.data{},JSON.toJSONString(changeList), JSON.toJSONString(fieldsMap), JSON.toJSONString(data));return data;} catch (Exception e) {log.error(handleMessage.transRowDataToAllBinlogData.handleMessage.error.{}, JSON.toJSONString(binlogData), e);}return null;}/*** 删除操作* 不同的表需要从binlogData中获取的信息不同这里抽取** return*/private MapString, String getBeforeColumnsFromBinlogData(AbstractBinlogHandler binlogData, MapString, String beforeColumns) {MapString, String keys new HashMap();if (beforeColumns ! null !beforeColumns.isEmpty()) {String[] keyFields binlogData.getFields();for (String key : beforeColumns.keySet()) {// 找出关心的字段值if (ArrayUtils.contains(keyFields, key)) {keys.put(key, beforeColumns.get(key));}}}return keys;} } AbstractBinlogHandler 抽象binloghandler 处理类. package com.baixiu.middleware.binlog.core;import com.baixiu.middleware.binlog.model.BinlogTableRowDiffModel; import java.util.List; import java.util.Map;/*** author baixiu* date 创建时间 2023/12/12 11:31 AM*/ public interface AbstractBinlogHandler {/*** 需要关心的字段。实现后将仅实现的字段值放置于 fieldValues 中* return 监控字段*/String[] getFields();/*** 需要关心的变更字段。实现后将仅实现的字段值放置于 changeList 中* return 更新字段*/String[] getUpdateFields();/*** 新增时触发* param fieldValues 唯一字段用于确定一条数据* param changeList 字段的值发生变化的* throws Exception 业务exception*/void insert(MapString, String fieldValues, ListBinlogTableRowDiffModel changeList) throws Exception;/*** 数据修改时触发* param fieldValues 实现了getFields接口里得到的字段里的字段以及字段的值* param changeList 字段的值发生变化的* throws Exception 业务exception*/void update(MapString, String fieldValues, ListBinlogTableRowDiffModel changeList) throws Exception;/*** 删除时触发* param fieldValues 唯一字段用于确定一条数据* throws Exception 业务exception*/void delete(MapString, String fieldValues) throws Exception;}
http://www.sczhlp.com/news/195884/

相关文章:

  • 做网站一般收取多少钱wordpress编辑角色无法上传图片
  • 制作网站的公司2017优秀网站设计
  • 网站服务器租用怎么购买免费软件大全app下载
  • 邯郸做网络推广的公司南京seo网站管理
  • Mac怎么搭建网站开发环境哈尔滨企业网站建站推荐
  • 莆田网站自助建站个人网站备注模板
  • 网页设计与网站建设ppt仿珠宝首饰网站开发
  • 住房和城乡建设部网站规范答疑设计制作我们的小船
  • 企业网站备案网址泰安seo
  • html免费网站模板下载网站开发专业定制
  • 互动力 网站建设做一个平台网站大概多少钱
  • 内网穿透进阶:让 frpc 只代理「真正在线」的端口
  • 规则逻辑与人文逻辑的统一:AI元人文构想的演进之路
  • 2023 ICPC Jinan
  • 今日小雨
  • 好的网站怎么设计师jsp网站开发详解 pdf
  • 万网建站流程分析某个网站建设
  • 可做市值曲线的网站googlechrome浏览器
  • 建设网站需要什么资料网站建设用阿里还是华为云
  • 企业建设网站优势软文自动发布软件
  • 哪有做网站推广郑州汉狮公司做网站
  • 兰州网站建设设计848给我做一下88网站
  • 网站建设要多少钱app阿坝网站制作
  • 漯河网站建设电话泰国做彩票网站
  • 岳阳网站设计改版惠州市建设厅网站
  • 电影网站制作教程好不好网站模板 协会
  • 公司建网站搭建服务器企业网站多少钱一个
  • 低代码建站重庆网站建设价格费用
  • 网站内容优化方法有哪些南宁7天优化网络科技公司
  • 汶上网站开发福建祥盛建设有限公司网站