2016-03-24 6 views
1

Ich versuche Objekt von MongoDB zu ziehen und es zu meiner aktuellen Nutzlast ADD und es in einer anderen Datenbank speichern:Wie die Nutzlast mit einem Objekt von MongoDB (Kamel mongodb) bereichern

@Override 
public void configure() throws Exception 
{ 
    from(kafkaEndpoint) 
      .convertBodyTo(DBObject.class) 
      .enrich("mongodb:mongoDb?database=myDbName1&collection=UserColl&operation=findOneByQuery", 
        (original, external) -> { 
         DBObject originalBody = original.getIn().getBody(DBObject.class); 
         DBObject externalBody = external.getIn().getBody(DBObject.class); 

         Map<String, DBObject> map = new HashMap<String, DBObject>(); 
         map.put("original", originalBody); 
         map.put("external", externalBody); 

         original.getIn().setBody(map); 
         return original; 
        }) 
      .to("mongodb:mongoDb?database=myDbName2&collection=UserColl&operation=insert"); 
} 

Das Problem dass enrich die Abfrage aus dem In.body holen, die mein ursprüngliches Objekt hält ...

So wie kann ich Abfrage übergeben ({ "entity.id": ""}) zu Enrich (mongoldb: ...) und Originalobjekt erhalten, um es mit Ergebnissen zu verbinden?

Danke.

Antwort

0
@Override 
    public void configure() throws Exception 
    { 
     from(kafkaEndpoint) 
       .convertBodyTo(DBObject.class) 
       .enrich("direct:findOneByQuery",  // <------- 
         (original, external) -> { 
          DBObject originalBody = original.getIn().getBody(DBObject.class); 
          DBObject externalBody = external.getIn().getBody(DBObject.class); 

          Map<String, DBObject> map = new HashMap<String, DBObject>(); 
          map.put("original", originalBody); 
          map.put("external", externalBody); 

          original.getIn().setBody(map); 
          return original; 
         }) 
       .to("mongodb:mongoDb?database=myDbName2&collection=UserColl&operation=insert"); 

    } 

    from("direct:findOneByQuery") 
      .process(new Processor() 
      { 
       @Override 
       public void process(Exchange exchange) throws Exception 
       { 
        DBObject body = exchange.getIn().getBody(DBObject.class); 
        DBObject query = BasicDBObjectBuilder.start() 
          .append("entity._id", body.get("_id")) 
          .get(); 

        exchange.getIn().setBody(query); 
       } 
      }) 
      .to("mongodb:mongoDb?database=myDbName1&collection=UserColl&operation=findOneByQuery"); 

// 
+0

Ich habe versucht, diese Methode zu verwenden, jedoch bekomme ich immer "Keine Verbraucher am Endpunkt verfügbar: direct: // findOneByQuery". Ist das auch etwas, dem du begegnet bist? –

+0

Der andere Fehler, der auftritt, wenn ich versuche, die mongodb-Komponente in "anreichern" zu verwenden, ist: "Fehler beim Erstellen des Producer für den Endpunkt: mongodb3 nullPointerException" –