Ich versuche, eine Kafka-Nachricht zu analysieren, es ist in einem verschlüsselten AVRO-Format. Ich habe folgendes AvroSchema.avsc avro Schemadatei:Entschlüsseln Kafka Avro Nachricht
{
"type": "record",
"namespace": "kafka.events",
"name": "AvroSchema",
"fields": [
{ "name": "product_id", "type": "string" },
{ "name": "available_to_promise_quantity", "type": "double" },
{ "name": "online_available_to_promise_quantity", "type": "double" },
{ "name": "stores_available_to_promise_quantity", "type": "double" },
{ "name": "is_infinite_inventory", "type": "boolean", "default" : false },
{ "name": "event_timestamp", "type": "long" },
{ "name": "previous_event", "type": "AvroSchema" }
]
}
Jetzt habe ich den folgenden Code geschrieben, um die Daten im JSON-Format zu erhalten:
for (final KafkaStream<byte[], byte[]> stream : streams){
ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator();
byte[] consumedEncryptedMessage;
MessageAndMetadata<byte[], byte[]> consumedEntry;
while(consumerIterator.hasNext()){
consumedEntry = consumerIterator.next();
if(null != consumedEntry){
consumedEncryptedMessage = consumedEntry.message();
try {
Schema schema = null;
schema = new Schema.Parser().parse(new File("src/AvroSchema.avsc"));
DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(consumedEncryptedMessage , null);
GenericRecord decryptedmsg = null;
decryptedmsg = reader.read(null, decoder);
System.out.println(decryptedmsg);
}
catch(Exception e) {
e.printStackTrace();
System.out.println(e);
}
Bitte helfen Sie mir, wie Sie die Nachricht entschlüsseln.
Die verschlüsselte Byte-Nachricht ist von dieser Art: 080-21-0001 :�Aw�@@��A�ǐ�U :�Aw�@@��A
ich die Änderungen wie vorgeschlagen, und jetzt habe ich folgende Codestück:
while(consumerIterator.hasNext()){
consumedEntry = consumerIterator.next();
if(null != consumedEntry){
consumedEncryptedMessage = consumedEntry.message();
try {
Schema schema = new Schema.Parser().parse(new File("src/AVROSchema.avsc"));
File myfile = new File("/Users/z001ldc/Desktop/myfile.txt");
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
FileUtils.writeByteArrayToFile(myfile, consumedEncryptedMessage);
@SuppressWarnings("resource")
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(myfile, reader);
while (dataFileReader.hasNext()) {
decryptedMessage = dataFileReader.next(decryptedMessage);
System.out.println(decryptedMessage.get("product_id").toString());
}
}
catch(Exception e) {
e.printStackTrace();
System.out.println(e);
}
Aber noch bin ich immer den Fehler als " Keine Datei ".
DataFileReader dataFileReader = new DataFileReader (consumedEncryptedMessage, Leser); Diese Zeile zeigt einen Fehler an, da es sich bei consumeredEntryMessage nicht um eine Datei handelt, sondern vom Typ byte []. Soll ich die Nachricht als Datei konsumieren? –
Ohhh .. habe nicht gesehen, dass Sie eine Kette von Ereignissen haben, wie im Schema ATPEvent angegeben. Können Sie dies bitte aus Ihrem Schema zum Testen entfernen und versuchen, System.out.println (result.get ("product_id"). ToString()) auszusprechen. Hoffentlich ist das der Täter, der Rest deines Codes sieht gut aus. – tesnik03
Gibt es eine Möglichkeit, consesuredentrymsg nur in Byte-Array anstelle von Datei zu verwenden, weil ich das nur als Byte-Array bekomme. –