Kekeの日記

エンジニア、読書なんでも

EmbulkのPluginをRubyで作成する"いろは"

本記事

本記事は、EmbulkのInputとOutputに対応するPluginをRubyで作成する手順を解説します。

f:id:bobchan1915:20190312162414p:plain

対象としてはDialogflowです。私はGoogle HomeとLine Botを使って家計簿を管理しているのですが、訳あってDialogflowのプロジェクトを移行しないといけなくなりました。

しかしながら今回の記事では体系的にPluginを作る方法を解説するため、Dialogflowに限らずあらゆるPluginで使える知識を習得できるのではないかと思います。

Dialogflowを使ったアプリケーションはいくつも開発していますが、一つのサンプルは以前に記事にしましたので興味がある人はご覧ください。

www.1915keke.com

それでは、やっていきましょう。

アジェンダ

本記事のアジェンダは以下の通りです。

設計

0. ユースケースを考える

f:id:bobchan1915:20190312162738p:plain

デザイン思考的な考えにはなりますが、想定するユーザーの痛みを解決するものでしょうか。

実際、他の解決手段があるのかを考えなければなりません。

自分のDialogflowは他のアイデアによって解決できるでしょうか。

現段階では手作業しかありません。Pluginによって、同じような問題に直面している人のためになれるかもしれません。

自分のユースケースとしてはDialogflowのプロジェクト移行にあります。

またProject移行はInputとOutputがセットになっています。

Outputを他のファイル出力にすることによって、バックアップを取ることも可能になり、さらなるアプリケーションに期待できます。

1. 何が事前に必要なのかを考える

まず、Dialogflow APIを使うのでAccess Tokenが必要になります。また、InputとOutputで別Projectならば、二つのAccess Tokenが必要です。

また、Project Name(Agent Name)を事前に指定し、Projectを作らなけばAccess Tokenを取得できないので作成する必要があります。

まとめると

  • Access Token(1 or more)
  • Project ID(GCP)
  • Agent Name(Dialogflow)

が必要になります。自分が作りたいものに対して、何が必要なのかを考える必要があります。

事前に

  • Projectを作成すること
  • Dialogflow APIを有効化すること

が必要となってきます。

f:id:bobchan1915:20190312203621p:plain

2. Configure方法を考える

この項目を設けましたが、つまるところ、YAML設計というものです。

実際はドメイン構造になっているので、慣れしたんだオブジェクト指向を考えれば良いでしょう。

今回は

  • access_token
  • project_id
  • agent_name

しか必要ないのであまり考える必要がありません。GCPを使用するプラグインは、いくつもあるので、以下のようなPluginが参考になります。

github.com

github.com

github.com

例えば、Google Cloud PlatformのGoogle Cloud StorageのInput Pluginは以下のようなConfigをする必要があります。

in:
  type: gcs
  bucket: my-gcs-bucket
  path_prefix: sample_
  auth_method: private_key #default
  service_account_email: ABCXYZ123ABCXYZ123.gserviceaccount.com
  p12_keyfile: /path/to/p12_keyfile.p12
  application_name: Anything you like
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}

特にParserがハッシュになっていることがわかります。必要に応じて、設計をする必要があります。

3. Schemaを設計する

f:id:bobchan1915:20190312163036p:plain

SchemaとはEmbulkのプロトコルのようなものです。やりたいことを理解することが何よりも大事です。

プロトコルとしてあることによって、簡単にいうとInputからOutputへ入力から出力をすることができます。

安全に、そして簡単に受け渡すにはリレーショナルデータベースのようにスキーマを設計する必要性があります。

私の場合はDialogflow APIを使用するので、まずその理解が必要です。Dialogflow API仕様は以下のサイトにあります。

cloud.google.com

3.1 APIをcURLなどで探ってみる

この節はDialogflow APIに対してのメソッドなのでAPIを使わないアプリケーションの場合は、飛ばしてもらって構いません。

API使用を公式ドキュメントで知るだけでもいいですが、手を動かして探ってみます。

紐づくProject IDなどが必要なので、以下のように確認します。

f:id:bobchan1915:20190312153840p:plain

DialogflowはActions on Google Projectは以下のようにGCP ProjectがParent Projectとして紐づいています。構造としては以下のようになっています。

f:id:bobchan1915:20190312163616p:plain

Cloud SQLなど他のGCPサービスにアクセスするように、以下のようにサービスアカウントを作成して、JSONの秘密鍵を取得します。

f:id:bobchan1915:20190312155259p:plain

環境変数GOOGLE_APPLICATION_CREDENTIALSをそのファイルのパスにします。例えば、自分の場合は

set -x GOOGLE_APPLICATION_CREDENTIALS './old_private_key.json'

などとしました。次に、以下のようなcURLを実行できるようになります。

    curl -X GET -H "Authorization: Bearer "$(gcloud auth application-default print-access-token) \
    -H "Content-Type: application/json; charset=utf-8" \
    "https://dialogflow.googleapis.com/v2/projects/[YOUR_PROJECT_ID]/agent/intents" 

レスポンスは以下のようになります。

{
  "parent": "projects/[YOUR_PROJECT_ID]",
  "displayName": "[AGENT_NAME]",
  "defaultLanguageCode": "ja",
  "timeZone": "Asia/Tokyo",
  "enableLogging": true,
  "matchMode": "MATCH_MODE_HYBRID",
  "classificationThreshold": 0.3
}

これによってAPIを叩けるということが確認できました。

ExportとImportを確認する

dialogflow.com

本記事では、上記のAPIを使ってプロジェクトを移行していきます。

Plugin作成手順

これよりEmbulk Pluginの作成手順を解説します。

Input

1. Projectの作成

まず、最初にProjectを作成しましょう。embulk newコマンドで作成をすることができます。

$ embulk new <type> <name>

ここの<type>どの言語で、何をするかを定義します。

選択肢としては以下のものがあります。

Type
java-input mysql
java-output mysql
java-filter add-hostname
java-file-input ftp
java-file-output ftp
java-parser csv
java-formatter csv
java-decoder gzip
java-encoder gzip
ruby-input mysql
ruby-output mysql
ruby-filter add-hostname
ruby-parser csv
ruby-formatter csv

こちらは、Output作成するときも使用するので、再度そのときはご参照ください。

今回はRubyでInputを作成するので、以下のようにしましょう。

$ embulk new ruby-input dialogflow

このように作成したPluginの名前は

embulk-<typeの`言語-`の接頭語を取ったもの>-<name>

になります。つまり、今回の場合はembulk-input-dialogflowとなります。

私はFish shellを使っているので以下のコマンドです。

$ java -jar (which embulk) new ruby-input dialogflow

なぜ、このようなことをしないといけないのかはDigdagと一緒の理由なので、知りたければ以下のリンクをご覧ください。

github.com

先ほどのコマンドの結果は以下の通りです。

2019-03-06 16:15:13.644 +0900: Embulk v0.9.15
Creating embulk-input-dialogflow/

Plugin template is successfully generated.
Next steps:

  $ cd embulk-input-dialogflow
  $ bundle install                      # install one using rbenv & rbenv-build
  $ bundle exec rake                    # build gem to be released
  $ bundle exec embulk run config.yml   # you can run plugin using this command

なので、手順に従います。

$ cd  embulk-input-dialogflow
$ java --jar (which embulk) bundle install // Fishでなければembulk bundle install 

ここで問題があります。embulk-input-dialogflow.gemspecに書かれている依存関係でembulkに対して以下のように記述されています。

spec.add_development_dependency 'embulk', ['>= 0.9.15']

このままjava --jar (which embulk) bundle installすると以下のように

Fetching gem metadata from https://rubygems.org/..............
Fetching gem metadata from https://rubygems.org/.
Could not find gem 'embulk (>= 0.9.15)' in any of the gem sources listed in your Gemfile.

と「Gemが存在しないよ」と出ます。実際にembulkのGemについて確認します。

rubygems.org

するとどう考えても0.9以上のバージョンは存在しません。

f:id:bobchan1915:20190306165930p:plain

なのでここは

spec.add_development_dependency 'embulk', ['>= 0.8.15']

にして、再度、bundle installすると成功します。

そして、以下のコマンドを実行してBuildします。ここのステップはGemを公開するときだけでいいのですが、やっておきます。

$ bundle exec rake

このようにして、ひとまず準備はできました。

2. 実装編(Input)

2.1 テストをできる環境にする

開発中はGemを公開しないと思います。しかし、テストをしながら、開発を進めたいものです。

そのようなときに、embulk runで実行する際に-I path/to/your/plugin/libに指定することによってテストをすることができます。

config.ymlを作成したのちに、

$ java --jar (which embulk) preview -I lib/ config.yml 

でテストをします。最初は

in:
    type: dialogflow
    option1: example1
    option2: example2
out:
  type: stdout

のようなファイルを作って、試してみます。ここでout.typestdoutは標準出力です。

ついでにconfig.ymlをサンプルとして公開しない場合は.gitignoreconfig.yml`を追加しておきましょう。

$ echo "config.yml" > .gitignore

公開すると、手助けになるケースが多いのでconfig.sample.ymlなどと名前をつけてGit管理した方がいいのではないかと個人的には思います。

2.2 Configureを実装する

どのようなもの(API Keyなど)が事前に必要かをすでに考えているかと思います。

まず、実装をしてみます。以下の箇所が設定に関する箇所です。

def self.transaction(config, &control)
        # configuration code:
        task = {
          "option1" => config.param("option1", :integer),                     # integer, required
          "option2" => config.param("option2", :string, default: "myvalue"),  # string, optional
          "option3" => config.param("option3", :string, default: nil),        # string, optional
        }
      ...
end

まず、設計した通りに指定します。また、デフォルト値を設定しなければ、値が設定されないとエラーになります。

  • 名前
  • デフォルト値

今回は以下のようにしました。

task = {
     "access_token" => config.param("access_token", :string),    # string, required
     "project_id" => config.param("project_id", :string),                # string, required
     "agent_name" => config.param("agent_name", :string)         # string, required
}

また、これをREADME.mdに記述します。

例えば、以下のようにします。

## Configuration

### Required

| name | type | description |
|----:|----:|:----:|
| **access_token** | string | Developers access token for Dialogflow REST API. |
| **project_id** | string |  name. |

今回ではInputPluginをオーバーライドしていますが、特にすることがなければ何もする必要はありません。

def init
  # initialization code:
  @access_token = task["access_token"]
  @project_name = task["project_name"]
end

すると、インスタンス変数に代入すると便利に値を扱えます。ここのtask["hoge"]は、以下のように、先ほどのtask定義で定義したものを参照しているのです。

def self.transaction(config, &control)
        # configuration code:
        task = {
          "access_token" => config.param("access_token", :string),    # string, required
          "project_name" => config.param("project_name", :string),    # string, required
        }
        
        ...
end

そしてaccess_tokenproject_nameがどちらもconfig.ymlで読まれることを確認します。実際はテスト駆動開発で実践していくべきですが、今回はやりません。

in:
     type: dialogflow
     access_token: hoge
     project_id: fuga
     agent_name: piyo
out:
     type: stdout

そして、ドライランpreviewをしてみます。

$ java --jar (which embulk) preview -I lib/ config.yml

エラーが出ないことを確認します。そして、次に何をしないといけない(=Pluginの挙動)を決めます。

2.3 Schemaを定義する

初期コードでは、以下のような箇所でスキーマを定義をしています。

columns = [
  Column.new(0, "example", :string),
  Column.new(1, "column", :long),
  Column.new(2, "value", :double),
]

ここを設計通りに実装していきます。前提知識として、Embulkのスキーマで使える型は、以下の表の通りです。

Embulk 説明 Ruby
boolean 真偽値 Boolean
long 整数型 Integer
timestamp 時刻 Time
double 浮動小数点 Float
string 文字列 String
2.4 Runの処理を実装していく

HTTP Clientが必要なので、今回はFaradayを使って行こうと思います。

github.com

選定理由としては、4年ほど前にインターンシップで使用していたのと、実装が簡単そうだからです。.gemspecに以下の一行を追加してください。

spec.add_dependency 'faraday', ['>= 0.14.0']

リファクタリングなどはあとから全体を見通したあとにやればいいと思うので、今はdef runの中に押し込めてしまいます。簡略化のために以下のように実装しました。

def run
        conn = Faraday.new("https://dialogflow.googleapis.com/v2/projects/#{@project_id}/agent:export")

        response = conn.post do |req|
          req.headers["Content-Type"] = "application/json"
          req.headers["Authorization"] = "Bearer #{@access_token}"
        end

        body = JSON.parse(response.body)

        page_builder.add([body["name"], body["done"], body["response"]["agentContent"]])
        page_builder.finish

        task_report = {}
        return task_report
      end

実際にドライランをしてみると、結果がわかります。

$ java -jar (which embulk) preview -I lib/ config.yml

3. 実装編(Output)

定義したスキーマを通してInputからOutputへ受け渡すことができます。

Inputは先ほどの2章で設定をできたので、今回はOutputを作成しましょう。今回は公開していないInputとともにOutputを開発していきます。

まず、新しいProjectを作成してきます。

java --jar (which embulk) new ruby-output dialogflow

あとは、Inputを作成したときと同様なので省略します。

3.1 Inputから値を取得する

InputがDialogflowから取得してきたレコードを受け取ります。recordが取得をしています。

確認してみたければ、以下のようにしてください。

def add(page)
  # output code:
  page.each do |record|
    hash = Hash[schema.names.zip(record)]
    puts hash
  end

pageによって取得をすることができます。

実行するには、InputもOutputもどちらも公開はしていないローカルプラグインなので、以下のようにします。

java -jar (which embulk) preview -I lib -I ../embulk-input-dialogflow/lib config.yml

ここでErrorなく出力をできれば開発できる環境にあります。

3.2 Import APIを通してAPIへアクセスする

こちらはAPIで取得するだけなのでInput Pluginと全く同じです。なので省略させれいただきます。

事前にDialogflowでAgentを作成します。

f:id:bobchan1915:20190313132059p:plain

そしてOutput Pluginを実行します。

java -jar (which embulk) preview -I lib -I ../embulk-input-dialogflow/lib config.yml

Dialogflowで対応しなければならない相当するcurlは以下のものです。

curl \
  'https://dialogflow.googleapis.com/v2/projects/<project_name>/agent:import\
   -X POST \
   -H 'Authorization: Bearer '$(gcloud auth application-default
   print-access-token) \
   -H 'Accept: application/json' \
   -H 'Content-Type: application/json' \
   --compressed \
   --data-binary "{
      'agentContent': '$(cat <agent zip file> | base64 -w 0)'
   }"

これをRubyを使って実装すればいい話です。今回は本質的ではないので解説しませんが、いつものクライアントであることを意識すれば何も問題ではないかなと思います。

まとめ

Embulk上でDialogflowのあるProjectのAgentを別ProjectにAgentに移行する機会はあまりないので、対して価値のないPluginになりました。

しかし、このPlugin開発を通してEmbulkとEmbulk Pluginをより詳しく知ることができたので、今回は非常によい練習課題になったのではないかなと思っています。

ありがとうございました。

参考文献

embulk plugin developer guide (WIP) · GitHub