Ich möchte Spark RowSimilarity Recommender auf Daten von mongodb erhalten. Zu diesem Zweck habe ich unten Code geschrieben, der Input von Mongo nimmt, wandelt es in RDD von Objects um. Dies muss zu IndexedDataSetSpark geben werden, die dann an SimilarityAnalysis.rowSimilarityIDS geben wirdScala - Create IndexedDatasetSpark Objekt
import org.apache.hadoop.conf.Configuration
import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.rdd.{NewHadoopRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat
object SparkExample extends App {
val mongoConfig = new Configuration()
mongoConfig.set("mongo.input.uri", "mongodb://my_mongo_ip:27017/db.collection")
val sparkConf = new SparkConf()
val sc = new SparkContext("local", "SparkExample", sparkConf)
val documents: RDD[(Object, BSONObject)] = sc.newAPIHadoopRDD(
mongoConfig,
classOf[MongoInputFormat],
classOf[Object],
classOf[BSONObject]
)
val new_doc: RDD[(String, String)] = documents.map(
doc1 => (
doc1._2.get("product_id").toString(),
doc1._2.get("product_attribute_value").toString().replace("[ \"", "").replace("\"]", "").split("\" , \"").map(value => value.toLowerCase.replace(" ", "-")).mkString(" ")
)
)
var myIDs = IndexedDatasetSpark(new_doc)(sc)
SimilarityAnalysis.rowSimilarityIDS(myIDs).dfsWrite("hdfs://myhadoop:9000/myfile", readWriteSchema)
Ich kann keine IndexedDatasetSpark schaffen, die zu SimilarityAnalysis.rowSimilarityIDS geben werden kann. Bitte helfen Sie mir in dieser Angelegenheit.
Edit1: richtig
konnte ich kompiliert das IndexedDatasetSpark Objekt und der Code nun erstellen. Ich hatte (sc)
als implizites Argument IndexedDatasetSpark
für den Code hinzuzufügen auszuführen:
Error: could not find implicit value for parameter sc: org.apache.spark.SparkContext
Nun, wenn ich es laufen, es unten Fehler gibt:
Error: could not find implicit value for parameter sc: org.apache.mahout.math.drm.DistributedContext
ich kann nicht herausfinden, wie man geben VerteilterKontext.
Ist dies der richtige Weg, um RDD zu erstellen und es in IDS zu konvertieren, damit es von rowSimilarityIDS verarbeitet werden kann?
Weitere Kontext: Ich habe aus dieser Situation begonnen: Run Mahout RowSimilarity recommender on MongoDB data
Mein build.sbt:
name := "scala-mongo"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.mongodb" %% "casbah" % "3.1.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
libraryDependencies += "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.2"
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client" % "2.6.0" exclude("javax.servlet", "servlet-api") exclude ("com.sun.jmx", "jmxri") exclude ("com.sun.jdmk", "jmxtools") exclude ("javax.jms", "jms") exclude ("org.slf4j", "slf4j-log4j12") exclude("hsqldb","hsqldb"),
"org.scalatest" % "scalatest_2.10" % "1.9.2" % "test"
)
libraryDependencies += "org.apache.mahout" % "mahout-math-scala_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-spark_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-math" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-hdfs" % "0.11.2"
resolvers += "typesafe repo" at "http://repo.typesafe.com/typesafe/releases/"
resolvers += Resolver.mavenLocal
Edit2: I dfsWrite vorübergehend entfernt haben Sie den Code ausführen und stolperte über unten Fehler zu lassen :
java.io.NotSerializableException: org.apache.mahout.math.DenseVector
Serialization stack:
- object not serializable (class: org.apache.mahout.math.DenseVector, value: {3:1.0,8:1.0,10:1.0})
- field (class: scala.Some, name: x, type: class java.lang.Object)
- object (class scala.Some, Some({3:1.0,8:1.0,10:1.0}))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Gibt es einige Serialisierung, die ich möglicherweise übersprungen habe?
hast du vergessen, den fehler zu zeigen? – pferrel
@ferrel: Ich habe die Frage mit dem letzten Fehler bearbeitet. Bitte lassen Sie mich wissen, wenn ich die korrekte Vorgehensweise in Scala/Spark/Mahout befolge. – user3295878
@pferrel: Nachdem ich dfsWrite entfernt habe und rowSimilarity laufen lassen habe, habe ich ein neues Problem bekommen. Habe die Frage aktualisiert. – user3295878