2014-06-10 4 views
9

Ich schreibe ein Diagramm-bezogenes Programm in Scala mit Spark. Der Datensatz hat 4 Millionen Knoten und 4 Millionen Kanten (Sie können dies als Baum behandeln), aber ich bearbeite für jeden Zeitpunkt (ein) nur einen Teil davon, nämlich einen Unterbaum, der von einem bestimmten Knoten verwurzelt ist, und den Knoten in einem Pfad zwischen diesem gegebenen Knoten und dem Root.Was ist der effiziente Weg, den Wert in Sparks RDD zu aktualisieren?

Die Iteration hat Abhängigkeit, was bedeutet i+1 braucht das Ergebnis von i. Also muss ich das Ergebnis von jedem Iteration für den nächsten Schritt speichern.

Ich versuche, eine effiziente Art und Weise zu finden RDD zu aktualisieren, haben aber keine Ahnung, so far.I feststellen, dass PairRDD eine lookup Funktion haben, die die Rechenzeit von O(N), auf O verringern könnte (M), N die bezeichnen Gesamtzahl der Objekte in RDD und M bezeichnen die Anzahl der Elemente in jeder Partition.

Also ich denke, gibt es sowieso, dass ich ein Objekt in der mit O(M) aktualisieren könnte? Oder mehr ideal, O (1)? (Ich sehe eine E-Mail in Spark Mail-Liste sagen, dass die lookup kann modifiziert werden, um O zu erreichen (1))

Eine andere Sache ist, wenn ich O(M) für die Aktualisierung der RDD erreichen könnte, Könnte ich die Partition auf eine Nummer größer als die Anzahl der Kerne erhöhen und eine bessere Leistung erzielen?

+1

RDD unveränderlich ist, können Sie nur eine neue RDD durch Transformation erstellen, aber es nicht aktualisieren kann. – cloud

+0

@cloud Danke für deinen Kommentar, bedeutet das, dass ich eine ganze neue RDD anstatt nur einer Partition erstellen muss? – bxshi

+1

Das war's. Ich werde eine Antwort schreiben, um es im Detail zu erklären. – cloud

Antwort

4

Ein RDD ist ein verteilter Datensatz, eine Partition ist die Einheit für RDD-Speicher und die zu verarbeitende Einheit und RDD ist ein Element.

Zum Beispiel, Sie lesen eine große Datei von HDFS als RDD, dann ist das Element dieser RDD String (Zeilen in dieser Datei), und Funken speichert diese RDD über den Cluster nach Partition. Für Sie, als Spark-Benutzer, müssen Sie nur darauf achten, wie Sie mit den Zeilen dieser Dateien umgehen, so wie Sie ein normales Programm schreiben, und Sie lesen eine Datei aus dem lokalen Dateisystem Zeile für Zeile. Das ist die Macht der Funken :)

Wie auch immer, Sie haben keine Ahnung, welche Elemente in einer bestimmten Partition gespeichert werden, so dass es nicht sinnvoll ist, eine bestimmte Partition zu aktualisieren.

+0

Also basierend auf Ihrer und Maasgs Antwort, sollte ich RDD als normales Objekt behandeln und nicht versuchen, die Performance auf der unteren Ebene zu "tunen", da das Framework dies für mich tun und eine neue RDD mit Objektwiederverwendung erstellen wird im Grunde die Schöpfung nur eine Iteration und ersetzen ein Objekt durch neue) ist nicht so langsam wie ich dachte? – bxshi

+4

@bxshi RDD-Objekt ist billig, aber die Daten darin ist teuer. Zum Beispiel schreiben Sie eine Anwendung: data_source -> rdd1 -> rdd2 -> rdd3 -> get_result. Was für ein Funke ist eigentlich: Erinnern Sie sich an Ihre Transformation t1, t2, t3 und wenden Sie diese Transformation auf die Datenquelle an und erhalten Sie das Ergebnis. Spark speichert die RDD-Daten nicht, es sei denn, Sie rufen 'RDD.cache()' auf. – cloud

+0

@cloud: Bedeutet dies, dass immer nur eine RDD existiert? – Shankar

6

Als funktionale Datenstrukturen sind RDDs unveränderlich und eine Operation auf einer RDD generiert eine neue RDD.

Unveränderlichkeit der Struktur bedeutet nicht unbedingt vollständige Replikation. Persistente Datenstrukturen sind ein häufiges funktionelles Muster, bei dem Operationen auf unveränderlichen Strukturen eine neue Struktur ergeben, frühere Versionen jedoch beibehalten und häufig wiederverwendet werden.

GraphX ​​(a ‚Modul‘ auf der Oberseite des Funken) ist ein Graph API auf dem Funken, der solches Konzept verwendet: in den Dokumenten:

Änderungen der Werte oder die Struktur des Graphen durchgeführt werden durch Erstellen einer neuen Grafik mit den gewünschten Änderungen. Man beachte, dass wesentliche Teile des ursprünglichen Graphen (d. H. Unberührte Struktur, Attribute, Indizes und Indizes) in dem neuen Graphen wiederverwendet werden, wodurch die Kosten dieser inhärent funktionellen Datenstruktur reduziert werden.

Es könnte eine Lösung für das Problem bei der Hand sein: http://spark.apache.org/docs/1.0.0/graphx-programming-guide.html

+0

Ja, sie werden wiederverwendet, aber Sie müssen immer noch alle Elemente durchlaufen, um das neue Objekt zu erstellen. – bxshi

+0

Wenn Sie sagen "Ich versuche eine effiziente Methode zur Aktualisierung von RDD zu finden", bezog ich mich auf Mutationen vor Ort. Sprechen Sie eher über Nachschlagen? – maasg

+0

@massg Nun, ich wollte über Update RDD sprechen, aber ich habe einen Fehler gemacht über die Definition von "Iteration". Wenn Sie eine Map oder andere Manipulationsoperationen zum Erstellen einer neuen RDD ausführen, haben Sie bei solchen Operationen eine Parallelität, müssen aber dennoch auf alle Elemente innerhalb der alten RDD zugreifen. – bxshi

1

Das MapReduce-Programmiermodell (und FP) nicht wirklich Updates von Einzelwerten unterstützen. Vielmehr soll man eine Abfolge von Transformationen definieren.

Wenn Sie nun voneinander abhängige Werte haben, dh Sie können Ihre Transformation nicht mit einem einfachen map ausführen, müssen aber mehrere Werte aggregieren und basierend auf diesem Wert aktualisieren, was Sie dann tun müssen, um diese Werte zu gruppieren dann transformiere jede Gruppe - oder definiere eine monoidale Operation, so dass die Operation verteilt und in Teilschritte zerlegt werden kann.

Group By Ansatz

Jetzt werde ich versuchen, ein wenig mehr spezifisch für Ihren speziellen Fall zu sein. Sie sagen, Sie haben Teilbäume, ist es möglich, jeden Knoten zuerst auf einen Schlüssel abzubilden, der den entsprechenden Teilbaum angibt? Wenn dies der Fall könnten Sie so etwas tun:

nodes.map(n => (getSubTreeKey(n), n)).grouByKey().map ...

Monoid

(streng genommen Sie einen kommutativen Monoid wollen) Die besten Sie lesen http://en.wikipedia.org/wiki/Monoid#Commutative_monoid

Zum Beispiel + ein monoidal Betrieb ist, weil Wenn man die Summe von, sagen wir, einer RDD von Ints berechnen will, dann kann das zugrundeliegende Rahmenwerk die Daten in Stücke zerhacken, die Summe auf jedem Stück durchführen und dann die resultierenden Summen addieren (möglicherweise in mehr ein nur 2 Schritte auch). Wenn Sie ein Monoid finden, das letztendlich dieselben Ergebnisse liefert, die Sie für einzelne Updates benötigen, haben Sie eine Möglichkeit, Ihre Verarbeitung zu verteilen. Z.B.

nodes.reduce(_ myMonoid _)