2016-06-22 6 views
0

Problem:
Es sind keine Zuweisungen in Lambda oder Datenrahmen Transformationen erlaubt, das bedeutet, dass wir normalerweise eine neue Struktur für jede Datenbearbeitung in Datarahmen mit Spark erstellen müssen.Wie ändert man numpy Arrays in Spark Dataframe?

Beispiel (Python):
Ich habe einfach durch die Schaffung der modifizierten Daten an Ort und Stelle ohne Zuweisungen in Listen und Wörterbücher, aber die numpy Arithmetik erweist sich als sehr mühsam, um dieses Problem vorher bekommen. Und ich habe einige Simulationen durchgeführt, um all diese Daten in Listen zu schreiben, und es würde ziemlich deutlich verlangsamt werden, da die Arrays ziemlich groß sind. (Bsp. Diese Anordnungen sind etwa 3K Elemente lang jeweils für mehrere Millionen Zeilen in Listen von 30 Arrays pro db Reihe, enthalten)

a = np.zeros(5) 

# Actual operation 
a[1:3] += 7 
print "{}".format(a) 
>> [ 0. 7. 7. 0. 0.] 

# Spark compatability - Create modified array in memory to avoid assignment 
# Not sure if this is best "solution" performance-wise 
c = np.concatenate([a[:1], a[1:3] + 7, a[3:]]) 
print "{}\n".format(c) 
>> [ 0. 7. 7. 0. 0.] 

Beispiel (pyspark):
So, jetzt können Sie die Ausgabe sehen Ich erwarte, hier ist eine Spark-Version.

t = sc.parallelize(a) 
t2 = t.map(lambda ar: np.concatenate([ar[:1], ar[1:3] + 7, ar[3:]])) 
t2.take(1) 

Fehler:
Ich dachte, das würde funktionieren, aber ich diese. Ich dachte, das Problem wäre "ar [1: 3] + 7", aber nachdem ich es ohne es ausgeführt hatte, gab es immer noch denselben Fehler. Vielleicht gibt es etwas, das mir fehlt.

Maybe the np.concatenate() does some sort of assignment that causes this. If that is the case what would be a way around it?

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-46-4a4c467a0b3d> in <module>() 
    12 t = sc.parallelize(a) 
    13 t2 = t.map(lambda ar: np.concatenate([ar[:1], ar[1:3] + 7, ar[3:]])) 
---> 14 t2.take(1) 

/databricks/spark/python/pyspark/rdd.py in take(self, num) 
    1297 
    1298    p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) 
-> 1299    res = self.context.runJob(self, takeUpToNumLeft, p) 
    1300 
    1301    items += res 

/databricks/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal) 
    914   # SparkContext#runJob. 
    915   mappedRDD = rdd.mapPartitions(partitionFunc) 
--> 916   port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) 
    917   return list(_load_from_socket(port, mappedRDD._jrdd_deserializer)) 
    918 

/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    536   answer = self.gateway_client.send_command(command) 
    537   return_value = get_return_value(answer, self.gateway_client, 
--> 538     self.target_id, self.name) 
    539 
    540   for temp_arg in temp_args: 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 
    34  def deco(*a, **kw): 
    35   try: 
---> 36    return f(*a, **kw) 
    37   except py4j.protocol.Py4JJavaError as e: 
    38    s = e.java_exception.toString() 

/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    298     raise Py4JJavaError(
    299      'An error occurred while calling {0}{1}{2}.\n'. 
--> 300      format(target_id, '.', name), value) 
    301    else: 
    302     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25.0 (TID 30, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/databricks/spark/python/pyspark/worker.py", line 111, in main 
    process() 
    File "/databricks/spark/python/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/databricks/spark/python/pyspark/serializers.py", line 263, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "/databricks/spark/python/pyspark/rdd.py", line 1295, in takeUpToNumLeft 
    yield next(iterator) 
    File "<ipython-input-46-4a4c467a0b3d>", line 13, in <lambda> 
IndexError: invalid index to scalar variable. 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1827) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1840) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1853) 
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393) 
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/databricks/spark/python/pyspark/worker.py", line 111, in main 
    process() 
    File "/databricks/spark/python/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/databricks/spark/python/pyspark/serializers.py", line 263, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "/databricks/spark/python/pyspark/rdd.py", line 1295, in takeUpToNumLeft 
    yield next(iterator) 
    File "<ipython-input-46-4a4c467a0b3d>", line 13, in <lambda> 
IndexError: invalid index to scalar variable. 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 

Antwort

0

Die Ursache des Problems ist viel einfacher als das. Wenn Sie sc.parallelize(a) ausführen, wird das Eingabearray in eine Liste konvertiert und Elemente dieser Liste werden zu Elementen der . Wenn Sie also map ausführen, wendet es die Funktion für jedes Element des Eingangs separat an. Es entspricht also etwa so etwas:

f = lambda ar: np.concatenate([ar[:1], ar[1:3] + 7, ar[3:]]) 

[f(x) for x in list(a)] 
## IndexError  
## ... 
## IndexError: invalid index to scalar variable. 

Daher der Fehler, den Sie sehen. Was Sie wollen, ist höchstwahrscheinlich das:

sc.parallelize([a]).map(f).take(1) 
## [array([ 0., 14., 14., 0., 0.])] 

Darüber hinaus gibt es zwei Dinge bemerkenswert:

  • Funke nicht Lambda-Ausdruck benötigt, wenn sie mit Funktionen höherer Ordnung arbeiten. Die einzige Voraussetzung ist, dass die übergebene Funktion ihre Argumente nicht ändern sollte und optimal rein sein sollte. In der Praxis können Sie die Daten in PySpark (nicht Spark im Allgemeinen) ändern, wenn Sie wissen, was im Inneren passiert, aber das sollten Sie in der Praxis nicht tun. Um die Frage aus dem Titel zu beantworten, versuchen Sie es einfach nicht.
  • Lambda-Ausdrücke haben keinen magischen Schutz, der vor Nebenwirkungen schützt. Sie können Anweisungen in seinem Körper nicht direkt verwenden.