1
Ich habe den folgenden Code zum Starten einer SQL-Abfrage beim Streaming. Mein Problem ist, dass nach einem der Ergebnisse eine ArrayIndexOutOfBoundsException angezeigt wird. Warum passiert das?Spark SQL über Streaming - ArrayIndexOutOfBoundsException
import org.apache.spark._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration
import org.apache.spark.sql.functions.udf
object StreamingSQL {
case class Persons(name: String, age: Int)
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
val sc = new SparkContext(sparkConf)
// Create the context
val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.textFileStream("/home/cloudera/Smartcare/stream/")
lines.foreachRDD(rdd=>rdd.foreach(println))
val sqc = new SQLContext(sc);
//import sqc.createSchemaRDD
import sqc.implicits._
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
lines.foreachRDD{rdd=>
val persons = rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).toDF()
persons.registerTempTable("data")
val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
teenagers.foreach(println)
}
ssc.start()
ssc.awaitTermination()
}
}
Dies ist die Ausgabe, die ich bekomme. Nach einem korrekten Ergebnis springe ich den Fehler:
16/03/23 16:58:56 INFO GenerateUnsafeProjection: Code generated in 131.828141 ms
[Edgar]
16/03/23 16:58:56 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.ArrayIndexOutOfBoundsException: 1
Mein txt ist:
Ana,31
Edgar,16
Luis,22
Noelia,26
Isabel50
Pablo,34
Laura,18
Paco,17
Ich würde starten, indem Sie überprüfen, ob die tatsächlichen Daten in der RDD zwei Felder hat. – eliasah
Ich habe überprüft, dass die RDD zwei Felder hat. – nest
Ich kann es jetzt nicht testen, aber es ist gut, dass Sie mindestens ein Eingabedatenbeispiel bereitgestellt haben. Können Sie versuchen, zu überprüfen, ob es Ihnen den gleichen Fehler gibt, wenn Sie es als eine ganze RDD lesen und die Transformation durchführen? – eliasah