2016-08-03 53 views
0

Ich stehe vor einem seltsamen Problem. Ich bin auf der Suche nach einer Menge Informationen von Gerinne zu einem HDFS zu aggregieren. Ich habe die empfohlene Konfiguration angewendet, um zu viele kleine Dateien zu vermeiden, aber es hat nicht funktioniert. Hier ist meine Konfigurationsdatei.Kanal-/Avro-Quelle, Speicherkanal und HDFS-Senke - Zu viele kleine Dateien

# single-node Flume configuration 
# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 5458 
a1.sources.r1.threads = 20 

# Describe the HDFS sink 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://myhost:myport/user/myuser/flume/events/%{senderType}/%{senderName}/%{senderEnv}/%y-%m-%d/%H%M 
a1.sinks.k1.hdfs.filePrefix = logs- 
a1.sinks.k1.hdfs.fileSuffix = .jsonlog 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
a1.sinks.k1.hdfs.batchSize = 100 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
#never roll-based on time 
a1.sinks.k1.hdfs.rollInterval=0 
##10MB=10485760, 128MB=134217728, 256MB=268435456 
a1.sinks.kl.hdfs.rollSize=10485760 
##never roll base on number of events 
a1.sinks.kl.hdfs.rollCount=0 
a1.sinks.kl.hdfs.round=false 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 5000 
a1.channels.c1.transactionCapacity = 1000 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

Diese Konfiguration funktioniert, und ich sehe meine Dateien. Aber der Gewichtsdurchschnitt der Datei beträgt 1.5kb. Flume Konsole Ausgabe bieten diese Art von Informationen.

16/08/03 09:48:31 INFO hdfs.BucketWriter: Creating hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484507.jsonlog.tmp 
16/08/03 09:48:31 INFO hdfs.BucketWriter: Closing hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484507.jsonlog.tmp 
16/08/03 09:48:31 INFO hdfs.BucketWriter: Renaming hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484507.jsonlog.tmp to hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484507.jsonlog 
16/08/03 09:48:31 INFO hdfs.BucketWriter: Creating hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484508.jsonlog.tmp 
16/08/03 09:48:31 INFO hdfs.BucketWriter: Closing hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484508.jsonlog.tmp 
16/08/03 09:48:31 INFO hdfs.BucketWriter: Renaming hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484508.jsonlog.tmp to hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484508.jsonlog 
16/08/03 09:48:31 INFO hdfs.BucketWriter: Creating hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484509.jsonlog.tmp 
16/08/03 09:48:31 INFO hdfs.BucketWriter: Closing hdfs://myhost:myport/user/myuser/flume/events/a/b/c/16-08-03/0948/logs-.1470210484509.jsonlog.tmp 

Jemand hat eine Idee über das Problem?


Hier sind einige Informationen über das Verhalten von Wasserläufen.

Der Befehl ist Gerinne-ng Mittel -n a1 -c/path/to/Gerinne/conf --conf-Datei sample-flume.conf -Dflume.root.logger = trace, Konsolen- -Xms8192m -Xmx16384m

Hinweis: Die Logger-Direktive funktioniert nicht. Ich verstehe nicht, warum, aber ich bin ...

Flume Startausgabe ist:

16/08/03 15:32:55 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 
16/08/03 15:32:55 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:sample-flume.conf 
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1 
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:kl 
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1 
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1 
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1 
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1 
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1 
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:kl 
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1 
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1 
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:kl 
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1 
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1 
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1 
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1] 
16/08/03 15:32:55 INFO node.AbstractConfigurationProvider: Creating channels 
16/08/03 15:32:55 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory 
16/08/03 15:32:55 INFO node.AbstractConfigurationProvider: Created channel c1 
16/08/03 15:32:55 INFO source.DefaultSourceFactory: Creating instance of source r1, type avro 
16/08/03 15:32:55 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: hdfs 
16/08/03 15:32:56 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false 
16/08/03 15:32:56 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1] 
16/08/03 15:32:56 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:Avro source r1: { bindAddress: 0.0.0.0, port: 5458 } }} sinkRunners:{k1=SinkRunner: { policy:[email protected] counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} } 
16/08/03 15:32:56 INFO node.Application: Starting Channel c1 
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean. 
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started 
16/08/03 15:32:56 INFO node.Application: Starting Sink k1 
16/08/03 15:32:56 INFO node.Application: Starting Source r1 
16/08/03 15:32:56 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 5458 }... 
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean. 
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started 
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean. 
16/08/03 15:32:56 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started 
16/08/03 15:32:56 INFO source.AvroSource: Avro source r1 started. 

Da ich nicht mehr ausführliche Ausgabe haben kann, muss ich annehmen, dass Informationen wie

[...] 
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1 
16/08/03 15:32:55 INFO conf.FlumeConfiguration: Processing:k1 
[...] 

zeigt an, dass die Senken korrekt konfiguriert sind.


PS: Ich sah folgende Antworten, aber keiner von denen funktioniert (ich sollte etwas verpassen ...).

flume-hdfs-sink-generates-lots-of-tiny-files-on-hdfs

too-many-small-files-hdfs-sink-flume

flume-tiering-data-flows-using-the-avro-source-and-sink

flume-hdfs-sink-keeps-rolling-small-files

+0

Haben Sie versucht, wenn andere Optionen zum Rollen gut funktionieren? Versuchen Sie auch, das Log direkt nach dem Start von Flume zu überprüfen, im Allgemeinen könnte es einen Hinweis geben. Ich habe Ihre Sink-Konfiguration mit der Konfiguration verglichen, die ich vor einiger Zeit verwendet habe, und es scheint sehr ähnlich zu sein (wir haben etwas größere Kanalkapazitäten konfiguriert, aber es sollte die Senke nicht beeinflussen). – Serhiy

+0

@Serhiy Vielen Dank für Ihre Eingabe. Ich habe meinen Post mit der Konsolenausgabe des Flumes bearbeitet. Ich versuche, einige rollende Informationen zu ändern. Es scheint alle rollenden Konfigurationsinformationen zu sein, die versagen. Ich habe tatsächlich versucht, den rollenden Countr (a1.sinks.kl.hdfs.rollCount = 1000) zu setzen, aber hat nicht funktioniert. Ich betreibe die HDP 2.3.2, könnte ein Tweak von hier sein? – ollie314

+0

Können Sie versuchen, die Protokollierung mit Debug-Ebene einzurichten? In meiner Installation befindet sich 'log4j.properties' im' flume/conf' Verzeichnis. Versuchen Sie es mit 'run.root.logger', 'log4j.logger.org'.apache.flume.lifecycle' und 'log4j.logger.org.apache.hadoop' zu' DEBUG'. – Serhiy

Antwort

0

erhöhen Losgröße wie pro Ihre Anforderung

a1.sinks.k1.hdfs.batchSize =