2016-08-02 23 views

Antwort

0

Bolt.execute() wird für jedes eingehende Tupel aufgerufen, unabhängig davon, was der Produzent war (und Sie können dies nicht ändern). Wenn Sie mehrere Tupel von verschiedenen Herstellern gleichzeitig verarbeiten möchten, müssen Sie benutzerdefinierten UDF-Code schreiben.

  1. Sie benötigen einen Eingangspuffer für jeden Hersteller, der die eingehenden Tupel puffern kann (vielleicht ein LinkedList<Tuple> als Schraubenelement)
  2. Für jeden eingehenden Tupel, können Sie die Tupel zu dem entsprechenden Puffer hinzufügen (können Sie den Hersteller zugreifen Informationen in den Metadaten des Tupels, via. input.getSourceComponent()
  3. Nach dem Hinzufügen des Tupel zum Puffer, überprüfen Sie, ob jeder Puffer mindestens ein Tupel enthält: wenn ja, nehmen Sie ein Tupel aus jedem Puffer und verarbeiten Sie (nach der Verarbeitung, überprüfen die Puffer erneut, bis mindestens einmal der Puffer leer ist) - von nein, nur zurückkehren und nichts verarbeiten
0

Sie können einen Blick auf here werfen (siehe Batching). Bei Schrauben, die komplexere Operationen wie die Aggregation mehrerer Eingangstupel verarbeiten, müssen Sie BaseRichBolt erweitern und den Verankerungsmechanismus selbst steuern.

Dazu müssen Sie Ihren eigenen Ausgangssammler wie folgt erklären:

private OutputCollector outputCollector; 

Und dann initialisieren durch Überschreibung der Vorbereitung Methode:

@Override 
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { 
    this.outputCollector = outputCollector; 
} 

Ihre Methode execute für BaseRichBolt erhält nur ein Tupel als Argument, müssen Sie in der Lage sein, die Logik auszuführen, um die Anker beizubehalten und sie beim Aussenden zu verwenden.

private final List<Tuple> anchors = new ArrayList<Tuple>(); 

@Override 
public void execute(Tuple tuple) { 
    if (!isTupleAggregationComplete(anchors, tuple)) { 
     anchors.add(tuple); 
     return; 
    } 

    // do your computations here! 

    collector.emit(anchors, new Values(foo,bar,xpto)); 

    anchors.clear(); 
} 

Sie sollten isTupleAggregationComplete mit der notwendigen Logik implementieren, die überprüft, ob die Schraube alles notwendigen Informationen verfügen, mit der Verarbeitung fortzufahren.