2015-01-20 12 views
5

Ich bekomme eine Warnung, wenn Sie eine RDD in einem für den Einsatz verwenden, und ich bin mir nicht sicher, ob es etwas ist, was ich falsch mache. Wenn ich dies tun:Warnung bei der Verwendung von RDD zum Verständnis

val sc = new SparkContext(...) 

val anRDD = sc.parallelize(List(
    ("a", List(1, 2, 3)), 
    ("b", List(4), 
    ("c", List(5, 6)) 
) 

for { 
    (someString, listOfInts) <- anRDD 
    someInt <- listOfInts 
} yield (someString, someInt) 

Dann bekomme ich diese Ausgabe:

warning: `withFilter' method does not yet exist on org.apache.spark.rdd.RDD[(String, List[Int])], using `filter' method instead 
    (s, li) <- rl 

Aber es ist immer noch erfolgreich eine FlatMappedRDD return [(String, Int)]. Mache ich etwas falsch? Oder ist es sicher, diese Warnung zu ignorieren?

Update: Ich würde auch als Antwort akzeptieren, wie das For-Verständnis diese Operationen zu map/flatMap/Filter-Aufrufe konvertiert, da ich nicht glaubte, dass es Filter oder mit Filteraufrufen erforderlich wäre. Ich nahm an, es ähnlich entspricht etwas sein würde:

anRDD.flatMap(tuple => tuple.map(someInt => (tuple._1, someInt))) 

Aber dies schließt keinen Filter oder withFilter Anrufe, die die Quelle der Warnung zu sein scheint.

Oh, ich benutze Spark 1.2.0, Scala 2.10.4, und das ist alles innerhalb der REPL.

Antwort

1

Erstens, ich bin kein Experte, aber haben einige graben getan und hier ist das, was ich gefunden habe:

ich den Code kompiliert mit -print (seit Java Decompiler aus irgendeinem Grund versagte), die aus dem Programm gedruckt werden mit allen Scala-spezifischen Funktionen entfernt. Dort sah ich:

test.this.anRDD().filter({ 
    (new anonymous class anonfun$1(): Function1) 
    }).flatMap({ 
    (new anonymous class anonfun$2(): Function1) 
    }, ClassTag.apply(classOf[scala.Tuple2])); 

Sie die filter bemerken ... so, ich auf der anonfun$1 geprüft:

public final boolean apply(Tuple2<String, List<Object>> check$ifrefutable$1) 
    { 
    Tuple2 localTuple2 = check$ifrefutable$1; 
    boolean bool; 
    if (localTuple2 != null) { 
     bool = true; 
    } else { 
     bool = false; 
    } 
    return bool; 
    } 

Also, wenn Sie zusammen all dies gesagt, scheint es, dass die filter geschieht im Verständnis, weil es alles ausfiltert, was kein Tuple2 ist.

Und die Präferenz ist withFilter anstelle von filter zu verwenden (nicht sicher, warum atm). Sie können decompiling eine reguläre Liste sehen, dass anstelle eines RDD

object test { 
    val regList = List(
    ("a", List(1, 2, 3)), 
    ("b", List(4)), 
    ("c", List(5, 6)) 
) 

val foo = for { 
    (someString, listOfInts) <- regList 
    someInt <- listOfInts 
} yield (someString, someInt) 
} 

Welche dekompiliert zu:

test.this.regList().withFilter({ 
    (new anonymous class anonfun$1(): Function1) 
}).flatMap({ 
    (new anonymous class anonfun$2(): Function1) 
}, immutable.this.List.canBuildFrom()).$asInstanceOf[List](); 

Also, es dasselbe ist, außer es withFilter verwendet, wo es kann

+0

Nicht unbedingt. Woher kommt der Aufruf von withFilter? Ich nehme an (wie ich immer mit Scala), dass es wegen einer impliziten Konvertierung irgendwo ist, aber ich sehe nicht, warum es hier wichtig ist, da das Verständnis erkennt, dass es heißt .filter, nicht .withFilter – jayhutfles

+0

Weil die Präferenz Verwenden Sie withFilter per Warnung. Es fällt in diesem Fall nur auf die Filtermethode zurück. –

+0

@jayhutfles Ich habe gerade mehr hinzugefügt, zeigt die Präferenz für withFilter –

0

Rufen Sie collect() in der RDD auf, bevor Sie sie zum Verständnis senden.

val collectedList = anRDD.collect 
for { 
    (someString, listOfInts) <- collectedList 
    someInt <- listOfInts 
} yield (someString, someInt)