2016-07-27 2 views
0

Ich lade eine CSV-Datei und wandle jede Zeile mit einer benutzerdefinierten Kartenfunktion in einen POJO um. Für meine Programmlogik benötige ich für jedes POJO eine eindeutige ID von 0 bis n (wobei n die gesamten Zeilennummern sind). Meine Frage ist, kann ich jedem POJO mithilfe einer Transformationsfunktion eine eindeutige ID (z. B. die ursprüngliche Zeilennummer) zuweisen? Der ideale Weg wäre, ein Iterable in einem UDF zu erhalten und eine Variable zu erhöhen, während die Eingangstupel durchlaufen werden, und schließlich das entsprechende POJO auszugeben. Mein Code sieht im Moment so aus:Apache Flink - Zuweisen einer eindeutigen ID zum Eingang

DataSet<MyType> input = env.readCsvFile("/path/file.csv") 
    .includeFields("1111") 
    .types(String.class, Double.class, Double.class,Double.class) 
    .map(new ParseData()); 

wo ParseData Tuples zu den MyType POJOs transformiert.

Gibt es Best Practices zum Erreichen dieser Aufgabe?

Antwort

2

Der schwierige Teil ist, dass Sie den Code in einem verteilten System ausführen, damit die parallelen Instanzen Ihrer ParseData Funktion unabhängig voneinander ausgeführt werden.

Sie können weiterhin eindeutige IDs mithilfe eines lokalen ID-Zählers in ParseData zuweisen. Der Trick zur Vermeidung von Duplikaten ist die korrekte Initialisierung und Zählerinkrementierung. Angenommen, Sie haben eine Parallelität von vier, erhalten Sie vier ParseData Instanzen (nennen wir sie PD1 ... PD4). Sie würden die folgenden ID-Zuordnungen tun:

PD1: 0, 4, 8, 12, ... 
PD2: 1, 5, 9, 13, ... 
PD3, 2, 6, 10, 14, ... 
PD4: 3, 7, 11, 15, ... 

Sie dies erreichen können, indem Sie die parallelen Instanzen mit unterschiedlichen Werten (Details siehe unten) initialisiert und durch Ihre Parallelität in jedem Fall nur den Zähler erhöht (dh ID += parallelism).

In Flink erhalten alle instanced von einer parallelen Funktion automatisch eine eindeutige Nummer zugewiesen (so genannte Task-Index). Sie können diese Nummer einfach verwenden, um Ihren ID-Zähler zu initialisieren. Sie können den Task-Index über abrufen. Sie können auch den Operator/Funktion Parallelität über RuntimeContext.getNumberOfParallelSubtasks()

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RuntimeContext.html

erhalten, um die RuntimeContext eine RichMapFunctionParseData zu implementieren verwenden zu bekommen und getRuntimeContext() in open() nennen.

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RichFunction.html

Etwas Ähnliches (zeige nur relevante Methoden):

class ParseDate extends RichMapFunction { 
    private long parallelism; 
    private long idCounter; 

    public void open(Configuration parameters) { 
     RuntimeContext ctx = getRuntimeContext(); 
     parallelism = ctx.getNumberOfParallelSubtasks(); 
     idCounter = ctx.getIndexOfThisSubtask(); 
    } 

    public OutputDataType map(InputDataType value) { 
     OutputDataType output = new OutputDataType(); 
     output.setID(idCounter); 
     idCounter += parallelism; 
     // further processing 
     return output; 
    } 
} 
+0

Danke, die für mich gearbeitet. Ich musste 'public void open (Konfigurationsparameter)' hinzufügen, damit es funktioniert. Auf diese Weise sind die letzten IDs jedoch nicht fortlaufend (während jedes Laufs werden sie anders zugewiesen), aber ich nehme an, dies hängt mit der Anzahl der Elemente zusammen, die jeder Instanz zugewiesen sind. –

+0

Die offene Methode in meiner Antwort wurde korrigiert - danke, dass Sie darauf hingewiesen haben. Und ja, wenn Daten nicht gleichmäßig verteilt werden, erhalten Sie möglicherweise keine fortlaufende ID, was sehr schwierig wäre, da Sie einen gemeinsamen globalen Status benötigen (was Ihre Leistung stark beeinträchtigen könnte). Ich habe dieses Detail in Ihrer Frage übersehen. –