Ich versuche, eine Eingabedatei basierend auf accountId
zu partitionieren Aber diese Partition wurde nur erstellt, wenn dataFrames mehr als 1000 Datensätze enthält. Die ist eine dynamische Ganzzahl, die nicht bekannt sein konnte. Betrachten Sie den folgenden Code unterSo führen Sie eine dynamische Partition basierend auf Zeilenanzahl in DataFram für einen Spaltenwert
val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.textFileStream("input")
lines.print()
lines.foreachRDD { rdd =>
val count = rdd.count()
if (count > 0) {
val df = sqlContext.read.json(rdd)
val filteredDF = df.filter(df("accountId")==="3")
if (filteredDF.count() > 1000) {
df.write.partitionBy("accountId").format("json").save("output")
}
}
}
ssc.start()
ssc.awaitTermination()
Aber der obige Code partitioniert alle accountId, die nicht benötigt wird.
- Ich möchte die Anzahl für jede
accountId
im Datenframe finden. - Wenn Datensätze für jede accountId 1000 überschreiten, schreiben Sie die partitionierten Informationen in die Ausgabequelle.
Zum Beispiel, wenn die Eingabedatei 1500 Datensätze für accountId hat = 1 und 10 Datensätze für accountId = 2, dann filtriert Partition Datenrahmen basierend auf accountId = 1 in Ausgangsquelle und halten accountId = 2 Datensätze in memmory.
Wie erreicht man dies mithilfe von Spark-Streaming?