連載
» 2017年01月24日 05時00分 UPDATE

きょうから試せる Hadoop“スモールスタート”ガイド(2):「Hadoopの処理の流れ」を理解し、実践する (1/3)

実際にHadoopで処理を実装していきながら「Hadoopは、誰にだって扱える」を体感しましょう。今回は「Hadoopの処理の流れ」を解説します。

[佐々木達也,著]

連載目次

Hadoopファーストガイド

書籍の中から有用な技術情報をピックアップして紹介する本シリーズ。今回は、秀和システム発行の書籍『Hadoopファーストガイド(2012年9月20日発行)』からの抜粋です。

ご注意:本稿は、著者及び出版社の許可を得て、そのまま転載したものです。このため用字用語の統一ルールなどは@ITのそれとは一致しません。あらかじめご了承ください。


Hadoopの処理の流れ

 前回はHadoopのメリットとデメリットを説明しました。今回は、「Hadoopの処理の流れ」を説明していきたいと思います。なお、本稿では基本的に「Hadoop Streaming」の挙動について説明していきますが、参考のためにHadoopの挙動についても触れていきます。

Mapフェーズ

 MapReduceはMapフェーズ、Shuffleフェーズ、Reduceフェーズの順で処理されていきます。まず、入力データはMapフェーズでMapperにより処理されます。Mapフェーズでは主に以下のような処理が行われます。

  • 不要な情報を取り除く
  • 値を別の形式に変換する

 具体的には、入力データが渡され、不要な情報を取り除いたり、値を別の形式に変換したりします。その後、Mapperでの出力結果がReducerへと渡されていきます。

 なお、Mapperに対してHadoopではkeyとvalueがきちんと別々の引数として渡ってくるのですが、Hadoop Streamingの場合には単なる文字列として渡ってくるために、利用者が明示的にkeyとvalueに分解する必要があります(図2-1)。Web上の説明などではHadoop StreamingではなくHadoopに関しての説明が多いため、HadoopStreamingを利用しようとしてもここの部分のイメージがつかみにくく、混乱することが多いように感じます。

図2-1 Mapperに渡ってくるデータの形式が違う 図2-1 Mapperに渡ってくるデータの形式が違う

 これについてはHadoopのソースコードを見ると分かりやすいと思います。Hadoopのソースコードはこちらからダウンロードできるのでぜひダウンロードして確認してみてください。2012年6月現在、最新の安定板は1.0.3となっているので1.0.3を見てみましょう(編注:2017年1月現在の安定版は2.7.3となります)。

Hadoopの主なバージョン(執筆時点)
バージョン 説明
1.0.x 現在のstable版(2012/5/16 v1.0.3リリース)
2.x.x 現在のalpha版
0.20.203.x 以前のstable版
0.22.x セキュリティ非対応版
0.23.x MapReduce2.0対応版

 ちなみに、バージョンがたくさんあるのでそれぞれがどのような関係なのかよく分からないと思いますが、これらは以下のような関係となっています※1。動作が安定してきたからこれまでのstable版である0.20.203.x系列がついに1.0.0がナンバリングされたというだけであって、特別な変更が行われたわけではありません(図2-2)。


図2-2 Hadoopのバージョンの親子関係 図2-2 Hadoopのバージョンの親子関係

 さて、ダウンロードしてきたソースコードの中から「src/mapred/org/apache/hadoop/mapreduce/Mapper.java」を見てみましょう。Javaで書かれたMapperのソースコードが見つかると思います。Mapper部分を抜粋するとこのようなコードとなっています。

src/mapred/org/apache/hadoop/mapreduce/Mapper.java
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  protected void map(KEYIN key, VALUEIN value,
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }
}

 このコードを見るとMapperで引数を受け取った時点でkeyとvalueを引数として別々に受け取っていることが確認できます。一方、Hadoop Streamingでは標準入力のテキストデータがそのまま渡される仕様となっているので明示的にkeyとvalueに分割する必要があります※2

※2 もちろん、必要がなければ無理に分割する必要はありません



 さらに、ソースコードの中にはサンプルコードも付随しています。例えばsrc/examples/org/apache/hadoop/examples/WordCount.javaは単純にスペース区切りの文字数をカウントするだけのプログラムですが、非常にシンプルで分かりやすいのではないでしょうか。

src/examples/org/apache/hadoop/examples/WordCount.java
package org.apache.hadoop.examples;
 
import java.io.IOException;
import java.util.StringTokenizer;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class WordCount {
public static class TokenizerMapper
    extends Mapper<Object, Text, Text, IntWritable>{
 
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
 
  public void map(Object key, Text value, Context context
                  ) throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}
 
public static class IntSumReducer
      extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();
 
    public void reduce(Text key, Iterable<IntWritable> values,
                     Context context
                     ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
 
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

 Mapフェーズでは入力データの合計サイズがブロックサイズ何個分かによって起動するMapperの数が決まります。つまり、例えば10TBの入力データがあって、ブロックサイズが128MBである場合、Mapperの数は約78,000となります。

 10000000MB÷128MB ≒ 78,000

 Mapperの最適な並列処理数は、1つのノードにつきおよそ10〜100のようです※3。大量に起動させれば良さそうですが、Mapperは起動にも時間がかかるので、大量のMapperを起動するとそれだけで時間がかかってしまいます。1つのMapperが少なくとも1分程度はかかるデータサイズになるようにブロックサイズを調整すると良いでしょう。

※3 CPUにかかる負荷が非常に低いMapperの場合、300まで起動した実績もあります



       1|2|3 次のページへ

Copyright© 2017 ITmedia, Inc. All Rights Reserved.

@IT Special

- PR -

TechTargetジャパン

この記事に関連するホワイトペーパー

RSSについて

アイティメディアIDについて

メールマガジン登録

@ITのメールマガジンは、 もちろん、すべて無料です。ぜひメールマガジンをご購読ください。