2013-07-18 13 views
6

Ich arbeite an einer Haskell Netzwerk-Anwendung und ich benutze das Aktor-Muster, um Multithreading zu verwalten. Eine Sache, die ich fand, ist, wie man zum Beispiel eine Reihe von Client-Sockets/Handles speichert. Diese müssen natürlich für alle Threads zugänglich sein und können sich ändern, wenn sich Clients an-/abmelden.Haskell - Aktor basierte Wandelbarkeit

Da ich aus der Imperativ Welt kommen dachte ich an einer Art von Lock-Mechanismus aber, als ich bemerkte, wie hässlich ist dies dachte ich über „reine“ Veränderlichkeit, gut, es ist eigentlich ganz rein:

import Control.Concurrent 
import Control.Monad 
import Network 
import System.IO 
import Data.List 
import Data.Maybe 
import System.Environment 
import Control.Exception 


newStorage :: (Eq a, Show a) => IO (Chan (String, Maybe (Chan [a]), Maybe a)) 
newStorage = do 
    q <- newChan 
    forkIO $ storage [] q 
    return q 


newHandleStorage :: IO (Chan (String, Maybe (Chan [Handle]), Maybe Handle)) 
newHandleStorage = newStorage 


storage :: (Eq a, Show a) => [a] -> Chan (String, Maybe (Chan [a]), Maybe a) -> IO() 
storage s q = do 
    let loop = (`storage` q) 
    (req, reply, d) <- readChan q 
    print ("processing " ++ show(d)) 
    case req of 
    "add" -> loop ((fromJust d) : s) 
    "remove" -> loop (delete (fromJust d) s) 
    "get" -> do 
     writeChan (fromJust reply) s 
     loop s 


store s d = writeChan s ("add", Nothing, Just d) 
unstore s d = writeChan s ("remove", Nothing, Just d) 
request s = do 
    chan <- newChan 
    writeChan s ("get", Just chan, Nothing) 
    readChan chan 

Der Punkt ist, dass ein Thread (Akteur) verwaltet eine Liste von Elementen und ändert die Liste nach eingehenden Anforderungen. Da Thread wirklich billig sind, dachte ich, das könnte eine wirklich schöne funktionelle Alternative sein.

Natürlich ist dies nur ein Prototyp (eine schnelle schmutzige Proof of Concept). Also meine Frage ist:

  1. Ist dies eine „gute“ Art und Weise geteilt änderbare Variablen der Verwaltung (in der Schauspieler Welt)?
  2. Gibt es bereits eine Bibliothek für dieses Muster? (Ich schon gesucht, aber ich fand nichts)

Grüße, Chris

+3

Wenn Sie bereit sind, Alternativen zum Schauspielermodell zu erkunden, würde ich Ihnen vorschlagen, Haskells [Software Transaktionsspeicher] (https://en.wikipedia.org/wiki/Software_transactional_memory) auszuprobieren. Es ist ein schöner Mechanismus ähnlich zu Datenbanktransaktionen. Siehe [Kapitel 28] (http://book.realworldhaskell.org/read/software-transactional-memory.html) in The Real World Haskell. –

+0

Technisch eine gute Wahl, aber ich habe gehört, dass die Verwendung von STM mit einer großen Anzahl von Threads (ein Thread pro Client, die Standard in Haskell ist) und relativ langen Operationen (Löschen eines Elements aus einer Liste ist O (n), natürlich Hash-Sätze/Karten könnten hier helfen) könnte die Leistung von STM um eine große Anzahl reduzieren. Und natürlich könnte der MVar-Kanal durch den STM-Kanal ersetzt werden, was bedeutet, dass die beste der beiden Techniken verwendet wird. BEARBEITEN: Das Schauspieler-Muster ist in einer solchen Situation im Allgemeinen sehr nett, weil das Löschen/Hinzufügen eines Elements O (1) ist (nur Senden einer Nachricht) Die eigentliche Arbeit ist in einem Thread erledigt ... – Kr0e

+0

Sie haben Recht. Bei STM kann es vorkommen, dass Transaktionen mehrmals neu gestartet werden, was zu einer reduzierten Leistung führt. Wenn Ihre synchronisierten Operationen jedoch lange dauern, können Sie auch zu ähnlichen Problemen mit Akteuren kommen - wenn es mehr Nachrichten gibt, als sie verarbeiten können, wird ihr Zustand hinter der Realität zurückbleiben. Die Verwendung von Balanced Trees ('Map' /' Set') oder 'ST/IO'-basierten Hash-Sets würde definitiv helfen. –

Antwort

6

Hier ist ein schnelles und schmutziges Beispiel ist mit stm und pipes-network. Dadurch wird ein einfacher Server eingerichtet, über den Clients eine Verbindung herstellen und einen Zähler erhöhen oder verringern können. Es wird eine sehr einfache Statusleiste angezeigt, die die aktuellen Werte aller angeschlossenen Clients anzeigt und Client-Werte aus der Leiste entfernt, wenn sie getrennt werden.

Zuerst werde ich mit dem Server beginnen, und ich habe kommentierte großzügig um den Code zu erklären, wie es funktioniert:

import Control.Concurrent.STM (STM, atomically) 
import Control.Concurrent.STM.TVar 
import qualified Data.HashMap.Strict as H 
import Data.Foldable (forM_) 

import Control.Concurrent (forkIO, threadDelay) 
import Control.Monad (unless) 
import Control.Monad.Trans.State.Strict 
import qualified Data.ByteString.Char8 as B 
import Control.Proxy 
import Control.Proxy.TCP 
import System.IO 

main = do 
    hSetBuffering stdout NoBuffering 

    {- These are the internal data structures. They should be an implementation 
     detail and you should never expose these references to the 
     "business logic" part of the application. -} 
    -- I use nRef to keep track of creating fresh Ints (which identify users) 
    nRef <- newTVarIO 0  :: IO (TVar Int) 
    {- hMap associates every user (i.e. Int) with a counter 

     Notice how I've "striped" the hash map by storing STM references to the 
     values instead of storing the values directly. This means that I only 
     actually write the hashmap when adding or removing users, which reduces 
     contention for the hash map. 

     Since each user gets their own unique STM reference for their counter, 
     modifying counters does not cause contention with other counters or 
     contention with the hash map. -} 
    hMap <- newTVarIO H.empty :: IO (TVar (H.HashMap Int (TVar Int))) 

    {- The following code makes heavy use of Haskell's pure closures. Each 
     'let' binding closes over its current environment, which is safe since 
     Haskell is pure. -} 

    let {- 'getCounters' is the only server-facing command in our STM API. The 
      only permitted operation is retrieving the current set of user 
      counters. 

      'getCounters' closes over the 'hMap' reference currently in scope so 
      that the server never needs to be aware about our internal 
      implementation. -} 
     getCounters :: STM [Int] 
     getCounters = do 
      refs <- fmap H.elems (readTVar hMap) 
      mapM readTVar refs 

     {- 'init' is the only client-facing command in our STM API. It 
      initializes the client's entry in the hash map and returns two 
      commands: the first command is what the client calls to 'increment' 
      their counter and the second command is what the client calls to log 
      off and delete 
      'delete' command. 

      Notice that those two returned commands each close over the client's 
      unique STM reference so the client never needs to be aware of how 
      exactly 'init' is implemented under the hood. -} 
     init :: STM (STM(), STM()) 
     init = do 
      n <- readTVar nRef 
      writeTVar nRef $! n + 1 

      ref <- newTVar 0 
      modifyTVar' hMap (H.insert n ref) 

      let incrementRef :: STM() 
       incrementRef = do 
        mRef <- fmap (H.lookup n) (readTVar hMap) 
        forM_ mRef $ \ref -> modifyTVar' ref (+ 1) 

       deleteRef :: STM() 
       deleteRef = modifyTVar' hMap (H.delete n) 

      return (incrementRef, deleteRef) 

    {- Now for the actual program logic. Everything past this point only uses 
     the approved STM API (i.e. 'getCounters' and 'init'). If I wanted I 
     could factor the above approved STM API into a separate module to enforce 
     the encapsulation boundary, but I am lazy. -} 

    {- Fork a thread which polls the current state of the counters and displays 
     it to the console. There is a way to implement this without polling but 
     this gets the job done for now. 

     Most of what it is doing is just some simple tricks to reuse the same 
     console line instead of outputting a stream of lines. Otherwise it 
     would be just: 

     forkIO $ forever $ do 
      ns <- atomically getCounters 
      print ns 
    -} 
    forkIO $ (`evalStateT` 0) $ forever $ do 
     del <- get 
     lift $ do 
      putStr (replicate del '\b') 
      putStr (replicate del ' ') 
      putStr (replicate del '\b') 
     ns <- lift $ atomically getCounters 
     let str = show ns 
     lift $ putStr str 
     put $! length str 
     lift $ threadDelay 10000 

    {- Fork a thread for each incoming connection, which listens to the client's 
     commands and translates them into 'STM' actions -} 
    serve HostAny "8080" $ \(socket, _) -> do 
     (increment, delete) <- atomically init 

     {- Right now, just do the dumb thing and convert all keypresses into 
      increment commands, with the exception of the 'q' key, which will 
      quit -} 
     let handler :: (Proxy p) =>() -> Consumer p Char IO() 
      handler() = runIdentityP loop 
       where 
       loop = do 
        c <- request() 
        unless (c == 'q') $ do 
         lift $ atomically increment 
         loop 

     {- This uses my 'pipes' library. It basically is a high-level way to 
      say: 

      * Read binary packets from the socket no bigger than 4096 bytes 

      * Get the first character from each packet and discard the rest 

      * Handle the character using the above 'handler' function -} 
     runProxy $ socketReadS 4096 socket >-> mapD B.head >-> handler 

     {- The above pipeline finishes either when the socket closes or 
      'handler' stops looping because it received a 'q'. Either case means 
      that the client is done so we log them out using 'delete'. -} 
     atomically delete 

Als nächstes ist der Kunde, der einfach eine Verbindung öffnet und leitet alle Tastendrücke als einzelne Pakete:

import Control.Monad 
import Control.Proxy 
import Control.Proxy.Safe 
import Control.Proxy.TCP.Safe 
import Data.ByteString.Char8 (pack) 
import System.IO 

main = do 
    hSetBuffering stdin NoBuffering 
    hSetEcho  stdin False 

    {- Again, this uses my 'pipes' library. It basically says: 

     * Read characters from the console using 'commands' 

     * Pack them into a binary format 

     * send them to a server running at 127.0.0.1:8080 

     This finishes looping when the user types a 'q' or the connection is 
     closed for whatever reason. 
    -} 
    runSafeIO $ runProxy $ runEitherK $ 
     try . commands 
    >-> mapD (\c -> pack [c]) 
    >-> connectWriteD Nothing "127.0.0.1" "8080" 

commands :: (Proxy p) =>() -> Producer p Char IO() 
commands() = runIdentityP loop 
    where 
    loop = do 
     c <- lift getChar 
     respond c 
     unless (c == 'q') loop 

Es ist ziemlich einfach: commands erzeugt einen Strom von Char s, die dann auf ByteString s umgewandelt und dann als Pakete an den Server gesendet.

Wenn Sie den Server und ein paar Kunden und haben sie jede Art in ein paar Tasten, dem Server-Display ausgeben wird eine Liste laufen die zeigt, wie viele Schlüssel jeder Client eingegeben:

[1,6,4] 

... und wenn einige der Kunden trennen sie von der Liste gestrichen werden:

[1,4] 

Beachten sie, dass die pipes Komponente dieser Beispiele in der kommenden pipes-4.0.0 Release stark vereinfacht, aber die aktuelle pipes Ökosystem noch g Ets die Arbeit erledigt wie es ist.

+0

Tolle Lösung, ich werde definitiv darüber nachdenken;) – Kr0e

+0

Nur für mein Verständnis: Wird STM als rein angesehen? Ich schätze, es ist nicht so, da es um die Veränderbarkeit geht, ohne einen Verriegelungsmechanismus zu benutzen, oder? – Kr0e

+2

@ Kr0e Richtig. Stellen Sie sich STM als zusammensetzbare, thread-sichere, veränderbare Speicherreferenzen vor. –

3

Zuerst würde ich definitiv empfehlen, einen eigenen spezifischen Datentyp für die Darstellung von Befehlen zu verwenden. Bei der Verwendung von (String, Maybe (Chan [a]), Maybe a) kann ein fehlerhafter Client den Actor zum Absturz bringen, indem er einfach einen unbekannten Befehl sendet oder ("add", Nothing, Nothing) sendet usw.Ich würde so etwas wie

data Command a = Add a | Remove a | Get (Chan [a]) 

vorschlagen Dann können Sie Mustererkennung auf Befehle in storage in einer Art und Weise zu speichern.

Akteure haben ihre Vorteile, aber ich habe auch das Gefühl, dass sie einige Nachteile haben. Um beispielsweise eine Antwort von einem Akteur zu erhalten, müssen Sie ihm einen Befehl senden und dann auf eine Antwort warten. Und der Client kann nicht völlig sicher sein, dass er eine Antwort erhält und dass die Antwort von einem bestimmten Typ ist - Sie können nicht sagen, dass ich nur Antworten dieses Typs (und wie viele davon) für diesen speziellen Befehl möchte.

Als ein Beispiel gebe ich eine einfache, STM-Lösung. Es wäre besser, eine Hash-Tabelle oder einen (symmetrischen Baum) Satz zu verwenden, aber da Handle weder Ord noch Hashable implementiert, können wir diese Datenstrukturen nicht verwenden, also werde ich weiterhin Listen verwenden.

module ThreadSet (
    TSet, add, remove, get 
) where 

import Control.Monad 
import Control.Monad.STM 
import Control.Concurrent.STM.TVar 
import Data.List (delete) 

newtype TSet a = TSet (TVar [a]) 

add :: (Eq a) => a -> TSet a -> STM() 
add x (TSet v) = readTVar v >>= writeTVar v . (x :) 

remove :: (Eq a) => a -> TSet a -> STM() 
remove x (TSet v) = readTVar v >>= writeTVar v . delete x 

get :: (Eq a) => TSet a -> STM [a] 
get (TSet v) = readTVar v 

Dieses Modul implementiert einen Satz von STM basierend beliebigen Elementen. Sie können mehrere solcher Sätze verwenden und sie zusammen in einer einzigen STM-Transaktion verwenden, die gleichzeitig erfolgreich ist oder fehlschlägt. Zum Beispiel

-- | Ensures that there is exactly one element `x` in the set. 
add1 :: (Eq a) => a -> TSet a -> STM() 
add1 x v = remove x v >> add x v 

Dies würde mit den Schauspielern schwierig sein, würden Sie es als ein weiterer Befehl für die Schauspieler hinzufügen müssen, können Sie es von bestehenden Maßnahmen nicht komponieren und noch Unteilbarkeit haben.

Update: Es gibt eine interessante article zu erklären, warum Clojure Designer wählte keine Schauspieler zu verwenden. Wenn Sie beispielsweise Akteure verwenden, die viele Lesevorgänge und nur sehr wenig Schreibvorgänge in einer veränderbaren Struktur haben, werden sie alle serialisiert, was die Leistung erheblich beeinträchtigen kann.

+0

Nun, Serialisierung/Deserialisierung kostet viel, das ist wahr. CloudHaskell hat den gleichen "Serialisierungs-Overhead", sie nennen es ein Feature. Aber in letzter Zeit haben sie eine unsichere Sendefunktion hinzugefügt, die die Nachricht ohne ser./deser weitergibt. Das ist eine Größenordnung schneller. Theoretisch sollte die Weitergabe von Nachrichten so billig sein wie ein einfacher Funktionsaufruf, um das Aktor-Muster zu einer echten Alternative zu machen, was natürlich nicht der Fall ist, aber in Erlang ist es das. Ich denke, STM ist ein wirklich großartiges Feature, vielleicht ist der Einsatz beider Techniken der richtige Weg, da STM im Vergleich zum Actor-Pattern sehr low-level ist. – Kr0e