使用Apache Flink中的DataSet API在每次迭代之前计算变量

威廉·TH

我正在使用Flink(Kmeans提供的集群示例,并尝试扩展其功能。目的是通过计算由每个质心之间的距离组成的多维数组来减少距离计算的次数,以便可以在double[][]数组中找到这些距离当为点分配聚类时,必须在每次迭代的开始时计算并广播此数组。

我尝试了以下方法:

public static final class computeCentroidInterDistance implements MapFunction<Tuple2<Centroid, Centroid>, Tuple3<Integer, Integer, Double>> {

    @Override
    public Tuple3<Integer, Integer, Double> map(Tuple2<Centroid, Centroid> centroid) throws Exception {
        return new Tuple3<>(centroid.f0.id, centroid.f1.id, centroid.f0.euclideanDistance(centroid.f1));
    }
}

DataSet<Centroid> centroids = getCentroidDataSet(..);

DataSet<Tuple3<Integer, Integer, Double>> distances = centroids
    .crossWithTiny(centroids)
    .map(new computeCentroidInterDistance());

但是,我没有看到DataSet的距离如何用于我的用例,因为它没有以可用于查找两个不同质心之间的距离的任何特定顺序返回。有更好的方法吗?

Arvid Heise

数据集本质上是无序和分片的,两者都不适合您的用例。

您要做的是首先在一个方法调用中收集所有质心。

DataSet<double[][]> matrix = centroids.reduceGroup(...)

在reduceGroup中,您可以访问所有元素,并且可以执行计算。输出应该是您的double [] []矩阵。

然后可以通过广播分发矩阵

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

Apache Flink:DataSet API中的groupBy和partitioning有什么区别?

使用python在每次迭代中更改变量的值

使用apache flink Java API在Cassandra中读取和写入数据

是否可以将批量迭代中的部分输出保存在Flink Dataset中?

Apache Flink:如何从Cassandra读取DataStream / DataSet?

Apache Flink:如何计算DataStream中的事件总数

如何使变量可用于Apache Flink中的所有TaskManager?

Apache Flink:如何在ReduceFunction中访问广播变量?

使用Apache Flink连接到Twitter Streaming API时的IOExcpetion

Apache Flink中DataStream和Table API之间的区别

如何定义将在python类中的函数的每次迭代中使用的变量?

如何在PHP中使用for循环在每次迭代中将300加到变量中?

在Java中使用while循环在每次迭代中添加到不同的变量

Apache Flink-DataSet API-如何将n个结果分组在一起

Apache Flink如何实现迭代?

每次迭代中不同变量的均值

如何使用Maven仅重建Apache Flink中的更改

如何使用Apache Flink中的上载jar提交作业?

使用 Apache Flink SQL 从 Kafka 消息中获取嵌套字段

使用 Apache Flink 创建 CEP

Apache Flink中的并行度

Apache Flink中的全局排序

Apache Flink中的重叠分区

Apache Flink中Join的输出

Apache Flink:无法将 Table 对象转换为 DataSet 对象

我可以在同一个 Flink 作业中使用 DataSet API 和 DataStream API 吗?

为什么需要在循环的每次迭代中声明一个变量才能使用它?

将多个数据集传输到Apache Flink中的下一个迭代

失败消息:使用Apache Flink 1.11时,检查点在完成之前已过期