Ich habe diese mit loopDoWhile dsl getan:
from("direct:start").loopDoWhile(stopLoopPredicate())
.to("bean:restAPIProcessor")
.to("bean:dataEnricherBean")
.end();
Die stopLoopPredicate() ist hier:
public Predicate stopLoopPredicate() {
Predicate stopLoop = new Predicate() {
@Override
public boolean matches(Exchange exchange) {
return exchange.getIn().getBody() != null && !exchange.getIn().getBody().toString().equalsIgnoreCase("stopLoop");
}
};
return stopLoop;
}
Die restAPIProcessor ist eine Implementierung von Prozessor, wo die REST-API-Aufruf gemacht wird.
Die Logik zur Verarbeitung der Seitennummerierung wird in restAPIProcessor & in dem Moment implementiert, in dem die tatsächliche REST-API eine Nullantwort zurückgibt. "StopLoop" wird auf den Hauptteil der Austauschroute gesetzt. Das funktioniert ziemlich gut. Hier ist der Code für RestAPIProcessor:
public class RestAPIProcessor implements Processor {
@Inject
private RestTemplate restTemplate;
private static final int LIMIT = 100;
private static final String REST_API = "<REST API URL>";
@Override
public void process(Exchange exchange) throws Exception {
Integer offset = (Integer) exchange.getIn().getHeader("offset");
Integer count = (Integer) exchange.getIn().getHeader("count");
if (offset == null) offset = 0;
if (count == null) count = 0;
String response = "";
Map<String,Object> body = new LinkedHashMap<>();
body.put("offset",offset++);
body.put("limit",LIMIT);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<?> entity = new HttpEntity<Object>(body,headers);
ResponseEntity<String> countResponseEntity = restTemplate.exchange(REST_API, HttpMethod.POST,entity,String.class);
response = countResponseEntity.getBody();
count += LIMIT;
if (response == null || response.isEmpty()) {
exchange.getIn().setBody("stopLoop");
exchange.getOut().setHeaders(exchange.getIn().getHeaders());
} else {
exchange.getIn().setHeader("count", count);
exchange.getIn().setHeader("offset", offset);
exchange.getOut().setHeaders(exchange.getIn().getHeaders());
exchange.getOut().setBody(response);
}
}
}