2016-07-09 11 views
1

Ich versuche, Daten von kafka zu streamenkafka.cluster.BrokerEndPoint kann nicht auf kafka.cluster.Broker gegossen werden

Funke Ich Funke bin mit 1.6.2 mit kafka 0.9.0.1 und scala 2.11.8

alles funktioniert gut, wenn ich den Hörer basierten Ansatz verwenden (KafkaUtils.createStream()) aber wenn ich versuche, den direkten Weg ohne Empfänger wie diese

val kafkaStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, 
    Map("group.id" -> "blah", 
    "auto.offset.reset" -> "smallest", 
    "metadata.broker.list" -> "127.0.0.1:9092", 
    "bootstrap.servers"-> "127.0.0.1:9092"), 
    Set("tweets") 
) 

ich diesen Fehler

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90) 
at scala.Option.map(Option.scala:146) 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90) 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) 
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) 
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87) 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
at scala.collection.immutable.Set$Set1.foreach(Set.scala:94) 
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) 
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86) 
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85) 
at scala.util.Either$RightProjection.flatMap(Either.scala:522) 
at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85) 
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179) 
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161) 
at org.apache.spark.streaming.kafka.KafkaCluster.getEarliestLeaderOffsets(KafkaCluster.scala:155) 
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:213) 
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211) 
at scala.util.Either$RightProjection.flatMap(Either.scala:522) 
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211) 
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) 
at SparkStreaming$.delayedEndpoint$SparkStreaming$1(SparkStreaming.scala:32) 
at SparkStreaming$delayedInit$body.apply(SparkStreaming.scala:24) 
at scala.Function0$class.apply$mcV$sp(Function0.scala:34) 
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) 
at scala.App$$anonfun$main$1.apply(App.scala:76) 
at scala.App$$anonfun$main$1.apply(App.scala:76) 
at scala.collection.immutable.List.foreach(List.scala:381) 
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) 
at scala.App$class.main(App.scala:76) 
at SparkStreaming$.main(SparkStreaming.scala:24) 
at SparkStreaming.main(SparkStreaming.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) 
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) 

und das sind meine Abhängigkeiten

"org.apache.spark" %% "spark-streaming-kafka" % "1.6.2", 
"org.apache.spark" %% "spark-core" % "1.6.2", 
"org.apache.spark" % "spark-streaming_2.11" % "1.6.2", 
"org.apache.kafka" %% "kafka" % "0.9.0.1" 

Ich kann nicht sehen, wo das Problem ist? Kann mir bitte jemand helfen?

Antwort

6

Laut der Spark Streaming-Dokumentation here ist Spark Streaming 1.6.2 mit Kakfa 0.8.2.1 kompatibel.

Kafka: Funken 1.6.2 Streaming ist kompatibel mit Kafka 0.8.2.1

So Ihr Problem Verwendung kafka Bibliotheken der Version 0.8.2.1 statt 0.9.0.1 zu lösen.

Hoffe, das hilft!

+0

Vielen Dank für Ihre Antwort. Ich frage mich nur, ob die Verwendung von 0.8.2.1-Bibliotheken mit 0.9.0.1-Brokern irgendwelche Kompatibilitätsprobleme verursachen wird! –

+0

Kafka 0.8.2.1 Bibliotheken sind kompatibel mit Kafka 0.9.0.1 Es wird also keine Probleme geben. – avr

+0

ok danke nochmal. Entschuldigung dafür, dass du nicht aufhebst, antwortest du. Ich habe noch nicht genug Ruf: D –