Problem:
In Lambda-Ausdrücken und DataFrame-Transformationen sind keine Zuweisungen erlaubt; das bedeutet, dass wir in Spark normalerweise für jede Datenbearbeitung in DataFrames eine neue Struktur erstellen müssen. Wie ändert man NumPy-Arrays in einem Spark-DataFrame?
Beispiel (Python):
Bei Listen und Wörterbüchern bin ich bisher damit ausgekommen, die modifizierten Daten einfach an Ort und Stelle ohne Zuweisungen neu zu erzeugen, aber bei der NumPy-Arithmetik erweist es sich als sehr mühsam, dieses Problem zu umgehen. Ich habe außerdem einige Simulationen durchgeführt, bei denen all diese Daten in Listen geschrieben werden, und das würde alles ziemlich deutlich verlangsamen, da die Arrays ziemlich groß sind. (Beispiel: Diese Arrays sind jeweils etwa 3K Elemente lang, bei mehreren Millionen Zeilen, die in Listen von 30 Arrays pro DB-Zeile enthalten sind.)
a = np.zeros(5)
# Actual operation
a[1:3] += 7
print "{}".format(a)
>> [ 0. 7. 7. 0. 0.]
# Spark compatibility - Create modified array in memory to avoid assignment
# Not sure if this is best "solution" performance-wise
c = np.concatenate([a[:1], a[1:3] + 7, a[3:]])
print "{}\n".format(c)
>> [ 0. 7. 7. 0. 0.]
Beispiel (pyspark):
Jetzt, da Sie die Ausgabe sehen können, die ich erwarte, hier eine Spark-Version.
t = sc.parallelize(a)
t2 = t.map(lambda ar: np.concatenate([ar[:1], ar[1:3] + 7, ar[3:]]))
t2.take(1)
Fehler:
Ich dachte, das würde funktionieren, aber ich bekomme diese Fehlermeldung. Ich dachte, das Problem wäre "ar[1:3] + 7", aber auch nachdem ich den Code ohne diesen Ausdruck ausgeführt hatte, trat immer noch derselbe Fehler auf. Vielleicht gibt es etwas, das ich übersehe.
Maybe the np.concatenate() does some sort of assignment that causes this. If that is the case what would be a way around it?
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-46-4a4c467a0b3d> in <module>()
12 t = sc.parallelize(a)
13 t2 = t.map(lambda ar: np.concatenate([ar[:1], ar[1:3] + 7, ar[3:]]))
---> 14 t2.take(1)
/databricks/spark/python/pyspark/rdd.py in take(self, num)
1297
1298 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1299 res = self.context.runJob(self, takeUpToNumLeft, p)
1300
1301 items += res
/databricks/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
914 # SparkContext#runJob.
915 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 916 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
917 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
918
/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
34 def deco(*a, **kw):
35 try:
---> 36 return f(*a, **kw)
37 except py4j.protocol.Py4JJavaError as e:
38 s = e.java_exception.toString()
/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25.0 (TID 30, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 111, in main
process()
File "/databricks/spark/python/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/databricks/spark/python/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/databricks/spark/python/pyspark/rdd.py", line 1295, in takeUpToNumLeft
yield next(iterator)
File "<ipython-input-46-4a4c467a0b3d>", line 13, in <lambda>
IndexError: invalid index to scalar variable.
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1827)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1840)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1853)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 111, in main
process()
File "/databricks/spark/python/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/databricks/spark/python/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/databricks/spark/python/pyspark/rdd.py", line 1295, in takeUpToNumLeft
yield next(iterator)
File "<ipython-input-46-4a4c467a0b3d>", line 13, in <lambda>
IndexError: invalid index to scalar variable.
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more