ProtoBuf 字段中的集合的 Kryo 序列化问题

普加马宗达尔

我在使用 Kryo 序列化程序的 Spark (v1.6.1) 应用程序中收到来自 Kafka 的 protobuf 对象。protobuf 对象看起来像这样 -

  private A() {
          abc_ = "";
          xyz_ = "";
          ... some more fields
          aList_ = java.util.Collections.emptyList();
          ... some more fields
    }

当我运行 spark 应用程序时,它会为集合“aList_”抛出异常,并且出现以下错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 18.0 failed 4 times, most recent failure: Lost task 1.3 in stage 18.0 com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
    Serialization trace:
    aList_ (...packageName/...protoBufObject$A)
     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
     at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228)
     at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:171)
     at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201)
     at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198)
     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
     at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:152)
     at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:58)
     at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:83)
     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
     at org.apache.spark.scheduler.Task.run(Task.scala:89)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
     at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.UnsupportedOperationException
         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:102)
         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
         ... 27 more

我在下面的链接中看到了类似的问题,但还没有解决方案。

Spark、Kryo 序列化问题与 ProtoBuf 字段

有没有其他人遇到过这个问题?

普加马宗达尔

如果有人遇到这个问题 - 我使用我另一篇文章中解释的方法让它工作 -如何在 Spark 代码中设置 Kryo 的不可修改集合序列化器

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章