前回に引き続き、Scrapyを使ってこの日記のクローリングを行います。
今回はクローリングで得られた値を、バリデーションしてPostgreSQLに保存するPipelineを実装します。Spiderの実装は前回の投稿も参考にしてみてください。
こちらの書籍を参考にしてます。
Pipeline
ScrapyにおけるPipelineは、Spiderがクローリング・スクレイピングした値に対して、バリデーションチェックや永続化などの後処理を行うための仕組みです。
Spiderが取得した値をItemに詰めて返すと、優先順位に従って複数のタスクが実行されます。
ここでは例として、前回作成したarchive_spiderを使い、取得した記事タイトル・投稿日のフォーマットをチェックするPipelineと、PostgreSQLに保存するPipelineを作ります。以下のフォルダ構造となってます。
また事前にDBとテーブルを作成しておきます。postsテーブルとしておきます。
CREATE DATABASE kenikki CREATE TABLE public.posts ( title varchar(1024) NULL, posted_at date NULL )
Itemの実装
最初にItemの継承クラスをitems.pyに記述します。SpiderからPipelineへのスクレイピングした値の受け渡しは、このクラスを使って行われます。
記事タイトルと投稿日を取り出しますので、titleとdateとしてクラス変数を定義してます。
import scrapy class PostItem(scrapy.Item): title = scrapy.Field() # 記事タイトル date = scrapy.Field() # 投稿日
Spiderの変更
Spiderは上で作成したItemオブジェクトに取得した値を詰めて、ジェネレートします。
- Itemのフィールドには辞書形式 (
item['title']
など) でアクセスしてます
import scrapy from ..items import PostItem class ArchiveSpider(scrapy.Spider): name = 'archive_spider' start_urls = [ 'https://ohke.hateblo.jp/archive/2015', 'https://ohke.hateblo.jp/archive/2016', 'https://ohke.hateblo.jp/archive/2017', 'https://ohke.hateblo.jp/archive/2018' ] def parse(self, response): # ページネーションされている場合は次のページにもリクエスト next_page_url = response.css('span.pager-next a::attr(href)').extract_first() if next_page_url is not None: yield scrapy.Request(next_page_url, callback=self.parse) # タイトルと投稿日時を取得 title_list = response.css('a.entry-title-link::text').extract() date_list = response.xpath('//time/@datetime').extract() # PostItemに詰めてジェネレート for title, date in zip(title_list, date_list): item = PostItem() item['title'] = title item['date'] = date yield item
Pipelineの実装と設定
最後にPipelineの実装です。pipelines.pyに追記しています。
Pipelineでは3つのメソッドを定義します。
open_spider()
とclose_spider()
は、Spiderの起動時と終了時に呼び出される- PostgresPipelineではDBへのコネクションの開始と終了を行っています
- 接続文字列はSpiderの設定から取得します
process_item()
では、Spiderがジェネレートしたアイテムを引数に呼び出される- ValidationPipelineでは、値が空でないか、正しいフォーマットかどうかをチェックしています
- PostgreSQLでは、INSERTを行っています
import scrapy import psycopg2 import re # 値のバリデーションチェック class ValidationPipeline(object): def process_item(self, item: scrapy.Item, spider: scrapy.Spider): if item['title'] is None or item['title'] == '': raise scrapy.exceptions.DropItem('Missing value: title') if item['date'] is None or re.match('^(\d{4})-(\d{2})-(\d{2})$', '2017-09-18') is None: raise scrapy.exceptions.DropItem('Missing value: date') return item # PostgreSQLへの保存 class PostgresPipeline(object): def open_spider(self, spider: scrapy.Spider): # コネクションの開始 url = spider.settings.get('POSTGRESQL_URL') self.conn = psycopg2.connect(url) def close_spider(self, spider: scrapy.Spider): # コネクションの終了 self.conn.close() def process_item(self, item: scrapy.Item, spider: scrapy.Spider): sql = "INSERT INTO posts VALUES (%s, %s)" curs = self.conn.cursor() curs.execute(sql, (item['title'], item['date'])) self.conn.commit() return item
これだけではPipelineは呼び出されず、settings.pyへ以下のようにITEM_PIPELINE
へ2つのクラスを追記する必要があります。
辞書形式となっており、値は優先順位です。0〜1000の値で設定でき、値が小さいほうから順に実行されます (つまりValidationPipeline→PostgresPipelineです)。
ついでにPostgreSQLへの接続文字列も追記しておきます。
# Configure item pipelines ITEM_PIPELINES = { 'kenikki.pipelines.ValidationPipeline': 100, 'kenikki.pipelines.PostgresPipeline': 200 } # DB POSTGRESQL_URL = 'postgresql://user:password@localhost:5432/kenikki'
これで実行すると、スクレイピングした値がpostsテーブルに展開されていることがわかります。
$ scrapy crawl archive_spider
まとめ
今回もScrapyを使い、Pipelineを実装することで、スクレイピングで取得した値をDBに残せるようにしました。
参考文献

Pythonクローリング&スクレイピング -データ収集・解析のための実践開発ガイド-
- 作者: 加藤耕太
- 出版社/メーカー: 技術評論社
- 発売日: 2016/12/16
- メディア: 大型本
- この商品を含むブログ (3件) を見る