2013年2月4日星期一

1つのEMRのジョブフローの中に複数のMapReudceジョブを実行する

一般的に一つのEMRのジョブフローは一つのMapReduceジョブしかない。実はEMRの起動したHadoopクラスタは普通のHadoopクラスタと同じように複数のMapReduceを実行できる。一つの方法はEMRのジョブフローを設定する時に複数のstepを設定する。この方法はEMRの公式の方法だけど、ちょっと使いにくい。MapReduceジョブの数がジョブフローを起動する前に決まらなければならないし、MapReduceジョブの実行順番についていろいろな制限もある。もう一つの方法はEMRのジョブフローのJavaコードに自分で複数のMapReduceジョブを起動する。この方法はMapReduceジョブの数と順番などを自分のJavaコードで管理するので、「複数のstepを設定する」より自由で、複数のMapReduceジョブを繰り返し、或いは同時に実行できる。この方法を紹介する。

複数のMapReduceジョブを繰り返し

まず一つのMapReduceジョブを定義しよう。ファイルMyJob.java:

public class MyJob extends Configured implements Tool {
 public static class Map extends MapReduceBase implements
   Mapper {
   /** Mapper **/
 }
 public static class Reduce extends MapReduceBase implements
   Reducer {
   /** Reducer **/
 }
 @Override
 public int run(String[] args) throws Exception {
  if (args.length < 2) {
   System.err.println("Args Error!");
   System.exit(-1);
  }
  String inputFilePath = args[0];
  String outputDir = args[1];
  JobConf conf = new JobConf(getConf());
  conf.setJarByClass(MyJob.class);
  conf.setJobName("MyJob");
  //入力出力のタイプを設定する
  conf.setMapOutputKeyClass(Writable.class);
  conf.setMapOutputValueClass(Writable.class);
  conf.setOutputKeyClass(Writable.class);
  conf.setOutputValueClass(Writable.class);
  conf.setMapperClass(Map.class);
  conf.setReducerClass(Reduce.class);
  conf.setOutputFormat(TextOutputFormat.class);
  conf.setInputFormat(TextInputFormat.class);
  FileInputFormat.setInputPaths(conf, new Path(inputFilePath));
  FileOutputFormat.setOutputPath(conf, new Path(outputDir));
  Job job = new Job(conf);
  boolean ret = job.waitForCompletion(true);
  return ret ? 0 : -1;
 }
}
EMRジョブフローのメインクラスを作って、このMapReduceジョブを繰り返し実行する。ファイルMyJobFollow.java:

public class MyJobFollow {
 public static void main(String[] args) {
  String input = "input";
  String output = "input";
  int i = 0;
  while(true) {
   i++;
   input = output;
   output = "output_"+i;
   String args = new String[]{input, output};
   ToolRunner.run(new Configuration(), new MyJob(), args);
   //繰り返しが終わるかどうかを判断する
   if (/** 終わる条件 **/) {
    break;
   }
  }
 }
}
でも、HadoopのMapReduceジョブは何もしなくても、30秒ぐらいをかける(ジョブの起動時間など)。だから、この繰り返しがとても遅いと思う。

複数のMapReduceジョブを同時に実行する

繰り返しと同じように、ジョブフローのメインクラスの中に複数のMapReduceジョブを同時に起動すると同時実行できる。でも、ToolRunnerのrunメソッドはプロセスをブロックするので、ここで複数のスレッドが並列処理が必要である。ファイルMyJobFollow.java:

public class MyJobFollow {
 public static class JobRunnable implements Runnable {
  private String[] args;
  public JobRunnable(String[] args) {
   this.args = args;
  }
  @Override
  public void run() {
   ToolRunner.run(new Configuration(), new MyJob(), args);
  }
 }
 public static void main(String[] args) {
  int threadNum = Integer.parseInt(args[2]);
  Thread[] threads = new Thread[threadNum];
  for(int i=0; i<threadNum; i++) {
   Runnable  runnable = new JobRunnable(args);
   threads[i] = new Thread(runnable);
   threads[i].start();
  }
  for(int i=0; i<threadNum; i++) {
   threads[i].join();
  }
 }
}

FairSchedulerを設定する

前の方法で一つのEMRジョブフローに複数のMapReduceジョブを起動できるが、Hadoopはデフォールトで複数のジョブを一つずつ実行する。複数のジョブを同時に実行するためにFairSchedulerを設定しなければならない。JavaコードでEMRを起動すると、このように:

JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
/** instancesのいろいろな設定
 *  ......
 */

RunJobFlowRequest request = new RunJobFlowRequest("MyEMRJob", instances);
/** requireについて他の設定(stepとか)
 *  ......
 */
//FairSchedulerを設定
BootstrapActions bootstrapActions = new BootstrapActions();
request.withBootstrapActions(bootstrapActions.newConfigureHadoop()
  .withKeyValue(ConfigFile.Mapred, "mapred.jobtracker.taskScheduler",
    "org.apache.hadoop.mapred.FairScheduler"));

PropertiesCredentials credentials = new PropertiesCredentials(getClassLoader().getResourceAsStream("AwsCredentials.properties"));
AmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials);
emr.runJobFlow(request);//EMRジョブを起動する

没有评论:

发表评论