2013-02-12 9 views
5

Ich bin neu in Apache-Kanal.
Ich versuche zu sehen, wie ich einen JSON (als http-Quelle) bekommen, analysieren und speichern Sie es auf einen dynamischen Pfad auf HDFs nach dem Inhalt.
Zum Beispiel:
wenn die json ist:Kann Apache Gerinne hdfs Sink akzeptieren dynamischen Pfad zu schreiben?

[{ 
    "field1" : "value1", 
    "field2" : "value2" 
}] 

dann wird der hdfs Pfad:
/some-default-root-path/Wert1/Wert2/some-Wert-Name-Datei
Gibt es eine solche Konfiguration von Wasserläufen, die es mir ermöglicht, das zu tun?

Hier ist meine aktuelle Konfiguration (akzeptiert eine json über http und speichert sie in einem Pfad nach Zeitstempel):

#flume.conf: http source, hdfs sink 

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource 
a1.sources.r1.port = 9000 
#a1.sources.r1.handler = org.apache.flume.http.JSONHandler 

# Describe the sink 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = /user/uri/events/%y-%m-%d/%H%M/%S 
a1.sinks.k1.hdfs.filePrefix = events- 
a1.sinks.k1.hdfs.round = true 
a1.sinks.k1.hdfs.roundValue = 10 
a1.sinks.k1.hdfs.roundUnit = minute 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

Dank!

Antwort

8

Die Lösung wurde im Gerinne documentation for the hdfs sink:

Hier ist die überarbeitete Konfiguration:

#flume.conf: http source, hdfs sink 

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource 
a1.sources.r1.port = 9000 
#a1.sources.r1.handler = org.apache.flume.http.JSONHandler 

# Describe the sink 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = /user/uri/events/%{field1} 
a1.sinks.k1.hdfs.filePrefix = events- 
a1.sinks.k1.hdfs.round = true 
a1.sinks.k1.hdfs.roundValue = 10 
a1.sinks.k1.hdfs.roundUnit = minute 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

und die Locke:

curl -X POST -d '[{ "headers" : {   "timestamp" : "434324343", "host" :"random_host.example.com", "field1" : "val1"   }, "body" : "random_body" }]' localhost:9000 
+0

Ich bin eine andere Quelle mit (RabbitMQ) und I Ich gebe selbst eine JSON-Payload weiter. Die von dir beschriebene Methode scheint in meinem Fall nicht zu funktionieren. Ich nehme an, etwas ist falsch an meinem Ende, wenn Sie ähnliche Probleme konfrontiert –

+0

Danke, das hat für mich funktioniert –