2016-04-04 6 views
2

Ich versuche, LDA auf Wikipedia XML-Dump durchzuführen. Nachdem ich eine RDD von unformatiertem Text bekommen habe, erstelle ich einen Datenrahmen und transformiere ihn durch Tokenizer, StopWords und CountVectorizer Pipelines. Ich beabsichtige, die RDD von Vectors output von CountVectorizer zu OnlineLDA in MLLib zu übergeben. Hier ist mein Code:Konvertieren JavaRDD <Row> zu JavaRDD <Vector>

// Configure an ML pipeline 
RegexTokenizer tokenizer = new RegexTokenizer() 
    .setInputCol("text") 
    .setOutputCol("words"); 

StopWordsRemover remover = new StopWordsRemover() 
      .setInputCol("words") 
      .setOutputCol("filtered"); 

CountVectorizer cv = new CountVectorizer() 
      .setVocabSize(vocabSize) 
      .setInputCol("filtered") 
      .setOutputCol("features"); 

Pipeline pipeline = new Pipeline() 
      .setStages(new PipelineStage[] {tokenizer, remover, cv}); 

// Fit the pipeline to train documents. 
PipelineModel model = pipeline.fit(fileDF); 

JavaRDD<Vector> countVectors = model.transform(fileDF) 
      .select("features").toJavaRDD() 
      .map(new Function<Row, Vector>() { 
      public Vector call(Row row) throws Exception { 
       Object[] arr = row.getList(0).toArray(); 

       double[] features = new double[arr.length]; 
       int i = 0; 
       for(Object obj : arr){ 
        features[i++] = (double)obj; 
       } 
       return Vectors.dense(features); 
      } 
      }); 

ich die Klasse Guss Ausnahme bekommen, weil der Linie

Object[] arr = row.getList(0).toArray(); 


Caused by: java.lang.ClassCastException: org.apache.spark.mllib.linalg.SparseVector cannot be cast to scala.collection.Seq 
at org.apache.spark.sql.Row$class.getSeq(Row.scala:278) 
at org.apache.spark.sql.catalyst.expressions.GenericRow.getSeq(rows.scala:192) 
at org.apache.spark.sql.Row$class.getList(Row.scala:286) 
at org.apache.spark.sql.catalyst.expressions.GenericRow.getList(rows.scala:192) 
at xmlProcess.ParseXML$2.call(ParseXML.java:142) 
at xmlProcess.ParseXML$2.call(ParseXML.java:1) 

ich die Scala Syntax gefunden here, dies zu tun, konnte aber kein Beispiel finden, es zu tun in Java. Ich habe versucht row.getAs[Vector](0), aber das ist nur Scala-Syntax. Irgendwelche Möglichkeiten, es in Java zu tun?

Antwort

3

So konnte ich es mit einem einfachen Cast zu Vector tun. Ich weiß nicht, warum ich die einfachen Dinge nicht zuerst versucht habe!

  JavaRDD<Vector> countVectors = model.transform(fileDF) 
       .select("features").toJavaRDD() 
       .map(new Function<Row, Vector>() { 
       public Vector call(Row row) throws Exception { 
        return (Vector)row.get(0); 
       } 
       }); 
0

Sie brauchen nicht die DataFrame/DataSet einen JavaRDD verdeckte es mit LDA zu arbeiten. Nach einigen Stunden des Fiedelns habe ich endlich den Eingeborenen rdd in Scala zum arbeiten bekommen.

Relevante Importe:

import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, StopWordsRemover} 
import org.apache.spark.ml.linalg.{Vector => MLVector} 
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer} 
import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.sql.{Row, SparkSession} 

Der Ausschnitt aus dem Code der Rest folgt bleibt die gleiche wie this example:

val cvModel = new CountVectorizer() 
     .setInputCol("filtered") 
     .setOutputCol("features") 
     .setVocabSize(vocabSize) 
     .fit(filteredTokens) 


val countVectors = cvModel 
     .transform(filteredTokens) 
     .select("docId","features") 
     .rdd.map { case Row(docId: String, features: MLVector) => 
        (docId.toLong, Vectors.fromML(features)) 
       } 
val mbf = { 
    // add (1.0/actualCorpusSize) to MiniBatchFraction be more robust on tiny datasets. 
    val corpusSize = countVectors.count() 
    2.0/maxIterations + 1.0/corpusSize 
    } 
    val lda = new LDA() 
    .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(math.min(1.0, mbf))) 
    .setK(numTopics) 
    .setMaxIterations(2) 
    .setDocConcentration(-1) // use default symmetric document-topic prior 
    .setTopicConcentration(-1) // use default symmetric topic-word prior 

    val startTime = System.nanoTime() 
    val ldaModel = lda.run(countVectors) 
    val elapsed = (System.nanoTime() - startTime)/1e9 

    /** 
    * Print results. 
    */ 
    // Print training time 
    println(s"Finished training LDA model. Summary:") 
    println(s"Training time (sec)\t$elapsed") 
    println(s"==========") 

Dank here an den Autor des Codes geht.