Ich muss Koordinaten vergleichen, um die Entfernung zu erhalten. Dazu lade ich die Daten mit sc.textFile() und mache ein kartesisches Produkt. Es gibt ca. 2.000.000 Zeilen in der Textdatei, also 2.000.000 x 2.000.000 zu vergleichende Koordinaten.Spark cartesian Produkt
Ich habe den Code mit etwa 2.000 Koordinaten getestet und es funktionierte innerhalb von Sekunden. Aber mit der großen Datei scheint es an einem bestimmten Punkt zu stoppen und ich weiß nicht warum. Der Code sieht folgendermaßen aus:
def concat(x,y):
if(isinstance(y, list)&(isinstance(x,list))):
return x + y
if(isinstance(x,list)&isinstance(y,tuple)):
return x + [y]
if(isinstance(x,tuple)&isinstance(y,list)):
return [x] + y
else: return [x,y]
def haversian_dist(tuple):
lat1 = float(tuple[0][0])
lat2 = float(tuple[1][0])
lon1 = float(tuple[0][2])
lon2 = float(tuple[1][2])
p = 0.017453292519943295
a = 0.5 - cos((lat2 - lat1) * p)/2 + cos(lat1 * p) * cos(lat2 * p) * (1 - cos((lon2 - lon1) * p))/2
print(tuple[0][1])
return (int(float(tuple[0][1])), (int(float(tuple[1][1])),12742 * asin(sqrt(a))))
def sort_val(tuple):
dtype = [("globalid", int),("distance",float)]
a = np.array(tuple[1], dtype=dtype)
sorted_mins = np.sort(a, order="distance",kind="mergesort")
return (tuple[0], sorted_mins)
def calc_matrix(sc, path, rangeval, savepath, name):
data = sc.textFile(path)
data = data.map(lambda x: x.split(";"))
data = data.repartition(100).cache()
data.collect()
matrix = data.cartesian(data)
values = matrix.map(haversian_dist)
values = values.reduceByKey(concat)
values = values.map(sort_val)
values = values.map(lambda x: (x[0], x[1][1:int(rangeval)].tolist()))
values = values.map(lambda x: (x[0], [y[0] for y in x[1]]))
dicti = values.collectAsMap()
hp.save_pickle(dicti, savepath, name)
Selbst eine Datei mit etwa 15.000 Einträgen funktioniert nicht. Ich weiß, dass das Kartesische O (n^2) Laufzeit verursacht. Aber sollte das nicht funktionieren? Oder stimmt etwas nicht? Der einzige Ausgangspunkt ist eine Fehlermeldung, aber ich weiß nicht, ob es auf das eigentliche Problem betrifft:
16/08/06 22:21:12 WARN TaskSetManager: Lost task 15.0 in stage 1.0 (TID 16, hlb0004): java.net.SocketException: Daten?bergabe unterbrochen (broken pipe)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:440)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
16/08/06 22:21:12 INFO TaskSetManager: Starting task 15.1 in stage 1.0 (TID 17, hlb0004, partition 15,PROCESS_LOCAL, 2408 bytes)
16/08/06 22:21:12 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 8, hlb0004): java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:209)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:139)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
können Sie ein Beispiel für Ihre Daten geben? Auch wenn 'dist (u, v) == dist (v, u) 'und' dist (u, u) == 0' oder eine Konstante, dann können Sie die Anzahl der Berechnungen auf '(n * (n -1))/2' Paare statt 'n^2' Paare. – jtitusj
eine Zeile sieht so aus "94.5406036377; 1313316.000000000000000; 32.791301727300002; 5", ja, ich kann dies verwenden, um es zu reduzieren, aber ich denke, es stoppt sogar vor diesen Berechnungen. oder kann ich das umsetzen, während der kartesian gebaut wird? –
kannst du mir auf die hairsine distance Formel zeigen, die du benutzt hast und mir die Konstanten 'p' und 12742 erklären? Es scheint ein Problem bei der Berechnung der Entfernung zu geben. – jtitusj