け日記

最近はPythonでいろいろやってます

Jupyter NotebookをAPI経由で操作する

Jupyter Notebookのノートブックファイルを外部から実行する要件がありましたので、API経由で操作する方法について整理します。

Jupyter API

Jupyter Notebook ServerではJupyterの基本的な操作 (ファイルの閲覧・取得、カーネルの起動や実行など) をREST + JSONのAPIで提供してます。

github.com

ところが具体的な仕様、特にカーネルを実行してコードを動かす方法については上のWikiページには詳細な記述は無く、下のStackOverflowのエントリを参考にしながら使い方を調査しました。

stackoverflow.com

ノートブックファイルの実行は4ステップでできます。

  1. ノートブックファイルを取得
  2. カーネルを起動
  3. 起動したカーネルにWebSocketでコードを渡して実行
  4. カーネルを終了

準備

実行したノートブックファイル (ここではtest.ipynb) は以下のとおりです。

  • パスはhttp://localhost:8889/notebooks/test/test.ipynb
  • markdownが1セル、Pythonコードが2セルからなります

f:id:ohke:20190525154006p:plain

必要なパッケージのインポートと変数をセットします。

  • カーネルとの通信ではWebSocketを使いますので、websocket-clientをインストールします
  • リクエストのAuthorizationヘッダに、Jupyter起動時に得られるトークンをセットすることで認証してます
import json
import requests
import uuid
import websocket  # pip install websocket-client

# JupyterサーバのURL
base_url = 'http://localhost:8889'

# 実行するノートブックファイル
file_path = 'test/test.ipynb'

# リクエスト共通のヘッダ
headers = {
    # Jupyter起動時のトークンをセット
    'Authorization': 'token ' + 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
}

疎通確認のために /api へGETリクエストして、Jupyter Notebookサーバのバージョンを取得します。

# バージョン確認
url = base_url + '/api'

response = requests.get(url)
print(response.status_code, response.text)  # 200 {"version": "5.6.0"}

ノートブックファイルの取得

ノートブックファイルは /api/contents/{ファイルパス} にGETリクエストすることで取得できます。

ノートブックファイルのデータがJSON形式で返されます。content.cellsに各セルのデータが入ってます。

# ノートブックファイルの取得
url = base_url + '/api/contents/' + file_path

response = requests.get(url, headers=headers)

notebook = json.loads(response.text)
print(notebook)
# {'content': {'cells': [{'cell_type': 'markdown',
#     'metadata': {},
#     'source': 'Jupyter APIのテスト用のファイルです。'},
#    {'cell_type': 'code',
#     'execution_count': 7,
#     'metadata': {'trusted': True},
#     'outputs': [{'name': 'stdout',
#       'output_type': 'stream',
#       'text': 'datetimeをロード\n'}],
#     'source': "import datetime\nprint('datetimeをロード')"},
#    {'cell_type': 'code',
#     'execution_count': 8,
#     'metadata': {'scrolled': True, 'trusted': True},
#     'outputs': [{'name': 'stdout',
#       'output_type': 'stream',
#       'text': '2019-05-25 13:57:08.410129\n'}],
#     'source': 'print(datetime.datetime.now())'}],
#   'metadata': {'kernelspec': {'display_name': 'Python 3',
#     'language': 'python',
#     'name': 'python3'},
#    'language_info': {'codemirror_mode': {'name': 'ipython', 'version': 3},
#     'file_extension': '.py',
#     'mimetype': 'text/x-python',
#     'name': 'python',
#     'nbconvert_exporter': 'python',
#     'pygments_lexer': 'ipython3',
#     'version': '3.5.5'}},
#   'nbformat': 4,
#   'nbformat_minor': 2},
#  'created': '2019-05-25T04:57:57.130649Z',
#  'format': 'json',
#  'last_modified': '2019-05-25T04:57:57.130649Z',
#  'mimetype': None,
#  'name': 'test.ipynb',
#  'path': 'test/test.ipynb',
#  'size': 1177,
#  'type': 'notebook',
#  'writable': True}

# ノートブックファイルのコードのみを取得
codes = [c['source'] for c in notebook['content']['cells'] if c['cell_type'] == 'code']

カーネルの起動

次に実行環境となるカーネルを /api/kernels にPOSTすることで起動します。(ちなみに同じエンドポイントにGETリクエストすると、実行中のカーネル一覧を取得できます。)

返ってくるカーネルのidは、この後カーネル上でコードを実行するために必要となります。

# カーネルの起動
url = base_url + '/api/kernels'

response = requests.post(url, headers=headers)  # getでカーネルのリストを取得できます
kernel = json.loads(response.text)
print(kernel)
# {'connections': 0,
#  'execution_state': 'starting',
#  'id': 'e686c2f3-f302-448d-915c-0f467b794d4b',
#  'last_activity': '2019-05-25T06:52:58.958138Z',
#  'name': 'python3'}

コードの実行

カーネルを操作するには、先程取得したカーネルのidでWebSocketのコネクションを開き、実行するコードを全て送信してから、実行結果を受信する、という流れで行います。

  • WebSocketで接続する場合、URLのプロトコルは ws:// となり、パスはapi/kernels/{カーネルのid}/channels で行います
    • 接続成功の場合、ステータスコードは101となります
  • 実行結果の受信時に、WebSocketのステータスはstreamとなりますので、そのときの値をouputsに詰めています
# WebSocketで接続
url = 'ws://localhost:8889/api/kernels/' + kernel['id'] + '/channels'

socket = websocket.create_connection(url, header=headers)
print(socket.status)  # 101

# コードを実行
for code in codes:
    header = {
        'msg_type': 'execute_request',
        'msg_id': uuid.uuid1().hex,
        'session': uuid.uuid1().hex
    }
    
    message = json.dumps({
        'header': header,
        'parent_header': header,
        'metadata': {},
        'content': {
            'code': code,
            'silent': False
        }
    })
    
    # 送信
    socket.send(message)

# 結果の保持
outputs = []

for _ in range(len(codes)):
    msg_type, prev_msg_type = '', ''
    
    while msg_type != 'stream':
        if msg_type != prev_msg_type:
            print(prev_msg_type, '->', msg_type)    # WebSocketの状態遷移をトレース
            prev_msg_type = msg_type
        
        response = json.loads(socket.recv())  # メッセージを受信
        msg_type = response['msg_type']
    
    print(prev_msg_type, '->', msg_type)

    outputs.append(response['content']['text'])

# WebSocketをクローズ
socket.close()

print(outputs)
# ['datetimeをロード\n', '2019-05-25 15:56:08.355065\n']

なおWebSocketの状態は以下の順番で遷移します。

 -> status
status -> execute_input
execute_input -> stream
# セル1の出力の取得
 -> execute_reply
execute_reply -> status
status -> execute_input
execute_input -> stream
# セル2の出力の取得

出力無しのセルがある場合

現実的には、全てのセルが出力を持つとは限りません。例えば以下のように、1つ目のcodeセルは出力がないノートブックです。

f:id:ohke:20190525161251p:plain

この場合、実行後もstreamにはならず、execute_inputからexecute_replyへ直接遷移します。コードの実行数とstreamへの遷移数が一致しないので、上のコードではWebSocketをクローズするタイミングがわかりません。

 -> status
status -> execute_input
execute_input -> execute_reply
execute_reply -> status
status -> execute_input
execute_input -> stream
execute_reply -> status

対処方法として、2つくらい考えられます。強制的にレスポンスを返すパラメータなどがあればよいのですが、見当たりませんでした。

  • execute_replyでカウントする
  • カーネルに渡すコードリストの最後に、任意の出力を行うコードを埋め込み、出力が埋め込んだコードと一致した場合にクローズする

2つ目については、例えば↓の感じで実装できます。

# ノートブックファイルのコードのみを取得して、実行
codes = [c['source'] for c in notebook['content']['cells'] if c['cell_type'] == 'code']
codes.append('print("' + kernel['id'] + '", end="")')  # 改行しないようにendを空文字で指定

# 送信
for code in codes:
    # ...(省略)...

# 結果の保持
outputs = []
output = ''

while True:
    response = json.loads(socket.recv())
    msg_type = response['msg_type']

    if msg_type == 'stream':
        output = response['content']['text']
        
        if output == kernel['id']:
            socket.close()  # 最後に追加した出力と一致したらクローズ
        else:
            outputs.append(output)

print(outputs)
# ['2019-05-25 16:53:02.271994\n']

カーネルの終了

カーネルの終了は /api/kernels/{カーネルのid} へDELETEでリクエストすることで終了します。

# カーネルのシャットダウン
url = base_url + '/api/kernels/' + kernel['id']

response = requests.delete(url, headers=headers)
print(response.status_code)  # 204