2013年2月6日星期三

自前のInputFormatを実装

HadoopのオリジナルのInputFormatの実装はHDFSから入力する。だから、HadoopのMapReduceを起動する前に、入力データをHDFSへアップロードする必要である。でもあるタスクに対してファイル形式の入力は必要がない。このようなタスクにわざわざHDFSのファイルから入力すると、性能の浪費である。もしファイルから入力したくないと、自分でInputFormatを実装する必要がある。今回は一つの例を使ってこの方法を説明する。例のタスクはMapReduceで大量の整数を加算する。入力データはタスクが起動される時にランダムで生成されて、HDFSファイルに保存されなくて、直接にMapperのサーバへ渡されてMapタスクを起動する。

Hadoopのタスクの起動

まずHadoopはどうやってInputFormatを使ってMapタスクを起動すると説明しよう。
InputFormatはHadoopの一つのInterfaceである:
public interface InputFormat<K, V> {

  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

  RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
}
タスクが起動された時に、masterサーバはまずInputFormatのgetSplits()メソッドを呼び出し、InputSplitの配列を生成する。InputSplitもInterfaceであり、入力を持って、サーバの間にデータを通信するメソッドを実装した。そしてInputSplitのオブジェクトたちがMapperのサーバへ送信される。Mapperは一つInputSplitを受け取ってから、このInputSplitを入力としてInputFormatのgetRecordReader()メソッドを呼び出し、RecordReaderを取得する。このRecordReaderの機能はInputFormatから一つ一つのkey/valueのペアを読み出しである。最後に読み出したベアを入力としてMapタスクを起動する。
つまり、自前のInputFormatを実装するために、InputSplitとRecordReaderも実装しなければならない。

InputSplitを実装

一つのInputSplitは入力データの一部である。主に通信する時に使われるreadFields()とwrite()メソッドを実装する。今回はランダムに生成された整数をvalueListへ保存する。keyをkeyListへ保存する(keyは今回使われてないけど)。
public class MyInputSplit implements InputSplit{
 private List<Integer> keyList;
 private List<Integer> valueList;
 
 public MyInputSplit() {
  keyList = new ArrayList<Integer>();
  valueList = new ArrayList<Integer>();
 }
 
 public void add(int key, int value) {
  keyList.add(key);
  valueList.add(value);
 }
  
 @Override
 public void readFields(DataInput in) throws IOException {
  keyList = readList(in);
  valueList = readList(in);
 }
 
 private List<Integer> readList(DataInput in) throws IOException {
  IntWritable sizeWritable = new IntWritable();
  sizeWritable.readFields(in);
  int size = sizeWritable.get();
  List<Integer> list = new ArrayList<Integer>();
  for(int i=0; i<size; i++) {
   IntWritable valueWritable = new IntWritable();
   valueWritable.readFields(in);
   list.add(valueWritable.get());
  }
  return list;
 }

 @Override
 public void write(DataOutput out) throws IOException {
  writeList(out, keyList);
  writeList(out, valueList);
 }
 
 private void writeList(DataOutput out, List<Integer> list) throws IOException {
  int size = list.size();
  IntWritable sizeWritable = new IntWritable(size);
  sizeWritable.write(out);
  for(Integer value : list) {
   IntWritable valueWritable = new IntWritable(value);
   valueWritable.write(out);
  }
 }

 @Override
 public long getLength() throws IOException {
  return keyList.size();
 }

 @Override
 public String[] getLocations() throws IOException {
  return new String[0];
 }

 public List<Integer> getKeyList() {
  return keyList;
 }

 public List<Integer> getValueList() {
  return valueList;
 }

}

RecordReaderを実装

RecordReaderの役目は受け取ったInputSplitからnext()メソッドで一つずつkey/valueペアを読み出す。
public class MyRecordReader implements RecordReader<IntWritable, IntWritable>{
 private final List<Integer> keyList;
 private final List<Integer> valueList;
 private int pos = 0;
 private final int length;
 public MyRecordReader(MyInputSplit split) throws IOException {
  this.keyList = split.getKeyList();
  this.valueList = split.getValueList();
  length = keyList.size();
 }
 @Override
 public void close() throws IOException {
  
 }

 @Override
 public IntWritable createKey() {
  return new IntWritable();
 }

 @Override
 public IntWritable createValue() {
  return new IntWritable();
 }

 @Override
 public long getPos() throws IOException {
  return pos;
 }

 @Override
 public float getProgress() throws IOException {
  return (float)pos / length;
 }

 @Override
 public boolean next(IntWritable key, IntWritable value) throws IOException {
  if(pos < length) {
   int keyInt = keyList.get(pos);
   int valueInt = valueList.get(pos);
   key.set(keyInt);
   value.set(valueInt);
   pos++;
   return true;
  } else {
   return false;
  }
 }

}

InputFormatを実装

最後に自前のInputSplitとRecordReaderを使ってInputFormatを実装する。getSplits()はランダムで入力データを生成して、分割してInputSplitへ保存する。getRecordReader()はただ受け取ったInputSplitからRecordReaderを作成しかない。
public class MyInputFormat implements InputFormat<IntWritable, IntWritable> {
 @Override
 public RecordReader<IntWritable, IntWritable> getRecordReader(
   InputSplit split, JobConf job, Reporter report) throws IOException {
  MyInputSplit mySplit = (MyInputSplit)split;
  return new MyRecordReader(mySplit);
 }
 @Override
 public InputSplit[] getSplits(JobConf job, int numSplit) throws IOException {
  int mapTaskNum = job.getNumMapTasks();
  int totalSize = job.getInt("total.size", 1000);
  int sizeInMapTask = (int)Math.ceil((double)totalSize / mapTaskNum);
  InputSplit[] splits = new InputSplit[mapTaskNum];
  Random r = new Random();
  int count = 0;
  for(int i=0; i<totalSize; i++) {
   MyInputSplit split = new MyInputSplit();
   for(int j=0; j<sizeInMapTask; j++) {
    if(count >= totalSize) {
     break;
    }
    split.add(i, r.nextInt(totalSize));
    count++;
   }
   splits[i] = split;
  }
  return splits;
 }


}

MapReduceのメインクラス

自前のInputFormatをsetInputFormat()に設定して、MapReduceタスクを起動する。
public class MyJob extends Configured implements Tool {
 public static class Map extends MapReduceBase implements
   Mapper<IntWritable, IntWritable, IntWritable, LongWritable> {
  long sum = 0;
  OutputCollector<IntWritable, LongWritable> out = null;

  @Override
  public void map(IntWritable key, IntWritable value,
    OutputCollector<IntWritable, LongWritable> output,
    Reporter report) throws IOException {
   if (out == null) {
    out = output;
   }
   sum += value.get();
  }

  @Override
  public void close() {
   try {
    out.collect(new IntWritable(0), new LongWritable(sum));
   } catch (IOException e) {
    e.printStackTrace();
   }
  }

 }

 public static class Reduce extends MapReduceBase implements
   Reducer<IntWritable, LongWritable, LongWritable, NullWritable> {

  @Override
  public void reduce(IntWritable key, Iterator<LongWritable> values,
    OutputCollector<LongWritable, NullWritable> output,
    Reporter report) throws IOException {
   long sum = 0;
   while (values.hasNext()) {
    sum += values.next().get();
   }
   output.collect(new LongWritable(sum), NullWritable.get());
  }

 }

 @Override
 public int run(String[] args) throws Exception {
  if (args.length < 2) {
   System.err.println("Args Error!");
   System.exit(-1);
  }
  int totalSize = Integer.parseInt(args[0]);
  int mapTaskNum = Integer.parseInt(args[1]);
  JobConf conf = new JobConf(getConf());
  conf.setInt("total.size", totalSize);
  conf.setNumMapTasks(mapTaskNum);
  conf.setJarByClass(MyJob.class);
  conf.setJobName("MyJob");
  conf.setMapOutputKeyClass(IntWritable.class);
  conf.setMapOutputValueClass(LongWritable.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(NullWritable.class);
  conf.setMapperClass(Map.class);
  conf.setReducerClass(Reduce.class);
  conf.setOutputFormat(TextOutputFormat.class);
  conf.setInputFormat(MyInputFormat.class);
  Job job = new Job(conf);
  boolean ret = job.waitForCompletion(true);
  return ret ? 0 : -1;
 }

 public static void main(String[] args) throws Exception {
  int result = ToolRunner.run(new Configuration(), new MyJob(), args);
  System.exit(result);
 }
}


2013年2月5日星期二

Oracle Streamsの設定方法

Oracle StreamsはOracleのレプリケーションを建てるためにサーバの間に自動同期の技術である。今回はAWS EC2でOracle Streamsを配置した。この方法を記録する。

Streamsの設定準備

EC2で同じAMIから二つのデータベースのinstanceを起動する。この中に一つのサーバをmasterとして、他のサーバをslaveとする。
まずinit.oraファイルに、このパラメータを設定(masterとslave):
aq_tm_processes=2
global_names=true
job_queue_processes=10
parallel_max_servers=20
undo_retention=3600
nls_date_format='YYYY-MM-DD HH24:MI:SS'
streams_pool_size=25M
utl_file_dir='*'
open_links=4
#archive modeの設定:
log_archive_dest_1='LOCATION=/data/archivefile/' 
log_archive_format='%t_%s_%r.arc'

archive modeを有効にする(masterとslave):
sqlplus '/ as sysdba'
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

archive modeが有効されたかどうかこのように検証できる:
archive log list

streamsのuserを作る(masterとslave):
create tablespace tbs_stream datafile '/data/datafile/tbs_stream01.dbf' size 100m autoextend on maxsize unlimited segment space management auto;
execute dbms_logmnr_d.set_tablespace('tbs_stream');
create user strmadmin identified by strmadmin default tablespace tbs_stream temporary tablespace temp;
grant connect,resource,dba,aq_administrator_role to strmadmin;

次はmasterとslaveの間のDBLinkを作る。
masterのtnsnames.oraファイルにこれを添加:
 SLAVE =
    (DESCRIPTION =
     (ADDRESS = (PROTOCOL = TCP)(HOST = 192.168.***.***)(PORT = 1521))#slave server ip
      (CONNECT_DATA =
      (SERVER = DEDICATED)
      (SERVICE_NAME = ****)#database name
     )
     )
slaveのtnsnames.oraファイルにこれを添加:
 MASTER =
    (DESCRIPTION =
     (ADDRESS = (PROTOCOL = TCP)(HOST = 192.168.***.***)(PORT = 1521))#master server ip
      (CONNECT_DATA =
      (SERVER = DEDICATED)
      (SERVICE_NAME = ****)#database name
     )
     )

global_nameがDBlink名と違うとダメなので、global_nameを変える。masterに:
alter database rename global_name to MASTER
slaveに:
alter database rename global_name to SLAVE

DBLinkを作る。masterに:
create database link SLAVE connect to strmadmin identified by strmadmin using 'SLAVE';
slaveに:
create database link MASTER connect to strmadmin identified by strmadmin using 'MASTER';

Streamsを配置

Oracle 10gからStreamsを簡単に一つスクリプトで配置できる。masterに:
DECLARE
empty_tbs DBMS_STREAMS_TABLESPACE_ADM.TABLESPACE_SET;
BEGIN
DBMS_STREAMS_ADM.PRE_INSTANTIATION_SETUP(
maintain_mode => 'GLOBAL',
tablespace_names => empty_tbs,
source_database => 'MASTER',
destination_database => 'SLAVE',
perform_actions => true,
bi_directional => false,
include_ddl => true,
start_processes => true,
exclude_schemas => NULL,
exclude_flags => DBMS_STREAMS_ADM.EXCLUDE_FLAGS_UNSUPPORTED +
DBMS_STREAMS_ADM.EXCLUDE_FLAGS_DML +
DBMS_STREAMS_ADM.EXCLUDE_FLAGS_DDL);
END;
/

今回はmasterとslaveが同じAMIから起動されたので、データが同じ、同期の必要がない。もしそうじゃないと、ここでまずmasterのデータをslaveに同期する。そして、masterに:
DECLARE
until_scn NUMBER;
BEGIN
until_scn:= DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER;
DBMS_OUTPUT.PUT_LINE('Until SCN:' || until_scn);
END;
/
scnを記録して、masterに次のコマンドを実行する:
DECLARE
empty_tbs DBMS_STREAMS_TABLESPACE_ADM.TABLESPACE_SET;
BEGIN
DBMS_STREAMS_ADM.POST_INSTANTIATION_SETUP(
maintain_mode => 'GLOBAL',
tablespace_names => empty_tbs,
source_database => 'MASTER',
destination_database => 'SLAVE',
perform_actions => true,
bi_directional => false,
include_ddl => true,
start_processes => true,
instantiation_scn => ********, #前に記録したscn
exclude_schemas => NULL,
exclude_flags => DBMS_STREAMS_ADM.EXCLUDE_FLAGS_UNSUPPORTED +
DBMS_STREAMS_ADM.EXCLUDE_FLAGS_DML +
DBMS_STREAMS_ADM.EXCLUDE_FLAGS_DDL);
END;
/

これでStreamsが配置完了した。masterにデータが変更すると、slaveに反映される。

2013年2月4日星期一

JavaコードでEMRのDebugモードを有効する

EMRのジョブフローを起動する時、Debugモードを設定できる。Debugモードが有効にされると、EMRのジョブが実行される時にジョブの詳細情報が見える。MapReduceプログラムの開発にとても有用な機能である。ウェブページでEMRのジョブフローを起動すると、「enable debug」をチェックすればいい。JavaコードでEMRのジョブフローを起動すると、一つのstepの形式でdebugモードを有効する。このように:

JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
/** instancesのいろいろな設定
 *  ......
 */
  
RunJobFlowRequest request = new RunJobFlowRequest("MyEMRJob", instances);

HadoopJarStepConfig debugJarConfig = new HadoopJarStepConfig("s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar");
debugJarConfig.setArgs(Arrays.asList(new String[] {"s3://us-east-1.elasticmapreduce/libs/state-pusher/0.1/fetch"}));
StepConfig enableDebugStepConfig = new StepConfig("enableDebug", debugJarConfig);
RunJobFlowRequest request = new RunJobFlowRequest("MyJobFollow", instances);
/** requireについて他の設定(stepとか)
 * MapReduceジョブの構成をstepConfigに設定する
 *  ......
 */
request.setSteps(Arrays.asList(new StepConfig[] {enableDebugStepConfig, stepConfig }));
  
PropertiesCredentials credentials = new PropertiesCredentials(getClassLoader().getResourceAsStream("AwsCredentials.properties"));
AmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials);
emr.runJobFlow(request);//EMRジョブを起動する

JavaコードでEMRの同時に実行するタスク数を設定

Hadoopの一つのMapReduceジョブは複数のMapタスクとReduceタスクがある。タスクがクラスタのサーバへ渡されて実行される。一つのサーバは複数のタスクを実行する。同時に実行するタスクの数の最大値を設定できる。JavaコードでEMRのジョブフローを起動すると、このように設定する:

JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
/** instancesのいろいろな設定
 *  ......
 */
 
RunJobFlowRequest request = new RunJobFlowRequest("MyEMRJob", instances);
/** requireについて他の設定(stepとか)
 *  ......
 */
//tasks.maximumを設定
BootstrapActions bootstrapActions = new BootstrapActions();
request.withBootstrapActions(bootstrapActions.newConfigureHadoop()
 .withKeyValue(ConfigFile.Mapred, "mapred.tasktracker.map.tasks.maximum", "5")
 .withKeyValue(ConfigFile.Mapred, "mapred.tasktracker.reduce.tasks.maximum", "5").build());
 
PropertiesCredentials credentials = new PropertiesCredentials(getClassLoader().getResourceAsStream("AwsCredentials.properties"));
AmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials);
emr.runJobFlow(request);//EMRジョブを起動する

これで、1台のサーバは同時に実行するMapとReduceタスクの最大値を5に設定した。

1つのEMRのジョブフローの中に複数のMapReudceジョブを実行する

一般的に一つのEMRのジョブフローは一つのMapReduceジョブしかない。実はEMRの起動したHadoopクラスタは普通のHadoopクラスタと同じように複数のMapReduceを実行できる。一つの方法はEMRのジョブフローを設定する時に複数のstepを設定する。この方法はEMRの公式の方法だけど、ちょっと使いにくい。MapReduceジョブの数がジョブフローを起動する前に決まらなければならないし、MapReduceジョブの実行順番についていろいろな制限もある。もう一つの方法はEMRのジョブフローのJavaコードに自分で複数のMapReduceジョブを起動する。この方法はMapReduceジョブの数と順番などを自分のJavaコードで管理するので、「複数のstepを設定する」より自由で、複数のMapReduceジョブを繰り返し、或いは同時に実行できる。この方法を紹介する。

複数のMapReduceジョブを繰り返し

まず一つのMapReduceジョブを定義しよう。ファイルMyJob.java:

public class MyJob extends Configured implements Tool {
 public static class Map extends MapReduceBase implements
   Mapper {
   /** Mapper **/
 }
 public static class Reduce extends MapReduceBase implements
   Reducer {
   /** Reducer **/
 }
 @Override
 public int run(String[] args) throws Exception {
  if (args.length < 2) {
   System.err.println("Args Error!");
   System.exit(-1);
  }
  String inputFilePath = args[0];
  String outputDir = args[1];
  JobConf conf = new JobConf(getConf());
  conf.setJarByClass(MyJob.class);
  conf.setJobName("MyJob");
  //入力出力のタイプを設定する
  conf.setMapOutputKeyClass(Writable.class);
  conf.setMapOutputValueClass(Writable.class);
  conf.setOutputKeyClass(Writable.class);
  conf.setOutputValueClass(Writable.class);
  conf.setMapperClass(Map.class);
  conf.setReducerClass(Reduce.class);
  conf.setOutputFormat(TextOutputFormat.class);
  conf.setInputFormat(TextInputFormat.class);
  FileInputFormat.setInputPaths(conf, new Path(inputFilePath));
  FileOutputFormat.setOutputPath(conf, new Path(outputDir));
  Job job = new Job(conf);
  boolean ret = job.waitForCompletion(true);
  return ret ? 0 : -1;
 }
}
EMRジョブフローのメインクラスを作って、このMapReduceジョブを繰り返し実行する。ファイルMyJobFollow.java:

public class MyJobFollow {
 public static void main(String[] args) {
  String input = "input";
  String output = "input";
  int i = 0;
  while(true) {
   i++;
   input = output;
   output = "output_"+i;
   String args = new String[]{input, output};
   ToolRunner.run(new Configuration(), new MyJob(), args);
   //繰り返しが終わるかどうかを判断する
   if (/** 終わる条件 **/) {
    break;
   }
  }
 }
}
でも、HadoopのMapReduceジョブは何もしなくても、30秒ぐらいをかける(ジョブの起動時間など)。だから、この繰り返しがとても遅いと思う。

複数のMapReduceジョブを同時に実行する

繰り返しと同じように、ジョブフローのメインクラスの中に複数のMapReduceジョブを同時に起動すると同時実行できる。でも、ToolRunnerのrunメソッドはプロセスをブロックするので、ここで複数のスレッドが並列処理が必要である。ファイルMyJobFollow.java:

public class MyJobFollow {
 public static class JobRunnable implements Runnable {
  private String[] args;
  public JobRunnable(String[] args) {
   this.args = args;
  }
  @Override
  public void run() {
   ToolRunner.run(new Configuration(), new MyJob(), args);
  }
 }
 public static void main(String[] args) {
  int threadNum = Integer.parseInt(args[2]);
  Thread[] threads = new Thread[threadNum];
  for(int i=0; i<threadNum; i++) {
   Runnable  runnable = new JobRunnable(args);
   threads[i] = new Thread(runnable);
   threads[i].start();
  }
  for(int i=0; i<threadNum; i++) {
   threads[i].join();
  }
 }
}

FairSchedulerを設定する

前の方法で一つのEMRジョブフローに複数のMapReduceジョブを起動できるが、Hadoopはデフォールトで複数のジョブを一つずつ実行する。複数のジョブを同時に実行するためにFairSchedulerを設定しなければならない。JavaコードでEMRを起動すると、このように:

JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
/** instancesのいろいろな設定
 *  ......
 */

RunJobFlowRequest request = new RunJobFlowRequest("MyEMRJob", instances);
/** requireについて他の設定(stepとか)
 *  ......
 */
//FairSchedulerを設定
BootstrapActions bootstrapActions = new BootstrapActions();
request.withBootstrapActions(bootstrapActions.newConfigureHadoop()
  .withKeyValue(ConfigFile.Mapred, "mapred.jobtracker.taskScheduler",
    "org.apache.hadoop.mapred.FairScheduler"));

PropertiesCredentials credentials = new PropertiesCredentials(getClassLoader().getResourceAsStream("AwsCredentials.properties"));
AmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials);
emr.runJobFlow(request);//EMRジョブを起動する

JavaコードでEMRのGangliaを使う

GangliaはHadoopのクラスタのパフォーマンスを監視するツールである。AWSのEMRにもGangliaをBootstrap Actionとして使える。普通な使い方はこちらへ
AWSはJavaのSDKがある。このSDKを使って、JavaコードでEMRのジョブを起動、管理できる。この記事はJavaコードでEMRのGangliaを使う方法を紹介する。

JavaコードでEMRタスクと共にGangliaを起動

EMRでGangliaを起動する方法はBootstrap Actionを設定する。Javaコードも同じ、EMRジョブにBootstrap Actionを設定する。このように:

JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
instances.setHadoopVersion("0.20.205");
instances.setInstanceCount(10);
instances.setMasterInstanceType("c1.medium");
instances.setSlaveInstanceType("c1.medium");
/** instancesのいろいろな設定
 *  ......
 */

RunJobFlowRequest request = new RunJobFlowRequest("MyEMRJob", instances);
/** requireについて他の設定(stepとか)
 *  ......
 */
//GangliaのBootstrap Actionを設定
BootstrapActionConfig bootstrapActionConfig = new BootstrapActionConfig();
ScriptBootstrapActionConfig scriptBootstrapAction = new ScriptBootstrapActionConfig();
scriptBootstrapAction.setPath("s3://elasticmapreduce/bootstrap-actions/install-ganglia");
bootstrapActionConfig.setScriptBootstrapAction(scriptBootstrapAction);
bootstrapActionConfig.setName("install-ganglia");
BootstrapActions bootstrapActions = new BootstrapActions();
request.withBootstrapActions(bootstrapActionConfig);//requireに添加

PropertiesCredentials credentials = new PropertiesCredentials(getClassLoader().getResourceAsStream("AwsCredentials.properties"));
AmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials);
emr.runJobFlow(request);//EMRジョブを起動する

Gangliaの監視結果をS3へアップロード

Gangliaの監視結果はmaster instanceにrrdのフォーマットで保存されている。rrdフォーマットが直接に読めないし、EMRジョブが終わってからmaster instanceがすぐに消されるし、Gangliaの監視結果をcsvファイルに変換してS3へアップロードする。このように:

 public static void writeAndUploadGangliaReports(String bucketName, String outputPath, long startTime, long endTime) {
  String[] nameList = new String[]{"cpu_user", "bytes_in", "bytes_out", "pkts_in", "pkts_out", "mem_free", "jvm.metrics.memHeapUsedM"};
  File dirPath = new File("/mnt/var/log/ganglia/rrds/AMZN-EMR");

  if (!dirPath.exists()) {
   return;
  }
  File[] fileList = dirPath.listFiles();

  String outDir = "/home/hadoop/GangliaLog/";
  File logDir = new File(outDir);
  if (!logDir.exists()) {
   logDir.mkdirs();
  }
  String key = outputPath;
  AmazonS3 s3 = new AmazonS3Client(/** CredentialのファイルのStream **/):

  Runtime runtime = Runtime.getRuntime();

  for (int i = 0; i < fileList.length; i++) {
   if (!fileList[i].isDirectory()
     || fileList[i].getName().contains("_SummaryInfo_")) {
    continue;
   }
   for (int j = 0; j < nameList.length; j++) {
    //rrdtool fetchのコマンドを作る
    StringBuilder sb = new StringBuilder("rrdtool fetch ");
    sb.append(fileList[i].getPath());
    sb.append(File.separator);
    sb.append(nameList[j]);
    String startStr = Long.toString((startTime)/1000);
    String endStr = Long.toString((endTime)/1000);
    sb.append(".rrd AVERAGE -r 1 -s "+startStr+" -e "+endStr);

    StringBuilder outPath = new StringBuilder(outDir);

    StringBuilder outName = new StringBuilder(fileList[i].getName());
    outName.append("_");
    outName.append(nameList[j]);
    outName.append(".csv");

    String outputName = outName.toString();

    outPath.append(outputName);

    Process proc;
    try {
     //実行してstdoutの内容をファイルに書く
     proc = runtime.exec(sb.toString());
     BufferedReader reader = new BufferedReader(
       new InputStreamReader(proc.getInputStream()));
     BufferedWriter writer = new BufferedWriter(new FileWriter(
       new File(outPath.toString())));
     String line;
     while ((line = reader.readLine()) != null) {
      writer.write(line);
      writer.write("\n");
     }
     writer.close();
     reader.close();
    } catch (Exception e) {
     throw new RuntimeException(e);
    }
    s3.putObject(new PutObjectRequest(bucketName, key + "/" + outputName, new File(
      outPath.toString()));//S3へアップロード
   }
  }
 }

ジョブのmainメソッドの最後にこのメソッドを呼び出せばいい。このように:

public static void main(String[] args) throws Exception {
 long startTime = System.currentTimeMillis();
 //MyHadoopJobはHadoopジョブの設定したクラスである
 ToolRunner.run(new Configuration(), new MyHadoopJob(), args);
 long endTime = System.currentTimeMillis();
 writeAndUploadGangliaReports("mybucket", "ganglia_output", startTime, endTime);
}

EMRのgangliaをカスタム設定:1秒ごとに監視する


はじめに

GangliaとはHadoopクラスタのCPU、メモリなどの性能を監視するツールである。AWSのEMRを使うと、EMRのBootstrap Actionを通じて自動的にGangliaをインストール、設定できる。自分で何をしなくてもGangliaを使える。便利が便利だけど、柔軟性を犠牲にした。EMRにデフォルトで設定されたGangliaはほとんどのニーズを満たせるけど、「特殊なニーズ」があればカスタムで設定できない。たとえば今回に遭った「特殊なニーズ」:実行時間が短い(1、2分間ぐらいだけ)MapReduceのタスクを監視したい。Gangliaはデフォルトで15秒ごとにサーバの性能を監視するので、1分間だけのタスクにおいて4つの記録しかなく、あまり意味がない。だからEMRのGangliaを「1秒ごとに監視する」ように設定するために、EMRのGangliaをカスタム設定する方法を調べた。この記事は「1秒ごとに監視する」の設定を例としてEMRのGangliaをカスタム設定の方法を紹介します。

EMRはGangliaを起動、設定する方法

まずEMRはGangliaを起動、設定する方法を見よう。EMRではGangliaを使用する方法はとても簡単だ。EMRのタスクを起動する時にBootstrap Actionに一つActionを添加して、スクリプトのアドレスは:s3://elasticmapreduce/bootstrap-actions/install-ganglia。Bootstrap Actionについて詳細情報はこちらへ。install-gangliaはRubyのスクリプトです。クラスタのインスタンスは起動完了してから、S3からこのスクリプトをダウンロードして実行する。このinstall-gangliaというスクリプトの内容を見よう。このアドレスからinstall-gangliaをダウンロードできる:https://s3.amazonaws.com/elasticmapreduce/bootstrap-actions/install-ganglia。内容は簡単だ。Hadoopのバージョンによって、もう一つのスクリプトをダウンロード、そして実行する。

executor.run("hadoop fs -copyToLocal s3://#{BUCKET_NAME}/bootstrap-actions/ganglia/#{version_num}/ganglia-installer .")

Hadoopのバージョンは0.20.205の場合、このスクリプトのアドレスは:https://s3.amazonaws.com/elasticmapreduce/bootstrap-actions/ganglia/2.0/ganglia-installer。このスクリプトはgangliaをインストールと設定する。中身を見よう。
まず、Gangliaをダウンロード、解凍する:

def download_and_unzip_ganglia
    run("mkdir -p ~/source")
    run("cd ~/source && wget http://#{BUCKET_NAME}.s3.amazonaws.com/bootstrap-actions/ganglia/2.0/#{GANGLIA}.tar.gz")
    run("cd ~/source && tar xvfz #{GANGLIA}.tar.gz")
  end
そして、様々な設定である。今回のタスクに関するのは二つ部分だけ。gmondの設定:

def configure_gmond  
  run("sudo ldconfig")  
  run("sudo gmond --default_config > ~/gmond.conf")  
  run("sudo mv ~/gmond.conf /etc/gmond.conf")  
  run("sudo perl -pi -e 's/name = \"unspecified\"/name = \"AMZN-EMR\"/g' /etc/gmond.conf")  
  run("sudo perl -pi -e 's/owner = \"unspecified\"/name = \"AMZN-EMR\"/g' /etc/gmond.conf")  
  run("sudo perl -pi -e 's/send_metadata_interval = 0/send_metadata_interval = 10/g' /etc/gmond.conf")  
  ......
  $e.run("sudo gmond")  
end 
gmetaの設定:

  def configure_gmetad
    ganglia_log_dir = "/mnt/var/log/ganglia/rrds/"
    ganglia_templates_dir = "/mnt/var/log/ganglia/dwoo/"
    run("sudo cp #{GANGLIA_HOME}/gmetad/gmetad.conf /etc/")
    run("sudo mkdir -p #{ganglia_log_dir}")
    run("sudo chown -R nobody #{ganglia_log_dir}")
    run("sudo sed -i -e 's$# rrd_rootdir .*$rrd_rootdir #{ganglia_log_dir}$g' /etc/gmetad.conf")
    run("sudo mkdir -p #{ganglia_templates_dir}")
    run("sudo chown -R nobody #{ganglia_templates_dir}")
    run("sudo chmod -R 777 #{ganglia_templates_dir}")
  
    #Setup pushing rrds to S3
    parsed = JSON.parse(File.read("/etc/instance-controller/logs.json"))
    newEntry = Hash["fileGlob", "/mnt/var/log/ganglia/rrds/AMZN-EMR/(.*)/(.*)", "s3Path", "node/$instance-id/ganglia/$0/$1", "delayPush", true]
    parsed["logFileTypes"][1]["logFilePatterns"].push(newEntry)
    run("sudo mv /etc/instance-controller/logs.json /etc/instance-controller/logs.json.bak")
    File.open("/tmp/logs.json" , "w") do |fil|
    fil.puts(JSON.generate(parsed))
    end
    $e.run("sudo mv /tmp/logs.json /etc/instance-controller/")
     
  end 
このganglia-installerはlinuxのsedなどのコマンドを通じてgangliaの設定ファイル内容を変える。私たちもこの方法でEMRのgangliaをカスタム設定できる。このganglia-installerを修正して、幾つかのsedコマンドを添加する。修正したganglia-installをS3へアップロードして、EMRのタスクを起動する時は自分のganglia-installを呼び出すようにBootstrap Actionを設定すればいい。

Gangliaを「1秒ごとに監視する」ように設定

Gangliaを「1秒ごとに監視する」ように設定するのは、実はとても難しい問題です。私はこの問題を解決のために、すごく時間をかかった。しかし、この記事の焦点はEMRのgangliaをカスタム設定の方法で、gangliaの設定方法じゃないので、関係があるgangliaの設定だけ紹介する。
Gangliaはgmondとgmetaという二つ部分がある。gmondはクラスタの全員のサーバに実行されていて、サーバーのパフォーマンスのデータを他のサーバーへ送信する設定である。gmondの設定ファイルはこのように:

collection_group {
  collect_every = 20
  time_threshold = 90
  /* CPU status */
  metric {
    name = "cpu_user"
    value_threshold = "1.0"
    title = "CPU User"
  }
  metric {
    name = "cpu_system"
    value_threshold = "1.0"
    title = "CPU System"
  }
  metric {
    name = "cpu_idle"
    value_threshold = "5.0"
    title = "CPU Idle"
  }
  metric {
    name = "cpu_nice"
    value_threshold = "1.0"
    title = "CPU Nice"
  }
  metric {
    name = "cpu_aidle"
    value_threshold = "5.0"
    title = "CPU aidle"
  }
  metric {
    name = "cpu_wio"
    value_threshold = "1.0"
    title = "CPU wio"
  }
  /* The next two metrics are optional if you want more detail...
     ... since they are accounted for in cpu_system.
  metric {
    name = "cpu_intr"
    value_threshold = "1.0"
    title = "CPU intr"
  }
  metric {
    name = "cpu_sintr"
    value_threshold = "1.0"
    title = "CPU sintr"
  }
  */
}
このデフォルト設定で、gmondは20秒(collect_every)ごとにサーバーのデータを取得して、もし値はvalue_thresholdを超えたら、すぐにクラスタへ送信する。もしそうじゃないと、90秒(time_threshold)ごとに送信する。つまり、gmondを1秒ごとに送信させるために、collect_everyとtime_thresholdを1に設定すべきである。

gmetaはクラスタ全員のgmondから集めたデータを整理、保存するプログラムである。gmetaの設定ファイルgmeta.confには今回に関する部分はこれ:

# What to monitor. The most important section of this file.
#
# The data_source tag specifies either a cluster or a grid to
# monitor. If we detect the source is a cluster, we will maintain a complete
# set of RRD databases for it, which can be used to create historical
# graphs of the metrics. If the source is a grid (it comes from another gmetad),
# we will only maintain summary RRDs for it.
#
# Format:
# data_source "my cluster" [polling interval] address1:port addreses2:port ...
#
# The keyword 'data_source' must immediately be followed by a unique
# string which identifies the source, then an optional polling interval in
# seconds. The source will be polled at this interval on average.
# If the polling interval is omitted, 15sec is asssumed.
#
# A list of machines which service the data source follows, in the
# format ip:port, or name:port. If a port is not specified then 8649
# (the default gmond port) is assumed.
# default: There is no default value
#
# data_source "my cluster" 10 localhost my.machine.edu:8649 1.2.3.5:8655
# data_source "my grid" 50 1.3.4.7:8655 grid.org:8651 grid-backup.org:8651
# data_source "another source" 1.3.4.7:8655 1.3.4.8

data_source "my cluster" localhost
デフォルトで15秒ごとに保存するので、このように修正する:

data_source "my cluster" 1 localhost
gmeta.confにはもう一つ注意すべきなところがある:

#
# Round-Robin Archives
# You can specify custom Round-Robin archives here (defaults are listed below)
#
# RRAs "RRA:AVERAGE:0.5:1:244" "RRA:AVERAGE:0.5:24:244" "RRA:AVERAGE:0.5:168:244" "RRA:AVERAGE:0.5:672:244" \
#      "RRA:AVERAGE:0.5:5760:374"
#
この部分はgmetaが違うインターバルのデータを保存する数の最大値を設定する。例えば、このデフォルトの設定で、1秒ごとのデータを最大244件だけ保存する。244件を超えたら24秒ごとに変換して保存する。24秒ごとのデータも最大244件、超えたら168秒ごとに変換する。だから、1秒ごとの最大件数をもっと大きく設定すべきだ。
まとめて、ganglia-installerをこのように修正する:
1.configure_gmondの部分に、この行を添加:

run("sudo perl -pi -e 's/collect_every *=.*/collect_every = 1/g;s/time_threshold *=.*/time_threshold = 1/g;s/value_threshold *=.*/value_threshold = 0/g' /etc/gmond.conf")
これで全部のグループを1秒ごとに設定した。
2.configure_gmetaの部分に、この行を添加:

run("sudo sed -i -e 's/data_source \"my cluster\" localhost/data_source \"my cluster\" 1 localhost/g' /etc/gmetad.conf")
3.configure_gmetaの部分に、この行を添加:

run("sudo sed -i -e's/# RRAs \"RRA:.*/RRAs \"RRA:AVERAGE:0.5:1:3600\" \"RRA:AVERAGE:0.5:24:3600\" \"RRA:AVERAGE:0.5:168:3600\" \"RRA:AVERAGE:0.5:672:3600\"/g' /etc/gmetad.conf")

自分のスクリプトでGangliaを起動

前の述べたように修正したganglia-installerをS3へアップロードする。S3のBucket名はmybucketと仮定する。そして、install-gangliaという簡単なスクリプトを修正する。まずganglia-installerを呼び出すコードの行を探す:

executor.run("hadoop fs -copyToLocal s3://#{BUCKET_NAME}/bootstrap-actions/ganglia/#{version_num}/ganglia-installer .")
このように修正する:

executor.run("hadoop fs -copyToLocal s3://mybucket/ganglia-installer .")
この修正したinstall-gangliaもS3へアップロードする。最後にEMRタスクを起動する時はBootstrap Actionを一つ添加して、スクリプトのアドレスを「s3://mybucket/install-ganglia」と設定する。ようやくカスタム設定のGangliaが起動された。

他の可能な方法

今回は「sed」コマンドと「perl  -pi -e」で設定ファイルの内容を変える方法を使った。他の方法もある。例えば、まずgmond.confを修正して、S3にアップロードしておき、ganglia-installerにはこのファイルをダウンロードする。あるいはganglia全体をダウンロードして、必要な設定をして、S3にアップロードして、ganglia-installerはこの設定して置いたgangliaをダウンロードする。試したことがないけど、同じように動かせると思う。