SQL ServerでJSON文字列の値の取得・設定を行なう

SQL Server 2016からJSON用の関数がいくつか追加されていますので、使い方を備忘録にしておきます。

ここでは以下のようにカラムにJSONの値が入っているテーブルを例にします。

f:id:ohke:20180426194640p:plain

値がJSONフォーマットかどうか

値がJSONフォーマットかどうか確認するには、ISJSON関数を使います。返り値が1ならJSONフォーマットです。

SELECT ISJSON(JSON_COLUMN)
FROM TEST
WHERE ID = 1

f:id:ohke:20180426194727p:plain

JSON文字列から値を取得する

JSONの値を取り出すには、[JSON_VALUE関数]'(https://docs.microsoft.com/ja-jp/sql/t-sql/functions/json-value-transact-sql?view=sql-server-2017)を使います。
この関数は2つの引数を取り、第1引数がJSON文字列、第2引数がどこの値を取り出すかを指定するパス式です。$.ownerとすればownerの値が、$.bag.goods[1]とすればgoodsの1番目の値が取り出せます。

SELECTでもWHEREでも使うこともできます。

SELECT
    ID,
    JSON_VALUE(JSON_COLUMN, '$.owner')
FROM TEST

sizeが5以上のID(つまり1のみ)が抽出されます。

SELECT ID
FROM TEST
WHERE JSON_VALUE(JSON_COLUMN, '$.bag.size') > 5

goodsの1番目の値を取り出すのは以下のように書きます。値が無い場合、NULLとなります。

SELECT
    ID,
    JSON_VALUE(JSON_COLUMN, '$.bag.goods[1]')
FROM TEST

f:id:ohke:20180426201149p:plain

JSON文字列に値をセットする

JSONの値を編集することもでき、この場合はJSON_MODIFY関数を使います。第1引数はJSON文字列、第2引数はパス式、第3引数は変更後の値です。

例えば、ID=1のownerを"suzuki"に変更したい場合は、こんなかんじです。

UPDATE TEST
SET JSON_COLUMN = JSON_MODIFY(JSON_COLUMN, '$.owner', 'suzuki')
WHERE ID = 1

f:id:ohke:20180426200420p:plain

値を追加する場合、パス式の先頭にappendを記述します。ID=2のgoodsに"wallet"を追加してみます。

UPDATE TEST
SET JSON_COLUMN = JSON_MODIFY(JSON_COLUMN, 'append $.bag.goods', 'wallet')
WHERE ID = 2

f:id:ohke:20180426200533p:plain

値を削除する場合は、第3引数にNULLを設定します。

UPDATE TEST
SET JSON_COLUMN = JSON_MODIFY(JSON_COLUMN, '$.bag.size', NULL)
WHERE ID = 2

こういうお便利関数が定義されると、DBのカラムに気兼ねなくJSON文字列を突っ込めますね。

Apache AirFlowをDocker環境で構築して簡単なジョブを作る

仕事でApache AirFlowを使う機会がありましたので、調査がてらに、Dockerで環境を構築し、簡単なジョブを定義します。

AirFlow

AirFlowはジョブのスケジューリング・監視を、コード(主にPython)で定義・制御するためのプラットフォームです。ワークフロー、データフローの実装・運用に使われます。

github.com

類似のソフトウェアには、前回・前々回で紹介したLuigiや、Treasure DatasのDigDagなどがあります。

  • AirFlowではスケジューラも提供されているため、Luigiよりも広い範囲をカバーします
  • ワークフローを、AirFlowではPython、DigDagでは.dagファイルで定義します

Dockerで環境構築

ローカルに環境をインストールすることもできますが、Dockerでサクッと環境構築します。

以下のレポジトリからdocker-compose-CeleryExecutor.ymlをダウンロードしてきます。

GitHub - puckel/docker-airflow: Docker Apache Airflow

そして、docker-composeでビルド・起動まで行います。合計6個のコンテナが稼働していることが確認できます。

$ docker-compose -f docker-compose-CeleryExecutor.yml up -d

$ docker ps
CONTAINER ID        IMAGE                           COMMAND                  CREATED             STATUS                    PORTS                                        NAMES
da8e02c7fc22        puckel/docker-airflow:1.9.0-2   "/entrypoint.sh work…"   26 seconds ago      Up 29 seconds             5555/tcp, 8080/tcp, 8793/tcp                 airflow_worker_1
73ebe8057b5a        puckel/docker-airflow:1.9.0-2   "/entrypoint.sh sche…"   28 seconds ago      Up 30 seconds             5555/tcp, 8080/tcp, 8793/tcp                 airflow_scheduler_1
f66972f4f5d5        puckel/docker-airflow:1.9.0-2   "/entrypoint.sh webs…"   29 seconds ago      Up 31 seconds (healthy)   5555/tcp, 8793/tcp, 0.0.0.0:8080->8080/tcp   airflow_webserver_1
1ac77f898545        puckel/docker-airflow:1.9.0-2   "/entrypoint.sh flow…"   29 seconds ago      Up 31 seconds             8080/tcp, 0.0.0.0:5555->5555/tcp, 8793/tcp   airflow_flower_1
99b6fce9e010        postgres:9.6                    "docker-entrypoint.s…"   30 seconds ago      Up 32 seconds             5432/tcp                                     airflow_postgres_1
7344396dc041        redis:3.2.7                     "docker-entrypoint.s…"   30 seconds ago      Up 32 seconds             6379/tcp                                     airflow_redis_1

ポート8080がAirFlowコンテナにフォワードされていますので、ブラウザからhttp://localhost:8080でアクセスすると、画面が表示されます。まだ何も定義していないので、空です。

f:id:ohke:20180414142554p:plain

DAGの定義と実行

それではDAGを定義していきます。

プロジェクトルートに作成されるdagsディレクトリ配下にPythonファイルを追加することで、DAGを定義します。
AirFlowでは、デフォルトで$AIRFLOW_HOME/dags以下にあるPythonファイルをDAGとして認識・実行するようになっています。設定ファイル$AIRFLOW_HOME/airflow.cfgで変更可能です。

最初のDAGとしてhello_airflow.pyを作成します。
起動したairflow_webserver_1コンテナはこのdagsディレクトリを$AIRFLOW_HOME/dagsにマウントされています。そのため、dagsディレクトリにファイルを追加すると、DAGとして認識されます。

.
├── dags
│   └── hello_airflow.py
└── docker-compose-CeleryExecutor.yml

hello_airflow.pyは以下のように定義します。基本的な流れは、DAGを作る、DAGと紐づくタスクを作る、タスク間に依存関係を定義するの3段階です。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


# DAGを作る
default_args = {
    'owner': 'ohke',
    'depends_on_past': False,
    'start_date': datetime(2018, 4, 21, 10, 0, 0),
    'schedule_interval': timedelta(days=1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('first_dag', default_args=default_args)

# DAGと紐づくタスクを作る
t1 = BashOperator(
    task_id='t1',
    bash_command='echo t1',
    dag=dag)

t2 = BashOperator(
    task_id='t2',
    bash_command='exit 1',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='t3',
    bash_command='echo "{{ params.greeting }}"',
    params={'greeting': 'Hello, AirFlow!'},
    dag=dag)

t4 = BashOperator(
    task_id='t4',
    bash_command='echo t4', 
    dag=dag
)

# タスク間に依存関係を定義する
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream([t2, t3])

DAGを作る

DAGクラスをインスタンス化することでDAGを作ります。

  • 最初の引数で、DAGのIDを設定
  • default_argsで、各種パラメータを設定
    • start_dateでDAGの実行開始日時、schedule_intervalで実行間隔(ここでは1日おき)
    • retriesでリトライ回数、retry_delayでリトライ間隔(ここでは5分おき)

[4/29 追記]
ややこしいのですが、最初のタスクの実行日時は、start_dateではなく、start_date+schedule_intervalとなります。
例えば、start_dateを4/22 0時、schedule_intervalをdaily(timedelta(days=1))に設定すると、最初にジョブが実行されるのは4/23 0時です 。
イメージとしては、start_dateで指定した日時からデータが蓄積され始め、schedule_intervalが経過したらdailyであれば1日分のデータを処理を行う、という感じです。

default_args = {
    'owner': 'ohke',
    'depends_on_past': False,
    'start_date': datetime(2018, 4, 22, 0, 0, 0),
    'schedule_interval': timedelta(days=1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('first_dag', default_args=default_args)

DAGと紐づくタスクを作る

ここではt1〜t4の4つのタスクを定義し、作成したdagと紐付けています。

AirFlowでは、典型的ないくつかのタスクのためにOperatorが定義されています。
今回は、シェルスクリプトを実行するBashOperatorを使っていますが、これ以外にもPythonOperatorPostgresOperatorなどが提供されています。

t1 = BashOperator(
    task_id='t1',
    bash_command='echo t1',
    dag=dag)

t2 = BashOperator(
    task_id='t2',
    bash_command='exit 1',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='t3',
    bash_command='echo "{{ params.greeting }}"',
    params={'greeting': 'Hello, AirFlow!'},
    dag=dag)

t4 = BashOperator(
    task_id='t4',
    bash_command='echo t4', 
    dag=dag
)

タスク間に依存関係を定義する

Operatorクラスにはタスク間の依存関係を設定するメソッドが定義されています。

set_upstream()で待つタスクを設定できます。以下の例では、t1の後にt2とt3、t2・t3の後にt4が実行されます。
逆に、自身のタスク完了後に別のタスクを実行する関係はset_downstream()で定義できます。

AIrFlow 1.8以降では、メソッド名の代わりにビット演算子を使うこともでき、例えばt1 >> t2t2 << t1(いずれもt1→t2の順で実行)と記述できます。

t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream([t2, t3])

DAGの実行

画面に戻ってみますと、新たに追加したfirst_dagが追加されていることが確認できます。

f:id:ohke:20180421093016p:plain

DAG名をクリックして依存関係のツリーも表示できます。t1の後にt2とt3が、t2とt3の後にt4がそれぞれ実行されることが可視化されています。

f:id:ohke:20180421093654p:plain

DAGを実行する場合、ToggleをOnにします。これで時間が来たら実行されます。LinksからTrigger Dagをクリックすると、DAGが即時実行されます。
DAG RunsにそのDAGの成功・失敗の回数、Recent Tasksには直近実行されたDAGの各タスクのステータスが表示されます。下図では、DAGが1回成功し、DAG内の4つのタスク(t1〜t4)も全て正常完了していることがわかります。

f:id:ohke:20180421100759p:plain

タスクに失敗した場合の動きも確認します。
タスクt2が失敗するように変更(exitコード0以外で返す)し、Trigger Dagで即時実行させます。

t2 = BashOperator(
    task_id='t2',
    bash_command='exit 1',
    retries=3,
    dag=dag)

すると、DAGは失敗(DAG Runsの右端の赤丸数字がFailedを表す)し、タスクは2つ成功(t1とt3)、1つ失敗(t2)、1つは依存するタスク失敗による未実行(t4)というステータスになっています。ツリーでさらにタスクごとの成功・失敗を確認することもできます。

まとめ

DockerでAirFlowの環境を構築し、簡単なジョブを作って実行させてみました。

今回は紹介できませんでしたが、Pythonコードを実行できるPythonOperatorや、特定の条件を満たすまで次のタスクの実行を待たせるための各種Sensorなどを組み合わせると、さらに柔軟かつ簡単に、複雑なフローを定義することもできるようになります。またの機会で触れていきます。

Python: Luigiの実行結果をCentral Schedulerで見やすくする

前回の投稿(Python: Luigiでデータパイプラインを作る - け日記)では、Pythonのデータパイプラインフレームワーク Luigi を使って依存関係のあるタスクを定義しました。

前回は1つ1つコマンドライン上で実行結果を見ていましたが、今回はCentral Schedulerを使って実行結果を見やすくします。

github.com

Central Scheduler

Central SchedulerはLuigi付属のサービスで、タスクの実行結果をWebブラウザから見ることができるようになります。

まず、luigidコマンドでサービスデーモンを起動します(Luigiがインストールされていることが前提です。pip install luigiでインストールできます)。

$ luigid --background --pidfile ./tmp/pidfile --logdir ./log --state-path ./tmp/state

--state-pathオプションで、luigidの最新状態を保存するファイルを指定します。
luigidを終了すると、--state-pathで指定したファイルに、その状態をpickleでシリアライズしてファイルに残しています。次回の起動時には、--state-pathのファイルをデシリアライズして読み込むことで、最新状態を復元しています。

luigidコマンドによりサービスが立ち上がり、localhost:8082でアクセスできるようになります(--portオプションで任意のポートを起動時に指定できます)。
ブラウザからhttp://localhost:8082にアクセスすると、以下のような管理画面が現れます。初期状態のため、実行結果はまだ表示されていません。

f:id:ohke:20180414103706p:plain

それでは、サービスを立ち上げた状態で、前回の投稿で作成したtasksを実行します。

main.pyの実装を示します。前回とほぼ同じですが、唯一異なるのがluigi.run()の引数から、--local-schedulerオプションを除いている点です。これにより、localhost:8082にアクセスしてタスクが実行されるようになります。

import luigi
import time


class Task1(luigi.Task):
    task_namespace = 'tasks'

    def run(self):
        print("Task1: run")
        with self.output().open("w") as target:
            target.write(f"This file was generated by Task1 at {time.asctime()}.")

    def output(self):
        print("Task1: output")
        return luigi.LocalTarget("intermediate/task1.txt")


class Task2(luigi.Task):
    task_namespace = 'tasks'

    def requires(self):
        print("Task2: requires")
        return Task1()

    def run(self):
        print("Task2: run")
        with self.input().open("r") as intermediate, self.output().open("w") as target:
            task1_text = intermediate.read()

            target.write(f"{task1_text}\n")
            target.write(f"This file was generated by Task2 at {time.asctime()}.")

    def output(self):
        print("Task2: output")
        return luigi.LocalTarget("output/task2.txt")


if __name__ == '__main__':
    #luigi.run(['tasks.Task2', '--workers', '1', '--local-scheduler'])
    luigi.run(['tasks.Task2', '--workers', '1'])

作成したmain.pyを$ python main.pyで実行した後、再度http://localhost:8202を見ると、tasks.Task1とtasks.Task2の実行結果が表示されていることがわかります。

f:id:ohke:20180414105731p:plain

Actionsのアイコンをクリックすると、タスクの依存関係もグラフ化して表示できます。

f:id:ohke:20180414110234p:plain

実行履歴の永続化

デフォルトでは、最新状態のみを持ち、ジョブの実行履歴は保持されません。

実行履歴をDBで永続化させるためには、設定ファイルluigi.cfgを以下のように編集する必要があります。
db_connectionで永続化に使うDBの接続文字列を設定します。ここでは、sqlite3のファイルを指定しています。DBを作っただけで、テーブルの作成等は行っていません。

[scheduler]
record_task_history = True

[task_history]
db_connection = sqlite:///./db/history.sqlite

luigidを再起動してmain.pyを実行した後、http://localhost:8082/historyにアクセスすると、タスクの実行履歴が表示されます。

f:id:ohke:20180414111125p:plain

各タスクをクリックすると、いつ実行開始され、いつ完了したのかなどの詳細な情報も閲覧できます。

作成したDBにアクセスすると、3つのテーブルが作成され、実行履歴が永続化されていることがわかります。

$ sqlite3 ./db/history.sqlite

sqlite> .table
task_events      task_parameters  tasks

sqlite> select * from tasks ;
1|tasks.Task2__99914b932b|tasks.Task2|XXXX
2|tasks.Task1__99914b932b|tasks.Task1|XXXX

sqlite> select * from task_events ;
1|1|PENDING|2018-04-14 10:10:11.890329
2|2|PENDING|2018-04-14 10:10:11.905884
3|2|RUNNING|2018-04-14 10:10:11.924232
4|2|DONE|2018-04-14 10:10:11.946267
5|1|RUNNING|2018-04-14 10:10:11.960480
6|1|DONE|2018-04-14 10:10:11.979264

まとめ

Luigiのタスクの実行結果をCentral Schedulerで見やすくしました。
Central Schedulerからタスクを起動するなどはできませんが、Luigiとセットで導入すれば、日々の実行時間の確認や問題発生時の原因調査などの役に立つかと思います。