1. Debezium and PostgreSQL introduction
Debezium
Debezium is an open-source project that provides a low-latency streaming platform for change data capture (CDC). You install and configure Debezium to monitor your database, and your application can consume every row-level change in it. Only committed changes are visible, so your application does not have to worry about transactions or changes being rolled back. Debezium provides a unified model for all database change events, so your application does not have to deal with the intricacies of each database management system. In addition, because Debezium records the history of data changes in persistent, replicated logs, your application can be stopped and restarted at any time without missing the events that occurred while it was down, ensuring that all events are processed correctly and completely.
PostgreSQL
PostgreSQL is a powerful open-source database system. After more than 15 years of active development and continuous improvement, PostgreSQL has earned a strong industry reputation for reliability, stability, and data consistency. It runs on all major operating systems, including Linux, Unix (AIX, BSD, HP-UX, SGI IRIX, Mac OS X, Solaris, and Tru64) and Windows. PostgreSQL is a fully transaction-safe database that supports foreign keys, joins, views, triggers, and stored procedures (which can be written in multiple languages). It supports most SQL:2008 standard data types, including integer, numeric, boolean, byte, character, date, interval, and time types, and it can store large binary objects such as pictures, sounds, and videos. PostgreSQL offers native programming interfaces for many high-level languages, such as C/C++, Java, .Net, Perl, Python, Ruby, Tcl, and ODBC, and ships with extensive documentation.
2. PostgreSQL environment setup
1. Install
Step 1: Pull the PostgreSQL 10.6 image:
docker pull postgres:10.6
Step 2: Create and run the container
Map the container's port 5432 to the host's port 30028, set the postgres account password to postgres, and preload the pgoutput plugin into the PostgreSQL instance:
docker run -d -p 30028:5432 --name postgres-10.6 -e POSTGRES_PASSWORD=postgres postgres:10.6 -c 'shared_preload_libraries=pgoutput'
Step 3: Check that the service is running:
docker ps | grep postgres-10.6
2. Configuration
Step 1: Enter the PostgreSQL container:
docker exec -it postgres-10.6 bash
Step 2: Edit the postgresql.conf configuration file:
vi /var/lib/postgresql/data/postgresql.conf
The configuration content is as follows:
# change wal_level to logical (options: minimal, replica, logical)
wal_level = logical
max_replication_slots = 20
max_wal_senders = 20
wal_sender_timeout = 180s
Step 3: Restart the container:
docker restart postgres-10.6
Connect to the database and run the following query; if it returns logical, the change took effect:
SHOW wal_level;
3. Create a new user and grant permissions
Log in to the PostgreSQL database with the account and password (postgres/postgres) set when the container was created.
First create the database and table:
-- Create database test_db
CREATE DATABASE test_db;
-- Connect to the newly created database test_db
\c test_db
-- Create table t_user
CREATE TABLE "public"."t_user" (
"id" int8 NOT NULL,
"name" varchar(255),
"age" int2,
PRIMARY KEY ("id")
);
Create a new user and grant it permissions:
CREATE USER test1 WITH PASSWORD 'test123';
-- Grant the user replication (streaming) permission
ALTER ROLE test1 replication;
-- Grant the user permission to connect to the database
GRANT CONNECT ON DATABASE test_db to test1;
-- Grant the user all privileges on all tables in the public schema
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;
4. Publish the tables
-- Set puballtables to true for any existing publications
update pg_publication set puballtables=true where pubname is not null;
-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Query which tables have been published
select * from pg_publication_tables;
-- Change the replica identity to FULL so that UPDATE and DELETE events include the previous row values. This ensures that changes to t_user are captured and synchronized correctly during real-time replication; without it, the old row data for updates and deletes may be lost, affecting the accuracy of synchronization.
ALTER TABLE t_user REPLICA IDENTITY FULL;
Check the replica identity ('f', meaning FULL, indicates the setting succeeded; 'n' means NOTHING, i.e. no replica identity is set):
select relreplident from pg_class where relname='t_user';
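For reference, the single-character codes that the relreplident query can return are documented by PostgreSQL. A small illustrative helper (not part of the project code) that decodes them:

```java
public class ReplicaIdentity {
    // Decodes the pg_class.relreplident column into a human-readable name.
    // Codes per the PostgreSQL documentation: d = default (primary key),
    // n = nothing, f = full (all columns), i = index.
    public static String decode(char code) {
        switch (code) {
            case 'd': return "DEFAULT";
            case 'n': return "NOTHING";
            case 'f': return "FULL";
            case 'i': return "INDEX";
            default:  return "UNKNOWN";
        }
    }
}
```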
3. Code project
Goal: read incremental changes to the t_user table in the PostgreSQL database.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>postgre</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <debezium.version>1.9.4.Final</debezium.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- debezium -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-postgres</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
    </dependencies>
</project>
DebeziumConnectorConfig.java
package com.et.postgres.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import java.io.File;
import java.io.IOException;
@Configuration
public class DebeziumConnectorConfig {

    @Bean
    public io.debezium.config.Configuration customerConnector(Environment env) throws IOException {
        // Offsets are stored in a temporary file here; in production, point this
        // at a stable path so the connector can resume after a restart.
        File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
        return io.debezium.config.Configuration.create()
                .with("name", "customer_postgres_connector")
                .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
                .with("offset.flush.interval.ms", "60000")
                .with("database.hostname", env.getProperty("customer.datasource.host"))
                .with("database.port", env.getProperty("customer.datasource.port")) // defaults to 5432
                .with("database.user", env.getProperty("customer.datasource.username"))
                .with("database.password", env.getProperty("customer.datasource.password"))
                .with("database.dbname", env.getProperty("customer.datasource.database"))
                .with("database.server.id", "10181")
                .with("database.server.name", "customer-postgres-db-server")
                .with("database.history", "io.debezium.relational.history.MemoryDatabaseHistory")
                // Capture only public.t_user, and only its name and age columns
                .with("table.include.list", "public.t_user")
                .with("column.include.list", "public.t_user.name,public.t_user.age")
                .with("publication.autocreate.mode", "filtered")
                .with("plugin.name", "pgoutput")
                .with("slot.name", "dbz_customerdb_listener")
                .build();
    }
}
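Note that because the configuration writes offsets to a temporary file created with File.createTempFile, a new file is used on every start and the connector cannot resume from the last recorded LSN across restarts. A minimal sketch of building a stable offsets path instead (the directory name debezium-offsets is an arbitrary choice for illustration):

```java
import java.io.File;

public class OffsetFileHelper {
    // Returns a stable, per-connector offsets file so Debezium can resume
    // from the last recorded position after the application restarts.
    public static File stableOffsetFile(String connectorName) {
        File dir = new File(System.getProperty("user.home"), "debezium-offsets");
        dir.mkdirs(); // ensure the directory exists; no-op if already present
        return new File(dir, connectorName + ".dat");
    }
}
```

The returned path would replace offsetStorageTempFile.getAbsolutePath() in the "offset.storage.file.filename" property.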
DebeziumListener.java
package com.et.postgres.listener;
import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@Slf4j
@Component
public class DebeziumListener {

    private final Executor executor = Executors.newSingleThreadExecutor();
    private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

    public DebeziumListener(Configuration customerConnectorConfiguration) {
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(customerConnectorConfiguration.asProperties())
                .notifying(this::handleChangeEvent)
                .build();
    }

    private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
        SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
        log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());
        Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
        log.info("SourceRecordChangeValue = '{}'", sourceRecordRecordChangeEvent);
        // Example of extracting the changed row from the Debezium envelope:
        // if (sourceRecordChangeValue != null) {
        //     Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
        //     // Operation.READ events are always triggered when the application
        //     // initializes; we are only interested in changes made after startup.
        //     if (operation != Operation.READ) {
        //         String record = operation == Operation.DELETE ? BEFORE : AFTER; // handles update & insert operations
        //         Struct struct = (Struct) sourceRecordChangeValue.get(record);
        //         Map<String, Object> payload = struct.schema().fields().stream()
        //                 .map(Field::name)
        //                 .filter(fieldName -> struct.get(fieldName) != null)
        //                 .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
        //                 .collect(toMap(Pair::getKey, Pair::getValue));
        //         // this.customerService.replicateData(payload, operation);
        //         log.info("Updated Data: {} with Operation: {}", payload, operation.name());
        //     }
        // }
    }

    @PostConstruct
    private void start() {
        this.executor.execute(debeziumEngine);
    }

    @PreDestroy
    private void stop() throws IOException {
        if (Objects.nonNull(this.debeziumEngine)) {
            this.debeziumEngine.close();
        }
    }
}
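The commented-out block above keys off the op field of Debezium's change-event envelope, whose codes are r (snapshot read), c (create/insert), u (update), and d (delete). A plain-Java sketch of the core decision, choosing which row image describes the change (Debezium's own Envelope.Operation class provides the same code mapping):

```java
import java.util.Map;

public class EnvelopeHelper {
    // Given the Debezium envelope op code and the "before"/"after" row images,
    // returns the image that describes the change: "before" for deletes,
    // "after" for inserts, updates, and snapshot reads.
    public static Map<String, Object> changedRow(String op,
                                                 Map<String, Object> before,
                                                 Map<String, Object> after) {
        return "d".equals(op) ? before : after;
    }
}
```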
DemoApplication.java
package com.et.postgres;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
application.properties
customer.datasource.host=localhost
customer.datasource.port=30028
customer.datasource.database=test_db
customer.datasource.username=test1
customer.datasource.password=test123
logging.level.root=INFO
logging.level.io.debezium.postgres.BinlogReader=INFO
logging.level.io.davidarhcanjo=DEBUG
Only the key code is shown above; see the code repository below for the complete project.
Code Repository
4. Test
- Start the Spring Boot application
- Insert a row into the table
INSERT INTO public.t_user(id, "name", age) VALUES(1, 'harries', 18);
- Observe the console output
2024-04-07 14:22:01.621 INFO 29260 --- [pool-1-thread-1] i.d.connector.common.BaseSourceTask : 1 records sent during previous 00:00:42.015, last recorded offset: {transaction_id=null, lsn_proc=23559864, lsn=23559864, txId=575, ts_usec=1712470921044339}
2024-04-07 14:22:01.622 INFO 29260 --- [pool-1-thread-1] c.et.postgres.listener.DebeziumListener : Key = Struct{id=1}, Value = Struct{after=Struct{name=harries,age=18},source=Struct{version=1.9.4.Final,connector=postgresql,name=customer-postgres-db-server,ts_ms=1712470921044,db=test_db,sequence=[null,"23559864"],schema=public,table=t_user,txId=575,lsn=23559864},op=c,ts_ms=1712470921607}
2024-04-07 14:22:01.622 INFO 29260 --- [pool-1-thread-1] c.et.postgres.listener.DebeziumListener : SourceRecordChangeValue = 'EmbeddedEngineChangeEvent [key=null, value=SourceRecord{sourcePartition={server=customer-postgres-db-server}, sourceOffset={transaction_id=null, lsn_proc=23559864, lsn=23559864, txId=575, ts_usec=1712470921044339}} ConnectRecord{topic='customer-postgres-db-server.public.t_user', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{customer_postgres_db_server.public.t_user.Key:STRUCT}, value=Struct{after=Struct{name=harries,age=18},source=Struct{version=1.9.4.Final,connector=postgresql,name=customer-postgres-db-server,ts_ms=1712470921044,db=test_db,sequence=[null,"23559864"],schema=public,table=t_user,txId=575,lsn=23559864},op=c,ts_ms=1712470921607}, valueSchema=Schema{customer_postgres_db_server.public.t_user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}, sourceRecord=SourceRecord{sourcePartition={server=customer-postgres-db-server}, sourceOffset={transaction_id=null, lsn_proc=23559864, lsn=23559864, txId=575, ts_usec=1712470921044339}} ConnectRecord{topic='customer-postgres-db-server.public.t_user', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{customer_postgres_db_server.public.t_user.Key:STRUCT}, value=Struct{after=Struct{name=harries,age=18},source=Struct{version=1.9.4.Final,connector=postgresql,name=customer-postgres-db-server,ts_ms=1712470921044,db=test_db,sequence=[null,"23559864"],schema=public,table=t_user,txId=575,lsn=23559864},op=c,ts_ms=1712470921607}, valueSchema=Schema{customer_postgres_db_server.public.t_user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'