Estou tentando encontrar a solução no spark para agrupar dados com um elemento comum em uma matriz.
key value
[k1,k2] v1
[k2] v2
[k3,k2] v3
[k4] v4
Se algum elemento corresponder na chave, temos que atribuir o mesmo groupid a ele. (Elemento comum Groupby)
Resultado:
key value GroupID
[k1,k2] v1 G1
[k2] v2 G1
[k3,k2] v3 G1
[k4] v4 G2
Algumas sugestões já são dadas com Spark Graphx, mas neste momento a curva de aprendizado será mais para implementar isso para um único recurso.
Incluir graphframes
(a versão mais recente do Spark compatível é 2.1, mas também deve ser compatível com 2.2; se você usar a mais recente, terá que construir o seu próprio com patch 2.3) substituindo XXX
pela versão Spark e YYY
pela versão Scala:
spark.jars.packages graphframes:graphframes:0.5.0-sparkXXX-s_YYY
Adicione chaves de explosão:
import org.apache.spark.sql.functions._
val df = Seq(
(Seq("k1", "k2"), "v1"), (Seq("k2"), "v2"),
(Seq("k3", "k2"), "v3"), (Seq("k4"), "v4")
).toDF("key", "value")
val edges = df.select(
explode($"key") as "src", $"value" as "dst")
Converter para graphframe
:
import org.graphframes._
val gf = GraphFrame.fromEdges(edges)
Defina o diretório do ponto de verificação (se não estiver definido):
import org.apache.spark.sql.SparkSession
val path: String = ???
val spark: SparkSession = ???
spark.sparkContext.setCheckpointDir(path)
Encontre componentes conectados:
val components = GraphFrame.fromEdges(edges).connectedComponents.setAlgorithm("graphx").run
Junte o resultado aos dados de entrada:
val result = components.where($"id".startsWith("v")).toDF("value", "group").join(df, Seq("value"))
Resultado da verificação:
result.show
// +-----+------------+--------+
// |value| group| key|
// +-----+------------+--------+
// | v3|489626271744|[k3, k2]|
// | v2|489626271744| [k2]|
// | v4|532575944704| [k4]|
// | v1|489626271744|[k1, k2]|
// +-----+------------+--------+
Este artigo é coletado da Internet.
Se houver alguma infração, entre em [email protected] Delete.
deixe-me dizer algumas palavras