2015-09-17 3 views
5

Ich verwende Spark-Streaming, um einzelne Benutzer zu zählen. Ich benutze updateStateByKey, also muss ich ein Checkpoint-Verzeichnis konfigurieren. Ich lade auch die Daten vom Checkpoint während der Anwendung starten, wie the example in the doc:Wie konfiguriere Checkpoint, um Spark-Streaming-Anwendung neu zu implementieren?

// Function to create and setup a new StreamingContext 
def functionToCreateContext(): StreamingContext = { 
    val ssc = new StreamingContext(...) // new context 
    val lines = ssc.socketTextStream(...) // create DStreams 
    ... 
    ssc.checkpoint(checkpointDirectory) // set checkpoint directory 
    ssc 
} 

// Get StreamingContext from checkpoint data or create a new one 
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) 

Hier ist die Frage, ob mein Code geändert wird, dann hat ich wieder einsetzen, den Code, wird der Kontrollpunkt egal geladen werden, wie viel Der Code wurde geändert? Oder ich muss meine eigene Logik verwenden, um meine Daten zu persistieren und sie im nächsten Lauf zu laden.

Wenn ich meine eigene Logik verwende, um den DStream zu speichern und zu laden, werden dann die Daten sowohl vom Checkpoint-Verzeichnis als auch von meiner eigenen Datenbank geladen, wenn die Anwendung bei einem Fehler neu gestartet wird?

Antwort

3

Der Prüfpunkt selbst enthält Ihre Metadaten, RDD, DAG und sogar Ihre Logik. Wenn Sie Ihre Logik ändern und versuchen, sie vom letzten Prüfpunkt aus auszuführen, treffen Sie höchstwahrscheinlich eine Ausnahme. Wenn Sie Ihre eigene Logik verwenden möchten, um Ihre Daten irgendwo als Prüfpunkt zu speichern, müssen Sie möglicherweise eine Spark-Aktion implementieren, um Ihre Prüfpunktdaten in jede beliebige Datenbank zu verschieben. Laden Sie die Prüfpunktdaten im nächsten Lauf als initiale RDD Sie verwenden updateStateByKey API) und setzen Sie Ihre Logik fort.

2

Ich habe diese Frage in der Spark-Mail-Liste gestellt und habe eine Antwort, ich habe es auf my blog analysiert. Ich werde die Zusammenfassung hier posten:

Der Weg ist, beide Checkpointing und unsere eigenen Daten laden Mechanismus zu verwenden. Aber wir laden unsere Daten als initalRDD von updateStateByKey. Also in beiden Fällen werden die Daten weder verloren noch vervielfältigen:

  1. Wenn wir den Code ändern und die Spark-Anwendung erneut bereitstellen, shutdown wir die alte Spark-Anwendung anmutig und Bereinigung der Checkpoint-Daten, so dass die einzigen geladenen Daten die Daten, die wir gespeichert haben.

  2. Wenn die Spark-Anwendung fehlschlägt und neu startet, werden die Daten vom Prüfpunkt geladen. Aber der Schritt der DAG wird gespeichert, so dass unsere eigenen Daten nicht wieder wie ursprünglich RDD geladen werden. Die einzigen geladenen Daten sind also die Kontrollpunktdaten.