
Scrapy

简介

Scrapy 是一个强大的 Python 爬虫框架,用于快速、高效地抓取网站数据。主要特点:

  • 高性能:异步网络框架,支持并发请求
  • 可扩展:中间件、管道系统,易于扩展功能
  • 易维护:结构清晰,代码复用性高
  • 功能丰富:自动限速、重试、代理、Cookie 管理等

核心组件:

  • Engine:引擎,控制数据流
  • Scheduler:调度器,管理请求队列
  • Downloader:下载器,抓取网页
  • Spiders:爬虫,解析数据
  • Item Pipeline:管道,处理数据
  • Middlewares:中间件,处理请求/响应

快速开始

安装

# 安装 Scrapy
pip install scrapy

# 验证安装
scrapy version

# 可选:安装额外依赖
pip install scrapy-splash # JavaScript 渲染
pip install scrapy-redis # 分布式爬虫
pip install pillow # 图片处理

创建项目

# 创建新项目
scrapy startproject myproject

# 项目结构
# myproject/
# ├── scrapy.cfg              # 项目配置文件
# └── myproject/              # 项目 Python 模块
#     ├── __init__.py
#     ├── items.py            # Item 定义
#     ├── middlewares.py      # 中间件
#     ├── pipelines.py        # 管道
#     ├── settings.py         # 设置
#     └── spiders/            # 爬虫目录
#         ├── __init__.py
#         └── example.py      # 示例爬虫

# 创建爬虫
cd myproject
scrapy genspider example example.com

# 查看可用模板
scrapy genspider -l

# 使用特定模板创建爬虫
scrapy genspider -t crawl myspider example.com

第一个Spider

# spiders/quotes_spider.py

import scrapy

class QuotesSpider(scrapy.Spider):
    """爬虫类"""
    name = 'quotes'                                # 爬虫名称(唯一标识)
    allowed_domains = ['quotes.toscrape.com']      # 允许的域名
    start_urls = ['http://quotes.toscrape.com/']   # 起始 URL

    def parse(self, response):
        """解析响应的默认回调函数"""
        # 提取数据
        for quote in response.css('div.quote'):
            yield {
                'text': quote.css('span.text::text').get(),
                'author': quote.css('span small::text').get(),
                'tags': quote.css('div.tags a.tag::text').getall(),
            }

        # 翻页
        next_page = response.css('li.next a::attr(href)').get()
        if next_page is not None:
            yield response.follow(next_page, callback=self.parse)

运行爬虫:

# 运行爬虫
scrapy crawl quotes

# 保存到文件
scrapy crawl quotes -o quotes.json
scrapy crawl quotes -o quotes.csv
scrapy crawl quotes -o quotes.xml

# 指定日志级别
scrapy crawl quotes -L INFO

# 输出到特定文件
scrapy crawl quotes -o output/data.json
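
除了命令行,也可以在 Python 脚本中启动爬虫。下面是一个基于 CrawlerProcess 的最小示意(假设在项目根目录执行,且项目中存在名为 quotes 的爬虫):

# run.py
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

process = CrawlerProcess(get_project_settings())  # 读取项目的 settings.py
process.crawl('quotes')                           # 按名称调度爬虫
process.start()                                   # 阻塞,直到所有爬虫结束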

Spider

Spider类

Spider 是 Scrapy 中用于定义爬取逻辑的类:

import scrapy

class MySpider(scrapy.Spider):
"""基本爬虫类"""

# 爬虫名称(必需)
name = 'myspider'

# 允许的域名(可选)
allowed_domains = ['example.com']

# 起始 URL 列表(可选)
start_urls = ['http://example.com/']

# 自定义设置(可选)
custom_settings = {
'DOWNLOAD_DELAY': 2,
'CONCURRENT_REQUESTS': 1,
}

def start_requests(self):
"""生成初始请求(可选)"""
for url in self.start_urls:
yield scrapy.Request(
url=url,
callback=self.parse,
headers={'User-Agent': 'Custom User-Agent'}
)

def parse(self, response):
"""解析响应"""
# 提取数据或生成新的请求
pass

def closed(self, reason):
"""爬虫关闭时调用"""
self.logger.info(f'Spider closed, reason: {reason}')
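
爬虫还可以通过 -a 接收命令行参数,参数会作为构造函数的关键字参数传入。下面是一个简单示意(参数名 category 与 URL 均为假设):

# 运行:scrapy crawl args_demo -a category=books
import scrapy

class ArgsSpider(scrapy.Spider):
    name = 'args_demo'

    def __init__(self, category=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # -a 传入的参数在这里可用
        self.category = category
        self.start_urls = [f'http://example.com/{category or "all"}']

    def parse(self, response):
        yield {'url': response.url, 'category': self.category}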

常用 Spider 类型:

# 1. Spider - 基础爬虫
from scrapy.spiders import Spider

class BasicSpider(Spider):
name = 'basic'
start_urls = ['http://example.com']

def parse(self, response):
yield {'title': response.css('title::text').get()}

# 2. CrawlSpider - 自动爬取整站
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor

class MyCrawlSpider(CrawlSpider):
name = 'crawl'
allowed_domains = ['example.com']
start_urls = ['http://example.com']

rules = (
Rule(LinkExtractor(allow=r'/page/\d+'), callback='parse_item'),
Rule(LinkExtractor(allow=r'/category/'), follow=True),
)

def parse_item(self, response):
yield {'url': response.url}

# 3. XMLFeedSpider - XML/Atom/RSS
from scrapy.spiders import XMLFeedSpider

class MyXMLSpider(XMLFeedSpider):
name = 'xml'
iterator = 'iternodes' # 'iternodes', 'html', 'xml'
itertag = 'item'

def parse_node(self, response, node):
yield {'title': node.xpath('./title/text()').get()}

# 4. SitemapSpider - 站点地图
from scrapy.spiders import SitemapSpider

class MySitemapSpider(SitemapSpider):
name = 'sitemap'
sitemap_urls = ['http://example.com/sitemap.xml']

def parse(self, response):
yield {'url': response.url}

parse方法

parse 方法是处理响应的回调函数:

def parse(self, response):
"""解析响应"""

# 1. 提取数据
title = response.css('title::text').get()
items = response.css('div.item')

for item in items:
yield {
'name': item.css('h2::text').get(),
'price': item.css('.price::text').get(),
'url': response.url,
}

# 2. 生成新请求
# 方式 1:使用 response.follow
next_page = response.css('.next::attr(href)').get()
if next_page:
yield response.follow(next_page, callback=self.parse)

# 方式 2:使用 scrapy.Request
detail_url = response.css('.detail::attr(href)').get()
if detail_url:
yield scrapy.Request(
response.urljoin(detail_url),
callback=self.parse_detail
)

# 3. 传递数据给回调
yield response.follow(
next_page,
callback=self.parse_with_meta,
meta={'item': {'name': 'Product'}}
)

def parse_detail(self, response):
"""解析详情页"""
# 获取传递的数据
item = response.meta.get('item')
item['description'] = response.css('.desc::text').get()
yield item

def parse_with_meta(self, response):
"""使用传递的元数据"""
item = response.meta['item']
# 处理数据...
yield item

Response对象

Response 对象包含服务器返回的响应数据:

def parse(self, response):
"""Response 对象的常用属性和方法"""

# URL 相关
print(response.url) # 当前 URL
print(response.status) # HTTP 状态码
print(response.headers) # 响应头
print(response.request) # 对应的 Request 对象

# 内容
print(response.text) # 响应文本(自动解码)
print(response.body) # 响应字节
print(response.encoding) # 编码

# 选择器
response.css('h1::text') # CSS 选择器
response.xpath('//h1') # XPath 选择器

# URL 操作
response.urljoin('/page/2') # 相对 URL 转绝对 URL
response.follow('/next') # 创建新请求

# 其他
response.css('a::attr(href)').get() # 获取第一个
response.css('a::attr(href)').getall() # 获取所有
response.css('a').get() # 获取第一个元素
response.css('a').getall() # 获取所有元素
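
调试这些属性和选择器时,可以用 scrapy shell 交互式地试验,再把验证过的表达式写回爬虫:

# 打开交互式 shell
scrapy shell 'http://quotes.toscrape.com'

# shell 中可以直接使用 response 对象
>>> response.css('title::text').get()
>>> response.xpath('//h1//text()').getall()
>>> view(response)  # 在浏览器中打开当前响应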

选择器

XPath

def parse(self, response):
"""XPath 选择器"""

# 基本选择
response.xpath('//h1') # 所有 h1
response.xpath('//div[@class="content"]') # class 为 content 的 div
response.xpath('//div[@id="main"]') # id 为 main 的 div

# 文本提取
response.xpath('//h1/text()') # h1 的直接文本
response.xpath('//h1//text()') # h1 的所有文本(包括后代)
response.xpath('string(//h1)') # h1 的所有文本拼接

# 属性提取
response.xpath('//a/@href') # 所有链接的 href
response.xpath('//img/@src') # 图片 src
response.xpath('//div/@class') # class 属性

# 层级选择
response.xpath('//div/p') # div 下的直接 p
response.xpath('//div//p') # div 下的所有 p
response.xpath('//div[@class="main"]/h1') # class=main 的 div 下的 h1

# 条件选择
response.xpath('//a[contains(@class, "btn")]') # class 包含 btn
response.xpath('//a[starts-with(@href, "http")]') # href 以 http 开头
response.xpath('//div[position() < 3]') # 前两个 div

# 轴(Axes)
response.xpath('//div/following-sibling::div') # 后面的兄弟 div
response.xpath('//div/preceding-sibling::div') # 前面的兄弟 div
response.xpath('//a/parent::div') # a 的父 div
response.xpath('//div/ancestor::body') # div 的祖先 body

CSS选择器

def parse(self, response):
"""CSS 选择器"""

# 基本选择
response.css('h1') # 所有 h1
response.css('.content') # class 为 content
response.css('#main') # id 为 main
response.css('div.content') # div 且 class=content

# 属性选择
response.css('a[href]') # 有 href 的 a
response.css('a[href="http://"]') # href 等于 http://
response.css('a[href^="http"]') # href 以 http 开头
response.css('a[href$=".pdf"]') # href 以 .pdf 结尾
response.css('a[href*="example"]') # href 包含 example

# 层级选择
response.css('div > p') # div 的直接子元素 p
response.css('div p') # div 的所有后代 p
response.css('div.main > h1') # class=main 的 div 下的直接 h1

# 伪类
response.css('li:first-child') # 第一个 li
response.css('li:last-child') # 最后一个 li
response.css('li:nth-child(2)') # 第二个 li
response.css('a:nth-of-type(3)') # 同类型中的第三个

# 组合选择
response.css('h1, h2, h3') # 所有 h1, h2, h3

# 提取文本和属性
response.css('h1::text') # 文本
response.css('a::attr(href)') # href 属性
response.css('div::text') # 所有文本节点

正则表达式

import re

def parse(self, response):
"""使用正则表达式提取数据"""

# 方式 1:使用 re 模块
html = response.text
titles = re.findall(r'<h2>(.*?)</h2>', html)
emails = re.findall(r'[\w.+-]+@[\w-]+\.[\w.-]+', html)

# 方式 2:使用 Scrapy 的 .re() 方法
response.css('div.price::text').re(r'\$\d+\.\d{2}')
response.xpath('//div[@class="price"]/text()').re(r'(\d+\.\d{2})')

# 方式 3:使用 .re_first() 获取第一个匹配
price = response.css('.price::text').re_first(r'\$\d+\.\d{2}')

# 提取链接中的 ID
product_id = response.css('a::attr(href)').re_first(r'/product/(\d+)/')

# 组合使用
for div in response.css('div.item'):
text = div.css('::text').get()
numbers = re.findall(r'\d+', text)
yield {'numbers': numbers}

数据提取

提取文本

def parse(self, response):
"""提取文本的多种方法"""

# 1. get() - 获取第一个
title = response.css('h1::text').get()
# 如果没有匹配,title 为 None

# 2. getall() - 获取所有
paragraphs = response.css('p::text').getall()
# paragraphs = ['text1', 'text2', ...]

# 3. 使用默认值
title = response.css('h1::text').get() or 'Default Title'

# 4. 提取并清理文本
text = response.css('div.content::text').get()
if text:
text = text.strip() # 去除首尾空白

# 5. 提取所有文本(包括后代)
all_text = response.xpath('string(//div[@class="content"])').get()

# 6. 提取多个文本并拼接
items = response.css('p::text').getall()
combined_text = ' '.join(items).strip()

# 7. 处理换行和空白(合并为单个空格)
text = ' '.join(' '.join(response.css('div::text').getall()).split())

提取属性

def parse(self, response):
"""提取元素属性"""

# 1. 提取单个属性
href = response.css('a::attr(href)').get()
src = response.xpath('//img/@src').get()

# 2. 提取多个属性
all_hrefs = response.css('a::attr(href)').getall()

# 3. 同时提取每个链接的多个属性
for link in response.css('a'):
yield {
'href': link.css('::attr(href)').get(),
'text': link.css('::text').get(),
'title': link.css('::attr(title)').get(),
}

# 4. 提取 data-* 属性
data_id = response.css('div::attr(data-id)').get()

# 5. 提取 class 属性
classes = response.css('div::attr(class)').get()

# 6. 提取 style 属性
style = response.css('div::attr(style)').get()

提取链接

def parse(self, response):
"""提取和清理链接"""

# 1. 获取所有链接
links = response.css('a::attr(href)').getall()

# 2. 转换为绝对 URL
for link in links:
absolute_url = response.urljoin(link)
yield {'url': absolute_url}

# 3. 使用 response.follow(推荐)
for href in response.css('a::attr(href)').getall():
yield response.follow(href, callback=self.parse_detail)

# 4. 提取特定模式的链接
for link in response.css('a::attr(href)').getall():
if '/product/' in link:
yield response.follow(link, callback=self.parse_product)

# 5. 使用正则过滤链接
import re
for link in response.css('a::attr(href)').getall():
if re.match(r'/product/\d+', link):
yield response.follow(link, callback=self.parse_product)

# 6. 处理相对路径
relative_links = response.css('a[href^="/"]::attr(href)').getall()
for link in relative_links:
yield {'url': response.urljoin(link)}

Item

定义Item

Item 是保存抓取数据的容器:

# items.py

import scrapy

class ProductItem(scrapy.Item):
"""定义商品 Item"""
# 字段定义
name = scrapy.Field() # 商品名称
price = scrapy.Field() # 价格
description = scrapy.Field() # 描述
image_urls = scrapy.Field() # 图片 URL 列表
images = scrapy.Field() # 下载后的图片
url = scrapy.Field() # 商品链接
sku = scrapy.Field() # SKU
stock = scrapy.Field() # 库存
ratings = scrapy.Field() # 评分
reviews = scrapy.Field() # 评论数

class ArticleItem(scrapy.Item):
"""文章 Item"""
title = scrapy.Field()
content = scrapy.Field()
author = scrapy.Field()
publish_date = scrapy.Field()
tags = scrapy.Field()
category = scrapy.Field()

使用 Item:

# spiders/product_spider.py

from myproject.items import ProductItem

def parse(self, response):
"""创建和返回 Item"""
item = ProductItem()

# 方式 1:直接赋值
item['name'] = response.css('h1::text').get()
item['price'] = response.css('.price::text').get()
item['url'] = response.url

# 方式 2:字典式
item = ProductItem({
'name': response.css('h1::text').get(),
'price': response.css('.price::text').get(),
'url': response.url,
})

# 方式 3:批量赋值
item = ProductItem()
item.update({
'name': response.css('h1::text').get(),
'price': response.css('.price::text').get(),
})

yield item
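
除了 scrapy.Item,较新版本的 Scrapy 借助 itemadapter 也支持直接 yield dict、dataclass 或 attrs 对象。下面是一个 dataclass Item 的简单示意(字段沿用上面的例子):

from dataclasses import dataclass
from typing import Optional

@dataclass
class ProductData:
    name: Optional[str] = None
    price: Optional[str] = None
    url: Optional[str] = None

def parse(self, response):
    yield ProductData(
        name=response.css('h1::text').get(),
        price=response.css('.price::text').get(),
        url=response.url,
    )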

Item Loader

Item Loader 提供了一种便捷的方式来填充 Item:

# items.py
from scrapy.loader import ItemLoader
from itemloaders.processors import TakeFirst, MapCompose, Join

def clean_price(value):
"""清理价格"""
if value:
return float(value.replace('$', '').replace(',', '').strip())
return None

def strip_whitespace(value):
"""去除空白"""
return value.strip() if value else value

class ProductItemLoader(ItemLoader):
"""商品 Item Loader"""

# 默认输出处理器:取第一个值
default_output_processor = TakeFirst()

# 自定义字段处理器
name_in = MapCompose(strip_whitespace)
price_in = MapCompose(strip_whitespace, clean_price)
tags_out = Join(',') # 列表用逗号连接

使用 Item Loader:

from myproject.items import ProductItem, ProductItemLoader

def parse(self, response):
"""使用 Item Loader"""
# 创建 Loader
loader = ProductItemLoader(item=ProductItem(), response=response)

# 添加值
loader.add_css('name', 'h1::text')
loader.add_css('price', '.price::text')
loader.add_css('description', '.description::text')
loader.add_value('url', response.url)
loader.add_xpath('sku', '//span[@class="sku"]/@data-id')

# 加载并返回 Item
yield loader.load_item()
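
当多个字段都位于页面同一块区域时,可以用 nested_css / nested_xpath 创建嵌套 Loader,后续选择器都相对于该区域解析。下面是一个示意(.product-info 选择器为假设):

def parse(self, response):
    loader = ProductItemLoader(item=ProductItem(), response=response)

    # 嵌套 Loader:add_css 的选择器相对于 .product-info 区域
    info = loader.nested_css('.product-info')
    info.add_css('name', 'h1::text')
    info.add_css('price', '.price::text')

    loader.add_value('url', response.url)
    yield loader.load_item()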

Field Processors

字段处理器用于处理和清理数据:

from itemloaders.processors import (
TakeFirst, MapCompose, Join, Compose
)
import re

def remove_duplicates(values):
"""去重"""
return list(set(values))

def extract_number(text):
"""提取数字"""
match = re.search(r'\d+', text)
return match.group() if match else text

def parse_tags(tags):
"""解析标签"""
return [tag.strip().lower() for tag in tags if tag.strip()]

# items.py
class ProductItem(scrapy.Item):
name = scrapy.Field(
input_processor=MapCompose(str.strip, str.title),
output_processor=TakeFirst()
)
price = scrapy.Field(
input_processor=MapCompose(extract_number, float),
output_processor=TakeFirst()
)
tags = scrapy.Field(
input_processor=MapCompose(str.strip),
output_processor=Compose(remove_duplicates, sorted)
)
description = scrapy.Field(
input_processor=MapCompose(str.strip),
output_processor=Join(' ')
)

Request与Response

Request对象

Request 对象表示一个 HTTP 请求:

import scrapy

# 创建基本请求
request = scrapy.Request(
url='http://example.com/page',
callback=self.parse,
method='GET',
headers={'User-Agent': 'Custom'},
cookies={'session': 'abc123'},
meta={'key': 'value'}, # 传递数据
encoding='utf-8',
priority=1, # 优先级(数字越大优先级越高)
dont_filter=False, # 设为 True 时不过滤重复 URL
errback=self.errback_handler, # 错误回调
)

# 传递数据给回调
yield scrapy.Request(
url='http://example.com/detail',
callback=self.parse_detail,
meta={
'item': {'name': 'Product'},
'proxy': 'http://proxy.com:8080',
}
)

def parse_detail(self, response):
"""获取传递的数据"""
item = response.meta['item']
item['detail'] = response.css('.detail::text').get()
yield item
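
errback 回调收到的是 Twisted 的 Failure 对象,可以按异常类型分别处理:

from scrapy.spidermiddlewares.httperror import HttpError
from twisted.internet.error import DNSLookupError, TCPTimedOutError

def errback_handler(self, failure):
    """统一处理请求失败"""
    if failure.check(HttpError):
        # 非 2xx 响应
        response = failure.value.response
        self.logger.error(f'HttpError {response.status}: {response.url}')
    elif failure.check(DNSLookupError):
        self.logger.error(f'DNS 解析失败: {failure.request.url}')
    elif failure.check(TimeoutError, TCPTimedOutError):
        self.logger.error(f'请求超时: {failure.request.url}')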

Response对象

def parse(self, response):
"""Response 对象"""

# 基本信息
response.url # URL
response.status # 状态码
response.headers # 响应头
response.body # 字节内容
response.text # 文本内容
response.encoding # 编码

# Request 相关
response.request # 对应的 Request 对象
response.meta # 元数据字典
response.request.callback # 回调函数

# 选择器
response.css('h1') # CSS 选择器
response.xpath('//h1') # XPath 选择器

# URL 操作
response.urljoin('/other') # 转换为绝对 URL
response.follow(next_url) # 创建新请求

# 检查响应类型
if response.status == 200:
self.logger.info('Success')

# 获取特定头信息
content_type = response.headers.get('Content-Type', b'').decode()

FormRequest

FormRequest 用于提交表单:

from scrapy.http import FormRequest

# 方式 1:直接 POST
yield FormRequest(
url='http://example.com/post',
formdata={
'username': 'user',
'password': 'pass',
},
callback=self.after_post
)

# 方式 2:模拟表单提交
yield FormRequest.from_response(
response,
formdata={'username': 'user', 'password': 'pass'},
clickdata={'type': 'submit', 'name': 'login'},
callback=self.after_login
)

# 方式 3:文件上传
# 注意:FormRequest 的 formdata 只会做 URL 编码,不支持 multipart/form-data
# 文件上传需要自行构造请求体(body 参数)或借助第三方库

# 登录示例
def parse(self, response):
"""首次访问,获取登录表单"""
return FormRequest.from_response(
response,
formdata={'username': 'myuser', 'password': 'mypass'},
callback=self.after_login
)

def after_login(self, response):
"""登录后的回调"""
if "authentication failed" in response.text:
self.logger.error("Login failed")
return

# 继续爬取
yield scrapy.Request(
url='http://example.com/dashboard',
callback=self.parse_dashboard
)
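
对于接收 JSON 请求体的接口,可以用 JsonRequest 代替 FormRequest,它会自动序列化 data 并设置 Content-Type。下面的 URL 和字段仅为示意:

from scrapy.http import JsonRequest

def start_requests(self):
    yield JsonRequest(
        url='http://example.com/api/search',
        data={'keyword': 'scrapy', 'page': 1},  # 自动序列化为 JSON
        callback=self.parse_api,
    )

def parse_api(self, response):
    data = response.json()  # Scrapy 2.2+ 提供;旧版本可用 json.loads(response.text)
    for result in data.get('results', []):
        yield result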

翻页处理

下一页链接

def parse(self, response):
"""提取列表数据和下一页链接"""

# 提取当前页数据
for product in response.css('div.product'):
yield {
'name': product.css('h2::text').get(),
'price': product.css('.price::text').get(),
}

# 提取下一页链接
# 方式 1:CSS 选择器
next_page = response.css('li.next a::attr(href)').get()

# 方式 2:XPath
next_page = response.xpath('//a[contains(text(), "Next")]/@href').get()

# 方式 3:查找包含特定文本的链接(CSS 选择器不支持 :contains,需用 XPath)
next_page = response.xpath('//a[contains(normalize-space(.), "Next")]/@href').get()

# 方式 4:根据页码构造
current_page = response.meta.get('page', 1)
next_page = f'/page/{current_page + 1}'

# 生成下一页请求
if next_page:
yield response.follow(next_page, callback=self.parse)

回调函数

def parse(self, response):
"""列表页:提取数据和链接"""

# 方式 1:同一个回调处理所有页
for item in response.css('div.item'):
detail_url = item.css('a::attr(href)').get()
yield response.follow(detail_url, callback=self.parse_detail)

# 翻页
next_page = response.css('.next::attr(href)').get()
if next_page:
yield response.follow(next_page, callback=self.parse)

# 方式 2:不同的回调处理详情页
def parse_detail(self, response):
"""详情页:提取详细信息"""
yield {
'title': response.css('h1::text').get(),
'content': response.css('.content::text').get(),
'url': response.url,
}

请求传递

def parse(self, response):
"""列表页:提取数据并传递给详情页"""

for item in response.css('div.item'):
# 提取部分数据
data = {
'name': item.css('h2::text').get(),
'price': item.css('.price::text').get(),
}

# 构造详情页请求,传递数据
detail_url = item.css('a::attr(href)').get()
yield scrapy.Request(
url=response.urljoin(detail_url),
callback=self.parse_detail,
meta={'data': data} # 通过 meta 传递
)

def parse_detail(self, response):
"""详情页:接收传递的数据并补充"""

# 获取列表页传递的数据
data = response.meta.get('data', {})

# 补充详情数据
data.update({
'description': response.css('.desc::text').get(),
'images': response.css('.img::attr(src)').getall(),
'url': response.url,
})

yield data

# 方式 2:传递页码
def parse(self, response):
page = response.meta.get('page', 1)

# 爬取当前页数据
yield {'page': page, 'data': response.css('div.item').getall()}

# 下一页
if page < 10:
yield scrapy.Request(
url=f'http://example.com/page/{page + 1}',
callback=self.parse,
meta={'page': page + 1}
)
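
除了 meta,较新版本的 Scrapy 也支持用 cb_kwargs 传参,键会直接成为回调函数的关键字参数,语义更清晰:

def parse(self, response):
    for item in response.css('div.item'):
        detail_url = item.css('a::attr(href)').get()
        yield response.follow(
            detail_url,
            callback=self.parse_detail,
            cb_kwargs={'name': item.css('h2::text').get()},  # 作为关键字参数传入
        )

def parse_detail(self, response, name):
    """cb_kwargs 中的键直接出现在参数列表中"""
    yield {
        'name': name,
        'description': response.css('.desc::text').get(),
        'url': response.url,
    }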

中间件

Downloader Middleware

下载器中间件处理请求和响应:

# middlewares.py

class UserAgentMiddleware:
"""自定义 User-Agent 中间件"""

def __init__(self, user_agent):
self.user_agent = user_agent

@classmethod
def from_crawler(cls, crawler):
"""从 settings 获取配置"""
return cls(
user_agent=crawler.settings.get('USER_AGENT')
)

def process_request(self, request, spider):
"""处理请求(发送前)"""
request.headers['User-Agent'] = self.user_agent
return None # 返回 None 继续处理其他中间件

def process_response(self, request, response, spider):
"""处理响应(返回后)"""
# 可以修改响应
if response.status == 404:
spider.logger.info(f'404: {request.url}')
return response # 返回 Response 或 Request

def process_exception(self, request, exception, spider):
"""处理异常"""
spider.logger.error(f'Exception: {exception}')
# 可以返回新的 Request 继续爬取
return None # 或返回 Request


class ProxyMiddleware:
"""代理中间件"""

def __init__(self, proxy_url):
self.proxy_url = proxy_url

@classmethod
def from_crawler(cls, crawler):
return cls(
proxy_url=crawler.settings.get('PROXY_URL')
)

def process_request(self, request, spider):
"""为请求设置代理"""
if self.proxy_url:
request.meta['proxy'] = self.proxy_url


class RetryMiddleware:
"""重试中间件"""

def __init__(self, retry_times):
self.retry_times = retry_times

@classmethod
def from_crawler(cls, crawler):
return cls(
retry_times=crawler.settings.get('RETRY_TIMES', 2)
)

def process_response(self, request, response, spider):
"""检查响应,决定是否重试"""
if response.status in [500, 502, 503, 504, 408, 429]:
# 检查重试次数
retry_times = request.meta.get('retry_times', 0)
if retry_times < self.retry_times:
spider.logger.info(f'Retrying {request.url}')
retry_req = request.copy()
retry_req.meta['retry_times'] = retry_times + 1
return retry_req # 返回 Request 进行重试
return response

启用中间件:

# settings.py

DOWNLOADER_MIDDLEWARES = {
# 数字越小优先级越高
'myproject.middlewares.UserAgentMiddleware': 400,
'myproject.middlewares.ProxyMiddleware': 410,
'myproject.middlewares.RetryMiddleware': 500,

# 禁用默认中间件
'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
}

Spider Middleware

Spider 中间件处理 Spider 的输入输出:

# middlewares.py

import scrapy

class SpiderDepthMiddleware:
"""限制爬取深度"""

def __init__(self, max_depth):
self.max_depth = max_depth

@classmethod
def from_crawler(cls, crawler):
return cls(
max_depth=crawler.settings.get('MAX_DEPTH', 2)
)

def process_start_requests(self, start_requests, spider):
"""处理初始请求"""
for request in start_requests:
request.meta['depth'] = 0
yield request

def process_spider_input(self, response, spider):
"""处理输入到 Spider 的响应"""
depth = response.meta.get('depth', 0)
if depth > self.max_depth:
spider.logger.info(f'Max depth {self.max_depth} reached')
# 可以抛出异常停止处理
# raise scrapy.exceptions.CloseSpider('max_depth')
return None

def process_spider_output(self, response, result, spider):
"""处理 Spider 的输出(Item 或 Request)"""
for item in result:
if isinstance(item, scrapy.Request):
# 增加深度
depth = response.meta.get('depth', 0)
item.meta['depth'] = depth + 1
yield item

def process_spider_exception(self, response, exception, spider):
"""处理 Spider 抛出的异常"""
spider.logger.error(f'Spider exception: {exception}')
return []

Pipelines

Item Pipeline

Item Pipeline 用于处理 Item:

# pipelines.py

from scrapy.exceptions import DropItem

class DataCleaningPipeline:
"""数据清洗管道"""

def process_item(self, item, spider):
"""处理每个 Item"""
# 去除空白
for field, value in item.items():
if isinstance(value, str):
item[field] = value.strip()

# 处理空值
if not item.get('name'):
raise DropItem(f'Missing name in {item}')

return item


class ValidationPipeline:
"""数据验证管道"""

def process_item(self, item, spider):
"""验证数据"""
# 检查必需字段
required_fields = ['name', 'price', 'url']
for field in required_fields:
if not item.get(field):
raise DropItem(f'Missing {field} in {item}')

# 验证价格格式
price = item.get('price')
if price:
try:
item['price'] = float(price.replace('$', ''))
except ValueError:
raise DropItem(f'Invalid price: {price}')

return item


class DuplicatePipeline:
"""去重管道"""

def __init__(self):
self.seen = set()

def process_item(self, item, spider):
"""检查重复"""
# 使用唯一标识去重
identifier = (item.get('url'), item.get('sku'))

if identifier in self.seen:
raise DropItem(f'Duplicate item: {identifier}')

self.seen.add(identifier)
return item

def close_spider(self, spider):
"""爬虫关闭时调用"""
spider.logger.info(f'Total unique items: {len(self.seen)}')

数据清洗

import re
from itemadapter import ItemAdapter

class DataCleaningPipeline:
"""数据清洗"""

def process_item(self, item, spider):
adapter = ItemAdapter(item)

# 清洗文本字段
text_fields = ['name', 'description', 'author']
for field in text_fields:
if adapter.get(field):
# 去除多余空白
text = ' '.join(adapter[field].split())
adapter[field] = text

# 清洗数字字段
if adapter.get('price'):
# 提取数字
price_text = adapter['price']
numbers = re.findall(r'\d+\.?\d*', price_text)
if numbers:
adapter['price'] = float(numbers[0])

# 清洗日期
if adapter.get('date'):
date = adapter['date']
adapter['date'] = date.replace('Published:', '').strip()

return item

数据验证

from scrapy.exceptions import DropItem

class ValidationPipeline:
"""数据验证"""

def process_item(self, item, spider):
"""验证 Item 数据"""

# 必填字段检查
if not item.get('title'):
raise DropItem('Missing title')

# 长度检查
title = item['title']
if len(title) < 5 or len(title) > 200:
raise DropItem(f'Invalid title length: {len(title)}')

# 格式检查
if item.get('email'):
email = item['email']
if '@' not in email:
raise DropItem(f'Invalid email: {email}')

# 范围检查
if item.get('price'):
price = float(item['price'])
if price <= 0 or price > 1000000:
raise DropItem(f'Invalid price: {price}')

return item


class TypeConversionPipeline:
"""类型转换"""

def process_item(self, item, spider):
"""转换数据类型"""

# 字符串转数字
numeric_fields = ['price', 'quantity', 'rating']
for field in numeric_fields:
if item.get(field):
try:
item[field] = float(item[field])
except ValueError:
item[field] = None

# 字符串转列表
if item.get('tags'):
if isinstance(item['tags'], str):
item['tags'] = [tag.strip() for tag in item['tags'].split(',')]

# 字符串转日期
if item.get('publish_date'):
from datetime import datetime
item['publish_date'] = datetime.strptime(
item['publish_date'], '%Y-%m-%d'
)

return item

数据存储

import json
import csv
import sqlite3
import pymongo
from itemadapter import ItemAdapter

class JsonWriterPipeline:
"""保存到 JSON 文件"""

def __init__(self, file_path):
self.file_path = file_path
self.file = None

@classmethod
def from_crawler(cls, crawler):
return cls(
file_path=crawler.settings.get('JSON_FILE', 'output.json')
)

def open_spider(self, spider):
"""爬虫开始时打开文件"""
self.file = open(self.file_path, 'w', encoding='utf-8')
self.file.write('[\n')
self.first_item = True

def close_spider(self, spider):
"""爬虫结束时关闭文件"""
self.file.write('\n]')
self.file.close()

def process_item(self, item, spider):
"""写入 Item(控制分隔符,避免结尾多余逗号导致 JSON 不合法)"""
adapter = ItemAdapter(item)
line = json.dumps(adapter.asdict(), ensure_ascii=False)
if not self.first_item:
self.file.write(',\n')
self.file.write(line)
self.first_item = False
return item


class CsvPipeline:
"""保存到 CSV 文件"""

def __init__(self, file_path):
self.file_path = file_path
self.file = None
self.writer = None

@classmethod
def from_crawler(cls, crawler):
return cls(file_path=crawler.settings.get('CSV_FILE', 'output.csv'))

def open_spider(self, spider):
"""打开文件并写入表头"""
self.file = open(self.file_path, 'w', newline='', encoding='utf-8')
self.writer = csv.writer(self.file)

def close_spider(self, spider):
"""关闭文件"""
self.file.close()

def process_item(self, item, spider):
"""写入行"""
if self.writer:
# 首次写入表头(文件为空时)
if self.file.tell() == 0:
self.writer.writerow(item.keys())
self.writer.writerow(item.values())
return item


class MySQLPipeline:
"""保存到 MySQL"""

def __init__(self, mysql_config):
self.mysql_config = mysql_config

@classmethod
def from_crawler(cls, crawler):
return cls(
mysql_config={
'host': crawler.settings.get('MYSQL_HOST', 'localhost'),
'user': crawler.settings.get('MYSQL_USER', 'root'),
'password': crawler.settings.get('MYSQL_PASSWORD', ''),
'database': crawler.settings.get('MYSQL_DATABASE', 'scrapy'),
}
)

def open_spider(self, spider):
"""连接数据库"""
import pymysql
self.connection = pymysql.connect(**self.mysql_config)
self.cursor = self.connection.cursor()

def close_spider(self, spider):
"""关闭连接"""
self.connection.close()

def process_item(self, item, spider):
"""插入数据"""
sql = """
INSERT INTO products (name, price, url)
VALUES (%s, %s, %s)
ON DUPLICATE KEY UPDATE price=VALUES(price)
"""
self.cursor.execute(sql, (
item.get('name'),
item.get('price'),
item.get('url'),
))
self.connection.commit()
return item


class MongoDBPipeline:
"""保存到 MongoDB"""

def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db

@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI', 'mongodb://localhost:27017'),
mongo_db=crawler.settings.get('MONGO_DATABASE', 'scrapy'),
)

def open_spider(self, spider):
"""连接 MongoDB"""
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]

def close_spider(self, spider):
"""关闭连接"""
self.client.close()

def process_item(self, item, spider):
"""插入文档"""
collection_name = item.__class__.__name__
self.db[collection_name].insert_one(ItemAdapter(item).asdict())
return item

启用 Pipeline:

# settings.py

ITEM_PIPELINES = {
'myproject.pipelines.DataCleaningPipeline': 100,
'myproject.pipelines.ValidationPipeline': 200,
'myproject.pipelines.DuplicatePipeline': 300,
'myproject.pipelines.JsonWriterPipeline': 800,
'myproject.pipelines.MySQLPipeline': 900,
}

数据存储

JSON

# 方式 1:命令行输出
scrapy crawl myspider -o output.json

# 方式 2:指定编码
scrapy crawl myspider -o output.json -s FEED_EXPORT_ENCODING=utf-8

# 方式 3:JSON Lines(每行一个 JSON 对象)
scrapy crawl myspider -o output.jsonl
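
较新版本的 Scrapy(2.1+)也可以在 settings.py 中用 FEEDS 统一配置导出,效果等价于命令行 -o,下面是一个示意:

# settings.py
FEEDS = {
    'output/quotes.json': {
        'format': 'json',
        'encoding': 'utf8',
        'overwrite': True,
    },
    'output/quotes.jsonl': {
        'format': 'jsonlines',
    },
}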

CSV

# 基本用法
scrapy crawl myspider -o output.csv

# 注意:CSV 分隔符没有对应的内置命令行设置,需要自定义 CsvItemExporter

# 指定字段
scrapy crawl myspider -o output.csv -s FEED_EXPORT_FIELDS=name,price,url

XML

# 输出 XML
scrapy crawl myspider -o output.xml

# 设置输出缩进
scrapy crawl myspider -o output.xml -s FEED_EXPORT_INDENT=2

MySQL

# pipelines.py

import pymysql

class MySQLPipeline:
def __init__(self, mysql_config):
self.mysql_config = mysql_config

@classmethod
def from_crawler(cls, crawler):
return cls(
mysql_config={
'host': crawler.settings.get('MYSQL_HOST', 'localhost'),
'port': int(crawler.settings.get('MYSQL_PORT', 3306)),
'user': crawler.settings.get('MYSQL_USER', 'root'),
'password': crawler.settings.get('MYSQL_PASSWORD', ''),
'database': crawler.settings.get('MYSQL_DATABASE', 'scrapy'),
'charset': 'utf8mb4',
}
)

def open_spider(self, spider):
"""建立数据库连接"""
self.connection = pymysql.connect(**self.mysql_config)
self.cursor = self.connection.cursor()

# 创建表(如果不存在)
self.create_table()

def create_table(self):
"""创建数据表"""
sql = """
CREATE TABLE IF NOT EXISTS products (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
price DECIMAL(10, 2),
description TEXT,
url VARCHAR(500),
sku VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY unique_url (url)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
"""
self.cursor.execute(sql)

def close_spider(self, spider):
"""关闭连接"""
self.connection.close()

def process_item(self, item, spider):
"""插入或更新数据"""
sql = """
INSERT INTO products (name, price, description, url, sku)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
name=VALUES(name),
price=VALUES(price),
description=VALUES(description),
updated_at=CURRENT_TIMESTAMP
"""
try:
self.cursor.execute(sql, (
item.get('name'),
item.get('price'),
item.get('description'),
item.get('url'),
item.get('sku'),
))
self.connection.commit()
except Exception as e:
spider.logger.error(f'MySQL error: {e}')
self.connection.rollback()
return item

MongoDB

# pipelines.py

import pymongo
from itemadapter import ItemAdapter

class MongoDBPipeline:
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db

@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI', 'mongodb://localhost:27017'),
mongo_db=crawler.settings.get('MONGO_DATABASE', 'scrapy'),
)

def open_spider(self, spider):
"""连接 MongoDB"""
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]

def close_spider(self, spider):
"""关闭连接"""
self.client.close()

def process_item(self, item, spider):
"""插入或更新文档"""
collection_name = item.__class__.__name__.replace('Item', '').lower()
collection = self.db[collection_name]

# 使用 URL 作为唯一标识
url = item.get('url')
if url:
collection.update_one(
{'url': url},
{'$set': ItemAdapter(item).asdict()},
upsert=True
)
else:
collection.insert_one(ItemAdapter(item).asdict())

return item
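
上面两个 Pipeline 都通过 from_crawler 从 settings 读取连接信息,对应的配置大致如下(键名与上面代码一致):

# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.MySQLPipeline': 300,
    'myproject.pipelines.MongoDBPipeline': 400,
}

# MySQL 连接信息
MYSQL_HOST = 'localhost'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWORD = ''
MYSQL_DATABASE = 'scrapy'

# MongoDB 连接信息
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'scrapy'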

请求设置

Headers

# settings.py

# 默认请求头
DEFAULT_REQUEST_HEADERS = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
}

# 自定义 User-Agent
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'

在 Spider 中设置:

def start_requests(self):
"""自定义请求头"""
headers = {
'User-Agent': 'Custom User-Agent',
'Referer': 'https://example.com',
'Accept': 'application/json',
}
for url in self.start_urls:
yield scrapy.Request(url, headers=headers, callback=self.parse)

Cookies

# 方式 1:在 settings 中设置
COOKIES_ENABLED = True # 启用 Cookie
COOKIES_DEBUG = True # 调试 Cookie

# 方式 2:在 Request 中设置
def start_requests(self):
yield scrapy.Request(
url='http://example.com',
cookies={'session_id': 'abc123', 'token': 'xyz'},
callback=self.parse
)

# 方式 3:从响应中获取 Cookie 并使用
def parse(self, response):
# 获取 Cookie(可能有多个 Set-Cookie 头)
cookies = response.headers.getlist('Set-Cookie')
# 在后续请求中使用
yield scrapy.Request(
url='http://example.com/protected',
cookies=response.request.cookies, # 使用当前 Cookie
callback=self.parse_protected
)

# 方式 4:使用 CookieJar
def parse(self, response):
# 保存所有 Cookie
yield scrapy.Request(
url='http://example.com/page2',
meta={'cookiejar': response.meta.get('cookiejar')},
callback=self.parse_page2
)
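
如果需要同时维护多个独立会话(例如多个账号),可以给请求分配不同的 cookiejar 编号,Scrapy 会分别保存各自的 Cookie。示意如下:

import scrapy

def start_requests(self):
    # 每个账号使用独立的 cookie 会话
    for i, url in enumerate(['http://example.com/login?u=1', 'http://example.com/login?u=2']):
        yield scrapy.Request(url, meta={'cookiejar': i}, callback=self.parse)

def parse(self, response):
    # 后续请求沿用同一个 cookiejar,即同一个会话
    yield scrapy.Request(
        'http://example.com/profile',
        meta={'cookiejar': response.meta['cookiejar']},
        callback=self.parse_profile,
    )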

代理

# settings.py

# 代理设置
PROXY_LIST = [
'http://proxy1.com:8080',
'http://proxy2.com:8080',
'http://proxy3.com:8080',
]

# 下载超时(秒)
DOWNLOAD_TIMEOUT = 30

代理中间件:

# middlewares.py
import random

class ProxyMiddleware:
"""代理中间件"""

def __init__(self, proxy_list):
self.proxy_list = proxy_list

@classmethod
def from_crawler(cls, crawler):
return cls(
proxy_list=crawler.settings.get('PROXY_LIST', [])
)

def process_request(self, request, spider):
"""为请求设置代理"""
if self.proxy_list:
proxy = random.choice(self.proxy_list)
request.meta['proxy'] = proxy
spider.logger.info(f'Using proxy: {proxy}')

User-Agent

# 方式 1:固定 User-Agent
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'

# 方式 2:User-Agent 池
USER_AGENT_LIST = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)',
'Mozilla/5.0 (X11; Linux x86_64)',
]

# 中间件中随机选择
class RandomUserAgentMiddleware:
def __init__(self, user_agents):
self.user_agents = user_agents

@classmethod
def from_crawler(cls, crawler):
return cls(
user_agents=crawler.settings.get('USER_AGENT_LIST', [])
)

def process_request(self, request, spider):
"""设置随机 User-Agent"""
if self.user_agents:
request.headers['User-Agent'] = random.choice(self.user_agents)

下载延迟

# settings.py

# 下载延迟(秒)
DOWNLOAD_DELAY = 2 # 每个请求延迟 2 秒

# 随机延迟
RANDOMIZE_DOWNLOAD_DELAY = True # 在 DOWNLOAD_DELAY 基础上随机 0.5-1.5 倍

# 并发请求数
CONCURRENT_REQUESTS = 16 # 全局并发
CONCURRENT_REQUESTS_PER_DOMAIN = 8 # 每个域名并发
CONCURRENT_REQUESTS_PER_IP = 8 # 每个 IP 并发

# 下载超时
DOWNLOAD_TIMEOUT = 180 # 默认 180 秒

CrawlSpider

Rule

CrawlSpider 通过规则自动爬取网站:

from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor

class MyCrawlSpider(CrawlSpider):
"""自动爬取 Spider"""
name = 'mycrawler'
allowed_domains = ['example.com']
start_urls = ['http://example.com/']

# 定义爬取规则
rules = (
# Rule 1:提取产品页链接,用 parse_item 处理
Rule(
LinkExtractor(allow=r'/product/\d+'), # 允许的 URL 模式
callback='parse_item', # 回调函数
follow=False, # 是否继续跟踪
),

# Rule 2:提取分类页链接,继续跟踪
Rule(
LinkExtractor(allow=r'/category/', deny=r'/admin/'), # 允许/排除的 URL 模式
follow=True, # 继续跟踪
),

# Rule 3:提取分页链接
Rule(
LinkExtractor(
allow=r'/page/\d+', # 匹配分页
restrict_xpaths=['//div[@class="pagination"]'], # 限定区域
),
follow=True,
),
)

def parse_item(self, response):
"""解析商品页"""
yield {
'name': response.css('h1::text').get(),
'price': response.css('.price::text').get(),
'url': response.url,
}

LinkExtractor

LinkExtractor 用于从页面提取链接:

from scrapy.linkextractors import LinkExtractor

# 基本用法
le = LinkExtractor()

# 提取所有链接
links = le.extract_links(response)
for link in links:
print(link.url) # 链接 URL
print(link.text) # 链接文本

# 常用参数
le = LinkExtractor(
allow=r'/product/', # 允许的 URL 模式(正则)
deny=r'/admin/', # 排除的 URL 模式
allow_domains=('example.com',), # 允许的域名
deny_domains=('spam.com',), # 排除的域名
restrict_xpaths=('//div[@class="content"]',), # 限定 XPath 区域
restrict_css=('.content',), # 限定 CSS 区域
tags=('a', 'area'), # 考虑的标签
attrs=('href',), # 考虑的属性
canonicalize=True, # 规范化 URL
unique=True, # 去重
process_value=None, # 处理值的函数
)

# 在 CrawlSpider 中使用
rules = (
Rule(
LinkExtractor(
allow=r'/articles/',
restrict_xpaths=['//div[@class="article-list"]'],
),
callback='parse_article',
follow=False,
),
)

分布式爬虫

scrapy-redis

安装:

pip install scrapy-redis

配置:

# settings.py

# 使用 scrapy-redis 调度器
SCHEDULER = "scrapy_redis.scheduler.Scheduler"

# 去重(使用 Redis)
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

# Redis 配置
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0

# 不清除 Redis 队列(支持暂停/恢复)
SCHEDULER_PERSIST = True

# 优先级队列
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'

# 序列化
SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"

Spider:

import scrapy
from scrapy_redis.spiders import RedisSpider

class MyRedisSpider(RedisSpider):
"""Redis Spider"""
name = 'myredis'
redis_key = 'mycrawler:start_urls' # Redis 列表键名

# 可选:允许的域名
allowed_domains = ['example.com']

def parse(self, response):
"""解析响应"""
yield {
'url': response.url,
'title': response.css('title::text').get(),
}

# 继续添加 URL 到队列
for link in response.css('a::attr(href)').getall():
yield scrapy.Request(response.urljoin(link), callback=self.parse)

运行:

# 方式 1:运行爬虫(等待从 Redis 获取 URL)
scrapy crawl myredis

# 方式 2:向 Redis 添加起始 URL
redis-cli LPUSH mycrawler:start_urls http://example.com

# 方式 3:使用 Python 添加 URL
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
r.lpush('mycrawler:start_urls', 'http://example.com/page1')
r.lpush('mycrawler:start_urls', 'http://example.com/page2')

Redis队列

# 手动管理 Redis 队列

import redis
import json

class RedisQueue:
"""Redis 队列管理"""

def __init__(self, host='localhost', port=6379, db=0):
self.redis = redis.StrictRedis(host=host, port=port, db=db)
self.queue_key = 'scrapy:queue'

def push(self, url, meta=None):
"""添加 URL 到队列"""
data = {'url': url, 'meta': meta or {}}
self.redis.rpush(self.queue_key, json.dumps(data))

def pop(self):
"""从队列弹出 URL"""
data = self.redis.lpop(self.queue_key)
if data:
return json.loads(data)
return None

def size(self):
"""队列大小"""
return self.redis.llen(self.queue_key)

# 使用示例
queue = RedisQueue()
queue.push('http://example.com/page1', meta={'priority': 1})
queue.push('http://example.com/page2', meta={'priority': 2})

print(f'Queue size: {queue.size()}')

反爬策略

User-Agent池

# middlewares.py
import random

USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
]

class RandomUserAgentMiddleware:
"""随机 User-Agent"""

def process_request(self, request, spider):
request.headers['User-Agent'] = random.choice(USER_AGENTS)

代理池

# middlewares.py

import random

class ProxyPoolMiddleware:
"""代理池中间件"""

def __init__(self, proxy_list):
self.proxy_list = proxy_list
self.current_proxy = None
self.failed_count = 0

@classmethod
def from_crawler(cls, crawler):
return cls(
proxy_list=crawler.settings.get('PROXY_LIST', [])
)

def process_request(self, request, spider):
"""设置代理"""
if self.proxy_list:
self.current_proxy = random.choice(self.proxy_list)
request.meta['proxy'] = self.current_proxy
spider.logger.info(f'Using proxy: {self.current_proxy}')

def process_response(self, request, response, spider):
"""检查响应"""
if response.status != 200:
self.failed_count += 1
if self.failed_count > 3:
# 更换代理
spider.logger.info(f'Proxy failed: {self.current_proxy}')
self.failed_count = 0
else:
self.failed_count = 0
return response

从 API 获取代理:

import logging
import random
import requests

class DynamicProxyMiddleware:
"""动态代理中间件"""

def __init__(self, proxy_api):
self.proxy_api = proxy_api
self.proxies = []
self.fetch_proxies()

def fetch_proxies(self):
"""从 API 获取代理"""
try:
response = requests.get(self.proxy_api)
self.proxies = response.json().get('proxies', [])
except Exception as e:
logging.getLogger(__name__).error(f'Failed to fetch proxies: {e}')

def process_request(self, request, spider):
"""使用动态代理"""
if self.proxies:
proxy = random.choice(self.proxies)
request.meta['proxy'] = f"http://{proxy['ip']}:{proxy['port']}"

IP限制

# settings.py

# 并发控制
CONCURRENT_REQUESTS_PER_DOMAIN = 2 # 每个域名的并发数
CONCURRENT_REQUESTS_PER_IP = 2 # 每个 IP 的并发数
DOWNLOAD_DELAY = 3 # 下载延迟

# 自动限速
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 2
AUTOTHROTTLE_MAX_DELAY = 10
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
AUTOTHROTTLE_DEBUG = False

限速中间件:

import time

class DomainThrottleMiddleware:
"""域名限速中间件(注意:time.sleep 会阻塞下载器,仅作示意)"""

def __init__(self):
self.domain_requests = {}
self.last_request_time = {}

def process_request(self, request, spider):
"""限制每个域名的请求频率"""
from urllib.parse import urlparse
domain = urlparse(request.url).netloc

# 检查是否需要延迟
if domain in self.last_request_time:
elapsed = time.time() - self.last_request_time[domain]
if elapsed < 2: # 至少间隔 2 秒
time.sleep(2 - elapsed)

self.last_request_time[domain] = time.time()

验证码

# 方式 1:手动处理(暂停爬虫)

from scrapy.exceptions import CloseSpider

class CaptchaMiddleware:
"""验证码中间件"""

def process_response(self, request, response, spider):
"""检测验证码"""
if 'captcha' in response.text.lower():
spider.logger.warning(f'Captcha detected: {request.url}')

# 保存页面供人工查看
with open('captcha.html', 'w', encoding='utf-8') as f:
f.write(response.text)

# 暂停爬虫,等待人工处理
raise CloseSpider('captcha_found')

return response

# 方式 2:使用第三方服务
import requests

class CaptchaSolver:
"""验证码解决器(示例)"""

def solve_captcha(self, image_url):
"""调用验证码识别 API"""
# 使用 2captcha、Anti-Captcha 等服务
api_url = 'http://captcha-service.com/solve'
response = requests.post(api_url, json={'image': image_url})
return response.json().get('solution')

日志与调试

# Spider 中使用日志

import scrapy

class MySpider(scrapy.Spider):
name = 'myspider'

def parse(self, response):
# 使用 logger
self.logger.info(f'Processing: {response.url}')
self.logger.debug(f'Response status: {response.status}')
self.logger.warning('Possible issue detected')
self.logger.error('Error occurred')

# 使用 print(调试用)
print(f'Debug: {response.url}')

# Scrapy shell 测试
from scrapy.shell import inspect_response
inspect_response(response, self)

配置:

# settings.py

# 日志级别
LOG_LEVEL = 'INFO' # DEBUG, INFO, WARNING, ERROR, CRITICAL

# 日志文件
LOG_FILE = 'scrapy.log'

# 日志格式
LOG_FORMAT = '%(asctime)s [%(name)s] %(levelname)s: %(message)s'
LOG_DATEFORMAT = '%Y-%m-%d %H:%M:%S'

# 统计信息
STATS_CLASS = 'scrapy.statscollectors.StatsCollector'

部署

Scrapyd

安装和运行:

# 安装
pip install scrapyd

# 启动服务
scrapyd

# 访问
# http://localhost:6800

部署项目:

# 安装 scrapyd-client
pip install scrapyd-client

# 部署
scrapyd-deploy

# 部署到指定目标
scrapyd-deploy target -p projectname
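
scrapyd-deploy 依赖项目根目录 scrapy.cfg 中的 [deploy] 配置,url 指向 Scrapyd 服务地址,大致如下:

# scrapy.cfg
[settings]
default = myproject.settings

[deploy:target]
url = http://localhost:6800/
project = myproject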

API 使用:

# 启动爬虫
curl http://localhost:6800/schedule.json \
-d project=myproject \
-d spider=myspider

# 取消爬虫
curl http://localhost:6800/cancel.json \
-d project=myproject \
-d job=jobid

# 查看状态
curl http://localhost:6800/list_jobs.json?project=myproject

# 查看日志
curl http://localhost:6800/logs/myproject/myspider/jobid.log

Scrapy Cloud

# 安装
pip install shub

# 登录
shub login

# 部署
shub deploy

# 部署到特定项目
shub deploy 12345

# 查看日志
shub logs 12345

# 运行爬虫
shub schedule 12345 myspider

最佳实践

1. 遵守 robots.txt

# settings.py
ROBOTSTXT_OBEY = True

2. 设置合理的延迟

# settings.py
DOWNLOAD_DELAY = 2
AUTOTHROTTLE_ENABLED = True

3. 使用 Item Pipeline

# 数据处理放在 Pipeline,而不是 Spider
# Pipeline 负责:验证、清洗、存储

4. 错误处理

def parse(self, response):
try:
data = response.css('.data::text').get()
yield {'data': data}
except Exception as e:
self.logger.error(f'Error parsing {response.url}: {e}')

5. 限流和重试

# settings.py
RETRY_ENABLED = True
RETRY_TIMES = 2
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429]

6. 数据去重

# 使用 URL 去重
DUPEFILTER_CLASS = 'scrapy.dupefilters.RFPDupeFilter'

# 自定义去重
class CustomDupeFilter:
def __init__(self):
self.seen = set()

def request_seen(self, request):
fp = request.url + request.meta.get('data', '')
if fp in self.seen:
return True
self.seen.add(fp)
return False

7. 分模块开发

project/
├── spiders/
│ ├── products.py # 产品爬虫
│ ├── articles.py # 文章爬虫
│ └── __init__.py
├── items.py # Item 定义
├── pipelines.py # 数据处理
├── middlewares.py # 中间件
└── utils.py # 工具函数

8. 监控和日志

# 记录统计信息
def closed(self, reason):
stats = self.crawler.stats
self.logger.info(f"Items scraped: {stats.get_value('item_scraped_count')}")
self.logger.info(f"Total requests: {stats.get_value('downloader/request_count')}")

法律与道德

法律合规

  1. 遵守 robots.txt:尊重网站爬虫协议
  2. 版权法:注意抓取内容的使用
  3. 计算机欺诈和滥用法:未经授权访问可能违法
  4. 服务条款:遵守网站使用条款

道德准则

  1. 识别自己:使用合适的 User-Agent
  2. 合理频率:避免对服务器造成压力
  3. 尊重隐私:不抓取个人敏感信息
  4. 用途正当:合法使用爬取数据

最佳实践

# 1. 设置标识
USER_AGENT = 'MyBot (+http://mysite.com/bot)'

# 2. 遵守规则
ROBOTSTXT_OBEY = True

# 3. 控制速率
DOWNLOAD_DELAY = 2
CONCURRENT_REQUESTS = 8

# 4. 尊重隐私
# 不抓取个人信息、密码、私人通信

# 5. 联系网站
# 大规模爬取前,联系网站所有者

通过 Scrapy,你可以构建强大、高效、可扩展的爬虫系统,但务必遵守法律和道德准则。