2012-07-15 16 views
6

Ich begann mit Kafka zu spielen. Ich habe eine Tierpfleger-Konfiguration eingerichtet, und ich konnte String-Nachrichten senden und konsumieren. Jetzt versuche ich ein Objekt (in Java) zu übergeben, aber aus irgendeinem Grund, wenn ich die Nachricht in den Verbraucher Parsing habe ich Header-Probleme. Ich habe mehrere Serialisierungsoptionen (mit Decoder/Encoder) versucht, und alle geben das gleiche Header-Problem zurück.Kafka Serialisierung eines Objekts

Hier ist mein Code Der Hersteller:

 Properties props = new Properties(); 
     props.put("zk.connect", "localhost:2181"); 
     props.put("serializer.class", "com.inneractive.reporter.kafka.EventsDataSerializer"); 
     ProducerConfig config = new ProducerConfig(props); 
     Producer<Long, EventDetails> producer = new Producer<Long, EventDetails>(config); 
     ProducerData<Long, EventDetails> data = new ProducerData<Long, EventDetails>("test3", 1, Arrays.asList(new EventDetails()); 
     try { 
      producer.send(data); 
     } finally { 
      producer.close(); 
     } 

Und der Verbraucher:

 Properties props = new Properties(); 
     props.put("zk.connect", "localhost:2181"); 
     props.put("zk.connectiontimeout.ms", "1000000"); 
     props.put("groupid", "test_group"); 

     // Create the connection to the cluster 
     ConsumerConfig consumerConfig = new ConsumerConfig(props); 
     ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); 

     // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume 
     Map<String, List<KafkaMessageStream<EventDetails>>> topicMessageStreams = 
       consumerConnector.createMessageStreams(ImmutableMap.of("test3", 4), new EventsDataSerializer()); 
     List<KafkaMessageStream<EventDetails>> streams = topicMessageStreams.get("test3"); 

     // create list of 4 threads to consume from each of the partitions 
     ExecutorService executor = Executors.newFixedThreadPool(4); 

     // consume the messages in the threads 
     for (final KafkaMessageStream<EventDetails> stream: streams) { 
      executor.submit(new Runnable() { 
       public void run() { 
        for(EventDetails event: stream) { 
         System.err.println("********** Got message" + event.toString());   
        } 
       } 
      }); 
     } 

und meine Serializer:

public class EventsDataSerializer implements Encoder<EventDetails>, Decoder<EventDetails> { 
    public Message toMessage(EventDetails eventDetails) { 
     try { 
      ObjectMapper mapper = new ObjectMapper(new SmileFactory()); 
      byte[] serialized = mapper.writeValueAsBytes(eventDetails); 
      return new Message(serialized); 
} catch (IOException e) { 
      e.printStackTrace(); 
      return null; // TODO 
     } 
} 
    public EventDetails toEvent(Message message) { 
     EventDetails event = new EventDetails(); 

     ObjectMapper mapper = new ObjectMapper(new SmileFactory()); 
     try { 
      //TODO handle error 
      return mapper.readValue(message.payload().array(), EventDetails.class); 
     } catch (IOException e) { 
      e.printStackTrace(); 
      return null; 
     } 

    } 
} 

Und das ist der Fehler, den ich bekommen:

org.codehaus.jackson.JsonParseException: Input does not start with Smile format header (first byte = 0x0) and parser has REQUIRE_HEADER enabled: can not parse 
at [Source: N/A; line: -1, column: -1] 

Als ich mit MessagePack arbeitete und mit einfachem Schreiben zu einem ObjectOutputStream bekam ich ein ähnliches Header-Problem. Ich habe auch versucht, der Nachricht die Payload CRC32 hinzuzufügen, aber das half auch nicht.

Was mache ich hier falsch?

Antwort

1

Bytebuffers .array() Methode ist nicht sehr zuverlässig. Es hängt von der jeweiligen Implementierung ab. Sie könnten

ByteBuffer bb = message.payload() 

byte[] b = new byte[bb.remaining()] 
bb.get(b, 0, b.length); 
return mapper.readValue(b, EventDetails.class) 
+0

Danke, das hat ein sehr ähnliches Problem gelöst, das ich hatte! – Jarmex

3

Hm, ich habe nicht laufen in die gleiche Header-Ausgabe versuchen wollen, die Sie stoßen aber mein Projekt nicht korrekt kompilieren, wenn ich nicht einen VerifiableProperties Konstruktor in meinem Encoder/Decoder vorsah . Es ist seltsam, dass der fehlende Konstruktor Jacksons Deserialisierung korrumpieren würde.

Vielleicht versuchen Sie, Ihren Encoder und Decoder aufzuteilen und den Konstruktor VerifiableProperties in beide zu integrieren; Sie sollten Decoder[T] für die Serialisierung nicht implementieren müssen. Ich konnte Json de/Serialisierung mit ObjectMapper nach dem Format in this post erfolgreich implementieren.

Viel Glück!