2012-09-11 5 views
9

Ich muss zwei MapReduce-Jobs verketten. Ich habe JobControl verwendet, um Job2 als abhängig von Job1 zu setzen. Es funktioniert, Ausgabedateien werden erstellt !! Aber es hört nicht auf! In der Schale bleibt es in diesem Zustand:(Hadoop) MapReduce - Jobketten - JobControl stoppt nicht

12/09/11 19:06:24 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
12/09/11 19:06:25 INFO input.FileInputFormat: Total input paths to process : 1 
12/09/11 19:06:25 INFO util.NativeCodeLoader: Loaded the native-hadoop library 
12/09/11 19:06:25 WARN snappy.LoadSnappy: Snappy native library not loaded 
12/09/11 19:07:00 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
12/09/11 19:07:00 INFO input.FileInputFormat: Total input paths to process : 1 

Wie kann ich es stoppen? Das ist meine Hauptsache.

public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    Configuration conf2 = new Configuration(); 

    Job job1 = new Job(conf, "canzoni"); 
    job1.setJarByClass(CanzoniOrdinate.class); 
    job1.setMapperClass(CanzoniMapper.class); 
    job1.setReducerClass(CanzoniReducer.class); 
    job1.setOutputKeyClass(Text.class); 
    job1.setOutputValueClass(IntWritable.class); 

    ControlledJob cJob1 = new ControlledJob(conf); 
    cJob1.setJob(job1); 
    FileInputFormat.addInputPath(job1, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job1, new Path("/user/hduser/tmp")); 


    Job job2 = new Job(conf2, "songsort"); 
    job2.setJarByClass(CanzoniOrdinate.class); 
    job2.setMapperClass(CanzoniSorterMapper.class); 
    job2.setSortComparatorClass(ReverseOrder.class); 
    job2.setInputFormatClass(KeyValueTextInputFormat.class); 
    job2.setReducerClass(CanzoniSorterReducer.class); 
    job2.setMapOutputKeyClass(IntWritable.class); 
    job2.setMapOutputValueClass(Text.class); 
    job2.setOutputKeyClass(Text.class); 
    job2.setOutputValueClass(IntWritable.class); 

    ControlledJob cJob2 = new ControlledJob(conf2); 
    cJob2.setJob(job2); 
    FileInputFormat.addInputPath(job2, new Path("/user/hduser/tmp/part*")); 
    FileOutputFormat.setOutputPath(job2, new Path(args[1])); 

    JobControl jobctrl = new JobControl("jobctrl"); 
    jobctrl.addJob(cJob1); 
    jobctrl.addJob(cJob2); 
    cJob2.addDependingJob(cJob1); 
    jobctrl.run(); 


    //////////////// 
    // NEW CODE /// 
    ////////////// 


    // delete jobctrl.run(); 
    Thread t = new Thread(jobctrl); 
    t.start(); 
    String oldStatusJ1 = null; 
    String oldStatusJ2 = null; 
    while (!jobctrl.allFinished()) { 
     String status =cJob1.toString(); 
     String status2 =cJob2.toString(); 
     if (!status.equals(oldStatusJ1)) { 
     System.out.println(status); 
     oldStatusJ1 = status; 
     } 
     if (!status2.equals(oldStatusJ2)) { 
     System.out.println(status2); 
     oldStatusJ2 = status2; 
     }  
    } 
    System.exit(0); 

} }

+1

ich es gelöst einen Thread mit Jobcontrol zu starten. Ich habe überprüft, dass Jobs mit einem while-Zyklus ausgeführt wurden: while (! Jobctrl.allFinished()) und eine System.exit() den Zyklus. Jetzt möchte ich, dass Aufträge Informationsnachrichten zurückgeben, alles, was ich erhalten, ist zu wissen, welcher Job ausgeführt wird, mit ControlledJob.toString(). Ich weiß nicht, wie man Informationsnachrichten erhält wie: Anzahl der Mapper-Aufgaben, Anzahl der reduzierenden Aufgaben, Datensätze in der Eingabe oder in der Ausgabe usw. ... irgendeine Idee, diese Nachrichten zu bekommen? –

+0

Ist "job.getCounters(). ToString()" genug? – zsxwing

+0

Ist das ein Fehler in der JobControl-Klasse? – Rags

Antwort

5

Ich habe im Wesentlichen, was Pietro oben erwähnt.

public class JobRunner implements Runnable { 
    private JobControl control; 

    public JobRunner(JobControl _control) { 
    this.control = _control; 
    } 

    public void run() { 
    this.control.run(); 
    } 
} 

und in meiner Map/Reduce-Klasse I haben:

public void handleRun(JobControl control) throws InterruptedException { 
    JobRunner runner = new JobRunner(control); 
    Thread t = new Thread(runner); 
    t.start(); 

    while (!control.allFinished()) { 
     System.out.println("Still running..."); 
     Thread.sleep(5000); 
    } 
} 

, in dem ich nur passieren die Jobcontrol-Objekt.

+2

+1 für die Bereitstellung eines Arbeitsbeispiels – beterthanlife

3

Die Jobcontrol-Objekt selbst ist Runnable, so können Sie es wie folgt verwenden:

new Thread(myJobControlInstance).start() 
0

nur ein zwicken, um den Code-Schnipsel, was sinemetu1 geteilt hatte ..

Sie können Anruf an die Drop JobRunner als Jobcontrol selbst implementiert Runnable

 Thread thread = new Thread(jobControl); 
     thread.start(); 

     while (!jobControl.allFinished()) { 
      System.out.println("Still running..."); 
      Thread.sleep(5000); 
     } 

stolperte ich auch auf diesen Link, wo der Benutzer bestätigt, dass Jobcontrol mit neuen Thread nur dann ausgeführt werden kann. https://www.mail-archive.com/[email protected]/msg00556.html

0

versuchen Sie dies:

Thread jcThread = new Thread(jobControl); 
    jcThread.start(); 
    System.out.println("循环判断jobControl运行状态 >>>>>>>>>>>>>>>>"); 
    while (true) { 
     if (jobControl.allFinished()) { 
     System.out.println("====>> jobControl.allFinished=" + jobControl.getSuccessfulJobList()); 
     jobControl.stop(); 
     // 如果不加 break 或者 return,程序会一直循环 
     break; 
    } 

    if (jobControl.getFailedJobList().size() > 0) { 
     succ = 0; 
     System.out.println("====>> jobControl.getFailedJobList=" + jobControl.getFailedJobList()); 
     jobControl.stop(); 

     // 如果不加 break 或者 return,程序会一直循环 
     break; 
    } 
}