Kekeの日記

エンジニア、読書なんでも

Apache Stormのトポロジーチュートリアル

f:id:bobchan1915:20180816182740p:plain

本記事ではApache Stormのトポロジーを中心しに解説していこうと思います。

前提知識

StormのAPIがデザインパターンBuilderを使用しますので知らなければ学習してください。

簡単にインストールできます。

brew install storm

用語

  • topology: SpoutとBoltから構成されるネットワーク構造。
  • tuple: 流れるデータの集まりである。値のリストである。
  • Spout: ストリームのソース。
  • Bolt: 処理を施すノード。

Data modelを定義する

データモデルとしてタプルを使用します。

トポロジの一つ一つは、output fieldを定義しなくてはなりません。 ノードがどのようなoutputを出すのかを定義しないといけません。

@Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }    

ここでは["double", "triple"]を定義しています。

トポロジーを作る

一つのSpoutと二つのBoltを定義する。

TopologyBuilder builder = new TopologyBuilder();        
builder.setSpout("words", new TestWordSpout(), 10);        
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
        .shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
        .shuffleGrouping("exclaim1");

ここでは逐次的な構築をしている。 10, 3などのマジカルナンバーはその構成要素の数(スレッド数)を表す。

ShuffleGroupingとは各ボルトが等しい数のタプルを受け取れるようにするもの。

FieldGroupingはフィールドごとにボルトを設定できるもの。

word -> exclaim1 -> exclaim2の順で送られることが定義されてある。 そして、exclaimXはそれぞれ同じ数のタプル数を受け取る。

Spoutについて

SpoutはタプルをnextTupleでストリームすることができる。

たとえばSpoutは以下のようになります。

public class RandomSentenceSpout extends BaseRichSpout {
     @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    _rand = new Random();
  }

  @Override
  public void nextTuple() {
    Utils.sleep(100);
    String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
            sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")};
    final String sentence = sentences[_rand.nextInt(sentences.length)];

    LOG.debug("Emitting tuple: {}", sentence);

    _collector.emit(new Values(sentence));
  }
}

Boltについて

Boltは変換処理をする。

重要な点として、以下の点があります。

  • execute: 処理を実行する。awkを返さないといけない。
  • declareOutputFields: 出力のタプルのフィールドを指定する。
public static class WordCount extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String word = tuple.getString(0);
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      count++;
      counts.put(word, count);
      collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word", "count"));
    }
  }

BaseBasicBoltを継承してBoltを定義します。

Topologyをデプロイする

以下のコマンドで実行できます。

storm jar [jarファイル] [クラス名] [...引数]

たとえば、brew installした中にはexampleが入っています。

その場合は

storm jar examples/storm-starter/target/storm-starter-1.2.2.jar org.apache.storm.starter.ExclamationTopology -local

になります。特に引数は気にしていません。

Web UIで確認するとアップロードできていることがわかります。

f:id:bobchan1915:20180817193641p:plain

exampleの場合だと、TestWordSpoutというSpoutとExclamationBoltがBoltとして定義されているTopologyをデプロイしています。

可視化

Topology Summry -> Topology Visualization -> 「Show Visualization」と書いてある。

f:id:bobchan1915:20180817195049p:plain

まとめ

  • とにかく@Overrideする項目を定義すればokです。
  • スレッド数など簡単に設定できる一方で、正しく設定するには経験が必要そう
  • Web UIは非常に強力である。