连接到现有的Kafka代理时,ksqlDB服务器无法启动

jiexun

我在<myurl>:9092有一个现有的kafka经纪人。该代理正在运行Apache Kafka版本2.2.0。我想使用ksqlDB对来自该kafka代理中主题的数据进行一些流处理。因此,根据https://docs.confluent.io/platform/current/installation/versions-interoperability.html上的兼容性表,我使用的是Confluent Platform 5.2版的ksqlDB

bootstrap.servers=<myurl>:9092进去了ksql/ksql-server.properties

但是,当我尝试通过运行启动ksql-server时ksql-server-start etc/ksql/ksql-server.properties,出现以下错误:

ERROR Failed to start KSQL (io.confluent.ksql.rest.server.KsqlServerMain:53)
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition _confluent-ksql-default__command_topic-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.confluent.ksql.rest.server.computation.Command`, problem: `java.lang.NullPointerException`
 at [Source: (byte[])"("statement":"CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>>) WITH(KAFKA_TOPIC='default_ksql_processing_log', VALUE_FORMAT='JSON');","originalProperties":{"ksql.extension.dir":"ext","ksql.streams.ca"[truncated 3011 bytes]; line: 1, column: 3511]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.confluent.ksql.rest.server.computation.Command`, problem: `java.lang.NullPointerException`
 at [Source: (byte[])"("statement":"CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>>) WITH(KAFKA_TOPIC='default_ksql_processing_log', VALUE_FORMAT='JSON');","originalProperties":{"ksql.extension.dir":"ext","ksql.streams.ca"[truncated 3011 bytes]; line: 1, column: 3511]
...

如果我使用本地代理并设置bootstrap.servers=localhost:9092,则ksql-server可以正常启动。

我如何解决此空记录/反序列化问题,以便将ksqldb服务器连接到现有的kafka代理?

罗宾·莫法特

正如@OneCricketeer指出的那样,问题是由群集上现有的命令主题引起的。您可以通过ksql.service.id在ksqlDB服务器属性中更改来使用新的命令主题请参阅配置ksqlDB服务器

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

启动hadoop时没有错误,但无法连接到服务器

无法使用Xpra托管服务器,仅连接到现有服务器

我需要使用go lang连接到现有的websocket服务器

使用 xrdp 隧道连接到现有的 X 服务器

如何将 Xcode 项目连接到现有的 MsSql 服务器

如何使用Java连接到现有的Kubernetes服务器并列出所有Pod?

如何将现有的 nodejs 服务器应用程序连接到 Azure SQL 数据库

无法连接到Kafka服务器的TCP套接字

通过代理连接到服务器

Mongodb:首次连接时无法连接到服务器

Mongo在首次连接时无法连接到服务器

如何启动新的 http 服务器或将现有的服务器用于 pprof?

通过SSH连接到没有运行SSH服务器的计算机,或者如何重用现有的ssh连接反向

JavaMail-使用代理服务器,因为在连接到办公室网络时无法读取gmail代码

执行Cordova仿真时无法连接到服务器

postgres服务器已启动,但无法连接到服务器

有时无法使用域名连接到ubuntu 14.04 LTS服务器

部署到现有的Apache Tomcat实例时,无法使JetBrains许可证服务器运行

如何逐步连接到现有的 Kafka MSK 集群?

将服务连接到现有的流星帐户

如何使用现有的tcp连接从bash脚本访问http服务器?

Python Kafka 客户端无法连接到远程 Kafka 服务器

kafka节点无法在本地环境中连接到kafka服务器

无法连接到PostgreSQL服务器

Ionic无法连接到服务器

无法连接到Postgres服务器

RPostgreSQL无法连接到服务器

Postgres无法连接到服务器

无法连接到链接的服务器