在输出窗口之前,如何将转换应用于无边界Apache Beam管道的窗口中的所有元素?

skeller88

我正在编写一个数据流管道,该管道将从Google Pub / Sub读取并将数据写入Google Cloud Storage:

    pipeline.apply(marketData)
        .apply(ParDo.of(new PubsubMessageToByteArray()))
        .apply(ParDo.of(new ByteArrayToString()))
        .apply(ParDo.of(new StringToMarketData()))
        .apply(ParDo.of(new AddTimestamps()))
        .apply(Window.<MarketData>into(FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow())))
                .withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness()))
                .accumulatingFiredPanes())
        .apply(ParDo.of(new MarketDataToCsv()))
        .apply("Write File(s)", TextIO
                .write()
                .to(options.getOutputDirectory())
                .withWindowedWrites()
                .withNumShards(1)
                .withFilenamePolicy(new WindowedFilenamePolicy(outputBaseDirectory))
                .withHeader(csvHeader));

    pipeline.run().waitUntilFinish();

我想在输出结果之前对窗口中的元素进行重复数据删除和排序。这与典型的PTransform不同,我希望在窗口结束后执行转换。

“发布/订阅”主题将重复,因为如果一个工作程序失败,则多个工作程序会产生相同的消息。如何在写入之前删除窗口中的所有重复项?我看到Beam版本0.2中存在RemoveDuplicates类,但当前版本中不存在。

我了解到,在后台,Beam使跨工人的PTransform并行化。但是由于此管道写入withNumShards(1),只有一个工作人员将写入最终结果。这意味着从理论上讲,应该有可能让该工作人员在编写之前应用重复数据删除转换。

Beam python sdk仍然具有RemoveDuplicates方法,因此我可以在Java中重现该逻辑,但是为什么要删除它,除非有更好的方法?我以为该实现将是在某些窗口触发器之后执行的重复数据删除ParDo。

编辑:GroupByKeySortValues看起来像他们会做我所需要的。我正在尝试使用这些。

skeller88

这是重复数据删除部分的答案:

.apply(Distinct
 // MarketData::key produces a String. Use withRepresentativeValue() 
 // because Apache beam deserializes Java objects into bytes, which 
 // could cause two equal objects to be interpreted as not equal. See 
 // org/apache/beam/sdk/transforms/Distinct.java for details. 
 .withRepresentativeValueFn(MarketData::key)
 .withRepresentativeType(TypeDescriptor.of(String.class)))

这是用于元素排序和重复数据删除的解决方案(如果还需要排序):

public static class DedupAndSortByTime extends 
        Combine.CombineFn<MarketData, TreeSet<MarketData>, List<MarketData>> {
    @Override
    public TreeSet<MarketData> createAccumulator() {
        return new TreeSet<>(Comparator
                .comparingLong(MarketData::getEventTime)
                .thenComparing(MarketData::getOrderbookType));
    }

    @Override
    public TreeSet<MarketData> addInput(TreeSet<MarketData> accum, MarketData input) {
        accum.add(input);
        return accum;
    }

    @Override
    public TreeSet<MarketData> mergeAccumulators(Iterable<TreeSet<MarketData>> accums) {

        TreeSet<MarketData> merged = createAccumulator();
        for (TreeSet<MarketData> accum : accums) {
            merged.addAll(accum);
        }
        return merged;
    }

    @Override
    public List<MarketData> extractOutput(TreeSet<MarketData> accum) {
        return Lists.newArrayList(accum.iterator());
    }
}

所以更新的管道是

    // Pipeline
    pipeline.apply(marketData)
        .apply(ParDo.of(new MarketDataDoFns.PubsubMessageToByteArray()))
        .apply(ParDo.of(new MarketDataDoFns.ByteArrayToString()))
        .apply(ParDo.of(new MarketDataDoFns.StringToMarketDataAggregate()))
        .apply(ParDo.of(new MarketDataDoFns.DenormalizeMarketDataAggregate()))
        .apply(ParDo.of(new MarketDataDoFns.AddTimestamps()))
        .apply(Window.<MarketData>into(FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow())))
                .withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness()))
                .accumulatingFiredPanes())
        .apply(Combine.globally(new MarketDataCombineFn.DedupAndSortByTime()).withoutDefaults())
        .apply(ParDo.of(new MarketDataDoFns.MarketDataToCsv()))
        .apply("Write File(s)", TextIO
                .write()
                // This doesn't set the output directory as expected. 
                // "/output" gets stripped and I don't know why,
                // so "/output" has to be added to the directory path 
                // within the FilenamePolicy.
                .to(options.getOutputDirectory())
                .withWindowedWrites()
                .withNumShards(1)
                .withFilenamePolicy(new MarketDataFilenamePolicy.WindowedFilenamePolicy(outputBaseDirectory))
                .withHeader(csvHeader));

    pipeline.run().waitUntilFinish();

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

Apache Beam with Python:如何在会话窗口中计算最小值,并将其应用于所有相关的PCollections

如何将脚本应用于所有模态弹出窗口

如何将AND应用于列表的所有元素?

如何在窗口中定义中间线并使所有元素与该线对齐

pywinauto:遍历窗口中的所有控件

如何在python Apache Beam中的窗口中排序元素?

如何移动无边界窗口?

使用AppleScript列出窗口中所有UI元素的名称(GUI脚本)

调用嵌套函数会弄乱窗口中元素的所有对正

如何将OperationContract应用于接口中的所有方法

git-如何将转换应用于所有过去的提交消息?

WPF-如何将转换器应用于所有DataGridTextColumn?

将Python函数应用于所有元素以输出向量(从R转换)

如何将ScikitLearn分类器应用于大图像中的图块/窗口

如何将多处理应用于滑动窗口

如何将groupBy和聚合函数应用于PySpark DataFrame中的特定窗口?

如何将函数应用于列表中的每个元素,然后列出输出列表?

如何将样式应用于元素的所有子元素

如何将命令应用于管道输出中的每一行并添加所有这些数字?

如何从XMonad中的所有浮动窗口中删除边框

如何记录在Powershell窗口中编写的所有内容?

如何更改窗口中所有按钮的属性?

如何在XAML WPF窗口中具有动态元素

如何将ljust()转换应用于字符串列表的每个元素?

如何将转换应用于相同的元素但处于不同的状态:悬停

如何将函数应用于数组中的所有元素(在C ++模板类中)

如何将逻辑运算符应用于Java中的所有元素

如何将CSS高度设置应用于所有子元素?

如何将 .roundSlider() 应用于 Array 的所有元素?