2016-06-29 12 views
3

In Kafka Streams, was ist der kanonische Weg, einen Stream zu produzieren/zu schreiben? In Spark gibt es den benutzerdefinierten Empfänger, der als ein lang laufender Adapter aus einer beliebigen Datenquelle funktioniert. Was ist das Äquivalent in Kafka Streams?Kafka Streams: Wie schreibe ich zu einem Thema?

Um genau zu sein, ich frage nicht, wie man Transformationen von einem Thema zum anderen durchführt. Die Dokumentation ist dazu sehr klar. Ich möchte verstehen, wie ich meine Arbeiter schreibe, die den ersten Schreibvorgang in einer Reihe von Transformationen zu Kafka machen werden.

Ich erwarte, dass die Lage sein, dies zu tun

builder1.<something>(<some intake worker like a spark reciver) 
     .to(topic1) 
     .start() 

builder2.from(topic1) 
     .transform(<some transformation function>) 
     .to(topic2) 
     .start() 

Aber keiner der bestehenden Dokumentation zeigt? Fehle ich etwas?

Antwort

4

Hängt davon ab, ob Sie die Kafka Streams DSL oder Prozessor-API verwenden:

  • Kafka Streams DSL Sie KStream#to() die KStream zu einem Thema materialisieren können. Dies ist die kanonische Methode, um Daten zu einem Thema zu materialisieren. Alternativ können Sie KStream#through() verwenden. Dies wird auch Daten zu einem Thema materialisieren, aber auch die resultierenden KStream zur weiteren Verwendung zurückgeben. Der einzige Unterschied zwischen #to() und #through() besteht dann darin, dass Sie eine KStreamBuilder#stream() speichern, wenn Sie die resultierende materialisierte Partition als KStream möchten.

  • Prozessor-API Sie materialisieren Daten an eine Partition, indem Sie die Daten an einen Senke-Prozessor weiterleiten.

So oder so, eine wichtige Sache zu beachten ist, dass die Daten nicht zu einem Thema materialisiert werden, bis Sie auf eine Partition mit einem der oben genannten Methoden schreiben. map(), filter(), etc materialisieren Daten nicht. Die Daten verbleiben in der Prozessor-Task/dem Thread/Speicher, bis sie durch eine der obigen Methoden materialisiert werden.


Um in Kafka Streams zu erzeugen:

Properties producerConfig = new Properties(); 
producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:2181"); 
producerConfig.put(ACKS_CONFIG, "all"); 
producerConfig.put(RETRIES_CONFIG, 0); 
Producer<Integer, Integer> producer = new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer<>()); 

und dann:

Arrays.asList(1, 2, 3, 4).forEach(integer -> producer.send(new ProducerRecord<>("integers", integer, integer))) 

Sie benötigen:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>${version.kafka}</version> 
</dependency> 
+0

das ist nicht das, was ich gefragt habe. von sagen, ein literales Array von ganzen Zahlen, wie würden Sie dieses Array zu einem Thema materialisieren? – dmead

+0

Sie würden einen 'KafkaProducer' verwenden –

2

Ich verstehe, wollen, wie schreiben meine Arbeiter das wird den ersten Schreibvorgang in einer Reihe von Transformationen zu Kafka machen.

Der initiale Schreibvorgang (= Eingabedaten) sollte nicht über Kafka Streams erfolgen. Kafka Streams geht davon aus, dass die Eingabedaten bereits in Kafka sind.

Also das erwartete Workflow von Ihnen ist nicht anwendbar:

builder1.<something>(<some intake worker like a spark reciver) 
    .to(topic1) 
    .start() 

Vielmehr Sie so etwas wie Kafka Connect verwenden würde, Daten in Kafka zu bekommen (zvon einer Datenbank in ein Kafka-Thema) oder verwenden Sie die "normalen" Kafka-Producer-Clients (Java, C/C++, Python, ...), um die Eingabedaten in Kafka zu schreiben.

In Kafka Streams ist noch kein "Hook" verfügbar, um die Eingabedaten zu laden. Wir erwarten eine bessere Integration von Kafka Connect und Kafka Streams, sodass sich diese Situation in naher Zukunft möglicherweise verbessern wird.

-1

können Sie versuchen, den folgenden Befehl für Linux:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicName --property "parse.key=true"property "key.separator=:"; 
  1. parse.key wenn auf true gesetzt Eingang als Schlüssel und Wert kann von der Konsole Paar zu akzeptieren.

  2. key.separator muss auf das Trennzeichen für Schlüssel und Wert gesetzt werden.