如何使用StreamXmlRecordReader解析单个文件中的单行和多行xml记录

鬼魂

我有一个输入文件(txt)如下

<a><b><c>val1</c></b></a>||<a><b><c>val2</c></b></a>||<a><b>
<c>val3</c></b></a>||<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>

如果仔细观察输入,则第三个'||'之后的xml数据记录 分为两行。

我想使用hadoop流的StreamXmlRecordReader解析此文件

-inputreader "org.apache.hadoop.streaming.StreamXmlRecordReader,begin=<a>,end=</a>,slowmatch=true

我无法解析第三条记录。

我收到以下错误

Traceback (most recent call last):
  File "/home/rsome/test/code/m1.py", line 13, in <module>
    root = ET.fromstring(xml_str.getvalue())
  File "/usr/lib64/python2.6/xml/etree/ElementTree.py", line 964, in XML
    return parser.close()
  File "/usr/lib64/python2.6/xml/etree/ElementTree.py", line 1254, in close
    self._parser.Parse("", 1) # end of data
xml.parsers.expat.ExpatError: no element found: line 1, column 18478

我也使用过slowmatch = true,但还是没有运气。

我的输出如下

$ hdfs dfs -text /poc/testout001/part-*
rec::1::mapper1
<a><b><c>val1</c></b></a>
rec::2::mapper1
<a><b><c>val2</c></b></a>
rec::3::mapper1
<a><b>
rec::4::mapper1
<c>val3</c></b></a>
rec::1::mapper2
<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>

我的预期输出是

$ hdfs dfs -text /poc/testout001/part-*
rec::1::mapper1
<a><b><c>val1</c></b></a>
rec::2::mapper1
<a><b><c>val2</c></b></a>
rec::3::mapper1
<a><b><c>val3</c></b></a>
rec::1::mapper2
<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>

任何帮助都会有很大帮助

鬼魂

基本上,StreamXmlInputFormat是hadoop-streaming的默认输入格式,它扩展了KeyValueTextInputFormat,它将在新行字符(\ r \ n)处拆分行,这在我的记录被拆分为多行的情况下是不希望的。

因此,为了克服这个问题,我实现了自己的输入格式,扩展了FileInputFormat,在这里我可以自由地为endTag查找新的换行符(\ r \ n)。

用法:

-libjars /path/to/custom-xml-input-format-1.0.0.jar
-D xmlinput.start="<a>" \
-D xmlinput.end="</a>" \    
-inputformat "my.package.CustomXmlInputFormat"

这是我使用的代码。

import java.io.*;
import java.lang.reflect.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.streaming.*;


public class CustomXmlInputFormat extends FileInputFormat {

  public static final String START_TAG_KEY = "xmlinput.start";
  public static final String END_TAG_KEY = "xmlinput.end";

  @SuppressWarnings("unchecked")
  @Override
  public RecordReader<LongWritable, Text> getRecordReader(final InputSplit genericSplit,
                                      JobConf job, Reporter reporter) throws IOException {
      return new XmlRecordReader((FileSplit) genericSplit, job, reporter);
  }


  public static class XmlRecordReader implements RecordReader<LongWritable, Text> {

    private final byte[] endTag;
    private final byte[] startTag;
    private final long start;
    private final long end;
    private final FSDataInputStream fsin;
    private final DataOutputBuffer buffer = new DataOutputBuffer();
    private LongWritable currentKey;
    private Text currentValue;

    public XmlRecordReader(FileSplit split, JobConf conf, Reporter reporter) throws IOException {
      startTag = conf.get(START_TAG_KEY).getBytes("UTF-8");
      endTag = conf.get(END_TAG_KEY).getBytes("UTF-8");

      start = split.getStart();
      end = start + split.getLength();
      Path file = split.getPath();
      FileSystem fs = file.getFileSystem(conf);
      fsin = fs.open(split.getPath());
      fsin.seek(start);
    }


    public boolean next(LongWritable key, Text value) throws IOException {
      if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
        try {
          buffer.write(startTag);
          if (readUntilMatch(endTag, true)) {
            key.set(fsin.getPos());
            value.set(buffer.getData(), 0, buffer.getLength());
            return true;
          }
        } finally {
          buffer.reset();
        }
      }
      return false;
    }

    public boolean readUntilMatch(byte[] match, boolean withinBlock)
        throws IOException {
      int i = 0;
      while (true) {
        int b = fsin.read();
        if (b == -1) {
          return false;
        }

        if (withinBlock && b != (byte) '\r' && b != (byte) '\n') {
          buffer.write(b);
        }

        if (b == match[i]) {
          i++;
          if (i >= match.length) {
            return true;
          }
        } else {
          i = 0;
        }

        if (!withinBlock && i == 0 && fsin.getPos() >= end) {
          return false;
        }
      }
    }

    @Override
    public float getProgress() throws IOException {
      return (fsin.getPos() - start) / (float) (end - start);
    }

    @Override
    public synchronized long getPos() throws IOException {
        return fsin.getPos();
    }

    @Override
    public LongWritable createKey() {
      return new LongWritable();
    }

    @Override
    public Text createValue() {
      return new Text();
    }

    @Override
    public synchronized void close() throws IOException {
        fsin.close();
    }

  }
}

这是我的输出

$ hdfs dfs -text /poc/testout001/part-*
25      <a><b><c>val1</c></b></a>
52      <a><b><c>val2</c></b></a>
80      <a><b><c>val3</c></b></a>
141     <a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

如何使用StreamSets解析日志文件的多行记录?

如何解析正则表达式中的单行和多行?

单个记录中的多行

如何使用 Python 解析数据多行和多行字符串并将数据提取到 JSON 文件中

如何解析多行记录(使用awk?)

如何使用php解析csv文件中的多行

如何在多行而不是单行中查看json文件

如何使用T-SQL和XQuery选择多行作为单个XML列

如何使用awk解析和打印文件中的多行版本号,作为一行上的点分隔值

如何使用awk连接和求和文件中的多行

如何同时匹配多行和单行

如何区分单行和多行多维列表?

使用批处理文件在单个文件中写入多行

如何使用单个标签解析XML响应

如何使用XMLLINT和BASH解析XML文件中的名称空间

如何在单个查询中使用联接和聚合函数更新表中的多行

如何使用纯批处理从单行 XML 文件中获取多个属性值?

如何使用Java中的DOM解析器在XML文件中声明doctype,xml版本和编码?

如何在单行表格中插入多行?

如何使用UPDATE JOIN从多行更新单行

如何用文件中的单个单词替换多行(替换)?

使用Python计算单个文件中多行的平均值

使用explode解析XML文件并在单个数组中获取输出

如何从文本文件中删除多行记录?

您如何使用正则表达式来解析日志文件中的多行文本?

如何使用 XSL 从 XML 中获取多行

如何使用Java中的xPath正确解析此XML文件?

如何将数据从多列和多行移动到 excel 中的 1 个单行

使用Python将单行文本拆分为CSV文件中同一列的多行