Python:如何从HDFS导入目录中的文件列表

考克斯

我尝试从python中的HDFS导入文件列表

如何从HDFS执行此操作:

path =r'/my_path'
allFiles = glob.glob(path + "/*.csv")

df_list = []
for file_ in allFiles:
    df = pd.read_csv(file_,index_col=None, header=0,sep=';')    
    df_list.append(df)

我认为subprocess.Popen可以解决问题,但是如何仅提取文件名?

import subprocess
p = subprocess.Popen("hdfs dfs -ls /my_path/ ",
    shell=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT)


for line in p.stdout.readlines():
    print(line)

输出是这样的:

b'Found 32 items\n'
b'-rw-------   3 user hdfs   42202621 2019-01-21 10:05 /my_path/file1.csv\n'
b'-rw-------   3 user hdfs   99320020 2019-01-21 10:05 /my_path/file2.csv\n'
克里斯

免责声明:这将是漫长而乏味的。但是考虑到这种情况,我将尝试使其尽可能通用和可复制。


考虑到没有外部库(pandas除外的要求,没有选择的余地。我建议WebHDFS尽可能多地利用

AFAIK是HDFS的默认安装,其中包括WebHDFS的安装后续解决方案在很大程度上依赖于WebHDFS

第一步

首先,您必须了解WebHDFS URL。WebHDFS安装在HDFS Namenode上,默认端口为50070

因此,我们从开始http://[namenode_ip]:50070/webhdfs/v1/,其中/webhdfs/v1/是所有人的通用网址。

为了举例,我们假设为http://192.168.10.1:50070/web/hdfs/v1

第二步

通常,可以curl用来列出HDFS目录的内容。有关详细说明,请参阅WebHDFS REST API:列出目录

如果要使用curl,则以下提供FileStatuses了给定目录内的所有文件。

curl "http://192.168.10.1:50070/webhdfs/v1/<PATH>?op=LISTSTATUS"
             ^^^^^^^^^^^^ ^^^^^             ^^^^  ^^^^^^^^^^^^^
             Namenode IP  Port              Path  Operation

如前所述,这将在JSON对象中返回FileStatuses:

{
  "FileStatuses":
  {
    "FileStatus":
    [
      {
        "accessTime"      : 1320171722771,
        "blockSize"       : 33554432,
        "group"           : "supergroup",
        "length"          : 24930,
        "modificationTime": 1320171722771,
        "owner"           : "webuser",
        "pathSuffix"      : "a.patch",
        "permission"      : "644",
        "replication"     : 1,
        "type"            : "FILE"
      },
      {
        "accessTime"      : 0,
        "blockSize"       : 0,
        "group"           : "supergroup",
        "length"          : 0,
        "modificationTime": 1320895981256,
        "owner"           : "szetszwo",
        "pathSuffix"      : "bar",
        "permission"      : "711",
        "replication"     : 0,
        "type"            : "DIRECTORY"
      },
      ...
    ]
  }
}

使用python的默认库可以实现相同的结果:

import requests

my_path = '/my_path/'
curl = requests.get('http://192.168.10.1:50070/webhdfs/v1/%s?op=LISTSTATUS' % my_path)

并且如上所述,每个文件的实际状态比结果JSON低两个级别。换句话说,要获取每个文件的FileStatus:

curl.json()['FileStatuses']['FileStatus'] 

[
  {
    "accessTime"      : 1320171722771,
    "blockSize"       : 33554432,
    "group"           : "supergroup",
    "length"          : 24930,
    "modificationTime": 1320171722771,
    "owner"           : "webuser",
    "pathSuffix"      : "a.patch",
    "permission"      : "644",
    "replication"     : 1,
    "type"            : "FILE"
  },
  {
    "accessTime"      : 0,
    "blockSize"       : 0,
    "group"           : "supergroup",
    "length"          : 0,
    "modificationTime": 1320895981256,
    "owner"           : "szetszwo",
    "pathSuffix"      : "bar",
    "permission"      : "711",
    "replication"     : 0,
    "type"            : "DIRECTORY"
  },
  ...
]

第三步

由于您现在拥有所需的所有信息,因此您只需要做的就是解析。

import os

file_paths = []
for file_status in curl.json()['FileStatuses']['FileStatus']:
    file_name = file_status['pathSuffix']
    # this is the file name in the queried directory
    if file_name.endswith('.csv'):
    # if statement is only required if the directory contains unwanted files (i.e. non-csvs).
        file_paths.append(os.path.join(path, file_name))
        # os.path.join asserts your result consists of absolute path

file_paths
['/my_path/file1.csv',
 '/my_path/file2.csv',
 ...]

最后一步

现在您知道了文件和WebHDFS链接的路径,pandas.read_csv可以处理其余的工作。

import pandas as pd

dfs = []
web_url = "http://192.168.10.1:50070/webhdfs/v1/%s?op=OPEN"
#                                                  ^^^^^^^
#                                    Operation is now OPEN
for file_path in file_paths:
    file_url = web_url % file_path
    # http://192.168.10.1:50070/webhdfs/v1/my_path/file1.csv?op=OPEN
    dfs.append(pd.read_csv(file_url))

然后,将所有.csvs导入并分配给dfs

警告事项

如果你的HDFS配置为HA(高可用性),将有多个namenodes,因此您namenode_ip必须进行相应的设置:它必须是一个活动节点的IP。

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何从python中的同一目录导入文件?

如何从项目目录中的文件导入?

如何从父目录中的文件导入函数

从python中的其他目录导入文件?

从父目录python导入文件中的函数

如何在既用作 __main__ 又由不同目录中的文件导入的文件中导入 Python 中的模块?

如何从python中的不同目录导入模块

如何使用scala从hdfs目录中删除所有文件

如何从文件夹中的python文件和同一目录中的文件中导入文件?

Python3如何导入导入子目录文件的子目录文件?

目录中的Python导入

如何遍历包含目录列表的文件并使用python在每个目录中执行命令

如何批量获取目录中的文件列表?

如何显示指定目录中的文件列表

如何获取 KDB 目录中的文件列表?

Libgdx如何获取目录中的文件列表?

如何从片段访问目录中的文件列表?

如何获取目录中的文件列表

过滤python目录中的文件列表

如何导入位于pycharm项目中相同子目录中的python文件

如何在Python中导入.msg文件以及本地目录中的附件

如何在hadoop hdfs中列出目录及其子目录中的所有文件

如何从同一目录导入python类文件?

如何从外部目录将类导入python文件?

如何在Go中从当前目录导入文件

如何导入sbt中的.scala文件目录(非托管)?

遍历给定目录中的python文件并导入它们?

如何在另一个文件目录中的另一个脚本中导入 Python 文件?

循环播放HDFS目录中的文件