2016-06-09 27 views
1

Synchron Beispiel:Golang Nicht Blocking Buffer

type job struct { 
    Id int 
    Message string 
} 

for { 
    // getJob() blocks until job is received 
    job := getJob() 
    doSomethingWithJob(job) 
} 

Ich wünsche Jobs zu verarbeiten, wie sie in von getJob mit doSomethingWithJob kommen. z.B. getJob könnte eine Payload sein, die von einer MessagingQueue wie RabbitMQ/Beanstalkd oder einer HTTP-Anfrage empfangen wird.

Ich möchte getJob nicht blockieren, während ich bin doSomethingWithJob & umgekehrt. Ich möchte jedoch die Anzahl der Jobs kontrollieren/puffern, damit ich das System nicht überlasten kann. z.B. max Gleichzeitigkeit von 5.

Das Konzept der Go-Routinen verwirren mich im Moment, so dass alle Zeiger in die richtige Richtung würde geschätzt viel zu helfen mir zu lernen.

Aktualisieren: Danke @JimB für Ihre Hilfe. Warum nimmt Arbeiter 5 immer den Job auf?

jobCh := make(chan *job) 

// Max 5 Workers 
for i := 0; i < 5; i++ { 

    go func() { 

     for job := range jobCh { 
      time.Sleep(time.Second * time.Duration(rand.Intn(3))) 
      log.Println(i, string(job.Message)) 
     } 
    }() 
} 

for { 
    job, err := getJob() 
    if err != nil { 
     log.Println("Closing Channel") 
     close(jobCh) 
     break 
    } 

    jobCh <- job 
} 

log.Println("Complete") 

Beispiel Ausgabe

2016/06/09 22:19:57 5 {"id":10692,"name":"Test Message"} 
2016/06/09 22:19:57 5 {"id":10687,"name":"Test Message"} 
2016/06/09 22:19:57 5 {"id":10699,"name":"Test Message"} 
2016/06/09 22:19:57 5 {"id":10701,"name":"Test Message"} 
2016/06/09 22:19:57 5 {"id":10703,"name":"Test Message"} 
2016/06/09 22:19:57 5 {"id":10704,"name":"Test Message"} 
+0

es ist nicht 5. Arbeiter, jeder Arbeiter hat i = 5. Es sollte 'gehen func (int i) {...} (i)' https://golang.org/doc/faq#closures_and_goroutines – Darigaaz

Antwort

4

Sie können 5 goroutines von einem Kanal zu lesen beginnen doSomethingWithJob zu nennen. Auf diese Weise werden nie mehr als 5 Jobs gleichzeitig verarbeitet.

jobCh := make(chan *job) 

// start 5 workers to process jobs 
for i := 0; i < 5; i++ { 
    go func() { 
     for job := range jobCh { 
      doSomethingWithJob(job) 
     } 
    }() 
} 

// send jobs to workers as fast as we can 
for { 
    jobCh <- getJob() 
} 
+0

Wow ... Ich habe bei all dem hin und overcomplicating suchen! – Gravy

+0

Ich habe die Methode oben versucht. Und es funktioniert, aber es sieht so aus, als ob nur Arbeiter 5 die Arbeit aufnimmt. Siehe Update in Frage. – Gravy

+0

@Gravy: https://golang.org/doc/faq#closures_and_goroutines. Jede Goroutine verwendet die gleiche "i" Variable und der letzte Wert von i war "5". – JimB