JavaDStream: printing the RDDs from a lambda to the console

Martin Brisiak

I am new to Spark and I am trying to create a simple JavaDStream to test my job with the spark-testing-base API. What I have done so far is:

    JavaStreamingContext streamingContext =
          new JavaStreamingContext(jsc(), Durations.seconds(10));
    List<String> list = new LinkedList<String>();
    list.add("first");
    list.add("second");
    list.add("third");
    JavaRDD<String> myVeryOwnRDD = jsc().parallelize(list);
    Queue<JavaRDD<String>> queue = new LinkedList<JavaRDD<String>>();
    queue.add( myVeryOwnRDD );
    JavaDStream<String> javaDStream = streamingContext.queueStream( queue );

    javaDStream.foreachRDD(x -> {
        x.collect().stream().forEach(n -> System.out.println("item of list: " + n));
    });

I expected it to print my list. It didn't. Instead I got this:

12:19:05.454 [main] DEBUG org.apache.spark.util.ClosureCleaner - +++ Cleaning closure <function1> (org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3) +++
12:19:05.468 [main] DEBUG org.apache.spark.util.ClosureCleaner -  + declared fields: 3
12:19:05.469 [main] DEBUG org.apache.spark.util.ClosureCleaner -      public static final long org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.serialVersionUID
12:19:05.469 [main] DEBUG org.apache.spark.util.ClosureCleaner -      private final org.apache.spark.streaming.api.java.JavaDStreamLike org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.$outer
12:19:05.469 [main] DEBUG org.apache.spark.util.ClosureCleaner -      private final org.apache.spark.api.java.function.VoidFunction org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.foreachFunc$3
12:19:05.469 [main] DEBUG org.apache.spark.util.ClosureCleaner -  + declared methods: 2
12:19:05.470 [main] DEBUG org.apache.spark.util.ClosureCleaner -      public final java.lang.Object org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(java.lang.Object)
12:19:05.470 [main] DEBUG org.apache.spark.util.ClosureCleaner -      public final void org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(org.apache.spark.rdd.RDD)
12:19:05.470 [main] DEBUG org.apache.spark.util.ClosureCleaner -  + inner classes: 0
12:19:05.471 [main] DEBUG org.apache.spark.util.ClosureCleaner -  + outer classes: 1
12:19:05.472 [main] DEBUG org.apache.spark.util.ClosureCleaner -      org.apache.spark.streaming.api.java.JavaDStreamLike
12:19:05.472 [main] DEBUG org.apache.spark.util.ClosureCleaner -  + outer objects: 1
12:19:05.473 [main] DEBUG org.apache.spark.util.ClosureCleaner -      org.apache.spark.streaming.api.java.JavaDStream@7209ffb5
12:19:05.474 [main] DEBUG org.apache.spark.util.ClosureCleaner -  + populating accessed fields because this is the starting closure
12:19:05.478 [main] DEBUG org.apache.spark.util.ClosureCleaner -  + fields accessed by starting closure: 1
12:19:05.479 [main] DEBUG org.apache.spark.util.ClosureCleaner -      (interface org.apache.spark.streaming.api.java.JavaDStreamLike,Set())
12:19:05.479 [main] DEBUG org.apache.spark.util.ClosureCleaner -  + outermost object is not a closure, so do not clone it: (interface org.apache.spark.streaming.api.java.JavaDStreamLike,org.apache.spark.streaming.api.java.JavaDStream@7209ffb5)
12:19:05.480 [main] DEBUG org.apache.spark.util.ClosureCleaner -  +++ closure <function1> (org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3) is now cleaned +++
12:19:05.481 [main] DEBUG org.apache.spark.util.ClosureCleaner - +++ Cleaning closure <function2> (org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3) +++
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner -  + declared fields: 2
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner -      public static final long org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.serialVersionUID
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner -      private final scala.Function1 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.cleanedF$1
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner -  + declared methods: 2
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner -      public final java.lang.Object org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(java.lang.Object,java.lang.Object)
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner -      public final void org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(org.apache.spark.rdd.RDD,org.apache.spark.streaming.Time)
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner -  + inner classes: 0
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner -  + outer classes: 0
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner -  + outer objects: 0
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner -  + populating accessed fields because this is the starting closure
12:19:05.483 [main] DEBUG org.apache.spark.util.ClosureCleaner -  + fields accessed by starting closure: 0
12:19:05.483 [main] DEBUG org.apache.spark.util.ClosureCleaner -  + there are no enclosing objects!
12:19:05.483 [main] DEBUG org.apache.spark.util.ClosureCleaner -  +++ closure <function2> (org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3) is now cleaned +++

Am I missing something? PS: the output above appears exactly where my printed list should be, and I am running this as a Spring unit test:

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(classes = config.class)
    public class myTester extends SharedJavaSparkContext implements Serializable {
maasg

I think you need to start the streaming context.

    streamingContext.start();
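
A minimal sketch of the corrected flow, assuming the same jsc() helper from SharedJavaSparkContext that the question already uses; the awaitTerminationOrTimeout/stop(false) calls and the 15-second timeout are illustrative additions, not part of the original answer:

    JavaStreamingContext streamingContext =
          new JavaStreamingContext(jsc(), Durations.seconds(10));

    Queue<JavaRDD<String>> queue = new LinkedList<>();
    queue.add(jsc().parallelize(Arrays.asList("first", "second", "third")));

    JavaDStream<String> javaDStream = streamingContext.queueStream(queue);

    // foreachRDD only registers an output operation; nothing runs
    // until the streaming context is started.
    javaDStream.foreachRDD(rdd ->
          rdd.collect().forEach(n -> System.out.println("item of list: " + n)));

    streamingContext.start();
    // Wait long enough for at least one 10-second batch to fire,
    // then shut the streaming context down while keeping the
    // shared SparkContext alive for other tests.
    streamingContext.awaitTerminationOrTimeout(15000);
    streamingContext.stop(false);

Without start(), foreachRDD merely registers the closure (which is what the ClosureCleaner DEBUG lines show); the queued RDD is only consumed once the context starts and the first batch interval elapses.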
