当前位置: 首页 > news >正文

如何在搜索中找到自己做的网站小蚁人网站建设

如何在搜索中找到自己做的网站,小蚁人网站建设,创网科技,网页制作三剑客是什么上篇文章canal 消费进度说到直接使用ClusterCanalConnector并发消费是有问题的,可以先用单点将canal事件发送到mq中,再由mq并发处理,另外mq还可以做到削峰的作用,让canal数据不至于阻塞。 使用队列,可以自己起一个单实…

上篇文章canal 消费进度说到直接使用ClusterCanalConnector并发消费是有问题的,可以先用单点将canal事件发送到mq中,再由mq并发处理,另外mq还可以做到削峰的作用,让canal数据不至于阻塞。

使用队列,可以自己起一个单实例服务使用ClusterCanalConnector将消息丢队列里,也可以直接使用canal server, canal server原生支持几种队列:Kafka, RocketMQ ,RabbitMQ, PulsarMQ, 下面了解一下canal sever具体的处理过程。

canal server将消息投递到mq中

在canal server中,如果检测到配置了mq, 就会启动线程来读取bin log事件,并投递到mq中:
CanalMQStarter

while (running && destinationRunning.get()) {Message message;if (getTimeout != null && getTimeout > 0) {message = canalServer.getWithoutAck(clientIdentity,getBatchSize,getTimeout.longValue(),TimeUnit.MILLISECONDS);} else {message = canalServer.getWithoutAck(clientIdentity, getBatchSize);}final long batchId = message.getId();int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();if (batchId != -1 && size != 0) {canalMQProducer.send(canalDestination, message, new Callback() {@Overridepublic void commit() {canalServer.ack(clientIdentity, batchId); // 提交确认}@Overridepublic void rollback() {canalServer.rollback(clientIdentity, batchId);}}); // 发送message到topic} else {try {Thread.sleep(100);} catch (InterruptedException e) {// ignore}}}

从代码可以看到,首先调用getWithoutAck从实例获取事件,然后调用canalMQProducer.send将消息投递到队列中,如果投递成功就执行ack,否则执行rollback, 因为投递消息到队列是非常快的操作,所以这就降低了阻塞的风险。

最终发送mq消息的代码如下(CanalRocketMQProducer):

    private void sendMessage(Message message, int partition) {//...SendResult sendResult = this.defaultMQProducer.send(message, (mqs, msg, arg) -> {if (partition >= mqs.size()) {return mqs.get(partition % mqs.size());} else {return mqs.get(partition);}}, null);//...}

这里有个分区的概念,对于RocketMQ来说就是队列选择,这关系到顺序消费。

业务代码使用RocketMQCanalConnector消费数据

    while (running) {try {connector.connect();connector.subscribe();while (running) {List<Message> messages = connector.getListWithoutAck(1000L, TimeUnit.MILLISECONDS); // 获取messagefor (Message message : messages) {long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {// try {// Thread.sleep(1000);// } catch (InterruptedException e) {// }} else {printSummary(message, batchId, size);printEntry(message.getEntries());// logger.info(message.toString());}}connector.ack(); // 提交确认}} catch (Exception e) {logger.error(e.getMessage(), e);}}connector.unsubscribe();// connector.stopRunning();
}

可以看到这和之前ClusterCanalConnector一样的处理方法,只是底层实现不一样,在subscribe的时候,调用了mq的subscribe:

    public synchronized void subscribe(String filter) throws CanalClientException {//...rocketMQConsumer.subscribe(this.topic, "*");rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {context.setAutoCommit(true);boolean isSuccess = process(messageExts);if (isSuccess) {return ConsumeOrderlyStatus.SUCCESS;} else {return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}});rocketMQConsumer.start();//...}

可以看到这里使用了MessageListenerOrderly来进行顺序消费, 使用process来处理消息

private boolean process(List<MessageExt> messageExts) {//...for (MessageExt messageExt : messageExts) {//...if (!flatMessage) {Message message = CanalMessageDeserializer.deserializer(data);messageList.add(message);} else {FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);messageList.add(flatMessage);}ConsumerBatchMessage batchMessage;if (!flatMessage) {batchMessage = new ConsumerBatchMessage<Message>(messageList);} else {batchMessage = new ConsumerBatchMessage<FlatMessage>(messageList);}try {messageBlockingQueue.put(batchMessage);} catch (InterruptedException e) {logger.error("Put message to queue error", e);throw new RuntimeException(e);}boolean isCompleted;try {isCompleted = batchMessage.waitFinish(batchProcessTimeout);} catch (InterruptedException e) {logger.error("Interrupted when waiting messages to be finished.", e);throw new RuntimeException(e);}boolean isSuccess = batchMessage.isSuccess();return isCompleted && isSuccess;}

这里将数据放到了messageBlockingQueue中,然后等待消息执行完成, ConsumerBatchMessage内置了一个CountDownLatch, batchMessage.waitFinish会阻塞在这里。
客户端使用getFlatList/getFlatListWithoutAck取数据时,就是从messageBlockingQueue取出数据,调用ack时,会释放ConsumerBatchMessage中的CountDownLatch, 这样mq消费者就可以继续从队列中拿数据了。

    @Overridepublic List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {if (this.lastGetBatchMessage != null) {throw new CanalClientException("mq get/ack not support concurrent & async ack");}ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);//...}@Overridepublic void ack() throws CanalClientException {if (this.lastGetBatchMessage != null) {this.lastGetBatchMessage.ack();}//...}

对于MessageListenerOrderly来说,是一个消费线程对应一个mq队列的,从而实现多线程消费,而这里把不同mq队列的消息在messageBlockingQueue中排队,并且使用getListWithoutAck/ack也不支持并发,又变成了单线程模式,这可能对性能造成影响,建议生产环境对性能有要求时,采用自己写代码来实现mq的消费。

配置

mq相关参数说明

http://www.sczhlp.com/news/63842/

相关文章:

  • 企业公司网站模版如何申请自己的网站空间
  • 石家庄网站建设维护上海普陀网站建设
  • Python做网站难不难郑州seo优化外包
  • 做数据分析的网站外贸wordpress收款插件
  • 深圳最好的网站制作公司wix做的网站
  • 微商招商网站源码无货源电商软件
  • 泉州关键词网站排名wordpress 帮助手册
  • 网站建站案凡客达人的运作模式
  • 专门做特产的网站网站添加flash
  • 中国建设银行网站公积金查询wordpress 织梦 淘客
  • 制作网站的公司叫什么天津网站建设企业系统
  • 网站开发对招聘人员要求好的互联网资讯网站
  • 游戏网站网页设计厦门模版网站
  • 上海网站建设宣传做最好的在线看片网站
  • 南昌做房地产用哪个网站网站进入沙盒后
  • 目前流行的网站开发技术权威发布信息
  • 网站建设专属名词官方网站焊工证查询
  • wordpress 动漫网站录入客户信息的软件
  • 做图表用的网站彩票做的最好是个网站好
  • 优秀的网站首页北京西站24小时人工服务电话
  • 怎么在网站后面做链接网络编程
  • 做pc端的网站首页尺寸是多少苏州网站建设推广咨询平台
  • 建设手机网站多少钱网站自适应布局
  • 网站页面建设规划文案沈阳人流价格
  • 基于PHP网站开发的管理系统设计与开发秦皇岛网站制作源码
  • 建网站难吗品牌建设推荐
  • 如何自己建一个网站可以做试卷网站数学试卷小学六
  • 网站建设技能考试试题三媒体网站的销售怎么做
  • 做设计时可以参考的网站为外国人做非法网站
  • 网站无法被百度收录saas平台设计