Hadoopのタスクの起動
まずHadoopはどうやってInputFormatを使ってMapタスクを起動すると説明しよう。InputFormatはHadoopの一つのInterfaceである:
public interface InputFormat<K, V> { InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException; }タスクが起動された時に、masterサーバはまずInputFormatのgetSplits()メソッドを呼び出し、InputSplitの配列を生成する。InputSplitもInterfaceであり、入力を持って、サーバの間にデータを通信するメソッドを実装した。そしてInputSplitのオブジェクトたちがMapperのサーバへ送信される。Mapperは一つInputSplitを受け取ってから、このInputSplitを入力としてInputFormatのgetRecordReader()メソッドを呼び出し、RecordReaderを取得する。このRecordReaderの機能はInputFormatから一つ一つのkey/valueのペアを読み出しである。最後に読み出したベアを入力としてMapタスクを起動する。
つまり、自前のInputFormatを実装するために、InputSplitとRecordReaderも実装しなければならない。
InputSplitを実装
一つのInputSplitは入力データの一部である。主に通信する時に使われるreadFields()とwrite()メソッドを実装する。今回はランダムに生成された整数をvalueListへ保存する。keyをkeyListへ保存する(keyは今回使われてないけど)。
public class MyInputSplit implements InputSplit{ private List<Integer> keyList; private List<Integer> valueList; public MyInputSplit() { keyList = new ArrayList<Integer>(); valueList = new ArrayList<Integer>(); } public void add(int key, int value) { keyList.add(key); valueList.add(value); } @Override public void readFields(DataInput in) throws IOException { keyList = readList(in); valueList = readList(in); } private List<Integer> readList(DataInput in) throws IOException { IntWritable sizeWritable = new IntWritable(); sizeWritable.readFields(in); int size = sizeWritable.get(); List<Integer> list = new ArrayList<Integer>(); for(int i=0; i<size; i++) { IntWritable valueWritable = new IntWritable(); valueWritable.readFields(in); list.add(valueWritable.get()); } return list; } @Override public void write(DataOutput out) throws IOException { writeList(out, keyList); writeList(out, valueList); } private void writeList(DataOutput out, List<Integer> list) throws IOException { int size = list.size(); IntWritable sizeWritable = new IntWritable(size); sizeWritable.write(out); for(Integer value : list) { IntWritable valueWritable = new IntWritable(value); valueWritable.write(out); } } @Override public long getLength() throws IOException { return keyList.size(); } @Override public String[] getLocations() throws IOException { return new String[0]; } public List<Integer> getKeyList() { return keyList; } public List<Integer> getValueList() { return valueList; } }
RecordReaderを実装
RecordReaderの役目は受け取ったInputSplitからnext()メソッドで一つずつkey/valueペアを読み出す。public class MyRecordReader implements RecordReader<IntWritable, IntWritable>{ private final List<Integer> keyList; private final List<Integer> valueList; private int pos = 0; private final int length; public MyRecordReader(MyInputSplit split) throws IOException { this.keyList = split.getKeyList(); this.valueList = split.getValueList(); length = keyList.size(); } @Override public void close() throws IOException { } @Override public IntWritable createKey() { return new IntWritable(); } @Override public IntWritable createValue() { return new IntWritable(); } @Override public long getPos() throws IOException { return pos; } @Override public float getProgress() throws IOException { return (float)pos / length; } @Override public boolean next(IntWritable key, IntWritable value) throws IOException { if(pos < length) { int keyInt = keyList.get(pos); int valueInt = valueList.get(pos); key.set(keyInt); value.set(valueInt); pos++; return true; } else { return false; } } }
InputFormatを実装
最後に自前のInputSplitとRecordReaderを使ってInputFormatを実装する。getSplits()はランダムで入力データを生成して、分割してInputSplitへ保存する。getRecordReader()はただ受け取ったInputSplitからRecordReaderを作成しかない。public class MyInputFormat implements InputFormat<IntWritable, IntWritable> { @Override public RecordReader<IntWritable, IntWritable> getRecordReader( InputSplit split, JobConf job, Reporter report) throws IOException { MyInputSplit mySplit = (MyInputSplit)split; return new MyRecordReader(mySplit); } @Override public InputSplit[] getSplits(JobConf job, int numSplit) throws IOException { int mapTaskNum = job.getNumMapTasks(); int totalSize = job.getInt("total.size", 1000); int sizeInMapTask = (int)Math.ceil((double)totalSize / mapTaskNum); InputSplit[] splits = new InputSplit[mapTaskNum]; Random r = new Random(); int count = 0; for(int i=0; i<totalSize; i++) { MyInputSplit split = new MyInputSplit(); for(int j=0; j<sizeInMapTask; j++) { if(count >= totalSize) { break; } split.add(i, r.nextInt(totalSize)); count++; } splits[i] = split; } return splits; } }
MapReduceのメインクラス
自前のInputFormatをsetInputFormat()に設定して、MapReduceタスクを起動する。public class MyJob extends Configured implements Tool { public static class Map extends MapReduceBase implements Mapper<IntWritable, IntWritable, IntWritable, LongWritable> { long sum = 0; OutputCollector<IntWritable, LongWritable> out = null; @Override public void map(IntWritable key, IntWritable value, OutputCollector<IntWritable, LongWritable> output, Reporter report) throws IOException { if (out == null) { out = output; } sum += value.get(); } @Override public void close() { try { out.collect(new IntWritable(0), new LongWritable(sum)); } catch (IOException e) { e.printStackTrace(); } } } public static class Reduce extends MapReduceBase implements Reducer<IntWritable, LongWritable, LongWritable, NullWritable> { @Override public void reduce(IntWritable key, Iterator<LongWritable> values, OutputCollector<LongWritable, NullWritable> output, Reporter report) throws IOException { long sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(new LongWritable(sum), NullWritable.get()); } } @Override public int run(String[] args) throws Exception { if (args.length < 2) { System.err.println("Args Error!"); System.exit(-1); } int totalSize = Integer.parseInt(args[0]); int mapTaskNum = Integer.parseInt(args[1]); JobConf conf = new JobConf(getConf()); conf.setInt("total.size", totalSize); conf.setNumMapTasks(mapTaskNum); conf.setJarByClass(MyJob.class); conf.setJobName("MyJob"); conf.setMapOutputKeyClass(IntWritable.class); conf.setMapOutputValueClass(LongWritable.class); conf.setOutputKeyClass(LongWritable.class); conf.setOutputValueClass(NullWritable.class); conf.setMapperClass(Map.class); conf.setReducerClass(Reduce.class); conf.setOutputFormat(TextOutputFormat.class); conf.setInputFormat(MyInputFormat.class); Job job = new Job(conf); boolean ret = job.waitForCompletion(true); return ret ? 0 : -1; } public static void main(String[] args) throws Exception { int result = ToolRunner.run(new Configuration(), new MyJob(), args); System.exit(result); } }