2016-07-26 64 views
0

Ich versuche, Kafka von meinem lokalen Rechner zu verbinden:Flink, Kafka und Zookeeper mit einem URI

kafkaParams.setProperty("bootstrap.servers", Defaults.BROKER_URL) 
kafkaParams.setProperty("metadata.broker.list", Defaults.BROKER_URL) 
kafkaParams.setProperty("group.id", "group_id") 
kafkaParams.setProperty("auto.offset.reset", "earliest") 

Vollkommen in Ordnung, aber meine BROKER_URI ist definiert als my-server.com:1234/my/subdirectory folgt.

Ich fand heraus, dass dieses Phänomen als Chroot-Pfad bezeichnet wird.

Es führt den folgenden Fehler: Caused by: org.apache.kafka.common.config.ConfigException: Invalid url in bootstrap.servers: my-server.com:1234/my/subdirectory

Wie kann ich dieses Problem lösen?

Das sind meine Abhängigkeiten:

val flinkVersion = "1.0.3" 

"org.apache.flink" %% "flink-scala" % flinkVersion % "provided", 
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided", 
"org.apache.flink" %% "flink-connector-kafka-0.9" % flinkVersion, 

Antwort

0

Nur host:port Format ohne den Pfad Kontext und Schrägstriche versuchen. Wenn Sie mehr als einen Server haben, es wäre eine Liste host1:port1,host2:port2

Referenz sein: address1:port1,address2:port2,...,addressn:portn: http://kafka.apache.org/documentation.html

+0

Dies gibt die folgenden Fehler: 'Exception in thread "main" org.apache.kafka.common.errors.TimeoutException: Timeout während abgelaufen Abrufen von Thema Metadaten. –

+0

Es ist ein Hinweis, dass das Format für die Konfiguration in Ordnung ist. Das nächste, was zu suchen wäre, wenn Iptables oder Firewall auf der Kafka-Instanz läuft. Können Sie die Kafka-Instanz von Ihrer Client-Box aus telnet? –

+0

Spaß Sache ist, kann ich mit der Kafka-Konsole Verbraucher verbinden: './kafka-console-consumer.sh --zookeke my-server.com:1234/my/subdirectory --topic my-topic --from-Beginning funktioniert einwandfrei. Und Telnet funktioniert auch gut: 'telnet my-server.com 1234' –

0

bootstrap.servers sollte eine durch Kommata getrennte Liste wie die folgende sein. Wenn Sie nur einen Kafka-Broker haben, sollten Sie etwas wie localhost:9092 eingeben (es sei denn, Sie haben Kafka so konfiguriert, dass es auf einem anderen Port läuft).

Sie können sich auf die this post from dataArtisans beziehen für weitere Details, wie Flink und Kafka zusammen arbeiten.

0

Dumm. Zoowärter! = Kafka. Wie Sie im Code sehen können, habe ich dieselbe URL zweimal verwendet, aber es stellte sich heraus, dass sie unterschiedlich sein sollten.

ich Kafka von meinem lokalen Rechner zu verbinden versuchen:

kafkaParams.setProperty("bootstrap.servers", Defaults.KAFKA_URL) 
kafkaParams.setProperty("metadata.broker.list", Defaults.ZOOKEEPER_URL) 
kafkaParams.setProperty("group.id", "group_id") 
kafkaParams.setProperty("auto.offset.reset", "earliest") 
+1

;) würden Sie bitte den richtigen Code posten? –

+0

Natürlich :-), es ist hinzugefügt –