2016-05-06 7 views
0

Ich erstelle einen Spark-Job-Server, der mit Cassandra verbindet. Nachdem ich die Datensätze erhalten habe, möchte ich eine einfache Gruppe durchführen und auf sie eingehen. Ich kann die Daten abrufen, ich konnte die Ausgabe nicht drucken. Ich habe google stundenlang ausprobiert und auch in cassandra google groups gepostet. Mein aktueller Code ist wie folgt und ich erhalte Fehler beim Sammeln.Map reduzieren, um Gruppe durchzuführen und Summe in Cassandra, mit Funken und Job-Server

override def runJob(sc: SparkContext, config: Config): Any = { 
//sc.cassandraTable("store", "transaction").select("terminalid","transdate","storeid","amountpaid").toArray().foreach (println) 
// Printing of each record is successful 
val rdd = sc.cassandraTable("POSDATA", "transaction").select("terminalid","transdate","storeid","amountpaid") 
val map1 = rdd.map (x => (x.getInt(0), x.getInt(1),x.getDate(2))->x.getDouble(3)).reduceByKey((x,y)=>x+y) 
println(map1) 
// output is ShuffledRDD[3] at reduceByKey at Daily.scala:34 
map1.collect 
//map1.ccollectAsMap().map(println(_)) 
//Throwing error java.lang.ClassNotFoundException: transaction.Daily$$anonfun$2 

}

+0

Haben Sie Spark Cassandra-Connector-Laufzeitbibliotheken auf Worker-Knoten? – noorul

+0

Es ist nützlich, daran zu denken, dass Spark faul ist - Transformationen werden erst angewendet, wenn Sie die letzte Aktion aufrufen (wie collect, take, foreach, etc). Println erzwingt also keine Berechnung, sondern ruft toString on RDD auf. So können Sie nicht sicher sein, dass Daten abgerufen wurden –

+0

@ noorul ich habe Cassandra Connect-Treiber. Die untere Zeile druckt die Datensätze "sc.cassandraTable (" store "," transaction "). Select (" terminalid "," transdate "," storeid "," betragsmonitor "). ToArray(). Foreach (println)" – user3327953

Antwort

0

Ihre map1 ist ein RDD. Sie können Folgendes versuchen:

map1.foreach(r => println(r)) 
0

Spark macht faul Bewertung auf RDD. Versuchen Sie also eine Aktion

map1.take(10).foreach(println)