Kafka Connect JDBC sink connector not working

Digvijay waghela

I am trying to insert data into Oracle using the Kafka Connect JDBC sink connector, but it throws an error. I have tried every possible schema configuration; samples are below.

Please suggest if I'm missing anything below. These are my configuration files and the errors.

Case 1 - first configuration:

internal.value.converter.schemas.enable=false

With this I get:

[2017-08-28 16:16:26,119] INFO Sink task WorkerSinkTask{id=oracle_sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
[2017-08-28 16:16:26,606] INFO Discovered coordinator dfw-appblx097-01.prod.walmart.com:9092 (id: 2147483647 rack: null) for group connect-oracle_sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)
[2017-08-28 16:16:26,608] INFO Revoking previously assigned partitions [] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2017-08-28 16:16:26,609] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-08-28 16:16:27,174] INFO Successfully joined group connect-oracle_sink with generation 26 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
[2017-08-28 16:16:27,176] INFO Setting newly assigned partitions [DJ-7, DJ-6, DJ-5, DJ-4, DJ-3, DJ-2, DJ-1, DJ-0, DJ-9, DJ-8] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
[2017-08-28 16:16:28,580] ERROR Task oracle_sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: DJ
    at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:190)
    at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:58)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:65)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Case 2 - second configuration:

internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true

Logs:

[2017-08-28 16:23:50,993] INFO Revoking previously assigned partitions [] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2017-08-28 16:23:50,993] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-08-28 16:23:51,260] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-08-28 16:23:51,381] INFO Successfully joined group connect-oracle_sink with generation 29 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
[2017-08-28 16:23:51,384] INFO Setting newly assigned partitions [DJ-7, DJ-6, DJ-5, DJ-4, DJ-3, DJ-2, DJ-1, DJ-0, DJ-9, DJ-8] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
[2017-08-28 16:23:51,727] ERROR Task oracle_sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)

My Oracle connector.properties looks like this:

name=oracle_sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# Topic to consume; by default the sink also uses this as the target table name
topics=DJ
connection.url=jdbc:oracle:thin:@hostname:port:sid
connection.user=username
connection.password=password
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
# Create the target table if it does not exist
auto.create=true
# Add columns to the table when new fields appear in the data
auto.evolve=true

connect-standalone.properties holds the internal.* converter settings shown above.

My JSON looks like this -

{"Item":"12","Sourcing Reason":"corr","Postal Code":"l45","OrderNum":"10023","Intended Node Distance":1125.8,"Chosen Node":"34556","Quantity":1,"Order Date":1503808765201,"Intended Node":"001","Chosen Node Distance":315.8,"Sourcing Logic":"reducesplits"}
Robin Moffatt

From the documentation:

The sink connector requires knowledge of schemas, so you should use a suitable converter, e.g. the Avro converter that comes with Schema Registry, or the JSON converter with schemas enabled.

So if your data is JSON, you would have the following configuration:

[...]
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
[...]
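As an aside, the internal.key.converter / internal.value.converter settings you changed only control how Connect serializes its own internal bookkeeping data (offsets and configs); they have no effect on how records read from the topic are deserialized, which is why toggling them made no difference. The record-level converters go in the worker configuration. A minimal sketch of the relevant part of connect-standalone.properties, assuming a local broker and JSON values with embedded schemas:

bootstrap.servers=localhost:9092
# Assumes unkeyed or plain-string keys; use JsonConverter if keys are structured JSON
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Each record's value must then carry the "schema"/"payload" envelope shown below
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets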

The error you're seeing in the second case is related: JsonConverter with schemas.enable requires "schema" and "payload" fields. The JSON you've shared doesn't conform to this required format.

Here's a simple example of a valid JSON message with schema and payload:

{
    "schema": {
        "type": "struct",
        "fields": [{
            "type": "int32",
            "optional": true,
            "field": "c1"
        }, {
            "type": "string",
            "optional": true,
            "field": "c2"
        }, {
            "type": "int64",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "create_ts"
        }, {
            "type": "int64",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "update_ts"
        }],
        "optional": false,
        "name": "foobar"
    },
    "payload": {
        "c1": 10000,
        "c2": "bar",
        "create_ts": 1501834166000,
        "update_ts": 1501834166000
    }
}
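Applied to the JSON from your question, the enveloped form would look roughly like the sketch below. Only a few of the fields are shown, and the types are inferred from the sample values, so treat them as assumptions; note also that field names containing spaces, such as "Sourcing Reason", may need renaming before Oracle will accept them as column names.

{
    "schema": {
        "type": "struct",
        "optional": false,
        "name": "DJ",
        "fields": [
            {"type": "string", "optional": true, "field": "Item"},
            {"type": "string", "optional": true, "field": "OrderNum"},
            {"type": "int32", "optional": true, "field": "Quantity"},
            {"type": "double", "optional": true, "field": "Chosen Node Distance"},
            {"type": "int64", "optional": true, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "Order Date"}
        ]
    },
    "payload": {
        "Item": "12",
        "OrderNum": "10023",
        "Quantity": 1,
        "Chosen Node Distance": 315.8,
        "Order Date": 1503808765201
    }
}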

What is the source of the data that you're trying to land in Oracle? If it's coming in through Kafka Connect, it would be easier and more efficient to simply use the same converter configuration on both sides (Avro + Confluent Schema Registry). If it's a custom application, you'll need to make it either (a) use the Confluent Avro serializer, or (b) write JSON in the format required above, providing the schema of the payload inline with the message.
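If you go with route (b), here is a minimal sketch of a producer that writes such an envelope. It assumes a local broker and string serialization of a hand-built JSON value, and shows only two fields for brevity; a real application would build the envelope with a JSON library rather than by string concatenation.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EnvelopeProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The value is the full schema/payload envelope that JsonConverter
        // (with schemas.enable=true) expects to find in each record.
        String value =
            "{\"schema\":{\"type\":\"struct\",\"optional\":false,\"name\":\"DJ\",\"fields\":["
          + "{\"type\":\"string\",\"optional\":true,\"field\":\"Item\"},"
          + "{\"type\":\"int32\",\"optional\":true,\"field\":\"Quantity\"}]},"
          + "\"payload\":{\"Item\":\"12\",\"Quantity\":1}}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("DJ", value)); // topic name from the question
        }
    }
}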
