Kekeの日記

エンジニア、読書なんでも

ApacheのIncubatorプロジェクトApache Gearpumpでストリーム処理をしてみる

f:id:bobchan1915:20180908225908p:plain

Apache Gearpumpとは

概要

Apache Gearpumpとは、

リアルタイムビッグデータストリームエンジン

です。なお、Apache Software Foundation Incubator projectになっています。

特徴

他のストリーム処理エンジンと違って、Gearpumpはイベント/メッセージ(あとで解説する)ベースである。

他にも特徴としては

  • 非常に高いスループットと低いレイテンシストリーム処理
  • メッセージセマンティクスを設定することができる(at least once, exactly once)
  • アプリケーションのホットなリデプロイ
  • アプリケーションモニタリングのための包括的なダッシュボード
  • Stormアプリケーションの互換性
  • Samoaアプリケーションの互換性
  • 使いやすいAPIと拡張性の高いAPI。
  • 高レベルも低レベルAPIもサポート

インターフェース

以下のインターフェースが用意されています。

  • JavaAPI
  • ScalaAPI
  • RESTAPI

基本的なコンセプト

MasterとWorker

Gearpumpはマスタースレーブアーキテクチャをとります。

すべてのクラスタには一つのMasterが一つ以上のWorkerを持っています。

役割としては

  • Master: クラスタのリソースをまとめる
  • Worker: マシン単体のリソースを管理する

さらに詳細は以下のように構成になっています。

https://gearpump.apache.org/releases/latest/img/actor_hierarchy.png

Application

Applicationとは、私たちが分散処理をしたいアプリケーションのことをさします。 バッジ処理を対象とするMapReduceアプリケーションや、ストリーム処理を対象とするストリーム処理アプリケーションがありますが、Gearpumpはストリーム処理アプリケーションをサポートしている。

AppMasterとExecutor

ランタイムの中で、すべてのアプリケーションのインスタンスは一つのAppMasterと、Executorのリストで構成されます。

AppMasterはApplication自体のコントロールセンターのようなもので命令します。

これはユーザーとも、Masterとも、Workerとも、Executorとも通信し、ジョブと呼ばれる処理を終えることを全うします。

Executorは分散したワーカーの中での並行処理を行うものです。

Applicationのアップロードフロー

公式ドキュメントでは、Application Submission Flowと書かれていますがアップロードと訳しています。

https://gearpump.apache.org/releases/latest/img/submit.png

  1. ユーザーがアプリケーションをアップロードすると、Masterは実行できるWorkerを探してAppMasterが作れそうなら、Masterが受け取ります。
  2. AppMasterが起動されると、MasterにWorkerを要求します。Worker中のExecutorが起動するとジョブを始める。
  3. Masterにその結果が返される

以下のようなフローになっています。

https://gearpump.apache.org/releases/latest/img/submit2.png

ApMasterは使用可能なWorkerを配置して、WorkerのサブプロセスとしてAppMasterを起動します。

ストリームトポロジー、プロセッサー、タスク

ストリームアプリケーションでは、DAGのトポロジーをもつアプリケーションがあり、データプローを定義している。

https://gearpump.apache.org/releases/latest/img/dag.png

ストリームタスクとパーティショナー

タスクとはストリーム処理内の並行処理可能なアトミックな概念である。

ランタイム内では、プロセッサーの中にタスクがいくつかある。

その中で、どのようにデータフローを制御するかを設定することができる。

https://gearpump.apache.org/releases/latest/img/shuffle.png

技術的なハイライト

Gearpumpは高いパフォーマンスと、柔軟で、耐障害性の高い、レスポンジブルなストリームプラットフォームである。

どこでもActor

再度、掲載になりますが、Actorのヒエラルキーは以下のようになっています。

https://gearpump.apache.org/releases/latest/img/actor_hierarchy.png

アクターモデルを取っていて、マイクロサービスのように他のアクターとは切り離されている凝縮性の高いコンポーネントで構成されています。

アクターをたくさん定義することによって、複雑なタスクをこなすことができる。

Exactly onceセマンティクス

厳密にデータは一回だけ確実に届くのがExactly onceなメッセージセマンティクスです。

そして時間計算より、再送信して、未来の時間で再度計算されるようなことはありません。

https://gearpump.apache.org/releases/latest/img/exact.png

Flowコントロール

ビルドインでフローコントロールがサポートされています。

タスク間でメッセージがやりとりされる中で、下流のタスクが詰まってしまわないようにします。

https://gearpump.apache.org/releases/latest/img/flowcontrol.png

ビルドインダッシュボード

バックエンドにRESTAPIを使っとWebUIダッシュボードが用意されています。

https://gearpump.apache.org/releases/latest/img/dashboard.gif

ビルドインApache KafkaとHDFSコネクター

一般的に広くデータ基盤開発で使われているApache KafkaやHDFSのコネクターがすでにあるので、簡単に導入しやすそうです。

Dockerでインストール

コンテナの起動

念のためDocker Imageがあることを確認します。

docker search gearpump

NAME                                      DESCRIPTION                                     STARS               OFFICIAL            AUTOMATED
gearpump/gearpump                         Gearpump docker container for one node clust…   4                                       [OK]
...

そして以下のコマンドでまずgearpumpのコンテナを取得します。

docker pull gearpump/gearpump

そして以下のコマンドでコンテナを起動します。

docker run -t -p 8090:8090 --name gearpump gearpump/gearpump

バックグラウンドで起動したいときは-dオプションをつけてください。

ここでhttps://localhost:8090にアクセスすると、ダッシュボードを見ることができます。

f:id:bobchan1915:20180908204207p:plain

ちなみに

  • username: admin
  • password: admin

でログインをすることができます。

もちろんアプリケーションは何もできていません。

f:id:bobchan1915:20180908204336p:plain

Dockerfileを読む

以下がDockerコンテナを作るためのDockerfileになります。

# Docker file
FROM errordeveloper/oracle-jre

# Prepare the package
RUN curl --location   --retry 3 --insecure https://github.com/gearpump/gearpump/releases/download/0.7.1/gearpump-2.11-0.7.1.zip -o tmp.zip && unzip -q tmp.zip && rm tmp.zip && chmod +x gearpump-2.11-0.7.1/bin/*

ADD gear.conf gearpump-2.11-0.7.1/conf/gear.conf

EXPOSE 3000

EXPOSE 8090

ENTRYPOINT gearpump-2.11-0.7.1/bin/local -workernum 1 & gearpump-2.11-0.7.1/bin/services

ここで知る必要があるのは以下のADDが書かれている行です。

ADD gear.conf gearpump-2.11-0.7.1/conf/gear.conf

このコマンドだと自分のカレントディレクトリにあるgear.confをコピーをコンテナ内に追加しています。

どうやらgear.confを作って設定ファイルを定義していけば良さそうなことが推測できます。

サンプルアプリケーションをアップロードする

以下のリポジトリにgearpumpの公式リポジトリとなるのでクローンしてください。

git clone git@github.com:apache/incubator-gearpump.git

そして、ビルドしてください。

sbt clean assembly packArchiveZip

そして.zipファイルができるので、回答します。

unzip gearpump-2.11-0.8.4.zip

そしてCLIツールでjarファイルをアップロードします。

bin/gear app -jar examples/wordcount-2.11-0.8.4-assembly.jar org.apache.gearpump.streaming.examples.wordcount.WordCount

設定ファイルgear.confについて

以下のドキュメントで設定について見ることができます。

Configuration - Apache Gearpump(incubating)

Gearpumpトポロジーの書き方

今回はScalaで説明します。

object WordCount extends AkkaApp with ArgumentsParser {

  override val options: Array[(String, CLIOption[Any])] = Array.empty

  override def main(akkaConf: Config, args: Array[String]): Unit = {
    val context = ClientContext(akkaConf)
    val app = StreamApp("dsl", context)
    val data = "This is a good start, bingo!! bingo!!"

    //count for each word and output to log
    app.source(data.lines.toList, 1, "source").
      // word => (word, count)
      flatMap(line => line.split("[\\s]+")).map((_, 1)).
      // (word, count1), (word, count2) => (word, count1 + count2)
      groupByKey().sum.log

     context.submit(app).waitUntilFinish()
     context.close()
  }
}

基本的にはAkkaAppをextendすることで実行できます。

Sourceがトポロジー内でデータの入力となります。

そして中で処理をしたあと、contextcloseすることによってトポロジーを閉じることができます。

まとめ

  • どの言語でもかけるわけではないので、つらい
  • 基本的な構成はおおまか、どのストリーム処理エンジンと一緒である
  • 調べ不足なだけかもしれないが、耐障害性の根拠が気になる

適宜、追記する予定です。