2016-07-28 19 views
0

Gibt es eine Möglichkeit, den endgültigen Wert der Aggregatoren nach einer Dataflow-Batchausführung programmatisch zu extrahieren?Extrahieren von Aggregatorwerten in Batchausführung

Basierend auf der DirectePipelineRunner Klasse, schrieb ich die folgende Methode. Es scheint zu funktionieren, aber für dynamisch erstellte Zähler gibt es andere Werte als die Werte, die in der Konsolenausgabe angezeigt werden.

PS. Wenn es hilft, nehme ich an, dass Aggregatoren auf Long-Werten basieren, mit einer Summe, die Funktion kombiniert.

public static Map<String, Object> extractAllCounters(Pipeline p, PipelineResult pr) 
{ 
    AggregatorPipelineExtractor aggregatorExtractor = new AggregatorPipelineExtractor(p); 
    Map<String, Object> results = new HashMap<>(); 

    for (Map.Entry<Aggregator<?, ?>, Collection<PTransform<?, ?>>> e : 
      aggregatorExtractor.getAggregatorSteps().entrySet()) { 
     Aggregator agg = e.getKey(); 
     try { 
      results.put(agg.getName(), pr.getAggregatorValues(agg).getTotalValue(agg.getCombineFn())); 
     } catch(AggregatorRetrievalException|IllegalArgumentException aggEx) { 
      //System.err.println("Can't extract " + agg.getName() + ": " + aggEx.getMessage()); 
     } 
    } 

    return results; 
} 

Antwort

2

Die Werte von Aggregatoren sollten im PipelineResult verfügbar sein. Zum Beispiel:

CountOddsFn countOdds = new CountOddsFn(); 
pipeline 
    .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100)) 
    .apply(ParDo.of(countOdds)); 
PipelineResult result = pipeline.run(); 
// Here you may need to use the BlockingDataflowPipelineRunner 

AggregatorValues<Integer> values = 
result.getAggregatorValues(countOdds.aggregator); 
Map<String, Integer> valuesAtSteps = values.getValuesAtSteps(); 
// Now read the values from the step... 

Beispiel DoFn, die den Aggregator berichtet:

private static class CountOddsFn extends DoFn<Integer, Void> { 

    Aggregator<Integer, Integer> aggregator = 
    createAggregator("odds", new SumIntegerFn()); 

    @Override 
    public void processElement(ProcessContext c) throws Exception { 
    if (c.element() % 2 == 1) { 
     aggregator.addValue(1); 
    } 
    } 
} 
+0

Dank! Ich hätte gerne eine allgemeinere Lösung, wie ich sie gepostet habe, aber das ist in Ordnung –