Ich verwende Kafka 0.8.2
, um Daten von AdExchange zu empfangen. Dann verwende ich Spark Streaming 1.4.1
, um Daten unter MongoDB
zu speichern.So speichern Sie den letzten Offset, den Spark nach ZK oder Kafka verbraucht hat, und können nach dem Neustart zurücklesen
Mein Problem ist, wenn ich meine Spark Streaming
Job zum Beispiel wie Update neue Version neu starten, Fehler beheben, neue Funktionen hinzufügen. Es wird weiterhin lesen Sie die neuesten offset
von kafka
zu der Zeit dann werde ich Daten AdX Push Kafka während des Neustarts der Job verloren.
Ich versuche etwas wie auto.offset.reset -> smallest
, aber es wird von 0 erhalten -> zuletzt dann Daten war riesig und doppelt in db.
Ich versuche auch, group.id
und consumer.id
auf Spark
zu setzen, aber es ist das gleiche.
Wie Sie die neuesten offset
Funken zookeeper
oder kafka
verbraucht speichern dann von dem zurück zu neuesten offset
lesen kann?
Aber, wie neuesten Offset zu ZK oder Kafka verbraucht zu speichern. Ich versuche zu aktivieren 'kafkaParams ++ = Karte [String, String] (" auto.commit.interval.ms "->" 1000 ") kafkaParams ++ = Karte [String, String] (" zookeeper.sync.time. ms "->" 200 ") kafkaParams ++ = Karte [String, String] (" zoekeeper.session.timeout.ms "->" 400 ")' aber es funktioniert nicht – giaosudau
Eine der Optionen ist, wie ich dir sagte Verwenden der OffsetRanges-Datenstruktur Nachdem Sie Ihren Stream in einer bestimmten Iteration verarbeitet haben, können Sie Folgendes tun: 'dStream.foreachRDD {rdd => val x = rdd.asInstanceOf [HasOffsetRanges] .offsetRanges; // Mach etwas mit X (speichere es zB mit externem FS)} 'x hält den letzten verarbeiteten Offset für jede Topic-Partition-Kombination des RDD. Wenn Sie genau einmal Semantik benötigen, müssten Sie sie manuell unterstützen, aber es ist möglich. –
Meine Idee, dass ich nicht im externen Speicher speichern möchte, weil ZK und Kafka damit umgehen können. – giaosudau