2016-04-18 5 views
0

Basierend auf Avro-Schema habe ich eine Klasse (Data) erzeugt, um mit der Klasse für das Schema zu arbeiten Danach kodiere ich die Daten und sende in andere Anwendung "A" mit kafkaAvro mit Kafka - Deserialisieren mit wechselndem Schema

Data data; // <- The object was initialized before . Here it is only the declaration "for example" 
EncoderFactory encoderFactory = EncoderFactory.get(); 
     ByteArrayOutputStream out = new ByteArrayOutputStream(); 
     BinaryEncoder encoder = encoderFactory. directBinaryEncoder(out, null);      
     DatumWriter<Tloog> writer;     
     writer = new SpecificDatumWriter<Data>(Data.class); 
     writer.write(data, encoder); 
     byte[] avroByteMessage = out.toByteArray(); 

auf der anderen Seite (in der Anwendung „A“) habe ich die Daten deserilize von Deserializer Umsetzung

class DataDeserializer implements Deserializer<Data> { 
    private String encoding = "UTF8"; 

    @Override 
    public void configure(Map<String, ?> configs, boolean isKey) { 
     // nothing to do 
    } 

    @Override 
    public Tloog deserialize(String topic, byte[] data) { 
     try { 
      if (data == null) 
      { 
       return null; 
      } 
      else 
      { 
         DatumReader<Tloog> reader = new SpecificDatumReader<Data>(Data.class); 
         DecoderFactory decoderFactory = DecoderFactory.get(); 
         BinaryDecoder decoder = decoderFactory.binaryDecoder(data, null); 
         Data decoded = reader.read(null, decoder); 
         return decoded; 
      } 
     } catch (Exception e) { 
      throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding); 
     } 
    } 

Das Problem ist, dass dieser Ansatz die Verwendung von SpecificDatumReader erfordert, sollte Iethe Datenklasse mit dem integriert werden Anwendungscode ... Diese problematisch sein könnte - Schema ändern könnte und sollte daher Datenklasse wird neu generiert und integrierte nochmals 2 Fragen:

  1. Soll ich GenericDatumReader in der Anwendung verwenden? Wie man das richtig macht. (Ich kann das Schema einfach in der Anwendung speichern)
  2. Gibt es eine einfache Möglichkeit, mit SpecificDatumReader zu arbeiten, wenn sich Daten ändern? Wie könnte es ohne viel Ärger integriert werden?

Dank

Antwort

1

Ich benutze GenericDatumReader - na ja, leite ich daraus meine Leser-Klasse, aber Sie erhalten den Punkt. Um es zu benutzen, behalte ich meine Schemata in einem speziellen Kafka-Thema - Schema überraschend genug. Verbraucher und Produzenten lesen beide beim Start von diesem Thema und konfigurieren ihre jeweiligen Parser.

Wenn Sie es so machen, können Sie Ihre Kunden und Hersteller sogar veranlassen, ihre Schemas im laufenden Betrieb zu aktualisieren, ohne sie neu starten zu müssen. Das war ein Designziel für mich - ich wollte meine Anwendungen nicht neu starten müssen, um Schemas hinzuzufügen oder zu ändern. Warum funktioniert SpecificDatumReader nicht für mich, und ehrlich gesagt, warum ich Avro an erster Stelle anstelle von etwas wie Thrift verwenden.

aktualisieren

Der normale Weg Avro zu tun ist, um das Schema in der Datei mit den Datensätzen zu speichern. Ich tue es nicht so, hauptsächlich weil ich es nicht kann. Ich verwende Kafka, also kann ich die Schemas nicht direkt mit den Daten speichern - ich muss die Schemas in einem separaten Thema speichern.

Die Art, wie ich es tue, zuerst lade ich alle meine Schemas. Sie können sie aus einer Textdatei lesen; aber wie ich schon sagte, lese ich sie von einem Kafka Thema. Nachdem ich sie von Kafka lesen, habe ich ein Array wie folgt:

val schemaArray: Array[String] = Array(
    """{"name":"MyObj","type":"record","fields":[...]}""", 
    """{"name":"MyOtherObj","type":"record","fields":[...]}""" 
) 

BTW für die Scala Apologize, aber es ist das, was ich bekam.

Auf jeden Fall, dann müssen Sie einen Parser, und foreach Schema erstellen, analysieren sie und erstellen Leser und Autoren, und sie Karten speichern off:

val parser = new Schema.Parser() 
val schemas = Map(schemaArray.map{s => parser.parse(s)}.map(s => (s.getName, s)):_*) 
val readers = schemas.map(s => (s._1, new GenericDatumReader[GenericRecord](s._2))) 
val writers = schemas.map(s => (s._1, new GenericDatumWriter[GenericRecord](s._2))) 
var decoder: BinaryDecoder = null 

ich all das tun, bevor ich analysieren ein tatsächlicher Datensatz - das ist nur um den Parser zu konfigurieren. Dann, um einen individuellen Datensatz zu dekodieren, würde ich tun:

val byteArray: Array[Byte] = ... // <-- Avro encoded record 
val schemaName: String = ... // <-- name of the Avro schema 

val reader = readers.get(schemaName).get 

decoder = DecoderFactory.get.binaryDecoder(byteArray, decoder) 
val record = reader.read(null, decoder) 
+0

Nur um dies hier [Confluent Schema Registry] (http://docs.confluent.io/1.0.1/schema-registry/docs/index.html). – vlahmot

+0

Ich schaute auf Schema Registry - es scheint sehr merkwürdig, dass Sie eine RESTful-Schnittstelle auf eine Kafka-Back-End-Architektur zerschlagen würden. Warum lassen Sie Ihre Kunden nicht einfach direkt mit Ihrem Schema-Stream interagieren? Es ist, als würde man mit einem Pferdegespann ein Autofahrgestell ziehen. Für einen Anwendungsfall wie diesen, in dem wir bereits Kafka-Streams konsumieren, möchten Sie jetzt keine RESTful-Aufrufe durchführen, um Ihre Schemas abzuholen. –

+0

Ich mochte es für die automatische Schema-Evolution und den Schutz vor Datenkorruption. Die Tatsache, dass nur ein Verweis auf das Schema und nicht das vollständige Schema mit jedem Datenpunkt gespeichert wird, ist ebenfalls gut. Das Hinzufügen eines Webaufrufs zum Abrufen der Schemas war für uns kein Problem. – vlahmot