1. Debezium introduction
Debezium is a distributed service for capturing database changes, so that your applications can see those changes and react to them. Debezium records the row-level changes of each table as a stream of change events. Applications can then read these change events in the same order in which they were produced. Currently supported source connectors include MySQL, MongoDB, PostgreSQL, Oracle, SQL Server, Db2, Cassandra, and Vitess.
2. MySQL environment setup (enable the binlog)
Use docker-compose to set up a test environment:
# Single MySQL 5.7 container used as the Debezium change-data source.
version: '3'
services:
  mysql:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/mysql:5.7
    container_name: mysql_3306
    restart: unless-stopped
    volumes:
      # NOTE(review): my.cnf is not shown in this article. For Debezium it is assumed to enable
      # the binlog (log-bin, a server-id) with binlog_format=ROW and binlog_row_image=FULL — confirm.
      - "./mysql/my.cnf:/etc/mysql/my.cnf"
      - "./mysql/init-file.sql:/etc/mysql/init-file.sql"
      - "./mysql/data:/var/lib/mysql"
      # - "./mysql/conf.d:/etc/mysql/conf.d"
      # NOTE(review): if ./mysql/log/mysql/error.log does not exist yet, Docker creates a
      # directory at that path instead of a file; create the empty file first.
      - "./mysql/log/mysql/error.log:/var/log/mysql/error.log"
      - "./mysql/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d"
    environment:
      TZ: Asia/Shanghai
      LANG: en_US.UTF-8
      MYSQL_ROOT_PASSWORD: root
      # Database created on first start; the Java config captures it via include-db.
      MYSQL_DATABASE: demo
    ports:
      - "3306:3306"
Start the container:
# Compose project names may only contain lowercase letters, digits, '-' and '_'.
# Compose v1 silently normalised "mysql5.7" to "mysql57"; Compose v2 rejects it, so use "mysql57" directly.
docker-compose -f docker-compose.yml -p mysql57 up -d
Create a table in the demo database:
-- Sample table whose row changes the Debezium listener captures (demo.user_info).
CREATE TABLE user_info
(
    user_id     VARCHAR(64)  NOT NULL PRIMARY KEY,
    username    VARCHAR(100) NULL COMMENT 'username',
    age         INT(3)       NULL COMMENT 'age',
    gender      TINYINT(1)   NULL COMMENT 'gender',
    remark      VARCHAR(255) NULL COMMENT 'remark',
    create_time DATETIME     NULL COMMENT 'create_time',
    create_id   VARCHAR(64)  NULL COMMENT 'create_id',
    update_time DATETIME     NULL COMMENT 'update_time',
    update_id   VARCHAR(64)  NULL COMMENT 'update_id',
    enabled     TINYINT(1)   DEFAULT 1 NULL COMMENT 'enabled(1-normal,0-del)'
)
    COMMENT 'dic_table';
Check whether the binlog is enabled:
-- log_bin must be ON.
show variables like 'log_%';
-- Debezium's MySQL connector also needs row-based binlogging with full row images:
-- expect binlog_format = ROW and binlog_row_image = FULL.
show variables like 'binlog_%';
3. Code project
Use Debezium to read the MySQL binlog and capture data changes.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Demo module that captures MySQL row changes with an embedded Debezium engine. -->
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>debezium</artifactId>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!-- Shared by every io.debezium artifact below; keep them on the same version. -->
        <debezium.version>1.5.2.Final</debezium.version>
    </properties>
    <!--
    NOTE(review): the Java sources also import Guava (com.google.common...ThreadFactoryBuilder)
    and Apache Commons Lang 3 (org.apache.commons.lang3.tuple.Pair), and Lombok's @Log4j2 needs
    log4j-api. None of these is declared here, so they are presumably resolved transitively;
    consider declaring them explicitly. Dependencies without a <version> are presumably
    version-managed by the parent POM.
    -->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- DebeziumEngine / RecordChangeEvent API. -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <!-- Embedded engine implementation (io.debezium.embedded.Connect format). -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <!-- MySqlConnector (binlog reader). -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-mysql</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- FileUtil / BeanUtil helpers. -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.4.3</version>
        </dependency>
        <!-- JSON serialisation of change events. 1.2.83 fixes the autoType bypass (CVE-2022-25845)
             present in 1.2.78; same 1.x API. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
    </dependencies>
</project>
application.yaml
timely:
  # NOTE(review): not referenced by ChangeEventConfig; presumably reserved for an on/off switch.
  switch: true
  # Debezium offset store (offset.storage.file.filename). Absolute Windows path: adjust locally.
  offset-file-name: D:\IdeaProjects\ETFramework\debezium\docker\offsets.dat
  # Delete the offset file at startup (default true).
  offset-file-clean: true
  # offset.flush.interval.ms: 1 ms flushes offsets very aggressively; raise it outside a demo.
  offset-time: 1
  # Schema-history file used by FileDatabaseHistory.
  history-file-name: D:\IdeaProjects\ETFramework\debezium\docker\dbhistory.dat
  offline:
    ip: 127.0.0.1
    port: 3306
    username: root
    password: root
    # Connector instance name ("name").
    instance-name: mysql-connector
    # database.server.name: logical name that prefixes the connector's schema names.
    logic-name: mysql-customer
    # NOTE(review): read but never passed to the connector, so every table of include-db is
    # captured. The value also does not match the demo schema (demo.user_info).
    include-table: dbo.vehicle
    # database.include.list
    include-db: demo
    # database.server.id: must be unique among all clients/replicas reading this MySQL binlog.
    server-id: 1
server:
  port: 8088
config
package com.et.debezium.config;
import cn.hutool.core.io.FileUtil;
import com.et.debezium.handler.ChangeEventHandler;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;
import java.util.concurrent.*;
/**
* @author lei
* @create 2021-06-22 15:36
* @desc sql server sync
**/
@Configuration
@Log4j2
public class ChangeEventConfig {
private final ChangeEventHandler changeEventHandler;
@Value("${timely.offset-file-name}")
private String offsetFileName;
@Value("${timely.offset-file-clean:true}")
private Boolean offsetFileDelete;
@Value("${timely.offset-time}")
private String offsetTime;
@Value("${timely.history-file-name}")
private String historyFileName;
@Value("${timely.offline.instance-name}")
private String instanceName;
@Value("${timely.offline.logic-name}")
private String logicName;
@Value("${timely.offline.ip}")
private String ip;
@Value("${timely.offline.port}")
private String port;
@Value("${timely.offline.username}")
private String username;
@Value("${timely.offline.password}")
private String password;
@Value("${timely.offline.include-table}")
private String includeTable;
@Value("${timely.offline.include-db}")
private String includeDb;
@Value("${timely.offline.server-id}")
private String serverId;
@Autowired
public ChangeEventConfig(ChangeEventHandler changeEventHandler) {
this.changeEventHandler = changeEventHandler;
}
@Bean
public void cleanFile() {
if (offsetFileDelete && FileUtil.exist(offsetFileName)) {
FileUtil.del(offsetFileName);
}
}
/**
* Debezium configuration
*
* @return configuration
*/
@Bean
io.debezium.config.Configuration debeziumConfig() {
return io.debezium.config.Configuration.create()
.with("connector.class", MySqlConnector.class.getName())
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", offsetFileName)
.with("offset.flush.interval.ms", offsetTime)
.with("name", instanceName)
.with("database.hostname", ip)
.with("database.port", port)
.with("database.user", username)
.with("database.password", password)
.with("database.include.list", includeDb)
.with("include.schema.changes", "false")
.with("database.server.id", serverId)
.with("database.server.name", logicName)
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename", historyFileName)
.build();
}
/**
*
*sqlServerTimelyExecutor
* @param configuration
* @return
*/
@Bean
SqlServerTimelyExecutor sqlServerTimelyExecutor(io.debezium.config.Configuration configuration) {
SqlServerTimelyExecutor sqlServerTimelyExecutor = new SqlServerTimelyExecutor();
DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine
.create(ChangeEventFormat.of(Connect.class))
.using(configuration.asProperties())
.notifying(changeEventHandler::handlePayload)
.build();
sqlServerTimelyExecutor.setDebeziumEngine(debeziumEngine);
return sqlServerTimelyExecutor;
}
/**
* @author lei
* @version 1.0
* @date 2021-06-22 15:39
* @desc SqlServerTimelyExecutor
*/
@Data
@Log4j2
public static class SqlServerTimelyExecutor implements InitializingBean, SmartLifecycle {
private final ExecutorService executor = ThreadPoolEnum.INSTANCE.getInstance();
private DebeziumEngine<?> debeziumEngine;
@Override
public void start() {
log.warn(ThreadPoolEnum.SQL_SERVER_LISTENER_POOL + "线程池开始执行 debeziumEngine 实时监听任务!");
executor.execute(debeziumEngine);
}
@SneakyThrows
@Override
public void stop() {
log.warn("debeziumEngine 监听实例关闭!");
debeziumEngine.close();
Thread.sleep(2000);
log.warn(ThreadPoolEnum.SQL_SERVER_LISTENER_POOL + "线程池关闭!");
executor.shutdown();
}
@Override
public boolean isRunning() {
return false;
}
@Override
public void afterPropertiesSet() {
Assert.notNull(debeziumEngine, "DebeZiumEngine 不能为空!");
}
public enum ThreadPoolEnum {
/**
* 实例
*/
INSTANCE;
public static final String SQL_SERVER_LISTENER_POOL = "sql-server-listener-pool";
/**
* 线程池单例
*/
private final ExecutorService es;
/**
* 枚举 (构造器默认为私有)
*/
ThreadPoolEnum() {
final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(SQL_SERVER_LISTENER_POOL + "-%d").build();
es = new ThreadPoolExecutor(8, 16, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(256),
threadFactory, new ThreadPoolExecutor.DiscardPolicy());
}
/**
* 公有方法
*
* @return ExecutorService
*/
public ExecutorService getInstance() {
return es;
}
}
}
}
handler
package com.et.debezium.handler;
import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSON;
import com.et.debezium.model.ChangeListenerModel;
import io.debezium.data.Envelope;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import lombok.Getter;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.debezium.data.Envelope.FieldName.*;
import static java.util.stream.Collectors.toMap;
/**
 * Turns Debezium change events into {@link ChangeListenerModel} objects and logs them as JSON.
 *
 * <p>Only create, update and delete row events are reported. Snapshot reads, value-less records
 * and records without source metadata are skipped. Every record of a batch is acknowledged to the
 * engine's committer.
 *
 * @author lei
 * @create 2021-06-22 16:11
 * @desc change-data handler
 **/
@Service
@Log4j2
public class ChangeEventHandler {
    /** Result key for the reported row image (the after image; the before image for deletes). */
    public static final String DATA = "data";
    /** Result key for the row image before the change. */
    public static final String BEFORE_DATA = "beforeData";
    /** Result key for the {@link EventTypeEnum} code. */
    public static final String EVENT_TYPE = "eventType";
    /** Envelope field holding the source-metadata struct. */
    public static final String SOURCE = "source";
    /** Source-metadata field holding the table name. */
    public static final String TABLE = "table";

    /** Source-metadata fields copied into the result; each constant's name is the field name. */
    private enum FilterJsonFieldEnum {
        table,
        db,
        ts_ms,
        ;

        /** Constant names, computed once instead of on every lookup. */
        private static final Set<String> NAMES =
                Stream.of(values()).map(Enum::name).collect(Collectors.toSet());

        public static Boolean filterJsonField(String fieldName) {
            return NAMES.contains(fieldName);
        }
    }

    /**
     * Change type codes published in {@code eventType}.
     *
     * @author lei
     * @create 2021-06-24 16:04
     **/
    public enum EventTypeEnum {
        CREATE(1),
        UPDATE(2),
        DELETE(3),
        ;
        @Getter
        private final int type;

        EventTypeEnum(int type) {
            this.type = type;
        }
    }

    /**
     * Handles one batch delivered by the embedded Debezium engine.
     *
     * <p>Each record is converted and logged, then marked as processed. This follows Debezium's
     * documented ChangeConsumer pattern: markProcessed per record, then markBatchFinished. If the
     * thread is interrupted while acknowledging, the interrupt flag is restored and the rest of the
     * batch is abandoned.
     *
     * @param recordChangeEvents events in this batch
     * @param recordCommitter    committer used to acknowledge processed records
     */
    public void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,
                              DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {
        try {
            for (RecordChangeEvent<SourceRecord> r : recordChangeEvents) {
                handleRecord(r.record());
                // Skipped records are acknowledged too, so their offsets still advance.
                recordCommitter.markProcessed(r);
            }
            recordCommitter.markBatchFinished();
        } catch (InterruptedException e) {
            // Restore the flag so the engine thread can observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while acknowledging Debezium change events", e);
        }
    }

    /** Converts a single record and logs it; records with nothing to report are skipped. */
    private void handleRecord(SourceRecord sourceRecord) {
        Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
        if (sourceRecordChangeValue == null) {
            // No value (e.g. a tombstone): nothing to report.
            return;
        }
        // Source table metadata (db, table, changeTime).
        Map<String, Object> changeMap = getChangeTableInfo(sourceRecordChangeValue);
        if (CollectionUtils.isEmpty(changeMap)) {
            return;
        }
        ChangeListenerModel changeListenerModel = getChangeDataInfo(sourceRecordChangeValue, changeMap);
        if (changeListenerModel == null) {
            return;
        }
        String jsonString = JSON.toJSONString(changeListenerModel);
        log.info("发送变更数据:{}", jsonString);
    }

    /**
     * Builds the model for a create/update/delete event.
     *
     * @return the model, or null for other operations (snapshot reads, unknown codes) and for
     *         updates without a table name or without row data
     */
    private ChangeListenerModel getChangeDataInfo(Struct sourceRecordChangeValue, Map<String, Object> changeMap) {
        // forCode returns null for codes this Debezium version does not know.
        Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
        if (operation == null) {
            return null;
        }
        Map<String, Object> result = new HashMap<>(8);
        switch (operation) {
            case CREATE: {
                result.put(EVENT_TYPE, EventTypeEnum.CREATE.getType());
                result.put(DATA, getChangeData(sourceRecordChangeValue, AFTER));
                result.put(BEFORE_DATA, null);
                break;
            }
            case UPDATE: {
                // Updates report both images; the table name is kept for per-table filtering.
                if (!changeMap.containsKey(TABLE)) {
                    return null;
                }
                String currentTableName = String.valueOf(changeMap.get(TABLE));
                Map<String, String> resultMap = filterChangeData(sourceRecordChangeValue, currentTableName);
                if (CollectionUtils.isEmpty(resultMap)) {
                    return null;
                }
                result.put(EVENT_TYPE, EventTypeEnum.UPDATE.getType());
                result.put(DATA, resultMap.get(AFTER));
                result.put(BEFORE_DATA, resultMap.get(BEFORE));
                break;
            }
            case DELETE: {
                // Deletes only carry a before image, so the deleted row is reported in both fields.
                String deletedRow = getChangeData(sourceRecordChangeValue, BEFORE);
                result.put(EVENT_TYPE, EventTypeEnum.DELETE.getType());
                result.put(DATA, deletedRow);
                result.put(BEFORE_DATA, deletedRow);
                break;
            }
            default:
                // Snapshot reads and any other operation are not reported.
                return null;
        }
        result.putAll(changeMap);
        return BeanUtil.copyProperties(result, ChangeListenerModel.class);
    }

    /**
     * Collects the after/before row images of an update as JSON strings (null for an absent or
     * all-null image, matching {@link #getChangeData}).
     *
     * @param sourceRecordChangeValue change-event value
     * @param currentTableName        table the change belongs to (reserved for per-table filtering)
     * @return map keyed by AFTER / BEFORE
     */
    private Map<String, String> filterChangeData(Struct sourceRecordChangeValue, String currentTableName) {
        Map<String, String> resultMap = new HashMap<>(4);
        // TODO: filter non-essential columns per table (see checkNonEssentialData).
        resultMap.put(AFTER, getChangeData(sourceRecordChangeValue, AFTER));
        resultMap.put(BEFORE, getChangeData(sourceRecordChangeValue, BEFORE));
        return resultMap;
    }

    /**
     * Checks whether an update only touched non-essential columns.
     *
     * @param currentTableName table name, used for logging
     * @param afterMap         after image (may be null)
     * @param beforeMap        before image (may be null)
     * @param filterColumnList columns to ignore when comparing (may be null)
     * @return true when no remaining column differs between the two images
     */
    private boolean checkNonEssentialData(String currentTableName, Map<String, Object> afterMap,
                                          Map<String, Object> beforeMap, List<String> filterColumnList) {
        // Compare the union of both images: getChangeDataMap drops null columns, so a column set
        // to (or from) NULL appears in only one of the maps.
        Set<String> columns = new HashSet<>();
        if (afterMap != null) {
            columns.addAll(afterMap.keySet());
        }
        if (beforeMap != null) {
            columns.addAll(beforeMap.keySet());
        }
        if (filterColumnList != null) {
            columns.removeAll(filterColumnList);
        }
        for (String column : columns) {
            Object beforeValue = beforeMap == null ? null : beforeMap.get(column);
            Object afterValue = afterMap == null ? null : afterMap.get(column);
            if (!Objects.equals(beforeValue, afterValue)) {
                return false;
            }
        }
        log.info("表:{}无核心资料变更,忽略此次操作!", currentTableName);
        return true;
    }

    /**
     * Serialises one row image of the event to JSON.
     *
     * @param record envelope field name, e.g. AFTER or BEFORE
     * @return JSON of the non-null columns, or null when the image is absent or all-null
     */
    public String getChangeData(Struct sourceRecordChangeValue, String record) {
        Map<String, Object> changeDataMap = getChangeDataMap(sourceRecordChangeValue, record);
        if (CollectionUtils.isEmpty(changeDataMap)) {
            return null;
        }
        return JSON.toJSONString(changeDataMap);
    }

    /**
     * Extracts the non-null columns of one row image.
     *
     * @param record envelope field name, e.g. AFTER or BEFORE
     * @return column name to value, or null when the image is absent or has no non-null column
     */
    public Map<String, Object> getChangeDataMap(Struct sourceRecordChangeValue, String record) {
        Struct struct = (Struct) sourceRecordChangeValue.get(record);
        if (struct == null) {
            // e.g. the before image of an insert
            return null;
        }
        Map<String, Object> changeData = new HashMap<>();
        for (Field field : struct.schema().fields()) {
            Object value = struct.get(field);
            if (value != null) {
                changeData.put(field.name(), value);
            }
        }
        return changeData.isEmpty() ? null : changeData;
    }

    /**
     * Copies the whitelisted source-metadata fields (db, table, ts_ms) and renames ts_ms to
     * {@code changeTime}.
     *
     * @return metadata map; empty when the event carries no source struct
     */
    private Map<String, Object> getChangeTableInfo(Struct sourceRecordChangeValue) {
        Struct struct = (Struct) sourceRecordChangeValue.get(SOURCE);
        Map<String, Object> map = new HashMap<>();
        if (struct == null) {
            return map;
        }
        for (Field field : struct.schema().fields()) {
            String fieldName = field.name();
            Object value = struct.get(field);
            if (value != null && FilterJsonFieldEnum.filterJsonField(fieldName)) {
                map.put(fieldName, value);
            }
        }
        Object changeTime = map.remove(FilterJsonFieldEnum.ts_ms.name());
        if (changeTime != null) {
            map.put("changeTime", changeTime);
        }
        return map;
    }
}
model
package com.et.debezium.model;
import lombok.Data;
/**
* @author lei
* @create 2021-06-23 09:58
* @desc change model
**/
@Data
public class ChangeListenerModel {
private String db;
private String table;
private Integer eventType;
private Long changeTime;
private String data;
private String beforeData;
}
DemoApplication.java
package com.et.debezium;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point for the Debezium change-capture demo.
 */
@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        // Equivalent to the static SpringApplication.run(DemoApplication.class, args) shortcut.
        SpringApplication application = new SpringApplication(DemoApplication.class);
        application.run(args);
    }
}
Code repository
4. Test
Start the Spring Boot application:
2024-03-20 10:31:25.968 INFO 23260 --- [rce-coordinator] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-mysql-customer-binlog-client
2024-03-20 10:31:25.971 INFO 23260 --- [-127.0.0.1:3306] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-mysql-customer-binlog-client
2024-03-20 10:31:25.983 INFO 23260 --- [-127.0.0.1:3306] c.g.shyiko.mysql.binlog.BinaryLogClient : Connected to 127.0.0.1:3306 at mysql-bin.000001/154 (sid:1, cid:8)
2024-03-20 10:31:25.983 INFO 23260 --- [-127.0.0.1:3306] i.d.c.m.MySqlStreamingChangeEventSource : Connected to MySQL binlog at 127.0.0.1:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000001, currentBinlogPosition=154, currentRowNumber=0, serverId=0, sourceTime=2024-03-20T02:31:25.942Z, threadId=-1, currentQuery=null, tableIds=[demo.user], databaseName=demo], partition={server=mysql-customer}, snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000001, restartBinlogPosition=154, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null]
2024-03-20 10:31:25.983 INFO 23260 --- [rce-coordinator] i.d.c.m.MySqlStreamingChangeEventSource : Waiting for keepalive thread to start
2024-03-20 10:31:25.984 INFO 23260 --- [-127.0.0.1:3306] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-mysql-customer-binlog-client
2024-03-20 10:31:26.090 INFO 23260 --- [rce-coordinator] i.d.c.m.MySqlStreamingChangeEventSource : Keepalive thread is running
2024-03-20 10:33:36.536 WARN 23260 --- [-127.0.0.1:3306] io.debezium.util.SchemaNameAdjuster : The Kafka Connect schema name 'mysql-customer.demo.user_info.Value' is not a valid Avro schema name, so replacing with 'mysql_customer.demo.user_info.Value'
2024-03-20 10:33:36.536 WARN 23260 --- [-127.0.0.1:3306] io.debezium.util.SchemaNameAdjuster : The Kafka Connect schema name 'mysql-customer.demo.user_info.Key' is not a valid Avro schema name, so replacing with 'mysql_customer.demo.user_info.Key'
2024-03-20 10:33:36.537 WARN 23260 --- [-127.0.0.1:3306] io.debezium.util.SchemaNameAdjuster : The Kafka Connect schema name 'mysql-customer.demo.user_info.Envelope' is not a valid Avro schema name, so replacing with 'mysql_customer.demo.user_info.Envelope'
2024-03-20 10:33:36.540 INFO 23260 --- [-127.0.0.1:3306] i.d.r.history.DatabaseHistoryMetrics : Already applied 7 database changes
Insert some data into user_info:
-- Sample row; create_time and update_time are left NULL.
INSERT INTO demo.user_info (
    user_id, username, age, gender, remark,
    create_time, create_id, update_time, update_id, enabled
) VALUES (
    '1', '1', 1, 1, '1',
    NULL, '1', NULL, '1', 1
);
Observe the console output:
2024-03-20 10:34:13.156 INFO 23260 --- [listener-pool-0] c.e.debezium.handler.ChangeEventHandler : 发送变更数据:{"changeTime":1710902052000,"data":"{\"update_id\":\"1\",\"create_id\":\"1\",\"gender\":1,\"user_id\":\"1\",\"remark\":\"1\",\"enabled\":1,\"age\":1,\"username\":\"1\"}","db":"demo","eventType":1,"table":"user_info"}