76

Wie benutzt man multiprocessing, um embarrassingly parallel problems anzugehen?Peinlich parallele Probleme mit Python Multiprocessing lösen

Peinlicher parallel Probleme bestehen typischerweise aus drei Hauptteilen:

  1. lesen Eingangsdaten (aus einer Datei, Datenbank, TCP-Verbindung, etc.).
  2. Führen Sie Berechnungen für die Eingabedaten durch, wobei jede Berechnung unabhängig von einer anderen Berechnung ist.
  3. Schreiben Sie Ergebnisse von Berechnungen (zu einer Datei, Datenbank, Tcp-Verbindung, etc.).

wir das Programm in zwei Dimensionen parallelisieren können:

  • Teil 2 auf mehrere Kerne ausgeführt werden kann, da jede Berechnung unabhängig ist; Reihenfolge der Verarbeitung spielt keine Rolle.
  • Jeder Teil kann unabhängig voneinander ausgeführt werden. Teil 1 kann Daten in eine Eingabewarteschlange stellen, Teil 2 kann Daten aus der Eingabewarteschlange ziehen und Ergebnisse in eine Ausgabewarteschlange stellen, und Teil 3 kann Ergebnisse aus der Ausgabewarteschlange ziehen und aufschreiben.

Dies scheint ein grundlegendes Muster in paralleler Programmierung, aber ich bin immer noch bei dem Versuch, es verloren zu lösen, so sie ein kanonisches Beispiel schreiben zu veranschaulichen, wie diese Verwendung erfolgt Multiprozessing.

Hier ist das Beispielproblem: Gegeben eine CSV file mit Reihen von ganzen Zahlen als Eingabe, berechnen Sie ihre Summen. Trennen Sie das Problem in drei Teile, die alle parallel ausgeführt werden können:

  1. Prozess die Eingabedatei in Rohdaten (Listen/Iterables von ganzen Zahlen)
  2. Berechnen Sie die Summe der Daten, parallel
  3. Ausgang die Summen

unten traditionelle, Single-Process-Python-Programm gebunden, die diese drei Aufgaben lösen:

#!/usr/bin/env python 
# -*- coding: UTF-8 -*- 
# basicsums.py 
"""A program that reads integer values from a CSV file and writes out their 
sums to another CSV file. 
""" 

import csv 
import optparse 
import sys 

def make_cli_parser(): 
    """Make the command line interface parser.""" 
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV", 
      __doc__, 
      """ 
ARGUMENTS: 
    INPUT_CSV: an input CSV file with rows of numbers 
    OUTPUT_CSV: an output file that will contain the sums\ 
"""]) 
    cli_parser = optparse.OptionParser(usage) 
    return cli_parser 


def parse_input_csv(csvfile): 
    """Parses the input CSV and yields tuples with the index of the row 
    as the first element, and the integers of the row as the second 
    element. 

    The index is zero-index based. 

    :Parameters: 
    - `csvfile`: a `csv.reader` instance 

    """ 
    for i, row in enumerate(csvfile): 
     row = [int(entry) for entry in row] 
     yield i, row 


def sum_rows(rows): 
    """Yields a tuple with the index of each input list of integers 
    as the first element, and the sum of the list of integers as the 
    second element. 

    The index is zero-index based. 

    :Parameters: 
    - `rows`: an iterable of tuples, with the index of the original row 
     as the first element, and a list of integers as the second element 

    """ 
    for i, row in rows: 
     yield i, sum(row) 


def write_results(csvfile, results): 
    """Writes a series of results to an outfile, where the first column 
    is the index of the original row of data, and the second column is 
    the result of the calculation. 

    The index is zero-index based. 

    :Parameters: 
    - `csvfile`: a `csv.writer` instance to which to write results 
    - `results`: an iterable of tuples, with the index (zero-based) of 
     the original row as the first element, and the calculated result 
     from that row as the second element 

    """ 
    for result_row in results: 
     csvfile.writerow(result_row) 


def main(argv): 
    cli_parser = make_cli_parser() 
    opts, args = cli_parser.parse_args(argv) 
    if len(args) != 2: 
     cli_parser.error("Please provide an input file and output file.") 
    infile = open(args[0]) 
    in_csvfile = csv.reader(infile) 
    outfile = open(args[1], 'w') 
    out_csvfile = csv.writer(outfile) 
    # gets an iterable of rows that's not yet evaluated 
    input_rows = parse_input_csv(in_csvfile) 
    # sends the rows iterable to sum_rows() for results iterable, but 
    # still not evaluated 
    result_rows = sum_rows(input_rows) 
    # finally evaluation takes place as a chain in write_results() 
    write_results(out_csvfile, result_rows) 
    infile.close() 
    outfile.close() 


if __name__ == '__main__': 
    main(sys.argv[1:]) 

Nehmen wir dieses Programm und schreiben es um, um Multiprocessing zu verwenden, um die drei oben beschriebenen Teile zu parallelisieren. Unten ist ein Skelett dieser neuen, parallelisierte Programm, das die Teile in den Kommentaren zur Adresse werden muss konkretisiert:

#!/usr/bin/env python 
# -*- coding: UTF-8 -*- 
# multiproc_sums.py 
"""A program that reads integer values from a CSV file and writes out their 
sums to another CSV file, using multiple processes if desired. 
""" 

import csv 
import multiprocessing 
import optparse 
import sys 

NUM_PROCS = multiprocessing.cpu_count() 

def make_cli_parser(): 
    """Make the command line interface parser.""" 
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV", 
      __doc__, 
      """ 
ARGUMENTS: 
    INPUT_CSV: an input CSV file with rows of numbers 
    OUTPUT_CSV: an output file that will contain the sums\ 
"""]) 
    cli_parser = optparse.OptionParser(usage) 
    cli_parser.add_option('-n', '--numprocs', type='int', 
      default=NUM_PROCS, 
      help="Number of processes to launch [DEFAULT: %default]") 
    return cli_parser 


def main(argv): 
    cli_parser = make_cli_parser() 
    opts, args = cli_parser.parse_args(argv) 
    if len(args) != 2: 
     cli_parser.error("Please provide an input file and output file.") 
    infile = open(args[0]) 
    in_csvfile = csv.reader(infile) 
    outfile = open(args[1], 'w') 
    out_csvfile = csv.writer(outfile) 

    # Parse the input file and add the parsed data to a queue for 
    # processing, possibly chunking to decrease communication between 
    # processes. 

    # Process the parsed data as soon as any (chunks) appear on the 
    # queue, using as many processes as allotted by the user 
    # (opts.numprocs); place results on a queue for output. 
    # 
    # Terminate processes when the parser stops putting data in the 
    # input queue. 

    # Write the results to disk as soon as they appear on the output 
    # queue. 

    # Ensure all child processes have terminated. 

    # Clean up files. 
    infile.close() 
    outfile.close() 


if __name__ == '__main__': 
    main(sys.argv[1:]) 

Diese Teile des Codes können sowie another piece of code that can generate example CSV files für Testzwecke, found on github sein.

Ich würde mich über jede Einsicht freuen, wie Sie sich mit Nebenläufigkeitsgurus diesem Problem nähern würden.


Hier sind einige Fragen, die ich hatte, als über dieses Problem nachzudenken. Bonuspunkte für die Adressierung jeden/alle:

  • Soll ich untergeordnete Prozesse in den Daten zu lesen und sie in die Warteschlange platzieren, oder kann der Hauptprozess tut dies ohne, bis alle Eingänge Blockierung gelesen wird?
  • Soll ich einen untergeordneten Prozess zum Schreiben der Ergebnisse aus der verarbeiteten Warteschlange haben, oder kann der Hauptprozess dies tun, ohne auf alle Ergebnisse warten zu müssen?
  • Sollte ich eine processes pool für die Summenoperationen verwenden?
  • Wenn ja, welche Methode rufe ich den Pool auf, damit er mit der Verarbeitung der Ergebnisse in der Eingabewarteschlange beginnt, ohne die Eingabe- und Ausgabeprozesse zu blockieren? apply_async()? map_async()? imap()? imap_unordered()?
  • Angenommen, wir mussten die Eingabe- und Ausgabewarteschlangen nicht als Daten eingeben, sondern konnten warten, bis alle Eingaben analysiert wurden und alle Ergebnisse berechnet wurden (z. B. weil wir alle Ein- und Ausgaben kennen in den Systemspeicher passen). Sollten wir den Algorithmus in irgendeiner Weise ändern (z. B. keine Prozesse gleichzeitig mit I/O ausführen)?
+2

Haha, ich liebe den Begriff peinlich-parallel. Ich bin überrascht, dass dies das erste Mal ist, dass ich den Begriff gehört habe, es ist eine großartige Möglichkeit, sich auf dieses Konzept zu beziehen. –

Antwort

62

Meine Lösung verfügt über eine zusätzliche Glocke und Pfeife, um sicherzustellen, dass die Reihenfolge der Ausgabe der Reihenfolge der Eingabe entspricht. Ich benutze multiprocessing.queues, um Daten zwischen Prozessen zu senden, stopp Nachrichten zu senden, so dass jeder Prozess weiß, die Warteschlangen zu überprüfen. Ich denke, die Kommentare in der Quelle sollten klarstellen, was vor sich geht, aber wenn nicht, lass es mich wissen.

#!/usr/bin/env python 
# -*- coding: UTF-8 -*- 
# multiproc_sums.py 
"""A program that reads integer values from a CSV file and writes out their 
sums to another CSV file, using multiple processes if desired. 
""" 

import csv 
import multiprocessing 
import optparse 
import sys 

NUM_PROCS = multiprocessing.cpu_count() 

def make_cli_parser(): 
    """Make the command line interface parser.""" 
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV", 
      __doc__, 
      """ 
ARGUMENTS: 
    INPUT_CSV: an input CSV file with rows of numbers 
    OUTPUT_CSV: an output file that will contain the sums\ 
"""]) 
    cli_parser = optparse.OptionParser(usage) 
    cli_parser.add_option('-n', '--numprocs', type='int', 
      default=NUM_PROCS, 
      help="Number of processes to launch [DEFAULT: %default]") 
    return cli_parser 

class CSVWorker(object): 
    def __init__(self, numprocs, infile, outfile): 
     self.numprocs = numprocs 
     self.infile = open(infile) 
     self.outfile = outfile 
     self.in_csvfile = csv.reader(self.infile) 
     self.inq = multiprocessing.Queue() 
     self.outq = multiprocessing.Queue() 

     self.pin = multiprocessing.Process(target=self.parse_input_csv, args=()) 
     self.pout = multiprocessing.Process(target=self.write_output_csv, args=()) 
     self.ps = [ multiprocessing.Process(target=self.sum_row, args=()) 
         for i in range(self.numprocs)] 

     self.pin.start() 
     self.pout.start() 
     for p in self.ps: 
      p.start() 

     self.pin.join() 
     i = 0 
     for p in self.ps: 
      p.join() 
      print "Done", i 
      i += 1 

     self.pout.join() 
     self.infile.close() 

    def parse_input_csv(self): 
      """Parses the input CSV and yields tuples with the index of the row 
      as the first element, and the integers of the row as the second 
      element. 

      The index is zero-index based. 

      The data is then sent over inqueue for the workers to do their 
      thing. At the end the input process sends a 'STOP' message for each 
      worker. 
      """ 
      for i, row in enumerate(self.in_csvfile): 
       row = [ int(entry) for entry in row ] 
       self.inq.put((i, row)) 

      for i in range(self.numprocs): 
       self.inq.put("STOP") 

    def sum_row(self): 
     """ 
     Workers. Consume inq and produce answers on outq 
     """ 
     tot = 0 
     for i, row in iter(self.inq.get, "STOP"): 
       self.outq.put((i, sum(row))) 
     self.outq.put("STOP") 

    def write_output_csv(self): 
     """ 
     Open outgoing csv file then start reading outq for answers 
     Since I chose to make sure output was synchronized to the input there 
     is some extra goodies to do that. 

     Obviously your input has the original row number so this is not 
     required. 
     """ 
     cur = 0 
     stop = 0 
     buffer = {} 
     # For some reason csv.writer works badly across processes so open/close 
     # and use it all in the same process or else you'll have the last 
     # several rows missing 
     outfile = open(self.outfile, "w") 
     self.out_csvfile = csv.writer(outfile) 

     #Keep running until we see numprocs STOP messages 
     for works in range(self.numprocs): 
      for i, val in iter(self.outq.get, "STOP"): 
       # verify rows are in order, if not save in buffer 
       if i != cur: 
        buffer[i] = val 
       else: 
        #if yes are write it out and make sure no waiting rows exist 
        self.out_csvfile.writerow([i, val]) 
        cur += 1 
        while cur in buffer: 
         self.out_csvfile.writerow([ cur, buffer[cur] ]) 
         del buffer[cur] 
         cur += 1 

     outfile.close() 

def main(argv): 
    cli_parser = make_cli_parser() 
    opts, args = cli_parser.parse_args(argv) 
    if len(args) != 2: 
     cli_parser.error("Please provide an input file and output file.") 

    c = CSVWorker(opts.numprocs, args[0], args[1]) 

if __name__ == '__main__': 
    main(sys.argv[1:]) 
+1

Dies ist die * einzige * Antwort, die tatsächlich 'multiprocessing' verwendet. Das Kopfgeld geht an Sie, Sir. – gotgenes

+1

Ist es wirklich notwendig, bei den Eingabe- und Zahlenverarbeitungsprozessen 'Join' aufzurufen? Konnten Sie nicht damit davonkommen, nur dem Ausgabeprozess beizutreten und die anderen zu ignorieren? Wenn ja, gibt es noch einen guten Grund, bei allen anderen Prozessen 'Join' zu nennen? –

+1

Schöne Antwort, muss ich sagen. Definitiv für immer mit Lesezeichen versehen! – Blender

4

Alte Schule.

p1.py

import csv 
import pickle 
import sys 

with open("someFile", "rb") as source: 
    rdr = csv.reader(source) 
    for line in eumerate(rdr): 
     pickle.dump(line, sys.stdout) 

p2.py

import pickle 
import sys 

while True: 
    try: 
     i, row = pickle.load(sys.stdin) 
    except EOFError: 
     break 
    pickle.dump(i, sum(row)) 

p3.py

import pickle 
import sys 
while True: 
    try: 
     i, row = pickle.load(sys.stdin) 
    except EOFError: 
     break 
    print i, row 

Hier ist die multi-processing Endstruktur.

python p1.py | python p2.py | python p3.py 

Ja, die Shell hat diese zusammen auf Betriebssystemebene gestrickt. Es scheint mir einfacher zu sein und es funktioniert sehr gut.

Ja, es gibt etwas mehr Aufwand bei der Verwendung von Gurke (oder cPickle). Die Vereinfachung scheint jedoch die Mühe wert.

Wenn der Dateiname ein Argument für p1.py sein soll, ist das eine einfache Änderung.

Noch wichtiger ist eine Funktion wie die folgende sehr praktisch.

def get_stdin(): 
    while True: 
     try: 
      yield pickle.load(sys.stdin) 
     except EOFError: 
      return 

, dass Sie dies tun können:

for item in get_stdin(): 
    process item 

Dies ist sehr einfach, aber es funktioniert nicht leicht ermöglicht es Ihnen, mehrere Kopien von P2.py ausgeführt haben.

Sie haben zwei Probleme: Fan-Out und Fan-In. Das P1.py muss sich irgendwie auf mehrere P2.py's verteilen. Und die P2.pys müssen ihre Ergebnisse irgendwie in einen einzigen P3.py zusammenführen.

Die Old-School-Ansatz für Fan-out ist eine "Push" -Architektur, die sehr effektiv ist.

Theoretisch sind mehrere P2.pys, die aus einer gemeinsamen Warteschlange ziehen, die optimale Zuweisung von Ressourcen. Dies ist oft ideal, aber es ist auch eine Menge Programmierung. Ist die Programmierung wirklich notwendig? Oder wird die Round-Robin-Verarbeitung gut genug sein?

Praktisch, Sie werden feststellen, dass P1.py ein einfaches "Round Robin" zwischen mehreren P2.py machen kann ziemlich gut sein. Sie hätten P1.py so konfiguriert, dass n Kopien von P2.py über Named Pipes verarbeitet werden. Die P2.pys würden jeweils von ihrer passenden Pipe lesen.

Was passiert, wenn ein P2.py alle "worst case" Daten bekommt und weit hinterherläuft? Ja, Round-Robin ist nicht perfekt. Aber es ist besser als nur eine P2.py und Sie können diese Verzerrung mit einfacher Randomisierung adressieren.

Fan-in von mehreren P2.py's zu einem P3.py ist ein bisschen komplexer, immer noch. An diesem Punkt hört der Old-School-Ansatz auf, vorteilhaft zu sein. P3.py muss aus mehreren benannten Pipes lesen, die die select-Bibliothek verwenden, um die Lesevorgänge zu verschachteln.

+0

Wäre das nicht haariger, wenn ich 'n' Instanzen von p2.py starten möchte, sie' m' Stücke von 'r' Zeilen ausgeben und verarbeiten lassen soll und p3.py die' m 'x'r' ergibt sich aus allen' n' p2.py-Instanzen? – gotgenes

+1

Ich habe diese Anforderung in der Frage nicht gesehen. (Vielleicht war die Frage zu lang und zu komplex, um diese Anforderung hervorzuheben.) Wichtig ist, dass Sie einen wirklich guten Grund haben sollten, zu erwarten, dass mehrere P2 tatsächlich Ihr Leistungsproblem lösen. Während wir die Hypothese aufstellen können, dass eine solche Situation existiert, hat die * nix-Architektur das nie gehabt und niemand hat es für angebracht gehalten, sie hinzuzufügen. Es könnte hilfreich sein, mehrere P2 zu haben. Aber in den letzten 40 Jahren hat niemand genug gesehen, um es zu einem erstklassigen Teil der Shell zu machen. –

+0

Das ist dann meine Schuld. Lassen Sie mich diesen Punkt bearbeiten und verdeutlichen. Um mir zu helfen, die Frage zu verbessern, kommt die Verwirrung von der Verwendung von 'sum()'? Das dient nur zur Veranschaulichung. Ich hätte es durch 'do_something()' ersetzen können, aber ich wollte ein konkretes, leicht zu verstehendes Beispiel (siehe ersten Satz). In Wirklichkeit ist mein 'do_something()' sehr CPU-intensiv, aber peinlich parallelisierbar, da jeder Aufruf unabhängig ist. Daher helfen mehrere Kerne dabei zu kauen. – gotgenes

0

Es ist wahrscheinlich möglich, ein wenig Parallelität in Teil 1 als auch einzuführen. Wahrscheinlich kein Problem mit einem Format, das so einfach wie CSV ist, aber wenn die Verarbeitung der Eingabedaten merklich langsamer ist als das Lesen der Daten, könnten Sie größere Teile lesen und weiterlesen, bis Sie ein "Zeilentrennzeichen" finden (Newline im CSV-Fall, aber auch das hängt vom gelesenen Format ab; funktioniert nicht, wenn das Format ausreichend komplex ist).

Diese Chunks, von denen jeder wahrscheinlich mehrere Einträge enthält, können dann an eine Menge paralleler Prozesse übergeben werden, die Jobs aus einer Warteschlange lesen, wo sie geparst und aufgeteilt werden und dann in die Warteschlange für Stufe 2 gelegt werden.

5

Ich merke, dass ich ein bisschen zu spät für die Party bin, aber ich habe kürzlich GNU parallel entdeckt und möchte zeigen, wie einfach es ist, diese typische Aufgabe damit zu erfüllen.

cat input.csv | parallel ./sum.py --pipe > sums 

Etwas Ähnliches wird für sum.py tun:

#!/usr/bin/python 

from sys import argv 

if __name__ == '__main__': 
    row = argv[-1] 
    values = (int(value) for value in row.split(',')) 
    print row, ':', sum(values) 

Parallel wird sum.py für jede Zeile in input.csv (parallel, natürlich), dann geben die Ergebnisse zu sums laufen. Deutlich besser als multiprocessing Ärger

+3

Parallele GNU-Dokumente rufen für jede Zeile in der Eingabedatei einen neuen Python-Interpreter auf. Der Overhead beim Starten eines neuen Python-Interpreters (etwa 30 Millisekunden für Python 2.7 und 40 Millisekunden für Python 3.3 auf meinem i7 MacBook Pro mit einem Solid-State-Laufwerk) kann die Zeit für die Verarbeitung einer einzelnen Datenzeile erheblich übersteigen und zu einem führen viel verschwendete Zeit und schlechtere Gewinne als erwartet. Im Fall Ihres Beispielproblems würde ich wahrscheinlich nach [multiprocessing.Pool] (http://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool) greifen. – gotgenes

5

kommt spät zur Party ...

joblib hat eine Schicht auf der Oberseite des Mehrfaches macht parallel zum Schleifen zu helfen. Es bietet Ihnen Funktionen wie eine faule Verteilung von Jobs und eine bessere Fehlerberichterstattung zusätzlich zu seiner sehr einfachen Syntax.

Als Haftungsausschluss bin ich der ursprüngliche Autor von joblib.

+1

Ist Joblib also in der Lage, die E/A parallel zu bearbeiten oder müssen Sie das manuell erledigen? Können Sie mit Joblib ein Codebeispiel bereitstellen? Danke! –