有条件地将列和值添加到Spark Rows

斯迈布

我正在处理一个Spark DataFrameDF),并且需要在流中添加一个列,方法是mapPartitions

// Don't worry about what 'widget' is or represents
val rdd = df.mapPartitions { rows => addColIfNecessary(rows, widget) }

然后:

def addColIfNecessary(rows : Iterator[Row], widget : Widget) : Iterator[Row] = {
    rows.foreach { row =>
        if(widget.determineWhetherRowNeedsNewCol(row)) {
            // TODO: Add a new "fizz" column (of StringType) to the row
            val newVal : String = widget.getValueOfNewCol(row)
            row.addColumn("fizz", StringType, newVal)
        }
    }

    rows
}

这显然只是伪代码,但传达了我想做的事情。关于如何实际实施的任何想法?

马斯格

DataFrame是面向列的结构,这意味着将列添加到某些行不是一个好主意。相反,您可以利用对DataFrames中可为空值的支持,而不是添加额外的列,而是根据一些条件向行添加可选值。

一个例子:让我们看一下用户和页面的DF:

val users = Seq("Alice" , "Bob", "Charly", "Dean", "Eve", "Flor", "Greta")
val pages = (1 to 9).map(i => s"page_$i")
val userPages = for {u <- users
                     p <- pages} yield (u,p) 

val userPagesDF = sparkContext.parallelize(userPages).toDF("user","page")

// a user defined function that takes the last digit from the page and uses it to calculate a "rank". It only ranks pages with a number higher than 7

val rankUDF = udf((p:String) => if (p.takeRight(1).toInt>7) "top" else null)

// New DF with the extra column "rank", which contains values for only some rows
val ranked = userPagesDF.withColumn("rank", topPage($"page"))

ranked.show

+-----+-------+----+
| user|   page|rank|
+-----+-------+----+
|Alice| page_1|null|
|Alice| page_2|null|
|Alice| page_3|null|
|Alice| page_4|null|
|Alice| page_5|null|
|Alice| page_6|null|
|Alice| page_7|null|
|Alice| page_8| top|
|Alice| page_9| top|
|  Bob| page_1|null|
|  Bob| page_2|null|
|  Bob| page_3|null|
|  Bob| page_4|null|
|  Bob| page_5|null|
|  Bob| page_6|null|
|  Bob| page_7|null|
|  Bob| page_8| top|
|  Bob| page_9| top|
+-----+-------+----+

ranked.printSchema

root
 |-- user: string (nullable = true)
 |-- page: string (nullable = true)
 |-- rank: string (nullable = true)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

定义列表时有条件地将项目添加到列表?

有条件地将选项添加到列表

使用jq有条件地将元素添加到json数组

Spark:有条件地将列添加到数据框

如何使用熊猫有条件地分隔单元格值并将其添加到列

有条件地将CircleMarkers添加到传单

有条件地将键/值对添加到Woocommerce中的数组

实体框架-有条件地将列添加到分组依据

将信息添加到有条件地比较字符串的列中

根据函数变量的值有条件地将层添加到gglplot中

有条件地将具有默认值的列添加到SQL Server中的现有表

打字稿:有条件地将项目添加到对象

根据名称有条件地将rbind或bind_rows应用于列表

有条件地将12小时添加到列中

有条件地将标签选项参数添加到select2

有条件地将值添加到新列并替换R中的条件列中的值

有条件地添加到地图

有条件地将文件添加到C ++项目中的库

有条件地将JSF组件添加到翻译字符串中

有条件地将报价单和方括号添加到单元格的格式

使用Polymer将CSS类有条件地添加到元素

有条件地将字母添加到字符串

R - 如何有条件地将连续列添加到数据框?

有条件地将列添加到 dplyr R 中的 groupby

如何根据python字典的键和值有条件地将数据添加到Excel文件中的列,同时保持其他列一致

有条件地将键/值对添加到对象的最佳方法是什么

如何有条件地将元素添加到 std::array - C++11

如何有条件地将“按钮”添加到“工具栏”“表单”“组件”?

有条件地将列添加到数据框列表中