2016-04-02 2 views
0

Ich habe zwei Dateien in HDFS mit der gleichen Anzahl von Zeilen. Zeilen aus den Dateien entsprechen einander nach Zeilennummer.zip RDDs aus verschiedenen Eingabedateien

lines1=sc.textFile('1.txt') 
lines2=sc.textFile('2.txt') 

Meine Frage ist, wie rdd Zeilen1 mit Zeilen2 korrekt zip?

zipped=lines1.zip(lines2) 

Zip erfordert die gleiche Größe RDDs und die gleichen Partitionen (wie ich nicht nur Partitionen verstanden zählen, sondern auch die gleiche Anzahl von Elementen in jeder Partition). Erste Voraussetzung ist bereits erfüllt. Wie gewährleistet man die zweite?

Danke!

Sergey.

Antwort

1

Im Allgemeinen wird keine der Bedingungen erfüllt und zip ist kein gutes Werkzeug, um einen solchen Vorgang durchzuführen. Die Anzahl der Partitionen und die Anzahl der Elemente pro Partition hängt nicht nur von der Anzahl der Zeilen ab, sondern auch von der Größe der Datei, der Größe der einzelnen Dateien und der Konfiguration.

zip ist nützlich, wenn Sie RDDs verbinden die gemeinsamen Vorfahren und von Shuffle nicht beispielsweise getrennt sind:

parent = sc.parallelize(range(100)) 
child1 = parent.map(some_func) 
child2 = parent.map(other_func) 
child1.zip(child2) 

RDDs von Zeile fusionieren Sie etwas tun können:

def index_and_sort(rdd): 
    def swap(xy): 
     x, y = xy 
     return y, x 
    return rdd.zipWithIndex().map(swap).sortByKey() 

index_and_sort(lines1).join(index_and_sort(lines)).values() 

Es sollte sicher sein, zip nach Indexierung und Sortierung:

from pyspark import RDD 

RDD.zip(*(index_and_sort(rdd).values() for rdd in [lines1, lines2])) 

aber warum überhaupt stören?

Scala-Äquivalent:

import org.apache.spark.rdd.RDD 

def indexAndSort(rdd: RDD[String]) = rdd.zipWithIndex.map(_.swap).sortByKey() 

indexAndSort(lines1).join(indexAndSort(lines2)).values 
+0

zero323 - die Idee ist klar. Vielen Dank! Warum verwenden Sie auch das Sortieren? – sergun

+0

Weil ich 'HashPartitioner' nicht auf' Join' verwenden möchte. – zero323