I'm writing a Dataflow pipeline that reads from Google Pub/Sub and writes the data to Google Cloud Storage:
pipeline.apply(marketData)
.apply(ParDo.of(new PubsubMessageToByteArray()))
.apply(ParDo.of(new ByteArrayToString()))
.apply(ParDo.of(new StringToMarketData()))
.apply(ParDo.of(new AddTimestamps()))
.apply(Window.<MarketData>into(FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow())))
.withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness()))
.accumulatingFiredPanes())
.apply(ParDo.of(new MarketDataToCsv()))
.apply("Write File(s)", TextIO
.write()
.to(options.getOutputDirectory())
.withWindowedWrites()
.withNumShards(1)
.withFilenamePolicy(new WindowedFilenamePolicy(outputBaseDirectory))
.withHeader(csvHeader));
pipeline.run().waitUntilFinish();
I want to deduplicate and sort the elements in each window before writing the output. Unlike a typical PTransform, I want this transform to execute after the window closes.

The Pub/Sub topic will contain duplicates, because multiple workers can produce the same message when one worker fails. How do I remove all duplicates within a window before writing? I see that a RemoveDuplicates class existed in Beam version 0.2, but it doesn't exist in the current version.

I understand that under the hood, Beam parallelizes PTransforms across workers. But since this pipeline writes with withNumShards(1), only one worker will write the final result. That means, in theory, it should be possible to have that worker apply a deduplication transform before writing.

The Beam Python SDK still has a RemoveDuplicates method, so I could reproduce that logic in Java, but why would it have been removed unless there is a better way? I imagine the implementation would be a deduplication ParDo executed after some window trigger.
Edit: GroupByKey and SortValues look like they will do what I need. I'm trying those now.
Here's the answer for the deduplication part:
.apply(Distinct
// MarketData::key produces a String. Use withRepresentativeValue()
// because Apache beam deserializes Java objects into bytes, which
// could cause two equal objects to be interpreted as not equal. See
// org/apache/beam/sdk/transforms/Distinct.java for details.
.withRepresentativeValueFn(MarketData::key)
.withRepresentativeType(TypeDescriptor.of(String.class)))
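The reason for withRepresentativeValueFn can be shown without Beam at all. A minimal sketch, using a hypothetical simplified MarketData that (like many domain classes) does not override equals()/hashCode(): deduplicating by object identity keeps both copies of a logical duplicate, while deduplicating by a representative String key collapses them.

```java
import java.util.HashSet;
import java.util.Set;

public class RepresentativeKeyDemo {
    // Hypothetical stand-in for MarketData. No equals()/hashCode()
    // override, so two instances with identical fields are not "equal".
    static class MarketData {
        final String symbol;
        final long eventTime;
        MarketData(String symbol, long eventTime) {
            this.symbol = symbol;
            this.eventTime = eventTime;
        }
        // Representative key, analogous to MarketData::key in the answer.
        String key() { return symbol + "@" + eventTime; }
    }

    public static void main(String[] args) {
        MarketData a = new MarketData("BTC-USD", 1000L);
        MarketData b = new MarketData("BTC-USD", 1000L); // logical duplicate

        // Deduplicating by the objects themselves keeps both copies,
        // because default equality is object identity.
        Set<MarketData> byObject = new HashSet<>();
        byObject.add(a);
        byObject.add(b);

        // Deduplicating by the representative String key collapses them.
        Set<String> byKey = new HashSet<>();
        byKey.add(a.key());
        byKey.add(b.key());

        System.out.println(byObject.size()); // 2
        System.out.println(byKey.size());    // 1
    }
}
```

This is the same hazard the answer's comment describes: after serialization round-trips, two equal objects can be interpreted as not equal, so Distinct is told to compare by a stable String key instead.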
And here's a solution for both sorting and deduplicating elements (if you also need sorting):
public static class DedupAndSortByTime extends
    Combine.CombineFn<MarketData, TreeSet<MarketData>, List<MarketData>> {

  @Override
  public TreeSet<MarketData> createAccumulator() {
    return new TreeSet<>(Comparator
        .comparingLong(MarketData::getEventTime)
        .thenComparing(MarketData::getOrderbookType));
  }

  @Override
  public TreeSet<MarketData> addInput(TreeSet<MarketData> accum, MarketData input) {
    accum.add(input);
    return accum;
  }

  @Override
  public TreeSet<MarketData> mergeAccumulators(Iterable<TreeSet<MarketData>> accums) {
    TreeSet<MarketData> merged = createAccumulator();
    for (TreeSet<MarketData> accum : accums) {
      merged.addAll(accum);
    }
    return merged;
  }

  @Override
  public List<MarketData> extractOutput(TreeSet<MarketData> accum) {
    return Lists.newArrayList(accum.iterator());
  }
}
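The accumulator logic can be exercised without Beam: a TreeSet built with that comparator both sorts and deduplicates in one pass. A minimal sketch, with a hypothetical simplified MarketData carrying only the two fields the comparator uses. One caveat worth noting: TreeSet treats elements that compare equal as duplicates, so two records with the same event time and orderbook type but different other fields would also collapse to one.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

public class DedupAndSortDemo {
    // Hypothetical simplified MarketData; the real class has more state.
    static class MarketData {
        final long eventTime;
        final String orderbookType;
        MarketData(long eventTime, String orderbookType) {
            this.eventTime = eventTime;
            this.orderbookType = orderbookType;
        }
        long getEventTime() { return eventTime; }
        String getOrderbookType() { return orderbookType; }
    }

    public static void main(String[] args) {
        // Same comparator as createAccumulator(): order by event time,
        // then by orderbook type. Elements that compare equal are dropped.
        TreeSet<MarketData> accum = new TreeSet<>(Comparator
            .comparingLong(MarketData::getEventTime)
            .thenComparing(MarketData::getOrderbookType));

        accum.add(new MarketData(2000L, "BID"));
        accum.add(new MarketData(1000L, "ASK"));
        accum.add(new MarketData(2000L, "BID")); // duplicate, silently dropped

        // Mirrors extractOutput(): copy the sorted set into a list.
        List<MarketData> out = new ArrayList<>(accum);
        for (MarketData md : out) {
            System.out.println(md.getEventTime() + " " + md.getOrderbookType());
        }
    }
}
```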
So the updated pipeline is:
// Pipeline
pipeline.apply(marketData)
.apply(ParDo.of(new MarketDataDoFns.PubsubMessageToByteArray()))
.apply(ParDo.of(new MarketDataDoFns.ByteArrayToString()))
.apply(ParDo.of(new MarketDataDoFns.StringToMarketDataAggregate()))
.apply(ParDo.of(new MarketDataDoFns.DenormalizeMarketDataAggregate()))
.apply(ParDo.of(new MarketDataDoFns.AddTimestamps()))
.apply(Window.<MarketData>into(FixedWindows.of(Duration.standardMinutes(options.getMinutesPerWindow())))
.withAllowedLateness(Duration.standardSeconds(options.getAllowedSecondLateness()))
.accumulatingFiredPanes())
.apply(Combine.globally(new MarketDataCombineFn.DedupAndSortByTime()).withoutDefaults())
.apply(ParDo.of(new MarketDataDoFns.MarketDataToCsv()))
.apply("Write File(s)", TextIO
.write()
// This doesn't set the output directory as expected.
// "/output" gets stripped and I don't know why,
// so "/output" has to be added to the directory path
// within the FilenamePolicy.
.to(options.getOutputDirectory())
.withWindowedWrites()
.withNumShards(1)
.withFilenamePolicy(new MarketDataFilenamePolicy.WindowedFilenamePolicy(outputBaseDirectory))
.withHeader(csvHeader));
pipeline.run().waitUntilFinish();