Ich verwende die konfluente Plattform, 0.9.0.1 und kafka-avro-serializer 2.0.1. Versucht, Ereignisse an Kafka zu senden und sie zurückzulesen, sehe ich nicht, wie man Ereignisse in Java-Objekte zurückbringt. Ich habe die Avro- und Confluent-Dokumente gelesen, und es gibt Hinweise, dass dies machbar ist, aber ich kann kein gutes Beispiel finden. Hier ist mein Code, ich bekomme einen GenericData $ Record zurück, wenn ich ihn mit dem Kafka Consumer lese, meine Frage ist, wie man das wieder in einen Java Pojo bringt. Ich habe diesen bit Code gefunden, den ich verwendet habe, um das Objekt zu serialisieren.Konvertieren von Java nach Avro und zurück bei Verwendung von Kafka
Hier ist mein Code:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.joda.time.DateTime;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.util.Collections;
import java.util.Properties;
/**
* This is a test...
*/
public class KafkaAvroProducerTest {
private static final Logger log = LogManager.getLogger(KafkaAvroProducerTest.class);
@Test
public void produceAndSendAndEvent() throws Exception {
Properties props = new Properties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put("schema.registry.url", "http://localhost:8081");
KafkaProducer producer = new KafkaProducer(props);
log.debug("starting producer");
String topic = "topic11";
Schema schema = ReflectData.get().getSchema(Purchase.class);
Purchase purchase = new Purchase("appStore", 9.99d, DateTime.now().getMillis(), "BRXh2lf9wm");
ReflectDatumWriter<Purchase> reflectDatumWriter = new ReflectDatumWriter<>(schema);
GenericDatumReader<Object> genericRecordReader = new GenericDatumReader<>(schema);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
reflectDatumWriter.write(purchase, EncoderFactory.get().directBinaryEncoder(bytes, null));
GenericRecord avroRecord = (GenericRecord) genericRecordReader.read(null, DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null));
ProducerRecord record = new ProducerRecord<Object, Object>(topic, avroRecord);
Thread producerThread = new Thread(() -> {
try {
while(true) {
log.debug("send a message {}", record);
producer.send(record);
Thread.sleep(2000);
}
}catch(Exception ex) {
log.error("error", ex);
}
});
producerThread.start();
props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "testGroup");
props.put("auto.commit.enable", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
org.apache.kafka.clients.consumer.KafkaConsumer<String, GenericRecord> kafkaConsumer = new KafkaConsumer(props);
kafkaConsumer.subscribe(Collections.singletonList(topic));
Thread consumerThread = new Thread(() -> {
try {
while(true) {
try {
ConsumerRecords<String, GenericRecord> records = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, GenericRecord> record1 : records) {//
log.debug("read - {}", record1.value().getClass());
}
}catch(Exception ex) {
log.error("error", ex);
}
}
}catch(Exception ex) {
log.error("error", ex);
}
});
consumerThread.start();
System.in.read();
}
}
Danke! Wahrscheinlich wird es funktionieren. Ich glaube, ich dachte fälschlicherweise, Avro wäre wie JSON, weil man Pojo einfach serialisieren und deserialisieren kann. Aber alle Avro-Beispiele, die ich sehe, verwenden entweder das Schema zusammen mit einem Maven-Plugin, um die Klasse zu generieren (was eine Unterklasse von SpecificRecordBase ist), oder sie verwenden GenericRecord und füllen die Felder manuell aus, wie Sie es anzeigen. Ich hatte gehofft, nur ein Objekt meiner eigenen zu verwenden, um zu serialisieren, aber es scheint, dass es nicht möglich ist. Wir werden wahrscheinlich die Schema-und Code-gen-Route gehen, die bei der Aufrechterhaltung der Kompatibilität hilft (Sie können das Schema ändern, wie sich Ihre Klasse ändert). –
Wie hoch ist der Ausführungsaufwand beim Konvertieren jeder Nachricht? Passiert es nicht zweimal? – aasthetic
Nicht sicher, ob ich folgen kann. Es ist nur eine Umwandlung von 'GenericRecord' in einen POJO-Typ. Was denkst du passiert zweimal? –