Wenn ich Code ausführen wie die folgenden:Spark-RDD-Checkpoint auf beharrte/gecached RDDs werden Durchführung der DAG zweimal
val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
newRDD.checkpoint
print(newRDD.count())
und beobachten Sie die Stadien in Yarn, merke ich, dass Funke die DAG Berechnung ZWEIMAL tut - - einmal für die distinct + -Zählung, die die RDD materialisiert und zwischenspeichert, und dann eine vollständig ZWEITE Zeit, um die Checkpoint-Kopie zu erstellen.
Da die RDD bereits materialisiert und zwischengespeichert ist, warum nutzt das Checkpointing nicht einfach diesen Vorteil und speichert die zwischengespeicherten Partitionen auf der Festplatte?
Gibt es einen bestehenden Weg (irgendeine Art von Konfigurationseinstellung oder Codeänderung), um Spark dazu zu zwingen, dies auszunutzen und die Operation nur EINMAL auszuführen, und Checkpointing wird nur Dinge kopieren?
Muss ich zweimal "materialisieren" statt?
val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
print(newRDD.count())
newRDD.checkpoint
print(newRDD.count())
Ich habe einen Apache-Spark-Jira-Ticket erstellt dies ein Feature-Request zu machen: Sieht aus https://issues.apache.org/jira/browse/SPARK-8666