如何在单个 Postgres 服务器中为多个数据库使用嵌入式 Debezium?

emrekgn

假设我们有两个微服务:service_Aservice_B.
每个人在单个 Postgres 服务器实例中都有自己的数据库(db_adb_b分别)(这只是一个临时环境,所以我们没有集群)。

也有另一种服务,service_debezium(与嵌入式Debezium v1.6.1Final应监听的变化)db_adb_b所以基本上在这个服务中配置了两个 Debezium 引擎。

但不知何故service_debezium不能同时听db_adb_b由于某种原因,它只侦听其中一个,并且没有错误日志。

此外,如果我配置service_debezium(即它的 Debezium 引擎)来侦听db_adb_b,它会按预期工作,因此我确定它们的配置属性是正确的,并且(当只有一个引擎时)一切正常。

  1. 那么为什么我们不能使用多个 Debezium 引擎来侦听单个 Postgres 服务器中的多个数据库呢?我在这里缺少什么?
  2. 我认为的另一种选择是只使用一个 Debezium 引擎来侦听该 Postgres 服务器实例中的所有数据库,但显然它database.dbname的配置需要它,所以我想首选的方法是为每个数据库定义一个新的 Debezium 引擎。那是对的吗?

以下是 中的 Debezium 配置service_debezium

  • db_a 配置豆:
  @Bean
  public io.debezium.config.Configuration dbAConnector() throws IOException {
    File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
    File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
    return io.debezium.config.Configuration.create()
        .with("name", "db_a_connector")
        .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
        .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
        .with("offset.flush.interval.ms", "60000")
        .with("database.hostname", "localhost")
        .with("database.port", 5432)
        .with("database.user", "postgres")
        .with("database.password", "*****")
        .with("database.dbname", "db_a")
        .with("table.whitelist", "public.dummy_table,public.another_dummy_table")
        .with("plugin.name", "pgoutput")
        .with("slot.name", "db_a_connector")
        .with("database.server.name", "db_a_server")
        .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
        .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
        .build();
  }
  • db_b 配置豆:
  @Bean
  public io.debezium.config.Configuration dbBConnector() throws IOException {
    File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
    File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
    return io.debezium.config.Configuration.create()
        .with("name", "db_b_connector")
        .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
        .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
        .with("offset.flush.interval.ms", "60000")
        .with("database.hostname", localhost)
        .with("database.port", 5432)
        .with("database.user", "postgres")
        .with("database.password", "*****")
        .with("database.dbname", "db_b")
        .with("table.whitelist", "public.yet_another_table")
        .with("plugin.name", "pgoutput")
        .with("slot.name", "db_b_connector")
        .with("database.server.name", "db_b_server")
        .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
        .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
        .build();
  }

以下是我在 中创建 Debezium 引擎实例的方法service_debezium

  • db_a 听众:
@Component
public class DBAListener {

  public DBAListener(
      @Qualifier("dbAConnector") Configuration connectorConfiguration /*, ... other services */) {

    this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        .using(connectorConfiguration.asProperties())
        .notifying(this::handleChangeEvent)
        .build();

    // ...
  }

  // ...

}
  • 和类似的db_b听众:
@Component
public class DBBListener {

  public DBBListener(
      @Qualifier("dbBConnector") Configuration connectorConfiguration /*, ... other services */) {

    this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        .using(connectorConfiguration.asProperties())
        .notifying(this::handleChangeEvent)
        .build();

    // ...
  }

  // ...

}
  • postgresql.conf
# ...other conf

wal_level = logical             # minimal, archive, hot_standby, or logical (change requires restart)
# max number of walsender processes (i.e. # of replica databases + # of Debezium engines) (change requires restart)
# Abrupt streaming client disconnection might cause an orphaned connection slot until a timeout is reached, 
# so this parameter should be set slightly higher than the maximum number of expected clients 
# so disconnected clients can immediately reconnect
max_wal_senders = 3
max_replication_slots = 3       # max number of replication slots (change requires restart)

提前致谢!

更新:根据@Yuri Niitsuma 的回答更新了代码片段以提供完整示例

新津由里

创建 debezium 连接器时,它会创建一个默认名称为“debezium”的复制槽。然后您尝试创建另一个实例并尝试创建一个具有相同名称的复制槽,并且不能使用相同的复制槽同时使用两个实例,这将引发错误。这是一个糟糕的解释,但我会给出解决方案。

在每个连接器上添加此配置:

在 dbAConnector 上

.with("slot.name", "dbAConnector")

和 dbBConnector

.with("slot.name", "dbBConnector")

您可以使用以下命令列出可用的复制槽:

SELECT * FROM pg_replication_slots;

您可以删除未使用的复制槽,例如默认名称“debezium”:

SELECT pg_drop_replication_slot('debezium');

因为当没有人会消耗这个插槽时会消耗磁盘。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何设置Debezium卡夫卡在泊坞运行MSSQL服务器

无法为SqlServer连接器设置Debezium嵌入式引擎

如何从Debezium创建的Avro消息中获取字段?

如何确定数据库服务器或嵌入式数据库是否合适

如何在Kotlin中管理单元测试资源,例如启动/停止数据库连接或嵌入式Elasticsearch服务器?

如何在postgres数据库中创建单个表的备份?

如何在单个服务器上将Hibernate与多个数据库一起使用

如何使用debezium从Postgres流式传输更改

我应该如何在无服务器环境中管理postgres数据库句柄?

如何从debezium kafka connect接收的CDC事件中获取表名和数据库名

如何在一个数据库服务器/容器上创建多个数据库

Debezium postgres错误:不建议使用参数“ include-unchanged-toast”

返回日期为ISO-8601格式-Debezium / Postgres插件

Debezium Postgres接收器连接器无法插入类型为DATE的值

Postgres Debezium不发布记录的先前状态

debezium无法使用带有默认插件pgoutput的postgres 11访问文件“ decoderbufs”

使用Debezium审核数据更改

为数据库中的多个表配置一个debezium连接器

如何在单个服务器上运行多个Neo4j数据库?

如何在另一个ubuntu安装中使用单个文件导入多个postgres数据库

带 kafka 的 Debezium 还是仅嵌入的 Debezium?

无法从 debezium-postgres 的 kafka-stream 读取 kafka-stream 数据

使用 cursor.description 如何获取不同服务器中多个数据库的列名

Debezium postgres kafka 连接器因 Java 堆空间问题而失败

debezium postgres 连接器中的毫秒时间戳

如何使用来自 Quarkus 的 Debezium 主题的消息?

如何在 Kafka connect 中升级我的 Debezium 插件版本?

如何在 Kafka Source Connector(例如 Debezium)中禁用 JSON 模式

如何使用单个日期查询 Postgres DateRange?