AWSはJavaのSDKがある。このSDKを使って、JavaコードでEMRのジョブを起動、管理できる。この記事はJavaコードでEMRのGangliaを使う方法を紹介する。
JavaコードでEMRタスクと共にGangliaを起動
EMRでGangliaを起動する方法はBootstrap Actionを設定する。Javaコードも同じ、EMRジョブにBootstrap Actionを設定する。このように:JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
instances.setHadoopVersion("0.20.205");
instances.setInstanceCount(10);
instances.setMasterInstanceType("c1.medium");
instances.setSlaveInstanceType("c1.medium");
/** instancesのいろいろな設定
* ......
*/
RunJobFlowRequest request = new RunJobFlowRequest("MyEMRJob", instances);
/** requireについて他の設定(stepとか)
* ......
*/
//GangliaのBootstrap Actionを設定
BootstrapActionConfig bootstrapActionConfig = new BootstrapActionConfig();
ScriptBootstrapActionConfig scriptBootstrapAction = new ScriptBootstrapActionConfig();
scriptBootstrapAction.setPath("s3://elasticmapreduce/bootstrap-actions/install-ganglia");
bootstrapActionConfig.setScriptBootstrapAction(scriptBootstrapAction);
bootstrapActionConfig.setName("install-ganglia");
BootstrapActions bootstrapActions = new BootstrapActions();
request.withBootstrapActions(bootstrapActionConfig);//requireに添加
PropertiesCredentials credentials = new PropertiesCredentials(getClassLoader().getResourceAsStream("AwsCredentials.properties"));
AmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials);
emr.runJobFlow(request);//EMRジョブを起動する
Gangliaの監視結果をS3へアップロード
Gangliaの監視結果はmaster instanceにrrdのフォーマットで保存されている。rrdフォーマットが直接に読めないし、EMRジョブが終わってからmaster instanceがすぐに消されるし、Gangliaの監視結果をcsvファイルに変換してS3へアップロードする。このように: public static void writeAndUploadGangliaReports(String bucketName, String outputPath, long startTime, long endTime) {
String[] nameList = new String[]{"cpu_user", "bytes_in", "bytes_out", "pkts_in", "pkts_out", "mem_free", "jvm.metrics.memHeapUsedM"};
File dirPath = new File("/mnt/var/log/ganglia/rrds/AMZN-EMR");
if (!dirPath.exists()) {
return;
}
File[] fileList = dirPath.listFiles();
String outDir = "/home/hadoop/GangliaLog/";
File logDir = new File(outDir);
if (!logDir.exists()) {
logDir.mkdirs();
}
String key = outputPath;
AmazonS3 s3 = new AmazonS3Client(/** CredentialのファイルのStream **/):
Runtime runtime = Runtime.getRuntime();
for (int i = 0; i < fileList.length; i++) {
if (!fileList[i].isDirectory()
|| fileList[i].getName().contains("_SummaryInfo_")) {
continue;
}
for (int j = 0; j < nameList.length; j++) {
//rrdtool fetchのコマンドを作る
StringBuilder sb = new StringBuilder("rrdtool fetch ");
sb.append(fileList[i].getPath());
sb.append(File.separator);
sb.append(nameList[j]);
String startStr = Long.toString((startTime)/1000);
String endStr = Long.toString((endTime)/1000);
sb.append(".rrd AVERAGE -r 1 -s "+startStr+" -e "+endStr);
StringBuilder outPath = new StringBuilder(outDir);
StringBuilder outName = new StringBuilder(fileList[i].getName());
outName.append("_");
outName.append(nameList[j]);
outName.append(".csv");
String outputName = outName.toString();
outPath.append(outputName);
Process proc;
try {
//実行してstdoutの内容をファイルに書く
proc = runtime.exec(sb.toString());
BufferedReader reader = new BufferedReader(
new InputStreamReader(proc.getInputStream()));
BufferedWriter writer = new BufferedWriter(new FileWriter(
new File(outPath.toString())));
String line;
while ((line = reader.readLine()) != null) {
writer.write(line);
writer.write("\n");
}
writer.close();
reader.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
s3.putObject(new PutObjectRequest(bucketName, key + "/" + outputName, new File(
outPath.toString()));//S3へアップロード
}
}
}
ジョブのmainメソッドの最後にこのメソッドを呼び出せばいい。このように:
public static void main(String[] args) throws Exception {
long startTime = System.currentTimeMillis();
//MyHadoopJobはHadoopジョブの設定したクラスである
ToolRunner.run(new Configuration(), new MyHadoopJob(), args);
long endTime = System.currentTimeMillis();
writeAndUploadGangliaReports("mybucket", "ganglia_output", startTime, endTime);
}
没有评论:
发表评论