2015-04-30 17 views

Antwort

6
import org.apache.spark.mllib.linalg.{Vectors,Vector,Matrix,SingularValueDecomposition,DenseMatrix,DenseVector} 
import org.apache.spark.mllib.linalg.distributed.RowMatrix 

def computeInverse(X: RowMatrix): DenseMatrix = { 
    val nCoef = X.numCols.toInt 
    val svd = X.computeSVD(nCoef, computeU = true) 
    if (svd.s.size < nCoef) { 
    sys.error(s"RowMatrix.computeInverse called on singular matrix.") 
    } 

    // Create the inv diagonal matrix from S 
    val invS = DenseMatrix.diag(new DenseVector(svd.s.toArray.map(x => math.pow(x,-1)))) 

    // U cannot be a RowMatrix 
    val U = new DenseMatrix(svd.U.numRows().toInt,svd.U.numCols().toInt,svd.U.rows.collect.flatMap(x => x.toArray)) 

    // If you could make V distributed, then this may be better. However its alreadly local...so maybe this is fine. 
    val V = svd.V 
    // inv(X) = V*inv(S)*transpose(U) --- the U is already transposed. 
    (V.multiply(invS)).multiply(U) 
    } 
3

Ich hatte Probleme mit der Option der Verwendung dieser Funktion

conf.set("spark.sql.shuffle.partitions", "12") 

Die Zeilen in RowMatrix wurde gemischt.

Hier ist ein Update, das

für mich gearbeitet
import org.apache.spark.mllib.linalg.{DenseMatrix,DenseVector} 
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix 

def computeInverse(X: IndexedRowMatrix) 
: DenseMatrix = 
{ 
    val nCoef = X.numCols.toInt 
    val svd = X.computeSVD(nCoef, computeU = true) 
    if (svd.s.size < nCoef) { 
    sys.error(s"IndexedRowMatrix.computeInverse called on singular matrix.") 
    } 

    // Create the inv diagonal matrix from S 
    val invS = DenseMatrix.diag(new DenseVector(svd.s.toArray.map(x => math.pow(x, -1)))) 

    // U cannot be a RowMatrix 
    val U = svd.U.toBlockMatrix().toLocalMatrix().multiply(DenseMatrix.eye(svd.U.numRows().toInt)).transpose 

    val V = svd.V 
    (V.multiply(invS)).multiply(U) 
} 
0

Matrix U durch X.computeSVD zurückgegeben hat Abmessungen mxk wo m die Anzahl der Zeilen des Originals (verteilt) RowMatrix X. Man würde m erwarten zu groß sein (möglicherweise größer als k), so ist es nicht ratsam, es im Treiber zu sammeln, wenn wir wollen, dass unser Code auf wirklich große Werte von m skalieren.

Ich würde sagen, beide der unten aufgeführten Lösungen leiden unter diesem Fehler. Die Antwort von @Alexander Kharlamov ruft val U = svd.U.toBlockMatrix().toLocalMatrix() auf, die die Matrix im Treiber sammelt. Das gleiche passiert mit der Antwort von @Climbs_lika_Spyder (übrigens Ihr Nick Felsen !!), die svd.U.rows.collect.flatMap(x => x.toArray) ruft. Ich würde eher vorschlagen, sich auf eine verteilte Matrixmultiplikation wie den Scala Code zu verlassen, der here bekannt gegeben wird.

+0

Ich sehe keine umgekehrten Berechnungen bei der Verknüpfung, die Sie hinzugefügt haben. –

+0

@Climbs_lika_Spyder Der Link bezieht sich auf verteilte Matrixmultiplikation, um die lokale Matrixmultiplikation '(V.multiply (invS)) .multiple (U)' in der letzten Zeile Ihrer Lösung zu ersetzen, so dass Sie 'U' nicht sammeln müssen im Fahrer. Ich denke, 'V' und' invS' sind nicht groß genug, um Probleme zu verursachen. – Pablo