Nehmen wir an, Sie haben einen Spark-Datenrahmen df
mit einer Spalte timestamp
für die Zeit, sagen wir im Unix-Zeit-Format (Sekunden seit 1970). Wie mache ich Spark.Streaming dies als eine Eingabe behandeln, so dass ich Schiebefenster auf den Daten machen kann? Danke!pyspark: Wie konvertiert man Datenfelder mit einer Zeitspalte in ein Spark-Streaming-Objekt?
2
A
Antwort
0
Sie können nicht, oder zumindest nicht in einer sinnvollen Weise. Während es möglich ist, zu verwenden queueStream
Strom wie diese von RDD zu erzeugen:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 10)
df = sc.parallelize([(i,) for i in range(10000)]).toDF(["ts"])
stream = ssc.queueStream([df.rdd])
stream.count().pprint()
ssc.start()
ssc.awaitTermination()
wo die Entsprechung zwischen Batch- und Objekt in der Schlange 1: 1 ist. Leider ist queueStream
, im Gegensatz zu seinem Scala Gegenstück, ein statischer Stream. Neue Daten können nicht in die Warteschlange eingereiht werden, nachdem sie erstellt wurden. Dies bedeutet, dass Sie DataFrame
manuell in mehrere RDD aufgeteilt haben.
Danke @ Zero323. Wenn du es XY nennst, meinst du, dass es eine andere Möglichkeit gibt, zu tun, was ich will. Genau das möchte ich jedoch tun. Ich möchte in der Lage sein, alte Daten zu einem Streaming-Job zum Testen und Entwickeln "abzuspielen", bevor ich die eigentliche Streaming-Eingabe und zum Recooking (Verarbeitung alter akkumulierter Daten auf andere Weise) habe. –
OK, entfernt den Kommentar :) Persönlich würde ich lieber in Erwägung ziehen, ein einfaches Skript zu erstellen, die Daten in einen Stream obwohl. – zero323
Danke @ zero323. Können Sie bitte erläutern, wie solch ein einfaches Skript aussehen würde? –