Jupyter NotebookをAPI経由で操作する

Jupyter Notebookのノートブックファイルを外部から実行する要件がありましたので、API経由で操作する方法について整理します。

Jupyter API

Jupyter Notebook ServerではJupyterの基本的な操作 (ファイルの閲覧・取得、カーネルの起動や実行など) をREST + JSONのAPIで提供してます。

github.com

ところが具体的な仕様、特にカーネルを実行してコードを動かす方法については上のWikiページには詳細な記述は無く、下のStackOverflowのエントリを参考にしながら使い方を調査しました。

stackoverflow.com

ノートブックファイルの実行は4ステップでできます。

  1. ノートブックファイルを取得
  2. カーネルを起動
  3. 起動したカーネルにWebSocketでコードを渡して実行
  4. カーネルを終了

準備

実行したノートブックファイル (ここではtest.ipynb) は以下のとおりです。

  • パスはhttp://localhost:8889/notebooks/test/test.ipynb
  • markdownが1セル、Pythonコードが2セルからなります

f:id:ohke:20190525154006p:plain

必要なパッケージのインポートと変数をセットします。

  • カーネルとの通信ではWebSocketを使いますので、websocket-clientをインストールします
  • リクエストのAuthorizationヘッダに、Jupyter起動時に得られるトークンをセットすることで認証してます
import json
import requests
import uuid
import websocket  # pip install websocket-client

# JupyterサーバのURL
base_url = 'http://localhost:8889'

# 実行するノートブックファイル
file_path = 'test/test.ipynb'

# リクエスト共通のヘッダ
headers = {
    # Jupyter起動時のトークンをセット
    'Authorization': 'token ' + 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
}

疎通確認のために /api へGETリクエストして、Jupyter Notebookサーバのバージョンを取得します。

# バージョン確認
url = base_url + '/api'

response = requests.get(url)
print(response.status_code, response.text)  # 200 {"version": "5.6.0"}

ノートブックファイルの取得

ノートブックファイルは /api/contents/{ファイルパス} にGETリクエストすることで取得できます。

ノートブックファイルのデータがJSON形式で返されます。content.cellsに各セルのデータが入ってます。

# ノートブックファイルの取得
url = base_url + '/api/contents/' + file_path

response = requests.get(url, headers=headers)

notebook = json.loads(response.text)
print(notebook)
# {'content': {'cells': [{'cell_type': 'markdown',
#     'metadata': {},
#     'source': 'Jupyter APIのテスト用のファイルです。'},
#    {'cell_type': 'code',
#     'execution_count': 7,
#     'metadata': {'trusted': True},
#     'outputs': [{'name': 'stdout',
#       'output_type': 'stream',
#       'text': 'datetimeをロード\n'}],
#     'source': "import datetime\nprint('datetimeをロード')"},
#    {'cell_type': 'code',
#     'execution_count': 8,
#     'metadata': {'scrolled': True, 'trusted': True},
#     'outputs': [{'name': 'stdout',
#       'output_type': 'stream',
#       'text': '2019-05-25 13:57:08.410129\n'}],
#     'source': 'print(datetime.datetime.now())'}],
#   'metadata': {'kernelspec': {'display_name': 'Python 3',
#     'language': 'python',
#     'name': 'python3'},
#    'language_info': {'codemirror_mode': {'name': 'ipython', 'version': 3},
#     'file_extension': '.py',
#     'mimetype': 'text/x-python',
#     'name': 'python',
#     'nbconvert_exporter': 'python',
#     'pygments_lexer': 'ipython3',
#     'version': '3.5.5'}},
#   'nbformat': 4,
#   'nbformat_minor': 2},
#  'created': '2019-05-25T04:57:57.130649Z',
#  'format': 'json',
#  'last_modified': '2019-05-25T04:57:57.130649Z',
#  'mimetype': None,
#  'name': 'test.ipynb',
#  'path': 'test/test.ipynb',
#  'size': 1177,
#  'type': 'notebook',
#  'writable': True}

# ノートブックファイルのコードのみを取得
codes = [c['source'] for c in notebook['content']['cells'] if c['cell_type'] == 'code']

カーネルの起動

次に実行環境となるカーネルを /api/kernels にPOSTすることで起動します。(ちなみに同じエンドポイントにGETリクエストすると、実行中のカーネル一覧を取得できます。)

返ってくるカーネルのidは、この後カーネル上でコードを実行するために必要となります。

# カーネルの起動
url = base_url + '/api/kernels'

response = requests.post(url, headers=headers)  # getでカーネルのリストを取得できます
kernel = json.loads(response.text)
print(kernel)
# {'connections': 0,
#  'execution_state': 'starting',
#  'id': 'e686c2f3-f302-448d-915c-0f467b794d4b',
#  'last_activity': '2019-05-25T06:52:58.958138Z',
#  'name': 'python3'}

コードの実行

カーネルを操作するには、先程取得したカーネルのidでWebSocketのコネクションを開き、実行するコードを全て送信してから、実行結果を受信する、という流れで行います。

  • WebSocketで接続する場合、URLのプロトコルは ws:// となり、パスはapi/kernels/{カーネルのid}/channels で行います
    • 接続成功の場合、ステータスコードは101となります
  • 実行結果の受信時に、WebSocketのステータスはstreamとなりますので、そのときの値をouputsに詰めています
# WebSocketで接続
url = 'ws://localhost:8889/api/kernels/' + kernel['id'] + '/channels'

socket = websocket.create_connection(url, header=headers)
print(socket.status)  # 101

# コードを実行
for code in codes:
    header = {
        'msg_type': 'execute_request',
        'msg_id': uuid.uuid1().hex,
        'session': uuid.uuid1().hex
    }
    
    message = json.dumps({
        'header': header,
        'parent_header': header,
        'metadata': {},
        'content': {
            'code': code,
            'silent': False
        }
    })
    
    # 送信
    socket.send(message)

# 結果の保持
outputs = []

for _ in range(len(codes)):
    msg_type, prev_msg_type = '', ''
    
    while msg_type != 'stream':
        if msg_type != prev_msg_type:
            print(prev_msg_type, '->', msg_type)    # WebSocketの状態遷移をトレース
            prev_msg_type = msg_type
        
        response = json.loads(socket.recv())  # メッセージを受信
        msg_type = response['msg_type']
    
    print(prev_msg_type, '->', msg_type)

    outputs.append(response['content']['text'])

# WebSocketをクローズ
socket.close()

print(outputs)
# ['datetimeをロード\n', '2019-05-25 15:56:08.355065\n']

なおWebSocketの状態は以下の順番で遷移します。

 -> status
status -> execute_input
execute_input -> stream
# セル1の出力の取得
 -> execute_reply
execute_reply -> status
status -> execute_input
execute_input -> stream
# セル2の出力の取得

出力無しのセルがある場合

現実的には、全てのセルが出力を持つとは限りません。例えば以下のように、1つ目のcodeセルは出力がないノートブックです。

f:id:ohke:20190525161251p:plain

この場合、実行後もstreamにはならず、execute_inputからexecute_replyへ直接遷移します。コードの実行数とstreamへの遷移数が一致しないので、上のコードではWebSocketをクローズするタイミングがわかりません。

 -> status
status -> execute_input
execute_input -> execute_reply
execute_reply -> status
status -> execute_input
execute_input -> stream
execute_reply -> status

対処方法として、2つくらい考えられます。強制的にレスポンスを返すパラメータなどがあればよいのですが、見当たりませんでした。

  • execute_replyでカウントする
  • カーネルに渡すコードリストの最後に、任意の出力を行うコードを埋め込み、出力が埋め込んだコードと一致した場合にクローズする

2つ目については、例えば↓の感じで実装できます。

# ノートブックファイルのコードのみを取得して、実行
codes = [c['source'] for c in notebook['content']['cells'] if c['cell_type'] == 'code']
codes.append('print("' + kernel['id'] + '", end="")')  # 改行しないようにendを空文字で指定

# 送信
for code in codes:
    # ...(省略)...

# 結果の保持
outputs = []
output = ''

while True:
    response = json.loads(socket.recv())
    msg_type = response['msg_type']

    if msg_type == 'stream':
        output = response['content']['text']
        
        if output == kernel['id']:
            socket.close()  # 最後に追加した出力と一致したらクローズ
        else:
            outputs.append(output)

print(outputs)
# ['2019-05-25 16:53:02.271994\n']

カーネルの終了

カーネルの終了は /api/kernels/{カーネルのid} へDELETEでリクエストすることで終了します。

# カーネルのシャットダウン
url = base_url + '/api/kernels/' + kernel['id']

response = requests.delete(url, headers=headers)
print(response.status_code)  # 204

KerasでDCGANを作ってKMNISTのくずし字を生成する

KMNISTのくずし字をDCGANで生成する、というモデルをKerasで作ります。

DCGAN

DCGAN (Deep Convolutional GAN) はGAN (Generative Adversarial Network) の生成モデルの一種で、画像を生成するものです (提案論文) 。

GANは2つのモデルを学習によって獲得します。生成モデルは判別モデルを騙すように、判別モデルは生成モデルに騙されないように、それぞれ学習を行うことで、より高度な生成モデルを作ることが目的です。

  • 生成モデル (generator): データセットに近い (ありそうな) データを生成するモデル
  • 判別モデル (discriminator): データセットのデータ (本物) なのか、生成されたデータ (贋物) なのかを判別するモデル

DCGANでは、100次元の一様分布乱数を入力として画像を生成します (提案論文Figure 1抜粋) 。

f:id:ohke:20190518161024p:plain

データの準備

KMNISTくずし字データセット (doi:10.20676/00000341) を最初に準備します。KerasでLeNet-5を実装してKuzushiji-MNISTを分類する - け日記も参考にしてみてください。

codh.rois.ac.jp

今回は学習用の画像のみを使います。npz形式でダウンロードしておきます。

$ wget http://codh.rois.ac.jp/kmnist/dataset/kmnist/kmnist-train-imgs.npz

numpyでロードします。28ピクセル x 28ピクセル x 1チャネル (=グレースケール) x 60,000枚となります。文字は10種です。

import numpy as np

# データのロード
X_images = np.load('kmnist-train-imgs.npz')['arr_0'][:, :, :, np.newaxis]  # (60000, 28, 28, 1)

# -1〜+1に正規化
X_images = (X_images - [127.5]) / 127.5

モデルの作成

上の通り、DCGANは生成モデルと判別モデルからなりますので、それぞれKerasで実装していきます。

f:id:ohke:20190518172846p:plain

各モデルの実装は↓の書籍を参考にしています。

直感 Deep Learning ―Python×Kerasでアイデアを形にするレシピ

直感 Deep Learning ―Python×Kerasでアイデアを形にするレシピ

  • 作者: Antonio Gulli,Sujit Pal,大串正矢,久保隆宏,中山光樹
  • 出版社/メーカー: オライリージャパン
  • 発売日: 2018/08/17
  • メディア: 単行本(ソフトカバー)
  • この商品を含むブログを見る

import keras
from keras.models import Sequential
from keras.layers import (
    Dense, BatchNormalization, Activation, Reshape, UpSampling2D, 
    Conv2D, MaxPooling2D, Flatten
)
from keras.optimizers import Adam

生成モデルの実装

生成モデルでは100次元の一様分布の乱数 (-1〜+1) を入力とし、最終的に28 x 28 x 1で出力しています。

  • UpSampling2Dでは、画像の縦横を拡大します
    • size=(2,2)の場合、各セル (1x1) が2x2へコピーされます
  • 提案論文では64 x 64 x 3となっていますが、KMNISTに揃えています
# generatorの定義
def generator_model(name='generator'):
    model = Sequential(name=name)
    
    model.add(Dense(1024, input_shape=(100,), activation='tanh'))
    model.add(Dense(128*7*7))
    model.add(BatchNormalization())
    model.add(Activation('tanh'))
    model.add(Reshape((7, 7, 128), input_shape=(7*7*128,)))
    model.add(UpSampling2D(size=(2, 2)))
    model.add(Conv2D(64, (5, 5), padding='same', activation='tanh', data_format='channels_last'))
    model.add(UpSampling2D(size=(2, 2)))
    model.add(Conv2D(1, (5, 5), padding='same', activation='tanh', data_format='channels_last'))
    
    return model

判別モデルの実装

次に判別モデルですが、畳み込み層 x 2と全結合層 x 1の単純な構造です。

# discriminatorの定義
def discrimanator_model(name='discriminator'):
    model = Sequential(name=name)
    
    model.add(Conv2D(64, (5, 5), padding='same', input_shape=(28, 28, 1), activation='tanh', data_format='channels_last'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Conv2D(128, (5, 5), activation='tanh', data_format='channels_last'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Flatten())
    model.add(Dense(1024, activation='tanh'))
    model.add(Dense(1, activation='sigmoid'))
    
    return model

DCGANモデルの実装

生成モデルと判別モデルを組み合わせます。それぞれ独立して学習するために、別々にcompileしていることを確認してください。

  • 生成モデル学習中に判別モデルのパラメータを変えてはいけないので、判別モデルを学習しないようにしてます (discriminator.trainable = False)
    • この場合でも、discriminatorを直接学習する場合は、パラメータは更新されます
# dcganの定義
def dcgan_model(generator, discriminator):
    model = Sequential()
    
    model.add(generator)
    model.add(discriminator)
    
    return model

# 各コンポーネントの生成
def create_models():
    discriminator = discrimanator_model()
    print(discriminator.summary())

    discriminator.compile(
        loss=keras.losses.binary_crossentropy, 
        optimizer=Adam()
    )

    discriminator.trainable = False
    
    generator = generator_model()
    print(generator.summary())

    dcgan = dcgan_model(generator, discriminator)
    print(dcgan.summary())

    dcgan.compile(
        loss=keras.losses.binary_crossentropy, 
        optimizer=Adam()
    )
    
    return (dcgan, generator, discriminator)

dcgan, generator, discriminator = create_models()

モデルの学習

学習は以下のステップで行います。

  1. generatorで、画像を生成し、データセットの画像と混ぜる (ここでは30枚ずつ1:1です)
  2. discriminatorで、生成画像 (y=0) or データセット画像 (y=1) を判別するように、ミニバッチで学習する
  3. generatorで、一様乱数から画像を生成し、discriminatorでy=1と判別されるように、同じくミニバッチで学習する (全て生成画像を入力とするため、目的変数はすべて1)
epochs = 30
batch_size = 30
batches = int(X_images.shape[0] / batch_size)
z_dimensions = 100

discriminator_losses = []
generator_losses = []

for epoch in range(epochs):
    for i in range(batches):
        # データセット画像 (本物)
        genuine_images = X_images[i*batch_size : i*batch_size + batch_size]  # (30, 28, 28, 1)

        # 生成画像 (贋物)
        noise = np.random.uniform(-1, 1, (batch_size, z_dimensions)) # (30, 100)
        fake_images = generator.predict(noise)  # (30, 28, 28, 1)

        # discriminatorの学習
        X = np.concatenate([genuine_images, fake_images])  # データセット画像と生成画像を混ぜる
        y = [1] * batch_size + [0] * batch_size  # データセットの画像を1とするように目的変数をセット
        discriminator_loss = discriminator.train_on_batch(X, y)
        
        # generatorの学習
        noise = np.random.uniform(-1, 1, (batch_size, z_dimensions)) # (30, 100)
        generator_loss =  dcgan.train_on_batch(noise, [1]*batch_size)  # データセット画像 (y=1) と誤判定されたいの (全て1)
        
    print('epoch ', epoch, ': discriminator_loss=', discriminator_loss, 'generator_loss=', generator_loss)
    generator_losses.append(generator_loss)
    discriminator_losses.append(discriminator_loss)
    
    # エポック完了後、生成した画像の一部を書き出す
    save_images(fake_images, str(epoch))

生成された画像

学習の結果、生成された画像を見ていきましょう。なおデータセットには以下のような10種の文字が含まれています (こちらより抜粋) 。

f:id:ohke:20190518183657p:plain

epoch 0終了後

最初のエポック終了後ですが、まだ文字らしくなっていません。

f:id:ohke:20190518182704p:plain f:id:ohke:20190518182719p:plain f:id:ohke:20190518182729p:plain f:id:ohke:20190518182738p:plain f:id:ohke:20190518182748p:plain

epoch 4終了後

文字らしくなってきており、"は (ハ)" や "つ" などは、形になってきてます。

f:id:ohke:20190518183525p:plain f:id:ohke:20190518183536p:plain f:id:ohke:20190518183545p:plain f:id:ohke:20190518183552p:plain f:id:ohke:20190518183601p:plain

epoch 29終了後

生成された文字のほとんどが "は (ハ)" や "つ" に偏っていました。これらの文字は字形が単純で生成しやすく、結果そればかりを生成するモデルになったと思われます。文字種ごとに分ける、などデータセットで分離する必要がありそうです。

f:id:ohke:20190518190331p:plain f:id:ohke:20190518190341p:plain f:id:ohke:20190518190351p:plain f:id:ohke:20190518190357p:plain f:id:ohke:20190518190405p:plain

小ネタ: PandasでCSVファイルからdatetimeカラムをロードする

今週もPandasの小ネタです。

CSVファイルのカラムをdatetime64としてロードする方法です。

Pandasはdatetime型を食わせるとdatetime64のSeriesになります。.dtプロパティも使えます。ここではカラムcがdatetime64になります。

import pandas as pd
import datetime

t = datetime.datetime.now()

df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': ['01', '02', '03'],
    'c': [t, t, t]
})

print(df.info())
# <class 'pandas.core.frame.DataFrame'>
# RangeIndex: 3 entries, 0 to 2
# Data columns (total 3 columns):
# a    3 non-null int64
# b    3 non-null object
# c    3 non-null datetime64[ns]
# dtypes: datetime64[ns](1), int64(1), object(1)
# memory usage: 152.0+ bytes

df.to_csv('test.csv', index=False)

to_csvで出力し、このままread_csvで読み込むとobject (文字列) として認識されてしまいます。そのため.dtプロパティなどもアクセスできません。

  • ちなみにカラムbは、もともと文字列だったのですが、int64としてロードされています
df1 = pd.read_csv('test.csv')

print(df1.info())
# <class 'pandas.core.frame.DataFrame'>
# RangeIndex: 3 entries, 0 to 2
# Data columns (total 3 columns):
# a    3 non-null int64
# b    3 non-null int64
# c    3 non-null object
# dtypes: int64(2), object(1)
# memory usage: 152.0+ bytes

print(df1['c'].dt.date)
# AttributeError: Can only use .dt accessor with datetimelike values

read_csvの引数parse_datesにdatetime64として認識させたいカラムをリストで指定すると、正しくロードできるようになります。

  • カラムbのように文字列としてロードさせたい場合、引数dtypeに辞書形式で渡します
df2 = pd.read_csv('test.csv', dtype = {'b':'object'}, parse_dates=['c'])

print(df2.info())
# <class 'pandas.core.frame.DataFrame'>
# RangeIndex: 3 entries, 0 to 2
# Data columns (total 3 columns):
# a    3 non-null int64
# b    3 non-null object
# c    3 non-null datetime64[ns]
# dtypes: datetime64[ns](1), int64(1), object(1)
# memory usage: 152.0+ bytes

print(df2['c'].dt.date)
# 0    2019-05-11
# 1    2019-05-11
# 2    2019-05-11
# Name: c, dtype: object