2016-08-07 19 views
2

Wie würden Sie die Ergebnisse einer Liste von Async a in Haskell wie sie verfügbar werden sammeln? Die Idee ist, die Ergebnisse von asynchronen Aufgaben zu verarbeiten, sobald sie verfügbar sind.Das Sammeln der Async Ergebnisse sobald sie verfügbar sind

Das Beste, was ich die folgende Funktion mit ist einfiel:

collect :: [Async a] -> IO [a] 
collect [] = return [] 
collect asyncs = do 
    (a, r) <- waitAny asyncs 
    rs <- collect (filter (/= a) asyncs) 
    return (r:rs) 

Allerdings ist diese Funktion nicht zeigt das gewünschte Verhalten, da, wie in der unten stehenden Kommentar darauf hingewiesen, es nicht bis alles zurückkehrt Die asynchronen Aufgaben sind abgeschlossen. Außerdem läuft collect in O(n^2), da ich die Liste bei jedem rekursiven Schritt filtere. Dies könnte verbessert werden, indem eine effizientere Struktur verwendet wird (und möglicherweise die Position der Async Werte in der Liste indexiert wird).

Vielleicht gibt es Bibliotheksfunktionen, die sich darum kümmern, aber ich konnte sie nicht im Control.Concurrent.Async Modul finden und ich frage mich warum.

EDIT: nachdem das Problem etwas genauer zu denken, ich frage mich, ob eine solche Funktion ist eine gute Idee. Ich könnte einfach fmap auf den asynchronen Aufgaben verwenden. Vielleicht ist es besser, auf die Ergebnisse zu warten, wenn keine andere Wahl ist.

+2

Ihr 'collect' kehrt erst zurück, wenn alle Async's in der Liste abgeschlossen sind. Ist es das wonach du suchst? –

+0

Nein. Das habe ich nicht bemerkt. Meine Implementierung ist in der Tat falsch. Meine Idee war 'collect' und gab ein Ergebnis zurück, sobald der asynchrone Thread fertig ist. Ich werde meine Antwort entsprechend bearbeiten. –

Antwort

1

Wie ich in my other answer erwähnt habe, Streaming-Ergebnisse aus einer Liste von Async s, wie sie verfügbar sind, wird am besten mit einer Stream-Processing-Bibliothek erreicht. Hier ist ein Beispiel mit pipes.

import Control.Concurrent (threadDelay) 
import Control.Concurrent.Async 
import Control.Concurrent.STM 
import Data.Functor (($>)) 
import Pipes 
import Pipes.Concurrent -- from the pipes-concurrency package 
import qualified Pipes.Prelude as P 


asCompleted :: MonadIO m => [Async a] -> Producer a m() 
asCompleted asyncs = do 
    (o, i, seal) <- liftIO $ spawn' unbounded 
    liftIO $ forkIO $ do 
     forConcurrently asyncs (\async -> atomically $ waitSTM async >>= send o) 
     atomically seal 
    fromInput i 

main = do 
    actions <- traverse async [threadDelay 2000000 $> "bar", threadDelay 1000000 $> "foo"] 
    runEffect $ asCompleted actions >-> P.print 
-- after one second, prints "foo", then "bar" a second later 

Mit pipes-concurrency wir spawn' ein Output-Input Paar und sofort wandeln die Input einen ProducerfromInput verwenden. Asynchron, wir send Elemente wie sie verfügbar werden. Wenn alle Async s wir seal den Posteingang geschlossen haben, um die Producer zu schließen.

0

Ich lese Ihre Frage als "ist es möglich, eine Liste von Async s durch ihre Abschlusszeit zu sortieren?". Wenn Sie das meinten, ist die Antwort ja.

import Control.Applicative (liftA2) 
import Control.Concurrent (threadDelay) 
import Control.Concurrent.Async 
import Data.Functor (($>)) 
import Data.List (sortBy) 
import Data.Ord (comparing) 
import Data.Time (getCurrentTime) 


sortByCompletion :: [Async a] -> IO [a] 
sortByCompletion = fmap (fmap fst . sortBy (comparing snd)) . mapConcurrently withCompletionTime 
    where withCompletionTime async = liftA2 (,) (wait async) getCurrentTime 

main = do 
    asyncs <- traverse async [threadDelay 2000000 $> "bar", threadDelay 1000000 $> "foo"] 
    sortByCompletion asyncs 
-- ["foo", "bar"], after two seconds 

Mit mapConcurrently wir für jeden Async auf einem separaten Thread warten. Nach Beendigung erhalten wir die aktuelle Zeit - die Zeit, zu der die Async abgeschlossen ist - und verwenden sie, um die Ergebnisse zu sortieren. Das ist O (n log n) Komplexität, weil wir die Liste sortieren. (Ihre ursprüngliche Algorithmus war effektiv ein selection sort.)

Wie Ihr collect, wird sortByCompletion nicht zurück, bis alle Async s in der Liste abgeschlossen haben. Wenn Sie wollen, Stream Ergebnisse auf den Haupt-Thread, wie sie verfügbar werden, nun, Listen sind kein sehr gutes Werkzeug dafür. Ich würde eine Streaming-Abstraktion wie conduit oder pipes verwenden, oder, auf einer niedrigeren Ebene arbeiten, eine TQueue. Ein Beispiel finden Sie in my other answer.

+0

Ich möchte die Async-Werte nicht nach Abschlusszeit sortieren, sondern die Verarbeitung starten können, sobald ein Ergebnis verfügbar ist. Danke, dass du darauf hingewiesen hast. Ich werde diese Klarstellung zu meiner Frage hinzufügen. –

1

über TChan implementiert, zusätzlich eine Version implementiert, die sofort reagieren können, aber es ist komplexer und könnte auch Probleme mit Ausnahmen haben (wenn Sie Ausnahmen erhalten möchten, verwenden SlaveThread.fork statt forkIO), so kommentierte ich, dass Code in Falls Sie nicht daran interessiert sind:

import   Control.Concurrent  (threadDelay) 
import   Control.Concurrent  (forkIO) 
import   Control.Concurrent.Async 
import   Control.Concurrent.STM 
import   Control.Monad 

collect :: [Async a] -> IO [a] 
collect = atomically . collectSTM 

collectSTM :: [Async a] -> STM [a] 
collectSTM as = do 
    c <- newTChan 
    collectSTMChan c as 

collectSTMChan :: TChan a -> [Async a] -> STM [a] 
collectSTMChan chan as = do 
    mapM_ (waitSTM >=> writeTChan chan) as 
    replicateM (length as) (readTChan chan) 

main :: IO() 
main = do 
    a1 <- async (threadDelay 2000000 >> putStrLn "slept 2 secs" >> return 2) 
    a2 <- async (threadDelay 3000000 >> putStrLn "slept 3 secs" >> return 3) 
    a3 <- async (threadDelay 1000000 >> putStrLn "slept 1 sec" >> return 1) 
    res <- collect [a1,a2,a3] 
    putStrLn (show res) 

    -- -- reacting immediately 
    -- a1 <- async (threadDelay 2000000 >> putStrLn "slept 2 secs" >> return 2) 
    -- a2 <- async (threadDelay 3000000 >> putStrLn "slept 3 secs" >> return 3) 
    -- a3 <- async (threadDelay 1000000 >> putStrLn "slept 1 sec" >> return 1) 
    -- c <- collectChan [a1,a2,a3] 
    -- replicateM_ 3 (atomically (readTChan c) >>= \v -> putStrLn ("Received: " ++ show v)) 

-- collectChan :: [Async a] -> IO (TChan a) 
-- collectChan as = do 
--  c <- newTChanIO 
--  forM_ as $ \a -> forkIO ((atomically . (waitSTM >=> writeTChan c)) a) 
--  return c