2016-05-19 5 views
3

Ich versuche clojure core.async Kanäle zu verwenden, um speicherintensive gleichzeitige Prozesse zu drosseln. Jeder Prozess lädt ein Bild in den Speicher und wendet ein Wasserzeichen an. Wenn ich versuche, zu viele Bilder gleichzeitig zu verarbeiten, erhalte ich OOM-Fehler.Throttling Prozesse mit Clojure core.async

Das folgende Muster scheint zu funktionieren, aber es fühlt sich ein wenig unelegant. Meine Frage ist, gibt es eine bessere Möglichkeit, dies mit core.async zu tun? Oder sollte ich stattdessen einfach den Java-Concurrency-Code verwenden (d. H. Einen Threadpool mit fester Größe usw. erstellen)?

Das Grundkonzept im folgenden Code ist einen globalen feste Größe Kanal zu verwenden, die drosseln tchan verwendet wird, was in in-chan geht, im Wesentlichen die Anzahl von gleichzeitigen Prozessen auf die Größe der tchan begrenzen.

Im folgenden Code ist process-images der Einstiegspunkt.

(def tbuff (buffer 20)) 

(def tchan 
    "tchan is used to throttle the number of processes 
    tbuff is a fixed size buffer" 
    (chan tbuff)) 

(defn accum-results 
    "Accumulates the images in results-chan" 
    [n result-chan] 
    (let [chans [result-chan (timeout timeout-ms)]] 
    (loop [imgs-out [] 
      remaining n] 
     (if (zero? remaining) 
     imgs-out 
     (let [[img-result _] (alts!! chans)] 
      (if (nil? img-result) 
      (do 
       (log/warn "Image processing timed out") 
       (go (dotimes [_ remaining] (<! tchan))) 
       imgs-out) 
      (do 
       (go (<! tchan)) 
       (recur (conj imgs-out img-result) (dec remaining))))))))) 

(defn process-images 
    "Concurrently watermarks a list of images 
    Images is a sequence of maps representing image info 
    Concurrently fetches each actual image and applies the watermark 
    Returns a map of image info map -> image input stream" 
    [images] 
    (let [num-imgs (count images) 
     in-chan (chan num-imgs) 
     out-chan (chan num-imgs)] 
    ;; set up the image-map consumer 
    ;; asynchronously process things found on in-chan 
    (go 
     (dotimes [_ num-imgs] 
     ; block here on input images 
     (let [img-in (<! in-chan)] 
      (thread 
      (let [img-out (watermark/watermarked-image-is img-in)] 
       (>!! out-chan [img-in img-out])))))) 
    ;; put images on in-chan 
    (go 
     (doseq [img images] 
     (>! tchan :x) 
     (>! in-chan img))) 
    ;; accum results 
    (let [results (accum-results num-imgs out-chan)] 
     (log/info (format "Processed %s of %s images and tbuff is %s" 
         (count results) num-imgs (count tbuff))) 
     (into {} results)))) 

Antwort

2

Ich glaube, das ist genau das, was pipeline ist.

Und hier ist ein Beispiel:

user> (require '[clojure.core.async :refer [<! <!! chan go go-loop pipeline pipeline-blocking pipeline-async] :as async]) 

user> (let [output (chan) 
      input (async/to-chan (range 10))] 
     (go-loop [x (<! output)] 
      (println x)) 
     (pipeline 4 
        output 
        (map #(do 
          (Thread/sleep (rand-int 200)) 
          (println "starting" %) 
          (Thread/sleep 1000) 
          (println "finished" %) 
          (inc %))) 
        input)) 
#object[clojure.core.async.impl.channels.ManyToManyChannel 0x3f434b5a "[email protected]"] 
user> starting 0 
starting 3 
starting 1 
starting 2 
finished 0 
1 
finished 3 
finished 1 
finished 2 
starting 4 
starting 5 
starting 6 
finished 4 
finished 5 
finished 6 
+0

Im Beispiel, das ich hier bearbeitet in, es ist erwähnenswert, dass der Anruf ein Wandler abzuzubilden und nicht eine faule Sequenz, es sieht aus wie ein. –