Scrapyでけ日記をクローリングする (2. PipelineでPostgreSQLに保存する)

前回に引き続き、Scrapyを使ってこの日記のクローリングを行います。

github.com

今回はクローリングで得られた値を、バリデーションしてPostgreSQLに保存するPipelineを実装します。Spiderの実装は前回の投稿も参考にしてみてください。

ohke.hateblo.jp

こちらの書籍を参考にしてます。

Pythonクローリング&スクレイピング -データ収集・解析のための実践開発ガイド-

Pipeline

ScrapyにおけるPipelineは、Spiderがクローリング・スクレイピングした値に対して、バリデーションチェックや永続化などの後処理を行うための仕組みです。
Spiderが取得した値をItemに詰めて返すと、優先順位に従って複数のタスクが実行されます。

ここでは例として、前回作成したarchive_spiderを使い、取得した記事タイトル・投稿日のフォーマットをチェックするPipelineと、PostgreSQLに保存するPipelineを作ります。以下のフォルダ構造となってます。

f:id:ohke:20180707152304p:plain

また事前にDBとテーブルを作成しておきます。postsテーブルとしておきます。

CREATE DATABASE kenikki

CREATE TABLE public.posts (
    title varchar(1024) NULL,
    posted_at date NULL
)

Itemの実装

最初にItemの継承クラスをitems.pyに記述します。SpiderからPipelineへのスクレイピングした値の受け渡しは、このクラスを使って行われます。
記事タイトルと投稿日を取り出しますので、titleとdateとしてクラス変数を定義してます。

import scrapy

class PostItem(scrapy.Item):
    title = scrapy.Field()  # 記事タイトル
    date = scrapy.Field()  # 投稿日

Spiderの変更

Spiderは上で作成したItemオブジェクトに取得した値を詰めて、ジェネレートします。

  • Itemのフィールドには辞書形式 (item['title']など) でアクセスしてます
import scrapy
from ..items import PostItem

class ArchiveSpider(scrapy.Spider):
    name = 'archive_spider'
    start_urls = [
        'https://ohke.hateblo.jp/archive/2015',
        'https://ohke.hateblo.jp/archive/2016',
        'https://ohke.hateblo.jp/archive/2017',
        'https://ohke.hateblo.jp/archive/2018'
    ]

    def parse(self, response):
        # ページネーションされている場合は次のページにもリクエスト
        next_page_url = response.css('span.pager-next a::attr(href)').extract_first()
        if next_page_url is not None:
            yield scrapy.Request(next_page_url, callback=self.parse)

        # タイトルと投稿日時を取得
        title_list = response.css('a.entry-title-link::text').extract()
        date_list = response.xpath('//time/@datetime').extract()

        # PostItemに詰めてジェネレート
        for title, date in zip(title_list, date_list):
            item = PostItem()
            item['title'] = title
            item['date'] = date
            yield item

Pipelineの実装と設定

最後にPipelineの実装です。pipelines.pyに追記しています。

Pipelineでは3つのメソッドを定義します。

  • open_spider()close_spider()は、Spiderの起動時と終了時に呼び出される
    • PostgresPipelineではDBへのコネクションの開始と終了を行っています
    • 接続文字列はSpiderの設定から取得します
  • process_item()では、Spiderがジェネレートしたアイテムを引数に呼び出される
    • ValidationPipelineでは、値が空でないか、正しいフォーマットかどうかをチェックしています
    • PostgreSQLでは、INSERTを行っています
import scrapy
import psycopg2
import re

# 値のバリデーションチェック
class ValidationPipeline(object):
    def process_item(self, item: scrapy.Item, spider: scrapy.Spider):
        if item['title'] is None or item['title'] == '':
            raise scrapy.exceptions.DropItem('Missing value: title')

        if item['date'] is None or re.match('^(\d{4})-(\d{2})-(\d{2})$', '2017-09-18') is None:
            raise scrapy.exceptions.DropItem('Missing value: date')

        return item

# PostgreSQLへの保存
class PostgresPipeline(object):
    def open_spider(self, spider: scrapy.Spider):
        # コネクションの開始
        url = spider.settings.get('POSTGRESQL_URL')
        self.conn = psycopg2.connect(url)

    def close_spider(self, spider: scrapy.Spider):
        # コネクションの終了
        self.conn.close()

    def process_item(self, item: scrapy.Item, spider: scrapy.Spider):
        sql = "INSERT INTO posts VALUES (%s, %s)"

        curs = self.conn.cursor()
        curs.execute(sql, (item['title'], item['date']))
        self.conn.commit()

        return item

これだけではPipelineは呼び出されず、settings.pyへ以下のようにITEM_PIPELINEへ2つのクラスを追記する必要があります。
辞書形式となっており、値は優先順位です。0〜1000の値で設定でき、値が小さいほうから順に実行されます (つまりValidationPipeline→PostgresPipelineです)。

ついでにPostgreSQLへの接続文字列も追記しておきます。

# Configure item pipelines
ITEM_PIPELINES = {
   'kenikki.pipelines.ValidationPipeline': 100,
   'kenikki.pipelines.PostgresPipeline': 200
}

# DB
POSTGRESQL_URL = 'postgresql://user:password@localhost:5432/kenikki'

これで実行すると、スクレイピングした値がpostsテーブルに展開されていることがわかります。

$ scrapy crawl archive_spider

f:id:ohke:20180707154729p:plain

まとめ

今回もScrapyを使い、Pipelineを実装することで、スクレイピングで取得した値をDBに残せるようにしました。

参考文献

Pythonクローリング&スクレイピング -データ収集・解析のための実践開発ガイド-

Pythonクローリング&スクレイピング -データ収集・解析のための実践開発ガイド-