
Inconsistent outputs from Scala Spark and PySpark job

I am converting my Scala code to PySpark as below, but I get a different count for the final RDD.

My Scala code:

val scalaRDD = rowRDD.map { row: Row =>
  var rowList: ListBuffer[Row] = ListBuffer()
  rowList.add(row)
  (row.getString(1) + "_" + row.getString(6), rowList)
}.reduceByKey { (list1, list2) =>

  var rowList: ListBuffer[Row] = ListBuffer()
  for (i <- 0 to list1.length - 1) {
    val row1 = list1.get(i)

    var foundMatch = false

    breakable {
      for (j <- 0 to list2.length - 1) {
        var row2 = list2.get(j)
        val result = mergeRow(row1, row2)
        if (result._1) {
          list2.set(j, result._2)
          foundMatch = true
          break
        }
      } // for j loop
    } // breakable for j

    if (!foundMatch) {
      rowList.add(row1)
    }
  }

  list2.addAll(rowList)

  list2
}.flatMap { t => t._2 }

where

def mergeRow(row1: Row, row2: Row): (Boolean, Row) = {
  var z: Array[String] = new Array[String](row1.length)
  var hasDiff = false

  for (k <- 1 to row1.length - 2) {
    // k = 0 : ID, always different
    // k = 43 : last field, which is not important

    if (row1.getString(0) < row2.getString(0)) {
      z(0) = row2.getString(0)
      z(43) = row2.getString(43)
    } else {
      z(0) = row1.getString(0)
      z(43) = row1.getString(43)
    }

    if (Option(row2.getString(k)).getOrElse("").isEmpty && !Option(row1.getString(k)).getOrElse("").isEmpty) {
      z(k) = row1.getString(k)
      hasDiff = true
    } else if (!Option(row1.getString(k)).getOrElse("").isEmpty && !Option(row2.getString(k)).getOrElse("").isEmpty && row1.getString(k) != row2.getString(k)) {
      return (false, null)
    } else {
      z(k) = row2.getString(k)
    }
  } // for k loop

  if (hasDiff) {
    (true, Row.fromSeq(z))
  } else {
    (true, row2)
  }
}

I then tried to convert it to the following PySpark code:

pySparkRDD = rowRDD.map(
    lambda row: singleRowList(row)
).reduceByKey(lambda list1, list2: mergeList(list1, list2)).flatMap(lambda x: x[1])

where I have:

def mergeRow(row1, row2):
    z = []
    hasDiff = False

    # for (k <- 1 to row1.length - 2) {
    for k in xrange(1, len(row1) - 2):
        # k = 0 : ID, always different
        # k = 43 : last field, which is not important

        if row1[0] < row2[0]:
            z[0] = row2[0]
            z[43] = row2[43]
        else:
            z[0] = row1[0]
            z[43] = row1[43]

        if not(row2[k]) and row1[k]:
            z[k] = row1[k].strip()
            hasDiff = True
        elif row1[k] and row2[k] and row1[k].strip() != row2[k].strip():
            return (False, None)
        else:
            z[k] = row2[k].strip()

    if hasDiff:
        return (True, Row.fromSeq(z))
    else:
        return (True, row2)

and

def singleRowList(row): 
    myList=[] 
    myList.append(row) 

    return (row[1] + "_" + row[6], myList) 

and

def mergeList(list1, list2):
    rowList = []
    for i in xrange(0, len(list1) - 1):
        row1 = list1[i]
        foundMatch = False
        for j in xrange(0, len(list2) - 1):
            row2 = list2[j]
            resultBool, resultRow = mergeRow(row1, row2)
            if resultBool:
                list2[j] = resultRow
                foundMatch = True
                break

        if foundMatch == False:
            rowList.append(row1)

    list2.extend(rowList)

    return list2

BTW, rowRDD is converted from a DataFrame, i.e. rowRDD = myDF.rdd.
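
For reference, a minimal sketch of that conversion (illustrative only: the DataFrame contents and column names here are placeholders, not taken from the question, and a Spark 2.x SparkSession is assumed; with Spark 1.x an SQLContext plays the same role):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("df-to-rdd-sketch").getOrCreate()

# Placeholder DataFrame standing in for myDF; the real one has many string columns.
myDF = spark.createDataFrame([("1", "a", "x"), ("2", "b", "y")], ["id", "col1", "col2"])

# .rdd turns the DataFrame into an RDD of Row objects, which support
# the positional access (row[1], row[6], ...) used in the code above.
rowRDD = myDF.rdd
print(rowRDD.map(lambda row: row[1]).collect())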

However, I get different counts for scalaRDD and pySparkRDD. I have checked the code many times but could not figure out what I am missing. Does anyone have any ideas? Thanks a lot!

Answer


Consider this:

scala> (1 to 5).length 
res1: Int = 5 

and this:

>>> len(xrange(1, 5)) 
4 

In Scala, you should use "until" to reproduce Python's "range": (1 to 5) -> 1, 2, 3, 4, 5; (1 until 5) -> 1, 2, 3, 4 – FLab
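
Applied to the question's code, that means the PySpark loop bounds have to be widened by one to visit the same indices as the Scala "to" loops. A minimal sketch of just that bound mapping (other quirks in the posted Python code, such as assigning into the empty list z, are left aside):

def inclusive_range(lo, hi):
    # Python counterpart of Scala's inclusive "lo to hi"
    return range(lo, hi + 1)  # xrange(lo, hi + 1) in Python 2

row_length = 44  # the rows in the question have fields 0..43

# Scala: for (k <- 1 to row1.length - 2)  ->  Python: xrange(1, len(row1) - 1)
assert list(inclusive_range(1, row_length - 2)) == list(range(1, row_length - 1))

# Scala: for (i <- 0 to list1.length - 1)  ->  Python: xrange(0, len(list1))
assert list(inclusive_range(0, 2)) == list(range(0, 3))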