一般的に一つのEMRのジョブフローは一つのMapReduceジョブしかない。実はEMRの起動したHadoopクラスタは普通のHadoopクラスタと同じように複数のMapReduceを実行できる。一つの方法はEMRのジョブフローを設定する時に複数のstepを設定する。この方法はEMRの公式の方法だけど、ちょっと使いにくい。MapReduceジョブの数がジョブフローを起動する前に決まらなければならないし、MapReduceジョブの実行順番についていろいろな制限もある。もう一つの方法はEMRのジョブフローのJavaコードに自分で複数のMapReduceジョブを起動する。この方法はMapReduceジョブの数と順番などを自分のJavaコードで管理するので、「複数のstepを設定する」より自由で、複数のMapReduceジョブを繰り返し、或いは同時に実行できる。この方法を紹介する。
複数のMapReduceジョブを繰り返し
まず一つのMapReduceジョブを定義しよう。ファイルMyJob.java:
public class MyJob extends Configured implements Tool {
public static class Map extends MapReduceBase implements
Mapper {
/** Mapper **/
}
public static class Reduce extends MapReduceBase implements
Reducer {
/** Reducer **/
}
@Override
public int run(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Args Error!");
System.exit(-1);
}
String inputFilePath = args[0];
String outputDir = args[1];
JobConf conf = new JobConf(getConf());
conf.setJarByClass(MyJob.class);
conf.setJobName("MyJob");
//入力出力のタイプを設定する
conf.setMapOutputKeyClass(Writable.class);
conf.setMapOutputValueClass(Writable.class);
conf.setOutputKeyClass(Writable.class);
conf.setOutputValueClass(Writable.class);
conf.setMapperClass(Map.class);
conf.setReducerClass(Reduce.class);
conf.setOutputFormat(TextOutputFormat.class);
conf.setInputFormat(TextInputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(inputFilePath));
FileOutputFormat.setOutputPath(conf, new Path(outputDir));
Job job = new Job(conf);
boolean ret = job.waitForCompletion(true);
return ret ? 0 : -1;
}
}
EMRジョブフローのメインクラスを作って、このMapReduceジョブを繰り返し実行する。ファイルMyJobFollow.java:
public class MyJobFollow {
public static void main(String[] args) {
String input = "input";
String output = "input";
int i = 0;
while(true) {
i++;
input = output;
output = "output_"+i;
String args = new String[]{input, output};
ToolRunner.run(new Configuration(), new MyJob(), args);
//繰り返しが終わるかどうかを判断する
if (/** 終わる条件 **/) {
break;
}
}
}
}
でも、HadoopのMapReduceジョブは何もしなくても、30秒ぐらいをかける(ジョブの起動時間など)。だから、この繰り返しがとても遅いと思う。
複数のMapReduceジョブを同時に実行する
繰り返しと同じように、ジョブフローのメインクラスの中に複数のMapReduceジョブを同時に起動すると同時実行できる。でも、ToolRunnerのrunメソッドはプロセスをブロックするので、ここで複数のスレッドが並列処理が必要である。ファイルMyJobFollow.java:
public class MyJobFollow {
public static class JobRunnable implements Runnable {
private String[] args;
public JobRunnable(String[] args) {
this.args = args;
}
@Override
public void run() {
ToolRunner.run(new Configuration(), new MyJob(), args);
}
}
public static void main(String[] args) {
int threadNum = Integer.parseInt(args[2]);
Thread[] threads = new Thread[threadNum];
for(int i=0; i<threadNum; i++) {
Runnable runnable = new JobRunnable(args);
threads[i] = new Thread(runnable);
threads[i].start();
}
for(int i=0; i<threadNum; i++) {
threads[i].join();
}
}
}
FairSchedulerを設定する
前の方法で一つのEMRジョブフローに複数のMapReduceジョブを起動できるが、Hadoopはデフォールトで複数のジョブを一つずつ実行する。複数のジョブを同時に実行するためにFairSchedulerを設定しなければならない。JavaコードでEMRを起動すると、このように:
JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
/** instancesのいろいろな設定
* ......
*/
RunJobFlowRequest request = new RunJobFlowRequest("MyEMRJob", instances);
/** requireについて他の設定(stepとか)
* ......
*/
//FairSchedulerを設定
BootstrapActions bootstrapActions = new BootstrapActions();
request.withBootstrapActions(bootstrapActions.newConfigureHadoop()
.withKeyValue(ConfigFile.Mapred, "mapred.jobtracker.taskScheduler",
"org.apache.hadoop.mapred.FairScheduler"));
PropertiesCredentials credentials = new PropertiesCredentials(getClassLoader().getResourceAsStream("AwsCredentials.properties"));
AmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials);
emr.runJobFlow(request);//EMRジョブを起動する
没有评论:
发表评论