Canal Installation and Kafka Configuration

Canal: Introduction and How It Works

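Canal is an open-source project from Alibaba used mainly for database synchronization. It parses the database's log to capture incremental changes and replicate them, which is the basis of Canal's incremental subscription & consumption model for real-time database sync.
How it works:
1. Canal implements the MySQL slave interaction protocol: it poses as a MySQL slave and sends a dump request to the MySQL master.
2. When the MySQL master receives the dump request, it starts pushing its binary log to the slave (that is, to Canal).
3. Canal parses the binary log objects, which arrive as a raw byte stream.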
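To make step 1 more concrete, here is a minimal Python sketch of the request a replica, and therefore Canal, sends to start streaming: a COM_BINLOG_DUMP packet laid out as in the MySQL client/server protocol documentation (command byte 0x12, 4-byte start position, 2-byte flags, 4-byte server id, then the binlog file name). It only builds the bytes; the connection handshake and authentication that must come first are left out, and the server id 1234 is a hypothetical slaveId, not a Canal default.

```python
import struct

COM_BINLOG_DUMP = 0x12  # command byte of a binlog dump request


def build_binlog_dump_packet(binlog_file: str, position: int, server_id: int,
                             flags: int = 0, sequence_id: int = 0) -> bytes:
    """Build the COM_BINLOG_DUMP packet a replica sends to start binlog streaming."""
    payload = (
        struct.pack("<B", COM_BINLOG_DUMP)  # 1-byte command
        + struct.pack("<I", position)       # 4-byte start offset in the binlog file
        + struct.pack("<H", flags)          # 2-byte flags
        + struct.pack("<I", server_id)      # 4-byte server id of the replica (Canal's slaveId)
        + binlog_file.encode("ascii")       # file name, runs to the end of the packet
    )
    # Every MySQL packet starts with a 3-byte little-endian payload length and a 1-byte sequence id
    header = struct.pack("<I", len(payload))[:3] + struct.pack("<B", sequence_id)
    return header + payload


# Hypothetical slaveId 1234; file name and offset match the instance settings later in this post
packet = build_binlog_dump_packet("mysql-bin.000003", 154, server_id=1234)
print(packet.hex())
```

After the master accepts this packet, it keeps sending binlog events on the same connection; decoding those events is step 3.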

Installing MySQL and Enabling the Binary Log

Installing MySQL 5.7 on Windows

1. Download mysql-5.7.38-winx64.zip and unzip it.
2. Create my.cnf in the root of the unzipped directory:

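[mysql]
# Default character set for the mysql client
default-character-set=utf8
[mysqld]
# Listen on port 3306
port = 3306
# MySQL installation directory
basedir=D:/softs/mysql/mysql-5.7.38-winx64
# Data directory
datadir=D:/softs/mysql/mysql-5.7.38-winx64/data
# Maximum number of connections
max_connections=200
# Server character set (otherwise the default is the 8-bit latin1)
character-set-server=utf8
# Default storage engine for new tables
default-storage-engine=INNODB

# Enable the binlog (files are named mysql-bin.000001, mysql-bin.000002, ...)
log-bin=mysql-bin
# Use the row-based (ROW) binlog format
binlog-format=ROW
# Required for MySQL replication; must differ from Canal's slaveId
server_id=1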
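Canal can only follow the binlog if the last three settings are right. The Python sketch below (standard library only; the function name check_binlog_settings and the slaveId value are hypothetical) reads a my.cnf text and reports what would stop Canal: binlog disabled, a format other than ROW, or a server_id that is missing, zero, or equal to Canal's slaveId. It treats `-` and `_` in option names as equivalent, as MySQL does.

```python
import configparser


def check_binlog_settings(cnf_text: str, canal_slave_id: int) -> list[str]:
    """Return human-readable problems that would keep Canal from reading the binlog."""
    # Only '=' separates keys from values, so Windows paths such as D:/... stay intact
    parser = configparser.ConfigParser(allow_no_value=True, delimiters=("=",))
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    # MySQL accepts both log-bin and log_bin; normalize dashes to underscores
    opts = {key.replace("-", "_"): value for key, value in parser.items("mysqld")}

    problems = []
    if "log_bin" not in opts:
        problems.append("binlog is disabled: set log-bin")
    if (opts.get("binlog_format") or "").upper() != "ROW":
        problems.append("binlog-format must be ROW")
    server_id = opts.get("server_id")
    if server_id is None or not server_id.isdigit():
        problems.append("server_id must be set to a number")
    elif int(server_id) == 0:
        problems.append("server_id must be non-zero for replicas to connect")
    elif int(server_id) == canal_slave_id:
        problems.append("server_id collides with Canal's slaveId")
    return problems


SAMPLE_CNF = """
[mysqld]
port = 3306
log-bin=mysql-bin
binlog-format=ROW
server_id=1
"""

print(check_binlog_settings(SAMPLE_CNF, canal_slave_id=1234))  # 1234 is a hypothetical slaveId
```

An empty list means the binlog side is ready; the server still has to be restarted for my.cnf changes to take effect.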

Installing MySQL 8.0 on CentOS 7

1. Download the RPM bundle

https://dev.mysql.com/downloads/mysql/
mysql-8.0.29-1.el7.x86_64.rpm-bundle.tar

2. Upload the bundle to the server, unpack it, and install the packages

tar -xvf mysql-8.0.29-1.el7.x86_64.rpm-bundle.tar
rpm -ivh mysql-community-common-8.0.29-1.el7.x86_64.rpm
rpm -ivh mysql-community-client-plugins-8.0.29-1.el7.x86_64.rpm
rpm -ivh mysql-community-libs-8.0.29-1.el7.x86_64.rpm
rpm -ivh mysql-community-client-8.0.29-1.el7.x86_64.rpm
rpm -ivh mysql-community-icu-data-files-8.0.29-1.el7.x86_64.rpm
rpm -ivh mysql-community-server-8.0.29-1.el7.x86_64.rpm
rpm -ivh mysql-community-libs-compat-8.0.29-1.el7.x86_64.rpm
rpm -ivh mysql-community-embedded-compat-8.0.29-1.el7.x86_64.rpm
rpm -ivh mysql-community-devel-8.0.29-1.el7.x86_64.rpm
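The order of the rpm commands matters because each package needs its dependencies installed first. As an illustration, the Python sketch below derives an install order from a dependency map with graphlib.TopologicalSorter (Python 3.9+). The map is an assumption reconstructed from the order used above, not read from the packages themselves; `rpm -qpR <file.rpm>` lists a package's real requirements.

```python
from graphlib import TopologicalSorter

# ASSUMED dependency map: each package lists the packages that must be installed before it.
DEPENDS_ON = {
    "mysql-community-common": [],
    "mysql-community-client-plugins": [],
    "mysql-community-libs": ["mysql-community-common", "mysql-community-client-plugins"],
    "mysql-community-client": ["mysql-community-libs", "mysql-community-client-plugins"],
    "mysql-community-icu-data-files": [],
    "mysql-community-server": ["mysql-community-common", "mysql-community-client",
                               "mysql-community-icu-data-files"],
    "mysql-community-libs-compat": ["mysql-community-libs"],
    "mysql-community-embedded-compat": ["mysql-community-common"],
    "mysql-community-devel": ["mysql-community-libs"],
}

VERSION = "8.0.29-1.el7.x86_64"


def install_commands(depends_on: dict[str, list[str]], version: str) -> list[str]:
    """Emit one rpm command per package, dependencies first."""
    order = TopologicalSorter(depends_on).static_order()
    return [f"rpm -ivh {name}-{version}.rpm" for name in order]


for cmd in install_commands(DEPENDS_ON, VERSION):
    print(cmd)
```

Any order the sorter produces keeps every package after the ones it depends on; where packages are independent of each other, their relative order is arbitrary.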

3. Start MySQL and create the users

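## Initialize the data directory (the temporary root password goes to /var/log/mysqld.log)
mysqld --initialize --user=mysql
## Make sure the mysql user owns the data directory
chown -R mysql:mysql /var/lib/mysql/
## Start MySQL
systemctl start mysqld
## Look up the temporary root password
grep 'temporary password' /var/log/mysqld.log
## Log in and change the root password
## (RPM installs enable validate_password by default, so a weak password such as 123456 is rejected; choose a stronger one)
mysql -u root -p
alter USER 'root'@'localhost' IDENTIFIED BY '123456';
use mysql;
## Create the canal user
create user 'canal'@'%' identified by '123456';
## Grant privileges (Canal's QuickStart grants only SELECT, REPLICATION SLAVE, REPLICATION CLIENT; canal-admin also needs access to the canal_manager database)
grant all privileges on *.* to 'canal'@'%' with grant option;
## Reload the privilege tables
flush privileges;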
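The '123456' passwords above only go through when no password policy is active. The MEDIUM policy, which the MySQL manual documents as the validate_password default, requires at least 8 characters, including at least one lowercase letter, one uppercase letter, one digit, and one special (non-alphanumeric) character. Below is a rough Python approximation of those checks for screening a password before running the SQL. It is not the server's implementation, and it ignores the STRONG policy's dictionary check.

```python
def meets_medium_policy(password: str, length: int = 8, mixed_case_count: int = 1,
                        number_count: int = 1, special_char_count: int = 1) -> bool:
    """Approximate validate_password's MEDIUM policy using its default thresholds."""
    lower = sum(ch.islower() for ch in password)
    upper = sum(ch.isupper() for ch in password)
    digits = sum(ch.isdigit() for ch in password)
    special = sum(not ch.isalnum() for ch in password)  # anything that is not a letter or digit
    return (
        len(password) >= length
        and lower >= mixed_case_count
        and upper >= mixed_case_count
        and digits >= number_count
        and special >= special_char_count
    )


for candidate in ["123456", "Canal@2022x"]:  # example candidates only
    print(candidate, meets_medium_policy(candidate))
```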

Setting Up Canal-admin

  • Download
    https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz
  • Edit conf/application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: 123456
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin
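The `url` entry is assembled from `address` and `database` through Spring's `${...}` placeholders. The Python sketch below mimics that substitution in simplified form (unlike Spring, it supports no default values and no nested placeholders) to show the JDBC URL canal-admin ends up using. It illustrates the idea and is not Spring's resolver; the function name resolve is hypothetical.

```python
import re

_PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")  # matches ${some.key}


def resolve(template: str, props: dict[str, str]) -> str:
    """Replace each ${key} with props[key]; fail loudly on unknown keys."""
    def lookup(match: re.Match) -> str:
        key = match.group(1)
        if key not in props:
            raise KeyError(f"unresolved placeholder: {key}")
        return props[key]
    return _PLACEHOLDER.sub(lookup, template)


props = {
    "spring.datasource.address": "127.0.0.1:3306",
    "spring.datasource.database": "canal_manager",
}
url = resolve(
    "jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}"
    "?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
    props,
)
print(url)
```

If canal-admin cannot connect at startup, comparing this resolved URL against the database you actually created is a quick first check.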

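  • Run the DDL in conf/canal_manager.sql to create the database and its tables
  • Start the admin console with startup.bat and log in as admin/123456 (the console's default login; adminUser/adminPasswd in application.yml is a separate credential that Canal Server uses to connect to the admin)
  • Create a cluster named canal_local with ZooKeeper addresses 127.0.0.1:2185,127.0.0.1:2186,127.0.0.1:2187
  • Download canal.deployer-1.1.5.tar.gz and unpack it
  • Edit conf/canal_local.properties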
# register ip: each server needs its own IP, so only one server can run per host
canal.register.ip = 127.0.0.1 
canal.port = 11110
canal.metrics.pull.port = 11112

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11113
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = canal_local
# Name this server registers under in canal admin; any unique, meaningful name works
canal.admin.register.name = canal_server_01

# Note on canal.admin.register.cluster: if it is omitted, this Canal Server runs as a standalone node;
# if the cluster named here has not been created in Canal Admin beforehand, Canal Server fails at startup.
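`canal.admin.passwd` does not hold the plain password. As far as the Canal admin documentation describes it, the value is the MySQL-style `mysql_native_password` hash (SHA1 of SHA1, upper-case hex, without the leading `*`) of the adminPasswd value `admin` from application.yml. If you change adminPasswd, a sketch like the following (Python standard library only; the function name is hypothetical) can regenerate the matching value.

```python
import hashlib


def mysql_native_password_hash(plain: str) -> str:
    """SHA1(SHA1(password)) as upper-case hex: MySQL's PASSWORD() value minus the leading '*'."""
    inner = hashlib.sha1(plain.encode("utf-8")).digest()  # first pass, raw bytes
    return hashlib.sha1(inner).hexdigest().upper()        # second pass, hex text


# For 'admin' this should reproduce the canal.admin.passwd value shown above
print(mysql_native_password_hash("admin"))
```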

  • Edit conf/canal.properties (mainly the Kafka settings)
canal.register.ip =127.0.0.1
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers = 127.0.0.1:2185,127.0.0.1:2186,127.0.0.1:2187
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### 		     Kafka 		     #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = ../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file = ../conf/kerberos/jaas.conf
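Most kafka.* keys above are standard Kafka producer settings (bootstrap.servers, acks, batch.size, linger.ms and so on) with a kafka. prefix added, while kafka.kerberos.* are Canal's own switches. A small Python sketch can list the kafka.* entries of a canal.properties text with the prefix stripped, which makes it easy to check what a given file contains. The parser is deliberately minimal (only `key = value` lines and `#` or `!` comments, with no line continuations or escapes) and is not how Canal itself loads the file; the function names are hypothetical.

```python
def parse_properties(text: str) -> dict[str, str]:
    """Tiny subset of the Java .properties format: 'key = value' lines, '#'/'!' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue  # skip blank lines and comment/banner lines
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def producer_settings(props: dict[str, str], prefix: str = "kafka.") -> dict[str, str]:
    """Collect prefixed entries with the prefix removed, e.g. kafka.acks -> acks."""
    return {k[len(prefix):]: v for k, v in props.items() if k.startswith(prefix)}


CANAL_PROPERTIES = """
canal.serverMode = kafka
##################################################
#########           Kafka           #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
kafka.acks = all
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
"""

settings = producer_settings(parse_properties(CANAL_PROPERTIES))
print(settings)
```

With acks = all, each write waits for every in-sync replica, and max.in.flight.requests.per.connection = 1 allows only one unacknowledged request per connection, so messages keep their binlog order; retries = 0 means a failed send is not retried by the producer.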

  • Start the server
    Just run startup.bat
  • Configure the instance
    After creating a new instance, load the template and set the table filter rules
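# Core settings in instance.properties (comments go on their own lines: .properties files have no trailing comments)
# Database host and port
canal.instance.master.address=127.0.0.1:3306
# Binlog file to start dumping from
canal.instance.master.journal.name=mysql-bin.000003
# Offset within that binlog file
canal.instance.master.position=154
# Start timestamp in ms; when no start position is set, Canal starts from the latest position at startup
canal.instance.master.timestamp=1557454483242
# Database user name
canal.instance.dbUsername=canal
# Database password
canal.instance.dbPassword=123456
# Character set used when parsing data
canal.instance.connectionCharset=UTF-8
# Kafka topic this instance publishes to (the consumer test below reads bym_test)
canal.mq.topic=bym_test
# Tables to parse
canal.instance.filter.regex=canal.test1

canal.instance.filter.regex lists the MySQL tables Canal parses, as Perl regular expressions. Separate multiple patterns with commas (,); escape characters need a double backslash (\\).
Common examples:
a. All tables: .* or .*\\..*
b. All tables in the canal schema: canal\\..*
c. Tables in the canal schema whose names start with canal: canal\\.canal.*
d. A single table in the canal schema: canal.test1
e. Several rules combined: canal\\..*,mysql.test1,mysql.test2 (comma-separated)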
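To try a filter before putting it into instance.properties, the Python sketch below approximates the matching: it splits the value on commas and requires each pattern to match the whole schema.table name. Python's re stands in for the Perl regex engine Canal uses, and whole-name, case-sensitive matching is an assumption. The double backslash is .properties escaping, so canal\\..* in the file is the regex canal\..*, written as the raw string r"canal\..*" in Python. The function name table_matches is hypothetical.

```python
import re


def table_matches(filter_regex: str, schema: str, table: str) -> bool:
    """Approximate Canal's table filter: any comma-separated pattern fully matching 'schema.table'."""
    name = f"{schema}.{table}"
    patterns = [p.strip() for p in filter_regex.split(",") if p.strip()]
    return any(re.fullmatch(p, name) for p in patterns)


combined = r"canal\..*,mysql.test1,mysql.test2"  # example e, after .properties unescaping
print(table_matches(combined, "canal", "orders"))
print(table_matches(combined, "mysql", "user"))
print(table_matches(r"canal\.canal.*", "canal", "canal_log"))
```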


  • Test with MySQL
    Find the name of the latest binlog file:
    show binary logs;
    Check the last position in that binlog:
    show binlog events in 'mysql-bin.000003';
    Insert a row:
    INSERT INTO canal.test1(username) VALUES ('123ed');
    Check the last position in the binlog again:
    show binlog events in 'mysql-bin.000003';
    The position has moved. Next, check that the change arrived in Kafka:
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bym_test --from-beginning
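Each line the console consumer prints is one Kafka message. Assuming canal.mq.flatMessage = true in canal.properties, Canal writes each message as a JSON "flat message". The Python sketch below extracts the event type, schema, table, and row images from one such record. The field names (type, database, table, data, ...) follow Canal's FlatMessage as far as we know. The sample record is hypothetical: its ids, timestamps, and column types are invented, and it assumes canal.test1 has an auto-increment id column.

```python
import json


def summarize_flat_message(raw: str) -> tuple[str, str, str, list[dict]]:
    """Return (type, database, table, rows) from one Canal flat-message JSON record."""
    msg = json.loads(raw)
    rows = msg.get("data") or []  # DDL messages carry no row data (null)
    return msg["type"], msg["database"], msg["table"], rows


# HYPOTHETICAL record for the INSERT above; values are invented for illustration.
SAMPLE_MESSAGE = '''{"data":[{"id":"1","username":"123ed"}],"database":"canal","es":1650000000000,
"id":1,"isDdl":false,"mysqlType":{"id":"int","username":"varchar(32)"},"old":null,
"pkNames":["id"],"sql":"","sqlType":{"id":4,"username":12},"table":"test1",
"ts":1650000000001,"type":"INSERT"}'''

print(summarize_flat_message(SAMPLE_MESSAGE))
```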