2016-06-22

I have been using Storm, and I need more batching capability, so I looked into batching with Storm and found Trident, which does micro-batching in real time: How is Storm Trident used to batch tuples?

But somehow I cannot figure out how Trident handles micro-batching (flow, batch size, batch interval) well enough to know whether it is really what I need.

I want to collect/store tuples emitted by a spout over one interval and re-emit them to a downstream component/bolt/function at a different time interval. (For example: the spout emits one tuple per second; the next Trident function collects the tuples and emits 50 tuples per minute to the following function.)

Can anyone tell me how to apply Trident in this case? Or is there another applicable way using Storm features?

Answer


Excellent question! Unfortunately, this kind of micro-batching is not supported by Trident out of the box.

However, you can try to implement your own frequency-driven micro-batching. Something like this skeleton example:

import java.util.ArrayList; 
import java.util.List; 
import java.util.Map; 
import java.util.concurrent.LinkedBlockingQueue; 

import org.apache.storm.task.OutputCollector; 
import org.apache.storm.task.TopologyContext; 
import org.apache.storm.topology.OutputFieldsDeclarer; 
import org.apache.storm.topology.base.BaseRichBolt; 
import org.apache.storm.tuple.Tuple; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

public class MicroBatchingBolt extends BaseRichBolt { 

    private static final long serialVersionUID = 8500984730263268589L; 
    private static final Logger LOG = LoggerFactory.getLogger(MicroBatchingBolt.class); 

    protected LinkedBlockingQueue<Tuple> queue = new LinkedBlockingQueue<Tuple>(); 

    /** The threshold after which the batch should be flushed out. */ 
    int batchSize = 100; 

    /** 
    * The batch interval in sec. Minimum time between flushes if the batch sizes 
    * are not met. This should typically be equal to 
    * topology.tick.tuple.freq.secs and half of topology.message.timeout.secs 
    */ 
    int batchIntervalInSec = 45; 

    /** The last batch process time seconds. Used for tracking purpose */ 
    long lastBatchProcessTimeSeconds = 0; 

    private OutputCollector collector; 

    @Override 
    @SuppressWarnings("rawtypes") 
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     this.collector = collector; 
    } 

    @Override 
    public void execute(Tuple tuple) { 
     // Check if the tuple is of type Tick Tuple 
     if (isTickTuple(tuple)) { 
     // If so, it is an indication to flush the batch. But don't flush if the 
     // previous flush was done very recently (either because the batch size 
     // threshold was crossed or because of another tick tuple). 

     if ((System.currentTimeMillis()/1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) { 
      LOG.debug("Current queue size is " + this.queue.size() 
       + ". But received tick tuple so executing the batch"); 

      finishBatch(); 
     } else { 
      LOG.debug("Current queue size is " + this.queue.size() 
       + ". Received tick tuple but last batch was executed " 
       + (System.currentTimeMillis()/1000 - lastBatchProcessTimeSeconds) 
       + " seconds back that is less than " + batchIntervalInSec 
       + " so ignoring the tick tuple"); 
     } 
     } else { 
     // Add the tuple to queue. But don't ack it yet. 
     this.queue.add(tuple); 
     int queueSize = this.queue.size(); 
     LOG.debug("current queue size is " + queueSize); 
     if (queueSize >= batchSize) { 
      LOG.debug("Current queue size is >= " + batchSize 
       + " executing the batch"); 

      finishBatch(); 
     } 
     } 
    } 

    private boolean isTickTuple(Tuple tuple) { 
     // Tick tuples arrive from Storm's system component on the system tick stream 
     return tuple.getSourceComponent().equals(org.apache.storm.Constants.SYSTEM_COMPONENT_ID) 
      && tuple.getSourceStreamId().equals(org.apache.storm.Constants.SYSTEM_TICK_STREAM_ID); 
    } 

    /** 
    * Finish batch. 
    */ 
    public void finishBatch() { 

     LOG.debug("Finishing batch of size " + queue.size()); 
     lastBatchProcessTimeSeconds = System.currentTimeMillis()/1000; 
     List<Tuple> tuples = new ArrayList<Tuple>(); 
     queue.drainTo(tuples); 

     for (Tuple tuple : tuples) { 
     // Prepare your batch here (be it JDBC, HBase, ElasticSearch, Solr or 
     // anything else). 
     // List<Response> responses = externalApi.get("..."); 
     } 

     try { 
     // Execute your batch here and ack or fail the tuples 
     LOG.debug("Executed the batch. Processing responses."); 
     //  for (int counter = 0; counter < responses.size(); counter++) { 
     //   if (responses.get(counter).isFailed()) { 
     //   LOG.error("Failed to process tuple # " + counter); 
     //   this.collector.fail(tuples.get(counter)); 
     //   } else { 
     //   LOG.debug("Successfully processed tuple # " + counter); 
     //   this.collector.ack(tuples.get(counter)); 
     //   } 
     //  } 
     } catch (Exception e) { 
     LOG.error("Unable to process " + tuples.size() + " tuples", e); 
     // Fail entire batch 
     for (Tuple tuple : tuples) { 
      this.collector.fail(tuple); 
     } 
     } 
    } 

    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     // Ask Storm to send this bolt a tick tuple every batchIntervalInSec seconds; 
     // without this, isTickTuple() will never see a tick tuple 
     Map<String, Object> conf = new java.util.HashMap<String, Object>(); 
     conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, batchIntervalInSec); 
     return conf; 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     // ... 
    } 

} 
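As a rough illustration, the size-or-interval flush policy the bolt relies on can be sketched independently of Storm. The class and method names below are hypothetical (not Storm API); the point is the decision logic: flush when the queue reaches the batch size, or when a periodic tick arrives and at least the batch interval has elapsed since the last flush.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal, Storm-free sketch of the frequency-driven micro-batch policy.
public class MicroBatchPolicy<T> {

    private final int batchSize;
    private final long batchIntervalInSec;
    private final List<T> queue = new ArrayList<T>();
    private long lastFlushSeconds = 0;

    public MicroBatchPolicy(int batchSize, long batchIntervalInSec) {
        this.batchSize = batchSize;
        this.batchIntervalInSec = batchIntervalInSec;
    }

    /** Add an item; returns the flushed batch if the size threshold was hit, else null. */
    public List<T> add(T item, long nowSeconds) {
        queue.add(item);
        return (queue.size() >= batchSize) ? flush(nowSeconds) : null;
    }

    /** Called on each tick; returns the flushed batch if the interval elapsed, else null. */
    public List<T> onTick(long nowSeconds) {
        return (nowSeconds - lastFlushSeconds >= batchIntervalInSec) ? flush(nowSeconds) : null;
    }

    private List<T> flush(long nowSeconds) {
        lastFlushSeconds = nowSeconds;
        List<T> batch = new ArrayList<T>(queue);
        queue.clear();
        return batch;
    }
}
```

In the real bolt, `add` corresponds to the non-tick branch of `execute` and `onTick` to the tick branch; Storm's tick tuples supply the clock.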

Source: http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/ and Using tick tuples with trident in storm