ApacheのIncubatorプロジェクトApache Gearpumpでストリーム処理をしてみる
Apache Gearpumpとは
概要
Apache Gearpumpとは、
リアルタイムビッグデータストリームエンジン
です。なお、Apache Software Foundation Incubator projectになっています。
特徴
他のストリーム処理エンジンと違って、Gearpumpはイベント/メッセージ(あとで解説する)ベースである。
他にも特徴としては
- 非常に高いスループットと低いレイテンシストリーム処理
- メッセージセマンティクスを設定することができる(at least once, exactly once)
- アプリケーションのホットなリデプロイ
- アプリケーションモニタリングのための包括的なダッシュボード
- Stormアプリケーションの互換性
- Samoaアプリケーションの互換性
- 使いやすいAPIと拡張性の高いAPI。
- 高レベルも低レベルAPIもサポート
インターフェース
以下のインターフェースが用意されています。
- JavaAPI
- ScalaAPI
- RESTAPI
基本的なコンセプト
MasterとWorker
Gearpumpはマスタースレーブアーキテクチャをとります。
すべてのクラスタには一つのMasterが一つ以上のWorkerを持っています。
役割としては
- Master: クラスタのリソースをまとめる
- Worker: マシン単体のリソースを管理する
さらに詳細は以下のように構成になっています。
Application
Applicationとは、私たちが分散処理をしたいアプリケーションのことをさします。 バッジ処理を対象とするMapReduceアプリケーションや、ストリーム処理を対象とするストリーム処理アプリケーションがありますが、Gearpumpはストリーム処理アプリケーションをサポートしている。
AppMasterとExecutor
ランタイムの中で、すべてのアプリケーションのインスタンスは一つのAppMasterと、Executorのリストで構成されます。
AppMasterはApplication自体のコントロールセンターのようなもので命令します。
これはユーザーとも、Masterとも、Workerとも、Executorとも通信し、ジョブと呼ばれる処理を終えることを全うします。
Executorは分散したワーカーの中での並行処理を行うものです。
Applicationのアップロードフロー
公式ドキュメントでは、Application Submission Flowと書かれていますがアップロードと訳しています。
- ユーザーがアプリケーションをアップロードすると、Masterは実行できるWorkerを探してAppMasterが作れそうなら、Masterが受け取ります。
- AppMasterが起動されると、MasterにWorkerを要求します。Worker中のExecutorが起動するとジョブを始める。
- Masterにその結果が返される
以下のようなフローになっています。
ApMasterは使用可能なWorkerを配置して、WorkerのサブプロセスとしてAppMasterを起動します。
ストリームトポロジー、プロセッサー、タスク
ストリームアプリケーションでは、DAGのトポロジーをもつアプリケーションがあり、データプローを定義している。
ストリームタスクとパーティショナー
タスクとはストリーム処理内の並行処理可能なアトミックな概念である。
ランタイム内では、プロセッサーの中にタスクがいくつかある。
その中で、どのようにデータフローを制御するかを設定することができる。
技術的なハイライト
Gearpumpは高いパフォーマンスと、柔軟で、耐障害性の高い、レスポンジブルなストリームプラットフォームである。
どこでもActor
再度、掲載になりますが、Actorのヒエラルキーは以下のようになっています。
アクターモデルを取っていて、マイクロサービスのように他のアクターとは切り離されている凝縮性の高いコンポーネントで構成されています。
アクターをたくさん定義することによって、複雑なタスクをこなすことができる。
Exactly onceセマンティクス
厳密にデータは一回だけ確実に届くのがExactly onceなメッセージセマンティクスです。
そして時間計算より、再送信して、未来の時間で再度計算されるようなことはありません。
Flowコントロール
ビルドインでフローコントロールがサポートされています。
タスク間でメッセージがやりとりされる中で、下流のタスクが詰まってしまわないようにします。
ビルドインダッシュボード
バックエンドにRESTAPIを使っとWebUIダッシュボードが用意されています。
ビルドインApache KafkaとHDFSコネクター
一般的に広くデータ基盤開発で使われているApache KafkaやHDFSのコネクターがすでにあるので、簡単に導入しやすそうです。
Dockerでインストール
コンテナの起動
念のためDocker Imageがあることを確認します。
docker search gearpump NAME DESCRIPTION STARS OFFICIAL AUTOMATED gearpump/gearpump Gearpump docker container for one node clust… 4 [OK] ...
そして以下のコマンドでまずgearpumpのコンテナを取得します。
docker pull gearpump/gearpump
そして以下のコマンドでコンテナを起動します。
docker run -t -p 8090:8090 --name gearpump gearpump/gearpump
バックグラウンドで起動したいときは-d
オプションをつけてください。
ここでhttps://localhost:8090にアクセスすると、ダッシュボードを見ることができます。
ちなみに
- username:
admin
- password:
admin
でログインをすることができます。
もちろんアプリケーションは何もできていません。
Dockerfileを読む
以下がDockerコンテナを作るためのDockerfileになります。
# Docker file FROM errordeveloper/oracle-jre # Prepare the package RUN curl --location --retry 3 --insecure https://github.com/gearpump/gearpump/releases/download/0.7.1/gearpump-2.11-0.7.1.zip -o tmp.zip && unzip -q tmp.zip && rm tmp.zip && chmod +x gearpump-2.11-0.7.1/bin/* ADD gear.conf gearpump-2.11-0.7.1/conf/gear.conf EXPOSE 3000 EXPOSE 8090 ENTRYPOINT gearpump-2.11-0.7.1/bin/local -workernum 1 & gearpump-2.11-0.7.1/bin/services
ここで知る必要があるのは以下のADD
が書かれている行です。
ADD gear.conf gearpump-2.11-0.7.1/conf/gear.conf
このコマンドだと自分のカレントディレクトリにあるgear.conf
をコピーをコンテナ内に追加しています。
どうやらgear.conf
を作って設定ファイルを定義していけば良さそうなことが推測できます。
サンプルアプリケーションをアップロードする
以下のリポジトリにgearpumpの公式リポジトリとなるのでクローンしてください。
git clone git@github.com:apache/incubator-gearpump.git
そして、ビルドしてください。
sbt clean assembly packArchiveZip
そして.zip
ファイルができるので、回答します。
unzip gearpump-2.11-0.8.4.zip
そしてCLIツールでjarファイルをアップロードします。
bin/gear app -jar examples/wordcount-2.11-0.8.4-assembly.jar org.apache.gearpump.streaming.examples.wordcount.WordCount
設定ファイルgear.conf
について
以下のドキュメントで設定について見ることができます。
Configuration - Apache Gearpump(incubating)
Gearpumpトポロジーの書き方
今回はScalaで説明します。
object WordCount extends AkkaApp with ArgumentsParser { override val options: Array[(String, CLIOption[Any])] = Array.empty override def main(akkaConf: Config, args: Array[String]): Unit = { val context = ClientContext(akkaConf) val app = StreamApp("dsl", context) val data = "This is a good start, bingo!! bingo!!" //count for each word and output to log app.source(data.lines.toList, 1, "source"). // word => (word, count) flatMap(line => line.split("[\\s]+")).map((_, 1)). // (word, count1), (word, count2) => (word, count1 + count2) groupByKey().sum.log context.submit(app).waitUntilFinish() context.close() } }
基本的にはAkkaApp
をextendすることで実行できます。
Source
がトポロジー内でデータの入力となります。
そして中で処理をしたあと、context
をclose
することによってトポロジーを閉じることができます。
まとめ
- どの言語でもかけるわけではないので、つらい
- 基本的な構成はおおまか、どのストリーム処理エンジンと一緒である
- 調べ不足なだけかもしれないが、耐障害性の根拠が気になる
適宜、追記する予定です。