2016-03-15 4 views
5

Ich rufe eine Funktion in scala, die eine RDD[(Long,Long,Double)] als Ausgabe gibt.Merge multiple RDD generiert in Schleife

def helperfunction(): RDD[(Long, Long, Double)]

Ich nenne diese Funktion in der Schleife in einem anderen Teil des Codes, und ich möchte alle generierten RDDs verschmelzen. Die Schleife Aufruf der Funktion sieht so etwas wie dieses

for (i <- 1 to n){ 
    val tOp = helperfunction() 
    // merge the generated tOp 
} 

Was ich tun möchte, ist etwas ähnlich, was String für Sie in Java tun würde, wenn man die Saiten verschmelzen wollte. Ich habe bei Techniken der Verschmelzung RDDs sah, die meist wie diese

RDD1.union(RDD2) 

zur Verwendung von Union Funktion zeigen Dies erfordert jedoch beide RDDs vor der Einnahme ihrer Vereinigung erzeugt werden. Ich habe zwar eine var RDD1 initialisiert, um die Ergebnisse außerhalb der for-Schleife anzusammeln, aber ich bin mir nicht sicher, wie ich eine leere RDD des Typs [(Long,Long,Double)] initialisieren kann. Auch ich beginne mit Funken, also bin ich mir nicht einmal sicher, ob dies die eleganteste Methode ist, um dieses Problem zu lösen.

Antwort

4

Statt Vars zu verwenden, können Sie funktionale Programmierparadigmen verwenden, um zu erreichen, was Sie wollen:

val rdd = (1 to n).map(x => helperFunction()).reduce(_ union _) 

Auch wenn Sie noch eine leere RDD erstellen müssen, können Sie tun it using:

val empty = sc.emptyRDD[(long, long, String)] 
+0

IIRC Sie können eine RDD bis Spark 2.0 nicht mit einem leeren RDD union verbinden. – MrChristine

+0

Wie machen Sie das, wenn Sie den Schleifenindex an die Hilfsfunktion übergeben müssen? – G3M

+0

Wenn Sie den Schleifenindex an die Hilfsfunktion übergeben möchten, können Sie Folgendes tun: 'val rdd = (1 bis n) .zipWithIndex.map {case (x, index) => helperFunction (i)} .reduce (_ union _) ' Natürlich ist es in diesem Fall nicht notwendig, da wir eine ganzzahlige Inkrementierung der Auflistung haben, aber Sie können' (1 bis n) 'durch irgendeine Sammlung ersetzen –

2

Sie haben zwar Recht, dass dies nicht der optimale Weg ist, aber wir würden mehr Informationen darüber benötigen, was Sie mit dem Erzeugen einer neuen RDD bei jedem Aufruf Ihrer Hilfsfunktion erreichen möchten.

Sie könnten 1 RDD vor der Schleife definieren und ihm eine Variable zuweisen und dann durch Ihre Schleife laufen lassen. Hier ein Beispiel:

val rdd = sc.parallelize(1 to 100) 
val rdd_tuple = rdd.map(x => (x.toLong, (x*10).toLong, x.toDouble)) 
var new_rdd = rdd_tuple 
println("Initial RDD count: " + new_rdd.count()) 
for (i <- 2 to 4) { 
    new_rdd = new_rdd.union(rdd_tuple) 
} 
println("New count after loop: " + new_rdd.count()) 
+0

Jeder Körper haben JavaCode für das gleiche Szenario? – Neethu