2014-12-18 1 views
10

Ich habe jeden Datensatz über mehrere Zeilen in der Eingabedatei verteilt (Sehr große Datei).So verarbeiten Sie mehrzeilige Eingangsdatensätze in Spark

Ex:

Id: 2 
ASIN: 073870
    title: Test tile for this product 
    group: Book 
    salesrank: 168501 
    similar: 5 0738700811 1567184912 1567182813 0738700514 0738700915 
    categories: 2 
    |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Wicca[12484] 
    |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Witchcraft[12486] 
    reviews: total: 12 downloaded: 12 avg rating: 4.5 
    2001-12-16 cutomer: A11NCO6YTE4BTJ rating: 5 votes: 5 helpful: 4 
    2002-1-7 cutomer: A9CQ3PLRNIR83 rating: 4 votes: 5 helpful: 5 

Wie jede Multi-Line-Aufzeichnung in Funken zu identifizieren und zu verarbeiten?

+1

Ihre Eingabe sieht unheimlich wie JSON aus. Möglicherweise möchten Sie JSON mit einem Datensatz pro Zeile vorverarbeiten und dann mit 'SqlContext.jsonFile' laden. – huitseeker

+1

Sie müssen Ihr eigenes hadoop 'InputFormat' erstellen, das diese mehrzeiligen Dateien teilt, ohne einen einzelnen Datensatz zu teilen. Oder, wie @hititseeker vorschlägt, könnten Sie es in ein Format vorverarbeiten, das hadoop bereits kennt. – lmm

+1

@Huitseeker Aber es entspricht nicht dem JSON-Format –

Antwort

7

Ich habe dies durch die Implementierung von benutzerdefinierten Eingabeformat und Datensatzleser getan.

public class ParagraphInputFormat extends TextInputFormat { 

    @Override 
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) { 
     return new ParagraphRecordReader(); 
    } 
} 

public class ParagraphRecordReader extends RecordReader<LongWritable, Text> { 
    private long end; 
    private boolean stillInChunk = true; 

    private LongWritable key = new LongWritable(); 
    private Text value = new Text(); 

    private FSDataInputStream fsin; 
    private DataOutputBuffer buffer = new DataOutputBuffer(); 

    private byte[] endTag = "\n\r\n".getBytes(); 

    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { 
     FileSplit split = (FileSplit) inputSplit; 
     Configuration conf = taskAttemptContext.getConfiguration(); 
     Path path = split.getPath(); 
     FileSystem fs = path.getFileSystem(conf); 

     fsin = fs.open(path); 
     long start = split.getStart(); 
     end = split.getStart() + split.getLength(); 
     fsin.seek(start); 

     if (start != 0) { 
      readUntilMatch(endTag, false); 
     } 
    } 

    public boolean nextKeyValue() throws IOException { 
     if (!stillInChunk) return false; 

     boolean status = readUntilMatch(endTag, true); 

     value = new Text(); 
     value.set(buffer.getData(), 0, buffer.getLength()); 
     key = new LongWritable(fsin.getPos()); 
     buffer.reset(); 

     if (!status) { 
      stillInChunk = false; 
     } 

     return true; 
    } 

    public LongWritable getCurrentKey() throws IOException, InterruptedException { 
     return key; 
    } 

    public Text getCurrentValue() throws IOException, InterruptedException { 
     return value; 
    } 

    public float getProgress() throws IOException, InterruptedException { 
     return 0; 
    } 

    public void close() throws IOException { 
     fsin.close(); 
    } 

    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException { 
     int i = 0; 
     while (true) { 
      int b = fsin.read(); 
      if (b == -1) return false; 
      if (withinBlock) buffer.write(b); 
      if (b == match[i]) { 
       i++; 
       if (i >= match.length) { 
        return fsin.getPos() < end; 
       } 
      } else i = 0; 
     } 
    } 

} 

endTag identifiziert das Ende jeden Datensatz.

9

Wenn die Multi-Line-Daten einen definierten Datensatztrenn hat, könnten Sie die Hadoop-Unterstützung für mehrzeilige Datensätze verwenden, das Trennzeichen durch ein hadoop.Configuration Objekt bereitstellt:

etwas tun soll:

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.io.{LongWritable, Text} 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat 
val conf = new Configuration 
conf.set("textinputformat.record.delimiter", "id:") 
val dataset = sc.newAPIHadoopFile("/path/to/data", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf) 
val data = dataset.map(x=>x._2.toString) 

Dies wird Ihnen eine RDD[String] zur Verfügung stellen, wo jedes Element einem Datensatz entspricht. Anschließend müssen Sie jeden Datensatz nach Ihren Anwendungsanforderungen analysieren.