Flink错误:通过字段位置指定键仅对元组数据类型有效

豪克罗斯

我正在使用Flink的Scala API。我对进行了一些转换reports = DataStream[Tuple15](这Tuple15是一个Scala元组,所有字段都是Int)。问题位于此处:

reports
  .filter(_._1 == 0) // some filter
  .map( x => (x._3, x._4, x._5, x._7, x._8))
      (TypeInformation.of(classOf[(Int,Int,Int,Int,Int)])) // keep only 5 fields as a Tuple5
  .keyBy(2,3,4) // the error is in apply, but I think related to this somehow
  .timeWindow(Time.minutes(5), Time.minutes(1))
  // the line under is line 107, where the error is
  .apply( (tup, timeWindow, iterable, collector: Collector[(Int, Int, Int, Float)]) => {
       ... 
  })

错误状态:

InvalidProgramException: Specifying keys via field positions is only valid for 
tuple data types. Type: GenericType<scala.Tuple5>

整个错误跟踪(我在上面指出了指向错误的行,即第107行,与apply上面代码中方法相对应):

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: GenericType<scala.Tuple5>
    at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:217)
    at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:208)
    at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:256)
    at org.apache.flink.streaming.api.scala.DataStream.keyBy(DataStream.scala:289)
here -> at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad$.latestAverageVelocity(LinearRoad.scala:107)
    at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad$.main(LinearRoad.scala:46)
    at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad.main(LinearRoad.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

但这对我来说没有意义。正在使用元组类型,不是吗?还是对付什么GenericType<...>

我应该如何修复map才能使keyBy工作正常?

致罗尔曼

原因是TypeInformation属于Java API,因此不了解Scala元组。因此,它返回的aGenericType不能用作keyBy具有字段位置操作的输入

如果要手动生成Scala元组类型信息,则必须使用/包对象中createTypeInformation包含方法org.apache.flink.api.scalaorg.apache.flink.streaming.api.scala

但是,如果您导入包对象,则无需手动指定类型信息,因为TypeInformation是操作的上下文绑定,map并且createTypeInformation是隐式函数。

以下代码段显示了惯用的处理方式TypeInformations

import org.apache.flink.streaming.api.scala._

reports
  .filter(_._1 == 0) // some filter
  .map( x => (x._3, x._4, x._5, x._7, x._8))
  .keyBy(2,3,4) // the error is in apply, but I think related to this somehow
  .timeWindow(Time.minutes(5), Time.minutes(1))
  // the line under is line 107, where the error is
  .apply( (tup, timeWindow, iterable, collector: Collector[(Int, Int, Int, Float)]) => {
       ... 
  })

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

通过循环获取有效的枚举数据类型输入

查询错误ORA-01843:不是有效月份,数据类型为DATE

有效载荷数据类型是什么?

整数数据类型的有效值

“”插入到“语句由于”参数7(“”)导致错误:提供的值不是数据类型float的有效实例。“

类型错误:仅对 DatetimeIndex、TimedeltaIndex 或 PeriodIndex 有效,但获得了“RangeIndex”的实例

通过strip()有效使用元组

熊猫重新采样错误:仅对DatetimeIndex或PeriodIndex有效

JavaScript数据类型函数的有效性

在Java中何时何地使用原始数据类型有效?

输入固定数字的最有效数据类型

在Matlab中转换数据类型的有效方法(double与im2double)

WebGL中顶点数组的有效数据类型是什么?

滥用代数数据类型的代数-为什么这样做有效?

如何获取有效的Numpy数据类型范围?

什么是UiPath QueueItems的有效IEnumerable UiPath数据类型

从32位数据类型有效地删除2位

在模型中提供枚举数据类型后,获取“ 1”不是有效状态

通过将它们转换为更大的整数数据类型一次添加整个字节数组是否有效?

有没有更有效的方法来处理整数数据类型?

Flink Java API-Pojo类型到元组数据类型

XML Schema的xs:ID类型对XML元素有效,还是仅对属性有效?

如何设计RESTful API端点以通过具有不同数据类型的不同字段获取

MySQL - 多个外键,仅对单个检查约束有效

COUNTIFS带有> =,数据类型错误

如果其他列与某个数据类型匹配,我如何将行更新为有效?

在ZMQ中使用send_multipart()发送不同数据类型的序列的最有效方法是什么?

在 MySQL 中存储分类字符串变量的最有效数据类型是什么

Rails错误:自动播放有时仅对video_tag有效