2016-05-25 5 views

Antwort

0

Sie können nicht, oder zumindest nicht in einer sinnvollen Weise. Während es möglich ist, zu verwenden queueStream Strom wie diese von RDD zu erzeugen:

from pyspark.streaming import StreamingContext 

ssc = StreamingContext(sc, 10) 
df = sc.parallelize([(i,) for i in range(10000)]).toDF(["ts"]) 
stream = ssc.queueStream([df.rdd]) 
stream.count().pprint() 

ssc.start() 
ssc.awaitTermination() 

wo die Entsprechung zwischen Batch- und Objekt in der Schlange 1: 1 ist. Leider ist queueStream, im Gegensatz zu seinem Scala Gegenstück, ein statischer Stream. Neue Daten können nicht in die Warteschlange eingereiht werden, nachdem sie erstellt wurden. Dies bedeutet, dass Sie DataFrame manuell in mehrere RDD aufgeteilt haben.

+0

Danke @ Zero323. Wenn du es XY nennst, meinst du, dass es eine andere Möglichkeit gibt, zu tun, was ich will. Genau das möchte ich jedoch tun. Ich möchte in der Lage sein, alte Daten zu einem Streaming-Job zum Testen und Entwickeln "abzuspielen", bevor ich die eigentliche Streaming-Eingabe und zum Recooking (Verarbeitung alter akkumulierter Daten auf andere Weise) habe. –

+0

OK, entfernt den Kommentar :) Persönlich würde ich lieber in Erwägung ziehen, ein einfaches Skript zu erstellen, die Daten in einen Stream obwohl. – zero323

+0

Danke @ zero323. Können Sie bitte erläutern, wie solch ein einfaches Skript aussehen würde? –