2016-07-08 14 views
1

habe ich den folgenden Code ein HBase Tabelle zu exportieren und die Ausgabe auf HDFS sparen:Wie Dateien lesen Sequenz von HBase exportiert

hbase org.apache.hadoop.hbase.mapreduce.Export \ 
MyHbaseTable1 hdfs://nameservice1/user/ken/data/exportTable1 

Ausgabedateien Binärdateien sind. Wenn ich pyspark verwenden, um die Datei-Ordner zu lesen:

test1 = sc.textFile('hdfs://nameservice1/user/ken/data/exportTable1') 
test1.show(5) 

Es zeigt:

u'SEQ\x061org.apache.hadoop.hbase.io.ImmutableBytesWritable%org.apache.hadoop.hbase.client.Result\x00\x00\x00\x00\x00\x00\ufffd-\x10A\ufffd~lUE\u025bt\ufffd\ufffd\ufffd&\x00\x00\x04\ufffd\x00\x00\x00' 
u'\x00\x00\x00\x067-2010\ufffd\t' 
u'|' 
u'\x067-2010\x12\x01r\x1a\x08clo-0101 \ufffd\ufffd\ufffd*(\x042\\6.67|10|10|10|7.33|6.67|6.67|6.67|6.67|6.67|6.67|5.83|3.17|0|0|0.67|0.67|0.67|0.67|0|0|0|0|0' 
u'u' 

kann ich sagen, dass

  • '7-2010' in der 2. Zeile der ist Rowkey,
  • 'r' in der 4. Zeile ist die Spaltenfamilie,
  • 'CLO-0101' in der 4. Zeile ist der Spaltenname,
  • ‚6,67 | 10 | 10 | 10 | 7,33 | 6,67 | 6,67 | 6,67 | 6,67 | 6,67 | 6,67 | 5,83 | 3,17 | 0 | 0 | 0.67 | 0.67 | 0.67 | 0.67 | 0 | 0 | 0 | 0 | 0 ' ist der Wert.

Ich weiß nicht, wo die 3. und 5. Zeile herkam. Es scheint, als ob Hbase-export seiner eigenen Regel folgt, um die Datei zu erzeugen, wenn ich meine eigene Weise benutze, um sie zu dekodieren, könnten Daten beschädigt sein.

Frage:

Wie kann ich konvertieren diese Datei wieder in ein lesbares Format? Zum Beispiel:

7-2010, r, clo-0101, 6.67|10|10|10|7.33|6.67|6.67|6.67|6.67|6.67|6.67|5.83|3.17|0|0|0.67|0.67|0.67|0.67|0|0|0|0|0 

Ich habe versucht:

test1 = sc.sequenceFile('/user/youyang/data/hbaseSnapshot1/', keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0) 
test1.show(5) 

und

test1 = sc.sequenceFile('hdfs://nameservice1/user/ken/data/exportTable1' 
      , keyClass='org.apache.hadoop.hbase.mapreduce.TableInputFormat' 
      , valueClass='org.apache.hadoop.hbase.io.ImmutableBytesWritable' 
      , keyConverter='org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter' 
      , valueConverter='org.apache.spark.examples.pythonconverters.HBaseResultToStringCon verter' 
      , minSplits=None 
      , batchSize=100) 

Kein Glück hat der Code nicht funktioniert, ERROR:

Caused by: java.io.IOException: Could not find a deserializer for the Value class: 'org.apache.hadoop.hbase.client.Result'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization.

Irgendwelche Vorschläge? Vielen Dank!

Antwort

0

Ich hatte dieses Problem vor kurzem selbst. Ich löste es, indem ich von sc.sequenceFile wegging, und stattdessen sc.newAPIHadoopFile (oder einfach hadoopFile, wenn Sie auf der alten API sind). Der Spark SequenceFile-Leser scheint nur Schlüssel/Werte zu behandeln, die beschreibbare Typen sind (sie wird in docs angegeben).

Wenn Sie newAPIHadoopFile es verwenden, verwendet die Hadoop Deserialisierung Logik, und Sie können die Serialisierung Typen, die Sie benötigen in der config-Wörterbuch Sie geben es an: für „io

hadoop_conf = {"io.serializations": "org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.hbase.mapreduce.ResultSerialization"} 

sc.newAPIHadoopFile(
<input_path>, 
'org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat', 
keyClass='org.apache.hadoop.hbase.io.ImmutableBytesWritable', 
valueClass='org.apache.hadoop.hbase.client.Result', 
keyConverter='org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter', 
valueConverter='org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter', 
conf=hadoop_conf) 

Beachten Sie, dass der Wert in hadoop_conf. Serialisierungen "ist eine durch Kommas getrennte Liste, die" org.apache.hadoop.hbase.mapreduce.ResultSerialization "enthält. Das ist die Schlüsselkonfiguration, die Sie benötigen, um das Ergebnis deserialisieren zu können. Die WriteableSerialization wird auch benötigt, um ImmutableBytesWritable deserialisieren zu können.Sie können auch sc.newAPIHadoopRDD verwenden, aber Sie müssen auch einen Wert für "mapreduce.input.fileinputformat.inputdir" im Konfigurationswörterbuch festlegen.