Inspiriert von dieser question, schrieb ich einen Code zum Speichern einer RDD (die aus einer Parquet-Datei gelesen wurde), mit einem Schema von (Foto_ID, Daten), in Paaren, durch Tabulatoren und begrenzt nur als Detail Basis 64 kodieren sie, wie folgt aus:Lesen Sie eine verteilte Registerkarte mit Trennzeichen CSV
def do_pipeline(itr):
...
item_id = x.photo_id
def toTabCSVLine(data):
return '\t'.join(str(d) for d in data)
serialize_vec_b64pkl = lambda x: (x[0], base64.b64encode(cPickle.dumps(x[1])))
def format(data):
return toTabCSVLine(serialize_vec_b64pkl(data))
dataset = sqlContext.read.parquet('mydir')
lines = dataset.map(format)
lines.saveAsTextFile('outdir')
so, jetzt der Punkt von Interesse: , wie das Daten-Set und drucken zum Beispiel seine entserialisierten Daten lesen?
Ich benutze Python 2.6.6.
Mein Versuch, liegt hier, wo nur für die Überprüfung, dass alles getan werden kann, habe ich diesen Code geschrieben:
deserialize_vec_b64pkl = lambda x: (x[0], cPickle.loads(base64.b64decode(x[1])))
base64_dataset = sc.textFile('outdir')
collected_base64_dataset = base64_dataset.collect()
print(deserialize_vec_b64pkl(collected_base64_dataset[0].split('\t')))
die collect() aufruft, die für die Prüfung in Ordnung ist, aber in einer realen Welt Szenario würde kämpfen ...
Edit:
Als ich versuchte, zero323 des Vorschlag:
foo = (base64_dataset.map(str.split).map(deserialize_vec_b64pkl)).collect()
Ich habe diesen Fehler, der dazu läuft darauf hinaus:
PythonRDD[2] at RDD at PythonRDD.scala:43
16/08/04 18:32:30 WARN TaskSetManager: Lost task 4.0 in stage 0.0 (TID 4, gsta31695.tan.ygrid.yahoo.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
UnpicklingError: NEWOBJ class argument has NULL tp_new
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/08/04 18:32:30 ERROR TaskSetManager: Task 12 in stage 0.0 failed 4 times; aborting job
16/08/04 18:32:31 WARN TaskSetManager: Lost task 14.3 in stage 0.0 (TID 38, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally)
16/08/04 18:32:31 WARN TaskSetManager: Lost task 13.3 in stage 0.0 (TID 39, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally)
16/08/04 18:32:31 WARN TaskSetManager: Lost task 16.3 in stage 0.0 (TID 42, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/homes/gsamaras/code/read_and_print.py in <module>()
17 print(base64_dataset.map(str.split).map(deserialize_vec_b64pkl))
18
---> 19 foo = (base64_dataset.map(str.split).map(deserialize_vec_b64pkl)).collect()
20 print(foo)
/home/gs/spark/current/python/lib/pyspark.zip/pyspark/rdd.py in collect(self)
769 """
770 with SCCallSiteSync(self.context) as css:
--> 771 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
772 return list(_load_from_socket(port, self._jrdd_deserializer))
773
/home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814
815 for temp_arg in temp_args:
/home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
Warum nicht 'base64_dataset.map (str.split) .map (deserialize_vec_b64pkl)'? – zero323
@ zero323 Ich wusste nicht, dass wir 'str.split' verwenden könnten, ich bin noch neu dazu, also bitte bare mit mir, ich bin mir ziemlich sicher, dass, wenn jemand erklärt, ich in der Lage sein werde, danach weiterzukommen Was Sie vorschlagen, sollte zu einem RDD führen. Um nur sicher zu sein, dass alles funktioniert, wie kann ich das erste Element sehen? Ich habe versucht, '' zu sammeln() ', was Sie sagten, aber das führte zu einem Fehler (' Py4JJavaError: Ein Fehler trat beim Aufrufen von z: org.apache.spark.api.python.PythonRDD.collectAndServe.'). Vielleicht könnte es helfen, wenn ich das Datenlayout der resultierenden RDD verstehe. – gsamaras
@ zero323 Ich benutze Python 2, es wäre genug, um das zu überdecken, ich meine von dort kann ich zu Python 3, wenn nötig! – gsamaras