在Pyspark中读取tar.gz存档时使用特定模式过滤文件

超新星

我的文件夹中有多个CSV文件myfolder.tar.gz我是用这种方式创建的:首先将我的所有文件放在一个文件夹名称中,myfolder然后准备一个tar文件夹。然后准备.gz那个tar文件夹。

假设我们有5个文件。

abc_1.csv
abc_2.csv
abc_3.csv
def_1.csv
def_2.csv

我想使用Pyspark数据框以特定的文件名模式过滤读取的文件。就像我们要一起读取所有abc文件一样。

这不应给我们带来的结果,def反之亦然。目前,仅通过spark.read.csv()功能就可以读取所有CSV文件另外,当我使用以下pathGlobalFilter参数将文件保存在简单的文件夹中时,就可以过滤文件

df = spark.read.csv("mypath",pathGlobalFilter="def_[1-9].csv")

但是当我能够在中执行相同操作时tar.gz,例如:

df = spark.read.csv("myfolder.tar.gz", pathGlobalFilter="def_[1-9].csv")

我收到一个错误:

无法推断CSV的架构。如何从.tar.gz文件读取。

黑主教

根据这篇文章,您可以阅读.tar.gz文件,binaryFile然后使用pythontarfile提取存档成员并使用正则表达式过滤文件名def_[1-9]结果是rdd,您可以将其转换为数据框:

import re
import tarfile
from io import BytesIO

# extract only the files with which math regex 'def_[1-9].csv'
def extract_files(bytes):
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
    return [tar.extractfile(x).read() for x in tar if re.match(r"def_[1-9].csv", x.name)]

# read binary file and convert to df
rdd = sc.binaryFiles("/path/myfolder.tar.gz") \
        .mapValues(extract_files) \
        .flatMap(lambda row: [x.decode("utf-8").split("\n") for x in row[1]])\
        .flatMap(lambda row: [e.split(",") for e in row])

df = rdd.toDF(*csv_cols)

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章