2016-04-26 3 views
2

Ich verwende die konfluente Plattform, 0.9.0.1 und kafka-avro-serializer 2.0.1. Versucht, Ereignisse an Kafka zu senden und sie zurückzulesen, sehe ich nicht, wie man Ereignisse in Java-Objekte zurückbringt. Ich habe die Avro- und Confluent-Dokumente gelesen, und es gibt Hinweise, dass dies machbar ist, aber ich kann kein gutes Beispiel finden. Hier ist mein Code, ich bekomme einen GenericData $ Record zurück, wenn ich ihn mit dem Kafka Consumer lese, meine Frage ist, wie man das wieder in einen Java Pojo bringt. Ich habe diesen bit Code gefunden, den ich verwendet habe, um das Objekt zu serialisieren.Konvertieren von Java nach Avro und zurück bei Verwendung von Kafka

Hier ist mein Code:

import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericDatumReader; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.avro.io.DecoderFactory; 
import org.apache.avro.io.EncoderFactory; 
import org.apache.avro.reflect.ReflectData; 
import org.apache.avro.reflect.ReflectDatumWriter; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 
import org.joda.time.DateTime; 
import org.junit.Test; 

import java.io.ByteArrayOutputStream; 
import java.util.Collections; 
import java.util.Properties; 

/** 
* This is a test... 
*/ 
public class KafkaAvroProducerTest { 
    private static final Logger log = LogManager.getLogger(KafkaAvroProducerTest.class); 

    @Test 
    public void produceAndSendAndEvent() throws Exception { 
     Properties props = new Properties(); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
       org.apache.kafka.common.serialization.StringSerializer.class); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
       io.confluent.kafka.serializers.KafkaAvroSerializer.class); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     props.put("schema.registry.url", "http://localhost:8081"); 
     KafkaProducer producer = new KafkaProducer(props); 

     log.debug("starting producer"); 
     String topic = "topic11"; 
     Schema schema = ReflectData.get().getSchema(Purchase.class); 
     Purchase purchase = new Purchase("appStore", 9.99d, DateTime.now().getMillis(), "BRXh2lf9wm"); 

     ReflectDatumWriter<Purchase> reflectDatumWriter = new ReflectDatumWriter<>(schema); 
     GenericDatumReader<Object> genericRecordReader = new GenericDatumReader<>(schema); 
     ByteArrayOutputStream bytes = new ByteArrayOutputStream(); 
     reflectDatumWriter.write(purchase, EncoderFactory.get().directBinaryEncoder(bytes, null)); 
     GenericRecord avroRecord = (GenericRecord) genericRecordReader.read(null, DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null)); 
     ProducerRecord record = new ProducerRecord<Object, Object>(topic, avroRecord); 

     Thread producerThread = new Thread(() -> { 
      try { 
       while(true) { 
        log.debug("send a message {}", record); 
        producer.send(record); 
        Thread.sleep(2000); 
       } 
      }catch(Exception ex) { 
       log.error("error", ex); 
      } 
     }); 
     producerThread.start(); 

     props = new Properties(); 
     props.put("bootstrap.servers", "localhost:9092"); 
     props.put("group.id", "testGroup"); 
     props.put("auto.commit.enable", "false"); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); 
     props.put("schema.registry.url", "http://localhost:8081"); 
     org.apache.kafka.clients.consumer.KafkaConsumer<String, GenericRecord> kafkaConsumer = new KafkaConsumer(props); 
     kafkaConsumer.subscribe(Collections.singletonList(topic)); 

     Thread consumerThread = new Thread(() -> { 
      try { 
       while(true) { 
        try { 
         ConsumerRecords<String, GenericRecord> records = kafkaConsumer.poll(1000); 
         for (ConsumerRecord<String, GenericRecord> record1 : records) {// 
          log.debug("read - {}", record1.value().getClass()); 
         } 
        }catch(Exception ex) { 
         log.error("error", ex); 
        } 
       } 
      }catch(Exception ex) { 
       log.error("error", ex); 
      } 
     }); 
     consumerThread.start(); 
     System.in.read(); 
    } 
} 

Antwort

2

ich nie Avro verwenden, aber bei https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.html suchen, warum Sie einfach ... manuell POJO bevölkern nicht können sicher

class MyPojo { 
    public int v1; 
    public String v2; 
} 

// copied from your example code 
ConsumerRecords<String, GenericRecord> records = kafkaConsumer.poll(1000); 
for (ConsumerRecord<String, GenericRecord> record1 : records) { 
    GenericRecord avroRecord = record1.value(); 
    MyPojo pojo = new MyPojo(); 
    pojo.v1 = (Integer)avroRecord.get("<fieldname1>"); 
    pojo.v2 = (String)avroRecord.get("<fieldname2>"); 

    // process current pojo 
} 

Nicht, wenn dies sinnvoll ist . Wenn das funktioniert, würde ich es in einen Konstruktor MyPojo(GenericRecord) verschieben.

+1

Danke! Wahrscheinlich wird es funktionieren. Ich glaube, ich dachte fälschlicherweise, Avro wäre wie JSON, weil man Pojo einfach serialisieren und deserialisieren kann. Aber alle Avro-Beispiele, die ich sehe, verwenden entweder das Schema zusammen mit einem Maven-Plugin, um die Klasse zu generieren (was eine Unterklasse von SpecificRecordBase ist), oder sie verwenden GenericRecord und füllen die Felder manuell aus, wie Sie es anzeigen. Ich hatte gehofft, nur ein Objekt meiner eigenen zu verwenden, um zu serialisieren, aber es scheint, dass es nicht möglich ist. Wir werden wahrscheinlich die Schema-und Code-gen-Route gehen, die bei der Aufrechterhaltung der Kompatibilität hilft (Sie können das Schema ändern, wie sich Ihre Klasse ändert). –

+0

Wie hoch ist der Ausführungsaufwand beim Konvertieren jeder Nachricht? Passiert es nicht zweimal? – aasthetic

+0

Nicht sicher, ob ich folgen kann. Es ist nur eine Umwandlung von 'GenericRecord' in einen POJO-Typ. Was denkst du passiert zweimal? –