2012-05-25 11 views
11

Kürzlich habe ich mit Amazon Web Services (AWS) gearbeitet und ich habe festgestellt, dass es nicht viel Dokumentation zu diesem Thema gibt, also habe ich meine Lösung hinzugefügt.Wie kann ich auf den Abschluss eines Jobablaufs für Elastic MapReduce in einer Java-Anwendung warten?

Ich schrieb eine Anwendung mit Amazon Elastic MapReduce (Amazon EMR). Nachdem die Berechnungen beendet waren, musste ich etwas an den von ihnen erstellten Dateien arbeiten, also musste ich wissen, wann der Jobfluss seine Arbeit beendet hat.

Dies ist, wie Sie überprüfen können, ob Ihr Auftragsablauf beendet:

AmazonElasticMapReduce mapReduce = new AmazonElasticMapReduceClient(credentials); 

DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowsRequest() 
    .withJobFlowStates("COMPLETED"); 

List<JobFlowDetail> jobs = mapReduce.describeJobFlows(jobAttributes).getJobFlows(); 
JobFlowDetail detail = jobs.get(0); 

detail.getJobFlowId(); //the id of one of the completed jobs 

Sie können in DescribeJobFlowsRequest für eine bestimmte Job-ID suchen auch dann, wenn diese Aufgabe, fehlgeschlagen ist zu prüfen, fertig.

Ich hoffe, es wird anderen helfen.

+5

Übermittlung Ihre eigene Lösung sofort auf Ihr Problem ist hier sehr willkommen, aber der gewünschte Ansatz ist dieses in eine Frage zu spalten und eine Antwort noch finden Sie unter [Es ist OK Ihre eigenen Fragen zu stellen und Antworten] (http : //blog.stackoverflow.com/2011/07/its-ok-to-ask-and-answer-your-own-questions/) - das hilft, Dinge richtig zu sortieren/zu kategorisieren, dh Platz für wirklich unbeantwortete Fragen zu schaffen anwendbar, danke! –

+0

Danke, ich werde es als zukünftige Referenz notieren. – siditom

+0

Sie sollten auch die anderen abgeschlossenen Staaten einschließen. Einige Leute, die dies lesen, könnten für immer eine Schleife machen, wenn sie 'jobAttributes' wie angegeben initialisieren. 'DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowRequest(). WithJobFlowStates (" COMPLETED "," TERMINATED "," FAILED ");' –

Antwort

1

Sobald der Jobfluss abgeschlossen ist, stoppt der Cluster und die HDFS-Partition ist verloren. Um Datenverluste zu vermeiden, konfigurieren Sie den letzten Schritt des Auftragsablaufs, um Ergebnisse in Amazon S3 zu speichern.

Wenn der JobFlowInstancesDetail: KeepJobFlowAliveWhenNoSteps Parameter auf TRUE gesetzt ist, wird der Auftrag zufließt Übergang in den Wartezustand, anstatt herunterzufahren, sobald die Schritte abgeschlossen haben.

In jedem Auftragsablauf sind maximal 256 Schritte zulässig.

Wenn Ihre Arbeit zeitaufwendig ist, empfehle ich Ihnen, die Ergebnisse regelmäßig zu speichern.

Lange Rede kurzer Sinn: Es gibt keine Möglichkeit zu wissen, wann es gemacht wird. Stattdessen müssen Sie Ihre Daten als Teil des Jobs speichern.

1

Verwenden Sie die Option --wait-for-steps, wenn Sie den Jobfluss erstellen.

./elastic-mapreduce --create \ 
... 
--wait-for-steps \ 
... 
3

Ich lief auch in dieses Problem, und hier ist die Lösung, die ich für jetzt kam. Es ist nicht perfekt, aber hoffentlich wird es hilfreich sein. Als Referenz verwende ich Java 1.7 und AWS Java SDK Version 1.9.13.

Beachten Sie, dass dieser Code geht davon aus, dass Sie für die Cluster warten zu beenden, nicht die Schritte streng genommen; Wenn Ihr Cluster beendet wird, wenn alle Ihre Schritte abgeschlossen sind, ist das in Ordnung, aber wenn Sie Cluster verwenden, die nach Abschluss des Schritts am Leben bleiben, wird Ihnen dies nicht zu viel helfen.

Beachten Sie außerdem, dass dieser Code Clusterstatusänderungen überwacht und protokolliert und zusätzlich diagnostiziert, ob der Cluster mit Fehlern beendet wurde, und eine Ausnahme auslöst, wenn dies der Fall ist.

private void yourMainMethod() { 
    RunJobFlowRequest request = ...; 

    try { 
     RunJobFlowResult submission = emr.runJobFlow(request); 
     String jobFlowId = submission.getJobFlowId(); 
     log.info("Submitted EMR job as job flow id {}", jobFlowId); 

     DescribeClusterResult result = 
      waitForCompletion(emr, jobFlowId, 90, TimeUnit.SECONDS); 
     diagnoseClusterResult(result, jobFlowId); 
    } finally { 
     emr.shutdown(); 
    } 
} 

private DescribeClusterResult waitForCompletion(
      AmazonElasticMapReduceClient emr, String jobFlowId, 
      long sleepTime, TimeUnit timeUnit) 
     throws InterruptedException { 
    String state = "STARTING"; 
    while (true) { 
     DescribeClusterResult result = emr.describeCluster(
       new DescribeClusterRequest().withClusterId(jobFlowId) 
     ); 
     ClusterStatus status = result.getCluster().getStatus(); 
     String newState = status.getState(); 
     if (!state.equals(newState)) { 
      log.info("Cluster id {} switched from {} to {}. Reason: {}.", 
        jobFlowId, state, newState, status.getStateChangeReason()); 
      state = newState; 
     } 

     switch (state) { 
      case "TERMINATED": 
      case "TERMINATED_WITH_ERRORS": 
      case "WAITING": 
       return result; 
     } 

     timeUnit.sleep(sleepTime); 
    } 
} 

private void diagnoseClusterResult(DescribeClusterResult result, String jobFlowId) { 
    ClusterStatus status = result.getCluster().getStatus(); 
    ClusterStateChangeReason reason = status.getStateChangeReason(); 
    ClusterStateChangeReasonCode code = 
     ClusterStateChangeReasonCode.fromValue(reason.getCode()); 
    switch (code) { 
    case ALL_STEPS_COMPLETED: 
     log.info("Completed EMR job {}", jobFlowId); 
     break; 
    default: 
     failEMR(jobFlowId, status); 
    } 
} 

private static void failEMR(String jobFlowId, ClusterStatus status) { 
    String msg = "EMR cluster run %s terminated with errors. ClusterStatus = %s"; 
    throw new RuntimeException(String.format(msg, jobFlowId, status)); 
}