2014-09-25 6 views
12

Ich bin neu bei Apache Spark, und ich weiß, dass die Kerndatenstruktur RDD ist. Jetzt schreibe ich einige Apps, die Informationen zur Elementposition benötigen. Zum Beispiel muss ich nach dem Konvertieren einer ArrayList in eine (Java) RDD für jede Integer in RDD ihren (globalen) Array-Index kennen. Ist es möglich, es zu tun?Wie kann ich eine Elementposition in der RDD von Spark erhalten?

Wie ich weiß, gibt es eine Take (int) Funktion für RDD, so glaube ich, dass die Positionsinformationen in RDD noch beibehalten wird.

Antwort

11

Im Wesentlichen scheint RDDs zipWithIndex() -Methode dies zu tun, aber es wird nicht die ursprüngliche Reihenfolge der Daten erhalten, aus denen die RDD erstellt wurde. Zumindest wirst du eine stabile Ordnung bekommen.

val orig: RDD[String] = ... 
val indexed: RDD[(String, Long)] = orig.zipWithIndex() 

Der Grund, warum Sie unwahrscheinlich sind, etwas zu finden, dass die Reihenfolge, in den ursprünglichen Daten bewahrt wird für zipWithIndex in dem API-doc begraben():

„Zips diese RDD mit seinem Element-Indizes. Die Reihenfolge basiert zuerst auf dem Partitionsindex und dann die Reihenfolge der Elemente innerhalb jeder Partition. So erhält das erste Element in der ersten Partition Index 0, und das letzte Element in der letzten Partition erhält den größten Index.Dies ist ist ähnlich Scalas zipWithIndex, aber es verwendet Long statt Int als der Indextyp. Dieses Verfahren benötigt einen Funken Job auszulösen, wenn diese RDD mehr Partitionen enthält.“

So sieht es aus wie die ursprüngliche Reihenfolge verworfen. Wenn die ursprüngliche Reihenfolge zu bewahren Sie wichtig ist, sieht es aus wie Sie benötigen den Index vor fügen Sie die RDD erstellen.

+0

Ja, das Hinzufügen von Array-Index als zusätzliches Attribut vor dem Erstellen von RDD kann dieses Problem lösen. Es gibt jedoch zwei ernsthafte Einschränkungen: 1) Offensichtlich verdoppelt dieses zusätzliche Indexattribut mindestens die Speicherkosten, und solche Kosten können noch mehr betragen, z. B. wird in einem Integer/Float-Array ein langes int-Feld für den Index hinzugefügt. 2) Da das Hinzufügen zusätzlicher Indexwerte nicht in Spark geladen werden kann, kann eine solche Datenkonvertierung auch nicht von Spark parallelisiert werden. Daher muss ich andere parallele Techniken einbeziehen, um den Index hinzuzufügen. – SciPioneer

14

ich in den meisten Fällen glauben, zipWithIndex() wird es tun, und es wird die Bestellung erhalten. wieder die Kommentare lesen. Mein Verständnis ist, dass es genau bedeutet, halten die Bestellung in der RDD

scala> val r1 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g"), 3) 
scala> val r2 = r1.zipWithIndex 
scala> r2.foreach(println) 
(c,2) 
(d,3) 
(e,4) 
(f,5) 
(g,6) 
(a,0) 
(b,1) 

Obiges Beispiel bestätigen. Das Rot hat 3 Partitionen und a mit Index 0, b mit Index 1 usw.

+0

Danke für Ihre Antwort! In den meisten Fällen ist diese Methode nicht schlecht, da das Element in der Eingabe-Array/-Liste ein relativ großes Objekt sein kann. Es kann jedoch ein Problem für Arrays vom primitiven Typ sein, z. B. ein Integer-Array, da diese scheinbar einzige Lösung sowohl hinsichtlich der Berechnungs- als auch der Speicherkosten ziemlich ineffizient ist. Wie auch immer, ich bin sehr zufrieden mit deiner Antwort. Ich hoffe eines Tages natürlich beibehalten den Index ohne (zipWithIndex) kann für Spark RDD wahr werden. – SciPioneer

+0

Basierend auf dem Design von Spark, kann ich keinen guten Weg darstellen, um den Index des Elements zu erhalten, ohne den Speicher zu opfern. –