AWSはJavaのSDKがある。このSDKを使って、JavaコードでEMRのジョブを起動、管理できる。この記事はJavaコードでEMRのGangliaを使う方法を紹介する。
JavaコードでEMRタスクと共にGangliaを起動
EMRでGangliaを起動する方法はBootstrap Actionを設定する。Javaコードも同じ、EMRジョブにBootstrap Actionを設定する。このように:JobFlowInstancesConfig instances = new JobFlowInstancesConfig(); instances.setHadoopVersion("0.20.205"); instances.setInstanceCount(10); instances.setMasterInstanceType("c1.medium"); instances.setSlaveInstanceType("c1.medium"); /** instancesのいろいろな設定 * ...... */ RunJobFlowRequest request = new RunJobFlowRequest("MyEMRJob", instances); /** requireについて他の設定(stepとか) * ...... */ //GangliaのBootstrap Actionを設定 BootstrapActionConfig bootstrapActionConfig = new BootstrapActionConfig(); ScriptBootstrapActionConfig scriptBootstrapAction = new ScriptBootstrapActionConfig(); scriptBootstrapAction.setPath("s3://elasticmapreduce/bootstrap-actions/install-ganglia"); bootstrapActionConfig.setScriptBootstrapAction(scriptBootstrapAction); bootstrapActionConfig.setName("install-ganglia"); BootstrapActions bootstrapActions = new BootstrapActions(); request.withBootstrapActions(bootstrapActionConfig);//requireに添加 PropertiesCredentials credentials = new PropertiesCredentials(getClassLoader().getResourceAsStream("AwsCredentials.properties")); AmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials); emr.runJobFlow(request);//EMRジョブを起動する
Gangliaの監視結果をS3へアップロード
Gangliaの監視結果はmaster instanceにrrdのフォーマットで保存されている。rrdフォーマットが直接に読めないし、EMRジョブが終わってからmaster instanceがすぐに消されるし、Gangliaの監視結果をcsvファイルに変換してS3へアップロードする。このように:public static void writeAndUploadGangliaReports(String bucketName, String outputPath, long startTime, long endTime) { String[] nameList = new String[]{"cpu_user", "bytes_in", "bytes_out", "pkts_in", "pkts_out", "mem_free", "jvm.metrics.memHeapUsedM"}; File dirPath = new File("/mnt/var/log/ganglia/rrds/AMZN-EMR"); if (!dirPath.exists()) { return; } File[] fileList = dirPath.listFiles(); String outDir = "/home/hadoop/GangliaLog/"; File logDir = new File(outDir); if (!logDir.exists()) { logDir.mkdirs(); } String key = outputPath; AmazonS3 s3 = new AmazonS3Client(/** CredentialのファイルのStream **/): Runtime runtime = Runtime.getRuntime(); for (int i = 0; i < fileList.length; i++) { if (!fileList[i].isDirectory() || fileList[i].getName().contains("_SummaryInfo_")) { continue; } for (int j = 0; j < nameList.length; j++) { //rrdtool fetchのコマンドを作る StringBuilder sb = new StringBuilder("rrdtool fetch "); sb.append(fileList[i].getPath()); sb.append(File.separator); sb.append(nameList[j]); String startStr = Long.toString((startTime)/1000); String endStr = Long.toString((endTime)/1000); sb.append(".rrd AVERAGE -r 1 -s "+startStr+" -e "+endStr); StringBuilder outPath = new StringBuilder(outDir); StringBuilder outName = new StringBuilder(fileList[i].getName()); outName.append("_"); outName.append(nameList[j]); outName.append(".csv"); String outputName = outName.toString(); outPath.append(outputName); Process proc; try { //実行してstdoutの内容をファイルに書く proc = runtime.exec(sb.toString()); BufferedReader reader = new BufferedReader( new InputStreamReader(proc.getInputStream())); BufferedWriter writer = new BufferedWriter(new FileWriter( new File(outPath.toString()))); String line; while ((line = reader.readLine()) != null) { writer.write(line); writer.write("\n"); } writer.close(); reader.close(); } catch (Exception e) { throw new RuntimeException(e); } s3.putObject(new PutObjectRequest(bucketName, key + "/" + outputName, new File( outPath.toString()));//S3へアップロード } } }
ジョブのmainメソッドの最後にこのメソッドを呼び出せばいい。このように:
public static void main(String[] args) throws Exception { long startTime = System.currentTimeMillis(); //MyHadoopJobはHadoopジョブの設定したクラスである ToolRunner.run(new Configuration(), new MyHadoopJob(), args); long endTime = System.currentTimeMillis(); writeAndUploadGangliaReports("mybucket", "ganglia_output", startTime, endTime); }
没有评论:
发表评论