Apache Stormのトポロジーチュートリアル
本記事ではApache Stormのトポロジーを中心しに解説していこうと思います。
前提知識
StormのAPIがデザインパターンBuilder
を使用しますので知らなければ学習してください。
簡単にインストールできます。
brew install storm
用語
- topology: SpoutとBoltから構成されるネットワーク構造。
- tuple: 流れるデータの集まりである。値のリストである。
- Spout: ストリームのソース。
- Bolt: 処理を施すノード。
Data modelを定義する
データモデルとしてタプルを使用します。
トポロジの一つ一つは、output field
を定義しなくてはなりません。
ノードがどのようなoutputを出すのかを定義しないといけません。
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("double", "triple")); }
ここでは["double", "triple"]
を定義しています。
トポロジーを作る
一つのSpoutと二つのBoltを定義する。
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("words", new TestWordSpout(), 10); builder.setBolt("exclaim1", new ExclamationBolt(), 3) .shuffleGrouping("words"); builder.setBolt("exclaim2", new ExclamationBolt(), 2) .shuffleGrouping("exclaim1");
ここでは逐次的な構築をしている。
10
, 3
などのマジカルナンバーはその構成要素の数(スレッド数)を表す。
ShuffleGrouping
とは各ボルトが等しい数のタプルを受け取れるようにするもの。
FieldGrouping
はフィールドごとにボルトを設定できるもの。
word
-> exclaim1
-> exclaim2
の順で送られることが定義されてある。
そして、exclaimX
はそれぞれ同じ数のタプル数を受け取る。
Spoutについて
SpoutはタプルをnextTuple
でストリームすることができる。
たとえばSpoutは以下のようになります。
public class RandomSentenceSpout extends BaseRichSpout { @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _rand = new Random(); } @Override public void nextTuple() { Utils.sleep(100); String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"), sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")}; final String sentence = sentences[_rand.nextInt(sentences.length)]; LOG.debug("Emitting tuple: {}", sentence); _collector.emit(new Values(sentence)); } }
Boltについて
Boltは変換処理をする。
重要な点として、以下の点があります。
execute
: 処理を実行する。awk
を返さないといけない。declareOutputFields
: 出力のタプルのフィールドを指定する。
public static class WordCount extends BaseBasicBolt { Map<String, Integer> counts = new HashMap<String, Integer>(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); collector.emit(new Values(word, count)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }
BaseBasicBolt
を継承してBoltを定義します。
Topologyをデプロイする
以下のコマンドで実行できます。
storm jar [jarファイル] [クラス名] [...引数]
たとえば、brew install
した中にはexample
が入っています。
その場合は
storm jar examples/storm-starter/target/storm-starter-1.2.2.jar org.apache.storm.starter.ExclamationTopology -local
になります。特に引数は気にしていません。
Web UIで確認するとアップロードできていることがわかります。
exampleの場合だと、TestWordSpout
というSpoutとExclamationBoltがBoltとして定義されているTopologyをデプロイしています。
可視化
Topology Summry -> Topology Visualization -> 「Show Visualization」と書いてある。
まとめ
- とにかく
@Override
する項目を定義すればokです。 - スレッド数など簡単に設定できる一方で、正しく設定するには経験が必要そう
- Web UIは非常に強力である。