Ich versuche clojure core.async Kanäle zu verwenden, um speicherintensive gleichzeitige Prozesse zu drosseln. Jeder Prozess lädt ein Bild in den Speicher und wendet ein Wasserzeichen an. Wenn ich versuche, zu viele Bilder gleichzeitig zu verarbeiten, erhalte ich OOM-Fehler.Throttling Prozesse mit Clojure core.async
Das folgende Muster scheint zu funktionieren, aber es fühlt sich ein wenig unelegant. Meine Frage ist, gibt es eine bessere Möglichkeit, dies mit core.async zu tun? Oder sollte ich stattdessen einfach den Java-Concurrency-Code verwenden (d. H. Einen Threadpool mit fester Größe usw. erstellen)?
Das Grundkonzept im folgenden Code ist einen globalen feste Größe Kanal zu verwenden, die drosseln tchan
verwendet wird, was in in-chan
geht, im Wesentlichen die Anzahl von gleichzeitigen Prozessen auf die Größe der tchan
begrenzen.
Im folgenden Code ist process-images
der Einstiegspunkt.
(def tbuff (buffer 20))
(def tchan
"tchan is used to throttle the number of processes
tbuff is a fixed size buffer"
(chan tbuff))
(defn accum-results
"Accumulates the images in results-chan"
[n result-chan]
(let [chans [result-chan (timeout timeout-ms)]]
(loop [imgs-out []
remaining n]
(if (zero? remaining)
imgs-out
(let [[img-result _] (alts!! chans)]
(if (nil? img-result)
(do
(log/warn "Image processing timed out")
(go (dotimes [_ remaining] (<! tchan)))
imgs-out)
(do
(go (<! tchan))
(recur (conj imgs-out img-result) (dec remaining)))))))))
(defn process-images
"Concurrently watermarks a list of images
Images is a sequence of maps representing image info
Concurrently fetches each actual image and applies the watermark
Returns a map of image info map -> image input stream"
[images]
(let [num-imgs (count images)
in-chan (chan num-imgs)
out-chan (chan num-imgs)]
;; set up the image-map consumer
;; asynchronously process things found on in-chan
(go
(dotimes [_ num-imgs]
; block here on input images
(let [img-in (<! in-chan)]
(thread
(let [img-out (watermark/watermarked-image-is img-in)]
(>!! out-chan [img-in img-out]))))))
;; put images on in-chan
(go
(doseq [img images]
(>! tchan :x)
(>! in-chan img)))
;; accum results
(let [results (accum-results num-imgs out-chan)]
(log/info (format "Processed %s of %s images and tbuff is %s"
(count results) num-imgs (count tbuff)))
(into {} results))))
Im Beispiel, das ich hier bearbeitet in, es ist erwähnenswert, dass der Anruf ein Wandler abzuzubilden und nicht eine faule Sequenz, es sieht aus wie ein. –