2016-04-12 18 views
1

Ich habe diese einfache Kafka-Streamkafka Weitstrahl DSTREAM Karte nicht gedruckt

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 

// Each Kafka message is a flight 
val flights = messages.map(_._2) 

flights.foreachRDD(rdd => { 
    println("--- New RDD with " + rdd.partitions.length + " partitions and " + rdd.count() + " flight records"); 
    rdd.map { flight => {   
    val flightRows = FlightParser.parse(flight) 
    println ("Parsed num rows: " + flightRows) 
    } 
    }   
}) 

ssc.start() 
ssc.awaitTermination() 

Kafka Nachrichten hat Spark Streaming es in der Lage, sie als RDDs zu bekommen. Aber das zweite println in meinem Code druckt nichts. Ich schaute auf Treiber-Konsolen-Logs, wenn im lokalen [2] -Modus lief, überprüfte Garn-Logs, wenn im Garn-Client-Modus lief.

Was fehlt mir?

Statt rdd.map, der folgende Code druckt gut in Funkenfahrerkonsole:

for(flight <- rdd.collect().toArray) { 
    val flightRows = FlightParser.parse(flight) 
    println ("Parsed num rows: " + flightRows) 
} 

Aber ich habe Angst, dass die Verarbeitung auf diesem Flugobjekt in Funken Fahrer Projekt passieren könnte, statt Testamentsvollstrecker. Bitte korrigieren Sie mich, falls ich falsch liege.

Dank

+1

Haben Sie sich die * worker * executor logs angeschaut? Vielleicht finden Sie Ihre 'FlightParser' Klasse nicht? –

Antwort

1

rdd.map ist eine faule Transformation. Es wird nicht realisiert, es sei denn, es wird eine Aktion zu dieser RDD aufgerufen.
In diesem speziellen Fall könnten wir rdd.foreach verwenden, was eine der allgemeinsten Aktionen auf RDD ist, die uns Zugriff auf jedes Element in der RDD gibt.

flights.foreachRDD{ rdd => 
    rdd.foreach { flight =>   
     val flightRows = FlightParser.parse(flight) 
     println ("Parsed num rows: " + flightRows) // prints on the stdout of each executor independently 
    } 
} 

Da diese RDD Aktion in dem Testamentsvollstrecker ausgeführt wird, werden wir den println Ausgang in dem STDOUT Testamentsvollstreckers finden. Wenn Sie die Daten stattdessen auf dem Treiber drucken möchten, können Sie collect die Daten der RDD innerhalb der DStream.foreachRDD Schließung.

flights.foreachRDD{ rdd => 
    val allFlights = rdd.collect() 
    println(allFlights.mkString("\n")) // prints to the stdout of the driver 
} 
+0

Danke @massg für Ihren Vorschlag. Wenn ich Ihren ersten Ansatz versuchen, erhalte ich die folgende Ausnahme: org.apache.spark.SparkException: Aufgabe nicht serializable Verursacht durch: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext Ich schätze das passiert, weil die Flugvariable nur in Spark Driver und nicht auf den Executoren verfügbar ist. Was fehlt mir? –