2015-10-10 6 views
8

Ich habe eine Clojure Processing App, die eine Pipeline von Kanälen ist. Jeder Verarbeitungsschritt führt seine Berechnungen asynchron durch (dh er macht eine HTTP-Anfrage unter Verwendung von http-kit oder etwas) und bringt sein Ergebnis auf den Ausgabekanal. Auf diese Weise kann der nächste Schritt aus diesem Kanal lesen und seine Berechnung durchführen.Wie man am besten eine clojure core.async Pipeline von Prozessen herunterfahren

Meine Hauptfunktion sieht wie folgt aus

(defn -main [args] 
(-> file/tmp-dir 
    (schedule/scheduler) 
    (search/searcher) 
    (process/resultprocessor) 
    (buy/buyer) 
    (report/reporter))) 

Derzeit ist der Scheduler Schritt, um die Pipeline-Laufwerke (es hat einen Eingangskanal nicht bekam), und bietet die Kette mit Arbeitsbelastung.

Wenn ich laufe dies in der REPL:

(-main "some args") 

Es läuft im Grunde immer auf Grund der Unendlichkeit des Schedulers. Was ist der beste Weg, um diese Architektur so zu ändern, dass ich das ganze System von der REPL herunterfahren kann? Bedeutet das Schließen jedes Kanals, dass das System beendet wird?

Würde ein Broadcast-Kanal helfen?

+0

'(System/Ausgang 0)'? – Bill

+0

Das tötet leider auch die REPL. Ich werde den Komponenten-Ansatz ausprobieren –

Antwort

6

könnten Sie Ihre Planer haben alts!/alts!! auf einem Kill-Kanal und dem Eingangskanal Ihrer Pipeline:

(def kill-channel (async/chan)) 

(defn scheduler [input output-ch kill-ch] 
    (loop [] 
    (let [[v p] (async/alts!! [kill-ch [out-ch (preprocess input)]] 
        :priority true)] 
     (if-not (= p kill-ch) 
     (recur)))) 

Wert auf kill-channel Putting wird dann die Schleife beenden.

Technisch könnten Sie auch verwenden, um den Prozess zu steuern (setzt auf geschlossene Kanäle zurück false), aber ich finde normalerweise explizite Kill-Kanäle sauberer, zumindest für Top-Level-Pipelines.

Um die Dinge gleichzeitig eleganter und bequemer zu machen (sowohl bei der REPL als auch in der Produktion), könnten Sie Stuart Sierra's component, starten Sie die Scheduler-Schleife (in einem separaten Thread) und assoc den Kill-Kanal auf Ihre Komponente in die start Methode der Komponente und dann close! der Kill-Kanal (und damit die Schleife zu beenden) in der Komponente stop Methode.

4

Ich würde vorschlagen, etwas wie https://github.com/stuartsierra/component zu verwenden, um System-Setup zu behandeln. Es stellt sicher, dass Sie Ihr System in der REPL einfach starten und stoppen können. Wenn Sie diese Bibliothek verwenden, richten Sie sie so ein, dass jeder Verarbeitungsschritt eine Komponente ist und jede Komponente die Einrichtung und den Abbau von Kanälen in ihren Protokollen start und stop übernimmt. Außerdem könnten Sie wahrscheinlich ein IStream Protokoll für jede zu implementierende Komponente erstellen und jede Komponente davon abhängig machen, dass Komponenten dieses Protokoll implementieren. Es kauft Ihnen einige sehr einfache Modularität.

Sie würden am Ende mit einem System, das wie folgt aussieht:

(component/system-map 
:scheduler (schedule/new-scheduler file/tmp-dir) 
:searcher (component/using (search/searcher) 
          {:in :scheduler}) 
:processor (component/using (process/resultprocessor) 
          {:in :searcher}) 
:buyer  (component/using (buy/buyer) 
          {:in :processor}) 
:report (component/using (report/reporter) 
          {:in :buyer})) 

Eine nette Sache mit dieser Art von Ansatz ist, dass man leicht Komponenten hinzufügen konnte, wenn sie auch auf einem Kanal verlassen. Wenn beispielsweise jede Komponente ihren Ausgabekanal mithilfe einer tap auf einer internen mult erstellt, können Sie eine Protokollfunktion für den Prozessor nur durch eine Protokollierungskomponente hinzufügen, die den Prozessor als Abhängigkeit verwendet.

:processor (component/using (process/resultprocessor) 
          {:in :searcher}) 
:processor-logger (component/using (log/logger) 
            {:in processor}) 

Ich würde empfehlen, seine beobachtete talk auch eine Vorstellung davon zu bekommen, wie es funktioniert.

1

Sie sollten Stuart Sierra's reloaded workflow verwenden, was von der Modellierung Ihrer 'Pipeline'-Elemente wie components abhängt. Auf diese Weise können Sie Ihre logischen Singletons als' Klassen 'modellieren, was bedeutet, dass Sie die Logik für Konstruktion und Zerstörung (Start/Stopp) steuern können jeder von Ihnen.