显示在 pyspark 中收集的数据

ZZZSharePoint

在我尝试从存储帐户中的 JSON 文件读取数据的情况下,下面的代码运行没有任何错误。我想知道如何查看输出,即 Databricks 笔记本中的内容。该文件很长,所以我只需要验证输出是我要找的吗?所以想看看前 10 个项目。我们该怎么做?

import re
import json
%pip install azure
import azure
from azure.storage.blob import AppendBlobService

abs = AppendBlobService(account_name="azurestorage", account_key="mykey")
base_path = "resourceId=/SUBSCRIPTIONS/5315MyId/RESOURCEGROUPS/AZURE-DEV/PROVIDERS/MICROSOFT.CONTAINERSERVICE/MANAGEDCLUSTERS/AZURE-DEV/y=2022/m=05/d=23/h=13/m=00/PT1H.json"
pattern = base_path + "/*/*/*/*/m=00/*.json"
filter = glob2re(pattern)
df1 = (
    spark.sparkContext.parallelize(
        [
            blob.name
            for blob in abs.list_blobs("insights-logs-kube-audit", prefix=base_path)
            if re.match(filter, blob.name)
        ]
    )
    .map(
        lambda blob_name: abs.get_blob_to_bytes("insights-logs-kube-audit", blob_name)
        .content.decode("utf-8")
        .splitlines()
    )
    .flatMap(lambda lines: [json.loads(l) for l in lines])
    .collect()
)
Abhishek Khandave-MT

collect() :- PySpark RDD/DataFramecollect()是一个动作操作,用于将数据集的所有元素(从所有节点)检索到驱动程序节点。我们应该在较小的数据集上使用 collect() 通常在 filter()、group() 等之后

take(num) :-它将前 num 行作为 Row 的列表返回。

DataFrame.take(num)

import re
import json
%pip install azure
import azure
from azure.storage.blob import AppendBlobService

abs = AppendBlobService(account_name="azurestorage", account_key="mykey")
base_path = "resourceId=/SUBSCRIPTIONS/5315MyId/RESOURCEGROUPS/AZURE-DEV/PROVIDERS/MICROSOFT.CONTAINERSERVICE/MANAGEDCLUSTERS/AZURE-DEV/y=2022/m=05/d=23/h=13/m=00/PT1H.json"
pattern = base_path + "/*/*/*/*/m=00/*.json"
filter = glob2re(pattern)
df1 = (
    spark.sparkContext.parallelize(
        [
            blob.name
            for blob in abs.list_blobs("insights-logs-kube-audit", prefix=base_path)
            if re.match(filter, blob.name)
        ]
    )
    .map(
        lambda blob_name: abs.get_blob_to_bytes("insights-logs-kube-audit", blob_name)
        .content.decode("utf-8")
        .splitlines()
    )
    .flatMap(lambda lines: [json.loads(l) for l in lines])
    .df1.take(10)
)

参考 - https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.take.html

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章