2014-09-08 3 views
5

Ich habe kürzlich festgestellt, dass Camel jetzt eine eigene Komponente für Kafka hat, also habe ich beschlossen, es zu drehen.Kamel Kafka Integration

beschloss ich, eine schöne einfache Datei, um zu versuchen -> kafka Thema wie folgt ...

<route> 
     <from uri="file:///tmp/input" /> 
     <setHeader headerName="kafka.PARTITION_KEY"> 
      <constant>Test</constant> 
     </setHeader> 
     <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1" /> 
</route> 

Dies scheint einfach genug, dies jedoch auf laufend ich ...

java.lang.ClassCastException: java.lang.String cannot be cast to [B 
    at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34) 
    at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:78) 

und der die Camel-Code auf die Kontrolle, es wird der folgende ...

String msg = exchange.getIn().getBody(String.class); 
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, partitionKey.toString(), msg); 
producer.send(data); 

Offensichtlich ist dies eine Serialisierung Problem ist, ich bin nur nicht sicher, ob es eine Problemumgehung gibt, oder das ist inhärent ein Fehler mit der vorhandenen Implementierung? (Oder hoffentlich nur mein Missverständnis)

Irgendwelche Vorschläge? Danke,

Antwort

10

Ah, vergiss es nicht, wir gehen ... Hoffe, das hilft jemand anderem, Sie müssen den Serialisierer in den Optionen einstellen.

<route> 
      <from uri="file:///tmp/input" /> 
      <setHeader headerName="kafka.PARTITION_KEY"> 
       <constant>Test</constant> 
      </setHeader> 
      <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1&amp;serializerClass=kafka.serializer.StringEncoder" /> 
</route> 
0

Gefunden ein schönes Beispiel für die Installation und Apache Kafka beginnt und ein Kamel Endpunkt Konfiguration zu Kafka zum Senden Nachricht themen-

@Override 
    public void configure() throws Exception { 

     String topicName = "topic=javainuse-topic"; 
     String kafkaServer = "kafka:localhost:9092"; 
     String zooKeeperHost = "zookeeperHost=localhost&zookeeperPort=2181"; 
     String serializerClass = "serializerClass=kafka.serializer.StringEncoder"; 

     String toKafka = new StringBuilder().append(kafkaServer).append("?").append(topicName).append("&") 
       .append(zooKeeperHost).append("&").append(serializerClass).toString(); 

     from("file:C:/inbox?noop=true").split().tokenize("\n").to(toKafka); 
    } 

Reference- Apache Camel + Kafka Integration example