2010-01-27 10 views
7

Kennt jemand einen Open-Source-BufferedIterator, in dem die nächsten N-Elemente auf einem Hintergrund-Thread eifrig abgerufen werden? Hier ist an implementation von einem TechRepublic article, aber ich nehme an, es wurde nicht gründlich getestet.Implementierung von BufferedIterator

Iterators .Puffer (Iterator toBuffer, int pufferSize) wäre eine nette Ergänzung zu Guava, hat das berücksichtigt worden?

+0

Dies ist wahrscheinlich eine sinnvolle Feature-Anfrage für Guava. http://code.google.com/p/guava-libraries/issues/entry –

+0

Erledigt: http://code.google.com/p/guava-libraries/issues/detail?id=318 –

Antwort

4

Die verknüpfte Implementierung für Java 4 geschrieben zu haben scheint und kann ein wenig mit Guave und java.util.concurrent vereinfacht werden:

import java.util.Iterator; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.atomic.AtomicReference; 

import com.google.common.base.Throwables; 
import com.google.common.collect.AbstractIterator; 
import com.google.common.util.concurrent.Executors; 

public abstract class Iterators2 { 
    public static <E> Iterator<E> buffer(final Iterator<E>  source, 
             int     capacity) { 
     return buffer(source, capacity, defaultExecutor); 
    } 

    public static <E> Iterator<E> buffer(final Iterator<E>  source, 
             int     capacity, 
             final ExecutorService exec) { 
     if (capacity <= 0) return source; 
     final BlockingQueue<E> queue = new ArrayBlockingQueue<E>(capacity); 

     // Temporary storage for an element we fetched but could not fit in the queue 
     final AtomicReference<E> overflow = new AtomicReference<E>(); 
     final Runnable inserter = new Runnable() { 
      @SuppressWarnings("unchecked") 
      public void run() { 
       E next = (E) END_MARKER; 
       if (source.hasNext()) { 
        next = source.next(); 
        // ArrayBlockingQueue does not allow nulls 
        if (next == null) next = (E) NULL_MARKER; 
       } 
       if (queue.offer(next)) { 
        // Keep buffering elements as long as we can 
        if (next != END_MARKER) exec.submit(this); 
       } else { 
        // Save the element. This also signals to the 
        // iterator that the inserter thread is blocked. 
        overflow.lazySet(next); 
       } 
      } 
     }; 
     // Fetch the first element. 
     // The inserter will resubmit itself as necessary to fetch more elements. 
     exec.submit(inserter); 
     Iterator<E> iterator = new AbstractIterator<E>() { 
      protected E computeNext() { 
       try { 
        E next = queue.take(); 
        E overflowElem = overflow.getAndSet(null); 
        if (overflowElem != null) { 
         // There is now a space in the queue 
         queue.put(overflowElem); 
         // Awaken the inserter thread 
         exec.submit(inserter); 
        } 
        if (next == END_MARKER) { 
         return endOfData(); 
        } else if (next == NULL_MARKER) { 
         return null; 
        } else { 
         return next; 
        } 
       } catch (InterruptedException ex) { 
        Thread.currentThread().interrupt(); 
        return endOfData(); 
       } 
      } 
     }; 

     return iterator; 
    } 

    protected Iterators2() { 
     throw Throwables.propagate(new InstantiationException(Iterators2.class + " is a static class and cannot be instantiated")); 
    } 

    private static ExecutorService defaultExecutor = 
     java.util.concurrent.Executors.newCachedThreadPool(Executors.daemonThreadFactory()); 

    private static final Object END_MARKER = new Object(); 

    private static final Object NULL_MARKER = new Object(); 
} 

Hinweis: Die obige Implementierung macht keinen Versuch, Ausnahmen in der Quelle zu behandeln Iterator (wenn einer geworfen wird, wird der Einfüge-Task abrupt beendet und der aufrufende Thread wird deadlocked.)

+0

Warum würdest du? Akzeptieren Sie einen ExecutorService. Wann möchten Sie etwas anderes als eine Daemon-Thread-Factory, wie Sie es standardmäßig getan haben? –

+2

Sie möchten möglicherweise Threads aus einem Pool fester Größe zuweisen. Oder Sie möchten die Standardpriorität ändern. Oder Sie möchten alle Threads im Auge behalten, falls Sie sie bei einer Datenbankverbindung löschen müssen. Es ist einfacher, die vorhandenen Schnittstellen ('ExecutorService' und' ThreadFactory') zu verwenden, anstatt eine Reihe von Überladungen für alle verschiedenen Optionen hinzuzufügen. – finnw

+1

Beachten Sie, dass diese Implementierung Ausnahmen vom Quellen-Iterator nicht angemessen behandelt. –