Ich habe dies durch die Implementierung von benutzerdefinierten Eingabeformat und Datensatzleser getan.
public class ParagraphInputFormat extends TextInputFormat {
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
return new ParagraphRecordReader();
}
}
public class ParagraphRecordReader extends RecordReader<LongWritable, Text> {
private long end;
private boolean stillInChunk = true;
private LongWritable key = new LongWritable();
private Text value = new Text();
private FSDataInputStream fsin;
private DataOutputBuffer buffer = new DataOutputBuffer();
private byte[] endTag = "\n\r\n".getBytes();
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
FileSplit split = (FileSplit) inputSplit;
Configuration conf = taskAttemptContext.getConfiguration();
Path path = split.getPath();
FileSystem fs = path.getFileSystem(conf);
fsin = fs.open(path);
long start = split.getStart();
end = split.getStart() + split.getLength();
fsin.seek(start);
if (start != 0) {
readUntilMatch(endTag, false);
}
}
public boolean nextKeyValue() throws IOException {
if (!stillInChunk) return false;
boolean status = readUntilMatch(endTag, true);
value = new Text();
value.set(buffer.getData(), 0, buffer.getLength());
key = new LongWritable(fsin.getPos());
buffer.reset();
if (!status) {
stillInChunk = false;
}
return true;
}
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return key;
}
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
public float getProgress() throws IOException, InterruptedException {
return 0;
}
public void close() throws IOException {
fsin.close();
}
private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
int i = 0;
while (true) {
int b = fsin.read();
if (b == -1) return false;
if (withinBlock) buffer.write(b);
if (b == match[i]) {
i++;
if (i >= match.length) {
return fsin.getPos() < end;
}
} else i = 0;
}
}
}
endTag identifiziert das Ende jeden Datensatz.
Ihre Eingabe sieht unheimlich wie JSON aus. Möglicherweise möchten Sie JSON mit einem Datensatz pro Zeile vorverarbeiten und dann mit 'SqlContext.jsonFile' laden. – huitseeker
Sie müssen Ihr eigenes hadoop 'InputFormat' erstellen, das diese mehrzeiligen Dateien teilt, ohne einen einzelnen Datensatz zu teilen. Oder, wie @hititseeker vorschlägt, könnten Sie es in ein Format vorverarbeiten, das hadoop bereits kennt. – lmm
@Huitseeker Aber es entspricht nicht dem JSON-Format –