appengine mapreduce reducer 設計めも

どうせやらないよねとか言われるとやりたくなります。やればできる子だってことを証明するぜ!

ということで、ペンギンでreduceする方法を考えます。手元で若干コードは書き始めたのですが、方針を整理してだれかになんか言ってもらうのもいいかなと思ったのと、確かにreduceするのは難しいのでちゃんと設計したいなあというので、とりあえずあれこれ整理してます。

で、これはそのメモです。

hadoop mapreduceのセマンティクスのポイント整理】
・reduce inputはソート済みである
 いちばん重要。mapの終了がバリアになる。以下のポイントはほぼこれの副次的なもの。
・reduce inputをGroupingComparatorによってグループ化できる
 まあ、key-valuesを処理するのでグループ化できないとそもそもあれです
 reduce関数の実行の単位だけど、reduce taskの実行単位という訳ではない
・reduce input valuesをSortComparatorによって順序付けできる
 reduceでの処理順を決めれるので、valuesの種類を二つ用意しといて、種類毎に処理切り替えたりできるとか
 構成比とか出そうとすると、こういうのをやる必要があります
 いわゆる全手順のkeybreakです
・上記二つのComparatorはkeyのオブジェクトと関係ない条件を設定できる
 別にkey.equalsな関係のものでグループ化したりしないといけない訳でもない
 GroupingComparatorで常に同じ値を返せば1グループで処理できる的な
 まあresduce.tasks = 1で十分な訳ですが、一例です
・上記二つのComparatorは独立した条件を設定できる
 カスタムWritableをつくって、そのエンティティのプロパティの好きなやつでグループ化/ソートしたりとかできる
 偉い人はWritableのすべてのプロパティをデシリアライズしなくてもソートできるようにバイナリのフォーマットをちょっとだけデシリアライズして比較したりします
・combinerを設定できる
 意外とめんどくさげ。
 でもまあなきゃないでどうにかできるはずだし、combinerを使って結果がかわるなら設計間違ってます
 ということで当面は考えない

hadoop mapreduceのセマンティクスをなんとか保ちたい路線】
・map outputの置き方
 とりあえずdatastoreに書いておく
 ソートがいるので、key-valueをとっておく必要がある
 結局、datastore.put(key.group, key-value)をすることに
 とりあえずリストプロパティ的な感じでkey-valueのリストをとっておくとか
 reduce inputとするものとは別に、map outputのkey.groupをユニークになるようにdatastoreにおいておく
 すべてのmapが終了したらreducerを起動(実行状況のstateはもってるっぽいのでいけるはず)

・reduceフェーズの実行順
 key.groupの全量をとってきてそれぞれtaskqueueにenqueue
 taskqueueでreducerを実行する

・reduceの実行
 datastore.get(key.group)のすべてをSortComparaterでソートする(reduce inputをバッファリングできない!)
 sortしたやつをreduce(key, values, context)に渡してやる
 結果はcontext.writeでAppengineContextが書いてくれると思うよ


【困ること】
・Sort/Grouping
 key.groupでグループキーがとれるようなインターフェイスにしないといけない
 hadoopのRawComparator(S/G Comparatorのインターフェイス)だと、compareToを実装してソートするので、おもむろにkey.groupとか書いてたのはそもそも通らないの巻
 カスタムWritableをジョブにあわせてモデリングしなおす前提ならまあまあワークすると思われるものの、いかんともしがたい感も
 hadoop標準のWritableについては、対応するおのおののComparatorで比較してる値でグループ化してあげるとかすればまあ普通のは普通に動く系
 やりたくないけど
・バッファリングできない件
 datastore.get(key.group)とかした後にソートするとか言い出すと、それ全部メモリにおくんですよね的な
 reduce inputの数に制限があるとかなんだそれは
 どうにかしてメモリにロードする単位決めないと
 ロードする単位を決める方法を決めうちすれば、datastore.query(key.group/key.chunk)みたいなのですこしずつロードできるかも
 datastoreのソート機能を使うとかしてみる?
 key.group/key.chunkを指定して、しかもSortComparater相当のソートをするとなると、噂のインデックス爆発さんが発動する可能性がががが
 というあたりを決めうちしたりあきらめたりすれば、あの伝説のasyncをつかってちょっとずつとるとかするとdatastoreからのバッファリング相当のことができるかも? 


hadoopとかどうでもいいのでバリアなしでreduceする路線】
・reducerの実行
 おのおののmapが終わった後にいきなりreduceする
 key.groupごとに溜め込んでおいて適当なしきい値で処理させるとかそのぐらいのペースで
 process(current, map output key, map output values, context)とか?
 処理順序関係ないようなやつ(word countとか)ならこれでもいけるし、つくるのも楽。
 同時に書き込んじゃったとかそういうあれの再実行制御はmemcache.incrementとかでやることになるか。どうすかね。
 グループ化のためのソートとかreduce input valuesのソートとか考えなくていいのでまあ楽
 ていうか、こんなことするくらいならデータが発生した時点でなんかやればいいんじゃないのかな!
 いわゆるIGP(Insert Ganbaru Pattern)
 まあ、appengineでMahout動かしたいとか個人的にあるので、この路線もありといえばあり
 とかいってるとやらないよね
 とりあえず手を動かしてみる。俺がんばれ。
 といいつつ、Mahout路線ならStatistical Query Learningできるようなappengineでできるアルゴリズムについては別途検討価値がある気も。
 えと、Statistical Queryってなんかそういうのがあって、学習/逆行列の導出、固有値分解/集計の処理だけでできるやつをそういう言い方するらしいです
 たしか、Statistical QueryになってるやつはPAC学習可能だとかそういうあれがあるらしいです
 僕の理解は、学習/逆行列の導出、固有値分解は独立でできる様に構成しているらしいので、そのへんはmapだけで済むじゃん的な
 最後の集計がsumするとかmax/minとるとかだけなら、バリアなし路線での実装が楽、ということに。
 それが書いてある論文読みたいんだけど、iPadと同期がとれるiMacSSD増設しようとしてあけたまま放置しているので、iPadに論文入れられない的な
 これはこれで問題


ということで、なんかあればコメントください。