2016-08-03 28 views
1

Ich möchte Datenzeilen, die in externen MySQL-Datenbank nach alle 2 Minuten eingefügt wird. Ich möchte diese Sache mit Spark Streaming zu tun.Spark Streaming MYSql

Aber ich habe diesen Fehler nach Programmablauf für einen time.So seine mir die Daten zum ersten Mal gibt aber danach bekam ich die folgende Fehlermeldung und das Programm beendet

Fehler ich habe ist

16/08/02 11:15:44 INFO JdbcRDD: closed connection 
16/08/02 11:15:44 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 620 bytes result sent to driver 
16/08/02 11:15:44 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 451 ms on localhost (1/1) 
16/08/02 11:15:44 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/08/02 11:15:44 INFO DAGScheduler: ResultStage 0 (foreach at databaseread.scala:33) finished in 0.458 s 
16/08/02 11:15:44 INFO DAGScheduler: Job 0 finished: foreach at databaseread.scala:33, took 0.664559 s 
16/08/02 11:15:44 ERROR StreamingContext: Error starting the context, marking it as stopped 
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
    at scala.Predef$.require(Predef.scala:224) 
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163) 
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:543) 
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:595) 
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:594) 
    at org.test.spark.databaseread$.main(databaseread.scala:41) 
    at org.test.spark.databaseread.main(databaseread.scala) 
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
    at scala.Predef$.require(Predef.scala:224) 

ich meinen Code über here.Please Entsendung mir helfen

package org.test.spark 

import org.xml.sax.helpers.NewInstance 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.rdd.JdbcRDD 
import java.sql.DriverManager 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.Seconds 

object databaseread { 
     def main(args:Array[String]) 
     { 
       val url="jdbc:mysql://localhost:3306/dbname" 
       val uname="root" 
       val pwd="root" 
       var i=0 
       val driver="com.mysql.jdbc.Driver" 
       val conf=new SparkConf().setAppName("DBget").setMaster("local") 
       val sc=new SparkContext(conf) 
       val ssc = new StreamingContext(sc, Seconds(60)) 

     val RDD=new JdbcRDD(sc,()=>DriverManager.getConnection(url,uname,pwd), 
      "select * from crimeweathercoords where ? 
       =?",1,1,1,r=>r.getString("Borough")+","+r.getString("Month")) 



     ssc.checkpoint(".") 


     ssc.start() 
     ssc.awaitTermination() 


     } 
    } 

Antwort

0

Spark-Streaming ist nicht darauf ausgelegt, Daten von einem RDBMS wie MySQL zu lesen. Sie könnten versuchen, Ihren eigenen benutzerdefinierten Empfänger zu erstellen, aber zu diesem Zeitpunkt ist es möglicherweise einfacher, die normale Spark API zu verwenden, um die Daten in Blöcken abzurufen.

+0

Vielen Dank.Will dies versuchen .. !!! –