2016-07-12 6 views
1

Ich habe madde ein dataaframe die repartitined I basierend auf ihren Primärschlüssel auf den KnotenMuss ich Caching nach repartitining

val config=new SparkConf().setAppName("MyHbaseLoader").setMaster("local[10]") 
val context=new SparkContext(config) 
val sqlContext=new SQLContext(context) 
val rows="sender,time,time(utc),reason,context-uuid,rat,cell-id,first-pkt,last-pkt,protocol,sub-proto,application-id,server-ip,server-domain-name, http-proxy-ip,http-proxy-domain-name, video,packets-dw, packets-ul, bytes-dw, bytes-ul" 
val scheme= new StructType(rows.split(",").map(e=>new StructField(e.trim,StringType,true))) 
val dFrame=sqlContext.read 
    .schema(scheme) 
    .format("csv") 
    .load("E:\\Users\\Mehdi\\Downloads\\ProbDocument\\ProbDocument\\ggsn_cdr.csv") 

dFrame.registerTempTable("GSSN") 
dFrame.persist(StorageLevel.MEMORY_AND_DISK) 

val distincCount=sqlContext.sql("select count(distinct sender) as SENDERS from GSSN").collectAsList().get(0).get(0).asInstanceOf[Long] 
dFrame.repartition(distincCount.toInt/3,dFrame("sender")) 

Muss ich wieder meine presist Methode aufrufen, nachdem für die nächste Reduzierung Jobs auf Datenrahmen neu zu partitionieren?

Antwort

0

Ja, repartition gibt einen neuen Datenrahmen zurück, so dass Sie wieder cache benötigen.

0

Während die Antwort von Dikei scheint Ihre direkte Frage zu adressieren, ist es wichtig zu beachten, dass in einem Fall wie diesem gibt es in der Regel keinen Grund, explizit zwischenzuspeichern.

Jeder Shuffle in Spark (hier ist repartition) dient als impliziter Cache-Punkt. Wenn ein Teil der Abstammung erneut ausgeführt werden muss und keiner der Executoren verloren gegangen ist, muss er nicht weiter zurückgehen als bis zur letzten Shuffle- und Shuffle-Datei.

Dies bedeutet, dass das Zwischenspeichern kurz vor oder unmittelbar nach dem Mischen in der Regel eine Verschwendung von Zeit und Ressourcen ist, besonders wenn Sie nicht nur an In-Memory oder einem anderen Caching-Mechanismus interessiert sind.

+0

Korrigiere mich, wenn ich falsch liege, aber ich denke, es gibt kein Caching während des Mischens, nur das Ergebnis der Kartenphase wird auf die Festplatte gelegt. Es ist schnell, ShuffleRDD aus diesen Dateien neu zu berechnen, wenn sie sich im OS-Cache befinden, aber das ist nicht garantiert. – Dikei

+0

@Dikei Sie sind nicht falsch, aber es hat keinen Einfluss auf meine Meinung. Wenn OP Schreibvorgänge von Anfang an auf Festplatte annimmt und möglicherweise die gleichen Daten zweimal auf Platte legt, ist das ziemlich verschwenderisch. Und da Caching weder kostenlos noch garantiert ist, sehe ich hier keinen Grund für eine solche Redundanz. Es war nur In-Memory oder Off-Heap, vielleicht ... aber ich könnte mich irren. Denkst du, dass es hier wirklich Sinn macht? – zero323

+0

Ich denke, es hängt davon ab, was der Rest des Programms tut. – Dikei

0

Sie müssten den Reparaturdatenrahmen persistieren, da DataFrames unveränderlich sind und Reparatur einen neuen Datenrahmen zurückgibt.

Ein Ansatz, dem Sie folgen könnten, besteht darin, dFrame beizubehalten, und nach seiner Reparatur ist der neue DataFrame, der zurückgegeben wurde, dFrameRepart. An dieser Stelle können Sie das dFrameRepart beibehalten und den dFrame aufheben, um den Speicher freizugeben, sofern Sie dFrame nicht erneut verwenden. Falls Sie dFrame nach dem Reparaturvorgang verwenden, können beide DataFrames beibehalten werden.

dFrame.registerTempTable ("GSSN") dFrame.persist (StorageLevel.MEMORY_AND_DISK)

val distincCount = sqlContext.sql ("select count (distinct Sender) als Absender von GSSN"). CollectAsList(). erhalten (0) .get (0) .asInstanceOf [Long]

valdFrameRepart = dFrame.repartition (distincCount.toInt/3, Dframe ("Sender")). persistieren (StorageLevel.MEMORY_AND_DISK) dFrame.unpersist