2016-06-21 27 views
9

Ich schrieb etwas core.async Code in Clojure und als ich es ausführte, verbrauchte es den gesamten verfügbaren Speicher und scheiterte mit einem Fehler. Es scheint, dass die Verwendung von in einer core.async-Pipeline den Druck bricht. (Welche ist bedauerlich, aus Gründen, die den Rahmen dieser Frage.)Wo ist der Speicherverlust, wenn Mapcat den Backpressure-Druck in core.async aufhebt?

Hier ist ein Code, der das Problem durch Zählen :x s in und aus einem mapcat ing Wandler zeigt:

(ns mapcat.core 
    (:require [clojure.core.async :as async])) 

(defn test-backpressure [n length] 
    (let [message (repeat length :x) 
     input (async/chan) 
     transform (async/chan 1 (mapcat seq)) 
     output (async/chan) 
     sent (atom 0)] 
    (async/pipe input transform) 
    (async/pipe transform output) 
    (async/go 
     (dotimes [_ n] 
     (async/>! input message) 
     (swap! sent inc)) 
     (async/close! input)) 
    (async/go-loop [x 0] 
     (when (= 0 (mod x (/ (* n length) 10))) 
     (println "in:" (* @sent length) "out:" x)) 
     (when-let [_ (async/<! output)] 
     (recur (inc x)))))) 

=> (test-backpressure 1000 10) 
in: 10 out: 0 
in: 2680 out: 1000 
in: 7410 out: 2000 
in: 10000 out: 3000 ; Where are the other 7000 characters? 
in: 10000 out: 4000 
in: 10000 out: 5000 
in: 10000 out: 6000 
in: 10000 out: 7000 
in: 10000 out: 8000 
in: 10000 out: 9000 
in: 10000 out: 10000 

Die Produzenten Rennen weit vor dem Verbraucher.

Es scheint, dass ich nicht die erste Person bin, die das entdeckt. Aber die Erklärung gegeben here scheint nicht ganz zu decken. (Obwohl es eine angemessene Problemumgehung bietet.) Konzeptionell würde ich erwarten, dass der Produzent voraus ist, aber nur durch die Länge der wenigen Nachrichten, die in den Kanälen gepuffert werden könnten.

Meine Frage ist, wo sind all die anderen Nachrichten? Bei der vierten Zeile des Ausgangs 7000 werden :x s nicht berücksichtigt.

+0

In dem Link, den Sie geben, erwähnte Alex, dass dies ein Dilemma zwischen falschem Ergebnis und Puffergrenzverletzung ist. Offensichtlich bevorzugt [ASYNC-124] (http://dev.clojure.org/jira/browse/ASYNC-124) eine korrekte Antwort – Davyzhu

+0

Also, in Bezug auf Ihre Frage könnten die anderen Nachrichten in den 'Nehmern' enthalten sein, auf die hier verwiesen wird [hier ] (https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/channels.clj#L86). Nicht so sicher, also warten wir auf eine selbstbewusste Antwort. – Davyzhu

Antwort

2

Es gibt zwei mögliche Interpretationen der Frage "Wo ist das Speicherleck?"

Erstens, wo werden die Daten gespeichert? Die Antwort scheint in dem Kanalpuffer unmittelbar stromabwärts der expandierenden Transformation zu liegen.

Kanäle verwenden standardmäßig eine FixedBuffer (clojure.core.async.impl.buffers/FixedBuffer), die feststellen kann, ob sie voll ist, aber nicht dagegen ist, dass sie übervoll ist.

Zweitens, welche Passage des Codes bewirkt, dass der Puffer überfüllt ist? Dies (korrigiere mich, wenn ich falsch liege) scheint in the take! method von ManyToManyChannel (clojure.core.async.impl.channels/ManyToManyChannel) zu sein, wo die first call to add! auf dem Puffer auftritt, bevor irgendwelche calls to full? stattgefunden haben.

Es scheint, dass take! davon ausgeht, dass es für jedes Element, das es entfernt, mindestens ein Element zum Puffer hinzufügen kann. Bei lang laufenden Dehnungsaufnehmern wie mapcat ist dies nicht immer eine sichere Annahme.

Durch Ändern von this line zu (when (and (.hasNext iter) (not (impl/full? buf))) in einer lokalen Kopie von core.async kann ich den Code in der Frage wie erwartet verhalten. (NB Mein Verständnis von core.async reicht nicht für mich zu garantieren, dass dies eine robuste Lösung für Ihren Anwendungsfall.)

UPDATE 2016.09.17: Es gibt jetzt ein Problem für diesen: http://dev.clojure.org/jira/browse/ASYNC-178