2016-05-03 10 views
0

Ich versuche, eine Kafka-Nachricht zu analysieren, es ist in einem verschlüsselten AVRO-Format. Ich habe folgendes AvroSchema.avsc avro Schemadatei:Entschlüsseln Kafka Avro Nachricht

{ 
    "type": "record", 
    "namespace": "kafka.events", 
    "name": "AvroSchema", 
     "fields": [ 
      { "name": "product_id", "type": "string" }, 
      { "name": "available_to_promise_quantity", "type": "double" }, 
      { "name": "online_available_to_promise_quantity", "type": "double" }, 
      { "name": "stores_available_to_promise_quantity", "type": "double" }, 
      { "name": "is_infinite_inventory", "type": "boolean", "default" : false }, 
      { "name": "event_timestamp", "type": "long" }, 
      { "name": "previous_event", "type": "AvroSchema" } 
     ] 
} 

Jetzt habe ich den folgenden Code geschrieben, um die Daten im JSON-Format zu erhalten:

for (final KafkaStream<byte[], byte[]> stream : streams){ 
    ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator(); 
    byte[] consumedEncryptedMessage; 
    MessageAndMetadata<byte[], byte[]> consumedEntry; 
    while(consumerIterator.hasNext()){ 
     consumedEntry = consumerIterator.next(); 
      if(null != consumedEntry){ 
       consumedEncryptedMessage = consumedEntry.message(); 
        try { 
          Schema schema = null; 
          schema = new Schema.Parser().parse(new File("src/AvroSchema.avsc")); 
          DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema); 
          Decoder decoder = DecoderFactory.get().binaryDecoder(consumedEncryptedMessage , null); 
          GenericRecord decryptedmsg = null; 
          decryptedmsg = reader.read(null, decoder); 
          System.out.println(decryptedmsg); 
         } 
         catch(Exception e) { 
          e.printStackTrace(); 
          System.out.println(e); 
         } 

Bitte helfen Sie mir, wie Sie die Nachricht entschlüsseln.

Die verschlüsselte Byte-Nachricht ist von dieser Art: 080-21-0001 :�Aw�@@��A�ǐ�U :�Aw�@@��A

ich die Änderungen wie vorgeschlagen, und jetzt habe ich folgende Codestück:

while(consumerIterator.hasNext()){ 
    consumedEntry = consumerIterator.next(); 
     if(null != consumedEntry){ 
      consumedEncryptedMessage = consumedEntry.message(); 
       try { 
        Schema schema = new Schema.Parser().parse(new File("src/AVROSchema.avsc")); 
        File myfile = new File("/Users/z001ldc/Desktop/myfile.txt"); 
        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); 
        FileUtils.writeByteArrayToFile(myfile, consumedEncryptedMessage); 
        @SuppressWarnings("resource") 
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(myfile, reader); 
        while (dataFileReader.hasNext()) { 
         decryptedMessage = dataFileReader.next(decryptedMessage); 
         System.out.println(decryptedMessage.get("product_id").toString()); 
        } 
       } 
       catch(Exception e) { 
        e.printStackTrace(); 
        System.out.println(e); 
       } 

Aber noch bin ich immer den Fehler als " Keine Datei ".

Antwort

0

Deserialisieren erfordern wont Entschlüsselung

Zuerst erhalten Sie Ihr Schema Linie

schema = new Schema.Parser().parse(new File("src/AvroSchema.avsc")); 
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); 

Dann

DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(consumedEncryptedMessage, reader); 
GenericRecord user = null; 
while (dataFileReader.hasNext()) { 
// Reuse user object by passing it to next(). This saves us from 
// allocating and garbage collecting many objects for files with 
// many items. 
user = dataFileReader.next(user); 
System.out.println(user); 
+0

DataFileReader dataFileReader = new DataFileReader (consumedEncryptedMessage, Leser); Diese Zeile zeigt einen Fehler an, da es sich bei consumeredEntryMessage nicht um eine Datei handelt, sondern vom Typ byte []. Soll ich die Nachricht als Datei konsumieren? –

+0

Ohhh .. habe nicht gesehen, dass Sie eine Kette von Ereignissen haben, wie im Schema ATPEvent angegeben. Können Sie dies bitte aus Ihrem Schema zum Testen entfernen und versuchen, System.out.println (result.get ("product_id"). ToString()) auszusprechen. Hoffentlich ist das der Täter, der Rest deines Codes sieht gut aus. – tesnik03

+0

Gibt es eine Möglichkeit, consesuredentrymsg nur in Byte-Array anstelle von Datei zu verwenden, weil ich das nur als Byte-Array bekomme. –