2014-04-30 4 views
12

Ich versuche, Elemente zu einer Map hinzuzufügen, während ich die Elemente einer RDD wiederhole. Ich bekomme keine Fehler, aber die Änderungen werden nicht ausgeführt.Ändern der Sammlung in einem Spark RDD foreach

Alles funktioniert gut Zugabe direkt oder Iterieren andere Sammlungen:

scala> val myMap = new collection.mutable.HashMap[String,String] 
myMap: scala.collection.mutable.HashMap[String,String] = Map() 

scala> myMap("test1")="test1" 

scala> myMap 
res44: scala.collection.mutable.HashMap[String,String] = Map(test1 -> test1) 

scala> List("test2", "test3").foreach(w => myMap(w) = w) 

scala> myMap 
res46: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3) 

Aber wenn ich versuche, das gleiche von einem RDD zu tun:

scala> val fromFile = sc.textFile("tests.txt") 
... 
scala> fromFile.take(3) 
... 
res48: Array[String] = Array(test4, test5, test6) 

scala> fromFile.foreach(w => myMap(w) = w) 
scala> myMap 
res50: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3) 

Ich habe versucht, den Inhalt des Druck map so wie es vor der foreach war, um sicherzustellen, dass die Variable die gleiche ist, und es druckt korrekt:

fromFile.foreach(w => println(myMap("test1"))) 
... 
test1 
test1 
test1 
... 

Ich habe auch das modifizierte Element der Karte innerhalb des Foreach-Codes gedruckt, und es wird wie geändert gedruckt, aber wenn die Operation abgeschlossen ist, scheint die Karte unverändert zu sein.

scala> fromFile.foreach({w => myMap(w) = w; println(myMap(w))}) 
... 
test4 
test5 
test6 
... 
scala> myMap 
res55: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3) 

Konvertieren des RDD zu einem Array (sammeln) funktioniert auch:

fromFile.collect.foreach(w => myMap(w) = w) 
scala> myMap 
res89: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test5 -> test5, test1 -> test1, test4 -> test4, test6 -> test6, test3 -> test3) 

Ist das ein Kontext Problem? Greife ich auf eine Kopie der Daten zu, die an anderer Stelle geändert werden?

Antwort

30

Es wird klarer, wenn es auf einem Spark-Cluster ausgeführt wird (nicht auf einem einzelnen Computer). Die RDD ist jetzt auf mehrere Maschinen verteilt. Wenn Sie foreach anrufen, sagen Sie jeder Maschine, was mit dem Stück der RDD zu tun ist, die es hat. Wenn Sie auf lokale Variablen verweisen (wie myMap), werden diese serialisiert und an die Maschinen gesendet, damit sie sie verwenden können. Aber nichts kommt zurück. So ist Ihre Originalversion von myMap nicht betroffen.

Ich denke, das beantwortet Ihre Frage, aber offensichtlich versuchen Sie, etwas zu erreichen, und Sie werden nicht in der Lage sein, dorthin zu gelangen. Fühlen Sie sich frei, hier oder in einer separaten Frage zu erklären, was Sie zu tun versuchen, und ich werde versuchen zu helfen.

+0

Es beantwortet meine Frage, und machen Sie sich keine Gedanken darüber, was ich erreichen wollte, ich fand nur einen interessanten Fall, für den ich keine Erklärung hatte. Ich mache es jetzt, danke! – palako

+3

Ja, da Daniel darauf hinweist, dass man den Status nicht mutieren kann, fehlt Palako der Punkt funktionaler Programmierung. Du solltest nicht mutieren, denn dann kannst du nicht parallerisieren. Indem Sie Code so entwerfen, dass Sie den Zustand nicht mutieren, kann Ihr Code kostenlos freigegeben werden und Sie können Frameworks wie Spark und Scaling verwenden, um sie auf einen Cluster zu verteilen. – samthebest

+0

@Daniel Gibt es einen Workaround dafür? –