Ich arbeite gerade an einer Topologie, die Daten aus Kafka aufnimmt und sie in elasticsearch festhält. Ok, zuerst habe ich den grundlegenden KafkaSpout aus der Sturmabhängigkeit verwendet, um nach Daten zu suchen, die von einem präzisen Kafka-Thema kommen, und ich habe die Elasticsearch-Schraube aus dem elasticsearch-hadoop-Projekt erneut implementiert: https://github.com/elastic/elasticsearch-hadoop/blob/master/storm/src/main/java/org/elasticsearch/storm/EsBolt.java. Das Ziel war, auf mehreren Indizes in elasticsearch zu schreiben. Also, wenn ich die Nachrichten von Kafka verarbeiten, habe ich einige Ausnahmen, wenn die Anzahl der Daten in der Kafka-Warteschlange wächst. Dies ist ein Teil der Stapelüberwachung im Betriebsprotokolle:AWS-Mitarbeiter können aufgrund der Net- Client-Hostnamensauflösung nicht kommunizieren
2016-04-13T22:24:44.641+0000 b.s.m.n.Client [ERROR] failed to send 580 messages to Netty-Client-ip-[internal-ip].ec2.internal/[internal-ip]:6700:
java.nio.channels.ClosedChannelException
2016-04-13T22:24:44.641+0000 b.s.m.n.Client [ERROR] failed to send 575 messages to Netty-Client-ip-[internal-ip].ec2.internal/[internal-ip]:6700:
java.nio.channels.ClosedChannelException
2016-04-13T22:25:05.970+0000 b.s.m.n.Client [WARN] Re-connection to ip-[internal-ip].ec2.internal/[internal-ip]:6701 was successful but 52890 messages
has been lost so far
2016-04-13T22:36:33.571+0000 b.s.m.n.StormClientHandler [INFO] Connection failed Netty-Client-ip-ip-[internal-ip].ec2.internal/[internal-ip]:6701
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0_77]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0_77]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_77]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0_77]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_77]
at org.apache.storm.netty.channel.socket.nio.NioWorker.read(NioWorker.java:64) [storm-core-0.9.6.jar:0.9.6]
at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) [storm-core-0.9.6.jar:0.9.6]
at org.apache.storm.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) [storm-core-0.9.6.jar:0.9.6]
at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) [storm-core-0.9.6.jar:0.9.6]
at org.apache.storm.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [storm-core-0.9.6.jar:0.9.6]
at org.apache.storm.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [storm-core-0.9.6.jar:0.9.6]
at org.apache.storm.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [storm-core-0.9.6.jar:0.9.6]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
Ich bin ein Sturm Cluster von Knoten 3 (1 nimbus + UI + Zookeeper Supervisoren und 2) verwendet wird. Sturmversion 0.9.6. Jede dieser Maschinen haben 4GB RAM und dies ist der Inhalt meiner storm.yml Konfigurationsdatei:
storm.zookeeper.servers:
- "nimbus-ip"
storm.local.dir: "/mnt/storm"
nimbus.seeds: ["nimbus-ip"]
storm.zookeeper.port: 2181
ui.port: 8080
nimbus.host: "nimbus-ip"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
storm.messaging.netty.max_wait_ms: 10000
Kann mir jemand helfen zu wissen, warum Arbeiter nicht kommunizieren können aufgrund Netty-Client-Host-Namen Auflösung? Ich habe bereits einen Bericht dieses Problems in der Version 0.9.4 von storm https://issues.apache.org/jira/browse/STORM-908 gesehen. Ist es möglich, dass die Version 0.9.6 dieses Problem nicht behebt?
Vielen Dank !!