I am using Flink's Scala API. After some transformations I have reports = DataStream[Tuple15] (this Tuple15 is a Scala tuple, and all of its fields are Int). The problem lies here:
reports
.filter(_._1 == 0) // some filter
.map( x => (x._3, x._4, x._5, x._7, x._8))
(TypeInformation.of(classOf[(Int,Int,Int,Int,Int)])) // keep only 5 fields as a Tuple5
.keyBy(2,3,4) // the error is in apply, but I think related to this somehow
.timeWindow(Time.minutes(5), Time.minutes(1))
// the line under is line 107, where the error is
.apply( (tup, timeWindow, iterable, collector: Collector[(Int, Int, Int, Float)]) => {
...
})
The error states:
InvalidProgramException: Specifying keys via field positions is only valid for
tuple data types. Type: GenericType<scala.Tuple5>
Whole error trace (I marked the line that points to the error, i.e. line 107, which corresponds to the apply method in the code above):
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: GenericType<scala.Tuple5>
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:217)
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:208)
at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:256)
at org.apache.flink.streaming.api.scala.DataStream.keyBy(DataStream.scala:289)
here -> at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad$.latestAverageVelocity(LinearRoad.scala:107)
at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad$.main(LinearRoad.scala:46)
at du.tu_berlin.dima.bdapro.flink.linearroad.houcros.LinearRoad.main(LinearRoad.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
But this makes no sense to me. I am using a tuple type, am I not? Or what is going on with the GenericType<...>? How should I fix the map so that the keyBy works correctly?
The reason is that TypeInformation.of belongs to the Java API and, thus, does not know about Scala tuples. Therefore, it returns a GenericType, which cannot be used as the input for a keyBy operation with field positions.
If you want to generate the Scala tuple type information manually, you have to use the createTypeInformation method which is contained in the org.apache.flink.api.scala / org.apache.flink.streaming.api.scala package object.
But if you import the package object, then there is no need to specify the type information manually, since TypeInformation is a context bound of the map operation and createTypeInformation is an implicit function.
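For completeness, the manual route mentioned above would mean passing the TypeInformation explicitly to map's second (implicit) parameter list. A minimal sketch, assuming a hypothetical env and a sample Tuple15 source (the variable names are mine, not from the question):

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Hypothetical source standing in for the question's DataStream[Tuple15].
val reports = env.fromElements(
  (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
)

// createTypeInformation (from the scala package object) understands Scala
// tuples, unlike the Java API's TypeInformation.of.
val tuple5Info: TypeInformation[(Int, Int, Int, Int, Int)] =
  createTypeInformation[(Int, Int, Int, Int, Int)]

reports
  .filter(_._1 == 0)
  .map(x => (x._3, x._4, x._5, x._7, x._8))(tuple5Info) // explicit TypeInformation
  .keyBy(2, 3, 4) // works now: the result is a tuple type, not GenericType
```

The explicit argument list works because a context bound desugars to an implicit parameter list, which you may always fill in by hand.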
The following code snippet shows the idiomatic way to deal with TypeInformations:
import org.apache.flink.streaming.api.scala._
reports
.filter(_._1 == 0) // some filter
.map( x => (x._3, x._4, x._5, x._7, x._8)) // TypeInformation provided implicitly
.keyBy(2,3,4)
.timeWindow(Time.minutes(5), Time.minutes(1))
.apply( (tup, timeWindow, iterable, collector: Collector[(Int, Int, Int, Float)]) => {
...
})
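As a side note (my suggestion, not part of the answer above): a key selector function sidesteps positional keys entirely and keeps the key type-checked. A sketch under the same imports, using a hypothetical sample source:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment

val projected = env
  .fromElements((0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
  .filter(_._1 == 0)
  .map(x => (x._3, x._4, x._5, x._7, x._8))

projected
  // same keys as keyBy(2, 3, 4): positions are 0-based, so these are the
  // third, fourth and fifth fields of the Tuple5
  .keyBy(t => (t._3, t._4, t._5))
  .timeWindow(Time.minutes(5), Time.minutes(1))
```

With a selector the compiler verifies the key fields exist, so a typo fails at compile time instead of with a runtime InvalidProgramException.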