2013年2月6日星期三

自前のInputFormatを実装

HadoopのオリジナルのInputFormatの実装はHDFSから入力する。だから、HadoopのMapReduceを起動する前に、入力データをHDFSへアップロードする必要である。でもあるタスクに対してファイル形式の入力は必要がない。このようなタスクにわざわざHDFSのファイルから入力すると、性能の浪費である。もしファイルから入力したくないと、自分でInputFormatを実装する必要がある。今回は一つの例を使ってこの方法を説明する。例のタスクはMapReduceで大量の整数を加算する。入力データはタスクが起動される時にランダムで生成されて、HDFSファイルに保存されなくて、直接にMapperのサーバへ渡されてMapタスクを起動する。

Hadoopのタスクの起動

まずHadoopはどうやってInputFormatを使ってMapタスクを起動すると説明しよう。
InputFormatはHadoopの一つのInterfaceである:
public interface InputFormat<K, V> {

  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

  RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
}
タスクが起動された時に、masterサーバはまずInputFormatのgetSplits()メソッドを呼び出し、InputSplitの配列を生成する。InputSplitもInterfaceであり、入力を持って、サーバの間にデータを通信するメソッドを実装した。そしてInputSplitのオブジェクトたちがMapperのサーバへ送信される。Mapperは一つInputSplitを受け取ってから、このInputSplitを入力としてInputFormatのgetRecordReader()メソッドを呼び出し、RecordReaderを取得する。このRecordReaderの機能はInputFormatから一つ一つのkey/valueのペアを読み出しである。最後に読み出したベアを入力としてMapタスクを起動する。
つまり、自前のInputFormatを実装するために、InputSplitとRecordReaderも実装しなければならない。

InputSplitを実装

一つのInputSplitは入力データの一部である。主に通信する時に使われるreadFields()とwrite()メソッドを実装する。今回はランダムに生成された整数をvalueListへ保存する。keyをkeyListへ保存する(keyは今回使われてないけど)。
public class MyInputSplit implements InputSplit{
 private List<Integer> keyList;
 private List<Integer> valueList;
 
 public MyInputSplit() {
  keyList = new ArrayList<Integer>();
  valueList = new ArrayList<Integer>();
 }
 
 public void add(int key, int value) {
  keyList.add(key);
  valueList.add(value);
 }
  
 @Override
 public void readFields(DataInput in) throws IOException {
  keyList = readList(in);
  valueList = readList(in);
 }
 
 private List<Integer> readList(DataInput in) throws IOException {
  IntWritable sizeWritable = new IntWritable();
  sizeWritable.readFields(in);
  int size = sizeWritable.get();
  List<Integer> list = new ArrayList<Integer>();
  for(int i=0; i<size; i++) {
   IntWritable valueWritable = new IntWritable();
   valueWritable.readFields(in);
   list.add(valueWritable.get());
  }
  return list;
 }

 @Override
 public void write(DataOutput out) throws IOException {
  writeList(out, keyList);
  writeList(out, valueList);
 }
 
 private void writeList(DataOutput out, List<Integer> list) throws IOException {
  int size = list.size();
  IntWritable sizeWritable = new IntWritable(size);
  sizeWritable.write(out);
  for(Integer value : list) {
   IntWritable valueWritable = new IntWritable(value);
   valueWritable.write(out);
  }
 }

 @Override
 public long getLength() throws IOException {
  return keyList.size();
 }

 @Override
 public String[] getLocations() throws IOException {
  return new String[0];
 }

 public List<Integer> getKeyList() {
  return keyList;
 }

 public List<Integer> getValueList() {
  return valueList;
 }

}

RecordReaderを実装

RecordReaderの役目は受け取ったInputSplitからnext()メソッドで一つずつkey/valueペアを読み出す。
public class MyRecordReader implements RecordReader<IntWritable, IntWritable>{
 private final List<Integer> keyList;
 private final List<Integer> valueList;
 private int pos = 0;
 private final int length;
 public MyRecordReader(MyInputSplit split) throws IOException {
  this.keyList = split.getKeyList();
  this.valueList = split.getValueList();
  length = keyList.size();
 }
 @Override
 public void close() throws IOException {
  
 }

 @Override
 public IntWritable createKey() {
  return new IntWritable();
 }

 @Override
 public IntWritable createValue() {
  return new IntWritable();
 }

 @Override
 public long getPos() throws IOException {
  return pos;
 }

 @Override
 public float getProgress() throws IOException {
  return (float)pos / length;
 }

 @Override
 public boolean next(IntWritable key, IntWritable value) throws IOException {
  if(pos < length) {
   int keyInt = keyList.get(pos);
   int valueInt = valueList.get(pos);
   key.set(keyInt);
   value.set(valueInt);
   pos++;
   return true;
  } else {
   return false;
  }
 }

}

InputFormatを実装

最後に自前のInputSplitとRecordReaderを使ってInputFormatを実装する。getSplits()はランダムで入力データを生成して、分割してInputSplitへ保存する。getRecordReader()はただ受け取ったInputSplitからRecordReaderを作成しかない。
public class MyInputFormat implements InputFormat<IntWritable, IntWritable> {
 @Override
 public RecordReader<IntWritable, IntWritable> getRecordReader(
   InputSplit split, JobConf job, Reporter report) throws IOException {
  MyInputSplit mySplit = (MyInputSplit)split;
  return new MyRecordReader(mySplit);
 }
 @Override
 public InputSplit[] getSplits(JobConf job, int numSplit) throws IOException {
  int mapTaskNum = job.getNumMapTasks();
  int totalSize = job.getInt("total.size", 1000);
  int sizeInMapTask = (int)Math.ceil((double)totalSize / mapTaskNum);
  InputSplit[] splits = new InputSplit[mapTaskNum];
  Random r = new Random();
  int count = 0;
  for(int i=0; i<totalSize; i++) {
   MyInputSplit split = new MyInputSplit();
   for(int j=0; j<sizeInMapTask; j++) {
    if(count >= totalSize) {
     break;
    }
    split.add(i, r.nextInt(totalSize));
    count++;
   }
   splits[i] = split;
  }
  return splits;
 }


}

MapReduceのメインクラス

自前のInputFormatをsetInputFormat()に設定して、MapReduceタスクを起動する。
public class MyJob extends Configured implements Tool {
 public static class Map extends MapReduceBase implements
   Mapper<IntWritable, IntWritable, IntWritable, LongWritable> {
  long sum = 0;
  OutputCollector<IntWritable, LongWritable> out = null;

  @Override
  public void map(IntWritable key, IntWritable value,
    OutputCollector<IntWritable, LongWritable> output,
    Reporter report) throws IOException {
   if (out == null) {
    out = output;
   }
   sum += value.get();
  }

  @Override
  public void close() {
   try {
    out.collect(new IntWritable(0), new LongWritable(sum));
   } catch (IOException e) {
    e.printStackTrace();
   }
  }

 }

 public static class Reduce extends MapReduceBase implements
   Reducer<IntWritable, LongWritable, LongWritable, NullWritable> {

  @Override
  public void reduce(IntWritable key, Iterator<LongWritable> values,
    OutputCollector<LongWritable, NullWritable> output,
    Reporter report) throws IOException {
   long sum = 0;
   while (values.hasNext()) {
    sum += values.next().get();
   }
   output.collect(new LongWritable(sum), NullWritable.get());
  }

 }

 @Override
 public int run(String[] args) throws Exception {
  if (args.length < 2) {
   System.err.println("Args Error!");
   System.exit(-1);
  }
  int totalSize = Integer.parseInt(args[0]);
  int mapTaskNum = Integer.parseInt(args[1]);
  JobConf conf = new JobConf(getConf());
  conf.setInt("total.size", totalSize);
  conf.setNumMapTasks(mapTaskNum);
  conf.setJarByClass(MyJob.class);
  conf.setJobName("MyJob");
  conf.setMapOutputKeyClass(IntWritable.class);
  conf.setMapOutputValueClass(LongWritable.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(NullWritable.class);
  conf.setMapperClass(Map.class);
  conf.setReducerClass(Reduce.class);
  conf.setOutputFormat(TextOutputFormat.class);
  conf.setInputFormat(MyInputFormat.class);
  Job job = new Job(conf);
  boolean ret = job.waitForCompletion(true);
  return ret ? 0 : -1;
 }

 public static void main(String[] args) throws Exception {
  int result = ToolRunner.run(new Configuration(), new MyJob(), args);
  System.exit(result);
 }
}


没有评论:

发表评论