12

Ich verwende Kafka 0.8.2, um Daten von AdExchange zu empfangen. Dann verwende ich Spark Streaming 1.4.1, um Daten unter MongoDB zu speichern.So speichern Sie den letzten Offset, den Spark nach ZK oder Kafka verbraucht hat, und können nach dem Neustart zurücklesen

Mein Problem ist, wenn ich meine Spark Streaming Job zum Beispiel wie Update neue Version neu starten, Fehler beheben, neue Funktionen hinzufügen. Es wird weiterhin lesen Sie die neuesten offset von kafka zu der Zeit dann werde ich Daten AdX Push Kafka während des Neustarts der Job verloren.

Ich versuche etwas wie auto.offset.reset -> smallest, aber es wird von 0 erhalten -> zuletzt dann Daten war riesig und doppelt in db.

Ich versuche auch, group.id und consumer.id auf Spark zu setzen, aber es ist das gleiche.

Wie Sie die neuesten offset Funken zookeeper oder kafka verbraucht speichern dann von dem zurück zu neuesten offset lesen kann?

Antwort

13

Einer der Konstruktoren der createDirectStream-Funktion kann eine Zuordnung abrufen, die die Partitions-ID als Schlüssel und den Offset enthält, von dem Sie als Wert beginnen.

Gerade bei api hier: http://spark.apache.org/docs/latest/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html Die Karte, die ich sprach in der Regel genannt: fromOffsets

Sie können Daten auf der Karte einfügen:

startOffsetsMap.put(TopicAndPartition(topicName,partitionId), startOffset) 

Und es verwenden, wenn Sie die direkte erstellen Strom:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
       streamingContext, kafkaParams, startOffsetsMap, messageHandler(_)) 

Nach jeder Iteration Sie die verarbeiteten Offsets erhalten können:

Sie könnten diese Daten verwenden, um die fromOffsets-Map in der nächsten Iteration zu erstellen.

Sie können den vollständigen Code und die Verwendung hier sehen: https://spark.apache.org/docs/latest/streaming-kafka-integration.html am Ende der Seite

+0

Aber, wie neuesten Offset zu ZK oder Kafka verbraucht zu speichern. Ich versuche zu aktivieren 'kafkaParams ++ = Karte [String, String] (" auto.commit.interval.ms "->" 1000 ") kafkaParams ++ = Karte [String, String] (" zookeeper.sync.time. ms "->" 200 ") kafkaParams ++ = Karte [String, String] (" zoekeeper.session.timeout.ms "->" 400 ")' aber es funktioniert nicht – giaosudau

+2

Eine der Optionen ist, wie ich dir sagte Verwenden der OffsetRanges-Datenstruktur Nachdem Sie Ihren Stream in einer bestimmten Iteration verarbeitet haben, können Sie Folgendes tun: 'dStream.foreachRDD {rdd => val x = rdd.asInstanceOf [HasOffsetRanges] .offsetRanges; // Mach etwas mit X (speichere es zB mit externem FS)} 'x hält den letzten verarbeiteten Offset für jede Topic-Partition-Kombination des RDD. Wenn Sie genau einmal Semantik benötigen, müssten Sie sie manuell unterstützen, aber es ist möglich. –

+2

Meine Idee, dass ich nicht im externen Speicher speichern möchte, weil ZK und Kafka damit umgehen können. – giaosudau

0

ich dies aus noch 100% nicht herausgefunden haben, aber Ihre beste Wette ist wahrscheinlich JavaStreamingContext.checkpoint einzurichten() .

Siehe https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing für ein Beispiel.

Nach einigen Blog-Einträge https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md einige Einschränkungen gibt es, aber es fühlt sich fast wie es bestimmte Rand Fälle beinhaltet, die nur angedeutet sind und nicht wirklich erklärt.

+2

Checkpointing ist der richtige Weg, wenn Sie Ihren StreamingContext nicht ändern, da Sie dann die Verarbeitung automatisch vom rechten Offset aus fortsetzen können (Spark kümmert sich darum). Wenn Sie Features/Fehler hinzufügen möchten (und anscheinend giaosudau es tun wollen), werden Sie sehr oft den Streaming-Kontext ändern und daher nicht in der Lage sein, das Checkpoints-Verzeichnis zu benutzen. Der letzte Link, den du zur Verfügung gestellt hast, erklärt es perfekt. –

+0

@MichaelKopaniov gibt es einen Weg zur Prüfsumme der Kontextfunktion und den vorherigen Kontext ungültig, wenn die Funktion geändert hat? In diesem Fall würde es wieder auf das Lesen von Offsets aus einem Geschäft (fs, Datenbank) zurückgreifen. – Stephane

+0

@Stephane Wenige Tage sind vergangen, seit ich mich mit diesem Problem beschäftigt habe, damit ich mich irre, aber soweit ich mich erinnere, im alten Spark-Streaming (<2.0) Sie erstellen entweder einen neuen StreamingContext oder Sie lesen einen StreamingContext, der zuvor aus dem Prüfpunktverzeichnis definiert wurde. Sie erstellen keinen neuen StreamingContext für jede Iteration und vergleichen ihn einfach mit dem Kontext aus dem Prüfpunktverzeichnis. Wenn Sie also Ihre Frage richtig verstanden haben, können Sie den zuvor gespeicherten Kontext nicht ungültig machen. –

1

Um zu Michael Kopaniovs Antwort hinzuzufügen, können Sie, wenn Sie wirklich ZK als den Ort verwenden möchten, wo Sie Ihre Karte von Offsets speichern und laden.

Da Ihre Ergebnisse jedoch nicht an ZK ausgegeben werden, erhalten Sie keine zuverlässige Semantik, es sei denn, Ihre Ausgabeoperation ist idempotent (was nicht so klingt).

Wenn es möglich ist, Ihre Ergebnisse in einem Monogramm zusammen mit den Offsets in einer einzigen atomaren Aktion zu speichern, könnte das besser für Sie sein.

Für weitere Einzelheiten ist https://www.youtube.com/watch?v=fXnNEq1v3VA

2

sehen hier einige Code, den Sie http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/

speichern Offsets in ZK können einige Code Und hier ist der Offset zu verwenden, wenn Sie KafkaUtils.createDirectStream anrufen verwenden können: http://geeks.aretotally.in/spark-streaming-direct-api-reusing-offset-from-zookeeper/

+2

Diese beiden Links sind jetzt unterbrochen, weshalb die Community immer vorschlägt, die Lösung als Teil der Antwort zusammen mit dem Link zu veröffentlichen, nicht nur den Link. – ammills01