我们有以下依赖关系:
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % kafkaVersion
libraryDependencies += "io.confluent" % "kafka-streams-avro-serde" % confluentVersion
libraryDependencies += "io.confluent" % "kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
libraryDependencies += "com.typesafe" % "config" % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "3.0.4"
我们使用代码生成器从AVRO模式文件中生成Scala案例类。一个这样生成的案例类具有一个Either值作为其字段之一。在AVRO模式中,这是用type = [t1,t2]表示的,因此生成看起来很不错,即求和类型:可以是t1类型或t2类型。
问题变成从主题到案例类(二进制-> Avro Map->案例类)的反序列化路径上缺少什么。
基本上我现在收到此错误:
could not find implicit value for parameter consumed: org.apache.kafka.streams.scala.kstream.Consumed[String, custom.UserEvent]
[error] .stream[String, UserEvent]("schma.avsc")
首先想到的是kafka-streams-avro-serde,但可能是该库仅确保AVRO Map的Serde [GenericRecord]而不是case类。因此,其他依赖关系之一是帮助AVRO GenericRecord映射到案例类并返回。我们也有一些手写代码可以从模式中生成案例类,这似乎可以直接与Spray json一起使用。
我认为在(二进制<-> Avro GenericRecord <-> case class instance)转换中,存在间隙,并且可能是在case class中有一个Either字段的事实?
我现在正在尝试尝试创建Serde [UserEvent]实例。以我的理解,这将涉及在UserEvent和AVRO GenericRecord(类似于Map)之间进行转换,然后在AVRO Record与二进制文件之间进行转换-可能会被kafka-streams-avro-serde依赖项所覆盖,就像应该有一个Serde [GenericRecord]或类似。
明智地导入,我们可以用它来导入隐式:
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Consumed
实际上,缺少导入。现在可以编译了。这里是进口:
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句