Je crée un dataframe DF à partir d'un fichier externe, qui a le schéma suivant :
(id, champ1, champ2, champ3) colonne de partition : id
exemple de données est
000, 11_field1, 22_field2, 33_field3
001, 111_field1, 222_field2, 333_field3
Je veux créer une autre trame de données à partir de DF dont le schéma est
(id, fieleName, fieldValue)
exemple de données est
000, field1, 11_field1
000, field2, 22_field2
000, field3, 33_field3
001, field1, 111_field1
001, field2, 222_field2
001, field3, 333_field3
Quelqu'un pourrait-il me dire comment obtenir le nouveau dataframe?
Vous pouvez y parvenir pyspark
comme ci-dessous en utilisant l' explode
option
Importez d'abord les bibliothèques et fonctions nécessaires
from pyspark.sql import SQLContext, Row
Disons que votre bloc de données est df
.
Si tu fais df.show()
vous devriez obtenir un résultat comme ci-dessous
+---+----------+----------+----------+
| id| field1| field2| field3|
+---+----------+----------+----------+
| 0| 11_field1| 22_field2| 33_field3|
| 1|111_field1|222_field2|333_field3|
+---+----------+----------+----------+
Ensuite, mappez toutes les colonnes que vous souhaitez exploser en 2 colonnes. Ici, vous voulez que toutes les colonnes sauf id explosent. Alors, faites ce qui suit
cols= df.columns[1:]
puis convertir data frame
à rdd
comme ci - dessous
rdd = data.rdd.map(lambda x: Row(id=x[0], val=dict(zip(cols, x[1:]))))
Pour vérifier comment le rdd a été mappé, faites ci-dessous
rdd.take()
vous obtiendrez un résultat comme ci-dessous
[Row(id=0, val={'field2': u'22_field2', 'field3': u'33_field3', 'field1': u'11_field1'}), Row(id=1, val={'field2': u'222_field2', 'field3': u'333_field3', 'field1': u'111_field1'})]
Ensuite, convertissez le rdd
dos en un data frame
motdf2
df2 = sqlContext.createDataFrame(rdd)
Alors fais df2.show()
. vous devriez obtenir un résultat comme ci-dessous
+---+--------------------+
| id| val|
+---+--------------------+
| 0|Map(field3 -> 33_...|
| 1|Map(field3 -> 333...|
+---+--------------------+
puis enregistrez la trame de données df2 en tant que table temporaire
df2.registerTempTable('mytempTable')
Exécutez ensuite une requête comme ci-dessous sur le bloc de données :
df3 = sqlContext.sql( """select id,explode(val) AS (fieldname,fieldvalue) from mytempTable""")
alors faites df3.show()
, vous devriez obtenir le résultat ci-dessous
+---+---------+----------+
| id|fieldname|fieldvalue|
+---+---------+----------+
| 0| field3| 33_field3|
| 0| field2| 22_field2|
| 0| field1| 11_field1|
| 1| field3|333_field3|
| 1| field2|222_field2|
| 1| field1|111_field1|
+---+---------+----------+
Cet article est collecté sur Internet, veuillez indiquer la source lors de la réimpression.
En cas d'infraction, veuillez [email protected] Supprimer.
laisse moi dire quelques mots