南宁网站制作哪家好,wordpress 移动模板,制作网页的网站,开发一个购物平台需要多少钱定义 
canal组件是一个基于mysql数据库增量日志解析#xff0c;提供增量数据订阅和消费#xff0c;支持将增量数据投递到下游消费者#xff08;kafka#xff0c;rocketmq等#xff09;或者存储#xff08;elasticearch,hbase等#xff09;canal感知到mysql数据变动…定义 
canal组件是一个基于mysql数据库增量日志解析提供增量数据订阅和消费支持将增量数据投递到下游消费者kafkarocketmq等或者存储elasticearch,hbase等canal感知到mysql数据变动然后解析变动数据将变动数据发送到mq或者同步到其他数据库等待进一步业务逻辑处理 
canal的工作原理 
mysql master将数据变更写入到二进制日志binary long简称binlogmysql slave将master的bin log拷贝到它的中继日志relay logmysql slave重放relay log操作将变更数据同步到最新 mysql binlog日志 
介绍 
mysql的binlog可以说mysql的最重要的日志它记录了所有的DDL(表的创建)和DML(数据发生变化)语句以事件形式记录mysql默认情况下是不开启binlog因为记录binlog日志需要消耗时间官方给出的数据是有1%的性能损耗 一般来说在下面两场场景下会开启binlog日志 mysql主从集群部署时需要将在master端开启binlog方便将数据同步到slaves中数据恢复了通过使用mysql binlog工具来使数据恢复   
binlog的分类 
分类介绍优点缺点STATEMENT语句级别记录每一次执行写操作的语句相当于row模式节省了空间但是可能产生的数据不一致有疑执行时间不同导致产生的数据不同节省空间可能造成数据不一致ROW行级记录每次操作后每行记录的变化假如一个update的sql执行结果是1万行statement只存一条如果是row的话会把这1万行的结果存着数据绝对一致性因为不管sql是什么引用了什么函数他只记录执行后的效果占用较大空间MIXED是对STATEMENT的升级如当函数中包含UUID()时包含AUTO_INCREMENT字段的表被更新时执行INSERT DELAYED语句时用UDF时会按照ROW的方式进行处理节省空间同时兼顾了一定的一致性还有些极个别情况依旧会造成不一致另外STATEMENT和mixed对于需要对binlog的监控的情况都不方便 
canal工作原理 
canal将自己伪装为mysql slave,向mysql master发送bump协议mysql master 收到dump请求开始推送binary log给slavecanalcanal接收并解析binlog日志得到变更的数据执行后续逻辑 canal应用场景 
数据同步 
canal可以帮助用户进行多种数据同步操作如实时同步mysql数据到elasticsearchredis等数据存储介质中   
数据库实时监控 
canal可以实时监控mysql的更新操作对于敏感数据的修改可以及时通知相关人员 
数据分析和挖掘 
canal可以将mysql增量数据投递到kafka等消息队列中为数据分析和挖掘提供数据来源 
数据库备份 
canal可以将mysql主库上的数据增量日志复制到备库上实现数据库的备份 
数据集成 
canal可以将多个mysql数据库中的数据进行集成为数据处理提供更加高效可靠的解决方案 
数据库迁移 
canal可以协助完成mysql数据库的辨别升级及数据迁移任务 
canal中mysql配置 
vim /etc/mysql/my.cnf
[mysqld]
# 设置主服务器唯一ID
server-id1
# 启用二进制日志
log-binmysql-bin
# 设置不要复制的数据库(可以设置多个)
binlog-ignore-dbmysql
# 设置需要监听的数据库,不配置表示所有数据库均开启
#binlog-do-dbcanal-demo
# 设置logbin格式
binlog_formatrow
bind-address  0.0.0.0canal下载配置文件 
官网下载
tar -zvxf canal.deployer-1.1.7.tar.gz
cd canal
cd conf
#################################################
#########               common argument         #############
#################################################
# tcp bind ip
canal.ip 
# register ip to zookeeper
canal.register.ip  192.168.7.129    # canal的ip
canal.port  11111
canal.metrics.pull.port  11112
# canal instance user/passwd
# canal.user  canal
# canal.passwd  E3619321C1A937C46A0D8BD1DAC39F93B27D4458# canal admin config
#canal.admin.manager  127.0.0.1:8089
canal.admin.port  11110
canal.admin.user  admin
canal.admin.passwd  4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto  true
#canal.admin.register.cluster 
#canal.admin.register.name canal.zkServers 192.168.5.6:2181   # kafka的端口的和ip
# flush data to zk
canal.zookeeper.flush.period  1000
canal.withoutNetty  false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode  kafka     # 选择同步方式为kafka
# flush meta cursor/parse position to file
canal.file.data.dir  ${canal.conf.dir}
canal.file.flush.period  1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size  16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit  1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode  MEMSIZE
canal.instance.memory.rawEntry  true## detecing config
canal.instance.detecting.enable  false
#canal.instance.detecting.sql  insert into retl.xdual values(1,now()) on duplicate key update xnow()
canal.instance.detecting.sql  select 1
canal.instance.detecting.interval.time  3
canal.instance.detecting.retry.threshold  3
canal.instance.detecting.heartbeatHaEnable  false# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size   1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds  60# network config
canal.instance.network.receiveBufferSize  16384
canal.instance.network.sendBufferSize  16384
canal.instance.network.soTimeout  30# binlog filter config
canal.instance.filter.druid.ddl  true
canal.instance.filter.query.dcl  false
canal.instance.filter.query.dml  false
canal.instance.filter.query.ddl  false
canal.instance.filter.table.error  false
canal.instance.filter.rows  false
canal.instance.filter.transaction.entry  false
canal.instance.filter.dml.insert  false
canal.instance.filter.dml.update  false
canal.instance.filter.dml.delete  false# binlog format/image check
canal.instance.binlog.format  ROW,STATEMENT,MIXED
canal.instance.binlog.image  FULL,MINIMAL,NOBLOB# binlog ddl isolation
canal.instance.get.ddl.isolation  false# parallel parser config
canal.instance.parser.parallel  true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize  16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize  256# table meta tsdb info
canal.instance.tsdb.enable  true
canal.instance.tsdb.dir  ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url  jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE1000;MODEMYSQL;
canal.instance.tsdb.dbUsername  canal
canal.instance.tsdb.dbPassword  canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval  24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire  360#################################################
#########               destinations            #############
#################################################
canal.destinations  example,script   # 需要进行同步的任务在conf/example文件下和script下
# conf root dir
canal.conf.dir  ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan  true
canal.auto.scan.interval  5
# set this value to true means that when binlog pos not found, skip to latest.
# WARN: pls keep false in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode  falsecanal.instance.tsdb.spring.xml  classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml  classpath:spring/tsdb/mysql-tsdb.xmlcanal.instance.global.mode  spring
canal.instance.global.lazy  false
canal.instance.global.manager.address  ${canal.admin.manager}
#canal.instance.global.spring.xml  classpath:spring/memory-instance.xml
canal.instance.global.spring.xml  classpath:spring/file-instance.xml
#canal.instance.global.spring.xml  classpath:spring/default-instance.xml##################################################
#########             MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey 
canal.aliyun.secretKey 
canal.aliyun.uidcanal.mq.flatMessage  true
canal.mq.canalBatchSize  50
canal.mq.canalGetTimeout  100
# Set this value to cloud, if you want open message trace feature in aliyun.
canal.mq.accessChannel  localcanal.mq.database.hash  true
canal.mq.send.thread.size  30
canal.mq.build.thread.size  8##################################################
#########                    Kafka                   #############
##################################################
kafka.bootstrap.servers  192.168.5.6:9092   # kafka的IP地址和端口
kafka.acks  all
kafka.compression.type  none
kafka.batch.size  16384
kafka.linger.ms  1
kafka.max.request.size  1048576
kafka.buffer.memory  33554432
kafka.max.in.flight.requests.per.connection  1
kafka.retries  0kafka.kerberos.enable  false
kafka.kerberos.krb5.file  ../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file  ../conf/kerberos/jaas.conf# sasl demo
# kafka.sasl.jaas.config  org.apache.kafka.common.security.scram.ScramLoginModule required \\n username\alice\ \\npasswordalice-secret\;
# kafka.sasl.mechanism  SCRAM-SHA-512
# kafka.security.protocol  SASL_PLAINTEXT##################################################
#########                   RocketMQ         #############
##################################################
rocketmq.producer.group  test
rocketmq.enable.message.trace  false
rocketmq.customized.trace.topic 
rocketmq.namespace 
rocketmq.namesrv.addr  127.0.0.1:9876
rocketmq.retry.times.when.send.failed  0
rocketmq.vip.channel.enabled  false
rocketmq.tag ##################################################
#########                   RabbitMQ         #############
##################################################
rabbitmq.host 
rabbitmq.virtual.host 
rabbitmq.exchange 
rabbitmq.username 
rabbitmq.password 
rabbitmq.deliveryMode ##################################################
#########                     Pulsar         #############
##################################################
pulsarmq.serverUrl 
pulsarmq.roleToken 
pulsarmq.topicTenantPrefix  
配置任务 
vim canal/conf/example
vim instance.properties
#################################################
## mysql serverId , v1.0.26 will autoGen
# canal.instance.mysql.slaveId0# enable gtid use true/false
canal.instance.gtidonfalse# position info
canal.instance.master.address192.168.7.129:3306  # 数据库的ip和端口
canal.instance.master.journal.name
canal.instance.master.position
canal.instance.master.timestamp
canal.instance.master.gtid# rds oss binlog
canal.instance.rds.accesskey
canal.instance.rds.secretkey
canal.instance.rds.instanceId# table meta tsdb info
canal.instance.tsdb.enabletrue
#canal.instance.tsdb.urljdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsernamecanal
#canal.instance.tsdb.dbPasswordcanal#canal.instance.standby.address 
#canal.instance.standby.journal.name 
#canal.instance.standby.position 
#canal.instance.standby.timestamp 
#canal.instance.standby.gtid# username/password
canal.instance.dbUsernamecanal              # 数据库的用户名
canal.instance.dbPassworddocker211102       # 数据库的密码
canal.instance.connectionCharset  UTF-8
# enable druid Decrypt database password
canal.instance.enableDruidfalse
#canal.instance.pwdPublicKeyMFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ# table regex
canal.instance.filter.regexscript\\..*    #监控script这个数据库下面的所有的表
# table black regex
canal.instance.filter.black.regexmysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.fieldtest1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.fieldtest1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config
canal.mq.topicexample             # 将数据发送到example这个主题里面
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopicmytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition0
# hash partition config
#canal.mq.enableDynamicQueuePartitionfalse
#canal.mq.partitionsNum3
#canal.mq.dynamicTopicPartitionNumtest.*:4,mycanal:6
#canal.mq.partitionHashtest.table:id^name,.*\\..*
#
# multi stream for polardbx
canal.instance.multi.stream.onfalse
################################################# 
vim canal/conf/script
vim instance.properties
#################################################
## mysql serverId , v1.0.26 will autoGen
# canal.instance.mysql.slaveId0# enable gtid use true/false
canal.instance.gtidonfalse# position info
canal.instance.master.address192.168.7.129:3306   # 数据库的ip和端口
canal.instance.master.journal.name
canal.instance.master.position
canal.instance.master.timestamp
canal.instance.master.gtid# rds oss binlog
canal.instance.rds.accesskey
canal.instance.rds.secretkey
canal.instance.rds.instanceId# table meta tsdb info
canal.instance.tsdb.enabletrue
#canal.instance.tsdb.urljdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsernamecanal
#canal.instance.tsdb.dbPasswordcanal#canal.instance.standby.address 
#canal.instance.standby.journal.name 
#canal.instance.standby.position 
#canal.instance.standby.timestamp 
#canal.instance.standby.gtid# username/password
canal.instance.dbUsernamecanal                # 数据库的用户名
canal.instance.dbPassworddocker211102         # 数据库的密码
canal.instance.connectionCharset  UTF-8
# enable druid Decrypt database password
canal.instance.enableDruidfalse
#canal.instance.pwdPublicKeyMFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ# table regex
canal.instance.filter.regexcanal-demo\\..*   # 监控cancl-demo这个库下面的所有表
# table black regex
canal.instance.filter.black.regexmysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.fieldtest1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.fieldtest1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config
canal.mq.topicscript          # 将数据发送到script这个主题里面
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopicmytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition0
# hash partition config
#canal.mq.enableDynamicQueuePartitionfalse
#canal.mq.partitionsNum3
#canal.mq.dynamicTopicPartitionNumtest.*:4,mycanal:6
#canal.mq.partitionHashtest.table:id^name,.*\\..*
#
# multi stream for polardbx
canal.instance.multi.stream.onfalse
#################################################