2015-06-02 7 views
5

Ich möchte jeder Zeile meines Eingangs eine id zuweisen - die eine Zahl sein sollte von 0 bis N - 1, wobei N die Anzahl der Zeilen in der Eingabe ist.zipWithIndex auf Apache Flink

Grob gesagt, würde Ich mag Lage sein, so etwas wie die folgenden Funktionen ausführen:

val data = sc.textFile(textFilePath, numPartitions) 
val rdd = data.map(line => process(line)) 
val rddMatrixLike = rdd.zipWithIndex.map { case (v, idx) => someStuffWithIndex(idx, v) } 

Aber in Apache Flink. Ist es möglich?

+0

Das ist eine interessante Frage. Ich werde versuchen, eine Implementierung zu finden. –

Antwort

6

Dies ist jetzt ein Teil der 0.10-SNAPSHOT-Version von Apache Flink. Beispiele für zipWithIndex(in) und zipWithUniqueId(in) sind im offiziellen Flink documentation verfügbar.

5

Hier ist eine einfache Implementierung der Funktion:

public class ZipWithIndex { 

public static void main(String[] args) throws Exception { 

    ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment(); 

    DataSet<String> in = ee.readTextFile("/home/robert/flink-workdir/debug/input"); 

    // count elements in each partition 
    DataSet<Tuple2<Integer, Long>> counts = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Integer, Long>>() { 
     @Override 
     public void mapPartition(Iterable<String> values, Collector<Tuple2<Integer, Long>> out) throws Exception { 
      long cnt = 0; 
      for (String v : values) { 
       cnt++; 
      } 
      out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), cnt)); 
     } 
    }); 

    DataSet<Tuple2<Long, String>> result = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Long, String>>() { 
     long start = 0; 

     @Override 
     public void open(Configuration parameters) throws Exception { 
      super.open(parameters); 
      List<Tuple2<Integer, Long>> offsets = getRuntimeContext().getBroadcastVariable("counts"); 
      Collections.sort(offsets, new Comparator<Tuple2<Integer, Long>>() { 
       @Override 
       public int compare(Tuple2<Integer, Long> o1, Tuple2<Integer, Long> o2) { 
        return ZipWithIndex.compare(o1.f0, o2.f0); 
       } 
      }); 
      for(int i = 0; i < getRuntimeContext().getIndexOfThisSubtask(); i++) { 
       start += offsets.get(i).f1; 
      } 
     } 

     @Override 
     public void mapPartition(Iterable<String> values, Collector<Tuple2<Long, String>> out) throws Exception { 
      for(String v: values) { 
       out.collect(new Tuple2<Long, String>(start++, v)); 
      } 
     } 
    }).withBroadcastSet(counts, "counts"); 
    result.print(); 

} 

public static int compare(int x, int y) { 
    return (x < y) ? -1 : ((x == y) ? 0 : 1); 
} 
} 

Dies ist, wie es funktioniert: Ich bin mit dem ersten mapPartition() Betrieb alle Elemente in den Partitionen gehen, wie viele Elemente in es zu zählen . Ich muss die Anzahl der Elemente in jeder Partition kennen, um die Offsets richtig festzulegen, wenn die IDs den Elementen zugewiesen werden. Das Ergebnis der ersten mapPartition ist ein DataSet, das Zuordnungen enthält. Ich übermittle dieses DataSet an alle zweiten mapPartition()-Operatoren, die die IDs den Elementen aus der Eingabe zuweisen. In der open() Methode der zweiten mapPartition() Berechnung ich den Offset für jede Partition.

Ich werde wahrscheinlich den Code zu Flink beitragen (nach der Diskussion mit den anderen Commitern).

+0

Danke Robert! Könntest du vielleicht auch in ein paar Worten erklären, wie das funktioniert? Z.B. Warum benutzen wir 'getRuntimeContext(). getIndexOfThisSubtask()' und warum Broadcast-Zählungen jeder Partition helfen könnten? –

+0

Guter Punkt. Ich werde bald eine Beschreibung hinzufügen. –

+0

Beschreibung hinzugefügt –