2013-06-01 8 views
8

Ich entschuldige mich im Voraus für die Länge dieser Notiz. Ich verbrachte viel Zeit damit, es kürzer zu machen, und das war so klein, wie ich es bekommen konnte.rxjava und clojure asynchrony Geheimnis: Futures Versprechungen und Agenten, oh mein

Ich habe ein Rätsel und wäre dankbar für Ihre Hilfe. Dieses Geheimnis kommt aus dem Verhalten eines rxjava observer Ich schrieb in Clojure über ein paar einfache observable s aus Online-Proben.

Eine Observable sendet synchron Meldungen an die onNext Handler ihrer Beobachter, und mein angeblich prinzipientreuer Beobachter verhält sich wie erwartet.

Die andere beobachtbare asynchron, macht das gleiche, auf einem anderen Thread, über eine Clojure future. Der exakt gleiche Beobachter erfasst nicht alle Ereignisse, die auf seine onNext gepostet werden; Es scheint nur eine zufällige Anzahl von Nachrichten am Ende zu verlieren.

Es ist eine absichtliche Rennen in der folgenden zwischen dem Ablauf einer Wartezeit auf den promise d onCompleted und Ablauf einer Wartezeit für alle zu einem agent Sammler gesendet Veranstaltungen. Wenn die promise gewinnt, erwarte ich false für onCompleted und eine möglicherweise kurze Warteschlange in der agent. Wenn der agent gewinnt, erwarte ich, true für onCompleted und alle Nachrichten aus der Warteschlange agent zu sehen. Das einzige Ergebnis, das ich nicht erwarte, ist true für onCompleted UND eine kurze Warteschlange von der agent. Aber Murphy schläft nicht, und genau das sehe ich. Ich weiß nicht, ob die Müllsammlung ein Fehler ist oder ob ich eine interne Warteschlange für Clojures STM oder meine Dummheit oder etwas ganz anderes habe.

Ich präsentiere die Quelle in der Reihenfolge ihrer in sich geschlossenen Form, hier, so dass sie direkt über lein repl ausgeführt werden kann. Es gibt drei cermonials aus dem Weg zu räumen: Erstens, die Leiningen Projektdatei, project.clj, die Abhängigkeit von der 0.9.0 Version von Netflix rxjava erklärt:

(defproject expt2 "0.1.0-SNAPSHOT" 
    :description "FIXME: write description" 
    :url "http://example.com/FIXME" 
    :license {:name "Eclipse Public License" 
      :url "http://www.eclipse.org/legal/epl-v10.html"} 
    :dependencies [[org.clojure/clojure    "1.5.1"] 
       [com.netflix.rxjava/rxjava-clojure "0.9.0"]] 
    :main expt2.core) 

nun dem Namensraum und einer Clojure Anforderung und die Java-Importe :

(ns expt2.core 
    (:require clojure.pprint) 
    (:refer-clojure :exclude [distinct]) 
    (:import [rx Observable subscriptions.Subscriptions])) 

Schließlich wird ein Makro zur Ausgabe an die Konsole:

(defmacro pdump [x] 
    `(let [x# ~x] 
    (do (println "----------------") 
     (clojure.pprint/pprint '~x) 
     (println "~~>") 
     (clojure.pprint/pprint x#) 
     (println "----------------") 
     x#))) 

Schließlich meinen Beobachter. Ich benutze ein agent, um die Nachrichten von Observable onNext gesendet sammeln. Ich benutze eine atom, um ein Potential onError zu sammeln. Ich benutze eine promise für die onCompleted, so dass Verbraucher außerhalb des Beobachters darauf warten können.

(defn- subscribe-collectors [obl] 
    (let [;; Keep a sequence of all values sent: 
     onNextCollector  (agent []) 
     ;; Only need one value if the observable errors out: 
     onErrorCollector  (atom nil) 
     ;; Use a promise for 'completed' so we can wait for it on 
     ;; another thread: 
     onCompletedCollector (promise)] 
    (letfn [;; When observable sends a value, relay it to our agent" 
      (collect-next  [item] (send onNextCollector (fn [state] (conj state item)))) 
      ;; If observable errors out, just set our exception; 
      (collect-error  [excp] (reset! onErrorCollector  excp)) 
      ;; When observable completes, deliver on the promise: 
      (collect-completed [ ] (deliver onCompletedCollector true)) 
      ;; In all cases, report out the back end with this: 
      (report-collectors [ ] 
       (pdump 
       ;; Wait for everything that has been sent to the agent 
       ;; to drain (presumably internal message queues): 
       {:onNext  (do (await-for 1000 onNextCollector) 
           ;; Then produce the results: 
           @onNextCollector) 
       ;; If we ever saw an error, here it is: 
       :onError  @onErrorCollector 
       ;; Wait at most 1 second for the promise to complete; 
       ;; if it does not complete, then produce 'false'. 
       ;; I expect if this times out before the agent 
       ;; times out to see an 'onCompleted' of 'false'. 
       :onCompleted (deref onCompletedCollector 1000 false) 
       }))] 
     ;; Recognize that the observable 'obl' may run on another thread: 
     (-> obl 
      (.subscribe collect-next collect-error collect-completed)) 
     ;; Therefore, produce results that wait, with timeouts, on both 
     ;; the completion event and on the draining of the (presumed) 
     ;; message queue to the agent. 
     (report-collectors)))) 

Jetzt ist hier eine synchrone beobachtbar. Es pumpt 25 Nachrichten hinunter die onNext Kehlen seiner Beobachter und ruft dann ihre onCompleted s.

(defn- customObservableBlocking [] 
    (Observable/create 
    (fn [observer]      ; This is the 'subscribe' method. 
     ;; Send 25 strings to the observer's onNext: 
     (doseq [x (range 25)] 
     (-> observer (.onNext (str "SynchedValue_" x)))) 
     ; After sending all values, complete the sequence: 
     (-> observer .onCompleted) 
     ; return a NoOpSubsription since this blocks and thus 
     ; can't be unsubscribed (disposed): 
     (Subscriptions/empty)))) 

Wir zeichnen unsere Beobachter auf diese beobachtbar:

;;; The value of the following is the list of all 25 events: 
(-> (customObservableBlocking) 
    (subscribe-collectors)) 

Es funktioniert wie erwartet, und wir sehen die folgenden Ergebnisse auf der Konsole

{:onNext (do (await-for 1000 onNextCollector) @onNextCollector), 
:onError @onErrorCollector, 
:onCompleted (deref onCompletedCollector 1000 false)} 
~~> 
{:onNext 
["SynchedValue_0" 
    "SynchedValue_1" 
    "SynchedValue_2" 
    "SynchedValue_3" 
    "SynchedValue_4" 
    "SynchedValue_5" 
    "SynchedValue_6" 
    "SynchedValue_7" 
    "SynchedValue_8" 
    "SynchedValue_9" 
    "SynchedValue_10" 
    "SynchedValue_11" 
    "SynchedValue_12" 
    "SynchedValue_13" 
    "SynchedValue_14" 
    "SynchedValue_15" 
    "SynchedValue_16" 
    "SynchedValue_17" 
    "SynchedValue_18" 
    "SynchedValue_19" 
    "SynchedValue_20" 
    "SynchedValue_21" 
    "SynchedValue_22" 
    "SynchedValue_23" 
    "SynchedValue_24"], 
:onError nil, 
:onCompleted true} 
---------------- 

Hier ist ein asynchrones beobachtbar, das tut genau das gleiche, nur auf einem future 's thread:

(defn- customObservableNonBlocking [] 
    (Observable/create 
    (fn [observer]      ; This is the 'subscribe' method 
     (let [f (future 
       ;; On another thread, send 25 strings: 
       (doseq [x (range 25)] 
        (-> observer (.onNext (str "AsynchValue_" x)))) 
       ; After sending all values, complete the sequence: 
       (-> observer .onCompleted))] 
     ; Return a disposable (unsubscribe) that cancels the future: 
     (Subscriptions/create #(future-cancel f)))))) 

;;; For unknown reasons, the following does not produce all 25 events: 
(-> (customObservableNonBlocking) 
    (subscribe-collectors)) 

Aber, Überraschung, hier ist, was wir auf der Konsole sehen: true für onCompleted, was bedeutet, dass die promise NICHT TIME-OUT; aber nur einige der asynch Nachrichten. Die tatsächliche Anzahl von Nachrichten, die wir sehen, variiert von Lauf zu Lauf, was impliziert, dass es ein Nebenläufigkeitsphänomen gibt. Hinweise geschätzt.

---------------- 
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector), 
:onError @onErrorCollector, 
:onCompleted (deref onCompletedCollector 1000 false)} 
~~> 
{:onNext 
["AsynchValue_0" 
    "AsynchValue_1" 
    "AsynchValue_2" 
    "AsynchValue_3" 
    "AsynchValue_4" 
    "AsynchValue_5" 
    "AsynchValue_6"], 
:onError nil, 
:onCompleted true} 
---------------- 

Antwort

7

Die await-for auf Agentenmittel Blockiert den aktuellen Thread, bis alle Aktionen so geschickt weit (aus diesem Thread oder Vertreter) an die Agenten haben aufgetreten ist, was bedeutet, dass es, dass nach dem await passieren kann ist da drüben ist noch ein anderer Thread, der Nachrichten an den Agenten senden kann und das ist was in Ihrem Fall passiert. Nachdem Ihr Warten auf Agent vorbei ist und Sie seinen Wert in dem Schlüssel :onNext in der Karte löschen, warten Sie auf das on abgeschlossene Versprechen, das sich nach dem Warten als wahr herausstellt, aber in der Zwischenzeit wurden einige andere Nachrichten an die gesendet Agent in den Vektor gesammelt werden.

Sie können dieses Problem lösen, indem die :onCompleted Schlüssel als der erste Schlüssel in der Karte, die im Grunde bedeutet, für die Fertigstellung warten und dann für die Agenten warten Coz bis zu diesem Zeitpunkt gibt es keine mehr send fordert die Mittel nach wie geschehen kann, haben bereits onCompleted erhalten.

{:onCompleted (deref onCompletedCollector 1000 false) 
:onNext  (do (await-for 0 onNextCollector) 
           @onNextCollector) 
:onError  @onErrorCollector 
} 
+0

Geprüft und getestet. –