如何使用SparkRunner重新调整Apache Beam

拉斐尔

我正在使用火花流道进行此模拟:

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();

Pipeline p = Pipeline.create(options);
p.apply(Create.of(1))
 .apply(ParDo.of(new DoFn<Integer, Integer>() {
                    @ProcessElement
                    public void apply(@Element Integer element, OutputReceiver<Integer> outputReceiver) {
                        IntStream.range(0, 4_000_000).forEach(outputReceiver::output);

                    }
                }))
.apply(Reshuffle.viaRandomKey())
.apply(ParDo.of(new DoFn<Integer, Integer>() {
                    @ProcessElement
                    public void apply(@Element Integer element, OutputReceiver<Integer> outputReceiver) {
                        try {
                            // simulate a rpc call of 10ms
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        outputReceiver.output(element);

                    }
                }));
PipelineResult result = p.run();
result.waitUntilFinish();

我正在运行,--runner=SparkRunner --sparkMaster=local[8]但改组后仅使用了1个线程。为什么Rechuffle不起作用?

如果我为此更改改组:

.apply(MapElements.into(kvs(integers(), integers())).via(e -> KV.of(e % 8, e)))
.apply(GroupByKey.create())
.apply(Values.create())
.apply(Flatten.iterables())

然后我得到8个线程运行。

BR,拉斐尔。

罗伯特

看起来Beam on Spark的Reshuffle归结为

https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java#L191

我在这种情况下,不知道这两个rdd.context().defaultParallelism()rdd.getNumPartitions()是1.我提起https://issues.apache.org/jira/browse/BEAM-10834进行调查。

同时,您可以按照指示使用GroupByKey获得所需的并行性。(如果您实际上没有整数,则可以尝试使用元素的哈希,Math.random()或什至递增计数器作为键)。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章