Hadoopのタスクの起動
まずHadoopはどうやってInputFormatを使ってMapタスクを起動すると説明しよう。InputFormatはHadoopの一つのInterfaceである:
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
}
タスクが起動された時に、masterサーバはまずInputFormatのgetSplits()メソッドを呼び出し、InputSplitの配列を生成する。InputSplitもInterfaceであり、入力を持って、サーバの間にデータを通信するメソッドを実装した。そしてInputSplitのオブジェクトたちがMapperのサーバへ送信される。Mapperは一つInputSplitを受け取ってから、このInputSplitを入力としてInputFormatのgetRecordReader()メソッドを呼び出し、RecordReaderを取得する。このRecordReaderの機能はInputFormatから一つ一つのkey/valueのペアを読み出しである。最後に読み出したベアを入力としてMapタスクを起動する。つまり、自前のInputFormatを実装するために、InputSplitとRecordReaderも実装しなければならない。
InputSplitを実装
一つのInputSplitは入力データの一部である。主に通信する時に使われるreadFields()とwrite()メソッドを実装する。今回はランダムに生成された整数をvalueListへ保存する。keyをkeyListへ保存する(keyは今回使われてないけど)。
public class MyInputSplit implements InputSplit{
private List<Integer> keyList;
private List<Integer> valueList;
public MyInputSplit() {
keyList = new ArrayList<Integer>();
valueList = new ArrayList<Integer>();
}
public void add(int key, int value) {
keyList.add(key);
valueList.add(value);
}
@Override
public void readFields(DataInput in) throws IOException {
keyList = readList(in);
valueList = readList(in);
}
private List<Integer> readList(DataInput in) throws IOException {
IntWritable sizeWritable = new IntWritable();
sizeWritable.readFields(in);
int size = sizeWritable.get();
List<Integer> list = new ArrayList<Integer>();
for(int i=0; i<size; i++) {
IntWritable valueWritable = new IntWritable();
valueWritable.readFields(in);
list.add(valueWritable.get());
}
return list;
}
@Override
public void write(DataOutput out) throws IOException {
writeList(out, keyList);
writeList(out, valueList);
}
private void writeList(DataOutput out, List<Integer> list) throws IOException {
int size = list.size();
IntWritable sizeWritable = new IntWritable(size);
sizeWritable.write(out);
for(Integer value : list) {
IntWritable valueWritable = new IntWritable(value);
valueWritable.write(out);
}
}
@Override
public long getLength() throws IOException {
return keyList.size();
}
@Override
public String[] getLocations() throws IOException {
return new String[0];
}
public List<Integer> getKeyList() {
return keyList;
}
public List<Integer> getValueList() {
return valueList;
}
}
RecordReaderを実装
RecordReaderの役目は受け取ったInputSplitからnext()メソッドで一つずつkey/valueペアを読み出す。public class MyRecordReader implements RecordReader<IntWritable, IntWritable>{
private final List<Integer> keyList;
private final List<Integer> valueList;
private int pos = 0;
private final int length;
public MyRecordReader(MyInputSplit split) throws IOException {
this.keyList = split.getKeyList();
this.valueList = split.getValueList();
length = keyList.size();
}
@Override
public void close() throws IOException {
}
@Override
public IntWritable createKey() {
return new IntWritable();
}
@Override
public IntWritable createValue() {
return new IntWritable();
}
@Override
public long getPos() throws IOException {
return pos;
}
@Override
public float getProgress() throws IOException {
return (float)pos / length;
}
@Override
public boolean next(IntWritable key, IntWritable value) throws IOException {
if(pos < length) {
int keyInt = keyList.get(pos);
int valueInt = valueList.get(pos);
key.set(keyInt);
value.set(valueInt);
pos++;
return true;
} else {
return false;
}
}
}
InputFormatを実装
最後に自前のInputSplitとRecordReaderを使ってInputFormatを実装する。getSplits()はランダムで入力データを生成して、分割してInputSplitへ保存する。getRecordReader()はただ受け取ったInputSplitからRecordReaderを作成しかない。public class MyInputFormat implements InputFormat<IntWritable, IntWritable> {
@Override
public RecordReader<IntWritable, IntWritable> getRecordReader(
InputSplit split, JobConf job, Reporter report) throws IOException {
MyInputSplit mySplit = (MyInputSplit)split;
return new MyRecordReader(mySplit);
}
@Override
public InputSplit[] getSplits(JobConf job, int numSplit) throws IOException {
int mapTaskNum = job.getNumMapTasks();
int totalSize = job.getInt("total.size", 1000);
int sizeInMapTask = (int)Math.ceil((double)totalSize / mapTaskNum);
InputSplit[] splits = new InputSplit[mapTaskNum];
Random r = new Random();
int count = 0;
for(int i=0; i<totalSize; i++) {
MyInputSplit split = new MyInputSplit();
for(int j=0; j<sizeInMapTask; j++) {
if(count >= totalSize) {
break;
}
split.add(i, r.nextInt(totalSize));
count++;
}
splits[i] = split;
}
return splits;
}
}
MapReduceのメインクラス
自前のInputFormatをsetInputFormat()に設定して、MapReduceタスクを起動する。public class MyJob extends Configured implements Tool {
public static class Map extends MapReduceBase implements
Mapper<IntWritable, IntWritable, IntWritable, LongWritable> {
long sum = 0;
OutputCollector<IntWritable, LongWritable> out = null;
@Override
public void map(IntWritable key, IntWritable value,
OutputCollector<IntWritable, LongWritable> output,
Reporter report) throws IOException {
if (out == null) {
out = output;
}
sum += value.get();
}
@Override
public void close() {
try {
out.collect(new IntWritable(0), new LongWritable(sum));
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static class Reduce extends MapReduceBase implements
Reducer<IntWritable, LongWritable, LongWritable, NullWritable> {
@Override
public void reduce(IntWritable key, Iterator<LongWritable> values,
OutputCollector<LongWritable, NullWritable> output,
Reporter report) throws IOException {
long sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(new LongWritable(sum), NullWritable.get());
}
}
@Override
public int run(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Args Error!");
System.exit(-1);
}
int totalSize = Integer.parseInt(args[0]);
int mapTaskNum = Integer.parseInt(args[1]);
JobConf conf = new JobConf(getConf());
conf.setInt("total.size", totalSize);
conf.setNumMapTasks(mapTaskNum);
conf.setJarByClass(MyJob.class);
conf.setJobName("MyJob");
conf.setMapOutputKeyClass(IntWritable.class);
conf.setMapOutputValueClass(LongWritable.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(NullWritable.class);
conf.setMapperClass(Map.class);
conf.setReducerClass(Reduce.class);
conf.setOutputFormat(TextOutputFormat.class);
conf.setInputFormat(MyInputFormat.class);
Job job = new Job(conf);
boolean ret = job.waitForCompletion(true);
return ret ? 0 : -1;
}
public static void main(String[] args) throws Exception {
int result = ToolRunner.run(new Configuration(), new MyJob(), args);
System.exit(result);
}
}