Der schwierige Teil ist, dass Sie den Code in einem verteilten System ausführen, damit die parallelen Instanzen Ihrer ParseData
Funktion unabhängig voneinander ausgeführt werden.
Sie können weiterhin eindeutige IDs mithilfe eines lokalen ID-Zählers in ParseData
zuweisen. Der Trick zur Vermeidung von Duplikaten ist die korrekte Initialisierung und Zählerinkrementierung. Angenommen, Sie haben eine Parallelität von vier, erhalten Sie vier ParseData
Instanzen (nennen wir sie PD1 ... PD4
). Sie würden die folgenden ID-Zuordnungen tun:
PD1: 0, 4, 8, 12, ...
PD2: 1, 5, 9, 13, ...
PD3, 2, 6, 10, 14, ...
PD4: 3, 7, 11, 15, ...
Sie dies erreichen können, indem Sie die parallelen Instanzen mit unterschiedlichen Werten (Details siehe unten) initialisiert und durch Ihre Parallelität in jedem Fall nur den Zähler erhöht (dh ID += parallelism
).
In Flink erhalten alle instanced von einer parallelen Funktion automatisch eine eindeutige Nummer zugewiesen (so genannte Task-Index). Sie können diese Nummer einfach verwenden, um Ihren ID-Zähler zu initialisieren. Sie können den Task-Index über abrufen. Sie können auch den Operator/Funktion Parallelität über RuntimeContext.getNumberOfParallelSubtasks()
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RuntimeContext.html
erhalten, um die RuntimeContext
eine RichMapFunction
ParseData
zu implementieren verwenden zu bekommen und getRuntimeContext()
in open()
nennen.
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RichFunction.html
Etwas Ähnliches (zeige nur relevante Methoden):
class ParseDate extends RichMapFunction {
private long parallelism;
private long idCounter;
public void open(Configuration parameters) {
RuntimeContext ctx = getRuntimeContext();
parallelism = ctx.getNumberOfParallelSubtasks();
idCounter = ctx.getIndexOfThisSubtask();
}
public OutputDataType map(InputDataType value) {
OutputDataType output = new OutputDataType();
output.setID(idCounter);
idCounter += parallelism;
// further processing
return output;
}
}
Danke, die für mich gearbeitet. Ich musste 'public void open (Konfigurationsparameter)' hinzufügen, damit es funktioniert. Auf diese Weise sind die letzten IDs jedoch nicht fortlaufend (während jedes Laufs werden sie anders zugewiesen), aber ich nehme an, dies hängt mit der Anzahl der Elemente zusammen, die jeder Instanz zugewiesen sind. –
Die offene Methode in meiner Antwort wurde korrigiert - danke, dass Sie darauf hingewiesen haben. Und ja, wenn Daten nicht gleichmäßig verteilt werden, erhalten Sie möglicherweise keine fortlaufende ID, was sehr schwierig wäre, da Sie einen gemeinsamen globalen Status benötigen (was Ihre Leistung stark beeinträchtigen könnte). Ich habe dieses Detail in Ihrer Frage übersehen. –