2015-03-03 5 views
6

Ich versuche Sailfish zu verwenden, das mehrere Fastq-Dateien als Argumente in einer Ruffus-Pipeline verwendet. Ich führe Sailfish mit dem Subprozess-Modul in Python aus, aber <() im Subprozess-Aufruf funktioniert nicht, selbst wenn ich shell=True setze.Mehrere Pipes im Subprozess

Dies ist der Befehl, den ich möchte Python ausführen:

sailfish quant [options] -1 <(cat sample1a.fastq sample1b.fastq) -2 <(cat sample2a.fastq sample2b.fastq) -o [output_file] 

oder (vorzugsweise):

sailfish quant [options] -1 <(gunzip sample1a.fastq.gz sample1b.fastq.gz) -2 <(gunzip sample2a.fastq.gz sample2b.fastq.gz) -o [output_file] 

Eine Verallgemeinerung:

someprogram <(someprocess) <(someprocess) 

Wie würde ich mich über das Tun Das in Python? Ist Subprozess der richtige Ansatz?

+0

verwandte: [Bash Stil Prozess Substitution mit Python Popen] (http://stackoverflow.com/q/15343447/4279) – jfs

+0

auf den Titel bezogen werden: [Wie kann ich subprocess.Popen verwenden, um mehrere Prozesse zu verbinden durch Rohre?] (http://Stackoverflow.com/q/295459/4279) – jfs

Antwort

8

Um die bash process substitution zu emulieren:

#!/usr/bin/env python 
from subprocess import check_call 

check_call('someprogram <(someprocess) <(anotherprocess)', 
      shell=True, executable='/bin/bash') 

In Python, Sie Named Pipes verwenden:

#!/usr/bin/env python 
from subprocess import Popen 

with named_pipes(n=2) as paths: 
    someprogram = Popen(['someprogram'] + paths) 
    processes = [] 
    for path, command in zip(paths, ['someprocess', 'anotherprocess']): 
     with open(path, 'wb', 0) as pipe: 
      processes.append(Popen(command, stdout=pipe, close_fds=True)) 
    for p in [someprogram] + processes: 
     p.wait() 

wo named_pipes(n) ist:

import os 
import shutil 
import tempfile 
from contextlib import contextmanager 

@contextmanager 
def named_pipes(n=1): 
    dirname = tempfile.mkdtemp() 
    try: 
     paths = [os.path.join(dirname, 'named_pipe' + str(i)) for i in range(n)] 
     for path in paths: 
      os.mkfifo(path) 
     yield paths 
    finally: 
     shutil.rmtree(dirname) 

Ein anderes und vor Abnutzbare Möglichkeit (keine Notwendigkeit, einen benannten Eintrag auf der Festplatte zu erstellen), um die bash-Prozesssubstitution zu implementieren, ist die Verwendung von /dev/fd/N Dateinamen (falls verfügbar) als suggested by @Dunes. Auf FreeBSD, fdescfs(5) (/dev/fd/#) creates entries for all file descriptors opened by the process. Um die Verfügbarkeit zu testen, führen Sie Folgendes aus:

$ test -r /dev/fd/3 3</dev/null && echo /dev/fd is available 

Wenn es fehlschlägt; versuchen /dev/fd zu proc(5) Symlink wie es auf einigen Linux-Versionen getan wird:

$ ln -s /proc/self/fd /dev/fd 

Hier /dev/fd basierte Implementierung von someprogram <(someprocess) <(anotherprocess) bash Befehl:

#!/usr/bin/env python3 
from contextlib import ExitStack 
from subprocess import CalledProcessError, Popen, PIPE 

def kill(process): 
    if process.poll() is None: # still running 
     process.kill() 

with ExitStack() as stack: # for proper cleanup 
    processes = [] 
    for command in [['someprocess'], ['anotherprocess']]: # start child processes 
     processes.append(stack.enter_context(Popen(command, stdout=PIPE))) 
     stack.callback(kill, processes[-1]) # kill on someprogram exit 

    fds = [p.stdout.fileno() for p in processes] 
    someprogram = stack.enter_context(
     Popen(['someprogram'] + ['/dev/fd/%d' % fd for fd in fds], pass_fds=fds)) 
    for p in processes: # close pipes in the parent 
     p.stdout.close() 
# exit stack: wait for processes 
if someprogram.returncode != 0: # errors shouldn't go unnoticed 
    raise CalledProcessError(someprogram.returncode, someprogram.args) 

Hinweis: auf meinem Ubuntu-Rechner, der subprocess Code funktioniert nur in Python 3.4+, obwohl pass_fds seit Python 3.2 verfügbar ist.

+0

Danke JF Sebastian! Es funktionierte tatsächlich mit dem einfachen Subprozessargument 'executable = '/ bin/bash', das ich vorher vermisste. Es funktioniert jetzt mit diesem Aufruf: 'check_call ('sailfish quant [Optionen] <(gunzip -c Datei1 Datei2) <(gunzip -c Datei3 Datei4)', shell = True, ausführbar = '/ bin/bash')'. Vielen Dank für deine Hilfe! In Ihrer Antwort haben Sie wirklich alles getan - Sie haben mir nicht nur geholfen, mein Problem zu lösen, sondern auch dazu beigetragen, Piping in Python besser zu verstehen. – Michelle

2

Während J.F. Sebastian eine Antwort mit Named Pipes zur Verfügung gestellt hat, ist dies mit anonymen Pipes möglich.

import shlex 
from subprocess import Popen, PIPE 

inputcmd0 = "zcat hello.gz" # gzipped file containing "hello" 
inputcmd1 = "zcat world.gz" # gzipped file containing "world" 

def get_filename(file_): 
    return "/dev/fd/{}".format(file_.fileno()) 

def get_stdout_fds(*processes): 
    return tuple(p.stdout.fileno() for p in processes) 

# setup producer processes 
inputproc0 = Popen(shlex.split(inputcmd0), stdout=PIPE) 
inputproc1 = Popen(shlex.split(inputcmd1), stdout=PIPE) 

# setup consumer process 
# pass input processes pipes by "filename" eg. /dev/fd/5 
cmd = "cat {file0} {file1}".format(file0=get_filename(inputproc0.stdout), 
    file1=get_filename(inputproc1.stdout)) 
print("command is:", cmd) 
# pass_fds argument tells Popen to let the child process inherit the pipe's fds 
someprogram = Popen(shlex.split(cmd), stdout=PIPE, 
    pass_fds=get_stdout_fds(inputproc0, inputproc1)) 

output, error = someprogram.communicate() 

for p in [inputproc0, inputproc1, someprogram]: 
    p.wait() 

assert output == b"hello\nworld\n" 
+0

Ihr Code tut: 'inputcmd | someproc' - es unterscheidet sich von 'someproc <(inputcmd)'. Übrigens sollten Sie 'inputproc.communicate()' anstelle von 'inputproc aufrufen.wait() ', um' inputproc.stdout.close() 'im Elternteil zu schließen, so dass' inputproc' nicht hängen bleibt, wenn 'someproc' vorzeitig beendet wird. Es ist nicht klar, was Sie mit StreamConnector erreichen wollen, aber es scheint aufgebläht zu sein. – jfs

+0

Mein Fehler. Ich dachte, '<(cmdlist)' verband eine Reihe von Befehlen stdout mit der Standardeingabe des Konsumentenprozesses. Die Klasse sollte ein cat-ähnliches Dienstprogramm für Streams und nicht für Dateien sein. Die Antwort ist jetzt viel einfacher. – Dunes

+0

'/ dev/fd/#' oder Named Pipes, wenn der erste nicht verfügbar ist, ist genau wie bash implementiert die Prozesssubstitution. Sie sollten die Pipes im Parent so schließen, dass 'inputproc1' oder' inputproc2' vorzeitig absterben; 'someprogram' könnte früher beendet werden. Sonst * sollte die Lösung auf Python 3.4 + * funktionieren. Ich habe eine Ausnahme-sichere Version des Codes zu [meiner Antwort] hinzugefügt (http://stackoverflow.com/a/28840955/4279) (nur als Übung). – jfs