2016-03-22 7 views
7

Ich brauche einen Hinweis auf die Datenstruktur als atomare Änderungsprotokoll verwenden.STM-freundliche Liste als Änderungsprotokoll

Ich versuche, den folgenden Algorithmus zu implementieren. Es gibt einen Fluss von eingehenden Änderungen, die eine In-Memory-Map aktualisieren. In Haskell-ähnlichen Pseudo-Code ist es

update :: DataSet -> SomeListOf Change -> Change -> STM (DataSet, SomeListOf Change) 
    update dataSet existingChanges newChange = do 
     ... 
     return (dataSet, existingChanges ++ [newChange]) 

wo DataSet Karte ist (derzeit ist es die Karte aus dem stm-Container-Paket, https://hackage.haskell.org/package/stm-containers-0.2.10/docs/STMContainers-Map.html). Das ganze "Update" wird aus einer beliebigen Anzahl von Threads aufgerufen. Einige der Änderungen können aufgrund der Domänensemantik zurückgewiesen werden. Dafür verwende ich throwSTM, um den Effekt der Transaktion wegzuwerfen. Bei erfolgreichem Commit wird "newChange" zur Liste hinzugefügt.

Es gibt getrennte Threads, die folgende Funktion aufruft:

flush :: STM (DataSet, SomeListOf Change) -> IO() 

diese Funktion soll die aktuelle Momentaufnahme der DataSet nehmen zusammen mit der Liste der Änderungen (es muss ein einheitliches Paar) und spülen Sie es an das Dateisystem, dh

Ich brauche einen Hinweis über die Datenstruktur für "SomeListOf Change" zu verwenden. Ich möchte [Change] nicht verwenden, weil es "zu geordnet" ist und ich befürchte, dass es zu viele Konflikte geben wird, die die gesamte Transaktion erneut versuchen werden. Bitte korrigiere mich, wenn ich hier falsch liege.

Ich kann das Set (https://hackage.haskell.org/package/stm-containers-0.2.10/docs/STMContainers-Set.html) nicht verwenden, weil ich noch einige Ordnung, z. Die Reihenfolge der Transaktion wird bestätigt. Ich könnte TChan dafür verwenden, und es sieht wie eine gute Übereinstimmung aus (genau die Reihenfolge der Transaktion commits), aber ich weiß nicht, wie man die "flush" Funktion implementiert, so dass es die konsistente Ansicht des gesamten Änderungsprotokolls zusammen geben würde mit dem DataSet.

Die aktuelle Implementierung davon ist hier https://github.com/lolepezy/rpki-pub-server/blob/add-storage/src/RRDP/Repo.hs, in den Funktionen applyActionsToState bzw. rrdpSyncThread. Es benutzt TChan und scheint es falsch zu machen.

Vielen Dank im Voraus.

Update: Eine vernünftige Antwort scheint wie die

type SomeListOf c = TChan [c] 

    update :: DataSet -> TChan [Change] -> Change -> STM DataSet 
    update dataSet existingChanges newChange = do 
     ... 
     writeTChan changeChan $ reverse (newChange : existingChanges) 
     return dataSet 

    flush data_ = do 
     (dataSet, changes) <- atomically $ (,) <$> readTVar data_ <*> readTChan changeChan 
     -- write them both to FS 
     -- ... 

Aber ich bin immer noch nicht sicher zu sein, ob es sich um eine saubere Lösung ist die gesamte Liste als Element des Kanals zu übergeben.

+0

Ich habe Ihre Frage nicht sorgfältig gelesen, aber 'TChan' ist eine dead-simple' ([a], [a]) 'Funktionswarteschlange; es klingt, als könnte es für Sie sinnvoll sein, Ihre eigene Variation zu implementieren. – jberryman

+0

Lass mich fragen: Wie viele Threads (zumindest eine ungefähre Anzahl) werden voraussichtlich auf die Struktur zugreifen? Und wie viele von ihnen auf einmal? Wie groß darf die Liste der Änderungen sein? –

+0

Müssen Sie 'update' auch mit anderen' STM' Operationen erstellen, oder läuft es immer in einer eigenen Transaktion? –

Antwort

3

Ich würde wahrscheinlich nur mit der Liste gehen und sehen, wie weit es Leistung braucht. In Anbetracht dessen sollten Sie berücksichtigen, dass sowohl das Anhängen an das Ende einer Liste als auch das Rückgängigmachen von Operationen O (n) -Operationen sind. Sie sollten also versuchen, dies zu vermeiden. Vielleicht können Sie nur die eingehenden Änderungen wie folgt setzt:

update dataSet existingChanges newChange = do 
    -- ... 
    return (dataSet, newChange : existingChanges) 

Auch Ihr Beispiel für Flush, das Problem hat, dass das Lesen und den Zustand der Aktualisierung überhaupt nicht atomar ist. Sie müssen diese mit einem einzigen atomically Anruf erreichen wie so:

flush data = do 
    (dataSet, changes) <- atomically $ do 
    result <- readTVar data_ 
    writeTVar data_ (dataSet, []) 
    return result 

    -- write them both to FS 
    -- ... 

Sie könnten sie dann nur in umgekehrter Reihenfolge schreiben (weil jetzt changes die Elemente von neu nach alt enthalten) oder hier einmal umkehren, wenn es wichtig ist, zu schreiben sie sind vom ältesten zum neuesten.Wenn das wichtig ist, würde ich wahrscheinlich mit einer Datenstruktur gehen, die O (1) -Elementzugriff wie ein guter alter Vektor erlaubt.

Wenn Sie einen Vektor mit fester Größe verwenden, müssen Sie natürlich mit dem Problem umgehen, dass es "voll" wird, was bedeutet, dass Ihre Autoren auf flush warten müssen, bevor sie neue Änderungen hinzufügen. Deshalb würde ich mich zuerst für die einfache Liste entscheiden und sehen, ob es ausreicht oder wo es verbessert werden muss.

PS: Ein dequeue könnte auch gut für Ihr Problem passen, aber wenn Sie eine feste Größe benötigen, müssen Sie mit dem Problem umgehen, dass Ihre Schreiber möglicherweise mehr Änderungen produzieren, als Ihr Leser ausspülen kann. Die Dequeue kann unendlich wachsen, aber Sie sind es wahrscheinlich nicht. Und der Vektor hat ziemlich niedrigen Overhead.

+0

Ich habe eine Antwort mit einigen tatsächlichen Messungen hinzugefügt, so dass es ziemlich egal ist, welche Implementierung des Änderungsprotokolls ich im Vergleich zu anderen Ausgaben verwende. –

0

Ich machte einige (sehr vereinfachende) Untersuchung https://github.com/lolepezy/rpki-pub-server/tree/add-storage/test/changeLog imitieren genau die Art der Ladung, die ich angeblich haben soll. Ich habe die gleiche STMContainers.Map für den Datensatz und die übliche Liste für das Änderungsprotokoll verwendet. Um die Anzahl der Transaktionsversuche zu verfolgen, habe ich Debug.Trace.trace verwendet, dh die Anzahl der Zeilen, die durch die Ablaufverfolgung gedruckt wurden. Und die Anzahl der eindeutige Zeilen von Trace gedruckt gibt mir die Anzahl der festgeschriebenen Transaktionen.

Das Ergebnis ist hier (https://github.com/lolepezy/rpki-pub-server/blob/add-storage/test/changeLog/numbers.txt). Die erste Spalte gibt die Anzahl der Threads an, die zweite die Gesamtzahl der generierten Änderungssets. Die dritte Spalte enthält die Anzahl der Trace-Aufrufe für den Fall ohne Änderungsprotokoll und die letzte Spalte die Anzahl der Trace-Aufrufe mit dem Änderungsprotokoll.

Anscheinend die meiste Zeit Änderungsprotokoll fügt einige zusätzliche Wiederholungen hinzu, aber es ist ziemlich unbedeutend. Also, ich denke, es ist fair zu sagen, dass jede Datenstruktur gut genug wäre, da der Großteil der Arbeit mit der Aktualisierung der Karte zu tun hat und die meisten Wiederholungen deshalb stattfinden.