Gibt es eine Möglichkeit, den endgültigen Wert der Aggregatoren nach einer Dataflow-Batchausführung programmatisch zu extrahieren?Extrahieren von Aggregatorwerten in Batchausführung
Basierend auf der DirectePipelineRunner Klasse, schrieb ich die folgende Methode. Es scheint zu funktionieren, aber für dynamisch erstellte Zähler gibt es andere Werte als die Werte, die in der Konsolenausgabe angezeigt werden.
PS. Wenn es hilft, nehme ich an, dass Aggregatoren auf Long-Werten basieren, mit einer Summe, die Funktion kombiniert.
public static Map<String, Object> extractAllCounters(Pipeline p, PipelineResult pr)
{
AggregatorPipelineExtractor aggregatorExtractor = new AggregatorPipelineExtractor(p);
Map<String, Object> results = new HashMap<>();
for (Map.Entry<Aggregator<?, ?>, Collection<PTransform<?, ?>>> e :
aggregatorExtractor.getAggregatorSteps().entrySet()) {
Aggregator agg = e.getKey();
try {
results.put(agg.getName(), pr.getAggregatorValues(agg).getTotalValue(agg.getCombineFn()));
} catch(AggregatorRetrievalException|IllegalArgumentException aggEx) {
//System.err.println("Can't extract " + agg.getName() + ": " + aggEx.getMessage());
}
}
return results;
}
Dank! Ich hätte gerne eine allgemeinere Lösung, wie ich sie gepostet habe, aber das ist in Ordnung –