java.lang.ClassCastException:org.apache.avro.generic.GenericData $ Record无法转换为java.lang.String

肉山熊猫

我正在执行kafka消费者程序以从主题读取avro格式的数据。合并通用记录后,我将遍历通用记录并获取通用record.value()。我想将值转换为字符串但失败。

 def getProp():Properties = {

    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, serializer)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid)
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffset)
    //props.put("specific.avro.reader", specificAvroReader)
    props.put("schema.registry.url", schemaRegestry)
    props.put("consumer-timeout-ms", "30000")
    props
  }

def consume(props: Properties, spark: SparkSession) = {
    val conSumer = new KafkaConsumer[String, String](props)
    conSumer.subscribe(util.Collections.singletonList(topic))
    while (true) {
      val records: ConsumerRecords[String,String] = conSumer.poll(100)
      for(record <- records.asScala){
        val m:String = record.value() ```

error:-
20/02/01 05:04:55 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [xx1, xx2, xx3, xx4] for group xxxxxx
Exception in thread "main" java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.String
        at com.hbc.IntellicheckConsumer$$anonfun$consume$1.apply(TestScala.scala:52)
        at com.hbc.IntellicheckConsumer$$anonfun$consume$1.apply(TestScala.scala:49)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at com.hbc.IntellicheckConsumer.consume(TestScala.scala:49)
        at com.hbc.TestScala$.main(TestScala.scala:93)
        at com.hbc.TestScala.main(TestScala.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
20/02/01 05:04:55 INFO spark.SparkContext: Invoking stop() from shutdown hook
20/02/01 05:04:55 INFO server.AbstractConnector: Stopped Spark@33aecef7{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
20/02/01 05:04:55 INFO ui.SparkUI: Stopped Spark web UI at http://172.1

板球运动员

您是在告诉消费者想要字符串

new KafkaConsumer[String, String](props)
ConsumerRecords[String,String]

相反,您可能想要

new KafkaConsumer[String, GenericRecord](props)
ConsumerRecords[String,GenericRecord]

有什么方法可以读取Spark数据框中的那些Avor记录器以进行进一步处理

好吧,您必须重写所有代码才能实际使用Spark结构化流

您不需要spark-submit只运行Scala代码

我收到的值是嵌套json的形式

不确定将JSON字符串放入字段中时为何使用Avro

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章