2016-04-13 25 views
3

Kafka 0,8VWas ist der beste Weg, verschiedene Arten von Nachrichten zu veröffentlichen und zu konsumieren?

Ich mag/veröffentlichen verbrauchen byte [] Objekte, Java Bean Objekte, serialisierbare Objekte und vieles mehr ..

Was ist der beste Weg, um einen Verleger und Verbraucher für diese zu definieren, Typ Szenario? Wenn ich eine Nachricht vom Consumer-Iterator konsumiere, weiß ich nicht, um welchen Nachrichtentyp es sich handelt. Kann mir jemand eine Anleitung zum Entwerfen solcher Szenarien geben?

Antwort

3

Ich erzwinge ein einzelnes Schema oder einen Objekttyp pro Kafka-Thema. Auf diese Weise wissen Sie genau, was Sie bekommen, wenn Sie Nachrichten erhalten.

Zumindest sollten Sie entscheiden, ob ein bestimmtes Thema binary oder string Daten enthält, und davon abhängig, wie es weiter codiert wird.

Zum Beispiel könnten Sie ein Thema mit dem Namen Schema haben, das JSON-codierte Objekte enthält, die als Strings gespeichert sind.

Wenn Sie JSON und eine lose typisierte Sprache wie JavaScript verwenden, könnte es verlockend sein, verschiedene Objekte mit unterschiedlichen Schemas im selben Thema zu speichern. Mit JavaScript können Sie einfach JSON.parse(...) aufrufen, einen Blick auf das resultierende Objekt werfen und herausfinden, was Sie damit machen möchten.

Aber Sie können das nicht in einer streng typisierten Sprache wie Scala tun. Die Scala-JSON-Parser möchten im Allgemeinen, dass Sie den JSON in einen bereits definierten Scalatyp einteilen, normalerweise eine case class. Sie arbeiten nicht mit diesem Modell.

Eine Lösung besteht darin, die Regel für ein Schema/ein Thema beizubehalten, aber ein wenig zu schummeln: Ein Objekt in ein Objekt einbinden. Ein typisches Beispiel wäre ein Action Objekt, bei dem Sie eine Kopfzeile haben, die die Aktion beschreibt, und ein Payload-Objekt mit einem Schema, das vom Aktionstyp abhängt, der in der Kopfzeile aufgeführt ist. Stellen Sie sich dieses pseudo-Schema:

{name: "Action", fields: [ 
    {name: "actionType", type: "string"}, 
    {name: "actionObject", type: "string"} 
]} 

Auf diese Weise sogar in einem stark typisierte Sprache, Sie so etwas wie die folgenden (auch dies ist Pseudo-Code) tun können:

action = JSONParser[Action].parse(msg) 
switch(action.actionType) { 
    case "foo" => var foo = JSONParser[Foo].parse(action.actionObject) 
    case "bar" => var bar = JSONParser[Bar].parse(action.actionObject) 
} 

Einer der Anständige Dinge über diesen Ansatz ist, dass, wenn Sie einen Verbraucher haben, der nur auf eine bestimmte action.actionType wartet, und nur alle anderen ignorieren wird, ist es ziemlich leicht für sie zu entschlüsseln nur die Kopfzeile und schieben Sie die Entschlüsselung action.actionObject bis wann und wenn es wird gebraucht.

Bisher ging es um string-codierte Daten. Wenn Sie mit binären Daten arbeiten möchten, können Sie sie natürlich auch in JSON oder eine beliebige Anzahl von stringbasierten Codierungen wie XML einbinden. Aber es gibt auch eine Reihe von binären Kodierungssystemen wie Thrift und Avro. Tatsächlich basiert das Pseudo-Schema oben auf Avro. Sie können sogar coole Dinge in Avro wie Schema-Evolution tun, die unter anderem eine sehr glatte Möglichkeit bietet, den obigen Anwendungsfall zu handhaben - anstatt ein Objekt in ein Objekt zu wickeln, können Sie ein Schema definieren, das eine Untermenge von anderen ist Schemas und entziffern nur die Felder, die Sie wollen, in diesem Fall nur das Feld action.actionType. Hier ist eine wirklich hervorragende Beschreibung von schema evolution.

Auf den Punkt gebracht, was ich empfehlen ist:

  1. Settle auf einem schemabasierten Codiersystem (sei es JSON, XML, Avro, was auch immer)
  2. erzwingen ein Schema pro Thema Regel
+0

Dank David für eine solche klare Erklärung. Ich bin neu bei Kafka. Ich überprüfe die Avro-Schemas. – Ratha

+0

Kann ich das Schema aus den empfangenen Bytes abrufen? Weil ich Hunderte von Themen aus dem Empfänger-Thread habe, wie identifiziere ich welches Schema anzuwenden? – Ratha

+1

Nein, Sie können ein Schema nicht direkt aus den empfangenen Bytes abrufen - es enthält das Schema nicht. Der "übliche" Weg für Avro ist das Speichern des Schemas in der Datei mit den codierten Datensätzen, aber das funktioniert nicht in einer Streaming-Umgebung. Stattdessen verwende ich, wie oben erwähnt, ein separates Thema nur für Avro-Schemas. Das erste, was ein Konsument in meinem System tut, ist, von Anfang an auf dieses Thema zu hören und sich auf diese Weise mit den entsprechenden Schemata zu konfigurieren. –