2016-04-27 8 views
0

Ich habe ein Skript, um BSON-Dumps zu analysieren, aber es funktioniert nur mit unkomprimierten Dateien. Ich bekomme eine leere RDD beim Lesen von gz Bson-Dateien.PySpark: Leere RDD beim Lesen von gezippten BSON-Dateien

pyspark_location = 'lib/pymongo_spark.py' 
HDFS_HOME = 'hdfs://1.1.1.1/' 
INPUT_FILE = 'big_bson.gz' 


class BsonEncoder(JSONEncoder): 
    def default(self, obj): 
     if isinstance(obj, ObjectId): 
      return str(obj) 
     elif isinstance(obj, datetime): 
      return obj.isoformat() 
     return JSONEncoder.default(self, obj) 


def setup_spark_with_pymongo(app_name='App'): 
    conf = SparkConf().setAppName(app_name) 
    sc = SparkContext(conf=conf) 
    sc.addPyFile(pyspark_location) 
    return sc 


def main(): 
    spark_context = setup_spark_with_pymongo('PysparkApp') 
    filename = HDFS_HOME + INPUT_FILE 
    import pymongo_spark 
    pymongo_spark.activate() 
    rdd = spark_context.BSONFileRDD(filename) 
    print(rdd.first()) #Raises ValueError("RDD is empty") 

Ich bin mit Mongo-java-Treiber-3.2.2.jar, Mongo-Hadoop-Funken 1.5.2.jar, pymongo-3.2.2-py2.7-linux-x86_64 und pymongo_spark in zusammen mit Spark-submit. Die bereitgestellte Version von Spark ist 1.6.1 zusammen mit Hadoop 2.6.4.

Ich bin mir bewusst, dass die Bibliothek nicht unterstützt Splitting von komprimierten BSON-Dateien, aber es sollte mit einem einzigen Split. Ich habe Hunderte von komprimierten BSON-Dateien zu analysieren und Deflation jeder von ihnen scheint nicht eine praktikable Option sein.

Irgendeine Idee, wie ich weiter verfahren sollte? Vielen Dank im Voraus!

Antwort

0

Ich habe gerade in der Umgebung getestet: mongo-hadoop-spark-1.5.2.jar, Spark-Version 1.6.1 für Hadoop 2.6.4, Pymongo 3.2.2. Die Quelldatei ist eine Ausgabe von mongodump compressed und eine Datei kleiner Größe für einen einzelnen Split (unkomprimierte Sammlungsgröße von 105 MB). Laufen durch PySpark:

from pyspark import SparkContext, SparkConf 
import pymongo_spark 
pymongo_spark.activate() 
conf = SparkConf().setAppName("pyspark-bson") 
file_path = "/file/example_bson.gz" 
rdd = sc.BSONFileRDD(file_path) 
rdd.first() 

Es ist in der Lage die komprimierten BSON-Datei zu lesen, und das erste Dokument aufgeführt. Bitte stellen Sie sicher, dass Sie die Eingabedatei erreichen können und die Datei im richtigen BSON-Format vorliegt.