Ich rufe eine Funktion in scala, die eine RDD[(Long,Long,Double)]
als Ausgabe gibt.Merge multiple RDD generiert in Schleife
def helperfunction(): RDD[(Long, Long, Double)]
Ich nenne diese Funktion in der Schleife in einem anderen Teil des Codes, und ich möchte alle generierten RDDs verschmelzen. Die Schleife Aufruf der Funktion sieht so etwas wie dieses
for (i <- 1 to n){
val tOp = helperfunction()
// merge the generated tOp
}
Was ich tun möchte, ist etwas ähnlich, was String für Sie in Java tun würde, wenn man die Saiten verschmelzen wollte. Ich habe bei Techniken der Verschmelzung RDDs sah, die meist wie diese
RDD1.union(RDD2)
zur Verwendung von Union Funktion zeigen Dies erfordert jedoch beide RDDs vor der Einnahme ihrer Vereinigung erzeugt werden. Ich habe zwar eine var RDD1 initialisiert, um die Ergebnisse außerhalb der for-Schleife anzusammeln, aber ich bin mir nicht sicher, wie ich eine leere RDD des Typs [(Long,Long,Double)]
initialisieren kann. Auch ich beginne mit Funken, also bin ich mir nicht einmal sicher, ob dies die eleganteste Methode ist, um dieses Problem zu lösen.
IIRC Sie können eine RDD bis Spark 2.0 nicht mit einem leeren RDD union verbinden. – MrChristine
Wie machen Sie das, wenn Sie den Schleifenindex an die Hilfsfunktion übergeben müssen? – G3M
Wenn Sie den Schleifenindex an die Hilfsfunktion übergeben möchten, können Sie Folgendes tun: 'val rdd = (1 bis n) .zipWithIndex.map {case (x, index) => helperFunction (i)} .reduce (_ union _) ' Natürlich ist es in diesem Fall nicht notwendig, da wir eine ganzzahlige Inkrementierung der Auflistung haben, aber Sie können' (1 bis n) 'durch irgendeine Sammlung ersetzen –