Debezium for CDC : Postgres to Postgres
July 20, 2022
+--------------+
| |
| PostgreSQL |
| |
+------+-------+
|
|
|
+---------------v------------------+
| |
| Kafka Connect |
| (Debezium, JDBC connectors) |
| |
+---------------+------------------+
|
|
|
|
+-------v--------+
| |
| PostgreSQL |
| |
+----------------+
I'm using Docker environment to do this, however if you want to try it on host machine, you can use Confluent Platform installed on your OS.
1version: "3.7"
2services:
3 postgres:
4 image: debezium/postgres:13
5 container_name: pg1
6 expose:
7 - 5433
8 ports:
9 - 5433:5433
10 environment:
11 - POSTGRES_USER=mine
12 - POSTGRES_PASSWORD=qwepoi123
13 - POSTGRES_DB=sharingcdc
14 - PGPORT=5433
15
16 postgres-copy:
17 image: debezium/postgres:13
18 container_name: pg2
19 expose:
20 - 5434
21 ports:
22 - 5434:5434
23 environment:
24 - POSTGRES_USER=mine
25 - POSTGRES_PASSWORD=qwepoi123
26 - POSTGRES_DB=sharingcdc-copy
27 - PGPORT=5434
28
29 zookeeper:
30 image: confluentinc/cp-zookeeper:5.5.3
31 container_name: zookeeper
32 environment:
33 ZOOKEEPER_CLIENT_PORT: 2181
34
35 kafka:
36 image: confluentinc/cp-enterprise-kafka:5.5.3
37 container_name: kafka
38 links:
39 - zookeeper
40 depends_on: [zookeeper]
41 environment:
42 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
43 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
44 KAFKA_BROKER_ID: 1
45 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
46 KAFKA_JMX_PORT: 9991
47 ports:
48 - 9092:9092
49
50 debezium:
51 image: dbz-conn-jdbc-sink
52 container_name: dbz
53 links:
54 - kafka
55 - postgres
56 - postgres-copy
57 environment:
58 BOOTSTRAP_SERVERS: kafka:9092
59 GROUP_ID: 1
60 CONFIG_STORAGE_TOPIC: connect_configs
61 OFFSET_STORAGE_TOPIC: connect_offsets
62 STATUS_STORAGE_TOPIC: connect-status
63 KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
64 VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
65 CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
66 CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
67 depends_on: [kafka]
68 ports:
69 - 8083:8083
70
71 schema-registry:
72 image: confluentinc/cp-schema-registry:5.5.3
73 container_name: schema
74 environment:
75 - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
76 - SCHEMA_REGISTRY_HOST_NAME=schema-registry
77 - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081,http://localhost:8081
78 ports:
79 - 8081:8081
80 depends_on: [zookeeper, kafka]
81
82 kafkacat:
83 image: edenhill/kafkacat:1.5.0
84 container_name: kafkacat
85 entrypoint:
86 - /bin/sh
87 - -c
88 - |
89 while [ 1 -eq 1 ];do sleep 60;done
docker-compose.ymlFirst, we need the first database to have some data. Open pg1 (as we name it) bash and login:
1psql -U mine -d sharingcdc -W
-W option will prompt us to input password to connect. Once we are in the psql mode, create table:
1CREATE TABLE public.todo (
2 id uuid NOT NULL,
3 created_date timestamp NULL DEFAULT now(),
4 is_done bool NULL,
5 modified_date timestamp NULL,
6 title varchar(255) NULL,
7 CONSTRAINT todo_pkey PRIMARY KEY (id)
8);
To enable CDC with Postgres, we need to give replication permission to our database:
1ALTER TABLE public.todo REPLICA IDENTITY FULL;
Let's insert some data in it:
1INSERT INTO public.todo
2(id, created_date, is_done, modified_date, title)
3VALUES('79fe5ffa-94ed-4871-a5cd-300471586914'::uuid, '2022-05-31 14:47:12.198', false, '2022-06-20 22:52:10.648', 'do laundry');
4INSERT INTO public.todo
5(id, created_date, is_done, modified_date, title)
6VALUES('129eef91-8f55-4edd-9c63-804c6f1a3f5b'::uuid, '2022-05-31 14:59:58.150', false, '2022-06-20 22:52:11.481', 'feed the dog');
Next, we need to establish a connector using Debezium connector to access our first database and stream it to a Kafka topic. Fortunately, it's quite easy to do that. Debezium connector can be accessed through REST API. There are two connectors we need to create, one is for source, the process when stream are inserted into a topic, and the other one is sink, which taking out data from a topic.
source connector
To create source connector, we need to use the PostgresConnector class. Below is an example configs on how to do it. debezium-source-config.json (for publisher / source connector)
1{
2 "name": "sharingcdc-source-connector",
3 "config": {
4 "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
5 "plugin.name": "pgoutput",
6 "database.hostname": "postgres",
7 "database.port": "5433",
8 "database.user": "mine",
9 "database.password": "qwepoi123",
10 "database.dbname": "sharingcdc",
11 "database.server.name": "postgres",
12 "database.include.list":"sharingcdc",
13 "tasks.max": 1,
14 "transforms": "route",
15 "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
16 "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
17 "transforms.route.replacement": "$3"
18 }
19}
debezium-source-config.jsonFinally, we can just use curl to start creating the connector.
1curl -i -X POST -H "Content-Type:application/json" -H "Accept:application/json" localhost:8083/connectors -d "@debezium-source-config.json"
The response we get will be:
1HTTP/1.1 201 Created
2Date: Tue, 12 Jul 2022 09:09:28 GMT
3Location: http://localhost:8083/connectors/sharingcdc-connector
4Content-Type: application/json
5Content-Length: 608
6Server: Jetty(9.4.33.v20201020)
7
8{
9 "name":"sharingcdc-connector",
10 "config":{
11 "connector.class":"io.debezium.connector.postgresql.PostgresConnector",
12 "plugin.name":"pgoutput",
13 "database.hostname":"postgres",
14 "database.port":"5433",
15 "database.user":"mine",
16 "database.password":"qwepoi123",
17 "database.dbname":"sharingcdc",
18 "database.server.name":"postgres",
19 "database.include.list":"sharingcdc",
20 "tasks.max":"1",
21 "transforms":"route",
22 "transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter",
23 "transforms.route.regex":"([^.]+)\\.([^.]+)\\.([^.]+)",
24 "transforms.route.replacement":"$3",
25 "name":"sharingcdc-connector"
26 },
27 "tasks":[],
28 "type":"source"
29}
Notice the HTTP status 201 Created tells us the connection is successfully created.
Check connector status
1curl localhost:8083/connectors/sharingcdc-connector/status
This will give result:
1{
2 "name":"sharingcdc-connector",
3 "connector":{
4 "state":"RUNNING",
5 "worker_id":"172.21.0.6:8083"
6 },
7 "tasks":[
8 {
9 "id":0,
10 "state":"RUNNING",
11 "worker_id":"172.21.0.6:8083"
12 }
13 ],
14 "type":"source"
15}
Important!
If you want to stop our connector, it is important to delete it before starting new connector with the same name. For example above, we declared that the connector name is sharingcdc-connector, to delete this connector we simply send a DELETE to the API.
1curl -X DELETE localhost:8083/connectors/sharingcdc-connector
To confirm that the connector has been deleted, inquiry the connectors that we have using:
1curl -X GET localhost:8083/connectors
The /connectors endpoint is the root context for Debezium connectors, and sending a GET request without any parameters returns the list of available connectors.
Checking your topic
According to the official documentation of Debezium connector for postgres : Postgres Topic Names, the topic name is generated automatically using the server name + schema name + table name. So in our case, since we defined the database.server.name in the debezium-config.json file as postgres, and our schema name is public and the table is todo, the constructed topic name will be: postgres.public.todo. That should be it right? Nope! If you see correctly in our debezium-source-config.json, we defined a transformation route which transforms the predefined generated topic into just table name with $3 value.
1 "transforms": "route",
2 "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
3 "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
4 "transforms.route.replacement": "$3"
debezium-source-config.jsonSo in our case, to name of our topic should be todo instead of postgres.public.todo
To check your topic list, Kafka Connect also provide a CLI to check our topics using the kafka-topics.shfile located in the /bindirectory. Open terminal in our Kafka Connect container, and run:
1./bin/kafka-topics.sh --bootstrap-server=kafka:9092 --list
Another tool we can use is Kafkacat which I also include in the docker-compose.yml file. To use it, we simply run:
1docker exec kafkacat kafkacat -b kafka:9092 -L # List existing topic in kafka:9092
Monitor changes detection by tailing it
Now, we need to check whether the source connector is actually able to stream the data changes. There are several ways to do this:
Via Schema Registry
We can use kafka-avro-console-consumer, by running:
1docker-compose exec schema-registry /usr/bin/kafka-avro-console-consumer --bootstrap-server kafka:9092 --from-beginning --property print.key=true --property schema-registry.url=http://schema-registry:8081 --topic postgres.public.todo
or if you prefer via bash :
- Open schema-registry bash:
1docker exec -it <container-name> sh
- then run:
1./usr/bin/kafka-avro-console-consumer -bootstrap-server kafka:9092 --from-beginning --property print.key=true --property schema-registry.url=http://schema-registry:8081 --topic postgres.public.todo
Via Kafkacat
Or we can also use kafkacat:
1docker exec kafkacat kafkacat -b kafka:9092 -t todo -C
Checking if the connector works
Now that we have tailed our connector, we can confirm by trying to modify data, insert new data or even delete data.
Try to insert new data
1INSERT INTO public.todo
2(id, created_date, is_done, modified_date, title)
3VALUES('6fc341f0-04a8-4c91-92bb-3dbb1ef0b557'::uuid, '2022-06-20 22:11:09.613', false, '2022-06-20 22:52:26.648', 'test');
What we will get in the topic monitoring console
1{
2 "id":"6fc341f0-04a8-4c91-92bb-3dbb1ef0b557"
3}
4{
5 "before":null,
6 "after":{
7 "postgres.public.todo.Value":{
8 "id":"6fc341f0-04a8-4c91-92bb-3dbb1ef0b557",
9 "created_date":{
10 "long":1655763069613000
11 },
12 "is_done":{
13 "boolean":false
14 },
15 "modified_date":{
16 "long":1655765546648000
17 },
18 "title":{
19 "string":"test"
20 }
21 }
22 },
23 "source":{
24 "version":"1.4.2.Final",
25 "connector":"postgresql",
26 "name":"postgres",
27 "ts_ms":1657617691850,
28 "snapshot":{
29 "string":"false"
30 },
31 "db":"sharingcdc",
32 "schema":"public",
33 "table":"todo",
34 "txId":{
35 "long":493
36 },
37 "lsn":{
38 "long":23876504
39 },
40 "xmin":null
41 },
42 "op":"c",
43 "ts_ms":{
44 "long":1657617692088
45 },
46 "transaction":null
47}
Check the schema created by Debezium
1curl localhost:8081/subjects/todo-value/versions/1 | jq ".schema | fromjson"
This REST API will show us the schema as below.
1{
2 "type": "record",
3 "name": "Envelope",
4 "namespace": "postgres.public.todo",
5 "fields": [
6 {
7 "name": "before",
8 "type": [
9 "null",
10 {
11 "type": "record",
12 "name": "Value",
13 "fields": [
14 {
15 "name": "id",
16 "type": {
17 "type": "string",
18 "connect.version": 1,
19 "connect.name": "io.debezium.data.Uuid"
20 }
21 },
22 {
23 "name": "created_date",
24 "type": [
25 "null",
26 {
27 "type": "long",
28 "connect.version": 1,
29 "connect.name": "io.debezium.time.MicroTimestamp"
30 }
31 ],
32 "default": null
33 },
34 {
35 "name": "is_done",
36 "type": [
37 "null",
38 "boolean"
39 ],
40 "default": null
41 },
42 {
43 "name": "modified_date",
44 "type": [
45 "null",
46 {
47 "type": "long",
48 "connect.version": 1,
49 "connect.name": "io.debezium.time.MicroTimestamp"
50 }
51 ],
52 "default": null
53 },
54 {
55 "name": "title",
56 "type": [
57 "null",
58 "string"
59 ],
60 "default": null
61 }
62 ],
63 "connect.name": "postgres.public.todo.Value"
64 }
65 ],
66 "default": null
67 },
68 {
69 "name": "after",
70 "type": [
71 "null",
72 "Value"
73 ],
74 "default": null
75 },
76 {
77 "name": "source",
78 "type": {
79 "type": "record",
80 "name": "Source",
81 "namespace": "io.debezium.connector.postgresql",
82 "fields": [
83 {
84 "name": "version",
85 "type": "string"
86 },
87 {
88 "name": "connector",
89 "type": "string"
90 },
91 {
92 "name": "name",
93 "type": "string"
94 },
95 {
96 "name": "ts_ms",
97 "type": "long"
98 },
99 {
100 "name": "snapshot",
101 "type": [
102 {
103 "type": "string",
104 "connect.version": 1,
105 "connect.parameters": {
106 "allowed": "true,last,false"
107 },
108 "connect.default": "false",
109 "connect.name": "io.debezium.data.Enum"
110 },
111 "null"
112 ],
113 "default": "false"
114 },
115 {
116 "name": "db",
117 "type": "string"
118 },
119 {
120 "name": "schema",
121 "type": "string"
122 },
123 {
124 "name": "table",
125 "type": "string"
126 },
127 {
128 "name": "txId",
129 "type": [
130 "null",
131 "long"
132 ],
133 "default": null
134 },
135 {
136 "name": "lsn",
137 "type": [
138 "null",
139 "long"
140 ],
141 "default": null
142 },
143 {
144 "name": "xmin",
145 "type": [
146 "null",
147 "long"
148 ],
149 "default": null
150 }
151 ],
152 "connect.name": "io.debezium.connector.postgresql.Source"
153 }
154 },
155 {
156 "name": "op",
157 "type": "string"
158 },
159 {
160 "name": "ts_ms",
161 "type": [
162 "null",
163 "long"
164 ],
165 "default": null
166 },
167 {
168 "name": "transaction",
169 "type": [
170 "null",
171 {
172 "type": "record",
173 "name": "ConnectDefault",
174 "namespace": "io.confluent.connect.avro",
175 "fields": [
176 {
177 "name": "id",
178 "type": "string"
179 },
180 {
181 "name": "total_order",
182 "type": "long"
183 },
184 {
185 "name": "data_collection_order",
186 "type": "long"
187 }
188 ]
189 }
190 ],
191 "default": null
192 }
193 ],
194 "connect.name": "postgres.public.todo.Envelope"
195}
sink connector
After we have all the provider config set up, now it's time to consume the message and replicate all the data into another PostgresDB.
Same with creating source connector, we need set configurations to be sent to the connector endpoint to create sink connector. Here, we're using JdbcSinkConnector class which installed manually while creating Docker image.
debezium-sink-config.json (for consumer / sink connector)
1{
2 "name": "sharingcdc-sink-connector",
3 "config": {
4 "auto.create": "true",
5 "connection.url": "jdbc:postgresql://host.docker.internal:5434/sharingcdc-copy?user=mine&password=qwepoi123",
6 "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
7 "insert.mode": "upsert",
8 "pk.fields": "id",
9 "pk.mode": "record_key",
10 "delete.enabled": "true",
11 "tasks.max": "1",
12 "topics": "todo",
13 "transforms": "unwrap",
14 "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
15 "transforms.unwrap.drop.tombstones": "false"
16 }
17}
debezium-sink-config.jsonSend the payload through curl:
1curl -i -X POST -H "Content-Type:application/json" -H "Accept:application/json" localhost:8083/connectors -d "@debezium-sink-config.json"
It will gives us this:
1HTTP/1.1 201 Created
2Date: Tue, 12 Jul 2022 06:43:15 GMT
3Location: http://localhost:8083/connectors/sharingcdc-sink-connector
4Content-Type: application/json
5Content-Length: 539
6Server: Jetty(9.4.43.v20210629)
7
8{
9 "name":"sharingcdc-sink-connector",
10 "config":{
11 "auto.create":"true",
12 "connection.url":"jdbc:postgresql://host.docker.internal:5434/sharingcdc-copy?user=mine&password=qwepoi123",
13 "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
14 "insert.mode":"upsert",
15 "pk.fields":"id",
16 "pk.mode":"record_value",
17 "tasks.max":"1",
18 "topics":"postgres.public.todo",
19 "transforms":"unwrap",
20 "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
21 "transforms.unwrap.drop.tombstones":"false",
22 "name":"sharingcdc-sink-connector"
23 },
24 "tasks":[],
25 "type":"sink"
26}
Now, everything should be running normally. If so, let's try to connect to the second PostgreDB and check if it works 🤞
1docker exec -it pg2 sh #run bash of pg2
2
3psql -U mine -d sharingcdc-copy -W #login to sharingcdc-copy database
After submitting the password, you can run:
1\d --check all relations
We should see that table todo automatically created for us, which we never created it before.
1sharingcdc-copy=# \d
2 List of relations
3 Schema | Name | Type | Owner
4--------+------+-------+-------
5 public | todo | table | mine
6(1 row)
7
8
9sharingcdc-copy=# select * from todo;
10 is_done | id | created_date | modified_date | title
11---------+--------------------------------------+------------------+------------------+--------------
12 f | 129eef91-8f55-4edd-9c63-804c6f1a3f5b | 1654009198150000 | 1655765531481000 | feed the dog
13 f | 79fe5ffa-94ed-4871-a5cd-300471586914 | 1654008432198000 | 1655765530648000 | do laundry
14 f | 6fc341f0-04a8-4c91-92bb-3dbb1ef0b557 | 1655763069613000 | 1655765546648000 | test
15(3 rows)
Let's try to insert new data into our first database:
1sharingcdc=# INSERT INTO public.todo
2(id, created_date, is_done, modified_date, title)
3VALUES('850c5925-bb49-4795-9c19-4b7a6b4514b5'::uuid, '2022-06-20 22:12:02.335', false, NULL, 'Test12');
4INSERT 0 1
Then confirm it in the second database:
1sharingcdc-copy=# select * from todo;
2 is_done | id | created_date | modified_date | title
3---------+--------------------------------------+------------------+------------------+--------------
4 f | 129eef91-8f55-4edd-9c63-804c6f1a3f5b | 1654009198150000 | 1655765531481000 | feed the dog
5 f | 79fe5ffa-94ed-4871-a5cd-300471586914 | 1654008432198000 | 1655765530648000 | do laundry
6 f | 6fc341f0-04a8-4c91-92bb-3dbb1ef0b557 | 1655763069613000 | 1655765546648000 | test
7 f | 850c5925-bb49-4795-9c19-4b7a6b4514b5 | 1655763122335000 | | Test12
8(4 rows)
Voila! 🎉🎉🎉
You could also see the activity logged in the schema registry:
1{
2 "id":"850c5925-bb49-4795-9c19-4b7a6b4514b5"
3}
4{
5 "before":null,
6 "after":{
7 "postgres.public.todo.Value":{
8 "id":"850c5925-bb49-4795-9c19-4b7a6b4514b5",
9 "created_date":{
10 "long":1655763122335000
11 },
12 "is_done":{
13 "boolean":false
14 },
15 "modified_date":null,
16 "title":{
17 "string":"Test12"
18 }
19 }
20 },
21 "source":{
22 "version":"1.4.2.Final",
23 "connector":"postgresql",
24 "name":"postgres",
25 "ts_ms":1657618002239,
26 "snapshot":{
27 "string":"false"
28 },
29 "db":"sharingcdc",
30 "schema":"public",
31 "table":"todo",
32 "txId":{
33 "long":495
34 },
35 "lsn":{
36 "long":23877952
37 },
38 "xmin":null
39 },
40 "op":"c",
41 "ts_ms":{
42 "long":1657618002694
43 },
44 "transaction":null
45}
Source code of this project is also available in my Github repo here.
The latest Kafka connect image does not include JBDC Sink Connector we use. This is indicated with this message when we start the connector using the config above.
1HTTP/1.1 500 Internal Server Error
2Date: Mon, 11 Jul 2022 06:44:12 GMT
3Content-Type: application/json
4Content-Length: 4581
5Server: Jetty(9.4.33.v20201020)
6
7{
8 "error_code":500,
9 "message":"Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector, available connectors are: PluginDesc{klass=class <snip> }"
10}
Since it did not come out of the box with the connectors, we need to install it manually. Solving it needs a bit more trick tho.
Solution
Tweaking around I found out that actually official Debezium developer (Jiri Pechanec) provided how to build our own custom Debezium connector with JDBC Sink Connector included.
In that `Dockerfile` example, the connector used is quay.io/debezium/, however I decided to keep using the first tutorial I followed.
Here is my custom Dockerfile:
1FROM debezium/connect:1.4
2ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
3
4ARG POSTGRES_VERSION=42.2.8
5ARG KAFKA_JDBC_VERSION=5.3.2
6
7# Deploy PostgreSQL JDBC Driver
8RUN cd /kafka/libs && curl -sO https://jdbc.postgresql.org/download/postgresql-$POSTGRES_VERSION.jar
9
10# Deploy Kafka Connect JDBC
11RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
12curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
Let's build it:
1docker build -t dbz-conn-jdbc-sink .
Note that dbz-conn-jdbc-sink tag above. It is the image that used in my docker-compose.yml file for debezium service.
If you happen to follow this tutorial, you can build the image yourself before running the docker-compose.yml file.
Primary key mode must be record_key when delete support is enabled
When creating the sink connector, for couple seconds the status endpoint tells us that it is running. However, I tried checking it after 10 seconds wondering why the data was not inserted into the new database.
1{
2 "name":"sharingcdc-sink-connector",
3 "connector":{
4 "state":"RUNNING",
5 "worker_id":"172.29.0.8:8083"
6 },
7 "tasks":[{
8 "id":0,
9 "state":"FAILED",
10 "worker_id":"172.29.0.8:8083",
11 "trace":"org.apache.kafka.common.config.ConfigException: Primary key mode must be 'record_key' when delete support is enabled <snip>"}],
12 "type":"sink"
13}
I found out that I'm using "pk.mode":"record_value" instead of record_key. So just simply change the value with record_key and we're good to go.
1{
2 "name":"sharingcdc-sink-connector",
3 "connector":{
4 "state":"RUNNING",
5 "worker_id":"172.29.0.8:8083"
6 },
7 "tasks":[{
8 "id":0,
9 "state":"FAILED",
10 "worker_id":"172.29.0.8:8083",
11 "trace":"org.apache.kafka.connect.errors.ConnectException: <snip> Caused by: org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Connection to postgres:5434 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections. <snip>"
12 }],
13 "type":"sink"
14}
Careful when using localhost in Docker. The term localhost in the host machine and in the Docker are different. In my case, I need to make the connection to the host machine localhost. Defining a static IP would solve the problem, but my IP is changed periodically, thus I need to use host.docker.internal keyword to point to the host localhost.
This keyword also used in my debezium-sink-config.json file:
1{
2 ...
3 "connection.url": "jdbc:postgresql://host.docker.internal:5434/sharingcdc-copy?user=mine&password=qwepoi123"
4 ...
5}
Now we have learned how to stream our database changes to Kafka topic and take advantage of it to replicate to another database. More examples are coming for this CDC topic. Hope this helps! 👋