Hier ist eine einfache Implementierung der Funktion:
public class ZipWithIndex {
public static void main(String[] args) throws Exception {
ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> in = ee.readTextFile("/home/robert/flink-workdir/debug/input");
// count elements in each partition
DataSet<Tuple2<Integer, Long>> counts = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Integer, Long>>() {
@Override
public void mapPartition(Iterable<String> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
long cnt = 0;
for (String v : values) {
cnt++;
}
out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), cnt));
}
});
DataSet<Tuple2<Long, String>> result = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Long, String>>() {
long start = 0;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
List<Tuple2<Integer, Long>> offsets = getRuntimeContext().getBroadcastVariable("counts");
Collections.sort(offsets, new Comparator<Tuple2<Integer, Long>>() {
@Override
public int compare(Tuple2<Integer, Long> o1, Tuple2<Integer, Long> o2) {
return ZipWithIndex.compare(o1.f0, o2.f0);
}
});
for(int i = 0; i < getRuntimeContext().getIndexOfThisSubtask(); i++) {
start += offsets.get(i).f1;
}
}
@Override
public void mapPartition(Iterable<String> values, Collector<Tuple2<Long, String>> out) throws Exception {
for(String v: values) {
out.collect(new Tuple2<Long, String>(start++, v));
}
}
}).withBroadcastSet(counts, "counts");
result.print();
}
public static int compare(int x, int y) {
return (x < y) ? -1 : ((x == y) ? 0 : 1);
}
}
Dies ist, wie es funktioniert: Ich bin mit dem ersten mapPartition()
Betrieb alle Elemente in den Partitionen gehen, wie viele Elemente in es zu zählen . Ich muss die Anzahl der Elemente in jeder Partition kennen, um die Offsets richtig festzulegen, wenn die IDs den Elementen zugewiesen werden. Das Ergebnis der ersten mapPartition
ist ein DataSet, das Zuordnungen enthält. Ich übermittle dieses DataSet an alle zweiten mapPartition()
-Operatoren, die die IDs den Elementen aus der Eingabe zuweisen. In der open()
Methode der zweiten mapPartition()
Berechnung ich den Offset für jede Partition.
Ich werde wahrscheinlich den Code zu Flink beitragen (nach der Diskussion mit den anderen Commitern).
Das ist eine interessante Frage. Ich werde versuchen, eine Implementierung zu finden. –