scrapy项目管道(item pipeline)

781 阅读3分钟

scrapy提供了很多中间组件可以让我们更加自定义想要的效果,比如项目管道(item pipeline),下载中间件(downloader middleware),蜘蛛中间件(spider middleware)等。通过更改或者添加的方式我们可以实现很多的功能。这种中间组件就像钩子一样,在完成结果之前,修改部分内容来使结果不一样,或者只是记录一下内容。

先来说说项目管道,项目管道处理爬虫返回或迭代的item,也就是爬虫抓取的结果。最常用的也就是将结果保存在数据库了,就比如存储在MySQL和MongoDB。示例代码:

import pymysql
from twisted.enterprise import adbapi


class MysqlPipeline(object):
    def __init__(self, adbparams, table):
        self.table = table
        self.adbparams = adbparams

    @classmethod
    def from_crawler(cls, crawler):
        adbparams = dict(
            host=crawler.settings.get('MYSQL_HOST'),
            port=crawler.settings.get('MSYQL_PORT'),
            db=crawler.settings.get('MYSQL_DBNAME'),
            user=crawler.settings.get('MYSQL_USER'),
            password=crawler.settings.get('MYSQL_PASSWORD'),
            chatset=crawler.settings.get('MYSQL_CHARSET'),
            cursorclass=pymysql.cursors.DictCursor   # 指定cursor类型
        )
        table = crawler.settings.get('MYSQL_TABLE')
        return cls(adbparams, table)

    def open_spider(self, spider):
        self.dbpool = adbapi.ConnectionPool('pymysql', **self.adbparams)

    def close_spider(self, spider):
        self.dbpool.close()

    def process_item(self, item, spider):
        query = self.dbpool.runInteraction(self.do_insert, item)  # 指定操作方法和操作数据
        query.addCallback(self.handle_error)  # 处理异常
        return item

    def do_insert(self, cursor, item):
        # 对数据库进行插入操作,并不需要commit,twisted会自动commit
        keys = ','.join(item.keys())
        values = ','.join(['%s'] * len(item))
        sql = '''INSERT INTO {table} ({keys}) VALUES ({values})'''.format(table=self.table, keys=keys, values=values)
        try:
            cursor.execute(sql, tuple(item.values()))  
        except Exception:
            pass

    def handle_error(self, failure):
        pass

存储在MongoDB的示例代码:

import pymongo

class MongoPipeline(object):

    collection_name = 'scrapy_items'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        self.db[self.collection_name].insert_one(dict(item))
        return item

项目管道有四个内置方法:

  • open_spider:蜘蛛启动时被调用,一般用于打开资源
  • close_spider:蜘蛛关闭时被调用,一般用于关闭资源
  • process_item:当蜘蛛返回item时被调用,一般用于处理item(保存或者舍弃)
  • from_crawler: 类方法,用于获取settings.py中的配置参数。用from_settings也是一样

上面两个示例有点区别,MySQL用了twisted实现异步存储,而MongoDB只是同步的。

当然除了保存数据到数据库之外,项目管道还有一个比较常用的功能,数据去重。scrapy虽然内置了请求去重的类scrapy.dupefilters.RFPDupeFilter,但是并没有对抓取下来的数据进行去重。难道不同的请求会出现同样的数据吗?有可能,当然这需要看实际情况。不过很多情况下,可以直接使用数据库的去重功能,比如设置某个字段不能重复即可。只有一些不保存在数据库,或者数据库不方便去重和含有大量的重复数据不想增加数据库服务器的负担才会对数据去重。

官网去重示例代码(这里只是简单的使用python集合,最好使用redis):

from scrapy.exceptions import DropItem


class DuplicatesPipeline(object):

    def __init__(self):
        self.ids_seen = set()

    def process_item(self, item, spider):
        if item['id'] in self.ids_seen:
            raise DropItem("Duplicate item found: %s" % item)
        else:
            self.ids_seen.add(item['id'])
            return item