2016-05-25 6 views
2

ich auf der offiziellen Dokumentation Prüfpunktintervall für meine Python Funken Streaming-Skripte, Basis festlegen möchten:Wie wird das Checkpoint-Intervall für das Spark-Streaming-Checkpointing festgelegt?

Für Stateful Transformationen, die RDD Prüfpunkten erfordern, ist das Standardintervall ein Vielfaches des Batch-Intervall, das 10 zumindest Sekunden. Er kann mit dstream.checkpoint (checkpointInterval) festgelegt werden. In der Regel ist ein Prüfpunktintervall von 5 - 10-fachen des Gleitintervalls eines DStreams eine gute Einstellung für den Versuch.

meine Skripte:

import sys 

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 

def functionToCreateContext(): 
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount") 
    ssc = StreamingContext(sc, 6) 
    ssc.checkpoint("./checkpoint") 
    kvs = KafkaUtils.createDirectStream(ssc, ['test123'], {"metadata.broker.list": "localhost:9092"}) 

    kvs = kvs.checkpoint(60) #set the checkpoint interval 

    lines = kvs.map(lambda x: x[1]) 
    counts = lines.flatMap(lambda line: line.split(" ")) \ 
     .map(lambda word: (word, 1)) \ 
     .reduceByKey(lambda a, b: a+b) 
    counts.pprint() 
    return ssc 

if __name__ == "__main__": 
    ssc = StreamingContext.getOrCreate("./checkpoint", functionToCreateContext) 

    ssc.start() 
    ssc.awaitTermination() 

der Ausgang nach Ausführung des Skripts:

16/05/25 17:49:03 INFO DirectKafkaInputDStream: Slide time = 6000 ms 
16/05/25 17:49:03 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 
16/05/25 17:49:03 INFO DirectKafkaInputDStream: Checkpoint interval = null 
16/05/25 17:49:03 INFO DirectKafkaInputDStream: Remember duration = 120000 ms 
16/05/25 17:49:03 INFO DirectKafkaInputDStream: Initialized and validated [email protected]4 
16/05/25 17:49:03 INFO PythonTransformedDStream: Slide time = 6000 ms 
16/05/25 17:49:03 INFO PythonTransformedDStream: Storage level = StorageLevel(false, true, false, false, 1) 
16/05/25 17:49:03 INFO PythonTransformedDStream: Checkpoint interval = 60000 ms 
16/05/25 17:49:03 INFO PythonTransformedDStream: Remember duration = 120000 ms 
16/05/25 17:49:03 INFO PythonTransformedDStream: Initialized and validated [email protected]9f9a089 
16/05/25 17:49:03 INFO PythonTransformedDStream: Slide time = 6000 ms 
16/05/25 17:49:03 INFO PythonTransformedDStream: Storage level = StorageLevel(false, false, false, false, 1) 
16/05/25 17:49:03 INFO PythonTransformedDStream: Checkpoint interval = null 
16/05/25 17:49:03 INFO PythonTransformedDStream: Remember duration = 6000 ms 
16/05/25 17:49:03 INFO PythonTransformedDStream: Initialized and validated [email protected]97386a 
16/05/25 17:49:03 INFO PythonTransformedDStream: Slide time = 6000 ms 
16/05/25 17:49:03 INFO PythonTransformedDStream: Storage level = StorageLevel(false, false, false, false, 1) 
16/05/25 17:49:03 INFO PythonTransformedDStream: Checkpoint interval = null 
16/05/25 17:49:03 INFO PythonTransformedDStream: Remember duration = 6000 ms 
16/05/25 17:49:03 INFO PythonTransformedDStream: Initialized and validated [email protected]6c474ad 
16/05/25 17:49:03 INFO ForEachDStream: Slide time = 6000 ms 
16/05/25 17:49:03 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1) 
16/05/25 17:49:03 INFO ForEachDStream: Checkpoint interval = null 
16/05/25 17:49:03 INFO ForEachDStream: Remember duration = 6000 ms 
.......... 

der DSTREAM Prüfpunktintervall noch null ist. Irgendeine Idee dafür?

Antwort

1

Versuchen Sie, diese Linie ein paar Zeilen nach unten zu bewegen, nachdem Sie den Stream erstellt haben: ssc.checkpoint("./checkpoint")

Grundsätzlich den Kontrollpunkt tun, nachdem Sie voll Stream vorbereitet haben.