2016-07-08 17 views
5

Ich habe einen einzigen Knoten, multi (3) Broker Zookeeper/Kafka-Setup. Ich benutze den Kafka 0.10 Java Client.Kafka 0.10 Java Client TimeoutException: Batch enthält 1 Satz (e) abgelaufen

Ich schrieb einfache Fern folgenden (auf einem anderen Server als Kafka) Produzent (im Code ich meine öffentliche IP-Adresse mit MYIP ersetzt):

Properties config = new Properties(); 
try { 
    config.put(ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName()); 
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "MYIP:9092, MYIP:9093, MYIP:9094"); 
    config.put(ProducerConfig.ACKS_CONFIG, "all"); 
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); 
    producer = new KafkaProducer<String, byte[]>(config); 
    Schema.Parser parser = new Schema.Parser(); 
    schema = parser.parse(GATEWAY_SCHEMA); 
    recordInjection = GenericAvroCodecs.toBinary(schema); 
    GenericData.Record avroRecord = new GenericData.Record(schema); 
    //Filling in avroRecord (code not here) 
    byte[] bytes = recordInjection.apply(avroRecord); 

    Future<RecordMetadata> future = producer.send(new ProducerRecord<String, byte[]>(datasetId+"", "testKey", bytes)); 
    RecordMetadata data = future.get(); 
} catch (Exception e) { 
    e.printStackTrace(); 
} 

Meine Servereigenschaften für die 3-Broker wie folgt aussehen (In den 3 verschiedenen Servereigenschaften-Dateien broker.id ist 0, 1, 2 und Listener ist PLAINTEXT: //: 9092, PLAINTEXT: //: 9093, PLAINTEXT: //: 9094 und host.name ist 10.2.0.4, 10.2. 0.5, 10.2.0.6). Dies wird der erste Server-Eigenschaften-Datei:

broker.id=0 
listeners=PLAINTEXT://:9092 
num.network.threads=3 
num.io.threads=8 
socket.send.buffer.bytes=102400 
socket.receive.buffer.bytes=102400 
socket.request.max.bytes=104857600 
log.dirs=/tmp/kafka1-logs 
num.partitions=1 
num.recovery.threads.per.data.dir=1 
log.retention.hours=168 
log.segment.bytes=1073741824 
log.retention.check.interval.ms=300000 
zookeeper.connect=localhost:2181 
zookeeper.connection.timeout.ms=6000 

Wenn ich den Code ausführen, bekomme ich folgende Ausnahme:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0 
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65) 
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52) 
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25) 
    at com.nr.roles.gateway.GatewayManager.addTransaction(GatewayManager.java:212) 
    at com.nr.roles.gateway.gw.service(gw.java:126) 
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790) 
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:821) 
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583) 
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1158) 
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511) 
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1090) 
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) 
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:109) 
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:119) 
    at org.eclipse.jetty.server.Server.handle(Server.java:517) 
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:308) 
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:242) 
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:261) 
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95) 
    at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:75) 
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213) 
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147) 
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654) 
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0 

Weiß jemand, was ich vermisst? Jede Hilfe wäre willkommen. Vielen Dank

+0

Ich habe auch versucht, das gleiche wie oben zu tun, aber mit nur einem Broker (auf Port 9092). Ich bekomme immer noch genau die selbe Ausnahme. Ich habe sichergestellt, dass die Broker- und Zoowäcker-Ports auf dem Remote-Computer geöffnet sind und ich sie vom Producer-Computer telnet kann. – Armen

Antwort

1

Portinformation in Ihrer BOOTSTRAP_SERVERS_CONFIG Konfiguration ist falsch (MYIP:).

Wie Sie in server.properties als "KLARTEXT: //: 9093, KLARTEXT: //: 9093, KLARTEXT: //: 9094" erwähnt haben.

+0

Entschuldigung, ich hatte hier einen Tippfehler, in server.properties ist es "PLAINTEXT: //: 9092, PLAINTEXT: //: 9093, PLAINTEXT: //: 9094". Also sind die 'BOOTSTRAP_SERVERS_CONFIG' Ports korrekt. – Armen

3

Ich stoße auf die gleichen Probleme.

Sie sollten Ihre kafka server.properties ändern, um die IP-Adresse anzugeben. zB:

KLARTEXT: // YOUIP: 9093

wenn nicht, kafka Hostnamen wird verwendet werden, wenn der Hersteller nicht den Host bekommen, kann es nicht Nachricht sendet an Kafka selbst wenn Sie sie telnet können.

+0

Ich habe meine Eigenschaft richtig in meiner server.properties as eingestellt, Listeners = PLAINTEXT: // Domänenname: 9092, aber immer noch diese Ausnahme org.apache.kafka.common.errors.TimeoutException: Batch abgelaufen java.util.concurrent. ExecutionException, beim Verbinden von externem Server habe ich request.timeout.ms \t auf einen höheren Wert erhöht. –

+0

beantwortet die Frage des OP anscheinend nicht ... – Paul

0

This Antwort teilt einige Einblicke. Sie können die Producer-Konfiguration request.timeout.ms erhöhen, damit der Client die Batches länger warten kann, bevor sie ablaufen.

Sie könnten auch in die batch.size und linger.ms Konfigurationen suchen und finden Sie die optimale, die in Ihrem Fall funktioniert.