2013年2月4日星期一

JavaコードでEMRのGangliaを使う

GangliaはHadoopのクラスタのパフォーマンスを監視するツールである。AWSのEMRにもGangliaをBootstrap Actionとして使える。普通な使い方はこちらへ
AWSはJavaのSDKがある。このSDKを使って、JavaコードでEMRのジョブを起動、管理できる。この記事はJavaコードでEMRのGangliaを使う方法を紹介する。

JavaコードでEMRタスクと共にGangliaを起動

EMRでGangliaを起動する方法はBootstrap Actionを設定する。Javaコードも同じ、EMRジョブにBootstrap Actionを設定する。このように:

JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
instances.setHadoopVersion("0.20.205");
instances.setInstanceCount(10);
instances.setMasterInstanceType("c1.medium");
instances.setSlaveInstanceType("c1.medium");
/** instancesのいろいろな設定
 *  ......
 */

RunJobFlowRequest request = new RunJobFlowRequest("MyEMRJob", instances);
/** requireについて他の設定(stepとか)
 *  ......
 */
//GangliaのBootstrap Actionを設定
BootstrapActionConfig bootstrapActionConfig = new BootstrapActionConfig();
ScriptBootstrapActionConfig scriptBootstrapAction = new ScriptBootstrapActionConfig();
scriptBootstrapAction.setPath("s3://elasticmapreduce/bootstrap-actions/install-ganglia");
bootstrapActionConfig.setScriptBootstrapAction(scriptBootstrapAction);
bootstrapActionConfig.setName("install-ganglia");
BootstrapActions bootstrapActions = new BootstrapActions();
request.withBootstrapActions(bootstrapActionConfig);//requireに添加

PropertiesCredentials credentials = new PropertiesCredentials(getClassLoader().getResourceAsStream("AwsCredentials.properties"));
AmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials);
emr.runJobFlow(request);//EMRジョブを起動する

Gangliaの監視結果をS3へアップロード

Gangliaの監視結果はmaster instanceにrrdのフォーマットで保存されている。rrdフォーマットが直接に読めないし、EMRジョブが終わってからmaster instanceがすぐに消されるし、Gangliaの監視結果をcsvファイルに変換してS3へアップロードする。このように:

 public static void writeAndUploadGangliaReports(String bucketName, String outputPath, long startTime, long endTime) {
  String[] nameList = new String[]{"cpu_user", "bytes_in", "bytes_out", "pkts_in", "pkts_out", "mem_free", "jvm.metrics.memHeapUsedM"};
  File dirPath = new File("/mnt/var/log/ganglia/rrds/AMZN-EMR");

  if (!dirPath.exists()) {
   return;
  }
  File[] fileList = dirPath.listFiles();

  String outDir = "/home/hadoop/GangliaLog/";
  File logDir = new File(outDir);
  if (!logDir.exists()) {
   logDir.mkdirs();
  }
  String key = outputPath;
  AmazonS3 s3 = new AmazonS3Client(/** CredentialのファイルのStream **/):

  Runtime runtime = Runtime.getRuntime();

  for (int i = 0; i < fileList.length; i++) {
   if (!fileList[i].isDirectory()
     || fileList[i].getName().contains("_SummaryInfo_")) {
    continue;
   }
   for (int j = 0; j < nameList.length; j++) {
    //rrdtool fetchのコマンドを作る
    StringBuilder sb = new StringBuilder("rrdtool fetch ");
    sb.append(fileList[i].getPath());
    sb.append(File.separator);
    sb.append(nameList[j]);
    String startStr = Long.toString((startTime)/1000);
    String endStr = Long.toString((endTime)/1000);
    sb.append(".rrd AVERAGE -r 1 -s "+startStr+" -e "+endStr);

    StringBuilder outPath = new StringBuilder(outDir);

    StringBuilder outName = new StringBuilder(fileList[i].getName());
    outName.append("_");
    outName.append(nameList[j]);
    outName.append(".csv");

    String outputName = outName.toString();

    outPath.append(outputName);

    Process proc;
    try {
     //実行してstdoutの内容をファイルに書く
     proc = runtime.exec(sb.toString());
     BufferedReader reader = new BufferedReader(
       new InputStreamReader(proc.getInputStream()));
     BufferedWriter writer = new BufferedWriter(new FileWriter(
       new File(outPath.toString())));
     String line;
     while ((line = reader.readLine()) != null) {
      writer.write(line);
      writer.write("\n");
     }
     writer.close();
     reader.close();
    } catch (Exception e) {
     throw new RuntimeException(e);
    }
    s3.putObject(new PutObjectRequest(bucketName, key + "/" + outputName, new File(
      outPath.toString()));//S3へアップロード
   }
  }
 }

ジョブのmainメソッドの最後にこのメソッドを呼び出せばいい。このように:

public static void main(String[] args) throws Exception {
 long startTime = System.currentTimeMillis();
 //MyHadoopJobはHadoopジョブの設定したクラスである
 ToolRunner.run(new Configuration(), new MyHadoopJob(), args);
 long endTime = System.currentTimeMillis();
 writeAndUploadGangliaReports("mybucket", "ganglia_output", startTime, endTime);
}

没有评论:

发表评论