2016-05-06 27 views
1

Ich möchte Spark RowSimilarity Recommender auf Daten von mongodb erhalten. Zu diesem Zweck habe ich unten Code geschrieben, der Input von Mongo nimmt, wandelt es in RDD von Objects um. Dies muss zu IndexedDataSetSpark geben werden, die dann an SimilarityAnalysis.rowSimilarityIDS geben wirdScala - Create IndexedDatasetSpark Objekt

import org.apache.hadoop.conf.Configuration 
import org.apache.mahout.math.cf.SimilarityAnalysis 
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark 
import org.apache.spark.rdd.{NewHadoopRDD, RDD} 
import org.apache.spark.{SparkConf, SparkContext} 
import org.bson.BSONObject 
import com.mongodb.hadoop.MongoInputFormat 

object SparkExample extends App { 
    val mongoConfig = new Configuration() 
    mongoConfig.set("mongo.input.uri", "mongodb://my_mongo_ip:27017/db.collection") 

    val sparkConf = new SparkConf() 
    val sc = new SparkContext("local", "SparkExample", sparkConf) 

    val documents: RDD[(Object, BSONObject)] = sc.newAPIHadoopRDD(
    mongoConfig, 
    classOf[MongoInputFormat], 
    classOf[Object], 
    classOf[BSONObject] 
) 
    val new_doc: RDD[(String, String)] = documents.map(
    doc1 => (
    doc1._2.get("product_id").toString(), 
    doc1._2.get("product_attribute_value").toString().replace("[ \"", "").replace("\"]", "").split("\" , \"").map(value => value.toLowerCase.replace(" ", "-")).mkString(" ") 
    ) 
) 
    var myIDs = IndexedDatasetSpark(new_doc)(sc) 

    SimilarityAnalysis.rowSimilarityIDS(myIDs).dfsWrite("hdfs://myhadoop:9000/myfile", readWriteSchema) 

Ich kann keine IndexedDatasetSpark schaffen, die zu SimilarityAnalysis.rowSimilarityIDS geben werden kann. Bitte helfen Sie mir in dieser Angelegenheit.

Edit1: richtig

konnte ich kompiliert das IndexedDatasetSpark Objekt und der Code nun erstellen. Ich hatte (sc) als implizites Argument IndexedDatasetSpark für den Code hinzuzufügen auszuführen:

Error: could not find implicit value for parameter sc: org.apache.spark.SparkContext 

Nun, wenn ich es laufen, es unten Fehler gibt:

Error: could not find implicit value for parameter sc: org.apache.mahout.math.drm.DistributedContext 

ich kann nicht herausfinden, wie man geben VerteilterKontext.

Ist dies der richtige Weg, um RDD zu erstellen und es in IDS zu konvertieren, damit es von rowSimilarityIDS verarbeitet werden kann?

Weitere Kontext: Ich habe aus dieser Situation begonnen: Run Mahout RowSimilarity recommender on MongoDB data

Mein build.sbt:

name := "scala-mongo" 

version := "1.0" 

scalaVersion := "2.10.6" 

libraryDependencies += "org.mongodb" %% "casbah" % "3.1.1" 

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1" 
libraryDependencies += "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.2" 

libraryDependencies ++= Seq(
    "org.apache.hadoop" % "hadoop-client" % "2.6.0" exclude("javax.servlet", "servlet-api") exclude ("com.sun.jmx", "jmxri") exclude ("com.sun.jdmk", "jmxtools") exclude ("javax.jms", "jms") exclude ("org.slf4j", "slf4j-log4j12") exclude("hsqldb","hsqldb"), 
    "org.scalatest" % "scalatest_2.10" % "1.9.2" % "test" 
) 

libraryDependencies += "org.apache.mahout" % "mahout-math-scala_2.10" % "0.11.2" 
libraryDependencies += "org.apache.mahout" % "mahout-spark_2.10" % "0.11.2" 
libraryDependencies += "org.apache.mahout" % "mahout-math" % "0.11.2" 
libraryDependencies += "org.apache.mahout" % "mahout-hdfs" % "0.11.2" 

resolvers += "typesafe repo" at "http://repo.typesafe.com/typesafe/releases/" 

resolvers += Resolver.mavenLocal 

Edit2: I dfsWrite vorübergehend entfernt haben Sie den Code ausführen und stolperte über unten Fehler zu lassen :

java.io.NotSerializableException: org.apache.mahout.math.DenseVector 
Serialization stack: 
- object not serializable (class: org.apache.mahout.math.DenseVector, value: {3:1.0,8:1.0,10:1.0}) 
- field (class: scala.Some, name: x, type: class java.lang.Object) 
- object (class scala.Some, Some({3:1.0,8:1.0,10:1.0})) 
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

Gibt es einige Serialisierung, die ich möglicherweise übersprungen habe?

+0

hast du vergessen, den fehler zu zeigen? – pferrel

+0

@ferrel: Ich habe die Frage mit dem letzten Fehler bearbeitet. Bitte lassen Sie mich wissen, wenn ich die korrekte Vorgehensweise in Scala/Spark/Mahout befolge. – user3295878

+0

@pferrel: Nachdem ich dfsWrite entfernt habe und rowSimilarity laufen lassen habe, habe ich ein neues Problem bekommen. Habe die Frage aktualisiert. – user3295878

Antwort

0

Ich würde zurückstellen, was auch immer Sie entfernt haben, der sekundäre Fehler ist selbstverschuldet.

Der ursprüngliche Fehler ist, weil Sie keine SparkContext erstellt haben, die getan werden kann:

implicit val mc = mahoutSparkContext() 

Danach denke ich, die implizite Umwandlung des mc (a SparkDistributedContext) zu sc (a SparkContext) sein wird, wird von den Pakethelferfunktionen gehandhabt. Wenn die sc immer noch fehlt versuchen:

implicit val sc = sdc2sc(mc) 
+0

Vielen Dank @Pferrel. Ich habe über MahoutSparkContext herausgefunden. Aber ich musste (mc) beide Funktionen explizit übergeben, damit es funktioniert. Soll ich den endgültigen Code posten? – user3295878

+0

Es klingt wie Sie Ihre eigene Frage beantworten können? – pferrel

+0

Ich schaffte es, den Code nach dem Betrachten dieser [Link] (https://mahout.apache.org/users/environment/how-to-build-an-app.html) zu arbeiten. Ich weiß immer noch nicht, ob das der richtige Weg ist. Soll ich meinen Code als Antwort posten? – user3295878