admin管理员组文章数量:1635850
canal在github上的文档:https://github/alibaba/canal/wiki/ClientExample
mysql
canal伪装成mysql slave,如此“降低”姿态,mysql当然要做出些表示:那便是开启binlog支持
[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
为显示重要性 创建用户并赋予相关权限:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
针对已有的账户可直接grant命令
问题一:
创建canal用户的目的是什么?直接使用现有的用户名可以吗,比如root。
答:有些用户没有REPLICATION SLAVE, REPLICATION CLIENT的权限,用这些用户连接canal时,无法获取到binlog。
这里的canal用户授权了全部权限,所以客户端可以从canal中获取binlog。
重要概念:canal server连接mysql,客户端连接canal server
- canal指的是canal server,它会读取mysql的binlog,解析后存储起来
- 客户端指的是消费canal server的binlog
本机连接服务端,验证binlog的格式是ROW
mysql -h192.168.6.52 -ucanal -pcanal
mysql> show variables like '%binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
mysql主从复制的原理
- master将改变记录到二进制日志binary log中
- slave将binary log events拷贝到其中继日志relay log
- slave重做中继日志中的事件,加工处理 (将改变反映它自己的数据)
mysql数据文件下会生成mysql-bin.xxx的binlog文件,以及索引文件;针对mysql的操作都会有二进制的事件记录到binlog文件中。下面的一些操作包括创建用户,授权,创建数据库,创建表,插入一条记录
canal
canal server的conf下有几个配置文件
canal.deployer-1.0.24 tree conf
conf
├── canal.properties
├── example
│ └── instance.properties
├── logback.xml
└── spring
├── default-instance.xml
├── file-instance.xml
├── group-instance.xml
├── local-instance.xml
└── memory-instance.xml
先来看canal.properties
的common属性前四个配置项
canal.id= 1 #编号,集群环境下不同canal的id不同,和mysql的server_id不同
canal.ip= #默认本机
canal.port= 11111
canal.zkServers= #用于cluster
再上盘canal.properties
下destinations相关的配置
canal.destinations = example #可设置*个,逗号隔开,对应需要创建*文件夹,且夹下有instance.properties文件
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
#全局的canal实例
canal.instance.global.mode = spring #全局的canal实例管理用spring
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/file-instance.xml#会实例化所有的destinations instances
file-instance.xml
<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
<property name="ignoreResourceNotFound" value="true" />
<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
<property name="locationNames">
<list>
<value>classpath:canal.properties</value>
<!-- 比如canal.instance.destination等于example,就会加载example/instance.properties配置文件 -->
<value>classpath:${canal.instance.destination:}/instance.properties</value>
</list>
</property>
</bean>
<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
<property name="destination" value="${canal.instance.destination}" />
<property name="eventParser"><ref local="eventParser" /></property>
<property name="eventSink"><ref local="eventSink" /></property>
<property name="eventStore"><ref local="eventStore" /></property>
<property name="metaManager"><ref local="metaManager" /></property>
<property name="alarmHandler"><ref local="alarmHandler" /></property>
</bean>
example下instance.properties配置文件不需要修改,当然你要改也没人拦你,注意正则。一个canal server可以运行多个canal instance
## mysql serverId,这里的slaveId不能和myql集群中已有的server_id一样
canal.instance.mysql.slaveId = 1234
# position info 这里连接的是mysql master的地址。
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex = .*\\..*
canal.instance.filter.black.regex =
canal client 与 server
C/S模式的通信,客户端采用NIO,服务端采用Netty;server启动后无client,server不会从mysql拉取binllog:client主动发起请求,服务端才会模拟mysql slave节点取主节点拉取binlog,通常client是死循环,一直调用get 服务器一直拉取
流程图:
代码:
public class AbstractCanalClientTest {
protected void process() {
int batchSize = 5 * 1024; // 一次请求拉取多条记录
try {
connector.connect(); // 先连接服务端,发送的数据包类型handshake,clientauthentication
connector.subscribe(); // 订阅,类型为subscription
// keep send request to canal server, thus canal server can fetch binlog from mysql
while (running) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
connector.ack(batchId); // 提交确认
//connector.rollback(batchId); // 处理失败, 回滚数据
}
} finally {
connector.disconnect();
}
}
}
对应服务端采用netty处理rpc请求:canalserverwithnetty:
public class CanalServerWithNetty extends AbstractCanalLifeCycle implements CanalServer {
public void start() {
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipelines = Channels.pipeline();
pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
// 处理客户端的HANDSHAKE请求
pipelines.addLast(HandshakeInitializationHandler.class.getName(),
new HandshakeInitializationHandler(childGroups));
// 处理客户端的CLIENTAUTHENTICATION请求
pipelines.addLast(ClientAuthenticationHandler.class.getName(),
new ClientAuthenticationHandler(embeddedServer));
//ClientAuthenticationHandler处理鉴权后,会移除HandshakeInitializationHandler和ClientAuthenticationHandler
// 处理客户端的会话请求,包括SUBSCRIPTION,GET等
SessionHandler sessionHandler = new SessionHandler(embeddedServer);
pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
return pipelines;
}
});
}
}
重要的会话处理器sessionHandler
client与server的rpc交互过程:以client发送get,server从mysql得到binlog后,返回messages给client为例
synchronized是基于JVM层面实现的,而Lock是基于JDK层面实现的
public Message getWithoutAck(int batchSize,Long timeout,TimeUnit unit) throws CanalClientException{
waitClientRunning();
int size = (batchSize <=0 )?1000:batchSize;
long time = (timeout==null || timeout <0)?-1:timeout;//-1不做timeout控制
if(unit ==null )unit =TimeUnit.MILLISECONDS;
//client发送get请求
writeWithHeader(Packet.newBuilder()
.setType(PacketType.GET)
.setBody(Get.newBuilder()
.setAutoAck(false)
.setDestination(clientIdentity.getDestination())
.setClientId(String.valueOf(clientIdentity.getClientId()))
.setFetchSize(size)
.setTimeout(time)
.setUnit(unit.ordinal())
.build()
.toByteString())
.build()
.toByteArray());//client获取get结果
return receiveMessages();
}
private Message receiveMessages() throws IOException{
//读取server发送的数据包
Packet p = Packet.parseFrom(readNextPacket());
switch(p.getType()){
case MESSAGES: {
Messages messages = Messages.parseFrom(p.getBody());
Message result = new Message(messagess.getBatchId());
for(ByteString byteString:messages.getMessagesList()){
result.addEntry(Entry.parseFrom(byteString));
}
return result;
}
}
}
服务端SessionHandler处理客户端发送的GET请求流程:
case GET:
// 读取客户端发送的数据包,封装为Get对象
Get get = CanalPacket.Get.parseFrom(packet.getBody());
// destination表示canal instance
if (StringUtils.isNotEmpty(get.getDestination()) && StringUtils.isNotEmpty(get.getClientId())) {
clientIdentity = new ClientIdentity(get.getDestination(), Short.valueOf(get.getClientId()));
Message message = null;
if (get.getTimeout() == -1) {// 是否是初始值
message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
} else {
TimeUnit unit = convertTimeUnit(get.getUnit());
message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize(), get.getTimeout(), unit);
}
// 设置返回给客户端的数据包类型为MESSAGES
Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
packetBuilder.setType(PacketType.MESSAGES);
// 构造Message
Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
messageBuilder.setBatchId(message.getId());
if (message.getId() != -1 && !CollectionUtils.isEmpty(message.getEntries())) {
for (Entry entry : message.getEntries()) {
messageBuilder.addMessages(entry.toByteString());
}
}
packetBuilder.setBody(messageBuilder.build().toByteString());
// 输出数据,返回给客户端
NettyUtils.write(ctx.getChannel(), packetBuilder.build().toByteArray(), null);
}
get/ack/rollback协议
Message getWithoutAck(int batchSize)容许指定batchSize,一次可获取多条,每次返回对象为message
包含 batch id 唯一标识;entries 具体的数据对象及对应的数据对象格式entryprotocol.proto
对应的canal消息结构:
Entry
Header
logfileName
logfileOffset
executeTime
schemaName
tableName
entryType(insert/update/delete类型)
entryType(事务头begin/事务尾end/数据rowdata)
storeValue(byte数据,可展开,对应的类型为rowchange)
RowChange
isDdl是否是ddl变更操作:create/drop table
sql具体ddl sql
rowDatas具体insert update delete的变更数据,可为多条,1个binlog event可对应多条变更:批量
beforeColumns:column类型的数组,变更前的数据字段
afterColumns:变更后的
Colum
index:
sqlType:jdbc type
name:column name
isKey:
update:是否发生过变更
isNull
value:具体内容,string文本
void rollback(long batchId)回滚上次的get请求,重新获取数据,基于get获取batch进行提交 避免误操作
void ack(long batchId)确认已经消费成功,通知server删除数据,基于get获取batchId进行提交,避误操作
SessionHandler中服务端处理客户端的其他类型请求,都会调用CanalServerWithEmbedded的相关方法:
case SUBSCRIPTION:
Sub sub = Sub.parseFrom(packet.getBody());
embeddedServer.subscribe(clientIdentity);
case GET:
Get get = CanalPacket.Get.parseFrom(packet.getBody());
message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
case CLIENTACK:
ClientAck ack = CanalPacket.ClientAck.parseFrom(packet.getBody());
embeddedServer.ack(clientIdentity, ack.getBatchId());
case CLIENTROLLBACK:
ClientRollback rollback = CanalPacket.ClientRollback.parseFrom(packet.getBody());
embeddedServer.rollback(clientIdentity);// 回滚所有批次
所以真正的处理逻辑在CanalServerWithEmbedded中,转下篇,太多了
谢谢https://blog.csdn/varyall/article/details/79208574的分享,真的很详细
版权声明:本文标题:canal详解 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dongtai/1729217816a1190625.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论