2012-06-18 9 views
23

Betrachten Sie einige Webserver-Instanzen, die parallel ausgeführt werden. Jeder Server enthält einen Verweis auf einen einzelnen freigegebenen "Status-Keeper", dessen Aufgabe es ist, die letzten N Anfragen von allen Servern zu halten.Thread-sicherer Ringspeicher in Java

Zum Beispiel (N=3):

Server a: "Request id = ABCD"  Status keeper=["ABCD"] 
Server b: "Request id = XYZZ"  Status keeper=["ABCD", "XYZZ"] 
Server c: "Request id = 1234"  Status keeper=["ABCD", "XYZZ", "1234"] 
Server b: "Request id = FOO"   Status keeper=["XYZZ", "1234", "FOO"] 
Server a: "Request id = BAR"   Status keeper=["1234", "FOO", "BAR"] 

Zu jedem Zeitpunkt des „Status keeper“ könnte von einer Überwachungsanwendung aufgerufen werden, die diese letzten N Anfragen für einen SLA-Bericht liest.

Was ist der beste Weg, dieses Producer-Consumer-Szenario in Java zu implementieren und den Webservern höhere Priorität als dem SLA-Bericht zu geben?

CircularFifoBuffer scheint die geeignete Datenstruktur zu sein, um die Anforderungen zu halten, aber ich bin mir nicht sicher, was der optimale Weg ist, effiziente Nebenläufigkeit zu implementieren.

+0

Definieren Sie "höhere Priorität". Was passiert, wenn der Bericht den Puffer gelesen hat? Sollte es brechen und neu anfangen, wenn jemand es schreiben will? Kann das wiederum zu Hunger führen? –

+0

Es sollte nie verhungern und es sollte nie gestoppt werden, aber es kann ein bisschen länger warten - was bedeutet, dass seine Priorität langsam mit der Zeit zunehmen sollte. –

+0

Wie viele Hersteller und wie viele Verbraucher sollte der Ring-Puffer haben, werde ich einige Code löschen, wenn Sie Daten bereitstellen. – bestsss

Antwort

16
Buffer fifo = BufferUtils.synchronizedBuffer(new CircularFifoBuffer()); 
+0

+1 Sollte 'fifo' nicht volatil sein? –

+2

Ist nicht wirklich wichtig, solange der Initialisierungscode nicht rennbar ist – MahdeTo

+0

Woher kommt BufferUtils? Ich habe versucht, dies von Apache, in Gradle-Datei: "compile 'org.apache.commons: commons-collections4: 4.1'", aber es ist nicht da ... –

2

Ich würde ArrayDeque betrachten, oder für eine mehr gleichzeitige Implementierung werfen Sie einen Blick auf die Disruptor-Bibliothek, die eine der anspruchsvollsten/komplexen Ring-Puffer in Java ist.

Eine Alternative ist die Verwendung einer unbegrenzten Warteschlange, die mehr parallel ist, da der Erzeuger niemals auf den Verbraucher warten muss.

Sofern Ihre Anforderungen die Komplexität nicht rechtfertigen, kann ein ArrayDeque alles sein, was Sie brauchen.

+0

Ein wichtiges Problem: 'ArrayDeque' ist nicht größenbegrenzt. Es verwendet ein kreisförmiges Array, True, aber es wird die Größe ändern, um mehr Elemente wie nötig unterzubringen. Das OP müsste manuell ein Element "poppen"(), bevor es nach einiger Zeit ein neues einfügt, während gleichzeitig auch die Thread-Sicherheit explizit beibehalten wird ... – thkala

+1

Wenn Sie eine Größenbeschränkung benötigen, können Sie ArrayBlockingQueue verwenden. –

+1

'ArrayBlockingQueue' begrenzt seine Größe, indem es blockiert, bis ein Element entfernt wird. Soweit ich das beurteilen kann, möchte das OP, dass die Warteschlange das älteste Element implizit löscht/überschreibt, wobei nur die letzten "N" -Elemente beibehalten werden. – thkala

7

Hier ist eine blockierungsfreie Ringpufferimplementierung. Es implementiert einen Puffer fester Größe - es gibt keine FIFO-Funktionalität. Ich würde vorschlagen, dass Sie stattdessen eine Collection von Anfragen für jeden Server speichern. Auf diese Weise kann Ihr Bericht die Filterung durchführen, anstatt Ihre Datenstruktur zu filtern.

/** 
* Container 
* --------- 
* 
* A lock-free container that offers a close-to O(1) add/remove performance. 
* 
*/ 
public class Container<T> implements Iterable<T> { 

    // The capacity of the container. 
    final int capacity; 
    // The list. 
    AtomicReference<Node<T>> head = new AtomicReference<Node<T>>(); 
    // TESTING { 
    AtomicLong totalAdded = new AtomicLong(0); 
    AtomicLong totalFreed = new AtomicLong(0); 
    AtomicLong totalSkipped = new AtomicLong(0); 

    private void resetStats() { 
    totalAdded.set(0); 
    totalFreed.set(0); 
    totalSkipped.set(0); 
    } 
    // TESTING } 

    // Constructor 
    public Container(int capacity) { 
    this.capacity = capacity; 
    // Construct the list. 
    Node<T> h = new Node<T>(); 
    Node<T> it = h; 
    // One created, now add (capacity - 1) more 
    for (int i = 0; i < capacity - 1; i++) { 
     // Add it. 
     it.next = new Node<T>(); 
     // Step on to it. 
     it = it.next; 
    } 
    // Make it a ring. 
    it.next = h; 
    // Install it. 
    head.set(h); 
    } 

    // Empty ... NOT thread safe. 
    public void clear() { 
    Node<T> it = head.get(); 
    for (int i = 0; i < capacity; i++) { 
     // Trash the element 
     it.element = null; 
     // Mark it free. 
     it.free.set(true); 
     it = it.next; 
    } 
    // Clear stats. 
    resetStats(); 
    } 

    // Add a new one. 
    public Node<T> add(T element) { 
    // Get a free node and attach the element. 
    totalAdded.incrementAndGet(); 
    return getFree().attach(element); 
    } 

    // Find the next free element and mark it not free. 
    private Node<T> getFree() { 
    Node<T> freeNode = head.get(); 
    int skipped = 0; 
    // Stop when we hit the end of the list 
    // ... or we successfully transit a node from free to not-free. 
    while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) { 
     skipped += 1; 
     freeNode = freeNode.next; 
    } 
    // Keep count of skipped. 
    totalSkipped.addAndGet(skipped); 
    if (skipped < capacity) { 
     // Put the head as next. 
     // Doesn't matter if it fails. That would just mean someone else was doing the same. 
     head.set(freeNode.next); 
    } else { 
     // We hit the end! No more free nodes. 
     throw new IllegalStateException("Capacity exhausted."); 
    } 
    return freeNode; 
    } 

    // Mark it free. 
    public void remove(Node<T> it, T element) { 
    totalFreed.incrementAndGet(); 
    // Remove the element first. 
    it.detach(element); 
    // Mark it as free. 
    if (!it.free.compareAndSet(false, true)) { 
     throw new IllegalStateException("Freeing a freed node."); 
    } 
    } 

    // The Node class. It is static so needs the <T> repeated. 
    public static class Node<T> { 

    // The element in the node. 
    private T element; 
    // Are we free? 
    private AtomicBoolean free = new AtomicBoolean(true); 
    // The next reference in whatever list I am in. 
    private Node<T> next; 

    // Construct a node of the list 
    private Node() { 
     // Start empty. 
     element = null; 
    } 

    // Attach the element. 
    public Node<T> attach(T element) { 
     // Sanity check. 
     if (this.element == null) { 
     this.element = element; 
     } else { 
     throw new IllegalArgumentException("There is already an element attached."); 
     } 
     // Useful for chaining. 
     return this; 
    } 

    // Detach the element. 
    public Node<T> detach(T element) { 
     // Sanity check. 
     if (this.element == element) { 
     this.element = null; 
     } else { 
     throw new IllegalArgumentException("Removal of wrong element."); 
     } 
     // Useful for chaining. 
     return this; 
    } 

    public T get() { 
     return element; 
    } 

    @Override 
    public String toString() { 
     return element != null ? element.toString() : "null"; 
    } 
    } 

    // Provides an iterator across all items in the container. 
    public Iterator<T> iterator() { 
    return new UsedNodesIterator<T>(this); 
    } 

    // Iterates across used nodes. 
    private static class UsedNodesIterator<T> implements Iterator<T> { 
    // Where next to look for the next used node. 

    Node<T> it; 
    int limit = 0; 
    T next = null; 

    public UsedNodesIterator(Container<T> c) { 
     // Snapshot the head node at this time. 
     it = c.head.get(); 
     limit = c.capacity; 
    } 

    public boolean hasNext() { 
     // Made into a `while` loop to fix issue reported by @Nim in code review 
     while (next == null && limit > 0) { 
     // Scan to the next non-free node. 
     while (limit > 0 && it.free.get() == true) { 
      it = it.next; 
      // Step down 1. 
      limit -= 1; 
     } 
     if (limit != 0) { 
      next = it.element; 
     } 
     } 
     return next != null; 
    } 

    public T next() { 
     T n = null; 
     if (hasNext()) { 
     // Give it to them. 
     n = next; 
     next = null; 
     // Step forward. 
     it = it.next; 
     limit -= 1; 
     } else { 
     // Not there!! 
     throw new NoSuchElementException(); 
     } 
     return n; 
    } 

    public void remove() { 
     throw new UnsupportedOperationException("Not supported."); 
    } 
    } 

    @Override 
    public String toString() { 
    StringBuilder s = new StringBuilder(); 
    Separator comma = new Separator(","); 
    // Keep counts too. 
    int usedCount = 0; 
    int freeCount = 0; 
    // I will iterate the list myself as I want to count free nodes too. 
    Node<T> it = head.get(); 
    int count = 0; 
    s.append("["); 
    // Scan to the end. 
    while (count < capacity) { 
     // Is it in-use? 
     if (it.free.get() == false) { 
     // Grab its element. 
     T e = it.element; 
     // Is it null? 
     if (e != null) { 
      // Good element. 
      s.append(comma.sep()).append(e.toString()); 
      // Count them. 
      usedCount += 1; 
     } else { 
      // Probably became free while I was traversing. 
      // Because the element is detached before the entry is marked free. 
      freeCount += 1; 
     } 
     } else { 
     // Free one. 
     freeCount += 1; 
     } 
     // Next 
     it = it.next; 
     count += 1; 
    } 
    // Decorate with counts "]used+free". 
    s.append("]").append(usedCount).append("+").append(freeCount); 
    if (usedCount + freeCount != capacity) { 
     // Perhaps something was added/freed while we were iterating. 
     s.append("?"); 
    } 
    return s.toString(); 
    } 
} 

Beachten Sie, dass dies in der Nähe von O1 ist. A Separator emittiert einfach "" zuerst und dann seinen Parameter von da an.

Edit: Testmethoden hinzugefügt.

// ***** Following only needed for testing. ***** 
private static boolean Debug = false; 
private final static String logName = "Container.log"; 
private final static NamedFileOutput log = new NamedFileOutput("C:\\Junk\\"); 

private static synchronized void log(boolean toStdoutToo, String s) { 
    if (Debug) { 
    if (toStdoutToo) { 
     System.out.println(s); 
    } 
    log(s); 
    } 
} 

private static synchronized void log(String s) { 
    if (Debug) { 
    try { 
     log.writeLn(logName, s); 
    } catch (IOException ex) { 
     ex.printStackTrace(); 
    } 
    } 
} 
static volatile boolean testing = true; 

// Tester object to exercise the container. 
static class Tester<T> implements Runnable { 
    // My name. 

    T me; 
    // The container I am testing. 
    Container<T> c; 

    public Tester(Container<T> container, T name) { 
    c = container; 
    me = name; 
    } 

    private void pause() { 
    try { 
     Thread.sleep(0); 
    } catch (InterruptedException ex) { 
     testing = false; 
    } 
    } 

    public void run() { 
    // Spin on add/remove until stopped. 
    while (testing) { 
     // Add it. 
     Node<T> n = c.add(me); 
     log("Added " + me + ": " + c.toString()); 
     pause(); 
     // Remove it. 
     c.remove(n, me); 
     log("Removed " + me + ": " + c.toString()); 
     pause(); 
    } 
    } 
} 
static final String[] strings = { 
    "One", "Two", "Three", "Four", "Five", 
    "Six", "Seven", "Eight", "Nine", "Ten" 
}; 
static final int TEST_THREADS = Math.min(10, strings.length); 

public static void main(String[] args) throws InterruptedException { 
    Debug = true; 
    log.delete(logName); 
    Container<String> c = new Container<String>(10); 

    // Simple add/remove 
    log(true, "Simple test"); 
    Node<String> it = c.add(strings[0]); 
    log("Added " + c.toString()); 
    c.remove(it, strings[0]); 
    log("Removed " + c.toString()); 

    // Capacity test. 
    log(true, "Capacity test"); 
    ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length); 
    // Fill it. 
    for (int i = 0; i < strings.length; i++) { 
    nodes.add(i, c.add(strings[i])); 
    log("Added " + strings[i] + " " + c.toString()); 
    } 
    // Add one more. 
    try { 
    c.add("Wafer thin mint!"); 
    } catch (IllegalStateException ise) { 
    log("Full!"); 
    } 
    c.clear(); 
    log("Empty: " + c.toString()); 

    // Iterate test. 
    log(true, "Iterator test"); 
    for (int i = 0; i < strings.length; i++) { 
    nodes.add(i, c.add(strings[i])); 
    } 
    StringBuilder all = new StringBuilder(); 
    Separator sep = new Separator(","); 
    for (String s : c) { 
    all.append(sep.sep()).append(s); 
    } 
    log("All: "+all); 
    for (int i = 0; i < strings.length; i++) { 
    c.remove(nodes.get(i), strings[i]); 
    } 
    sep.reset(); 
    all.setLength(0); 
    for (String s : c) { 
    all.append(sep.sep()).append(s); 
    } 
    log("None: " + all.toString()); 

    // Multiple add/remove 
    log(true, "Multi test"); 
    for (int i = 0; i < strings.length; i++) { 
    nodes.add(i, c.add(strings[i])); 
    log("Added " + strings[i] + " " + c.toString()); 
    } 
    log("Filled " + c.toString()); 
    for (int i = 0; i < strings.length - 1; i++) { 
    c.remove(nodes.get(i), strings[i]); 
    log("Removed " + strings[i] + " " + c.toString()); 
    } 
    c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]); 
    log("Empty " + c.toString()); 

    // Multi-threaded add/remove 
    log(true, "Threads test"); 
    c.clear(); 
    for (int i = 0; i < TEST_THREADS; i++) { 
    Thread t = new Thread(new Tester<String>(c, strings[i])); 
    t.setName("Tester " + strings[i]); 
    log("Starting " + t.getName()); 
    t.start(); 
    } 
    // Wait for 10 seconds. 
    long stop = System.currentTimeMillis() + 10 * 1000; 
    while (System.currentTimeMillis() < stop) { 
    Thread.sleep(100); 
    } 
    // Stop the testers. 
    testing = false; 
    // Wait some more. 
    Thread.sleep(1 * 100); 
    // Get stats. 
    double added = c.totalAdded.doubleValue(); 
    double skipped = c.totalSkipped.doubleValue(); 
    //double freed = c.freed.doubleValue(); 
    log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped)/added) + ")"); 
} 
+0

Haben Sie eine formale Bestätigung für die Richtigkeit dieses Algorithmus? Lock-free Datenstrukturen sind notorisch schwer zu korrigieren, es sei denn, Sie vermeiden die Wiederverwendung von Knoten ... – thkala

+0

@thkala - wie 'formal' brauchen Sie? Der primäre Algorithmus ist in der 'getFree'-Methode, die einen freien Knoten auswählt und ihn zur Verwendung markiert. Es ist ziemlich einfach und seine Korrektheit sollte offensichtlich sein. Ich habe meine Testmethoden hinzugefügt. Vielleicht werden sie helfen. – OldCurmudgeon

+0

Die Art von "formal", die veröffentlichte und Peer-Review-Algorithmen haben.Ich habe mit Lock-Free-Datenstrukturen ausgiebig gearbeitet und sie können extrem schwierig sein, richtig zu machen. Es gibt einfach zu viele Ecken ... – thkala

1

auch einen Blick auf java.util.concurrent haben.

Blocking Warteschlangen blockiert, bis es etwas zu konsumieren oder (optional) Raum zu produzieren:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html

Concurrent verknüpfte Warteschlange ist nicht blockierend und verwendet einen Slick-Algorithmus, der einen Erzeuger und Verbraucher ermöglicht, aktiv gleichzeitig:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html

+0

CLQ ist nicht gebunden, es funktioniert nicht. – bestsss

1

Hazelcast ist Queue bietet fast alles, was man sich wünschen, aber suppor nicht t Rundheit. Aber von Ihrer Beschreibung bin ich nicht sicher, ob Sie es tatsächlich brauchen.

0

Wenn ich es wäre, würde ich den CircularFIFOBuffer verwenden, wie Sie angegeben haben, und synchronisieren Sie den Puffer beim Schreiben (hinzufügen). Wenn die Überwachungsanwendung den Puffer lesen möchte, synchronisieren Sie den Puffer und kopieren oder klonen Sie ihn anschließend, um ihn für die Berichterstellung zu verwenden.

Dieser Vorschlag basiert auf der Annahme, dass die Latenz minimal ist, um den Puffer in ein neues Objekt zu kopieren/klonen. Wenn es eine große Anzahl von Elementen gibt und die Kopierzeit langsam ist, ist dies keine gute Idee.

Pseudo-Code-Beispiel:

public void writeRequest(String requestID) { 
    synchronized(buffer) { 
     buffer.add(requestID); 
    } 
} 

public Collection<String> getRequests() { 
    synchronized(buffer) { 
     return buffer.clone(); 
    } 
} 
2

Vielleicht möchten Sie bei Disruptor - Concurrent Programming Framework suchen.

  • ein Papier finden Sie die Alternativen, Design und auch eine Leistung comparement zu java.util.concurrent.ArrayBlockingQueue hier beschreiben: pdf
  • Betrachten Sie die ersten drei Artikel aus BlogsAndArticles

Wenn die Bibliothek ist zu viel zu lesen, Stick zu java.util.concurrent.ArrayBlockingQueue