2

Wie stelle ich sicher, dass ich immer von Anfang an ein Kafka-Thema mit Flink konsumiere?Von Anfang an ein Kafka-Thema mit Fink konsumieren

Mit dem Kafka 0.9.x consumer, die Teil Flink 1.0.2 ist, scheint es, dass es nicht mehr Kafka aber Flink ist der Versatz zu steuern:

Flink Snapshots die Offsets intern als Teil seiner verteilten Checkpoints. Die Offsets, die an Kafka/ZooKeeper übergeben wurden, dienen nur dazu, die Außenansicht des Fortschritts synchron mit Flink's Ansicht des Fortschritts zu bringen. Auf diese Weise können Überwachungs- und andere Jobs eine Ansicht anzeigen, wie weit der Flink Kafka-Consumer ein Thema verbraucht hat.

Dies ist, wie weit ich habe, aber mein Flink Programm immer dort beginnt, wo er aufgehört hat, und kehrt nicht zurück an den Anfang, wie die Konfiguration es anweist:

val props = new Properties() 
props.setProperty("bootstrap.servers", "localhost:9092"); 
props.setProperty("group.id", "myflinkservice") 
props.setProperty("auto.offset.reset", "earliest") 

val incomingData = env.addSource(
    new FlinkKafkaConsumer09[IncomingDataRecord](
    "my.topic.name", 
    new IncomingDataSchema, 
    props 
) 
) 

Antwort

0

ich glaube, Sie können dies umgehen, indem eine zufällige group.id Angabe:

val props = new Properties() 
props.setProperty("bootstrap.servers", "localhost:9092"); 
props.setProperty("group.id", s"myflinkservice_${UUID.randomUUID}") 
props.setProperty("auto.offset.reset", "smallest") // "smallest", not "earliest" 

auto.offset.reset funktioniert nur, wenn kein Anfangs es in ZooKeeper Offset zur Verfügung.

+0

Das habe ich gemacht :-) aber es ist nicht praktikabel, da ich die Möglichkeit verliere, eine Gruppe für meine Kafka-Konsumenten festzulegen. Laut den Dokumenten, die ich in meiner Frage zitiert habe, ist Flink 1.0.2 offenbar nicht mehr auf die Offsets angewiesen, die an den Tierpfleger gebunden sind, und verfolgt die Offsets selbst. –

+0

Wenn der Kafka-Consumer startet und für die Gruppe kein Offset verfügbar ist, wird dieser gemäß der Offset-Reset-Strategie gestartet. Die gesamte Versatzverfolgung, die Flink ausführt, gilt nur für die Lebensdauer eines Jobs (mit Ausnahme von Sicherungspunkten). Sobald der Job gestoppt ist, verlassen wir uns auf ZK oder die Auto-Offset-Strategie. Was Sie suchen, ist die Fähigkeit, das Offset-Committing (zu ZK oder dem Broker) vollständig zu deaktivieren, so dass Flink die Offsets niemals auswählen kann. Das ist eine Funktion, die momentan nicht in Flinks Kafka-Connector verfügbar ist –