Canal download page: https://github.com/alibaba/canal
The JDK must be installed first:
1. Download the JDK: http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
2. Create a directory: mkdir /usr/local/java
3. Extract the archive: tar -zxvf jdk-8u181-linux-x64.tar.gz -C /usr/local/java/
4. Edit the environment variables: vim /etc/profile
Add the following:
JAVA_HOME=/usr/local/java/jdk1.8.0_181
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH
5. Reboot the machine, or run: source /etc/profile
Download the canal tarball from the releases page and pick a version (note: for the server you want the deployer package):
https://github.com/alibaba/canal/releases
-rw-r--r-- 1 root root 241409486 Dec 8 14:42 canal.adapter-1.1.6-SNAPSHOT.tar.gz
-rw-r--r-- 1 root root 38379450 Dec 8 14:43 canal.admin-1.1.6-SNAPSHOT.tar.gz
-rw-r--r-- 1 root root 95191928 Dec 8 14:42 canal.deployer-1.1.6-SNAPSHOT.tar.gz
-rw-r--r-- 1 root root 23133667 Dec 8 14:43 canal.example-1.1.6-SNAPSHOT.tar.gz
First, connect to the Linux host and switch to the root user.
Canal requires MySQL's binlog to be enabled with binlog-format set to ROW, so edit MySQL's my.cnf:
vi /etc/my.cnf
Add the following to the config:
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not clash with canal's slaveId
binlog-ignore-db=information_schema
binlog-ignore-db=mysql
binlog-ignore-db=performance_schema
binlog-ignore-db=sys
# verify
show variables like 'log_bin%'; #log_bin ON
show variables like 'binlog_format%'; #binlog_format ROW
log-bin sets the binlog file name prefix; the files are stored under /var/lib/mysql by default.
server-id uniquely identifies this database server and must not collide with any other server; a common convention is the last octet of its IP address. Do not leave it at the default.
binlog-ignore-db: databases to skip when logging changes.
binlog-do-db: databases to log (if omitted, all databases are logged).
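For example, instead of listing every system schema to ignore, you can log only the business schema you intend to sync (a hedged fragment; datacenter is the schema used later in this guide, substitute your own):

```ini
# my.cnf fragment (assumption: only one schema needs to be captured)
log-bin=mysql-bin
binlog-format=ROW
server_id=1
binlog-do-db=datacenter
```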
After saving the config, restart the MySQL service:
service mysql restart
After the restart, open a mysql shell, create a canal account, and grant it the privileges it needs to act as a MySQL slave:
create user 'canal'@'%';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
alter user 'canal'@'%' identified by 'canal';
select host,user,authentication_string from mysql.user;
create user 'root'@'%' identified by 'root';
GRANT ALL ON *.* TO 'root'@'%';
FLUSH PRIVILEGES;
select host,user,authentication_string from mysql.user;
-- On MySQL before 5.7.6 only (PASSWORD() and the Password column were removed later):
UPDATE mysql.user SET Password=PASSWORD('canal') where USER='canal';
select host,user,password from mysql.user;
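To confirm the account works, check its grants. Also note (an assumption worth checking against your versions): on MySQL 8.0, canal releases that bundle a 5.x Connector/J cannot negotiate the default caching_sha2_password plugin, so the account may need mysql_native_password:

```sql
-- verify the replication privileges took effect
SHOW GRANTS FOR 'canal'@'%';

-- MySQL 8.0 only (assumption: the client uses a bundled 5.x Connector/J)
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
FLUSH PRIVILEGES;
```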
With MySQL configured, go to /usr/local and create a canal directory there:
cd /usr/local
ls
mkdir canal
Grant permissions on the directory:
# make canal readable and writable
chmod 777 canal
# the same, recursively for everything under it
chmod -R 777 canal
Once the permissions are set, copy the downloaded tarball into the canal directory (e.g. with wget or scp).
Go to /usr/local/canal, confirm the tarball is there, and extract it:
cd /usr/local/canal
ls
mkdir canalservice
cd canalservice
tar -zxvf canal.deployer-1.1.6-SNAPSHOT.tar.gz
Extraction produces several directories (bin, conf, lib, logs).
Next, edit conf/canal.properties to match the MySQL account created earlier; the MySQL-related settings are the main thing, everything else can stay at its defaults. If you created the canal account as above, nothing needs to change here; if you want to connect as root instead, adjust the credentials accordingly. Since my canal account's password is canal, this file is left unchanged.
Edit the config file:
cd conf/
vi canal.properties
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
##################################################
######### Pulsar #############
##################################################
pulsarmq.serverUrl =
pulsarmq.roleToken =
pulsarmq.topicTenantPrefix =
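A note on canal.serverMode above: tcp means clients (such as the adapter configured later) pull changes over TCP on port 11111. To publish to an MQ instead, only the mode plus the matching MQ section above need to change. A hedged fragment, assuming a local Kafka broker:

```ini
# canal.properties fragment (assumption: Kafka at 127.0.0.1:9092)
canal.serverMode = kafka
kafka.bootstrap.servers = 127.0.0.1:9092
# each instance then publishes to the topic set in its instance.properties:
# canal.mq.topic=example
```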
Then go to /usr/local/canal/canalservice/conf/example and configure instance.properties:
vi instance.properties — the only change here is the table filter, canal.instance.filter.regex:
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
#canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=datacenter.uc_apiadv_download_log,datacenter.uc_op_ad_api_day,datacenter.uc_op_ad_api_day.*,datacenter.uc_op_business_reports,datacenter.uc_op_empty_payment_business,datacenter.uc_op_payment,datacenter.user
#canal.instance.filter.regex=datacenter.uc_op_business_reports,datacenter.uc_op_empty_payment_business,datacenter.uc_op_payment,datacenter.user
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
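canal.instance.filter.regex is a comma-separated list of regular expressions matched against the full schema.table name, which is why `.*\\..*` means "everything" and entries like datacenter.user select single tables. A small sketch of that matching rule (table_matches is a hypothetical helper for illustration, not part of canal):

```python
import re

def table_matches(filter_regex: str, schema: str, table: str) -> bool:
    """Return True if schema.table is matched by any comma-separated pattern."""
    full_name = f"{schema}.{table}"
    return any(re.fullmatch(p, full_name) for p in filter_regex.split(","))

# patterns mirror the style of the filter configured above
FILTER = r"datacenter\.uc_op_payment,datacenter\.user"
print(table_matches(FILTER, "datacenter", "user"))  # True
print(table_matches(FILTER, "mysql", "user"))       # False
```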
cd into the bin directory and start canal:
./startup.sh
Once started, check that it came up:
ps -ef | grep canal
or watch the canal log:
tail -n 50 /usr/local/canal/logs/canal/canal.log
If there are no errors, the startup was successful; canal is now running.
If you need to inspect canal's logs later, cd into the logs directory.
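A quick way to confirm the server is actually accepting connections is a plain TCP probe of its listening port (11111 by default; a generic sketch, not a canal API):

```python
import socket

def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Try a plain TCP connect; True means something is listening."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

print(port_open("127.0.0.1", 11111))  # True once canal is up
```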
The canal server listens on port 11111 by default; to change it, edit canal.properties under the conf directory.
For clients to reach the canal server, port 11111 must also be opened in the Linux firewall:
# open the port
firewall-cmd --zone=public --add-port=11111/tcp --permanent
# check whether port 11111 is open:
firewall-cmd --query-port=11111/tcp
# reload the firewall:
firewall-cmd --reload
# list the open ports:
firewall-cmd --list-port
# close the port again if needed
firewall-cmd --zone=public --remove-port=11111/tcp --permanent
Installing canal-admin
mkdir canaladmin
cd canaladmin
tar -zxvf canal.admin-1.1.6-SNAPSHOT.tar.gz
Set the environment variables:
export CANAL_ADMIN_HOME=/usr/local/canal/canaladmin
export PATH=${CANAL_ADMIN_HOME}/bin:$PATH
export CANAL_SERVER_HOME=/usr/local/canal/canalservice
export PATH=${CANAL_SERVER_HOME}/bin:$PATH
Edit application.yml at /usr/local/canal/canaladmin/conf/application.yml as follows.
The main change is the database account, which must first be created in MySQL:
[root@slave01 conf]# pwd
/opt/operation/canal/canaladmin/conf
[root@slave01 conf]# vim application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin
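Note that adminPasswd here is plaintext, while the canal.admin.passwd value seen earlier in canal.properties (4ACFE320…) is a MySQL-style double-SHA-1 hash of the same password, "admin". A sketch of producing such a hash for your own password (my understanding of the scheme; verify against your canal version):

```python
import hashlib

def mysql_password_hash(plain: str) -> str:
    """MySQL PASSWORD()-style hash: upper-case hex of SHA1(SHA1(plain))."""
    return hashlib.sha1(hashlib.sha1(plain.encode()).digest()).hexdigest().upper()

print(mysql_password_hash("admin"))  # 4ACFE3202A5FF5CF467898FC58AAB1D615029441
```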
Add the MySQL driver jar:
ln -s /usr/share/java/mysql-connector-java-5.1.48.jar /usr/local/canal/canaladmin/lib/mysql-connector-java-5.1.48.jar
Initialize the metadata schema:
mysql -uroot -p
mysql> source /usr/local/canal/canaladmin/conf/canal_manager.sql
Start canal-admin:
/usr/local/canal/canaladmin/bin/startup.sh
Logs are written under the logs directory.
Open the web UI at http://<server-ip>:8089 (the port set in application.yml above).
Installing the canal server (canalservice) under canal-admin
Extract the deployer package:
mkdir -p /usr/local/canal/canalservice
tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal/canalservice
Configure the canal server:
cd /usr/local/canal/canalservice/conf/;
mv canal.properties canal.properties.bak;
mv canal_local.properties canal.properties;
vi /usr/local/canal/canalservice/conf/canal.properties;
## address of the admin console
canal.admin.manager = slave01:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
Add the MySQL driver jar:
ln -s /usr/share/java/mysql-connector-java-8.0.18.jar /usr/local/canal/canalservice/lib/mysql-connector-java-8.0.18.jar
Start, stop, or restart canal-service:
/usr/local/canal/canalservice/bin/startup.sh
/usr/local/canal/canalservice/bin/stop.sh
/usr/local/canal/canalservice/bin/restart.sh
Configure the instance from the web UI; its instance.properties looks like this:
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=192:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal%123
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
Configuring the canal adapter
Extract it under /usr/local/canal/adapter:
mkdir -p /usr/local/canal/adapter
tar -zxvf canal.adapter-1.1.5.tar.gz -C /usr/local/canal/adapter/
Then cd conf/ and edit application.yml:
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.88.192:3306/datacenter?useUnicode=true
      username: root
      password: root
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://192.168.88.243:3306/datacenter?useUnicode=true
          jdbc.username: root
          jdbc.password: root
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
#      - name: es
#        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
#        properties:
#          mode: transport # or rest
#          # security.auth: test:123456 # only used for rest mode
#          cluster.name: elasticsearch
#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
Configure the corresponding mapping file:
[root@slave01 conf]# pwd
/opt/operation/canal/canaladapter/conf
[root@slave01 conf]# ll
total 12
-rwxrwxrwx 1 root root 2905 Mar 12 19:10 application.yml
-rwxrwxrwx 1 root root  183 Mar 12 17:54 bootstrap.yml
drwxr-xr-x 2 root root   86 Mar 12 15:23 es6
drwxr-xr-x 2 root root   86 Mar 12 15:23 es7
drwxr-xr-x 2 root root   40 Mar 12 15:23 hbase
drwxr-xr-x 2 root root   31 Mar 12 15:23 kudu
-rwxrwxrwx 1 root root 2172 Mar 12 18:53 logback.xml
drwxrwxrwx 2 root root   30 Mar 12 15:23 META-INF
drwxrwxrwx 2 root root   29 Mar 15 16:31 rdb
[root@slave01 conf]# cd rdb/
[root@slave01 rdb]# ll
total 4
-rwxr-xr-x 1 root root 513 Mar 12 19:17 mytest_user.yml
[root@slave01 rdb]#
*********************************************************************************
[root@slave01 rdb]# vi mytest_user.yml
dataSourceKey: defaultDS
destination: slave01
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: test
  table: test02
  targetTable: mytest2.test
  targetPk:
    id: id
#  mapAll: true
  targetColumns:
    id:
    name:
  etlCondition: "where c_time>={}"
  commitBatch: 3 # batch commit size

## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
#  mirrorDb: true
#  database: mytest
Logs:
2021-03-15 16:47:56.525 [pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.rdb.support.BatchExecutor - Batch executor commit 1 rows
2021-03-15 16:48:08.611 [pool-7-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":1,"name":"flink"}],"database":"test","destination":"slave01","es":1615798088000,"groupId":"g1","isDdl":false,"old":null,"pkNames":[],"sql":"","table":"test02","ts":1615798088610,"type":"INSERT"}
2021-03-15 16:48:08.625 [pool-2-thread-1] TRACE c.a.o.canal.client.adapter.rdb.service.RdbSyncService - Insert into target table, sql: INSERT INTO mytest2.test (`id`,`name`) VALUES (?,?)
2021-03-15 16:48:08.627 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":1,"name":"flink"},"database":"test","destination":"slave01","old":null,"table":"test02","type":"INSERT"}
2021-03-15 16:48:08.630 [pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.rdb.support.BatchExecutor - Batch executor commit 1 rows
2021-03-15 16:48:09.136 [pool-7-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":1,"name":"flink","create_time":null,"age":null}],"database":"db_test","destination":"slave01","es":1615798088000,"groupId":"g1","isDdl":false,"old":null,"pkNames":[],"sql":"","table":"test02","ts":1615798089136,"type":"INSERT"}
2021-03-15 16:48:57.948 [pool-7-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":null,"database":"test","destination":"slave01","es":1615798137000,"groupId":"g1","isDdl":false,"old":null,"pkNames":[],"sql":"DROP TABLE `test0002` /* generated by server */","table":"test0002","ts":1615798137947,"type":"ERASE"}
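The DML lines the logger adapter prints above are canal "flat messages", plain JSON. A small sketch of consuming one (field names taken from the log output above):

```python
import json

# a flat message like the INSERT logged above
raw = ('{"data":[{"id":1,"name":"flink"}],"database":"test",'
       '"isDdl":false,"table":"test02","type":"INSERT"}')

msg = json.loads(raw)
if not msg["isDdl"] and msg["type"] == "INSERT":
    for row in msg["data"]:
        print(f'INSERT {msg["database"]}.{msg["table"]}: {row}')
# prints: INSERT test.test02: {'id': 1, 'name': 'flink'}
```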
Start the adapter:
/usr/local/canal/canaladapter/bin/startup.sh
Then check that data is being synchronized to the target database.