Kekeの日記

エンジニア、読書なんでも

NATS on Kubernetesのいろは。メッセージセマンティックからPrometheus、Grafanaによるモニタリングまで

f:id:bobchan1915:20181224151947p:plain

本記事

本記事はKubernetes 3 Advent Calendar 2018の14日目の記事です。

本日(2018/12/26)の投稿になっているのは、空きがあったため埋めるために参加したからなのであって、決して遅刻などではありません!笑

モチベーション

私は学部3年生だった去年の2017年6月あたりにオンライン学習プラットフォームを一人で作っていました。ここでいうオンライン学習プラットフォームは、機械学習分野での「オンライン学習」を指します。

簡単にいうとニア・リアルタイムで機械学習エンジンのパラメータを更新していく、なんとも難しいシステムです。当初は、単線的にニューラルネットワークでやろうと思っていましたが、そのアルゴリズム(誤差逆伝搬あたり)ゆえに、分散的に処理を実行するのは不可能でした。そのときはApache Hadoop、Apache Kafka, Apache Cassandraなどを主に使っていましたが、何より一人で構築するのは大変でした。

f:id:bobchan1915:20181226033724p:plain

ここでApache Kafkaという分散メッセージングシステムが出てきたわけですが、このときにメッセージングセマンティックスだったり考えることが多いなと思いました。ありとあらゆるメッセージングシステムがある今の時代に、知識なしでは選定するのは難しいなとその時は思いました。

たとえばApache Samza、Apache Pulsar(私の一番好きなもの)、Apache Beam、Apache Flinkなどなどです。主にメッセージングセマンティックは

  • at-least-once
  • at-most-once
  • exactly-once

とあるわけです。ここで「主に」とつけたのは、Apache Pulsarをはじめとして「工夫してくる」セマンティックが登場してきているからです。Apache PulsarでいうとEffectively-once(効率的に一つ)です。

短絡的にこれで解決じゃないか!と思うかもしれません。しかし、メッセージングセマンティックだけでなくコミュニティの成熟度をはじめとしてAPIのサポート状態、運用の難しさ、障害点の有無など、差異が多少なりとも存在するので、設計要件にあった技術選定ができるかどうかは一概に言えません。

少し話が変わりますが、私がいま専攻している工学という学問でも進学時に受けたガイダンスでは

工学は、判断を下すために客観的資料を揃え、決定をする総合性をもつ科学技術である。...。このような経験を通じて、工学は社会における工業のあり方、工学の意義、役割を改めて問い直し、一層、総合的な視野、判断をもつ科学として脱皮しつづけている。 工学は、人間が道具をつくり始めた技術の起源と共に長い歴史をもち、同時に絶えず脱皮、発展をつづける若い体質をもっていると言えよう。

と書かれてありました。やはり、工学(英語でエンジニアリング)を実践する人が技師(エンジニア)であり、総合的な問題解決手段の術を知ることがより良いエンジニアリングになれるのではないかと思っています。

その一環として、Cloud Native Computing FoundationのIncubator ProjectであるNATSを使ってみて、自分の知識として消化することを目指しています。

今までまとめた関連する備忘録的な記事は以下の通りです。

www.1915keke.com

www.1915keke.com

www.1915keke.com

www.1915keke.com

www.1915keke.com

www.1915keke.com

www.1915keke.com

www.1915keke.com

本記事のコンテンツ

以下のコンテンツがありますので、興味のあるセクションだけでもご覧ください。

NATSについて

NATS

f:id:bobchan1915:20181224151947p:plain

NATSとは、

Neural Autonomic Transport Systemの略で、オープンソースなメッセージングシステム

であるということができます。

NATSとNATS Streaming

このプロジェクトには大きく二つの機能があり、NATSとNATS Streamingです。

1. NATS

  • At-most-onceの保証
  • クラスタリングができる
  • Auto Pruning of Client機能によってパフォーマンスの悪いクライアントを切断

2. NATS Streaming

  • At-least-onceの保証
  • NATSストリーミングサーバーでストリーミングをする

メッセージモデル

NATSは以下の3つのメッセージモデルをサポートしています。

1. Pub/Sub

f:id:bobchan1915:20181225135542p:plain

Subjectに対してPublisherというメッセージを配信する人が、Subjectを購読する全ての人にメッセージを配信する配信機構で、Subject自体(NATSサーバー)がブローカーのような役割をする。

HTTPリクエストなどと異なり、データを配信するPublisherはレスポンスを待つ必要がないため、非常に高速に、あとの処理をSubscriberに任せることができます。不可抗力ですが、その反面、レスポンスを必要とするようなもの(例えばユーザーデータ取得など)には向いておらず、一方的なデータ配信になります。

2. Request Reply

次にRequest Replyメッセージモデルを説明します。

f:id:bobchan1915:20181225165732p:plain

いくつものSubscriberはいるのですがPub/Subと違ってRequestに対してはReplyという形で返答をすることができます。

HTTPリクエストによく似ていますが、サーバとクライアントの1対1関係ではなくて1 対多の関係にもすることができます。

一般的なストリーム処理エンジンは次の'Request Reply'などレスポンスを返すものをサポートしているのは珍しいため、NATSのユースケースとしてはメッセージングに限らないです。

3. Queue

f:id:bobchan1915:20181225140709p:plain

この図はあくまでもイメージ図なので気をつけてください。Apache Kafkaと違って、あらかじめTopicを立てる必要がありません。

ストリーム処理の中でのロードバランサーの役割を果たすものです。Queue Groupという同じQueue Nameを持つキューで構成するものを購読するSubscriberどれか一つに配信されます。どれか一つということが重要で、分散的に処理することができるというわけです。

PublishされたメッセージはQueue GroupのどれかのQueueに入ります。

そして、一つのSubscriberに配信されます。

ユースケース

クラスタを構築する場合はまだ多様なアーキテクチャがあるので、これからは単体NATS Serverでの話をします。

もちろんNATS Serverは一般的なWebサーバーとしても簡単な動作はすることはできるでしょう。

というのも、RESTfulにはできないものの、Request ReplyメッゼージモデルによってレスポンスをReplyできるからです。

f:id:bobchan1915:20181228104202p:plain

恩恵としては、クライアントとサーバーが1対1である必要がない点だと思います。

ストリームのメッセージに対して永続化を目的とするSubscriberと、Responceを高速化するためのSubscriberがあると、永続化とレスポンスを同時に並行的に行えたりします。

また、体系的な話として広く使われているラムダアーキテクチャにもNATS Serverを使うことができます。

f:id:bobchan1915:20181228104936p:plain

ザーピングレイヤと呼ばれる高速なDBを用いて可視化に対して高速にレスポンスできるようにします。また、直近のメッセージはスピードレイヤでストリーム処理され、(ニア)リアルタイムで可視化できるようにリアルタイムビューにします。スピードレイヤに関連する話で、以前にApache PrestoでApache Kafkaに分散クエリを投げたりしていました。

www.1915keke.com

このアーキテクチャのメリットとして、「ある確かさ」でのストリーム処理をできることでしょう。つまり、いずれストリーム処理の結果はバッジ処理によってより確かなものになるので、少し粗い(ニア)リアルタイムビューを表示しても問題は少ないです。

これに対して、さらに発展したカッパアーキテクチャというものがあります。これはラムダアーキテクチャがバッジ処理とストリーム処理の両方を実装しなければならないところを指摘して、より開発効率のよいアーキテクチャになっています。

f:id:bobchan1915:20181228105709p:plain

これはメッセージブローカー(ここでいうNATSサーバー)がメッセージを一定期間保持して、過去のメッセージと現在のメッセージに対してストリーム処理をします。処理が冪等であるならば、適宜バッジ処理をするようになります。アーキテクチャはシンプルな一方で、ストリーム処理に対して過去のデータの流し込むとメモリ消費量の増加などをはじめとする負荷があがってしまうので気をつけなければなりません。

構築編

Helmを使ってインストール

Helmが使える状態であることを前提にしています。

本記事を書く前に、拝見させていただいた以下の記事があります。

qiita.com

この記事の2018年5月あたりにはHelmにNATS Chartがなかったのですが6月に登場し、簡単にKubernetesにデプロイできるようになっています。

それでは始めていきましょう。

まず、Helmを使ってインストールします。

helm install --name nats stable/nats

少しインストールまで待ちます。

次にサービスを取得します。

kubernetes get svc -l app=nats

すると以下のようになります。

NAME                   TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
nats-nats-client       ClusterIP   10.31.250.222   <none>        4222/TCP            6m
nats-nats-cluster      ClusterIP   10.31.248.7     <none>        6222/TCP            6m
nats-nats-headless     ClusterIP   None            <none>        4222/TCP,6222/TCP   6m
nats-nats-monitoring   ClusterIP   10.31.241.104   <none>        8222/TCP            6m

このようにサービスがあり、Podは以下のようにStatefulsetなものが一つあります。

kubernetes get pods

NAME          READY     STATUS    RESTARTS   AGE
nats-nats-0   1/1       Running   0          7m

Monitoringサービス

Monitoringサービスを使ってみましょう。

NATSは以下のようなJSONを返すエンドポイントを持っています。

エンドポイント
varz 基本的な情報を提供。クライアントの接続数やCPU使用率など。
connz 現在のコネクションについてvarzより詳しい情報を提供。クライアントそのものの情報。 
routez クラスタでアクティブなルーティングを提供。
subz 現在の購読について詳しい情報を提供。サブスクライバー数や、メッセージ数など。

GUIでも確認できるのできます。まず、ポート転送をします。

kubectl port-forward --namespace default svc/nats-nats-monitoring 8222:8222

そして、以下のようにブラウザで情報を取得できればおっけいです。

f:id:bobchan1915:20181225043101p:plain

もちろんTerminalからも同様に取得できます。

curl http://localhost:8222/connz:title

{
  "server_id": "fCVm9lMRYCi9pSAM6TMiLr",
  "now": "2018-12-24T19:33:40.842378678Z",
  "num_connections": 0,
  "total": 0,
  "offset": 0,
  "limit": 1024,
  "connections": []
}

GUIから見るとわかりやすいってこともないので、あまり使い勝手がよいとは言えません。

サードパーティのDashboardを使ってみる

以下のプロジェクトによってNATSのモニタリングを可視化するダッシュボードを構築することができます。

github.com

まず、以下のように取得します。

npm install -g natsboard

そして起動するだけです。

natsboard

内容的には定期的にAPIから取得しているだけです。

f:id:bobchan1915:20181225044140g:plain

可視化をするには便利なものなので、あとから再度、使ってみます。

実装編

Pub/Subメッセージモデル

使用するライブラリは公式のGoライブラリを使います。

github.com

以下のようにインストールしてください。

go get github.com/nats-io/go-nats

また、今回はClientサーバーにアクセスするため、再度ポート転送しなければなりません。

kubectl port-forward --namespace default svc/nats-nats-client 4222:4222

Publisher側

以下のようにメッセージを送る、上流の方のコードを実装します。

図にすると以下のよう箇所に該当します。

f:id:bobchan1915:20181225062124p:plain

1. 接続を確立させる

f:id:bobchan1915:20181225054902p:plain

チュートリアルにはnats.DefaultURLというようにnats.DefaultURLが割り当てられています。

まず、接続を確立するところまで書きましょう。

package main

import (
    "log"

    nats "github.com/nats-io/go-nats"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()
}

実行するとエラーが出ます。

2018/12/25 04:59:13 nats: authorization violation
exit status 1

これは認証に失敗して接続ができない状態だからです。

NATSには二つの認証方法があり、

  • UsernameとPasswordによるもの
  • Tokenによるもの

の二つがあります。今回はHelmを使ってインストールしたのでデフォルトの前者を使っていきます。

以下のように取得をすることができます。

ここでNATSモニタリングサービスのログインに使うユーザー名とパスワードは以下のコマンドで確認することができます。

 export NATS_USER=$(kubectl get cm --namespace default nats-nats -o jsonpath='{.data.*}' | grep -m 1 user | awk '{print $2}')
 export NATS_PASS=$(kubectl get cm --namespace default nats-nats -o jsonpath='{.data.*}' | grep -m 1 password | awk '{print $2}')
  echo -e "Client credentials:\n\tUser: $NATS_USER\n\tPassword: $NATS_PASS"

これで取得することができます。Helmでデプロイされる場合はデフォルト値は

  • Username: nats_cluster
  • Password: random alhpanumeric string (10)

になっています。各々で設定をする場合は以下のようにHelmのValueファイルを作るといいでしょう。

cat > value.yaml <<EOF
auth:
     user: Keke
    password:  MY_PASSWORD
 EOF

そしてHelmにデプロイするときに-fオプションをつけてこのファイルを指定するといいでしょう。

helm install --name nats -f my_value.yml stable/nats

nats.Connectメソッドに第二引数であるOptionを渡すことができるので以下のように実装します。

package main

import (
    "log"

    nats "github.com/nats-io/go-nats"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL, nats.UserInfo("YOUR_USERNAME", "YOUR_PASSWORD"))
    if err != nil {
        log.Fatal(err)
    }

        time.Sleep(10 * time.Second)

    defer nc.Close()
}

そして実行すると無事エラーなく接続することができました。

go run hoge.go

だいたいは環境変数にて設定するので以下のようにします。.envには以下のように記述する。

USERNAME=anatano_username
PASSWORD=anatano_password

であり、goスクリプトの方は

package main

import (
    "log"
    "os"

    "github.com/joho/godotenv"
    nats "github.com/nats-io/go-nats"
)

func main() {
    err := godotenv.Load()

    if err != nil {
        log.Fatal(err)
    }

    nc, err := nats.Connect(nats.DefaultURL, nats.UserInfo(os.Getenv("USERNAME"), os.Getenv("PASSWORD")))
    if err != nil {
        log.Fatal(err)
    }

         time.Sleep(10 * time.Second)

    defer nc.Close()
}

です。環境変数としてKubernetesやDockerに渡すにはプレーンテキストで渡すようよりはGoogle Key Manegement Service(KMS)を使えばよいでしょう。

www.1915keke.com

今回は、本質的ではないのでこのまま進めます。

再度、ダッシュボードでConnectionをみるとプログラムが実行中の間は以下のようにでます。

f:id:bobchan1915:20181225055920p:plain

最初のメッセージストリーム実装なので複数のSubscriberがSubjectからメッセージを受け取れることを確認しましょう。今回は試しに3つのSubscriberを動作させます。

f:id:bobchan1915:20181225074830p:plain

出力は以下の通りです。

f:id:bobchan1915:20181225074551g:plain

そしてダッシュボードは以下のようになっています。

inよりoutが約3倍くらい大きな値となって正常でしょう。

f:id:bobchan1915:20181225074705g:plain

2. メッセージをPublishする

f:id:bobchan1915:20181225054744p:plain

以下のコードを付け足してメッセージをPublishします。とりあえず、Subjectgreetingに入れてみます。

for i := 0; i <= 1000; i++ {
    message := fmt.Sprintf("Hello World, %d times", i)
    nc.Publish("greeting", []byte(message))
    println(message)
    time.Sleep(100 * time.Millisecond)
}

1秒あたり10メッセージが含まれているはずです。

今回は1000メッセージを逐次処理で入れてみます。ここで第一引数はsubjectです。

実行してみると以下のようになります。標準出力は以下の通りです。

...
Hello World, 68 times
Hello World, 69 times
..

そしてダッシュボードは以下のようになっています。

f:id:bobchan1915:20181225055759g:plain

オーバーヘッドがあるため完全に10メッセージ/秒ではありませんが、だいたいは届けられています。

Subscriber側

メッセージを受け取る方の実装です。

対象箇所は以下の通りです。

f:id:bobchan1915:20181225062212p:plain

1. Subscribeをする

接続するのはPublisherと同様なので省略します。

そして、以下のように先ほど定義したgreetingサブジェクトを購読します。

f:id:bobchan1915:20181225062736p:plain

三つの方法があります。

1. 非同期的(コールバック)で実装する

以下のようにコールバックで実装します。

nc.Subscribe("greeting", func(m *nats.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})

println("I will sleep 😴 ...")
time.Sleep(1 * time.Second)
println("I am awake 😎 !!!")

これによって非同期的にメッセージがサブジェクトにPublishされると実行されます。

f:id:bobchan1915:20181225063646p:plain

今後のためにも簡単なシェルスクリプトを書きます。

#!/bin/sh

PUBLISHER_NUMS=1

for ((i=0;i<${PUBLISHER_NUMS};i++))
do
    ./greeting-publisher/main.go &
done

./greeting-subscriber/main.go & 

実行します。すると以下のようになりました。

I will sleep 😴 ...
Received a message: Hello World, 17 times
Received a message: Hello World, 18 times
Received a message: Hello World, 19 times
Received a message: Hello World, 20 times
Received a message: Hello World, 21 times
Received a message: Hello World, 22 times
Received a message: Hello World, 23 times
Received a message: Hello World, 24 times
Received a message: Hello World, 25 times
Received a message: Hello World, 26 times
I am awake 😎 !!!

Sleepしているはずなのに、Callbackで受けることができているのです。非同期的ですね。

スクリプトの実行順序的に17からスタートしています。

f:id:bobchan1915:20181225065304p:plain

仮にmain関数のtime.Sleep()をなくすと即時に終了してしまうので注意が必要です。

2. 同期的に実装する

同期的に実装するには以下のようにSubscribeSyncメソッドを使えばいいでしょう。

sub, _ := nc.SubscribeSync("greeting")
m, _ := sub.NextMsg(timeout)
fmt.Printf("Received a message: %s\n", string(m.Data))

timeoutはその時間メッセージのPublishがなければ終了するような時間を指定します。

例えば、このまま実行すると一つだけ取得して終了します。出力は以下の通りです。

Received a message: Hello World, 173 times

**1. 非同期的(コールバック)で実装する**では、time.Sleep()を取り除くとmain関数が何もすることがなくなるので即時終了されますが、同期処理はすぐ終わることはありません。

うまく表現できないのですが、例えば以下のようなコードではメッセージを受け取るのにmain関数がブロックされてしまいます。

sub, _ := nc.SubscribeSync("greeting")
m, _ := sub.NextMsg(30 * time.Second)

fmt.Printf("Received a message: %s\n", string(m.Data))
println("I will sleep 😴 ...")
time.Sleep(1 * time.Second)
println("I am awake 😎 !!!")

とりあえず、同期を取らなければならないときは必要な機能です。

f:id:bobchan1915:20181225072901p:plain

3. チャネルで受け取る

チャネルを使っても受け取ることができます。

以下のようにSubscribeします。

ch := make(chan *nats.Msg, 64)
sub, err := nc.ChanSubscribe("greeting", ch)
msg := <-ch

メッセージが来るまで待ってしまっているので、同期処理のような実装になっています。わかりやすく、以下のようにするとメッセージがPublishされない状態だとmain関数はブロックされてしまいます。

Publisherは何もPublishしていない状態です。

ch := make(chan *nats.Msg, 64)
_, err = nc.ChanSubscribe("greeting", ch)
msg := <-ch
fmt.Printf("Received a message: %s\n", string(msg.Data))
println("I am executed 🔥")

何も動きません。何かメッセージをPublishしたときは動きます。完全に同期が取れています。msgで受けなくても<-chだけでやっても一緒です。

f:id:bobchan1915:20181225072958p:plain

これを非同期的にするには、並行処理ができればなんてことはありません。これはNATSに限らず、goのチャネルを使った並行処理実装では基本的な知識でしょう。

ch := make(chan *nats.Msg, 64)
_, err = nc.ChanSubscribe("greeting", ch)

go func() {
    for msg := range ch {
        fmt.Printf("Received a message: %s\n", string(msg.Data))
    }
}()

println("I will sleep 😴 ...")
time.Sleep(300 * time.Millisecond)
println("I am awake 😎 !!!")

println("I will sleep 😴 ...")
time.Sleep(300 * time.Millisecond)
println("I am awake 😎 !!!")

実行すると以下のような結果になります。

I will sleep 😴 ...
Received a message: Hello World, 633 times
Received a message: Hello World, 634 times
I am awake 😎 !!!
I will sleep 😴 ...
Received a message: Hello World, 635 times
Received a message: Hello World, 636 times
Received a message: Hello World, 637 times
I am awake 😎 !!!

f:id:bobchan1915:20181225073043p:plain

ここまでSubscriberについてやってきて一通り終わりました。

試しにダッシュボードでみてみようと思います。

綺麗にoutがあって良いですね。

f:id:bobchan1915:20181225073247g:plain

購読を終了するにはUnsubscribeをします。

sub.Unsubscribe()

Request Replyメッセージモデル

次はRequest Replyメッセージモデルを実装して確認してみます。

f:id:bobchan1915:20181225170028p:plain

Request側

f:id:bobchan1915:20181225170213p:plain

まずはメッセージを発信するRequest側の実装です。

以下のようにRequestを配信します。

msg, err := nc.Request("greeting", []byte("Hello, is anyone there?"), 50*time.Millisecond)

if err != nil {
    log.Fatal(err)
}
fmt.Printf("Received a message: %s\n", string(msg.Data))

この時点で実行しても

2018/12/25 16:41:50 nats: timeout
exit status 1

と出て、エラーになります。それはreplyを待っているのに何もこずにタイムアウトするからです。

Subscriber側

f:id:bobchan1915:20181225170247p:plain

次にSubscriberを実装します。

先ほどのPublisherのメッセージは"Hello, is anyone there?"(=だれかいますか?)っていうメッセージだったので、返事をしてみましょう。

今回はここにいるよとreplyをするコードを書きます。

nc.Subscribe("greeting", func(m *Msg) {
    nc.Publish(m.Reply, []byte("Yeah, I am here"))
})

そしてSubscriberを実行して、Publisherを実行すると以下のようになりました。

Received a message: Yeah, I am here

このようにRequestに対してResponceをすることができました。

ここで仮に、普通のSubscriberがいたらどのようになるのでしょうか。

Pub/Subのセクションで使った以下の機能をもつスクリプトを実行します。

nc.Subscribe("greeting", func(m *nats.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
})

図としては以下のような状態になっています。

f:id:bobchan1915:20181225165942p:plain

まず、普通のSubscriberの出力は以下のようになりました。

I will sleep 😴 ...
Received a message: Hello, is anyone there?
I am awake 😎 !!!

そしてReplyをするものは何もPrintしていないので、何も出ていません。

そしてPublisherですが、以下のようになっており、やり取りは正常にできている様子です。

Received a message: Yeah, I am here

Queueメッセージモデル

f:id:bobchan1915:20181225143906p:plain

次はQueueのメッセージモデルを試してみます。

以下のように簡単に実装できます。

nc.QueueSubscribe("greeting", "greetingGroup", func(m *nats.Msg) {
    fmt.Printf("Received a message: %s with queue group: %s\n:", string(m.Data), "group-A")
})

このときにQueue Groupの名前はキャメルケースでなければ動作しないことに気をつけてください。

結果は以下のようになります。

f:id:bobchan1915:20181225143506g:plain

Queue Groupを分けられれば以下のように構築することもできます。

f:id:bobchan1915:20181225144231p:plain

アドバンス編

WildcardでSubjectを指定する方法とSubjectの階層設計

Wildcardを使ってSubjectを指定することができ、.で区切られるトークンごとに使うことができます。

  1. *

これはgreeting.*とすることで

  • greeting
  • greeting.afternoon
  • greeting.morning
  • greeting.midnigth.quiet

など何にでもマッチします。

  1. >

これは>よりあとのトークンがあればマッチします。例えばgreeting.>とあれば

  • greeting

にはマッチしないものの

  • greeting.afternoon
  • greeting.morning
  • greeting.midnigth.quiet

にはマッチをします。

再度注意ですがgree*>ingなどとトークン内では使うことができないのです。

逆にいうとSubjectが設計を満たすように設計をしなければなりません。オブジェクト指向言語で、ひたすらクラスやドメインを抽象化して、共通部分を抜き出しきたエンジニアなら分かりやすいと思います。

(人間)という集合には(日本人)や、(米国人)などの集合があります。には(東京都民)という集合があって、、、と概念ごとに積層化することがわかりやすいSubjectになるのではないかと思います。

Prometheus + Grafanaでモニタリング

PrometheusとGrafanaのセットアップ

PrometheusではPrometheus Serverが監視対象にアクセスして各種メトリクスなどデータを収集してくるアーキテクチャです。

そして、あるアプリケーションがPrometheus Serverからのアクセスポイントとして設定しているものをExporterといいます。

例えばnode_exporterはここではGKEのKubernetesクラスタを構築するNodeのデータを提供しています。

NATS自体のモニタリングもExporterを持って行うことができます。簡単に示すと以下のようになっています。

f:id:bobchan1915:20181225221623p:plain

Exporterがアプリケーションサーバーとどのような関係であるかは、実はSidecarの関係になっています。

f:id:bobchan1915:20181226030211p:plain

NATSのHelmチャートではデフォルトではExporterはデプロイされない設定になっていますが、helm upgradeによってデプロイするようにします。

実際には、以下のようにmetrics.enabledで設定します。

helm upgrade --set metrics.enabled=true nats stable/nats

これによってSidecarであるExporterがデプロイされています。

確認までにPodをみてみましょう。

kubectl get pods

NAME          READY     STATUS    RESTARTS   AGE
nats-nats-0   2/2       Running   0          2m

このように2/2とNATSサーバー本体とNATS Prometheus Exporterの二つがあります。

ローカルからアクセスするには以下のようにポート転送をしてください。

kubectl port-forward --namespace default nats-nats-0 7777:7777

確認までに以下のようなリクエストを送ってください。

curl http://localhost:7777/metrics

# HELP gnatsd_connz_limit limit
# TYPE gnatsd_connz_limit gauge
gnatsd_connz_limit{server_id="http://localhost:8222"} 1024
# HELP gnatsd_connz_num_connections num_connections
# TYPE gnatsd_connz_num_connections gauge
gnatsd_connz_num_connections{server_id="http://localhost:8222"} 0
# HELP gnatsd_connz_offset offset
# TYPE gnatsd_connz_offset gauge
gnatsd_connz_offset{server_id="http://localhost:8222"} 0
# HELP gnatsd_connz_total total
# TYPE gnatsd_connz_total gauge
gnatsd_connz_total{server_id="http://localhost:8222"} 0
# HELP gnatsd_routez_num_routes num_routes
# TYPE gnatsd_routez_num_routes gauge
gnatsd_routez_num_routes{server_id="http://localhost:8222"} 0
# HELP gnatsd_subsz_avg_fanout avg_fanout
# TYPE gnatsd_subsz_avg_fanout gauge
gnatsd_subsz_avg_fanout{server_id="http://localhost:8222"} 0
# HELP gnatsd_subsz_cache_hit_rate cache_hit_rate
# TYPE gnatsd_subsz_cache_hit_rate gauge
gnatsd_subsz_cache_hit_rate{server_id="http://localhost:8222"} 0
# HELP gnatsd_subsz_max_fanout max_fanout
# TYPE gnatsd_subsz_max_fanout gauge
...

とメトリクスのレスポンスが返ってきます。

これからPrometheusが必要となるのでインストールをします。最初にモニタリングのためのnamespaceを作りましょう。

kubectl create namespace monitor

そしてHelmでPrometheusをインストールします。 f:id:bobchan1915:20181225221815p:plain

helm install --name prometheus  --namespace monitor stable/prometheus

次にGrafanaをインストールいます。

helm install --name grafana  --namespace monitor stable/grafana

最初にGrafanaのパスワードを取得します。ユーザー名はAdminです。

kubectl get secret --namespace monitor grafana -o jsonpath="{.data.admin-password}" | base64 --decode ; echo

そしてポート転送をしてログインをします。

export POD_NAME=$(kubectl get pods --namespace monitor -l "app=grafana,release=grafana" -o jsonpath="{.items[0].metadata.name}")
kubectl --namespace monitor port-forward $POD_NAME 3000

Kubernetesクラスタのモニタリング

PrometheusとGrafanaの全体像を掴んでほしいのでまずは簡単なKubernetesのNodeのモニタリングからやりましょう。

全体の流れとしては以下のようになっています。

f:id:bobchan1915:20181225224547p:plain

ますは先ほどGrafanaのポート転送をしたと思いますが、そのURLにブラウザからアクセスしてください。最初に以下のような画面が見えると思います。

f:id:bobchan1915:20181225221431p:plain

Grafanaにログインできると以下のような画面が見えます。

f:id:bobchan1915:20181225221703p:plain

Data sourceをクリックすると以下の画面になるのでPrometheusを選択します。

f:id:bobchan1915:20181225221725p:plain

すると以下の画面になります。

f:id:bobchan1915:20181225221815p:plain

ここではKubernetesの内部DNSに問い合わせることになるので

  • HTTP.URLをhttp://prometheus-server.monitor.svc.cluster.local

にすれば正しいです。Kubernetesにあまり詳しくない人は、内部DNSはよく使う項目なので調べておいても損はありません。

ここまでで、以下の設定ができました。

f:id:bobchan1915:20181225223233p:plain

しかし、このまま使うとGrafanaでメトリクスを可視化するためにコードを書かないといけません。

そのため既存のDashboardを使って構築します。Grafana Labsというサイトで探せて、今回は以下のものを使います。

f:id:bobchan1915:20181225223723p:plain

Dashboardを追加するボタンにImportというものがあるので選択をしてください。

f:id:bobchan1915:20181225223827p:plain

すると以下のように設定ができます。prometheusの箇所に先ほど定義したデータソースを指定します。

f:id:bobchan1915:20181225223918p:plain

これでKubernetesクラスタのモニタリングができるようになりました。

f:id:bobchan1915:20181225224055p:plain

常にクラスタノードのメトリクスをフィードバックしてもらえるのは非常によいですね。

NATS Exporterからメトリクスを取得する

Prometheusはデフォルトではtarget(対象)は、KubernetesのNodeに対してのメトリクスを提供するnode_exporterだけです。

以下の図のようにPrometheusからNATSのメトリクスを取得するURLにアクセスする必要があります。

f:id:bobchan1915:20181225224902p:plain

データを取得してくることをScrapeというのですが、PrometheusではScrape対象を記述するscrapes_configs[]に追加します。

PrometheusのGUIで確認してみます。Targetにありました。

f:id:bobchan1915:20181226033329p:plain

また、今回のNATS Serverを可視化するDashboardにはNATS Server Dashboardを使用します。

f:id:bobchan1915:20181225225844p:plain

先ほどと手順は同様です。最終的には以下のような監視をすることができます。

f:id:bobchan1915:20181226031156p:plain

これまでに作ったスクリプトを使って監視をしてみましょう。

f:id:bobchan1915:20181226032033g:plain

やはりメッセージングなのでPrometheusのScrap時間が長いように感じました。デフォルトではPrometheusがメトリクスを取ってくるインターバルは1分なので変更します。

helm upgrade --set server.global.scrape_interval=15s --namespace monitor --name prometheus stable/prometheus

このように15秒というScrape Intervalを設定できます。

まとめ

NATSというCloud Nativeにメッセージングシステムを構築できることがわかって、非常に面白かったです。特にメッセージングはエンジニアとしてデータベースなどの知識と同様に必要最低限であるので、さらに知識を広くすることができてよかったです。

簡単に構築できて、簡単に使うことができて、、、今までで一番Developing Experience(=開発経験)はよかったです。これからばしばし使って行こうと思います。来年はCloud NativeやKubernetesなどいろんな勉強会に出て、もっといろんな知識を蓄えたいと思います。

最後までありがとうございました!