Ich schrieb etwas core.async Code in Clojure und als ich es ausführte, verbrauchte es den gesamten verfügbaren Speicher und scheiterte mit einem Fehler. Es scheint, dass die Verwendung von in einer core.async-Pipeline den Druck bricht. (Welche ist bedauerlich, aus Gründen, die den Rahmen dieser Frage.)Wo ist der Speicherverlust, wenn Mapcat den Backpressure-Druck in core.async aufhebt?
Hier ist ein Code, der das Problem durch Zählen :x
s in und aus einem mapcat
ing Wandler zeigt:
(ns mapcat.core
(:require [clojure.core.async :as async]))
(defn test-backpressure [n length]
(let [message (repeat length :x)
input (async/chan)
transform (async/chan 1 (mapcat seq))
output (async/chan)
sent (atom 0)]
(async/pipe input transform)
(async/pipe transform output)
(async/go
(dotimes [_ n]
(async/>! input message)
(swap! sent inc))
(async/close! input))
(async/go-loop [x 0]
(when (= 0 (mod x (/ (* n length) 10)))
(println "in:" (* @sent length) "out:" x))
(when-let [_ (async/<! output)]
(recur (inc x))))))
=> (test-backpressure 1000 10)
in: 10 out: 0
in: 2680 out: 1000
in: 7410 out: 2000
in: 10000 out: 3000 ; Where are the other 7000 characters?
in: 10000 out: 4000
in: 10000 out: 5000
in: 10000 out: 6000
in: 10000 out: 7000
in: 10000 out: 8000
in: 10000 out: 9000
in: 10000 out: 10000
Die Produzenten Rennen weit vor dem Verbraucher.
Es scheint, dass ich nicht die erste Person bin, die das entdeckt. Aber die Erklärung gegeben here scheint nicht ganz zu decken. (Obwohl es eine angemessene Problemumgehung bietet.) Konzeptionell würde ich erwarten, dass der Produzent voraus ist, aber nur durch die Länge der wenigen Nachrichten, die in den Kanälen gepuffert werden könnten.
Meine Frage ist, wo sind all die anderen Nachrichten? Bei der vierten Zeile des Ausgangs 7000 werden :x
s nicht berücksichtigt.
In dem Link, den Sie geben, erwähnte Alex, dass dies ein Dilemma zwischen falschem Ergebnis und Puffergrenzverletzung ist. Offensichtlich bevorzugt [ASYNC-124] (http://dev.clojure.org/jira/browse/ASYNC-124) eine korrekte Antwort – Davyzhu
Also, in Bezug auf Ihre Frage könnten die anderen Nachrichten in den 'Nehmern' enthalten sein, auf die hier verwiesen wird [hier ] (https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/channels.clj#L86). Nicht so sicher, also warten wir auf eine selbstbewusste Antwort. – Davyzhu