2016-08-04 39 views
3

Inspiriert von dieser question, schrieb ich einen Code zum Speichern einer RDD (die aus einer Parquet-Datei gelesen wurde), mit einem Schema von (Foto_ID, Daten), in Paaren, durch Tabulatoren und begrenzt nur als Detail Basis 64 kodieren sie, wie folgt aus:Lesen Sie eine verteilte Registerkarte mit Trennzeichen CSV

def do_pipeline(itr): 
    ... 
    item_id = x.photo_id 

def toTabCSVLine(data): 
    return '\t'.join(str(d) for d in data) 

serialize_vec_b64pkl = lambda x: (x[0], base64.b64encode(cPickle.dumps(x[1]))) 

def format(data): 
    return toTabCSVLine(serialize_vec_b64pkl(data)) 

dataset = sqlContext.read.parquet('mydir') 
lines = dataset.map(format) 
lines.saveAsTextFile('outdir') 

so, jetzt der Punkt von Interesse: , wie das Daten-Set und drucken zum Beispiel seine entserialisierten Daten lesen?

Ich benutze Python 2.6.6.


Mein Versuch, liegt hier, wo nur für die Überprüfung, dass alles getan werden kann, habe ich diesen Code geschrieben:

deserialize_vec_b64pkl = lambda x: (x[0], cPickle.loads(base64.b64decode(x[1]))) 

base64_dataset = sc.textFile('outdir') 
collected_base64_dataset = base64_dataset.collect() 
print(deserialize_vec_b64pkl(collected_base64_dataset[0].split('\t'))) 

die collect() aufruft, die für die Prüfung in Ordnung ist, aber in einer realen Welt Szenario würde kämpfen ...


Edit:

Als ich versuchte, zero323 des Vorschlag:

foo = (base64_dataset.map(str.split).map(deserialize_vec_b64pkl)).collect() 

Ich habe diesen Fehler, der dazu läuft darauf hinaus:

PythonRDD[2] at RDD at PythonRDD.scala:43 
16/08/04 18:32:30 WARN TaskSetManager: Lost task 4.0 in stage 0.0 (TID 4, gsta31695.tan.ygrid.yahoo.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/worker.py", line 98, in main 
    command = pickleSer._read_with_length(infile) 
    File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length 
    return self.loads(obj) 
    File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/serializers.py", line 422, in loads 
    return pickle.loads(obj) 
UnpicklingError: NEWOBJ class argument has NULL tp_new 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

16/08/04 18:32:30 ERROR TaskSetManager: Task 12 in stage 0.0 failed 4 times; aborting job 
16/08/04 18:32:31 WARN TaskSetManager: Lost task 14.3 in stage 0.0 (TID 38, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally) 
16/08/04 18:32:31 WARN TaskSetManager: Lost task 13.3 in stage 0.0 (TID 39, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally) 
16/08/04 18:32:31 WARN TaskSetManager: Lost task 16.3 in stage 0.0 (TID 42, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally) 
--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
/homes/gsamaras/code/read_and_print.py in <module>() 
    17  print(base64_dataset.map(str.split).map(deserialize_vec_b64pkl)) 
    18 
---> 19  foo = (base64_dataset.map(str.split).map(deserialize_vec_b64pkl)).collect() 
    20  print(foo) 

/home/gs/spark/current/python/lib/pyspark.zip/pyspark/rdd.py in collect(self) 
    769   """ 
    770   with SCCallSiteSync(self.context) as css: 
--> 771    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    772   return list(_load_from_socket(port, self._jrdd_deserializer)) 
    773 

/home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    811   answer = self.gateway_client.send_command(command) 
    812   return_value = get_return_value(
--> 813    answer, self.gateway_client, self.target_id, self.name) 
    814 
    815   for temp_arg in temp_args: 

/home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    306     raise Py4JJavaError(
    307      "An error occurred while calling {0}{1}{2}.\n". 
--> 308      format(target_id, ".", name), value) 
    309    else: 
    310     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
+1

Warum nicht 'base64_dataset.map (str.split) .map (deserialize_vec_b64pkl)'? – zero323

+0

@ zero323 Ich wusste nicht, dass wir 'str.split' verwenden könnten, ich bin noch neu dazu, also bitte bare mit mir, ich bin mir ziemlich sicher, dass, wenn jemand erklärt, ich in der Lage sein werde, danach weiterzukommen Was Sie vorschlagen, sollte zu einem RDD führen. Um nur sicher zu sein, dass alles funktioniert, wie kann ich das erste Element sehen? Ich habe versucht, '' zu sammeln() ', was Sie sagten, aber das führte zu einem Fehler (' Py4JJavaError: Ein Fehler trat beim Aufrufen von z: org.apache.spark.api.python.PythonRDD.collectAndServe.'). Vielleicht könnte es helfen, wenn ich das Datenlayout der resultierenden RDD verstehe. – gsamaras

+0

@ zero323 Ich benutze Python 2, es wäre genug, um das zu überdecken, ich meine von dort kann ich zu Python 3, wenn nötig! – gsamaras

Antwort

2

Lassen Sie uns ein einfaches Beispiel versuchen. Aus praktischen Gründen verwende ich die handliche -Bibliothek, aber das ist hier nicht wirklich erforderlich.

import sys 
import base64 

if sys.version_info < (3,): 
    import cPickle as pickle 
else: 
    import pickle 


from toolz.functoolz import compose 

rdd = sc.parallelize([(1, {"foo": "bar"}), (2, {"bar": "foo"})]) 

Jetzt ist Ihr Code nicht gerade portabel. In Python 2 gibt base64.b64encodestr zurück, während in Python 3 bytes zurückgegeben wird. Lets zeigen, dass:

  • Python 2

    type(base64.b64encode(pickle.dumps({"foo": "bar"}))) 
    ## str 
    
  • Python 3

    type(base64.b64encode(pickle.dumps({"foo": "bar"}))) 
    ## bytes 
    

So an die Pipeline Decodierung hinzufügen können:

# Equivalent to 
# def pickle_and_b64(x): 
#  return base64.b64encode(pickle.dumps(x)).decode("ascii") 

pickle_and_b64 = compose(
    lambda x: x.decode("ascii"), 
    base64.b64encode, 
    pickle.dumps 
) 

Bitte beachten Sie, dass dies keine besondere Form der Daten voraussetzt.Aus diesem Grund werden wir mapValues verwenden, um nur Schlüssel zu serialisiert:

serialized = rdd.mapValues(pickle_and_b64) 
serialized.first() 
## 1, u'KGRwMApTJ2ZvbycKcDEKUydiYXInCnAyCnMu') 

Jetzt können wir es mit einfachen Format folgen und sparen:

from tempfile import mkdtemp 
import os 

outdir = os.path.join(mkdtemp(), "foo") 

serialized.map(lambda x: "{0}\t{1}".format(*x)).saveAsTextFile(outdir) 

die Datei lesen wir umkehren den Prozess:

# Equivalent to 
# def b64_and_unpickle(x): 
#  return pickle.loads(base64.b64decode(x)) 

b64_and_unpickle = compose(
    pickle.loads, 
    base64.b64decode 
) 

decoded = (sc.textFile(outdir) 
    .map(lambda x: x.split("\t")) # In Python 3 we could simply use str.split 
    .mapValues(b64_and_unpickle)) 

decoded.first() 
## (u'1', {'foo': 'bar'}) 
+0

Auch wenn Sie auf Python 2.x sind a) 'str.split' funktioniert möglicherweise nicht. Verwenden Sie stattdessen die vollständige Funktion. B) Zum Testen ist "Beize" etwas ausführlicher, wenn Fehlermeldungen angezeigt werden. – zero323

+0

2.6? !! Habe das eine Zeit lang nicht gesehen :) Ich habe nicht einmal Inthronisierung, die ich benutzen kann, um es zu testen. Ganz zu schweigen von der Tatsache, dass Spark in der letzten Version 2.6 Unterstützung verloren hat und die Branche vor einigen Jahren ihr Ende erreicht hat. Bezüglich Toolz - kein besonderer Grund außer Bequemlichkeit. Ich bin verwöhnt und finde Verschachtelungsfunktionsaufrufe mühsam. Ich habe Funktionen mit vollem Funktionsumfang hinzugefügt. – zero323

+1

Oh ich hätte eine Funktion schreiben sollen, Dumm von mir tut mir leid! Alles gut jetzt, ich werde meinen Code debuggen, danke! – gsamaras