2016-08-02 27 views
3

Ich versuche AVRO Daten aus einem Kafka Thema mit Flink 1.0.3 zu lesen.Wie Kafka Nachrichten mit Avro und Flink zu entschlüsseln

Ich weiß nur, dass dieses spezielle Kafka-Thema AVRO codierte Nachricht hat und ich habe die AVRO-Schemadatei.

My Flink code:

public static void main(String[] args) throws Exception { 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
     Properties properties = new Properties(); 
     properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092"); 
     properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181"); 
     properties.setProperty("group.id", "Zeeshantest"); 
     AvroDeserializationSchema<Event> avroSchema = new AvroDeserializationSchema<>(Event.class); 
     FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties); 
     DataStream<Event> messageStream = env.addSource(kafkaConsumer); 
     messageStream.rebalance().print(); 
     env.execute("Flink AVRO KAFKA Test"); 
    } 

Ich habe meine Event.java Datei mit den Avro-Tools und das Schema "rocana.avsc" erstellt

java -jar /path/to/avro-tools-1.8.1.jar compile schema rocana.avsc 

Hier ist rocana.avsc Datei in Github hochgeladen.

AvroDeserializationSchema.java

import org.apache.avro.io.BinaryDecoder; 
import org.apache.avro.io.DatumReader; 
import org.apache.avro.io.DecoderFactory; 
import org.apache.avro.reflect.ReflectDatumReader; 
import org.apache.avro.specific.SpecificDatumReader; 
import org.apache.flink.api.common.typeinfo.TypeInformation; 
import org.apache.flink.api.java.typeutils.TypeExtractor; 
import org.apache.flink.streaming.util.serialization.DeserializationSchema; 

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> { 

    private static final long serialVersionUID = 4330538776656642778L; 

    private final Class<T> avroType; 
    private transient DatumReader<T> reader; 
    private transient BinaryDecoder decoder; 

    public AvroDeserializationSchema(Class<T> avroType) { 
     this.avroType = avroType; 
    } 

    @Override 
    public T deserialize(byte[] message) { 
     ensureInitialized(); 
     try { 
      decoder = DecoderFactory.get().binaryDecoder(message, decoder); 
      return reader.read(null, decoder); 
     } catch (Exception e) { 
      throw new RuntimeException(e); 
     } 
    } 

    @Override 
    public boolean isEndOfStream(T nextElement) { 
     return false; 
    } 

    @Override 
    public TypeInformation<T> getProducedType() { 
     return TypeExtractor.getForClass(avroType); 
    } 

    private void ensureInitialized() { 
     if (reader == null) { 
      if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) { 
       reader = new SpecificDatumReader<T>(avroType); 
      } else { 
       reader = new ReflectDatumReader<T>(avroType); 
      } 
     } 
    } 
} 

Auf meinem Programm läuft ich die folgende Störung erhalte:

17:25:30,759 INFO org.apache.zookeeper.ZooKeeper        - Session: 0x356350cb9001857 closed 
17:25:30,759 INFO org.apache.zookeeper.ClientCnxn        - EventThread shut down 
17:25:30,761 INFO org.apache.flink.runtime.taskmanager.Task      - Source: Custom Source (3/4) switched to FAILED with exception. 
java.lang.Exception: 2 
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222) 
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316) 
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78) 
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) 
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) 
    at java.lang.Thread.run(Unknown Source) 
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2 
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402) 
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290) 
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) 
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155) 
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) 
    at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230) 
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) 
    at com.google.cloud.dataflow.sdk.coders.AvroCoder.decode(AvroCoder.java:274) 
    at org.fmr.flink.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:52) 
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:39) 
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:657) 
17:25:30,769 INFO org.apache.flink.runtime.taskmanager.Task      - Freeing task resources for Source: Custom Source (3/4) 
17:25:30,776 INFO org.apache.flink.runtime.taskmanager.Task      - Sink: Unnamed (1/4) switched 

Ich glaube, mein deserialize Code nicht korrekt ist. Weiß jemand, was ich falsch mache? Ist das der Weg, AVRO-Daten von Kafka mit Flink zu lesen oder gibt es einen besseren Weg?

+0

Sind Sie sicher, dass die Daten in Kafka mit genau der gleichen Version des Schemas codiert wurden? Durch das Erstellen eines 'neuen SpecificDatumReaders (avroType)' sagen Sie dem Datumsleser, dass 'avroType's Schema sowohl das Leser- als auch das Schreiberschema ist, und ich glaube, Sie können diese Art von Ausnahmen erhalten, wenn es tatsächlich eine andere Version des Schema wurde ursprünglich zum Verschlüsseln der Nachrichten verwendet. – Josh

+0

Ja, die Schemadatei ist die richtige, ich habe die gleiche 'Schemadatei' und' kafka topic' in 'logstash' verwendet und es funktionierte einwandfrei. – Zeeshan

Antwort

0

Was auch immer Code, den ich nach meiner Frage gestellt perfekt funktioniert gut.

Problem wurde mit den Daten an kafka Thema gesendet, beide JSON und AVRO-formatierte Daten wurden dorthin gesendet. Ich habe ein anderes Kafka-Thema abonniert, wo die Daten nur in AVRO waren und mein Code gut funktionierte.

2

den Code unten Versuchen Sie, den Avro Datensatz deserialisieren:

Schema a; //Your Avro schema 
DatumReader<GenericData.Record> reader = new GenericDatumReader<GenericData.Record>(a); 
GenericData.Record a = reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null)); 
+0

Nein, funktioniert nicht. Derselbe Fehler. Schema erstellt mit: 'Schema schema = new Schema.Parser(). Parse (neue Datei (" ./ src/main/resources/rocana.avsc "));' – Zeeshan

+0

Dies sollte funktionieren, wenn Daten nur Avro-codiert sind. Können Sie überprüfen, wie Sie Daten in Kafka einlesen? Wenn Sie eine Library benutzen, lesen Sie bitte den Encoder von selbe. – Garry

+3

Problem wurde mit den Daten an Kafka Thema gesendet, beide JSON und AVRO formatierte Daten wurden dort gesendet. Ich habe ein anderes Kafka-Thema abonniert, wo die Daten nur in AVRO waren und mein Code gut funktionierte. – Zeeshan