2010-09-19 7 views
5

Ich bin ein Haskell Anfänger und dachte, das wäre eine gute Übung. Ich habe eine Zuordnung wo I-Datei in einem Thread A lesen müssen, um die Datei in Zeilen Gewinde B_i handhaben, und die Ausgangs dann die Ergebnisse in Gewinde C.Begrenzung der Speichernutzung beim Lesen von Dateien

I dies bereits weit umgesetzt haben, aber eine der Anforderungen ist, dass wir nicht vertrauen können, dass die gesamte Datei in den Speicher passt. Ich hatte gehofft, dass faule IO und Garbage Collector würde dies für mich tun, aber leider die Speichernutzung steigt und steigt weiter.

Der Leser Thread (A) liest die Datei mit readFile, die dann gezippt wird mit Zeilennummern und in Just gewickelt. Diese gezippten Zeilen werden dann zu Control.Concurrent.Chan geschrieben. Jeder Consumer-Thread B hat seinen eigenen Kanal.

Jeder Verbraucher liest seinen eigenen Kanal, wenn es Daten hat und wenn der Regex übereinstimmt, wird es auf seinen eigenen entsprechenden Ausgangskanal innerhalb von Maybe (bestehend aus Listen) ausgegeben.

Der Drucker überprüft den Ausgabekanal jedes B-Threads. Wenn keine die Ergebnisse (Zeile) ist nichts, wird die Zeile gedruckt. Da an dieser Stelle kein Hinweis auf die älteren Zeilen sein sollte, dachte ich, dass der Müll Sammler diese Zeilen freigeben könnte, aber ach, ich schein in das falsche hier zu sein.

Die .lhs Datei ist in hier: http://gitorious.org/hajautettujen-sovellusten-muodostamistekniikat/hajautettujen-sovellusten-muodostamistekniikat/blobs/master/mgrep.lhs

Die Frage ist also, wie beschränke ich die Speichernutzung oder der Müll Sammler erlauben, um die Linien zu entfernen.

Snippets wie angefordert. Hoffentlich wird Einrücken nicht zu stark zerstört :)

data Global = Global {done :: MVar Bool, consumers :: Consumers} 
type Done = Bool 
type Linenum = Int 
type Line = (Linenum, Maybe String) 
type Output = MVar [Line] 
type Input = Chan Line 
type Consumers = MVar (M.Map ThreadId (Done, (Input, Output))) 
type State a = ReaderT Global IO a 


producer :: [Input] -> FilePath -> State() 
producer c p = do 
    liftIO $ Main.log "Starting producer" 
    d <- asks done 
    f <- liftIO $ readFile p 
    mapM_ (\l -> mapM_ 
    (liftIO . flip writeChan l) c) 
    $ zip [1..] $ map Just $ lines f 
    liftIO $ modifyMVar_ d (return . not) 

printer :: State() 
printer = do 
    liftIO $ Main.log "Starting printer" 
    c <- (fmap (map (snd . snd) . M.elems) 
    (asks consumers >>= liftIO . readMVar)) 
    uniq' c 
    where head' :: Output -> IO Line 
    head' ch = fmap head (readMVar ch) 

    tail' = mapM_ (liftIO . flip modifyMVar_ 
     (return . tail)) 

    cont ch = tail' ch >> uniq' ch 

    printMsg ch = readMVar (head ch) >>= 
     liftIO . putStrLn . fromJust . snd . head 

    cempty :: [Output] -> IO Bool 
    cempty ch = fmap (any id) 
     (mapM (fmap ((==) 0 . length) . readMVar) ch) 

    {- Return false unless none are Nothing -} 
    uniq :: [Output] -> IO Bool 
    uniq ch = fmap (any id . map (isNothing . snd)) 
     (mapM (liftIO . head') ch) 

    uniq' :: [Output] -> State() 
    uniq' ch = do 
     d <- consumersDone 
     e <- liftIO $ cempty ch 
     if not e 
     then do 
      u <- liftIO $ uniq ch 
      if u then cont ch else do 
     liftIO $ printMsg ch 
     cont ch 
      else unless d $ uniq' ch 

Antwort

6

Concurrent programming bietet keine definierte Ausführungsreihenfolge, wenn Sie ein, sich mit MVar und dergleichen durchzusetzen. So ist es wahrscheinlich, dass der Produzent Faden klebt alle/die meisten Linien im Chan vor jedem Verbraucher liest sie ab und übergibt sie an. Eine andere Architektur, die den Anforderungen entsprechen sollte, ist nur Thread A rufen Sie die Lazy Readfile und kleben Sie das Ergebnis in eine Mvar. Dann nimmt jeder Consumer-Thread die mvar, liest eine Zeile und ersetzt dann die mvar, bevor sie mit der Verarbeitung der Zeile fortfährt. Selbst dann, wenn der Ausgabe-Thread nicht mithalten kann, kann sich die Anzahl der übereinstimmenden Zeilen, die auf dem Chan gespeichert sind, beliebig aufbauen.

Was Sie haben, ist eine Push-Architektur. Um wirklich in einem konstanten Raum arbeiten zu können, denken Sie nachfrageorientiert. Finden einen Mechanismus, so dass die Ausgabe-Thread-Signale an die Verarbeitungsthreads, dass sie etwas tun sollen, und so, dass die Verarbeitungsthreads Signal an den Leser-Thread, daß sie etwas tun.

Eine andere Möglichkeit, dies zu tun, ist Chans von begrenzter Größe stattdessen - so blockiert der Leser Thread, wenn die Prozessor-Threads nicht aufgeholt haben, und so blockieren die Prozessor-Threads, wenn der Ausgabe-Thread nicht aufgeholt hat.

Als Ganzes ist das Problem in der Tat erinnert mich an Tim Bray widefinder Benchmark, obwohl die Anforderungen etwas anders sind. In jedem Fall führte es zu einer weit verbreiteten Diskussion über den besten Weg mehradrige grep zu implementieren. Die große Pointe war, dass das Problem IO gebunden ist, und Sie möchten mehrere Leser Threads über mmapped Dateien.

Sehen Sie hier für mehr als Sie jemals wissen wollen werden: http://www.tbray.org/ongoing/When/200x/2007/09/20/Wide-Finder

+4

BoundedChan auf Hackage für genau diese Art der Verwendung ist. –

+0

Danke Tom und Sciv. Ich werde versuchen, es zu implementieren und als eine Antwort zu markieren, wenn es funktioniert – Masse