Ich verwende Spark-Streaming, um einzelne Benutzer zu zählen. Ich benutze updateStateByKey
, also muss ich ein Checkpoint-Verzeichnis konfigurieren. Ich lade auch die Daten vom Checkpoint während der Anwendung starten, wie the example in the doc:Wie konfiguriere Checkpoint, um Spark-Streaming-Anwendung neu zu implementieren?
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
Hier ist die Frage, ob mein Code geändert wird, dann hat ich wieder einsetzen, den Code, wird der Kontrollpunkt egal geladen werden, wie viel Der Code wurde geändert? Oder ich muss meine eigene Logik verwenden, um meine Daten zu persistieren und sie im nächsten Lauf zu laden.
Wenn ich meine eigene Logik verwende, um den DStream zu speichern und zu laden, werden dann die Daten sowohl vom Checkpoint-Verzeichnis als auch von meiner eigenen Datenbank geladen, wenn die Anwendung bei einem Fehler neu gestartet wird?