当我使用Flink SlidingEventTimeWindows时,“缓冲池被破坏”

moyiguke

当我使用“ SlidingEventTimeWindows”时,Flink抛出“ java.lang.IllegalStateException:缓冲池被破坏”,但是当我更改为“ SlidingProcessingTimeWindows”时,一切正常。

以下是堆栈跟踪:

18:37:53,728 WARN  org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Error while emitting latency marker.
java.lang.RuntimeException: Buffer pool is destroyed.
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:147)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:683)
	at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:151)
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:230)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:125)
	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:93)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:144)
	... 10 more

我终于解决了以下步骤。

首先,在My DataMockSource中将“ collect”替换为“ collectWithTimestamp”,用于生成流数据。执行此操作后,控制台中将消失“发出延迟标记时出错”。

其次,将BoundedOutOfOrdernessTimestampExtractor替换为用于EventTime处理的AscendingTimestampExtractor。在我的DataMockSource中,我生成数据并在同一时间发出,因此AscendingTimestampExtractor是生成水印的正确方法。

我将主要代码和完整项目发布在github上希望对您有所帮助。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(10000); //

DataStreamSource<MockData> mockDataDataStreamSource = env.addSource(new DataMockSource());
mockDataDataStreamSource.assignTimestampsAndWatermarks(
    new AscendingTimestampExtractor<MockData>() {
      @Override
      public long extractAscendingTimestamp(MockData element) {
        return element.getTimestamp();
      }
    });

SingleOutputStreamOperator<Tuple2<String, Long>> countStream = mockDataDataStreamSource
    .keyBy("country").window(
        SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(10)))
//        .allowedLateness(Time.seconds(5))
    .process(
        new FlinkEventTimeCountFunction()).name("count elements");

countStream.addSink(new SinkFunction<Tuple2<String, Long>>() {
  @Override
  public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
    System.out.println(value);
  }
});

env.execute("count test ");

我的DataMockSource在这里:

private volatile boolean running = true;
  @Override
  public void run(SourceContext sourceContext) throws Exception {
    while (running){
      MockData mockData = new MockData();
      mockData.setAge(ThreadLocalRandom.current().nextInt(1,99));
      mockData.setCountry("country "+ThreadLocalRandom.current().nextInt(2,5));
      mockData.setId(ThreadLocalRandom.current().nextLong());
      mockData.setTimestamp(Instant.now().toEpochMilli());
      // emit record with timestamp
      sourceContext.collectWithTimestamp(mockData,Instant.now().toEpochMilli());
//      sourceContext.collect(mockData);

      TimeUnit.SECONDS.sleep(3);
    }
  }

  @Override
  public void cancel() {
     running = false;
  }

大卫·安德森

在事件时间中工作时,您需要安排在源中或使用assignTimestampsAndWatermarks进行时间戳提取和加水印。看来您没有这样做,这可以解释为什么您不会获得任何输出(永远不会触发事件时间窗口)。

另外,您的来源应有一个cancel方法。像这样:

private volatile boolean running = true;

@Override
public void run(SourceContext ctx) throws Exception {
    while (running) {
        ...
    }
}

@Override
public void cancel() {
    running = false;
}

我认为这可以解释您所看到的异常。在作业开始自行关闭之后,源可能正在继续运行并发送延迟标记。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

Flink。当我为OrcTableSource使用hdfs文件时出现UnsupportedFileSystemSchemeException

当我使用diff()时,它会破坏summarise()

当我使用.bashrc时颜色消失

当我使用setSupportActionBar()时,活动增加

在golang中,为什么当我使用缓冲(异步)通道时程序运行速度变慢?

当我使用 where 时 Firestore 不会检索我的数据

当我使用 jQuery 代码时,我的导航栏消失了

为什么当我使用 <TouchableWithoutFeedback> 时我的设计消失了?

当我使用 vuetify 时,我无法显示图像

当我们使用Spring Data或Hibernate时,我们如何做连接池?

为什么在使用LLVM时对std :: ifstream的缓冲会“破坏” std :: getline?

当我用<a>包装div时,破坏了flexbox的布局

当我们将 Flink 应用程序部署到 Kinesis Data Analytics 时,不会触发窗口化

为什么当我使用 SwingWorker 时我的图像没有下载,而当我不使用时却下载?

当我们切换屏幕时,使用 Ant 媒体服务器从 iOS 录制的视频被破坏

当我匹配特定列时使用gsub

当我使用jsonPath时,加特林死机了

当我们在服务中使用 ngOnInit 时?

当我使用publishSubscribeChannel的taskExecutor时如何设置errorChannel?

当我有多态的指针时,使用“零规则”

当我不使用双分号时,ocamlc会失败

当我使用js单击按钮时如何隐藏表格?

当我使用变量时,抓取不会转移

当我使用XAML时,它会继承颜色等属性吗?

当我在蝗虫中使用“for”时,如何控制流量?

当我包含使用“distinct”时出现语法错误

当我使用jQuery验证远程时很奇怪的情况

当我有多个资源时如何使用ADAL库

当我使用'zsh'时'echo $ SHELL'返回'fish'