Kekeの日記

エンジニア、読書なんでも

PrestoでApache Kafkaに分散クエリを投げる

スクリーンショット 2018-08-14 21.02.12.png

対象読者

  • 分散SQL環境を構築したい方
  •  Prestoに入門したい方
  •  Kafkaの中身にクエリを投げたい方

Prestoとは

presto-og.png

分散SQLクエリエンジン 

以下のようなアーキテクチャです。

image.png

特徴 - 分散しているので高速なクエリが叩ける。Hiveはバッジ処理を目的とするため遅い。 - 複数のデータソースにクエリを投げれる。 - 似た者でApache HiveApache Impalaがあり、これらはどれもSQLエンジン。

詳しくは以下の記事をご覧ください。

tug.red

Apache Kafkaとは

download.png

分散ストリーミング基盤

世界の上場している企業の半数近くが採用しているらしいです。

特徴

  • ストリームを配信することができる
  • 分散してキューを取り出すことによって分散処理ができる

詳しくは以下の記事をご覧ください。

kafka.apache.org

PrestoでKafkaにクエリを投げる

ヘキサゴナルアーキテクチャ故に、概念が抽象化されています。

1. Connectorを定義する

Tutorialにもありますが、いくつかプロパティを定義しないといけません。 一覧にまとめました。

また、これらのcatalogはPRESTO_HOME/etc/catalog以下に作成しないといけません。

property 何を設定するか
kafka.table-names テーブル名
kafka.nodes Kafkaのクラスタのhostname:port
kafka.connect-timeout タイムアウトまでの時間
kafka.table-descrition-dir トピックの説明ファイル(.json)が入ったディレクトリ
kafka.default-schema テーブルのデフォルトのスキーマ
kafka.buffer-size バッファサイズ
kafka.hide-internal-columns 内部カラムがテーブルのスキーマに入るのかを定義 

 一つ一つ解説をしましょう。

kafka.table-names カンマ,で分けることにより複数定義することができます。

ここで定義されるテーブルはそれぞれ説明ファイルを定義することができる。 説明ファイルがなければtable名=topic名である。

<schema-name>.<table-name>のように名前をつけるとよい。

kafka.default-schema schemaがあると例えばトピックfront.userがあったとして、frontででもクエリを投げることができます。 これはkafkaの要請ではないので、あくまでもprestoを使ううえでのことです。

kafka.table-description-dir 省略可。 特に細かな設定をしない場合はきにすることがありません。

デフォルトでは、etc/kafkaに配置してあります。

私の場合はkafka.propertiesは以下のようになりました。

connector.name=kafka
kafka.table-names=liveai.greet
kafka.nodes=localhost:9092

テーブル定義ファイル

Kafkaのトピックはシリアライズされた物が入っているので、Prestoではこれらのデータはマッピングされないといけない。

.jsonで定義する。

いまのところ、Apache Avroはサポートされていません。

やっと投げてみる

設定したのを前提に進めていきます。

1. デモ用のトピックにデータをpublishする

あらかじめkafka Producer APIを使って1~99の連番をトピックに書き込みました。

試しにkafka CLIを使ってみて見ると

kafka-console-consumer --bootstrap-server localhost:9092 --partition 1 --topic liveai.greet --from-beginning         20:19 14-08-2018
1
3
4
7
8
9
13
14
16
19
22
24
25
28
29
30
31
32
33
35
39
40
44
46
50
53
57
59
60
64
65
66
70
71
73
75
77
78
79
80
81
84
87
88
90
92
96
97
99

値が飛び飛びになっているものは、このトピックがパーティション数を2にしているからです。だいたい半分取得できているのでいいでしょう。

2. Presto CLIを使って接続します。

私の場合はcatalog名が kafkaで、スキーマがliveaiなので以下のようになります。

presto --catalog kafka --schema liveai
presto:liveai>|

はじめに付け足すと、tabを使うと補完できるので知っておくと便利です。

presto:liveai>
CREATE TABLE     DESCRIBE         DROP TABLE       EXPLAIN          HELP             QUIT             SELECT           SHOW CATALOGS    SHOW COLUMNS     SHOW FUNCTIONS   SHOW SCHEMAS     SHOW SESSION     SHOW TABLES      USE

そして、テーブル名を見てみます。

presto:liveai> show tables;
 Table
-------
 greet
(1 row)

https://localhost:8080/uiでは、以下のようになります。(しょっぱなからサーバーをたて忘れていて、INTERNAL ERRORがででいますが気にしないでください。

スクリーンショット 2018-08-14 20.33.13.png

ここで注意してほしいのが、prest serverがたっていようとなかろうとpresto対話シェルは起動できます。 私がエラーを出したときもサーバーがたっていないことに気づきませんでした。

3. topic内をクエリで検索

SQL言語です。

presto:liveai> SELECT count(*) FROM greet;
 _col0
-------
   100
(1 row)

Query 20180814_122455_00019_4tvz4, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [100 rows, 190B] [230 rows/s, 437B/s]

ちゃんと100個ありますね。

UIは以下のように更新されています。

スクリーンショット 2018-08-14 21.28.35.png

まとめ

どちらもスタンドアローンモードがあるので試すのは簡単。ぜひ試してみてください。