假设我们有两个微服务:service_A
和service_B
.
每个人在单个 Postgres 服务器实例中都有自己的数据库(db_a
和db_b
分别)(这只是一个临时环境,所以我们没有集群)。
也有另一种服务,service_debezium
(与嵌入式Debezium v1.6.1Final应监听的变化)db_a
和db_b
。所以基本上在这个服务中配置了两个 Debezium 引擎。
但不知何故service_debezium
不能同时听db_a
和db_b
。由于某种原因,它只侦听其中一个,并且没有错误日志。
此外,如果我配置service_debezium
(即它的 Debezium 引擎)来侦听db_a
或db_b
,它会按预期工作,因此我确定它们的配置属性是正确的,并且(当只有一个引擎时)一切正常。
database.dbname
的配置需要它,所以我想首选的方法是为每个数据库定义一个新的 Debezium 引擎。那是对的吗?以下是 中的 Debezium 配置service_debezium
:
db_a
配置豆: @Bean
public io.debezium.config.Configuration dbAConnector() throws IOException {
File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
return io.debezium.config.Configuration.create()
.with("name", "db_a_connector")
.with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
.with("offset.flush.interval.ms", "60000")
.with("database.hostname", "localhost")
.with("database.port", 5432)
.with("database.user", "postgres")
.with("database.password", "*****")
.with("database.dbname", "db_a")
.with("table.whitelist", "public.dummy_table,public.another_dummy_table")
.with("plugin.name", "pgoutput")
.with("slot.name", "db_a_connector")
.with("database.server.name", "db_a_server")
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
.build();
}
db_b
配置豆: @Bean
public io.debezium.config.Configuration dbBConnector() throws IOException {
File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
return io.debezium.config.Configuration.create()
.with("name", "db_b_connector")
.with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
.with("offset.flush.interval.ms", "60000")
.with("database.hostname", localhost)
.with("database.port", 5432)
.with("database.user", "postgres")
.with("database.password", "*****")
.with("database.dbname", "db_b")
.with("table.whitelist", "public.yet_another_table")
.with("plugin.name", "pgoutput")
.with("slot.name", "db_b_connector")
.with("database.server.name", "db_b_server")
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
.build();
}
以下是我在 中创建 Debezium 引擎实例的方法service_debezium
:
db_a
听众:@Component
public class DBAListener {
public DBAListener(
@Qualifier("dbAConnector") Configuration connectorConfiguration /*, ... other services */) {
this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(connectorConfiguration.asProperties())
.notifying(this::handleChangeEvent)
.build();
// ...
}
// ...
}
db_b
听众:@Component
public class DBBListener {
public DBBListener(
@Qualifier("dbBConnector") Configuration connectorConfiguration /*, ... other services */) {
this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(connectorConfiguration.asProperties())
.notifying(this::handleChangeEvent)
.build();
// ...
}
// ...
}
postgresql.conf
:# ...other conf
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
# max number of walsender processes (i.e. # of replica databases + # of Debezium engines) (change requires restart)
# Abrupt streaming client disconnection might cause an orphaned connection slot until a timeout is reached,
# so this parameter should be set slightly higher than the maximum number of expected clients
# so disconnected clients can immediately reconnect
max_wal_senders = 3
max_replication_slots = 3 # max number of replication slots (change requires restart)
提前致谢!
更新:根据@Yuri Niitsuma 的回答更新了代码片段以提供完整示例。
创建 debezium 连接器时,它会创建一个默认名称为“debezium”的复制槽。然后您尝试创建另一个实例并尝试创建一个具有相同名称的复制槽,并且不能使用相同的复制槽同时使用两个实例,这将引发错误。这是一个糟糕的解释,但我会给出解决方案。
在每个连接器上添加此配置:
在 dbAConnector 上
.with("slot.name", "dbAConnector")
和 dbBConnector
.with("slot.name", "dbBConnector")
您可以使用以下命令列出可用的复制槽:
SELECT * FROM pg_replication_slots;
您可以删除未使用的复制槽,例如默认名称“debezium”:
SELECT pg_drop_replication_slot('debezium');
因为当没有人会消耗这个插槽时会消耗磁盘。
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句