2015-11-16 10 views
7

Dies ist mein erstes Mal hier, so tut mir leid, wenn ich nicht gut post, und Entschuldigung für mein schlechtes Englisch.Konfigurieren Sie Spüle elasticsearch Apache-Rinne

Ich versuche, Apache Flume und Elasticsearch-Senken zu konfigurieren. Alles ist in Ordnung, es scheint, dass es gut läuft, aber es gibt 2 Warnungen, wenn ich einen Agenten starte; die folgenden:

2015-11-16 09:11:22,122 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:[email protected] counterGroup:{ name:null counters:{} } } - Exception follows. 
java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V 
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143) 
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77) 
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48) 
    at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357) 
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46) 
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79) 
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
2015-11-16 09:11:22,137 (lifecycleSupervisor-1-3) [WARN - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:260)] Component SinkRunner: { policy:[email protected] counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies 

Mein Agent Konfiguration:

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port = 44444 

# Describe the sink ES 
a1.sinks = k1 
a1.sinks.k1.type = elasticsearch 
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300 
a1.sinks.k1.indexName = items 
a1.sinks.k1.indexType = item 
a1.sinks.k1.clusterName = elasticsearch 
a1.sinks.k1.batchSize = 500 
a1.sinks.k1.ttl = 5d 
a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer 
a1.sinks.k1.channel = c1 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

Es beginnt die netcat und alles ist in Ordnung, aber ich habe Angst, über Thesen Warnungen, ich verstehe es nicht.

+0

Sind Sie sicher, dass die angegebene Konfiguration ordnungsgemäß ausgeführt wird? Die erste Protokollablaufverfolgung ist keine Warnung, sondern ein Fehler, der Ihnen mitteilt, dass 'ElasticSearchSink' ein Problem hat, das höchstwahrscheinlich mit einem Abhängigkeitsproblem verbunden ist (es gibt eine Methode, die nicht gefunden wird). – frb

+0

Ich habe nichts über die spezielle Nachricht bemerkt, die der Warn-Trace gegeben hat, aber es bestätigt meine Diagnose: 'Component SinkRunner: {policy: [email protected] counterGroup: {name: null counters: {} }} gestoppt, da es aufgrund fehlender Abhängigkeiten nicht erfolgreich gestartet werden konnte – frb

Antwort

1

Bei den Protokollen gibt es ein Problem mit einer fehlenden Abhängigkeit.

Wenn Sie einen Blick auf die ElasticSearchSink Dokumentation haben, werden Sie folgendes sehen:

Die Elasticsearch und Lucene-Core-Gläser für Ihre Umgebung erforderlich sind, müssen im Verzeichnis lib des Apache Flume platziert werden Installation. Elasticsearch erfordert, dass die Hauptversion der Client-JAR mit der des Servers übereinstimmt und beide die gleiche Nebenversion der JVM ausführen. SerializationExceptions werden angezeigt, wenn dies nicht korrekt ist. Um die erforderliche Version auszuwählen, müssen Sie zuerst die Version von elasticsearch und die JVM-Version des Zielclusters ermitteln. Wählen Sie dann eine elasticsearch-Client-Bibliothek aus, die der Hauptversion entspricht. Ein 0.19.x-Client kann mit einem 0.19.x-Cluster kommunizieren. 0,20.x kann mit 0,20.x sprechen und 0,90.x kann mit 0,90.x sprechen. Sobald die ElasticSearch-Version ermittelt wurde, lesen Sie die Datei pom.xml, um die richtige zu verwendende JUC-Version von Lucene-Core zu ermitteln. Der Flume-Agent, auf dem ElasticSearchSink ausgeführt wird, sollte auch mit der JVM übereinstimmen, in der der Zielcluster mit der Nebenversion ausgeführt wird.

Wahrscheinlich haben Sie die erforderlichen Java-Jars nicht platziert oder die Version ist nicht die richtige.

2

Ich habe einen Grund gefunden, es scheint, dass Apache Flume 1.6.0 und Elasticsearch 2.0 nicht richtig kommunizieren können.

Ich fand eine gute Senke von einer 3. Person, die ich modifizierte.

Here is the link

Und das ist meine letzte Konfiguration,

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port = 44444 

# Describe the sink ES 
a1.sinks = k1 
a1.sinks.k1.type = com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink 
a1.sinks.k1.hostNames = 127.0.0.1:9300 
a1.sinks.k1.indexName = items 
a1.sinks.k1.indexType = item 
a1.sinks.k1.clusterName = elasticsearch 
a1.sinks.k1.batchSize = 500 
a1.sinks.k1.ttl = 5d 
a1.sinks.k1.serializer = com.frontier45.flume.sink.elasticsearch2.ElasticSearchDynamicSerializer 
a1.sinks.k1.indexNameBuilder = com.frontier45.flume.sink.elasticsearch2.TimeBasedIndexNameBuilder 
a1.sinks.k1.channel = c1 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

Es funktioniert für mich.

Danke für die Antworten.

P.S. Ja, ich musste die Bibliotheken verschieben.

1

nur in Gerinne/lib dir unter 2 JAR-Dateien hinzugefügt und es hat funktioniert, müssen nicht alle anderen Lucene JAR-Dateien hinzufügen:

Elasticsearch-1.7.1.jar

lucene-Core- 4.10.4.jar

Befehl Gerinne zu starten:

bin/flume-ng agent --conf conf --conf-file conf/flume-aggregator.conf --name agent2 -Dflume.root.logger=INFO,console 

stellen Sie sicher, unten an flume-env.sh

export JAVA_HOME=/usr/java/default 

export JAVA_OPTS="-Xms3072m -Xmx3072m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" 

FLUME_CLASSPATH="/usr/flume/flume1.6/apache-flume-1.6.0-bin/;/usr/flume/flume1.6/apache-flume-1.6.0-bin/lib" 

Flume Aggregator Config hinzufügen Daten in ES zu laden: rume-aggregator.conf

agent2.sources = source1 
agent2.sinks = sink1 
agent2.channels = channel1 

################################################ 
# Describe Source 
################################################ 

# Source Avro 
agent2.sources.source1.type = avro 
agent2.sources.source1.bind = 0.0.0.0 
agent2.sources.source1.port = 9997 

################################################ 
# Describe Interceptors 
################################################ 
# an example of nginx access log regex match 
# agent2.sources.source1.interceptors = interceptor1 
# agent2.sources.source1.interceptors.interceptor1.type = regex_extractor 
# 
# agent2.sources.source1.interceptors.interceptor1.regex = "^(\\S+) \\[(.*?)\\] \"(.*?)\" (\\S+) (\\S+)(\"(.*?)\" \"(.*?)\")?" 
# 
# # agent2.sources.source1.interceptors.interceptor1.regex = ^(.*) ([a-zA-Z\\.\\@\\-\\+_%]+) ([a-zA-Z\\.\\@\\-\\+_%]+) \\[(.*)\\] \\"(POST|GET) ([A-Za-z0-9\\$\\.\\+\\@#%_\\/\\-]*)\\??(.*) (.*)\\" ([a-zA-Z0-9\\.\\/\\s\-]*) (.*) ([0-9]+) ([0-9]+) ([0-9\\.]+) 
# # agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 s12 s13 
# 
# agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 
# agent2.sources.source1.interceptors.interceptor1.serializers.s1.name = clientip 
# agent2.sources.source1.interceptors.interceptor1.serializers.s2.name = datetime 
# agent2.sources.source1.interceptors.interceptor1.serializers.s3.name = method 
# agent2.sources.source1.interceptors.interceptor1.serializers.s4.name = request 
# agent2.sources.source1.interceptors.interceptor1.serializers.s5.name = response 
# agent2.sources.source1.interceptors.interceptor1.serializers.s6.name = status 
# agent2.sources.source1.interceptors.interceptor1.serializers.s7.name = bytes 
# agent2.sources.source1.interceptors.interceptor1.serializers.s8.name = requesttime 
# 

################################################ 
# Describe Sink 
################################################ 

# Sink ElasticSearch 
# Elasticsearch lib ---> flume/lib 
# elasticsearch/config/elasticsearch.yml cluster.name clusterName. data/clustername data. 
agent2.sinks.sink1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink 
agent2.sinks.sink1.hostNames = 10.20.156.16:9300,10.20.176.20:9300 
agent2.sinks.sink1.indexName = pdi 
agent2.sinks.sink1.indexType = pdi_metrics 
agent2.sinks.sink1.clusterName = My-ES-CLUSTER 
agent2.sinks.sink1.batchSize = 1000 
agent2.sinks.sink1.ttl = 2 
#this serializer is crucial in order to use kibana 
agent2.sinks.sink1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer 



################################################ 
# Describe Channel 
################################################ 

# Channel Memory 
agent2.channels.channel1.type = memory 
agent2.channels.channel1.capacity = 10000000 
agent2.channels.channel1.transactionCapacity = 1000 

################################################ 
# Bind the source and sink to the channel 
################################################ 

agent2.sources.source1.channels = channel1 
agent2.sinks.sink1.channel = channel1 
+0

Vielen Dank für die Auflistung der genauen Gläser. – user99999991