2016-04-05 6 views
1

ich einige Probleme mit folgendem Weg haben:Apache Camel EIP Route - Wie zu stoppen split()

// from("cxf:....")... 
from("direct:start").process(startRequestProcessor) // STEP 1 
      .choice() 
       .when(body().isNull()) 
         .to("direct:finish") 
       .otherwise() 
        .split(body()) // STEP 2 
        .bean(TypeMapper.class) // STEP 3 
        .log("Goes to DynamicRouter:: routeByTypeHeader with header: ${headers.type}") 
        .recipientList().method(Endpoint1DynamicRouter.class, "routeByTypeHeader") // STEP 4 
        .ignoreInvalidEndpoints(); 

    from("direct:endpoint2") // STEP 6 
      .log("Goes to DynamicRouter::routeByCollectionHeader with header: ${headers.collection}") 
      .recipientList().method(Endpoint2DynamicRouter.class, "routeByCollectionHeader") 
      .ignoreInvalidEndpoints(); 

    from("direct:endpoint1.1") // STEP 5 
      .process(new DateRangeProcessor()) 
      .to("direct:collections"); 

    from("direct:endpoint1.2") // STEP 5 
      .process(new SingleProcessor()) 
      .to("direct:collections"); 


    from("direct:endpoint2.2") // STEP 7 
      .aggregate(header("collection" /** endpoint2.2 */), CollectionAggregationStrategy) 
      .completionSize(exchangeProperty("endpoint22")) 

      .process(new QueryBuilderProcessor()) 
      .bean(MyService, "getDbCriteria") 

      .setHeader("collection", constant("endpoint2.1")) 
      .to("direct:endpoint2.1").end(); 


    from("direct:endpoint2.1") // STEP 8 
      .aggregate(header("collection" /** endpoint2.1 */), CollectionAggregationStrategy) 
      .completionSize(exchangeProperty("CamelSplitSize")) 
      .to("direct:finish").end(); 

    from("direct:finish") 
      .process(new QueryBuilderProcessor()) 
      .bean(MyRepository, "findAll") 
      .log("ResponseData: ${body}"). 
      marshal().json(JsonLibrary.Gson).end(); 

Die Route

  1. erhält JSON-String ein wandelt sie zur Liste (HashSet) von JSON-Objekte.
  2. Teilen Sie die empfangene Liste zu JSON-Objekten.
  3. Set-Header nach Objektinhalt
  4. Routen der Nachrichten nach Header
  5. Konvertieren Nachrichten endpoint1.1 oder endpoint1.2 entsprechenden Kriterien MongoDB und
  6. endpoint2 Routen Nachrichten gemäß einem anderen Header endpoint2 senden Endpunkt2.1 oder Endpunkt2.2.
  7. Endpoint2.2 aggregiert alle empfangenen Nachrichten, verarbeitet sie, um mongodb-Kriterien zu erhalten, und sendet sie an endpoint2.1 (completionSize wird in Schritt 2 berechnet und in der Eigenschaft "endpoint22" gespeichert).
  8. Enpoint2.1 aggregiert ALLE Nachrichten (CamelSplitSize) konvertiert aggregierte Nachrichten in das Query-Objekt und sendet es an das Repository, um die Daten abzurufen.

kann ich gültige Antwort Objekt in Debugger sehen, aber irgendwie bekomme ich einen Fehler:

No message body writer has been found for class java.util.HashSet, ContentType: application/json

Das Problem ist, in Antwort Objekt nicht, wie es mit anderen Routen funktioniert und es nicht HashSets enthalten.

ist meine Vermutung, dass der Weg zum Ausgang sendet die HashSet tut Schritt 1 erstellt ...

Meine Fragen sind:

  • was in der Route Ausgang ist falsch?
  • beide recipientList() versuchen Nachrichten an ungültigen Endpunkt weiterleiten (ich habe .ignoreInvalidEndpoints() verwenden Ausnahme zu vermeiden):

    org.apache.camel.NoSuchEndpointException: No endpoint could be found for: [email protected], please check your classpath contains the needed Camel component jar.

Jede Hilfe wäre sehr willkommen! Danke.

Antwort

2

Ich finde es sehr seltsam, aber .aggregate() Funktion antwortet nicht Austausch. Es verwendet Ihre Aggregationsstrategie, antwortet aber immer auf eingehenden Austausch. Dies ist beim Lesen der Dokumentation nicht klar, aber Sie müssen die Aggregationsstrategie zusammen mit split() verwenden, um den Austausch zurückgeben zu können.