Ich versuche, Daten von kafka zu streamenkafka.cluster.BrokerEndPoint kann nicht auf kafka.cluster.Broker gegossen werden
Funke Ich Funke bin mit 1.6.2 mit kafka 0.9.0.1 und scala 2.11.8
alles funktioniert gut, wenn ich den Hörer basierten Ansatz verwenden (KafkaUtils.createStream()) aber wenn ich versuche, den direkten Weg ohne Empfänger wie diese
val kafkaStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc,
Map("group.id" -> "blah",
"auto.offset.reset" -> "smallest",
"metadata.broker.list" -> "127.0.0.1:9092",
"bootstrap.servers"-> "127.0.0.1:9092"),
Set("tweets")
)
ich diesen Fehler
Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:146)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
at scala.util.Either$RightProjection.flatMap(Either.scala:522)
at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
at org.apache.spark.streaming.kafka.KafkaCluster.getEarliestLeaderOffsets(KafkaCluster.scala:155)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:213)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
at scala.util.Either$RightProjection.flatMap(Either.scala:522)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at SparkStreaming$.delayedEndpoint$SparkStreaming$1(SparkStreaming.scala:32)
at SparkStreaming$delayedInit$body.apply(SparkStreaming.scala:24)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at SparkStreaming$.main(SparkStreaming.scala:24)
at SparkStreaming.main(SparkStreaming.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
und das sind meine Abhängigkeiten
"org.apache.spark" %% "spark-streaming-kafka" % "1.6.2",
"org.apache.spark" %% "spark-core" % "1.6.2",
"org.apache.spark" % "spark-streaming_2.11" % "1.6.2",
"org.apache.kafka" %% "kafka" % "0.9.0.1"
Ich kann nicht sehen, wo das Problem ist? Kann mir bitte jemand helfen?
Vielen Dank für Ihre Antwort. Ich frage mich nur, ob die Verwendung von 0.8.2.1-Bibliotheken mit 0.9.0.1-Brokern irgendwelche Kompatibilitätsprobleme verursachen wird! –
Kafka 0.8.2.1 Bibliotheken sind kompatibel mit Kafka 0.9.0.1 Es wird also keine Probleme geben. – avr
ok danke nochmal. Entschuldigung dafür, dass du nicht aufhebst, antwortest du. Ich habe noch nicht genug Ruf: D –