根据Spark结构化流中的多个条件从其他列更新列值

伊山

我想根据多个条件使用另外两列更新一列中的值。例如-流就像:

    +---+---+----+---+
    | A | B | C  | D |
    +---+---+----+---+
    | a | T | 10 | 0 |
    | a | T | 100| 0 |
    | a | L | 0  | 0 |
    | a | L | 1  | 0 |
    +---+---+----+---+

我有多种情况,例如-

(B =“ T” && C> 20)或(B =“ L” && C = 0)

"T"20"L"0是动态的。AND/OR在运行时还提供了运算符。D = 1只要条件成立,我就想做,否则就应该保留D = 0条件的数量也是动态的。

我尝试将其与ie中UPDATE命令一起使用但是它说尚不支持该更新。结果数据框应为-spark-sqlUPDATE df SET D = '1' WHERE CONDITIONS

+---+---+----+---+
| A | B | C  | D |
+---+---+----+---+
| a | T | 10 | 0 |
| a | T | 100| 1 |
| a | L | 0  | 1 |
| a | L | 1  | 0 |
+---+---+----+---+

有什么办法可以实现?

帕维斯兰·拉马尚德兰

希望您使用的是Python。同样也将为Scala发布!使用udf

PYTHON

>>> df.show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  a|  T| 10|  0|
|  a|  T|100|  0|
|  a|  L|  0|  0|
|  a|  L|  1|  0|
+---+---+---+---+

>>> def get_column(B, C):
...     return int((B == "T" and C > 20) or (B == "L" and C == 0))
...
>>> fun = udf(get_column)
>>> res = df.withColumn("D", fun(df['B'], df['C']))>>> res.show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  a|  T| 10|  0|
|  a|  T|100|  1|
|  a|  L|  0|  1|
|  a|  L|  1|  0|
+---+---+---+---+

SCALA

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> df.show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  a|  T| 10|  0|
|  a|  T|100|  0|
|  a|  L|  0|  0|
|  a|  L|  1|  0|
+---+---+---+---+


scala> def get_column(B : String, C : Int) : Int = {     
     |     if((B == "T" && C > 20) || (B == "L" && C == 0))
     |         1     
     |     else
     |         0
     | }
get_column: (B: String, C: Int)Int

scala> val fun = udf(get_column _)
fun: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,IntegerType,Some(List(StringType, IntegerType)
))

scala> val res = df.withColumn("D", fun(df("B"), df("C")))
res: org.apache.spark.sql.DataFrame = [A: string, B: string ... 2 more fields]

scala> res.show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  a|  T| 10|  0|
|  a|  T|100|  1|
|  a|  L|  0|  1|
|  a|  L|  1|  0|
+---+---+---+---+

您也可以使用case whenotherwise这样的:

PYTHON

>>> df.show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  a|  T| 10|  0|
|  a|  T|100|  0|
|  a|  L|  0|  0|
|  a|  L|  1|  0|
+---+---+---+---+

>>> new_column = when(
        (col("B") == "T") & (col("C") > 20), 1
    ).when((col("B") == "L") & (col("C") == 0), 1).otherwise(0)

>>> res = df.withColumn("D", new_column)
>>> res.show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  a|  T| 10|  0|
|  a|  T|100|  1|
|  a|  L|  0|  1|
|  a|  L|  1|  0|
+---+---+---+---+

SCALA

scala> df.show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  a|  T| 10|  0|
|  a|  T|100|  0|
|  a|  L|  0|  0|
|  a|  L|  1|  0|
+---+---+---+---+

scala> val new_column = when(
     |     col("B") === "T" && col("C") > 20, 1
     | ).when(col("B") === "L" && col("C") === 0, 1 ).otherwise(0)

new_column: org.apache.spark.sql.Column = CASE WHEN ((B = T) AND (C > 20)) THEN 1 WHEN ((B = L) AND (C = 0)) THEN 1 ELSE 0 END

scala> df.withColumn("D", new_column).show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|  a|  T| 10|  0|
|  a|  T|100|  1|
|  a|  L|  0|  1|
|  a|  L|  1|  0|
+---+---+---+---+

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

根据其他列中的多个条件创建列值

根据其他列中的多个条件更新列字符串值

Spark结构化流时,DataFrame中的字符串列如何拆分为多个列

NumPy-根据结构化数组中的其他值设置结构化数组中的值

更新Pyspark中地图类型列的结构化值

如何根据python中其他列的多个条件更新数据框中的现有列?

根据Spark中的其他列值更新列中的值

根据其他列中的条件更新一个列中的值

根据其他多个条件获取列值

根据其他列条件更新列

列数据到Spark结构化流中的嵌套json对象

根据其他列中的值更新列值

根据其他列更新列值

选择满足其他列中多个条件的列值

根据其他列的某些条件,使用其他行中的值更新某些行中的值

根据其他列中的条件选择值对 - PostgreSQL

根据其他列中的条件替换值

如何根据python中其他列的条件计算值?

如何根据其他列更新向量中的值

在Spark结构化流中执行单独的流查询

如何根据Python中的多个条件更新列值?

Spark结构化流:多个接收器

如何根据R中其他列的多个条件创建多个新列?

如何在Spark结构化流中迭代分组的行以产生多个行?

Spark结构化流中对同一数据帧/数据集的多个操作/聚合

在带有水印和窗口聚合的Spark结构化流中运行多个查询

根据多个其他列的条件创建新的 pandas 列

根据其他列的条件生成多个列

通过在Pandas中选择其他列来根据条件更新列值