処理結果でfor_eachするために、シェルの出力を変数に代入するDigdagプラグイン作ってみた
Digdagとは、
主に、データ基盤でのETL処理のワークフローを定義・実行するための
ワークフローエンジンで。
TresureDataが公開しているオープンソースソフトウェアです。
Digdagのワークフローで、
次のような処理の定義をしたかったのですが、
上手く書く方法が見つけられなかったので、プラグインを作ってみました。
- データベースに対してクエリを投げる
- 得られた結果に対して同様の処理を繰り返し実行する
作成したプラグインはこちらです。
takemikami/digdag-plugin-shresult | GitHub
https://github.com/takemikami/digdag-plugin-shresult
# ちなみにTresureDataのサービスを使っている場合は、
# 以下のOperatorを使えば良いようです。
# http://docs.digdag.io/operators/td_for_each.html
# PostgreSQLの場合は、store_last_resultsを使えば良いようです。
# http://docs.digdag.io/operators/pg.html#options
# 以下の記事を書いている @hiroysato さんに教えて頂きました、ありがとうございます。
# ワークフローエンジンDigdagのまとめ
# https://qiita.com/hiroysato/items/d0fe5e2d88c267413a82
できること
digdag-plugin-shresultプラグインは、以下の処理を行います。
- シェルの実行
- 実行結果の標準出力の解析
- 標準出力の解析結果を変数に代入
これによって、
このプラグインの後続のタスクでは、
代入された変数を利用することが出来ます。
例えば、
データベースに対してselect文を投げ、
for_eachで、select文の結果分繰り返し、処理を実行するということが実現できます。
使ってみる
次に、プラグインを使ってワークフローを動かしてみます。
# Diddagのセットアップは以下を参考にしてください
# http://docs.digdag.io/getting_started.html
作業ディレクトリを作って、
以下のsample.dig
, resultset.json
を作成します。
sample.dig
_export:
plugin:
repositories:
- https://jitpack.io
dependencies:
- com.github.takemikami:digdag-plugin-shresult:0.0.1
+step1:
sh_result>: cat resultset.json
destination_variable: resultset
stdout_format: json-list-map
+step2:
for_each>:
rv: ${resultset}
_parallel:
true
_do:
echo>: ${rv.id}=${rv.name}
resultset.json
[
{
"id": "001",
"name": "hoge"
},
{
"id": "002",
"name": "fuga"
}
]
次のようにsample.dig
を実行します。
digdag run sample.dig
表示された実行結果を見ると、
以下のように、resultset.json
の内容でfor_eachが実行されていることが確認出来ます。
$ digdag run sample.dig
2018-06-08 21:16:00 +0900: Digdag v0.9.20
※途中省略※
2018-06-08 21:16:01 +0900 [INFO] (main): Starting a new session project id=1 workflow name=sample session_time=2018-06-08T00:00:00+00:00
2018-06-08 21:16:02 +0900 [INFO] (0017@[0:default]+sample+step1): sh_result>: cat resultset.json
2018-06-08 21:16:06 +0900 [INFO] (0017@[0:default]+sample+step1):
2018-06-08 21:16:06 +0900 [INFO] (0017@[0:default]+sample+step2): for_each>: {rv=[{"name":"hoge","id":"001"},{"name":"fuga","id":"002"}]}
2018-06-08 21:16:07 +0900 [INFO] (0017@[0:default]+sample+step2^sub+for-0=rv=0=%7B%22name%22%3A): echo>: 001=hoge
001=hoge
2018-06-08 21:16:07 +0900 [INFO] (0019@[0:default]+sample+step2^sub+for-0=rv=1=%7B%22name%22%3A): echo>: 002=fuga
002=fuga
このプラグインの機能としては以上ですが、
例えば、シェルで実行する内容を以下のように変更すると、
BigQueryに投げたクエリの結果でfor_eachを回すということも出来ます。
sh_result>: bq query --format=json 'select name from dataset1.table1 limit 10'
ちなみに、sh_resultオペレータの
destination_variable
には、代入する変数の名称、
stdout_format
には、シェルの標準出力の形式を指定します。
まだ、フォーマットはjsonしか対応していないですが、csv形式も対応するつもりです。
+step1:
sh_result>: cat resultset.json
destination_variable: resultset
stdout_format: json-list-map
プラグイン作りで参考にしたもの
プラグインの作成には以下を参考にしました。
myui/digdag-plugin-example | GitHub
https://github.com/myui/digdag-plugin-example
treasure-data/digdag - ShOperatorFactory | GitHub
https://github.com/treasure-data/digdag/blob/master/digdag-standards/src/main/java/io/digdag/standards/operator/ShOperatorFactory.java
このプラグインの実装ですが、
標準のshオペレータの後続に、変数代入処理を書き足しているだけです。
もっと綺麗な実装方法も考えたいところですが、
(自分自身が必要だったので)ひとまず作ってみました。