Wir verwenden Spark 1.4 für Spark-Streaming. Kafka ist eine Datenquelle für den Spark-Stream.Datei wird bei Verwendung von saveAsNewAPIHadoopFile überschrieben
Aufzeichnungen werden jede Sekunde auf Kafka veröffentlicht. Unsere Anforderung besteht darin, auf Kafka veröffentlichte Datensätze in einem einzigen Ordner pro Minute zu speichern. Der Stream liest alle fünf Sekunden Datensätze. Zum Beispiel werden Aufzeichnungen, die während 1200 PM und 1201 PM veröffentlicht wurden, in dem Ordner "1200" gespeichert; zwischen 1201PM und 1202PM im Ordner "1201" und so weiter.
Der Code, den ich schrieb ist als
//First Group records in RDD by date
stream.foreachRDD (rddWithinStream -> {
JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory = rddWithinStream.mapToPair(t -> {
return new Tuple2<String, String> (targetHadoopFolder, t._2());
}).groupByKey();
// All records grouped by folders they will be stored in
// Create RDD for each target folder.
for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
JavaPairRDD <String, Iterable<String>> rddByKey = rddGroupedByDirectory.filter(groupedTuples -> {
return groupedTuples._1().equals(hadoopFolder);
});
// And store it in Hadoop
rddByKey.saveAsNewAPIHadoopFile(directory, String.class, String.class, TextOutputFormat.class);
}
folgt Da der Stream-Daten verarbeitet alle fünf Sekunden, wird saveAsNewAPIHadoopFile mehrmals in einer Minute aufgerufen. Dies bewirkt, dass die Datei "Part-00000" jedes Mal überschrieben wird.
Ich erwartete, dass saveAsNewAPIHadoopFile in dem Verzeichnis, das durch den Parameter "directory" angegeben wurde, die Datei part-0000N weiterhin erstellt, auch wenn ich einen einzigen Worker-Knoten habe.
Jede Hilfe/Alternativen werden sehr geschätzt.
Danke.
Meine timeToDirName func (dir + Zeit), und nach der Ausführung zeigt es das Verzeichnis, in hdfs aber wenn ich versuche, Zugriff darauf, es zeigt "Dir_Name existiert nicht" – JSR29