我有一些相当简单的流代码,它们通过时间窗口聚合数据。窗口偏大(1小时,限制为2小时),流中的值是来自数百个服务器的度量。我的内存不足,因此添加了RocksDBStateBackend
。这导致JVM出现段错误。接下来,我尝试了FsStateBackend
。这两个后端都从未将任何数据写入磁盘,而只是使用JobID创建了一个目录。我在独立模式下(未部署)运行此代码。是否对为什么State Backends不写数据以及为什么即使有8GB的堆却耗尽了内存有什么想法?
final SingleOutputStreamOperator<Metric> metricStream =
objectStream.map(node -> new Metric(node.get("_ts").asLong(), node.get("_value").asDouble(), node.get("tags"))).name("metric stream");
final WindowedStream<Metric, String, TimeWindow> hourlyMetricStream = metricStream
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Metric>(Time.hours(2)) { // set how long metrics can come late
@Override
public long extractTimestamp(final Metric metric) {
return metric.get_ts() * 1000; // needs to be in ms since Java epoch
}
})
.keyBy(metric -> metric.getMetricName()) // key the stream so we can run the windowing in parallel
.timeWindow(Time.hours(1)); // setup the time window for the bucket
// create a stream for each type of aggregation
hourlyMetricStream.sum("_value") // we want to sum by the _value
.addSink(new MetricStoreSinkFunction(parameters, "sum"))
.name("hourly sum stream")
.setParallelism(6);
hourlyMetricStream.aggregate(new MeanAggregator())
.addSink(new MetricStoreSinkFunction(parameters, "mean"))
.name("hourly mean stream")
.setParallelism(6);
hourlyMetricStream.aggregate(new ReMedianAggregator())
.addSink(new MetricStoreSinkFunction(parameters, "remedian"))
.name("hourly remedian stream")
.setParallelism(6);
env.execute("flink test");
很难说出为什么除非您有大量的度量标准名称(除非我根据您发布的代码提供了唯一的解释),否则为什么会耗尽内存。
关于磁盘写入,默认情况下,RocksDB实际上将为其实际数据库文件使用一个临时目录。您还可以在配置期间传递一个显式目录。您可以通过致电state.setDbStoragePath(someDirectory)
令人困惑的是FSStateBackend
,实际上仅在检查点期间写入磁盘,否则它完全基于堆。因此,如果未启用检查点,则目录中可能看不到任何内容。这样就可以解释为什么使用FSStateBackend时仍然可能会用尽内存。
假设您确实有RocksDB(或任何)状态后端在工作,则可以通过执行以下操作来启用检查点:
env.enableCheckpointing(5000); // value is in MS, so however frequently you want to checkpoint
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(5000); // this is to help prevent your job from making progress if checkpointing takes a bit. For large state checkpointing can take multiple seconds
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句