Flink内存不足

wspeirs

我有一些相当简单的流代码,它们通过时间窗口聚合数据。窗口偏大(1小时,限制为2小时),流中的值是来自数百个服务器的度量。我的内存不足,因此添加了RocksDBStateBackend这导致JVM出现段错误。接下来,我尝试了FsStateBackend这两个后端都从未将任何数据写入磁盘,而只是使用JobID创建了一个目录。我在独立模式下(未部署)运行此代码。是否对为什么State Backends不写数据以及为什么即使有8GB的堆却耗尽了内存有什么想法?

    final SingleOutputStreamOperator<Metric> metricStream =
            objectStream.map(node -> new Metric(node.get("_ts").asLong(), node.get("_value").asDouble(), node.get("tags"))).name("metric stream");

    final WindowedStream<Metric, String, TimeWindow> hourlyMetricStream = metricStream
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Metric>(Time.hours(2)) { // set how long metrics can come late
                @Override
                public long extractTimestamp(final Metric metric) {
                    return metric.get_ts() * 1000; // needs to be in ms since Java epoch
                }
            })
            .keyBy(metric -> metric.getMetricName()) // key the stream so we can run the windowing in parallel
            .timeWindow(Time.hours(1)); // setup the time window for the bucket

    // create a stream for each type of aggregation
    hourlyMetricStream.sum("_value") // we want to sum by the _value
    .addSink(new MetricStoreSinkFunction(parameters, "sum"))
    .name("hourly sum stream")
    .setParallelism(6);

    hourlyMetricStream.aggregate(new MeanAggregator())
    .addSink(new MetricStoreSinkFunction(parameters, "mean"))
    .name("hourly mean stream")
    .setParallelism(6);

    hourlyMetricStream.aggregate(new ReMedianAggregator())
    .addSink(new MetricStoreSinkFunction(parameters, "remedian"))
    .name("hourly remedian stream")
    .setParallelism(6);

    env.execute("flink test");
约书亚·德瓦尔德

很难说出为什么除非您有大量的度量标准名称(除非我根据您发布的代码提供了唯一的解释),否则为什么会耗尽内存。

关于磁盘写入,默认情况下,RocksDB实际上将为其实际数据库文件使用一个临时目录。您还可以在配置期间传递一个显式目录。您可以通过致电state.setDbStoragePath(someDirectory)

令人困惑的是FSStateBackend,实际上仅在检查点期间写入磁盘,否则它完全基于堆。因此,如果未启用检查点,则目录中可能看不到任何内容。这样就可以解释为什么使用FSStateBackend时仍然可能会用尽内存。

假设您确实有RocksDB(或任何)状态后端在工作,则可以通过执行以下操作来启用检查点:

env.enableCheckpointing(5000); // value is in MS, so however frequently you want to checkpoint
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(5000); // this is to help prevent your job from making progress if checkpointing takes a bit. For large state checkpointing can take multiple seconds

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章