Ich führe Spark Streaming mit zwei verschiedenen Fenstern (im Fenster für das Training eines Modells mit SKLearn und das andere für die Vorhersage von Werten basierend auf diesem Modell) und ich frage mich, wie ich ein Fenster (das "langsame" Trainingsfenster) vermeiden kann, um ein Modell zu trainieren, ohne das "schnelle" Vorhersagefenster zu "blockieren".
Mein vereinfachten Code sieht wie folgt aus:Wie verhindert man, dass ein Spark-Streaming-Fenster ein anderes Fenster mit beiden nativen Python-Code blockiert
conf = SparkConf()
conf.setMaster("local[4]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)
stream = ssc.socketTextStream("localhost", 7000)
import Custom_ModelContainer
### Window 1 ###
### predict data based on model computed in window 2 ###
def predict(time, rdd):
try:
# ... rdd conversion to df, feature extraction etc...
# regular python code
X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
pred = Custom_ModelContainer.getmodel().predict(X)
# send prediction to GUI
except Exception, e: print e
predictionStream = stream.window(60,60)
predictionStream.foreachRDD(predict)
### Window 2 ###
### fit new model ###
def trainModel(time, rdd):
try:
# ... rdd conversion to df, feature extraction etc...
X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
y = np.array(df.map(lambda lp: lp.label).collect())
# train test split etc...
model = SVR().fit(X_train, y_train)
Custom_ModelContainer.setModel(model)
except Exception, e: print e
modelTrainingStream = stream.window(600,600)
modelTrainingStream.foreachRDD(trainModel)
(Hinweis: Die Custom_ModelContainer ist eine Klasse, ich schrieb das trainierte Modell zu speichern und abrufen), dass jedes Mal,
Mein Setup funktioniert im Allgemeinen gut, mit Ausnahme ein neues Modell wird im zweiten Fenster trainiert (was ungefähr eine Minute dauert), das erste Fenster berechnet keine Vorhersagen, bis das Modelltraining beendet ist. Ich nehme an, dass dies sinnvoll ist, da Modellanpassung und Vorhersagen beide auf dem Master-Knoten berechnet werden (in einer nicht verteilten Einstellung - aufgrund von SKLearn).
Also meine Frage ist die folgende: Wäre es möglich, das Modell auf einem einzigen Worker-Knoten (anstelle des Master-Knotens) zu trainieren? Wenn ja, wie könnte ich Letzteres erreichen und würde das mein Problem lösen?
Wenn nicht, irgendwelche anderen Vorschläge, wie ich ein solches Setup machen könnte, ohne die Berechnungen in Fenster 1 zu verzögern?
Jede Hilfe wird sehr geschätzt.
EDIT: Ich denke, die allgemeinere Frage wäre: Wie kann ich zwei verschiedene Aufgaben auf zwei verschiedene Arbeiter parallel ausführen?