admin管理员组文章数量:1598075
前文:
采集Mysql的binlog同步到Kafka,下游可采用Spark解析对应的Binlog日志发送到不同的数据库。
一、配置
1、注意实项
Maxwell目前不适用于Mysql的XA技术。
Maxwell适用非常简单,只需要一个高权限的用户。
2、Mysql配置
设置binlog格式及解析的数据字段
binlog_row_image = FULL (满足更新记录发送所有数据)
binlog_format = ROW
3、Maxwell配置
vim /opt/maxwell-1.24.0/config.properties
#日志
log_level=info
#Kafka配置
producer=kafka
kafka.bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:9092
kafkapression.type=snappy
kafka.retries=0
kafka.acks=1
producer_partition_by=primary_key
#发送到Kafka对应的主题,可指定也可按如下匹配
kafka_topic=maxwell_%{table}
#Mysql数据库配置
host=hadoop101
port=3306
user=root
password=123456
#client_id为记录在Maxwell的客户端id,用于协调检测
#replica_server_id为从Mysql拉取binlog日志的id标志
client_id=1921681101
replica_server_id=1921681101
#匹配Mysql的数据库及对应的表
filter = exclude: *.*,include: test.student
4、Mysql开启Binlog
mysql> GRANT ALL on *.* to 'maxwell'@'192.168.1.101' identified by '123456' ;
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'192.168.1.101';
mysql> flush privaleges;
mysql> flush logs;
#重启Mysql
systemctl restart mysqld.service
5、启动Maxwell
cd /opt/maxwell-1.24.0
nohup ./bin/maxwell &
6、自动监控脚本
source /etc/profile
#监听对应maxwell的路径
maxwell_path="/opt/maxwell-1.24.0"
rusult=`pgrep -af $(basename $BASH_SOURCE) | wc -l` #监听此脚本是否启动,编辑此脚本时也被识别为启动
#crontab的运行环境与我们手动执行的环境不同
#未启动:启动脚本+判断脚本=2
#运行中:启动脚本+判断脚本+已存在进程=3 ,编辑此脚本则 3+1 =4
if [ "$rusult" = "2" ];then
cd $maxwell_path && nohup ./bin/maxwell &
curl http://127.0.0.1:10085/sender/mail -d "tos=cesarchoy@163&subject=Maxwell 出现故障!!&content=$(basename $BASH_SOURCE) 出现异常,触发重启机制。"
echo " `date "+%Y-%m-%d %H:%M:%S"` $maxwell_path 启动成功!" >> /home/caijiasheng/listen_maxwell_log
else
echo `date "+%Y-%m-%d %H:%M:%S"` $(basename $BASH_SOURCE) rusult=$rusult "运行中" >> /home/caijiasheng/listen_maxwell_log
fi
7、定时启动
crontab -e
*/30 * * * * sh /home/cesar/listen_maxwell-1.24.0.sh > /dev/null 2>&1 &
8、代码解析(以下为Scala方式)
样例类
case class GetMaxWelllog(
@SerializedName("type")
`type`: String,
@SerializedName("database")
database: String,
@SerializedName("table")
table: String,
@SerializedName("old")
old: JsonObject,
@SerializedName("data")
data: JsonObject
)
工具类
package com.google.gson;
public final class Gson {
//核心对象
public Gson() {}
//核心方法
public <T> T fromJson(String json, Type typeOfT) throws JsonSyntaxException {}
解析过程
val gson = new Gson
val jsonObj = gson.fromJson[GetMaxWelllog](Maxwelljson.toString, classOf[GetMaxWelllog])
版权声明:本文标题:Maxwell的配置及简单使用 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dianzi/1728298641a1152740.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论