2016-08-08 19 views
1

Ich muss Koordinaten vergleichen, um die Entfernung zu erhalten. Dazu lade ich die Daten mit sc.textFile() und mache ein kartesisches Produkt. Es gibt ca. 2.000.000 Zeilen in der Textdatei, also 2.000.000 x 2.000.000 zu vergleichende Koordinaten.Spark cartesian Produkt

Ich habe den Code mit etwa 2.000 Koordinaten getestet und es funktionierte innerhalb von Sekunden. Aber mit der großen Datei scheint es an einem bestimmten Punkt zu stoppen und ich weiß nicht warum. Der Code sieht folgendermaßen aus:

def concat(x,y): 
    if(isinstance(y, list)&(isinstance(x,list))): 
     return x + y 
    if(isinstance(x,list)&isinstance(y,tuple)): 
     return x + [y] 
    if(isinstance(x,tuple)&isinstance(y,list)): 
     return [x] + y 
    else: return [x,y] 

def haversian_dist(tuple): 
    lat1 = float(tuple[0][0]) 
    lat2 = float(tuple[1][0]) 
    lon1 = float(tuple[0][2]) 
    lon2 = float(tuple[1][2]) 
    p = 0.017453292519943295 
    a = 0.5 - cos((lat2 - lat1) * p)/2 + cos(lat1 * p) * cos(lat2 * p) * (1 - cos((lon2 - lon1) * p))/2 
    print(tuple[0][1]) 
    return (int(float(tuple[0][1])), (int(float(tuple[1][1])),12742 * asin(sqrt(a)))) 

def sort_val(tuple): 
    dtype = [("globalid", int),("distance",float)] 
    a = np.array(tuple[1], dtype=dtype) 
    sorted_mins = np.sort(a, order="distance",kind="mergesort") 
    return (tuple[0], sorted_mins) 


def calc_matrix(sc, path, rangeval, savepath, name): 
    data = sc.textFile(path) 
    data = data.map(lambda x: x.split(";")) 
    data = data.repartition(100).cache() 
    data.collect() 
    matrix = data.cartesian(data) 
    values = matrix.map(haversian_dist) 
    values = values.reduceByKey(concat) 
    values = values.map(sort_val) 
    values = values.map(lambda x: (x[0], x[1][1:int(rangeval)].tolist())) 
    values = values.map(lambda x: (x[0], [y[0] for y in x[1]])) 
    dicti = values.collectAsMap() 
    hp.save_pickle(dicti, savepath, name) 

Selbst eine Datei mit etwa 15.000 Einträgen funktioniert nicht. Ich weiß, dass das Kartesische O (n^2) Laufzeit verursacht. Aber sollte das nicht funktionieren? Oder stimmt etwas nicht? Der einzige Ausgangspunkt ist eine Fehlermeldung, aber ich weiß nicht, ob es auf das eigentliche Problem betrifft:

16/08/06 22:21:12 WARN TaskSetManager: Lost task 15.0 in stage 1.0 (TID 16, hlb0004): java.net.SocketException: Daten?bergabe unterbrochen (broken pipe) 
    at java.net.SocketOutputStream.socketWrite0(Native Method) 
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) 
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) 
    at java.io.DataOutputStream.write(DataOutputStream.java:107) 
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97) 
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:440) 
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452) 
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280) 
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741) 
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239) 

16/08/06 22:21:12 INFO TaskSetManager: Starting task 15.1 in stage 1.0 (TID 17, hlb0004, partition 15,PROCESS_LOCAL, 2408 bytes) 
16/08/06 22:21:12 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 8, hlb0004): java.net.SocketException: Connection reset 
    at java.net.SocketInputStream.read(SocketInputStream.java:209) 
    at java.net.SocketInputStream.read(SocketInputStream.java:141) 
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) 
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265) 
    at java.io.DataInputStream.readInt(DataInputStream.java:387) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:139) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
+0

können Sie ein Beispiel für Ihre Daten geben? Auch wenn 'dist (u, v) == dist (v, u) 'und' dist (u, u) == 0' oder eine Konstante, dann können Sie die Anzahl der Berechnungen auf '(n * (n -1))/2' Paare statt 'n^2' Paare. – jtitusj

+0

eine Zeile sieht so aus "94.5406036377; 1313316.000000000000000; 32.791301727300002; 5", ja, ich kann dies verwenden, um es zu reduzieren, aber ich denke, es stoppt sogar vor diesen Berechnungen. oder kann ich das umsetzen, während der kartesian gebaut wird? –

+0

kannst du mir auf die hairsine distance Formel zeigen, die du benutzt hast und mir die Konstanten 'p' und 12742 erklären? Es scheint ein Problem bei der Berechnung der Entfernung zu geben. – jtitusj

Antwort

3

Sie verwenden data.collect() im Code, die im Grunde alle Daten in eine Maschine aufruft. Abhängig vom Speicher auf diesem Computer passen möglicherweise 2.000.000 Datenzeilen nicht sehr gut.

Außerdem habe ich versucht, die Anzahl der Berechnungen zu reduzieren, indem Joins statt cartesian durchgeführt werden. (Bitte beachten Sie, dass ich gerade erzeugten Zufallszahlen mit numpy und dass das Format hier kann unterschiedlich sein von dem, was Sie haben. Doch die Hauptidee ist die gleiche.)

import numpy as np 
from numpy import arcsin, cos, sqrt 

# suppose my data consists of latlong pairs 
# we will use the indices for pairing up values 
data = sc.parallelize(np.random.rand(10,2)).zipWithIndex() 
data = data.map(lambda (val, idx): (idx, val)) 

# generate pairs (e.g. if i have 3 pairs with indices [0,1,2], 
# I only have to compute for distances of pairs (0,1), (0,2) & (1,2) 
idxs = range(data.count()) 
indices = sc.parallelize([(i,j) for i in idxs for j in idxs if i < j]) 

# haversian func (i took the liberty of editing some parts of it) 
def haversian_dist(latlong1, latlong2): 
    lat1, lon1 = latlong1 
    lat2, lon2 = latlong2 
    p = 0.017453292519943295 
    def hav(theta): return (1 - cos(p * theta))/2 
    a = hav(lat2 - lat1) + cos(p * lat1)*cos(p * lat2)*hav(lon2 - lon1) 
    return 12742 * arcsin(sqrt(a)) 

joined1 = indices.join(data).map(lambda (i, (j, val)): (j, (i, val))) 
joined2 = joined1.join(data).map(lambda (j, ((i, latlong1), latlong2)): ((i,j), (latlong1, latlong2)) 
haversianRDD = joined2.mapValues(lambda (x, y): haversian_dist(x, y)) 
+0

das scheint zu funktionieren eine art, dass ich jetzt einen fehler bekomme "slurmstepd: error: Überschrittene Speichergrenze irgendwann überschritten." Danke für deine Antwort! ich hoffe, dass ich diesen neuen Fehler ^^ lösen kann –