2016-04-26 9 views
0

MRJob wartet, bis jeder Auftrag abgeschlossen ist, bevor dem Benutzer die Steuerung zurückgegeben wird. Ich habe einen großen EMR-Schritt in kleinere zerlegt und möchte sie alle auf einmal einreichen.Übergeben von Aufträgen an den EMR-Cluster mit MRJob

Die Dokumente sprechen über programmatically submitting tasks, aber der Beispielcode wartet auch auf den Abschluss des Auftrags (seit sie den Befehl runner.run() aufrufen, der blocks until the job is complete).

Auch EMR hat eine Beschränkung von 256 aktiven Jobs, aber wie gehen wir über das Auffüllen dieser 256 Jobs statt Schleifen und erhalten die Ausgabe auf der angeschlossenen Konsole.

Antwort

0

Nach Tagen zu versuchen, ist das Folgende das Beste, was ich mir vorstellen konnte.

Mein erster Versuch, als ich feststellte, dass ein eingereichter Job nicht abgebrochen wird, wenn das Terminal getrennt wird, war es, (in einem Bash-Skript) Jobs zu senden und zu killen. Dies funktionierte jedoch nicht sehr gut, da AWS Anrufe an EMR drosselte und daher einige der Jobs vor der Übermittlung beendet wurden.

Aktuelle beste Lösung

from jobs import MyMRJob 
import logging 

logging.basicConfig(
    level=logging.INFO, 
    format = '%(asctime)-15s %(levelname)-8s %(message)s', 
) 
log = logging.getLogger('submitjobs') 

def main(): 
    cluster_id="x-MXMXMX" 
    log.info('Cluster: %s', cluster_id) 
    for i in range(10): 
     n = '%04d' % i 
     log.info('Adding job: %s', n) 
     mr_job = MyMRJob(args=[ 
      '-r', 'emr', 
      '--conf-path', 'mrjob.conf', 
      '--no-output', 
      '--output-dir', 's3://mybucket/mrjob/%s' % n, 
      '--cluster-id', cluster_id, 
      'input/file.%s' % n 
    ]) 
    runner = mr_job.make_runner() 
    # the following is the secret sauce, submits the job and returns 
    # it is a private method though, so may be changed without notice 
    runner._launch() 

if __name__ == '__main__': 
    main()