2016-04-29 6 views
0

Ich versuche, ein einfaches Re-Sequence-Programm mit Apache Camel auszuführen. Dieses Programm verwendet Java-DSL, um eingehende Java-Nachrichten neu zu sortieren. Wenn ich dieses Programm ausführe, werden die Nachrichten in den Ordner geschrieben, aber scheinen nicht in einer bestimmten Reihenfolge basierend auf dem Header-Wert oder der alphabetischen Reihenfolge des einzelnen Worts im Nachrichtentext zu sein. Die Dateien, die Camel erstellt, sind noch nicht in Ordnung, als ob die resequence DSL-Funktion nichts getan hätte.Apache Camel Resequence Programm funktioniert nicht

Wie bekomme ich dieses Programm, um die Nachrichten tatsächlich zu bestellen, wie die Arrays.sort() Methode tun würde? Wie kann ich dieses Programm dazu bringen, die Nachrichten neu zu sortieren und dann in der korrekten Sortierreihenfolge zu einer einzigen Datei zusammenzufassen?

Hier ist das Programm ... Ich rufe die Haupt Camel Route über die andere Klasse, die die main Methode hat.

import org.apache.camel.builder.RouteBuilder; 

public class SortThoseMessages extends RouteBuilder { 

    @Override 
    public void configure() throws Exception { 

     from("direct:pointA") 
       .resequence(header("grocery")) 

       .to("file:target/pointB"); 
    } 

} 

Die Klasse unten hat main und erzeugt die Nachrichten in der Warteschlange, pointA.

import org.apache.camel.CamelContext; 
import org.apache.camel.impl.DefaultCamelContext; 
import org.apache.camel.ProducerTemplate; 

public class NewSequenceMain { 

    public static void main(String[] args) { 
     CamelContext c = new DefaultCamelContext(); 

     try { 

     c.addRoutes(new SortThoseMessages()); 



     ProducerTemplate template = c.createProducerTemplate(); 

     c.start(); 

     template.sendBodyAndHeader("direct:pointA", "apple", "grocery", 1); 
     template.sendBodyAndHeader("direct:pointA", "orange", "grocery", 3); 
     template.sendBodyAndHeader("direct:pointA", "bannanna", "grocery", 2); 

     Thread.sleep(5000); 
     c.stop(); 

     } catch(Exception ex) { 
      System.err.println("Exception thrown -> " + ex); 
      System.err.println("Now printing stacktrace..."); 
      ex.printStackTrace(); 
     } 

    } 

} 
+0

Was meinen Sie mit "die Dateien sind außer Betrieb"? Sind die Zeitstempel der Dateien inkorrekt, so dass es bisher keine Sortierung gab? Vielleicht könnten Sie vor und nach der resequence() einen Logger in die Wurzel einfügen, um die Reihenfolge der Nachrichten zu sehen. – Frank

+0

Hallo @Frank, wenn ich sage, dass sie außer Betrieb sind, meine ich, dass die Dateien, die Camel erstellt, in der Reihenfolge sind, in der die Nachrichten mit dem ProducerTemplate erstellt wurden. Die Dateien enden in Zahlen - 1 für die Apple-Nachricht, 3 für die orange Nachricht und 5 für die Bananen-Nachricht. Vielleicht wäre es hilfreich, irgendwann Logger zu benutzen, wie du es beschreibst. Danke für die Hilfe Frank. :) Es ist verwirrend ... –

+0

@Frank Nach dem Hinzufügen in einem 'log' in der Java-DSL, wie Sie vorgeschlagen, sehe ich, dass möglicherweise nach den Breadcrumbs in der Kopfzeile, dass die Nachrichten alphabetisch sortiert oder sortiert wurden. Ich werde versuchen, es zu testen, indem ich alle Nachrichtenkörper zu einer Datei zusammenfasse, so dass die Sortierung offensichtlicher ist. : D –

Antwort

0

Die Nachrichten können innerhalb der Camel-Route neu sequenziert werden, aber nicht, wenn sie in die Datei geschrieben werden. Um die Re-Sequenz zu sehen, rufen Sie das Java DSL aggregator auf, um die Nachrichtentexte in der angegebenen Reihenfolge zu sehen. Die Nachrichten im gebuchten Programm sind nach der Nummer der Kopfzeile geordnet. Der Methodenaufruf auf der Objektreferenz ProducerTemplate setzt den Header als Ganzzahl im letzten Argument des Methodenaufrufs sendBodyAndHeader().

Um die Resequenzierung wirksam in einer einzigen Datei als Ziel der Camel Route siehe unten das Beispiel Besuche:

import org.apache.camel.builder.RouteBuilder; 


public class ReorganizingMessages extends RouteBuilder { 

    @Override 
    public void configure() throws Exception { 

     from("direct:pointA") 
       .resequence(header("grocery")) 
       .to("log://org.apache.camel.howto?showAll=true&multiline=true") 

       .aggregate().constant(true).completionTimeout(100L). 
       aggregationStrategy(new StringAggregator()) 
       .to("file:target/pointB"); 
    } 

} 

Der obige Code verwendet eine benutzerdefinierte Aggregator Java Bean, die gesehen werden kann unten.

import org.apache.camel.Exchange; 
import org.apache.camel.processor.aggregate.AggregationStrategy; 

public class StringAggregator implements AggregationStrategy { 

    @Override 
    public Exchange aggregate(Exchange old, Exchange new1) { 
     if (old == null) { 
      return new1; 
     } 

     String oldBody = old.getIn().getBody(String.class); 
     String newBody = new1.getIn().getBody(String.class); 
     old.getIn().setBody(oldBody + " " + newBody); 
     return old; 

    } 

}