Digdagとは、
主に、データ基盤でのETL処理のワークフローを定義・実行するための
ワークフローエンジンで。
TresureDataが公開しているオープンソースソフトウェアです。

Digdagのワークフローで、
次のような処理の定義をしたかったのですが、
上手く書く方法が見つけられなかったので、プラグインを作ってみました。

  • データベースに対してクエリを投げる
  • 得られた結果に対して同様の処理を繰り返し実行する

作成したプラグインはこちらです。

takemikami/digdag-plugin-shresult | GitHub
https://github.com/takemikami/digdag-plugin-shresult

# ちなみにTresureDataのサービスを使っている場合は、
# 以下のOperatorを使えば良いようです。
# http://docs.digdag.io/operators/td_for_each.html

# PostgreSQLの場合は、store_last_resultsを使えば良いようです。
# http://docs.digdag.io/operators/pg.html#options
# 以下の記事を書いている @hiroysato さんに教えて頂きました、ありがとうございます。
# ワークフローエンジンDigdagのまとめ
# https://qiita.com/hiroysato/items/d0fe5e2d88c267413a82

できること

digdag-plugin-shresultプラグインは、以下の処理を行います。

  • シェルの実行
  • 実行結果の標準出力の解析
  • 標準出力の解析結果を変数に代入

これによって、
このプラグインの後続のタスクでは、
代入された変数を利用することが出来ます。

例えば、
データベースに対してselect文を投げ、
for_eachで、select文の結果分繰り返し、処理を実行するということが実現できます。

使ってみる

次に、プラグインを使ってワークフローを動かしてみます。

# Diddagのセットアップは以下を参考にしてください
# http://docs.digdag.io/getting_started.html

作業ディレクトリを作って、
以下のsample.dig, resultset.jsonを作成します。

sample.dig

_export:
  plugin:
    repositories:
      - https://jitpack.io
    dependencies:
      - com.github.takemikami:digdag-plugin-shresult:0.0.1

+step1:
  sh_result>: cat resultset.json
  destination_variable: resultset
  stdout_format: json-list-map

+step2:
  for_each>:
    rv: ${resultset}
  _parallel:
    true
  _do:
    echo>: ${rv.id}=${rv.name}

resultset.json

[
  {
    "id": "001",
    "name": "hoge"
  },
  {
    "id": "002",
    "name": "fuga"
  }
]

次のようにsample.digを実行します。

digdag run sample.dig

表示された実行結果を見ると、
以下のように、resultset.jsonの内容でfor_eachが実行されていることが確認出来ます。

$ digdag run sample.dig
2018-06-08 21:16:00 +0900: Digdag v0.9.20
※途中省略※
2018-06-08 21:16:01 +0900 [INFO] (main): Starting a new session project id=1 workflow name=sample session_time=2018-06-08T00:00:00+00:00
2018-06-08 21:16:02 +0900 [INFO] (0017@[0:default]+sample+step1): sh_result>: cat resultset.json
2018-06-08 21:16:06 +0900 [INFO] (0017@[0:default]+sample+step1):
2018-06-08 21:16:06 +0900 [INFO] (0017@[0:default]+sample+step2): for_each>: {rv=[{"name":"hoge","id":"001"},{"name":"fuga","id":"002"}]}
2018-06-08 21:16:07 +0900 [INFO] (0017@[0:default]+sample+step2^sub+for-0=rv=0=%7B%22name%22%3A): echo>: 001=hoge
001=hoge
2018-06-08 21:16:07 +0900 [INFO] (0019@[0:default]+sample+step2^sub+for-0=rv=1=%7B%22name%22%3A): echo>: 002=fuga
002=fuga

このプラグインの機能としては以上ですが、
例えば、シェルで実行する内容を以下のように変更すると、
BigQueryに投げたクエリの結果でfor_eachを回すということも出来ます。

sh_result>: bq query --format=json 'select name from dataset1.table1 limit 10'

ちなみに、sh_resultオペレータの
destination_variableには、代入する変数の名称、
stdout_formatには、シェルの標準出力の形式を指定します。
まだ、フォーマットはjsonしか対応していないですが、csv形式も対応するつもりです。

+step1:
  sh_result>: cat resultset.json
  destination_variable: resultset
  stdout_format: json-list-map

プラグイン作りで参考にしたもの

プラグインの作成には以下を参考にしました。

myui/digdag-plugin-example | GitHub
https://github.com/myui/digdag-plugin-example

treasure-data/digdag - ShOperatorFactory | GitHub
https://github.com/treasure-data/digdag/blob/master/digdag-standards/src/main/java/io/digdag/standards/operator/ShOperatorFactory.java

このプラグインの実装ですが、
標準のshオペレータの後続に、変数代入処理を書き足しているだけです。
もっと綺麗な実装方法も考えたいところですが、
(自分自身が必要だったので)ひとまず作ってみました。