Hadoopで処理を実装してみる──Hadoop Streamingでの処理、script-runner.jarの使い方きょうから試せる Hadoop“スモールスタート”ガイド(8)(2/4 ページ)

» 2017年03月21日 05時00分 公開

Hadoop Streamingで処理を実装する

 HadoopではMapフェーズ、Shuffleフェーズ、Reduceフェーズの順で処理が行われます(第2回「『Hadoopの処理の流れ』を理解し、実践する」を参照)。ここで重要なルールは、Mapフェーズの出力は「"key<タブ>value"という形式で行えば、同じkeyのデータは必ず同じReducerに渡される」というものでした。

 そのこと忘れずに、実際にHadoopで処理を実装していきましょう。

検索キーワードランキングを生成する

 まずは、例として検索キーワードランキングを生成する場合を考えてみましょう。短い期間での計算であればもちろんリレーショナルデータベースを使って計算した方が簡単で早いですが、長期間での計算となれば(データ量にもよりますが)Hadoopで計算した方が良いのではないかと思います。

 例えば入力データとしては以下のような、検索キーワードとユーザIDで構成されているログが流れてくるとします。

keyword.log
ナス,193032
お弁当,39110
きゅうり,1039001
カレー 豚肉,90123
ズッキーニ,82919
パスタ トマト,1281032
からあげ,72920
じゃがいも キャベツ,402918
トマト,2101302
ナス ピーマン,822924
お弁当 おかず,291038
きゅうり サラダ,224703
豚肉 たまねぎ,90123
ハンバーグ,1038733
たいやき,547291
トマト,392018
お弁当 ウインナー,39110
にんじん カレー,382027
キャベツ,1039200
なす トマト,2101302
卵 スープ,191938
...

 検索キーワードはスペースで区切られて複数のキーワードが指定されている場合もあり、それらは別々のキーワードとしてカウントするものとします。

 Mapper、Reducerはこのようになります。

keyword_mapper.rb
keyword_counter = Hash.new {|h,k| h[k] = 0 }
 
ARGF.each do |log|
  log.chomp!
  keyword, user_id = log.split(',')
 
  keywords = keyword.split(' ')
  keywords.each do |keyword|
    keyword_counter[keyword] += 1
    keyword_counter['__TOTAL__'] += 1
  end
end
 
keyword_counter.each do |keyword, count|
  puts "#{keyword}\t#{count}"end
end
keyword_reducer.rb
keyword_counter = Hash.new {|h,k| h[k] = 0 }
previous_keyword = nil
 
ARGF.each do |log|
  log.chomp!
  keyword, count = log.split(/\t/)
 
  if keyword == previous_keyword
    keyword_counter[keyword] += count.to_i
  else
    if previous_keyword
      puts "#{previous_keyword}\t#{keyword_counter[previous_keyword]}"
    end
    previous_keyword = keyword
    keyword_counter[keyword] += count.to_i
  end
end
 
puts "#{previous_keyword}\t#{keyword_counter[previous_keyword]}"

 手元でこのようにして動作確認が取れたら、入力データやMapper、ReducerはS3に上げておきます。

 ではHadoopで処理してみましょう。まずEMRクラスタを起動します。

 ここに先ほどのMapper、Reducerでステップを追加します。

 このような結果が出力されるはずです。__TOTAL__は全検索回数を表します。

__TOTAL__	32
おかず		1
お弁当		3
からあげ		1
きゅうり		2
じゃがいも	1
たいやき		1
たまねぎ		1
なす		1
にんじん		1
ウインナー	1
カレー		2
キャベツ		2
サラダ		1
スープ		1
ズッキーニ	1
トマト		4
ナス		2
ハンバーグ	1
パスタ		1
ピーマン		1
卵		1
豚肉		2

 これで確かに欲しいデータは得られたわけですが、まだいくつか問題があります。

  • 結果がReducerの数だけ複数のファイルに分割されていて、扱いにくい
  • ランキング順に並び替えたい

 などです。それならば、EMRで出力されたファイルを連結して、それを並び替えてあげれば良さそうです。そういった処理を行うにはどうしたら良いのでしょうか。

Copyright © ITmedia, Inc. All Rights Reserved.

RSSについて

アイティメディアIDについて

メールマガジン登録

@ITのメールマガジンは、 もちろん、すべて無料です。ぜひメールマガジンをご購読ください。