2016-07-14 22 views
1

Ich versuche, Kafka-Nachrichten mit Kamel konsumieren.In Camel-Route, Code-Steuerelement nicht in der Lage, Prozess() Methode gehen

Der Code unten:

public class Main { 

private static CamelContext context = new DefaultCamelContext(); 

public static void main(String[] args) { 
    // TODO Auto-generated method stub 

    try { 
     context.start(); 
     context.addRoutes(new RouteBuilder() { 

      @Override 
      public void configure() throws Exception { 

       System.out.println("Configuring Routes"); 

       from("kafka:172.16.30.5:9093?topic=reddy&groupId=testing&autoOffsetReset=earliest&consumersCount=1")     
       .process(new Processor(){ 

        @Override 
        public void process(Exchange exchange) throws Exception { 


         /*System.out.println("Processing.."); 
         System.out.println("Messages:"); 
         System.out.println(exchange.getIn());*/ 

         String messageKey = ""; 
         if (exchange.getIn() != null) { 
          Message message = exchange.getIn(); 
          Integer partitionId = (Integer) message 
            .getHeader(KafkaConstants.PARTITION); 
          String topicName = (String) message 
            .getHeader(KafkaConstants.TOPIC); 
          if (message.getHeader(KafkaConstants.KEY) != null) 
           messageKey = (String) message 
             .getHeader(KafkaConstants.KEY); 
          Object data = message.getBody(); 


          System.out.println("topicName :: " 
            + topicName + " partitionId :: " 
            + partitionId + " messageKey :: " 
            + messageKey + " message :: " 
            + data + "\n"); 
         } 

         } 

       }); 

      } 
     }); 
    } catch (Exception e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 

} 

} 

OUTPUT:

java -jar camelKafka-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
log4j:WARN No appenders could be found for logger  (org.apache.camel.impl.DefaultCamelContext). 
log4j:WARN Please initialize the log4j system properly. 
Configuring Routes 

Das heißt, die Code-Steuerung nicht in der Prozess Methode überhaupt geht.

+0

Wenn Sie ein Protokoll() nach dem von aber vor dem Prozess() hinzufügen, dass protokolliert wird? –

Antwort