2014-05-23 19 views
8

Ich versuche, einen Pool von Arbeitern in Go zu implementieren. Der Abschnitt go-wiki (und Effektiv Go in den Channels) enthält hervorragende Beispiele für die Verwendung von Ressourcen. Erstellen Sie einfach einen Kanal mit einem Puffer, der so groß ist wie der Worker-Pool. Fülle dann diesen Kanal mit Arbeitern und schicke sie zurück in den Kanal, wenn sie fertig sind. Empfangen von den Kanalblöcken, bis ein Mitarbeiter verfügbar ist. Also der Kanal und eine Schleife ist die gesamte Implementierung - sehr cool!Idiomatische Variable-Size-Worker-Pool in Go

Alternativ könnte man beim Senden in den Kanal blockieren, aber gleiche Idee.

Meine Frage ist über die Größe des Worker-Pools ändern, während es ausgeführt wird. Ich glaube nicht, dass es einen Weg gibt, die Größe eines Kanals zu ändern. Ich habe einige Ideen, aber die meisten scheinen viel zu kompliziert. This page implementiert tatsächlich ein Semaphor mit einem Kanal und leere Strukturen in etwa die gleiche Weise, aber es hat das gleiche Problem (diese Dinge kommen die ganze Zeit beim googeln für "golang Semaphor".

+0

Warum möchten Sie einen Worker Pool mit variabler Größe haben? – fabrizioM

+0

@fabrizioM Die Arbeiter machen sehr wenig für sich - sie kontrollieren wirklich nur einige externe Prozesse. Die Anzahl der Arbeiter sollte von der Last der Maschinen mit den externen Prozessen und anderen Faktoren abhängen (z. B. Priorität einer Art von Arbeit gegenüber einer anderen). – Hut8

+0

In diesem Fall sollten Sie idealerweise einen anderen Mechanismus als einen Pool mit einstellbarer Größe verwenden, um dies zu tunen. Sie könnten zum Beispiel eine goroutine starten, die den Arbeitsspeicher oder die Auslastung einer Zielmaschine überprüft, und eine Aufgabe starten, wenn die Maschine bereit ist, und eine weitere Prüfung einplanen, wenn nicht. Ich weiß natürlich nicht, was funktioniert, da ich nicht wirklich weiß, was Sie bauen. – twotwotwo

Antwort

17

Ich würde es anders herum tun. Anstatt viele Goroutines hervorzubringen (die immer noch eine beträchtliche Menge an Speicher benötigen) und einen Kanal zu benutzen, um sie zu blockieren, würde ich die Arbeiter als Göroutines modellieren und einen Kanal benutzen, um die Arbeit zu verteilen. So etwas wie dieses:

package main 

import (
    "fmt" 
    "sync" 
) 

type Task string 

func worker(tasks <-chan Task, quit <-chan bool, wg *sync.WaitGroup) { 
    defer wg.Done() 
    for { 
     select { 
     case task, ok := <-tasks: 
      if !ok { 
       return 
      } 
      fmt.Println("processing task", task) 
     case <-quit: 
      return 
     } 
    } 
} 

func main() { 
    tasks := make(chan Task, 128) 
    quit := make(chan bool) 
    var wg sync.WaitGroup 

    // spawn 5 workers 
    for i := 0; i < 5; i++ { 
     wg.Add(1) 
     go worker(tasks, quit, &wg) 
    } 

    // distribute some tasks 
    tasks <- Task("foo") 
    tasks <- Task("bar") 

    // remove two workers 
    quit <- true 
    quit <- true 

    // add three more workers 
    for i := 0; i < 3; i++ { 
     wg.Add(1) 
     go worker(tasks, quit, &wg) 
    } 

    // distribute more tasks 
    for i := 0; i < 20; i++ { 
     tasks <- Task(fmt.Sprintf("additional_%d", i+1)) 
    } 

    // end of tasks. the workers should quit afterwards 
    close(tasks) 
    // use "close(quit)", if you do not want to wait for the remaining tasks 

    // wait for all workers to shut down properly 
    wg.Wait() 
} 

Es könnte eine gute Idee sein, einen separaten WorkerPool-Typ mit einigen bequemen Methoden zu erstellen. Anstelle von type Task string ist es üblich, eine Struktur zu verwenden, die auch einen done-Kanal enthält, der verwendet wird, um anzuzeigen, dass die Aufgabe erfolgreich ausgeführt wurde.

Edit: Ich habe ein bisschen mehr herum gespielt und kam mit dem folgenden: http://play.golang.org/p/VlEirPRk8V. Es ist im Grunde das gleiche Beispiel mit einer besseren API.

+1

Danke dafür! Sieht großartig aus. –

+1

In diesem Fall ist es möglich, dass alle Arbeiter einen Quit vor dem Beispiel erhalten: http://play.golang.org/p/19DKaA4Q6G – Oleg

2

Eine einfache Änderung, die denken kann, ist um einen Kanal zu haben, der steuert, wie groß der Semaphor ist Der relevante Teil sind die Select-Anweisungen.Wenn es mehr Arbeit von der Warteschlange gibt, verarbeite sie mit dem aktuellen Semaphor.Wenn es eine Anfrage gibt, die Größe des Semaphors zu ändern, ändere sie und weiterhin die erf Warteschlange Verarbeitung mit dem neuen Semaphore. Beachten Sie, dass der alte sein wird Müll gesammelt.

package main 

import "time" 
import "fmt" 

type Request struct{ num int } 
var quit chan struct{} = make(chan struct{}) 

func Serve(queue chan *Request, resize chan int, semsize int) { 
    for { 
     sem := make(chan struct{}, semsize) 
     var req *Request 
     select { 
     case semsize = <-resize: 
      { 
       sem = make(chan struct{}, semsize) 
       fmt.Println("changing semaphore size to ", semsize) 
      } 
     case req = <-queue: 
      { 
       sem <- struct{}{} // Block until there's capacity to process a request. 
       go handle(req, sem) // Don't wait for handle to finish. 
      } 
       case <-quit: 
        return 
     } 

    } 
} 

func process(r *Request) { 
    fmt.Println("Handled Request", r.num) 
} 

func handle(r *Request, sem chan struct{}) { 
    process(r) // May take a long time & use a lot of memory or CPU 
    <-sem  // Done; enable next request to run. 
} 

func main() { 
    workq := make(chan *Request, 1) 
    ctrlq := make(chan int) 
    go func() { 
     for i := 0; i < 20; i += 1 { 
      <-time.After(100 * time.Millisecond) 
      workq <- &Request{i} 
     } 
     <-time.After(500 * time.Millisecond) 
      quit <- struct{}{} 
    }() 
    go func() { 
     <-time.After(500 * time.Millisecond) 
     ctrlq <- 10 
    }() 
    Serve(workq, ctrlq, 1) 
} 

http://play.golang.org/p/AHOLlAv2LH