如何使用wholeTextFiles在Spark中读取gz文件

亚尼夫·多嫩菲尔德

我有一个包含许多小.gz文件(压缩的csv文本文件)的文件夹。我需要在我的Spark作业中阅读它们,但是事情是我需要根据文件名中的信息进行一些处理。因此,我没有使用:

JavaRDD<<String>String> input = sc.textFile(...)

因为据我了解,我无法通过这种方式访问​​文件名。相反,我使用了:

JavaPairRDD<<String>String,String> files_and_content = sc.wholeTextFiles(...);

因为这样我得到了一对文件名和内容。但是,似乎这种方式使输入阅读器无法从gz文件中读取文本,而只能读取二进制的“乱码”。

因此,我想知道是否可以将其设置为以某种方式读取文本,或者使用以下方式访问文件名: sc.textFile(...)

亚伦曼

您无法读取具有WholeTextFiles的gzip压缩文件,因为它使用CombineFileInputFormat,后者由于无法拆分而无法读取gzip压缩文件(源证明):

  override def createRecordReader(
      split: InputSplit,
      context: TaskAttemptContext): RecordReader[String, String] = {

    new CombineFileRecordReader[String, String](
      split.asInstanceOf[CombineFileSplit],
      context,
      classOf[WholeTextFileRecordReader])
  }

您可以使用newAPIHadoopFilewholefileinputformat(没有内置的Hadoop,但在互联网上),以正确地得到这个工作。

更新1:我认为WholeFileInputFormat不起作用,因为它仅获取文件的字节,这意味着您可能必须编写自己的类,可能扩展WholeFileInputFormat以确保解压缩字节。

另一种选择是使用GZipInputStream自己解压缩字节

更新2:如果您可以像下面的OP注释中那样访问目录名,则可以获取所有这样的文件。

Path path = new Path("");
FileSystem fileSystem = path.getFileSystem(new Configuration()); //just uses the default one
FileStatus []  fileStatuses = fileSystem.listStatus(path);
ArrayList<Path> paths = new ArrayList<>();
for (FileStatus fileStatus : fileStatuses) paths.add(fileStatus.getPath());

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章