2016-06-10 13 views
0

ich eine Eingabedatei (txt), wie untenWie StreamXmlRecordReader verwenden single & mehrzeilige xml Datensätze innerhalb einer einzigen Datei

<a><b><c>val1</c></b></a>||<a><b><c>val2</c></b></a>||<a><b> 
<c>val3</c></b></a>||<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a> 

haben zu analysieren Wenn Sie die Eingabe sorgfältig die XML-Datensatz nach dem dritten "beobachten, || ' ist in zwei Zeilen aufgeteilt.

Ich möchte StreamXmlRecordReader von Hadoop Streaming verwenden, um diese Datei zu analysieren

-inputreader "org.apache.hadoop.streaming.StreamXmlRecordReader,begin=<a>,end=</a>,slowmatch=true 

, die ich nicht in der Lage bin der dritte Datensatz zu analysieren.

Ich erhalte die folgenden Fehler

Traceback (most recent call last): 
    File "/home/rsome/test/code/m1.py", line 13, in <module> 
    root = ET.fromstring(xml_str.getvalue()) 
    File "/usr/lib64/python2.6/xml/etree/ElementTree.py", line 964, in XML 
    return parser.close() 
    File "/usr/lib64/python2.6/xml/etree/ElementTree.py", line 1254, in close 
    self._parser.Parse("", 1) # end of data 
xml.parsers.expat.ExpatError: no element found: line 1, column 18478 

I slowmatch verwendet haben = true als gut, aber noch kein Glück.

Mein Ausgang kommt, wie unten

$ hdfs dfs -text /poc/testout001/part-* 
rec::1::mapper1 
<a><b><c>val1</c></b></a> 
rec::2::mapper1 
<a><b><c>val2</c></b></a> 
rec::3::mapper1 
<a><b> 
rec::4::mapper1 
<c>val3</c></b></a> 
rec::1::mapper2 
<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a> 

Meine erwartete Ausgabe ist

$ hdfs dfs -text /poc/testout001/part-* 
rec::1::mapper1 
<a><b><c>val1</c></b></a> 
rec::2::mapper1 
<a><b><c>val2</c></b></a> 
rec::3::mapper1 
<a><b><c>val3</c></b></a> 
rec::1::mapper2 
<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a> 

jede Hilfe zu diesem Thema eine große Hilfe

Antwort

2

Grundsätzlich wäre, die StreamXmlInputFormat, die die hadoop- ist Das Standard-Eingabeformat von streaming erweitert KeyValueTextInputFormat, das Zeilen auf neue Zeilenzeichen (\ r \ n) aufteilen würde, was in meinem Fall, in dem mein Datensatz über m aufgeteilt ist, nicht erwartet wird mehrere Zeilen.

Um dies zu überwinden, habe ich mein eigenes Eingabeformat implementiert, das FileInputFormat erweitert, wo ich die Freiheit hatte, die neuen Zeilenzeichen (\ r \ n) für mein endTag weiter zu betrachten.

Verbrauch:

-libjars /path/to/custom-xml-input-format-1.0.0.jar 
-D xmlinput.start="<a>" \ 
-D xmlinput.end="</a>" \  
-inputformat "my.package.CustomXmlInputFormat" 

Hier ist der Code, den ich verwendet.

import java.io.*; 
import java.lang.reflect.*; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.DataOutputBuffer; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.streaming.*; 


public class CustomXmlInputFormat extends FileInputFormat { 

    public static final String START_TAG_KEY = "xmlinput.start"; 
    public static final String END_TAG_KEY = "xmlinput.end"; 

    @SuppressWarnings("unchecked") 
    @Override 
    public RecordReader<LongWritable, Text> getRecordReader(final InputSplit genericSplit, 
             JobConf job, Reporter reporter) throws IOException { 
     return new XmlRecordReader((FileSplit) genericSplit, job, reporter); 
    } 


    public static class XmlRecordReader implements RecordReader<LongWritable, Text> { 

    private final byte[] endTag; 
    private final byte[] startTag; 
    private final long start; 
    private final long end; 
    private final FSDataInputStream fsin; 
    private final DataOutputBuffer buffer = new DataOutputBuffer(); 
    private LongWritable currentKey; 
    private Text currentValue; 

    public XmlRecordReader(FileSplit split, JobConf conf, Reporter reporter) throws IOException { 
     startTag = conf.get(START_TAG_KEY).getBytes("UTF-8"); 
     endTag = conf.get(END_TAG_KEY).getBytes("UTF-8"); 

     start = split.getStart(); 
     end = start + split.getLength(); 
     Path file = split.getPath(); 
     FileSystem fs = file.getFileSystem(conf); 
     fsin = fs.open(split.getPath()); 
     fsin.seek(start); 
    } 


    public boolean next(LongWritable key, Text value) throws IOException { 
     if (fsin.getPos() < end && readUntilMatch(startTag, false)) { 
     try { 
      buffer.write(startTag); 
      if (readUntilMatch(endTag, true)) { 
      key.set(fsin.getPos()); 
      value.set(buffer.getData(), 0, buffer.getLength()); 
      return true; 
      } 
     } finally { 
      buffer.reset(); 
     } 
     } 
     return false; 
    } 

    public boolean readUntilMatch(byte[] match, boolean withinBlock) 
     throws IOException { 
     int i = 0; 
     while (true) { 
     int b = fsin.read(); 
     if (b == -1) { 
      return false; 
     } 

     if (withinBlock && b != (byte) '\r' && b != (byte) '\n') { 
      buffer.write(b); 
     } 

     if (b == match[i]) { 
      i++; 
      if (i >= match.length) { 
      return true; 
      } 
     } else { 
      i = 0; 
     } 

     if (!withinBlock && i == 0 && fsin.getPos() >= end) { 
      return false; 
     } 
     } 
    } 

    @Override 
    public float getProgress() throws IOException { 
     return (fsin.getPos() - start)/(float) (end - start); 
    } 

    @Override 
    public synchronized long getPos() throws IOException { 
     return fsin.getPos(); 
    } 

    @Override 
    public LongWritable createKey() { 
     return new LongWritable(); 
    } 

    @Override 
    public Text createValue() { 
     return new Text(); 
    } 

    @Override 
    public synchronized void close() throws IOException { 
     fsin.close(); 
    } 

    } 
} 

Hier ist meine Ausgabe

$ hdfs dfs -text /poc/testout001/part-* 
25  <a><b><c>val1</c></b></a> 
52  <a><b><c>val2</c></b></a> 
80  <a><b><c>val3</c></b></a> 
141  <a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a> 
+0

Super stuff! Verwenden Sie Ihren Reader als Tutorial. Vielen Dank. – Houston