Ich habe eine X, verteilte Matrix, in RowMatrix Form. Ich benutze Spark 1.3.0. Ich muss X inverse berechnen können.Wie berechnet man die Umkehrung einer RowMatrix in Apache Spark?
Antwort
import org.apache.spark.mllib.linalg.{Vectors,Vector,Matrix,SingularValueDecomposition,DenseMatrix,DenseVector}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
def computeInverse(X: RowMatrix): DenseMatrix = {
val nCoef = X.numCols.toInt
val svd = X.computeSVD(nCoef, computeU = true)
if (svd.s.size < nCoef) {
sys.error(s"RowMatrix.computeInverse called on singular matrix.")
}
// Create the inv diagonal matrix from S
val invS = DenseMatrix.diag(new DenseVector(svd.s.toArray.map(x => math.pow(x,-1))))
// U cannot be a RowMatrix
val U = new DenseMatrix(svd.U.numRows().toInt,svd.U.numCols().toInt,svd.U.rows.collect.flatMap(x => x.toArray))
// If you could make V distributed, then this may be better. However its alreadly local...so maybe this is fine.
val V = svd.V
// inv(X) = V*inv(S)*transpose(U) --- the U is already transposed.
(V.multiply(invS)).multiply(U)
}
Ich hatte Probleme mit der Option der Verwendung dieser Funktion
conf.set("spark.sql.shuffle.partitions", "12")
Die Zeilen in RowMatrix wurde gemischt.
Hier ist ein Update, das
für mich gearbeitetimport org.apache.spark.mllib.linalg.{DenseMatrix,DenseVector}
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix
def computeInverse(X: IndexedRowMatrix)
: DenseMatrix =
{
val nCoef = X.numCols.toInt
val svd = X.computeSVD(nCoef, computeU = true)
if (svd.s.size < nCoef) {
sys.error(s"IndexedRowMatrix.computeInverse called on singular matrix.")
}
// Create the inv diagonal matrix from S
val invS = DenseMatrix.diag(new DenseVector(svd.s.toArray.map(x => math.pow(x, -1))))
// U cannot be a RowMatrix
val U = svd.U.toBlockMatrix().toLocalMatrix().multiply(DenseMatrix.eye(svd.U.numRows().toInt)).transpose
val V = svd.V
(V.multiply(invS)).multiply(U)
}
Matrix U durch X.computeSVD
zurückgegeben hat Abmessungen mxk wo m die Anzahl der Zeilen des Originals (verteilt) RowMatrix X. Man würde m erwarten zu groß sein (möglicherweise größer als k), so ist es nicht ratsam, es im Treiber zu sammeln, wenn wir wollen, dass unser Code auf wirklich große Werte von m skalieren.
Ich würde sagen, beide der unten aufgeführten Lösungen leiden unter diesem Fehler. Die Antwort von @Alexander Kharlamov
ruft val U = svd.U.toBlockMatrix().toLocalMatrix()
auf, die die Matrix im Treiber sammelt. Das gleiche passiert mit der Antwort von @Climbs_lika_Spyder
(übrigens Ihr Nick Felsen !!), die svd.U.rows.collect.flatMap(x => x.toArray)
ruft. Ich würde eher vorschlagen, sich auf eine verteilte Matrixmultiplikation wie den Scala Code zu verlassen, der here bekannt gegeben wird.
Ich sehe keine umgekehrten Berechnungen bei der Verknüpfung, die Sie hinzugefügt haben. –
@Climbs_lika_Spyder Der Link bezieht sich auf verteilte Matrixmultiplikation, um die lokale Matrixmultiplikation '(V.multiply (invS)) .multiple (U)' in der letzten Zeile Ihrer Lösung zu ersetzen, so dass Sie 'U' nicht sammeln müssen im Fahrer. Ich denke, 'V' und' invS' sind nicht groß genug, um Probleme zu verursachen. – Pablo