3

TL; DR:Spark-Streaming-Anwendung fehlschlägt mit KafkaException: String überschreitet die maximale Größe oder mit Illegal

Meine sehr einfachen Funken Streaming-Anwendung fehlschlägt in den Fahrern mit der „KafkaException: String die maximale Größe überschreitet“. Ich sehe die gleiche Ausnahme in dem Testamentsvollstrecker, aber ich fand auch irgendwo auf den Testamentsvollstrecker der Protokolle eine Illegal ohne weitere Informationen in ihm

Voll Problem:

Ich bin mit Spark-Streaming einige Nachrichten von einem Kafka Thema lesen . Das ist, was ich tue:

val conf = new SparkConf().setAppName("testName") 
val streamingContext = new StreamingContext(new SparkContext(conf), Milliseconds(millis)) 
val kafkaParams = Map(
     "metadata.broker.list" -> "somevalidaddresshere:9092", 
     "auto.offset.reset" -> "largest" 
    ) 
val topics = Set("data") 
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     streamingContext, 
     kafkaParams, 
     topics 
    ).map(_._2) // only need the values not the keys 

Was ich mit dem Kafka Daten tun nur es wird Druck mit:

stream.print() 

Meine Anwendung offensichtlich mehr Code als diese hat aber, um Lokalisieren Sie mein Problem Ich entfernte alles, was ich nur konnte aus dem Code

Ich versuche, diesen Code auf YARN ausführen. Dies ist mein Funke einreichen Linie:

./spark-submit --class com.somecompany.stream.MainStream --master yarn --deploy-mode cluster myjar.jar hdfs://some.hdfs.address.here/user/spark/streamconfig.properties 

Die streamconfig.properties Datei ist nur eine ganz normale Eigenschaften Datei, die wahrscheinlich irrelevant für das Problem hier ist

Nach dem Versuch, die Anwendung es nicht ziemlich schnell mit der auszuführen folgende Ausnahme auf dem Fahrer:

16/05/10 06:15:38 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, some.hdfs.address.here): kafka.common.KafkaException: String exceeds the maximum size of 32767. 
    at kafka.api.ApiUtils$.shortStringLength(ApiUtils.scala:73) 
    at kafka.api.TopicData$.headerSize(FetchResponse.scala:107) 
    at kafka.api.TopicData.<init>(FetchResponse.scala:113) 
    at kafka.api.TopicData$.readFrom(FetchResponse.scala:103) 
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170) 
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.immutable.Range.foreach(Range.scala:141) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
    at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169) 
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135) 
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192) 
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208) 
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

ich meinen Code in dem Stack-Trace nicht einmal sehen

Executor Prüfungs fand ich die gleiche Ausnahme wie in den Fahrer, sondern auch begraben ist tief unten die folgende Ausnahme:

16/05/10 06:40:47 ERROR executor.Executor: Exception in task 0.0 in stage 2.0 (TID 8) 
java.lang.IllegalArgumentException 
    at java.nio.Buffer.limit(Buffer.java:275) 
    at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:38) 
    at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:100) 
    at kafka.api.TopicData$$anonfun$1.apply(FetchResponse.scala:98) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.immutable.Range.foreach(Range.scala:141) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at kafka.api.TopicData$.readFrom(FetchResponse.scala:98) 
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170) 
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.immutable.Range.foreach(Range.scala:141) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
    at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169) 
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135) 
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192) 
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208) 
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Ich habe keine Ahnung, was die IllegalArgument ist da keine Informationen

Die Spark-Version enthalten ist Mein YARN benutzt 1.6.0. Ich habe auch überprüft, dass mein Pom Spark 1.6.0 und keine frühere Version enthält. Mein Bereich ist "zur Verfügung gestellt"

Ich manuell lesen Sie die Daten aus dem gleichen Thema und die Daten gibt es einfach nur JSONs. Die Daten dort sind nicht riesig. Auf jeden Fall kleiner als 32767. Auch ich bin in der Lage, diese Daten zu lesen Verbraucher mit der regulären Befehlszeile so, dass seltsam

googeln diese Ausnahme leider keine nützliche Informationen geliefert hat

Hat jemand eine Idee, wie man verstehe was genau ist das Problem hier?

Vielen Dank im Voraus

+2

Ist das Thema in der Frage ("Daten") der tatsächliche Themenname, den Sie verwenden?Nach dem Stack-Trace im Quellcode wird in diesem Fall die Überprüfung der Topic-Länge fehlschlagen. – maasg

+0

Nein, ich habe es für die Frage geändert, der echte Themenname ist nur eine reguläre Zeichenfolge, die gleiche, die ich benutze, wenn ich über die Befehlszeile darauf zugreife. – Gideon

+0

Wie lange ist das? – maasg

Antwort

1

Nach viel zu graben Ich glaube, ich fand, was das Problem war. Ich führe Spark auf YARN (1.6.0-cdh5.7.0). Cloudera hat den neuen Kafka-Client (Version 0.9), der eine Änderung gegenüber den früheren Versionen hatte. Unser Kafka ist jedoch Version 0.8.2.