如何在Spark中转置RDD

赵祥宇

我有一个这样的RDD:

1 2 3
4 5 6
7 8 9

它是一个矩阵。现在,我要像这样转置RDD:

1 4 7
2 5 8
3 6 9

我怎样才能做到这一点?

丹尼尔·达拉博斯(Daniel Darabos)

假设您有一个N×M矩阵。

如果N和M都小到可以在内存中容纳N×M个项目,那么使用RDD并没有多大意义。但是转置很容易:

val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
val transposed = sc.parallelize(rdd.collect.toSeq.transpose)

如果N或M太大,以致您无法在内存中保存N或M个条目,那么您将无法拥有此大小的RDD行。在这种情况下,原始矩阵或转置矩阵都无法表示。

N和M的大小可能适中:您可以在内存中保存N或M个条目,但不能保存N×M个条目。在这种情况下,您必须炸毁矩阵,然后再次将其放在一起:

val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
// Split the matrix into one number per line.
val byColumnAndRow = rdd.zipWithIndex.flatMap {
  case (row, rowIndex) => row.zipWithIndex.map {
    case (number, columnIndex) => columnIndex -> (rowIndex, number)
  }
}
// Build up the transposed matrix. Group and sort by column index first.
val byColumn = byColumnAndRow.groupByKey.sortByKey().values
// Then sort by row index.
val transposed = byColumn.map {
  indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2)
}

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章