I want to read rows that are inserted into an external MySQL database every 2 minutes, and I want to do this with Spark Streaming.
The program runs for a while and gives me the data the first time, but afterwards I get the following error and the program terminates.
The error I get is:
16/08/02 11:15:44 INFO JdbcRDD: closed connection
16/08/02 11:15:44 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 620 bytes result sent to driver
16/08/02 11:15:44 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 451 ms on localhost (1/1)
16/08/02 11:15:44 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/08/02 11:15:44 INFO DAGScheduler: ResultStage 0 (foreach at databaseread.scala:33) finished in 0.458 s
16/08/02 11:15:44 INFO DAGScheduler: Job 0 finished: foreach at databaseread.scala:33, took 0.664559 s
16/08/02 11:15:44 ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:543)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:595)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:594)
at org.test.spark.databaseread$.main(databaseread.scala:41)
at org.test.spark.databaseread.main(databaseread.scala)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:224)
My code is below. Please help me.
package org.test.spark

import java.sql.DriverManager
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds

object databaseread {
  def main(args: Array[String]) {
    val url = "jdbc:mysql://localhost:3306/dbname"
    val uname = "root"
    val pwd = "root"
    val driver = "com.mysql.jdbc.Driver"

    val conf = new SparkConf().setAppName("DBget").setMaster("local")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(60))

    val rdd = new JdbcRDD(sc, () => DriverManager.getConnection(url, uname, pwd),
      "select * from crimeweathercoords where ? = ?", 1, 1, 1,
      r => r.getString("Borough") + "," + r.getString("Month"))

    ssc.checkpoint(".")
    ssc.start()
    ssc.awaitTermination()
  }
}
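For context on what the exception means: `ssc.start()` validates the streaming graph, and it fails here because the `JdbcRDD` is a plain batch RDD, so no DStream output operation is ever registered with the `StreamingContext`. A minimal sketch of one way this is commonly fixed (wrapping the RDD in a `ConstantInputDStream` and registering `print()` as an output operation) is below; the table and column names follow the original code, while `local[*]`, the batch interval, and the id bounds in the query are assumptions:

```scala
package org.test.spark

import java.sql.DriverManager
import org.apache.spark.SparkConf
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream

object DatabaseReadStream {
  def main(args: Array[String]): Unit = {
    val url = "jdbc:mysql://localhost:3306/dbname"
    val uname = "root"
    val pwd = "root"

    val conf = new SparkConf().setAppName("DBget").setMaster("local[*]")
    // Batch interval of 120 s to match "every 2 minutes" from the question.
    val ssc = new StreamingContext(conf, Seconds(120))

    // JdbcRDD is lazy and uncached, so it re-runs the query each time an
    // action touches it. Bounds 1..1000 are placeholders for an id column.
    val rdd = new JdbcRDD(
      ssc.sparkContext,
      () => DriverManager.getConnection(url, uname, pwd),
      "select * from crimeweathercoords where id >= ? and id <= ?",
      1, 1000, 1,
      r => r.getString("Borough") + "," + r.getString("Month"))

    // ConstantInputDStream emits the same RDD every batch; print() is an
    // output operation, so the graph validation in start() now passes.
    val stream = new ConstantInputDStream(ssc, rdd)
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because the `JdbcRDD` is recomputed on every batch, each 120-second interval issues a fresh query against MySQL, which is the polling behavior the question asks for.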