2016-03-25 19 views
3

Ich bin ein New-grad SWE Lernen Go (und liebe es).GoLang: Dekomprimieren bz2 in Goroutine, konsumieren in anderen Goroutine

Ich baue einen Parser für Wikipedia-Dump-Dateien - im Grunde eine riesige bzip2-komprimierte XML-Datei (~ 50GB unkomprimiert).

Ich möchte sowohl Streaming-Dekomprimierung und Parsing, die einfach genug klingt. Für Dekompression, das tue ich:

inputFilePath := flag.Arg(0) inputReader := bzip2.NewReader(inputFile)

Und dann den Leser auf den XML-Parser übergeben:

decoder := xml.NewDecoder(inputFile)

Da jedoch beide Dekomprimieren und Parsing teure Operationen sind, würde Ich mag haben Sie werden in separaten Go-Routinen ausgeführt, um zusätzliche Kerne zu verwenden. Wie würde ich das in Go machen?

Das einzige, was ich mir vorstellen kann, ist das Wrappen der Datei in einem chan [] Byte und das Implementieren der io.Reader-Schnittstelle, aber ich nehme an, dass es einen gebauten (und saubereren) Weg gibt.

Hat schon mal jemand so etwas gemacht?

Danke! Manuel

Antwort

2

Sie können io.Pipe verwenden, dann io.Copy verwenden, um die dekomprimierten Daten in das Rohr zu schieben, und in einem anderen goroutine lesen:

package main 

import (
    "bytes" 
    "encoding/json" 
    "fmt" 
    "io" 
    "sync" 
) 

func main() { 

    rawJson := []byte(`{ 
      "Foo": { 
       "Bar": "Baz" 
      } 
     }`) 

    bzip2Reader := bytes.NewReader(rawJson) // this stands in for the bzip2.NewReader 

    var wg sync.WaitGroup 
    wg.Add(2) 

    r, w := io.Pipe() 

    go func() { 
     // write everything into the pipe. Decompression happens in this goroutine. 
     io.Copy(w, bzip2Reader) 
     w.Close() 
     wg.Done() 
    }() 

    decoder := json.NewDecoder(r) 

    go func() { 
     for { 
      t, err := decoder.Token() 
      if err != nil { 
       break 
      } 
      fmt.Println(t) 
     } 
     wg.Done() 
    }() 

    wg.Wait() 
} 

http://play.golang.org/p/fXLnfnaWYA

+1

Das ist genau das, was ich brauchte, danke! Leider scheint es, dass die Leistung des Stardard-lib-bzip2-Dekompressors nicht groß ist, so dass es immer noch der limitierende Faktor ist. Ich kann zu diesem Kompressor wechseln: https://godoc.org/github.com/dsnet/compress/bzip2 Allerdings ist es immer noch etwa 33% langsamer als so etwas wie pbzip2. –

+0

Wie viel von einer Beschleunigung haben Sie am Ende bekommen, @ManuelMenzella? Ich mag das Aussehen dieses Codes - es scheint als ob es funktionieren sollte, aber in meinen Tests ist es nur marginal schneller als alles Singlethread (67 sec vs 72 sec auf 1M records). Irgendeine Idee, was ich falsch machen könnte, @ user1431317? – EM0

+0

Vielleicht ist es immer noch begrenzt durch wie schnell die bzip2-Dekomprimierung Daten liefern kann, und die XML-Dekodierung benötigt nicht so viel CPU-Leistung. Die Pipe fügt wahrscheinlich etwas Overhead hinzu, obwohl io.Copy Optimierungen hat, wenn eines oder beide Enden io.Reader/io.Writer sind. Es ist möglich, dass es viele kleine temporäre Puffer zuweist, und das verursacht zu viel Müll. Vielleicht würde ein gepufferter Leser oder Schreiber helfen. Sie sollten Ihre App profilieren (sowohl CPU- als auch Mem-Profil - das mem-Profil kann Ihnen helfen, viele unnötige Zuweisungen zu finden). – user1431317