Sie können einen Blick auf here werfen (siehe Batching). Bei Schrauben, die komplexere Operationen wie die Aggregation mehrerer Eingangstupel verarbeiten, müssen Sie BaseRichBolt erweitern und den Verankerungsmechanismus selbst steuern.
Dazu müssen Sie Ihren eigenen Ausgangssammler wie folgt erklären:
private OutputCollector outputCollector;
Und dann initialisieren durch Überschreibung der Vorbereitung Methode:
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.outputCollector = outputCollector;
}
Ihre Methode execute für BaseRichBolt erhält nur ein Tupel als Argument, müssen Sie in der Lage sein, die Logik auszuführen, um die Anker beizubehalten und sie beim Aussenden zu verwenden.
private final List<Tuple> anchors = new ArrayList<Tuple>();
@Override
public void execute(Tuple tuple) {
if (!isTupleAggregationComplete(anchors, tuple)) {
anchors.add(tuple);
return;
}
// do your computations here!
collector.emit(anchors, new Values(foo,bar,xpto));
anchors.clear();
}
Sie sollten isTupleAggregationComplete mit der notwendigen Logik implementieren, die überprüft, ob die Schraube alles notwendigen Informationen verfügen, mit der Verarbeitung fortzufahren.