2016-01-27 1 views
10

Ich führe Spark Streaming mit zwei verschiedenen Fenstern (im Fenster für das Training eines Modells mit SKLearn und das andere für die Vorhersage von Werten basierend auf diesem Modell) und ich frage mich, wie ich ein Fenster (das "langsame" Trainingsfenster) vermeiden kann, um ein Modell zu trainieren, ohne das "schnelle" Vorhersagefenster zu "blockieren".
Mein vereinfachten Code sieht wie folgt aus:Wie verhindert man, dass ein Spark-Streaming-Fenster ein anderes Fenster mit beiden nativen Python-Code blockiert

conf = SparkConf() 
conf.setMaster("local[4]") 
sc = SparkContext(conf=conf) 
ssc = StreamingContext(sc, 1) 

stream = ssc.socketTextStream("localhost", 7000) 


import Custom_ModelContainer 

### Window 1 ### 
### predict data based on model computed in window 2 ### 

def predict(time, rdd): 
    try: 
     # ... rdd conversion to df, feature extraction etc... 

     # regular python code 
     X = np.array(df.map(lambda lp: lp.features.toArray()).collect()) 
     pred = Custom_ModelContainer.getmodel().predict(X) 

     # send prediction to GUI 

    except Exception, e: print e 

predictionStream = stream.window(60,60) 
predictionStream.foreachRDD(predict) 


### Window 2 ### 
### fit new model ### 

def trainModel(time, rdd): 
try: 
    # ... rdd conversion to df, feature extraction etc... 

    X = np.array(df.map(lambda lp: lp.features.toArray()).collect()) 
    y = np.array(df.map(lambda lp: lp.label).collect()) 

    # train test split etc... 

    model = SVR().fit(X_train, y_train) 
    Custom_ModelContainer.setModel(model) 

except Exception, e: print e 

modelTrainingStream = stream.window(600,600) 
modelTrainingStream.foreachRDD(trainModel) 

(Hinweis: Die Custom_ModelContainer ist eine Klasse, ich schrieb das trainierte Modell zu speichern und abrufen), dass jedes Mal,

Mein Setup funktioniert im Allgemeinen gut, mit Ausnahme ein neues Modell wird im zweiten Fenster trainiert (was ungefähr eine Minute dauert), das erste Fenster berechnet keine Vorhersagen, bis das Modelltraining beendet ist. Ich nehme an, dass dies sinnvoll ist, da Modellanpassung und Vorhersagen beide auf dem Master-Knoten berechnet werden (in einer nicht verteilten Einstellung - aufgrund von SKLearn).

Also meine Frage ist die folgende: Wäre es möglich, das Modell auf einem einzigen Worker-Knoten (anstelle des Master-Knotens) zu trainieren? Wenn ja, wie könnte ich Letzteres erreichen und würde das mein Problem lösen?

Wenn nicht, irgendwelche anderen Vorschläge, wie ich ein solches Setup machen könnte, ohne die Berechnungen in Fenster 1 zu verzögern?

Jede Hilfe wird sehr geschätzt.

EDIT: Ich denke, die allgemeinere Frage wäre: Wie kann ich zwei verschiedene Aufgaben auf zwei verschiedene Arbeiter parallel ausführen?

Antwort

2

Haftungsausschluss: Dies ist nur eine Reihe von Ideen. Keines davon wurde in der Praxis getestet.


Ein paar Dinge können Sie versuchen:

  1. predict nicht collect tun. scikit-learn Modelle sind in der Regel serializable so Prädiktionsprozess kann leicht auf dem Cluster behandelt werden:

    def predict(time, rdd): 
        ... 
    
        model = Custom_ModelContainer.getmodel() 
        pred = (df.rdd.map(lambda lp: lp.features.toArray()) 
         .mapPartitions(lambda iter: model.predict(np.array(list(iter))))) 
        ... 
    

    Es sollte nicht nur die Prognosen parallelisieren, sondern auch, wenn Rohdaten nicht zu GUI übergeben wird, die Menge der Daten zu reduzieren, die gesammelt werden muss .

  2. Versuchen Sie collect und senden Sie Daten asynchron. PySpark bietet keine collectAsync Methode, aber man kann versuchen, etwas ähnliches mit concurrent.futures zu erreichen:

    from pyspark.rdd import RDD 
    from concurrent.futures import ThreadPoolExecutor 
    
    executor = ThreadPoolExecutor(max_workers=4) 
    
    def submit_to_gui(*args): ... 
    
    def submit_if_success(f): 
        if not f.exception(): 
         executor.submit(submit_to_gui, f.result()) 
    

    von 1 fortzusetzen.

    def predict(time, rdd): 
        ... 
        f = executor.submit(RDD.collect, pred) 
        f.add_done_callback(submit_if_success) 
        ... 
    
  3. Wenn Sie wirklich lokal verwenden möchten scikit-learn Modell versuchen zu collect und fit wie oben mit Futures. Sie können auch nur einmal zu sammeln versuchen, vor allem, wenn die Daten im Cache nicht:

    def collect_and_train(df): 
        y, X = zip(*((p.label, p.features.toArray()) for p in df.collect())) 
        ... 
        return SVR().fit(X_train, y_train) 
    
    def set_if_success(f): 
        if not f.exception(): 
         Custom_ModelContainer.setModel(f.result()) 
    
    def trainModel(time, rdd): 
        ... 
        f = excutor.submit(collect_and_train, df) 
        f.add_done_callback(set_if_success) 
        ... 
    
  4. verschieben Trainingsprozess zum Cluster entweder mit bereits vorhandenen Lösungen wie spark-sklearn oder benutzerdefinierten Ansatz:

    • naive Lösung - Bereiten Sie Ihre Daten vor, coalesce(1) und trainieren Sie ein einzelnes Modell mit mapPartitions.
    • verteilte Lösung - erstellen und validieren Sie ein separates Modell pro Partition mit mapPartitions, sammeln Sie Modelle und verwenden Sie sie als Ensemble, indem Sie zum Beispiel eine durchschnittliche oder mittlere Vorhersage treffen.
  5. wegzuwerfen scikit-learn und ein Modell verwendet werden, die in einer verteilten ausgebildet und aufrechterhalten werden können, Streaming-Umgebung (beispielsweise StreamingLinearRegressionWithSGD).

    Ihr aktueller Ansatz macht Spark überflüssig. Wenn Sie ein Modell lokal trainieren können, besteht eine gute Chance, dass Sie alle anderen Aufgaben auf dem lokalen Computer viel schneller ausführen können. Andernfalls schlägt Ihr Programm einfach unter collect fehl.

1

Ich denke, was Sie suchen, ist die Eigenschaft: "spark.streaming.concurrentJobs", die auf 1 voreingestellt ist. Das Erhöhen dieser sollte Ihnen erlauben, mehrere foreachRDD-Funktionen parallel auszuführen.

In JobScheduler.scala:

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) 

Nur zur Erinnerung auch die Thread-Sicherheit auf Ihrem benutzerdefinierten Modellcontainer bewusst sein, wenn Sie parallel mutieren und zu lesen sein werden. :)