2015-06-15 12 views
6

ich die HTTPSource in Flume zur Aufnahme POST Ereignisse in json Format verwende wie folgt:Wie JSON in HDFS einfügen mit Flume richtig

{"username":"xyz","password":"123"} 

Meine Frage ist: Muss ich die Quelle zu ändern haben die Ereignisse (ich meine die, die die JSON zum Flume senden), so dass die JSON, hat folgendes Format:

[{ 
    "headers" : { 
      "timestamp" : "434324343", 
      "host" : "random_host.example.com" 
      }, 
    "body" : "{"username":"xyz","password":"123"}" 
}] 

Dies ist der beste Weg, es zu tun? Oder ich kann es überall anders ändern?

Meine conf-Datei für die flume agent ist:

## Componentes 
SomeAgent.sources = SomeHTTP 
SomeAgent.channels = MemChannel 
SomeAgent.sinks = SomeHDFS 

## Fuente e Interceptores 
SomeAgent.sources.SomeHTTP.type = http 
SomeAgent.sources.SomeHTTP.port = 5140 
SomeAgent.sources.SomeHTTP.handler = org.apache.flume.source.http.JSONHandler 
SomeAgent.sources.SomeHTTP.channels = MemChannel 
SomeAgent.sources.SomeHTTP.interceptors = i1 i2 

## Interceptores 
SomeAgent.sources.SomeHTTP.interceptors.i1.type = timestamp 
SomeAgent.sources.SomeHTTP.interceptors.i2.type = host 
SomeAgent.sources.SomeHTTP.interceptors.i2.hostHeader = hostname 

## Canal 
SomeAgent.channels.MemChannel.type = memory 
SomeAgent.channels.MemChannel.capacity = 10000 
SomeAgent.channels.MemChannel.transactionCapacity = 1000 

## Sumidero 
SomeAgent.sinks.SomeHDFS.type = hdfs 
SomeAgent.sinks.SomeHDFS.channel = MemChannel 
SomeAgent.sinks.SomeHDFS.hdfs.path = /raw/logs/%Y-%m-%d 
SomeAgent.sinks.SomeHDFS.hdfs.fileType = DataStream 
SomeAgent.sinks.SomeHDFS.hdfs.filePrefix = SomeLogs- 
SomeAgent.sinks.SomeHDFS.hdfs.writeFormat = Text 
SomeAgent.sinks.SomeHDFS.hdfs.batchSize = 100 
SomeAgent.sinks.SomeHDFS.hdfs.rollSize = 0 
SomeAgent.sinks.SomeHDFS.hdfs.rollCount = 10000 
SomeAgent.sinks.SomeHDFS.hdfs.rollInterval = 600 
SomeAgent.sinks.SomeHDFS.hdfs.useLocalTimeStamp = true 

Ausführen des cat von hadoop fs

$ hadoop fs -ls -R /raw/logs/somes 
drwxr-xr-x - flume-agent supergroup   0 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16 
-rw-r--r-- 3 flume-agent supergroup  3814 2015-06-16 12:33 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434471803369 
-rw-r--r-- 3 flume-agent supergroup  3719 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434472404774 


$ hadoop fs -cat /raw/logs/somes/2015-06-16/SomeLogs.1434471803369 | head 




$ 

(Sie sehen richtig, leere Zeilen)

Wenn ich jetzt auf die Datei aussehen (unter Verwendung der binären Sicht von HUE zum Beispiel):

0000000: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................ 
0000010: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................ 
0000020: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................ 

Antwort

4

Wenn ich gut verstanden habe, möchten Sie sowohl die Daten als auch die Header serialisieren. In diesem Fall müssen Sie die Datenquelle nicht ändern, sondern einige standardmäßige Flume-Elemente verwenden und Ihren eigenen Serializer für HDFS erstellen.

Der erste Schritt ist zu erreichen Flume schafft die gewünschte JSON-Struktur, d. H. Header + Körper. Flume in der Lage, es für Sie zu tun, verwenden Sie einfach JSONHandler an Ihrem HTTPSource, auf diese Weise:

a1.sources = r1 
a1.sources.r1.hnadler = org.apache.flume.source.http.JSONHandler 

In der Tat ist es nicht notwendig, den JSON-Handler zu konfigurieren, da es der Standard ein für HTTPSource ist.

Verwenden Sie dann beide Timestamp Interceptor und Host Interceptor, um die gewünschten Header hinzuzufügen. Der einzige Trick ist das Gerinne Mittel in der gleichen Maschine als der Senderprozess, um die abgefangenen Host ist eine die gleiche als der Sender ausgeführt werden muss:

a1.sources.r1.interceptors = i1 i2 
a1.sources.r1.interceptors.i1.type = timestamp 
a1.sources.r1.interceptors.i2.type = host 
a1.sources.r1.interceptors.i2.hostHeader = hostname 

An diesem Punkt werden Sie das gewünschte Ereignis haben. Trotzdem speichern Standard-Serialisierer für HDFS nur den Körper, nicht die Header. Erstellen Sie einen benutzerdefinierten Serializer, der org.apache.flume.serialization.EventSerializer implementiert. Es ist so konfiguriert, wie:

a1.sinks = k1 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.serializer = my_custom_serializer 

HTH

+0

Ich habe diesen Link gefunden (http://grokbase.com/t/flume/user/128nspvnfg/can-hdfssink-write-headers-as-well) sagen, dass Cuatom Serialisierer für HDFSSink nur für 'erstellt werden können fileType = CompressedStream' oder 'DataStream'. Ich weiß nicht, ob es momentan für 'SequenceFiles' festgelegt ist. – frb

+0

frb danke für deine Antwort, ich habe nur die Konfigurationsdatei eingefügt, aber wenn ich (mit 'hadoop fs -cat/rohe/log/2015-06-15/SomeLog-.1434410388430') sehe ich nichts (a Viele leere Bündel, von denen ich vermute, dass sie binär sind. Könnten Sie den Fehler sehen? – nanounanue

+0

Ich habe die Ausgabe als Binär in der Frage hinzugefügt ... Loggt nichts ': (' – nanounanue

3

Die Antwort von @frb geschrieben war richtig, fehlt der einzige Punkt ist, dass die JSON-Generator muss den body Teil senden (ich muss zugeben/beklagen, dass die docs sind in diesem Punkt nicht klar), so der richtig Weg, um die json der Entsendung wird

[body:"{'username':'xyz','password':'123'}"] 

Bitte keine te, dass die json von Daten jetzt eine Zeichenfolge ist.

Mit dieser Änderung ist jetzt die json im hdfs sichtbar.

+0

Guter Punkt! Musstest du endlich ändern die Ereignisse Quelle, um die JSON als Zeichenfolge zu senden? Oder haben Sie in irgendeiner Weise die JSONHandler geändert, um die empfangenen Daten zu string? – frb

+0

Ich änderte die Quelle. Die Änderung wurde die JSON string und den Schlüssel 'body' hinzufügen – nanounanue

1

Die Flute HTTPSource, die den Standard-JSONHandler verwendet, erwartet, dass eine Liste von vollständig geformten Flume-Ereignissen in der JSON-Darstellung [{ headers: ..., body: ... }] an den Endpunkt übermittelt wird. Um einen Agentenendpunkt zu erstellen, der eine einfache Struktur auf Anwendungsebene wie {"username":"xyz", "password":"123"} akzeptiert, können Sie den Handler mit einer alternativen Klasse überschreiben, die HTTPSourceHandler implementiert; Sehen Sie die JSONHandler Quelle - es gibt nicht viel zu.

public List<Event> getEvents(HttpServletRequest request) throws ... 

In einem benutzerdefinierten JSONHandler Sie auch Header auf das Ereignis auf der HTTP-Anforderung, wie der Quell-IP, User-Agent usw. (ein Interceptor haben den Kontext nicht für diese) basiert hinzufügen könnte. Möglicherweise möchten Sie zu diesem Zeitpunkt den von der Anwendung bereitgestellten JSON validieren (der Standardhandler tut dies jedoch nicht).

Obwohl, wie Sie gefunden haben, die Sie gerade den [{body: ...}] Teil, wie ein benutzerdefinierter Handler auch nützlich sein könnte passieren können, wenn Sie wollen verhindern ein Generator-Header für die Veranstaltung zu injizieren.