Scrapy
Introduction
Scrapy is a powerful Python crawling framework for scraping website data quickly and efficiently. Key features:
- High performance: asynchronous networking with concurrent requests
- Extensible: middleware and pipeline systems make it easy to add functionality
- Maintainable: clear project structure and highly reusable code
- Feature-rich: built-in throttling, retries, proxy support, cookie handling, and more
Core components:
- Engine: controls the data flow between all components
- Scheduler: queues and prioritizes requests
- Downloader: fetches web pages
- Spiders: parse responses and extract data
- Item Pipeline: post-processes and stores scraped items
- Middlewares: hook into request/response processing
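The pluggable components above are wired together in settings.py. The sketch below is illustrative only; the module paths are placeholders for your own project, and the numeric values control the order in which components run.
# settings.py (illustrative paths)
ITEM_PIPELINES = {
    'myproject.pipelines.ValidationPipeline': 300,
}
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.ProxyMiddleware': 410,
}
SPIDER_MIDDLEWARES = {
    'myproject.middlewares.SpiderDepthMiddleware': 543,
}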
Quick Start
Installation
# Install Scrapy
pip install scrapy
# Verify the installation
scrapy version
# Optional: extra dependencies
pip install scrapy-splash # JavaScript rendering
pip install scrapy-redis # distributed crawling
pip install pillow # image processing
Creating a Project
# Create a new project
scrapy startproject myproject
# Project layout
# myproject/
# ├── scrapy.cfg # deploy/config file
# └── myproject/ # project Python module
# ├── __init__.py
# ├── items.py # Item definitions
# ├── middlewares.py # middlewares
# ├── pipelines.py # pipelines
# ├── settings.py # settings
# └── spiders/ # spiders directory
# ├── __init__.py
# └── example.py # example spider
# Create a spider
cd myproject
scrapy genspider example example.com
# List available templates
scrapy genspider -l
# Create a spider from a specific template
scrapy genspider -t crawl myspider example.com
First Spider
# spiders/quotes_spider.py
import scrapy

class QuotesSpider(scrapy.Spider):
    """Quotes spider"""
    name = 'quotes'                                # spider name (unique identifier)
    allowed_domains = ['quotes.toscrape.com']      # allowed domains
    start_urls = ['http://quotes.toscrape.com/']   # start URLs

    def parse(self, response):
        """Default callback for parsing responses"""
        # Extract data
        for quote in response.css('div.quote'):
            yield {
                'text': quote.css('span.text::text').get(),
                'author': quote.css('span small::text').get(),
                'tags': quote.css('div.tags a.tag::text').getall(),
            }
        # Pagination
        next_page = response.css('li.next a::attr(href)').get()
        if next_page is not None:
            yield response.follow(next_page, callback=self.parse)
Run the spider:
# Run the spider
scrapy crawl quotes
# Save output to a file
scrapy crawl quotes -o quotes.json
scrapy crawl quotes -o quotes.csv
scrapy crawl quotes -o quotes.xml
# Set the log level
scrapy crawl quotes -L INFO
# Write to a specific path
scrapy crawl quotes -o output/data.json
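Spiders can also be run from a plain Python script with CrawlerProcess instead of the scrapy CLI. A minimal sketch; the import path of QuotesSpider is assumed to match your project layout, and the FEEDS setting requires Scrapy 2.1+:
# run_quotes.py
from scrapy.crawler import CrawlerProcess
from myproject.spiders.quotes_spider import QuotesSpider  # path assumed

process = CrawlerProcess(settings={
    'FEEDS': {'quotes.json': {'format': 'json'}},
})
process.crawl(QuotesSpider)
process.start()  # blocks until the crawl finishes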
Spider
The Spider Class
A Spider is the class where the crawling logic is defined:
import scrapy
class MySpider(scrapy.Spider):
"""基本爬虫类"""
# 爬虫名称(必需)
name = 'myspider'
# 允许的域名(可选)
allowed_domains = ['example.com']
# 起始 URL 列表(可选)
start_urls = ['http://example.com/']
# 自定义设置(可选)
custom_settings = {
'DOWNLOAD_DELAY': 2,
'CONCURRENT_REQUESTS': 1,
}
def start_requests(self):
"""生成初始请求(可选)"""
for url in self.start_urls:
yield scrapy.Request(
url=url,
callback=self.parse,
headers={'User-Agent': 'Custom User-Agent'}
)
def parse(self, response):
"""解析响应"""
# 提取数据或生成新的请求
pass
def closed(self, reason):
"""爬虫关闭时调用"""
self.logger.info(f'Spider closed, reason: {reason}')
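Spiders can also receive arguments from the command line (for example `scrapy crawl books -a category=fiction`); they are passed to __init__ and become instance attributes. A short sketch with a hypothetical spider and site:
import scrapy

class BooksSpider(scrapy.Spider):       # hypothetical spider name
    name = 'books'

    def __init__(self, category=None, max_pages=10, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.category = category
        self.max_pages = int(max_pages)  # CLI arguments always arrive as strings

    def start_requests(self):
        url = f'http://example.com/{self.category or "all"}/'
        yield scrapy.Request(url, callback=self.parse)

    def parse(self, response):
        yield {'url': response.url}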
Commonly used Spider types:
# 1. Spider - 基础爬虫
from scrapy.spiders import Spider
class BasicSpider(Spider):
name = 'basic'
start_urls = ['http://example.com']
def parse(self, response):
yield {'title': response.css('title::text').get()}
# 2. CrawlSpider - 自动爬取整站
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor
class MyCrawlSpider(CrawlSpider):
name = 'crawl'
allowed_domains = ['example.com']
start_urls = ['http://example.com']
rules = (
Rule(LinkExtractor(allow=r'/page/\d+'), callback='parse_item'),
Rule(LinkExtractor(allow=r'/category/'), follow=True),
)
def parse_item(self, response):
yield {'url': response.url}
# 3. XMLFeedSpider - XML/Atom/RSS
from scrapy.spiders import XMLFeedSpider
class MyXMLSpider(XMLFeedSpider):
name = 'xml'
iterator = 'iternodes' # 'iternodes', 'html', 'xml'
itertag = 'item'
def parse_node(self, response, node):
yield {'title': node.xpath('./title/text()').get()}
# 4. SitemapSpider - 站点地图
from scrapy.spiders import SitemapSpider
class MySitemapSpider(SitemapSpider):
name = 'sitemap'
sitemap_urls = ['http://example.com/sitemap.xml']
def parse(self, response):
yield {'url': response.url}
The parse Method
The parse method is the default callback for handling responses:
def parse(self, response):
"""解析响应"""
# 1. 提取数据
title = response.css('title::text').get()
items = response.css('div.item')
for item in items:
yield {
'name': item.css('h2::text').get(),
'price': item.css('.price::text').get(),
'url': response.url,
}
# 2. 生成新请求
# 方式 1:使用 response.follow
next_page = response.css('.next::attr(href)').get()
if next_page:
yield response.follow(next_page, callback=self.parse)
# 方式 2:使用 scrapy.Request
detail_url = response.css('.detail::attr(href)').get()
if detail_url:
yield scrapy.Request(
response.urljoin(detail_url),
callback=self.parse_detail
)
# 3. 传递数据给回调
yield response.follow(
next_page,
callback=self.parse_with_meta,
meta={'item': {'name': 'Product'}}
)
def parse_detail(self, response):
"""解析详情页"""
# 获取传递的数据
item = response.meta.get('item')
item['description'] = response.css('.desc::text').get()
yield item
def parse_with_meta(self, response):
"""使用传递的元数据"""
item = response.meta['item']
# 处理数据...
yield item
The Response Object
A Response object holds the data returned by the server:
def parse(self, response):
"""Response 对象的常用属性和方法"""
# URL 相关
print(response.url) # 当前 URL
print(response.status) # HTTP 状态码
print(response.headers) # 响应头
print(response.request) # 对应的 Request 对象
# 内容
print(response.text) # 响应文本(自动解码)
print(response.body) # 响应字节
print(response.encoding) # 编码
# 选择器
response.css('h1::text') # CSS 选择器
response.xpath('//h1') # XPath 选择器
# URL 操作
response.urljoin('/page/2') # 相对 URL 转绝对 URL
response.follow('/next') # 创建新请求
# 其他
response.css('a::attr(href)').get() # 获取第一个
response.css('a::attr(href)').getall() # 获取所有
response.css('a').get() # 获取第一个元素
response.css('a').getall() # 获取所有元素
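For JSON APIs there is no need to parse response.text by hand: TextResponse.json() (available since Scrapy 2.2) deserializes the body directly. The endpoint and field names below are assumptions for illustration:
def parse_api(self, response):
    data = response.json()                     # equivalent to json.loads(response.text)
    for result in data.get('results', []):
        yield {'id': result.get('id'), 'name': result.get('name')}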
Selectors
XPath
def parse(self, response):
"""XPath 选择器"""
# 基本选择
response.xpath('//h1') # 所有 h1
response.xpath('//div[@class="content"]') # class 为 content 的 div
response.xpath('//div[@id="main"]') # id 为 main 的 div
# 文本提取
response.xpath('//h1/text()') # h1 的直接文本
response.xpath('//h1//text()') # h1 的所有文本(包括后代)
response.xpath('string(//h1)') # h1 的所有文本拼接
# 属性提取
response.xpath('//a/@href') # 所有链接的 href
response.xpath('//img/@src') # 图片 src
response.xpath('//div/@class') # class 属性
# 层级选择
response.xpath('//div/p') # div 下的直接 p
response.xpath('//div//p') # div 下的所有 p
response.xpath('//div[@class="main"]/h1') # class=main 的 div 下的 h1
# 条件选择
response.xpath('//a[contains(@class, "btn")]') # class 包含 btn
response.xpath('//a[starts-with(@href, "http")]') # href 以 http 开头
response.xpath('//div[position() < 3]') # 前两个 div
# 轴(Axes)
response.xpath('//div/following-sibling::div') # 后面的兄弟 div
response.xpath('//div/preceding-sibling::div') # 前面的兄弟 div
response.xpath('//a/parent::div') # a 的父 div
response.xpath('//div/ancestor::body') # div 的祖先 body
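parsel (the selector library behind Scrapy) also supports XPath variables, which keep queries readable and avoid manual string formatting; the class name below is illustrative:
def parse(self, response):
    hrefs = response.xpath('//div[@class=$cls]//a/@href', cls='product-list')
    for href in hrefs.getall():
        yield response.follow(href, callback=self.parse_detail)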
CSS Selectors
def parse(self, response):
"""CSS 选择器"""
# 基本选择
response.css('h1') # 所有 h1
response.css('.content') # class 为 content
response.css('#main') # id 为 main
response.css('div.content') # div 且 class=content
# 属性选择
response.css('a[href]') # 有 href 的 a
response.css('a[href="http://"]') # href 等于 http://
response.css('a[href^="http"]') # href 以 http 开头
response.css('a[href$=".pdf"]') # href 以 .pdf 结尾
response.css('a[href*="example"]') # href 包含 example
# 层级选择
response.css('div > p') # div 的直接子元素 p
response.css('div p') # div 的所有后代 p
response.css('div.main > h1') # class=main 的 div 下的直接 h1
# 伪类
response.css('li:first-child') # 第一个 li
response.css('li:last-child') # 最后一个 li
response.css('li:nth-child(2)') # 第二个 li
response.css('a:nth-of-type(3)') # 同类型中的第三个
# 组合选择
response.css('h1, h2, h3') # 所有 h1, h2, h3
# 提取文本和属性
response.css('h1::text') # 文本
response.css('a::attr(href)') # href 属性
response.css('div::text') # 所有文本节点
Regular Expressions
import re
def parse(self, response):
"""使用正则表达式提取数据"""
# 方式 1:使用 re 模块
html = response.text
titles = re.findall(r'<h2>(.*?)</h2>', html)
emails = re.findall(r'[\w.+-]+@[\w-]+\.[\w.-]+', html)
# 方式 2:使用 Scrapy 的 .re() 方法
response.css('div.price::text').re(r'\$\d+\.\d{2}')
response.xpath('//div[@class="price"]/text()').re(r'(\d+\.\d{2})')
# 方式 3:使用 .re_first() 获取第一个匹配
price = response.css('.price::text').re_first(r'\$\d+\.\d{2}')
# 提取链接中的 ID
product_id = response.css('a::attr(href)').re_first(r'/product/(\d+)/')
    # Combining selectors with regular expressions
    for div in response.css('div.item'):
        text = div.css('::text').get() or ''   # guard against missing text
        numbers = re.findall(r'\d+', text)
        yield {'numbers': numbers}
Data Extraction
Extracting Text
def parse(self, response):
"""提取文本的多种方法"""
# 1. get() - 获取第一个
title = response.css('h1::text').get()
# title = None 如果没有匹配
# 2. getall() - 获取所有
paragraphs = response.css('p::text').getall()
# paragraphs = ['text1', 'text2', ...]
# 3. 使用默认值
title = response.css('h1::text').get() or 'Default Title'
# 4. 提取并清理文本
text = response.css('div.content::text').get()
if text:
text = text.strip() # 去除首尾空白
# 5. 提取所有文本(包括后代)
all_text = response.xpath('string(//div[@class="content"])').get()
# 6. 提取多个文本并拼接
items = response.css('p::text').getall()
combined_text = ' '.join(items).strip()
    # 7. Normalize whitespace and line breaks
    text = ' '.join(' '.join(response.css('div::text').getall()).split())
Extracting Attributes
def parse(self, response):
"""提取元素属性"""
# 1. 提取单个属性
href = response.css('a::attr(href)').get()
src = response.xpath('//img/@src').get()
# 2. 提取多个属性
all_hrefs = response.css('a::attr(href)').getall()
    # 3. Extract several attributes of each element at once
for link in response.css('a'):
yield {
'href': link.css('::attr(href)').get(),
'text': link.css('::text').get(),
'title': link.css('::attr(title)').get(),
}
# 4. 提取 data-* 属性
data_id = response.css('div::attr(data-id)').get()
# 5. 提取 class 属性
classes = response.css('div::attr(class)').get()
# 6. 提取 style 属性
style = response.css('div::attr(style)').get()
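Selectors also expose an .attrib mapping with all attributes of the matched element (on a SelectorList it refers to the first match); the CSS class below is an assumption:
def parse(self, response):
    link = response.css('a.product-link')          # class name assumed
    if link:
        yield {'href': link.attrib.get('href'), 'rel': link.attrib.get('rel')}
    for img in response.css('img'):                # per-element access
        yield {'src': img.attrib.get('src'), 'alt': img.attrib.get('alt')}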
Extracting Links
def parse(self, response):
"""提取和清理链接"""
# 1. 获取所有链接
links = response.css('a::attr(href)').getall()
# 2. 转换为绝对 URL
for link in links:
absolute_url = response.urljoin(link)
yield {'url': absolute_url}
# 3. 使用 response.follow(推荐)
for href in response.css('a::attr(href)').getall():
yield response.follow(href, callback=self.parse_detail)
# 4. 提取特定模式的链接
for link in response.css('a::attr(href)').getall():
if '/product/' in link:
yield response.follow(link, callback=self.parse_product)
# 5. 使用正则过滤链接
import re
for link in response.css('a::attr(href)').getall():
if re.match(r'/product/\d+', link):
yield response.follow(link, callback=self.parse_product)
# 6. 处理相对路径
relative_links = response.css('a[href^="/"]::attr(href)').getall()
for link in relative_links:
yield {'url': response.urljoin(link)}
Item
Defining Items
An Item is a container for the scraped data:
# items.py
import scrapy
class ProductItem(scrapy.Item):
"""定义商品 Item"""
# 字段定义
name = scrapy.Field() # 商品名称
price = scrapy.Field() # 价格
description = scrapy.Field() # 描述
image_urls = scrapy.Field() # 图片 URL 列表
images = scrapy.Field() # 下载后的图片
url = scrapy.Field() # 商品链接
sku = scrapy.Field() # SKU
stock = scrapy.Field() # 库存
ratings = scrapy.Field() # 评分
reviews = scrapy.Field() # 评论数
class ArticleItem(scrapy.Item):
"""文章 Item"""
title = scrapy.Field()
content = scrapy.Field()
author = scrapy.Field()
publish_date = scrapy.Field()
tags = scrapy.Field()
category = scrapy.Field()
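Recent Scrapy versions (2.2+, via the itemadapter package) also accept dataclass and attrs objects as items, which adds type hints and defaults; a minimal sketch with an illustrative item:
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class BookItem:
    title: str = ''
    price: Optional[float] = None
    tags: list = field(default_factory=list)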
Using an Item:
# spiders/product_spider.py
from myproject.items import ProductItem
def parse(self, response):
"""创建和返回 Item"""
item = ProductItem()
# 方式 1:直接赋值
item['name'] = response.css('h1::text').get()
item['price'] = response.css('.price::text').get()
item['url'] = response.url
# 方式 2:字典式
item = ProductItem({
'name': response.css('h1::text').get(),
'price': response.css('.price::text').get(),
'url': response.url,
})
# 方式 3:批量赋值
item = ProductItem()
item.update({
'name': response.css('h1::text').get(),
'price': response.css('.price::text').get(),
})
yield item
Item Loader
An Item Loader provides a convenient way to populate Items:
# items.py
from scrapy.loader import ItemLoader
# In current Scrapy the processors live in the itemloaders package
# (older versions exposed them as scrapy.loader.processors)
from itemloaders.processors import TakeFirst, MapCompose, Join
def clean_price(value):
"""清理价格"""
if value:
return float(value.replace('$', '').replace(',', '').strip())
return None
def strip_whitespace(value):
"""去除空白"""
return value.strip() if value else value
class ProductItemLoader(ItemLoader):
"""商品 Item Loader"""
# 默认输出处理器:取第一个值
default_output_processor = TakeFirst()
# 自定义字段处理器
name_in = MapCompose(strip_whitespace)
price_in = MapCompose(strip_whitespace, clean_price)
tags_out = Join(',') # 列表用逗号连接
Using the Item Loader:
from myproject.items import ProductItem, ProductItemLoader
def parse(self, response):
"""使用 Item Loader"""
# 创建 Loader
loader = ProductItemLoader(item=ProductItem(), response=response)
# 添加值
loader.add_css('name', 'h1::text')
loader.add_css('price', '.price::text')
loader.add_css('description', '.description::text')
loader.add_value('url', response.url)
loader.add_xpath('sku', '//span[@class="sku"]/@data-id')
# 加载并返回 Item
yield loader.load_item()
Field Processors
Field processors clean and transform extracted values:
from itemloaders.processors import (   # scrapy.loader.processors in older Scrapy
    TakeFirst, MapCompose, Join, Compose
)
import re
def remove_duplicates(values):
"""去重"""
return list(set(values))
def extract_number(text):
"""提取数字"""
match = re.search(r'\d+', text)
return match.group() if match else text
def parse_tags(tags):
"""解析标签"""
return [tag.strip().lower() for tag in tags if tag.strip()]
# items.py
class ProductItem(scrapy.Item):
name = scrapy.Field(
input_processor=MapCompose(str.strip, str.title),
output_processor=TakeFirst()
)
price = scrapy.Field(
input_processor=MapCompose(extract_number, float),
output_processor=TakeFirst()
)
tags = scrapy.Field(
input_processor=MapCompose(str.strip),
output_processor=Compose(remove_duplicates, sorted)
)
description = scrapy.Field(
input_processor=MapCompose(str.strip),
output_processor=Join(' ')
)
Request and Response
The Request Object
A Request object represents an HTTP request:
import scrapy
# 创建基本请求
request = scrapy.Request(
url='http://example.com/page',
callback=self.parse,
method='GET',
headers={'User-Agent': 'Custom'},
cookies={'session': 'abc123'},
meta={'key': 'value'}, # 传递数据
encoding='utf-8',
priority=1, # 优先级(数字越大优先级越高)
dont_filter=False, # 是否过滤重复 URL
errback=self.errback_handler, # 错误回调
)
# 传递数据给回调
yield scrapy.Request(
url='http://example.com/detail',
callback=self.parse_detail,
meta={
'item': {'name': 'Product'},
'proxy': 'http://proxy.com:8080',
}
)
def parse_detail(self, response):
"""获取传递的数据"""
item = response.meta['item']
item['detail'] = response.css('.detail::text').get()
yield item
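Since Scrapy 1.7, cb_kwargs is the preferred way to pass your own data to a callback (meta remains necessary for component-facing keys such as 'proxy'). A sketch, with the selector and category value assumed:
def parse(self, response):
    for href in response.css('a.item::attr(href)').getall():
        yield scrapy.Request(
            response.urljoin(href),
            callback=self.parse_detail,
            cb_kwargs={'category': 'books'},   # delivered as a keyword argument
        )

def parse_detail(self, response, category):
    yield {'category': category, 'title': response.css('h1::text').get()}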
The Response Object
def parse(self, response):
"""Response 对象"""
# 基本信息
response.url # URL
response.status # 状态码
response.headers # 响应头
response.body # 字节内容
response.text # 文本内容
response.encoding # 编码
# Request 相关
response.request # 对应的 Request 对象
response.meta # 元数据字典
    response.request.callback  # callback of the originating request
# 选择器
response.css('h1') # CSS 选择器
response.xpath('//h1') # XPath 选择器
# URL 操作
response.urljoin('/other') # 转换为绝对 URL
response.follow(next_url) # 创建新请求
# 检查响应类型
if response.status == 200:
self.logger.info('Success')
# 获取特定头信息
content_type = response.headers.get('Content-Type', b'').decode()
FormRequest
FormRequest is used to submit forms:
from scrapy.http import FormRequest
# 方式 1:直接 POST
yield FormRequest(
url='http://example.com/post',
formdata={
'username': 'user',
'password': 'pass',
},
callback=self.after_post
)
# 方式 2:模拟表单提交
yield FormRequest.from_response(
response,
formdata={'username': 'user', 'password': 'pass'},
clickdata={'type': 'submit', 'name': 'login'},
callback=self.after_login
)
# Note on file uploads: FormRequest only sends URL-encoded form data and does not
# build multipart/form-data bodies. To upload files, construct the multipart body
# yourself (e.g. with urllib3.encode_multipart_formdata) and pass it to
# scrapy.Request(method='POST', body=..., headers={'Content-Type': ...}).
# 登录示例
def parse(self, response):
"""首次访问,获取登录表单"""
return FormRequest.from_response(
response,
formdata={'username': 'myuser', 'password': 'mypass'},
callback=self.after_login
)
def after_login(self, response):
"""登录后的回调"""
if "authentication failed" in response.text:
self.logger.error("Login failed")
return
# 继续爬取
yield scrapy.Request(
url='http://example.com/dashboard',
callback=self.parse_dashboard
)
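For endpoints that expect a JSON body rather than form fields, scrapy.http.JsonRequest serializes the payload and sets the Content-Type header for you; the URL and payload below are assumptions:
from scrapy.http import JsonRequest

def start_requests(self):
    yield JsonRequest(
        url='http://example.com/api/search',
        data={'query': 'scrapy', 'page': 1},
        callback=self.parse_api,
    )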
Pagination
Next-Page Links
def parse(self, response):
"""提取列表数据和下一页链接"""
# 提取当前页数据
for product in response.css('div.product'):
yield {
'name': product.css('h2::text').get(),
'price': product.css('.price::text').get(),
}
# 提取下一页链接
# 方式 1:CSS 选择器
next_page = response.css('li.next a::attr(href)').get()
# 方式 2:XPath
next_page = response.xpath('//a[contains(text(), "Next")]/@href').get()
    # Option 3: many sites mark the next page with rel="next"
    # (CSS :contains() is not supported by Scrapy's selectors)
    next_page = response.css('a[rel="next"]::attr(href)').get()
# 方式 4:根据页码构造
current_page = response.meta.get('page', 1)
next_page = f'/page/{current_page + 1}'
# 生成下一页请求
if next_page:
yield response.follow(next_page, callback=self.parse)
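Scrapy 2.0 added response.follow_all(), which takes a CSS/XPath expression or an iterable of links and yields one request per match, so the pagination above can also be written as:
def parse(self, response):
    # ... extract items as above ...
    yield from response.follow_all(css='li.next a', callback=self.parse)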
Callbacks
def parse(self, response):
"""列表页:提取数据和链接"""
# 方式 1:同一个回调处理所有页
for item in response.css('div.item'):
detail_url = item.css('a::attr(href)').get()
yield response.follow(detail_url, callback=self.parse_detail)
# 翻页
next_page = response.css('.next::attr(href)').get()
if next_page:
yield response.follow(next_page, callback=self.parse)
# 方式 2:不同的回调处理详情页
def parse_detail(self, response):
"""详情页:提取详细信息"""
yield {
'title': response.css('h1::text').get(),
'content': response.css('.content::text').get(),
'url': response.url,
}
Passing Data Between Requests
def parse(self, response):
"""列表页:提取数据并传递给详情页"""
for item in response.css('div.item'):
# 提取部分数据
data = {
'name': item.css('h2::text').get(),
'price': item.css('.price::text').get(),
}
# 构造详情页请求,传递数据
detail_url = item.css('a::attr(href)').get()
yield scrapy.Request(
url=response.urljoin(detail_url),
callback=self.parse_detail,
meta={'data': data} # 通过 meta 传递
)
def parse_detail(self, response):
"""详情页:接收传递的数据并补充"""
# 获取列表页传递的数据
data = response.meta.get('data', {})
# 补充详情数据
data.update({
'description': response.css('.desc::text').get(),
'images': response.css('.img::attr(src)').getall(),
'url': response.url,
})
yield data
# 方式 2:传递页码
def parse(self, response):
page = response.meta.get('page', 1)
# 爬取当前页数据
yield {'page': page, 'data': response.css('div.item').getall()}
# 下一页
if page < 10:
yield scrapy.Request(
url=f'http://example.com/page/{page + 1}',
callback=self.parse,
meta={'page': page + 1}
)
Middleware
Downloader Middleware
Downloader middleware processes requests and responses:
# middlewares.py
class UserAgentMiddleware:
"""自定义 User-Agent 中间件"""
def __init__(self, user_agent):
self.user_agent = user_agent
@classmethod
def from_crawler(cls, crawler):
"""从 settings 获取配置"""
return cls(
user_agent=crawler.settings.get('USER_AGENT')
)
def process_request(self, request, spider):
"""处理请求(发送前)"""
request.headers['User-Agent'] = self.user_agent
return None # 返回 None 继续处理其他中间件
    def process_response(self, request, response, spider):
        """Process a response (after it comes back)"""
        # The response can be inspected or replaced here
        if response.status == 404:
            spider.logger.info(f'404: {request.url}')   # middlewares have no self.logger
        return response    # must return a Response or a Request
    def process_exception(self, request, exception, spider):
        """Handle download exceptions"""
        spider.logger.error(f'Exception: {exception}')
        # Optionally return a new Request to retry
        return None
class ProxyMiddleware:
"""代理中间件"""
def __init__(self, proxy_url):
self.proxy_url = proxy_url
@classmethod
def from_crawler(cls, crawler):
return cls(
proxy_url=crawler.settings.get('PROXY_URL')
)
def process_request(self, request, spider):
"""为请求设置代理"""
if self.proxy_url:
request.meta['proxy'] = self.proxy_url
class RetryMiddleware:
"""重试中间件"""
def __init__(self, retry_times):
self.retry_times = retry_times
@classmethod
def from_crawler(cls, crawler):
return cls(
retry_times=crawler.settings.get('RETRY_TIMES', 2)
)
    def process_response(self, request, response, spider):
        """Inspect the response and decide whether to retry"""
        if response.status in [500, 502, 503, 504, 408, 429]:
            # Check how many times this request has been retried
            retry_times = request.meta.get('retry_times', 0)
            if retry_times < self.retry_times:
                spider.logger.info(f'Retrying {request.url}')
                retry_req = request.copy()
                retry_req.meta['retry_times'] = retry_times + 1
                retry_req.dont_filter = True   # bypass the dupe filter for the retry
                return retry_req               # returning a Request triggers the retry
        return response
Enabling the middleware:
# settings.py
DOWNLOADER_MIDDLEWARES = {
# 数字越小优先级越高
'myproject.middlewares.UserAgentMiddleware': 400,
'myproject.middlewares.ProxyMiddleware': 410,
'myproject.middlewares.RetryMiddleware': 500,
# 禁用默认中间件
'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
}
Spider Middleware
Spider middleware processes the spider's input and output:
# middlewares.py
class SpiderDepthMiddleware:
"""限制爬取深度"""
def __init__(self, max_depth):
self.max_depth = max_depth
@classmethod
def from_crawler(cls, crawler):
return cls(
max_depth=crawler.settings.get('MAX_DEPTH', 2)
)
def process_start_requests(self, start_requests, spider):
"""处理初始请求"""
for request in start_requests:
request.meta['depth'] = 0
yield request
def process_spider_input(self, response, spider):
"""处理输入到 Spider 的响应"""
depth = response.meta.get('depth', 0)
if depth > self.max_depth:
            spider.logger.info(f'Max depth {self.max_depth} reached')
# 可以抛出异常停止处理
# raise scrapy.exceptions.CloseSpider('max_depth')
return None
def process_spider_output(self, response, result, spider):
"""处理 Spider 的输出(Item 或 Request)"""
for item in result:
if isinstance(item, scrapy.Request):
# 增加深度
depth = response.meta.get('depth', 0)
item.meta['depth'] = depth + 1
yield item
def process_spider_exception(self, response, exception, spider):
"""处理 Spider 抛出的异常"""
        spider.logger.error(f'Spider exception: {exception}')
return []
Pipelines
Item Pipeline
Item pipelines process the Items yielded by spiders:
# pipelines.py
from scrapy.exceptions import DropItem

class DataCleaningPipeline:
"""数据清洗管道"""
def process_item(self, item, spider):
"""处理每个 Item"""
# 去除空白
for field, value in item.items():
if isinstance(value, str):
item[field] = value.strip()
# 处理空值
if not item.get('name'):
raise DropItem(f'Missing name in {item}')
return item
class ValidationPipeline:
"""数据验证管道"""
def process_item(self, item, spider):
"""验证数据"""
# 检查必需字段
required_fields = ['name', 'price', 'url']
for field in required_fields:
if not item.get(field):
raise DropItem(f'Missing {field} in {item}')
# 验证价格格式
price = item.get('price')
if price:
try:
item['price'] = float(price.replace('$', ''))
except ValueError:
raise DropItem(f'Invalid price: {price}')
return item
class DuplicatePipeline:
"""去重管道"""
def __init__(self):
self.seen = set()
def process_item(self, item, spider):
"""检查重复"""
# 使用唯一标识去重
identifier = (item.get('url'), item.get('sku'))
if identifier in self.seen:
raise DropItem(f'Duplicate item: {identifier}')
self.seen.add(identifier)
return item
def close_spider(self, spider):
"""爬虫关闭时调用"""
spider.logger.info(f'Total unique items: {len(self.seen)}')
Data Cleaning
import re
from itemadapter import ItemAdapter
class DataCleaningPipeline:
"""数据清洗"""
def process_item(self, item, spider):
adapter = ItemAdapter(item)
# 清洗文本字段
text_fields = ['name', 'description', 'author']
for field in text_fields:
if adapter.get(field):
# 去除多余空白
text = ' '.join(adapter[field].split())
adapter[field] = text
# 清洗数字字段
if adapter.get('price'):
# 提取数字
price_text = adapter['price']
numbers = re.findall(r'\d+\.?\d*', price_text)
if numbers:
adapter['price'] = float(numbers[0])
# 清洗日期
if adapter.get('date'):
date = adapter['date']
adapter['date'] = date.replace('Published:', '').strip()
return item
Data Validation
from scrapy.exceptions import DropItem
class ValidationPipeline:
"""数据验证"""
def process_item(self, item, spider):
"""验证 Item 数据"""
# 必填字段检查
if not item.get('title'):
raise DropItem('Missing title')
# 长度检查
title = item['title']
if len(title) < 5 or len(title) > 200:
raise DropItem(f'Invalid title length: {len(title)}')
# 格式检查
if item.get('email'):
email = item['email']
if '@' not in email:
raise DropItem(f'Invalid email: {email}')
# 范围检查
if item.get('price'):
price = float(item['price'])
if price <= 0 or price > 1000000:
raise DropItem(f'Invalid price: {price}')
return item
class TypeConversionPipeline:
"""类型转换"""
def process_item(self, item, spider):
"""转换数据类型"""
# 字符串转数字
numeric_fields = ['price', 'quantity', 'rating']
for field in numeric_fields:
if item.get(field):
try:
item[field] = float(item[field])
except ValueError:
item[field] = None
# 字符串转列表
if item.get('tags'):
if isinstance(item['tags'], str):
item['tags'] = [tag.strip() for tag in item['tags'].split(',')]
# 字符串转日期
if item.get('publish_date'):
from datetime import datetime
item['publish_date'] = datetime.strptime(
item['publish_date'], '%Y-%m-%d'
)
return item
Data Storage
import json
import csv
import sqlite3
import pymongo
from itemadapter import ItemAdapter
class JsonWriterPipeline:
"""保存到 JSON 文件"""
def __init__(self, file_path):
self.file_path = file_path
self.file = None
@classmethod
def from_crawler(cls, crawler):
return cls(
file_path=crawler.settings.get('JSON_FILE', 'output.json')
)
    def open_spider(self, spider):
        """Open the file when the spider starts"""
        self.file = open(self.file_path, 'w', encoding='utf-8')
        self.file.write('[')
        self.first_item = True
    def close_spider(self, spider):
        """Close the file when the spider finishes"""
        self.file.write('\n]')
        self.file.close()
    def process_item(self, item, spider):
        """Append one item (no trailing comma, so the output stays valid JSON)"""
        adapter = ItemAdapter(item)
        line = json.dumps(adapter.asdict(), ensure_ascii=False)
        self.file.write(('\n' if self.first_item else ',\n') + line)
        self.first_item = False
        return item
class CsvPipeline:
"""保存到 CSV 文件"""
def __init__(self, file_path):
self.file_path = file_path
self.file = None
self.writer = None
@classmethod
def from_crawler(cls, crawler):
return cls(file_path=crawler.settings.get('CSV_FILE', 'output.csv'))
def open_spider(self, spider):
"""打开文件并写入表头"""
self.file = open(self.file_path, 'w', newline='', encoding='utf-8')
self.writer = csv.writer(self.file)
def close_spider(self, spider):
"""关闭文件"""
self.file.close()
    def process_item(self, item, spider):
        """Write one row (the header is written before the first item)"""
        row = ItemAdapter(item).asdict()
        if self.file.tell() == 0:
            self.writer.writerow(row.keys())
        self.writer.writerow(row.values())
        return item
class MySQLPipeline:
"""保存到 MySQL"""
def __init__(self, mysql_config):
self.mysql_config = mysql_config
@classmethod
def from_crawler(cls, crawler):
return cls(
mysql_config={
'host': crawler.settings.get('MYSQL_HOST', 'localhost'),
'user': crawler.settings.get('MYSQL_USER', 'root'),
'password': crawler.settings.get('MYSQL_PASSWORD', ''),
'database': crawler.settings.get('MYSQL_DATABASE', 'scrapy'),
}
)
def open_spider(self, spider):
"""连接数据库"""
import pymysql
self.connection = pymysql.connect(**self.mysql_config)
self.cursor = self.connection.cursor()
def close_spider(self, spider):
"""关闭连接"""
self.connection.close()
def process_item(self, item, spider):
"""插入数据"""
sql = """
INSERT INTO products (name, price, url)
VALUES (%s, %s, %s)
ON DUPLICATE KEY UPDATE price=VALUES(price)
"""
self.cursor.execute(sql, (
item.get('name'),
item.get('price'),
item.get('url'),
))
self.connection.commit()
return item
class MongoDBPipeline:
"""保存到 MongoDB"""
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI', 'mongodb://localhost:27017'),
mongo_db=crawler.settings.get('MONGO_DATABASE', 'scrapy'),
)
def open_spider(self, spider):
"""连接 MongoDB"""
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def close_spider(self, spider):
"""关闭连接"""
self.client.close()
def process_item(self, item, spider):
"""插入文档"""
collection_name = item.__class__.__name__
self.db[collection_name].insert_one(ItemAdapter(item).asdict())
return item
Enabling the pipelines:
# settings.py
ITEM_PIPELINES = {
'myproject.pipelines.DataCleaningPipeline': 100,
'myproject.pipelines.ValidationPipeline': 200,
'myproject.pipelines.DuplicatePipeline': 300,
'myproject.pipelines.JsonWriterPipeline': 800,
'myproject.pipelines.MySQLPipeline': 900,
}
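Pipelines can also be enabled for a single spider through custom_settings, which overrides the project-wide ITEM_PIPELINES for that spider only; the pipeline paths below follow the examples above:
import scrapy

class ProductSpider(scrapy.Spider):
    name = 'products'
    custom_settings = {
        'ITEM_PIPELINES': {
            'myproject.pipelines.ValidationPipeline': 100,
            'myproject.pipelines.MySQLPipeline': 300,
        }
    }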
Data Storage
JSON
# 方式 1:命令行输出
scrapy crawl myspider -o output.json
# 方式 2:指定编码
scrapy crawl myspider -o output.json -s FEED_EXPORT_ENCODING=utf-8
# 方式 3:JSON Lines(每行一个 JSON 对象)
scrapy crawl myspider -o output.jsonl
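Exports can also be configured once in settings.py through the FEEDS setting (available since Scrapy 2.1; the overwrite key needs 2.4+), instead of passing -o/-s flags on every run; a sketch:
# settings.py
FEEDS = {
    'output/items.json': {
        'format': 'json',
        'encoding': 'utf8',
        'indent': 2,
        'overwrite': True,
    },
    'output/items.csv': {
        'format': 'csv',
        'fields': ['name', 'price', 'url'],
    },
}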
CSV
# 基本用法
scrapy crawl myspider -o output.csv
# Custom delimiter: there is no dedicated command-line switch; configure the CSV
# exporter through the FEEDS setting, e.g. item_export_kwargs={'delimiter': ';'} (Scrapy 2.4+)
# 指定字段
scrapy crawl myspider -o output.csv -s FEED_EXPORT_FIELDS=name,price,url
XML
# 输出 XML
scrapy crawl myspider -o output.xml
# Pretty-print the output with indentation
scrapy crawl myspider -o output.xml -s FEED_EXPORT_INDENT=2
MySQL
# pipelines.py
import pymysql
class MySQLPipeline:
def __init__(self, mysql_config):
self.mysql_config = mysql_config
@classmethod
def from_crawler(cls, crawler):
return cls(
mysql_config={
'host': crawler.settings.get('MYSQL_HOST', 'localhost'),
'port': int(crawler.settings.get('MYSQL_PORT', 3306)),
'user': crawler.settings.get('MYSQL_USER', 'root'),
'password': crawler.settings.get('MYSQL_PASSWORD', ''),
'database': crawler.settings.get('MYSQL_DATABASE', 'scrapy'),
'charset': 'utf8mb4',
}
)
def open_spider(self, spider):
"""建立数据库连接"""
self.connection = pymysql.connect(**self.mysql_config)
self.cursor = self.connection.cursor()
# 创建表(如果不存在)
self.create_table()
def create_table(self):
"""创建数据表"""
sql = """
CREATE TABLE IF NOT EXISTS products (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
price DECIMAL(10, 2),
description TEXT,
url VARCHAR(500),
sku VARCHAR(100),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY unique_url (url)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
"""
self.cursor.execute(sql)
def close_spider(self, spider):
"""关闭连接"""
self.connection.close()
def process_item(self, item, spider):
"""插入或更新数据"""
sql = """
INSERT INTO products (name, price, description, url, sku)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
name=VALUES(name),
price=VALUES(price),
description=VALUES(description),
updated_at=CURRENT_TIMESTAMP
"""
try:
self.cursor.execute(sql, (
item.get('name'),
item.get('price'),
item.get('description'),
item.get('url'),
item.get('sku'),
))
self.connection.commit()
except Exception as e:
spider.logger.error(f'MySQL error: {e}')
self.connection.rollback()
return item
MongoDB
# pipelines.py
import pymongo
from itemadapter import ItemAdapter
class MongoDBPipeline:
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI', 'mongodb://localhost:27017'),
mongo_db=crawler.settings.get('MONGO_DATABASE', 'scrapy'),
)
def open_spider(self, spider):
"""连接 MongoDB"""
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def close_spider(self, spider):
"""关闭连接"""
self.client.close()
def process_item(self, item, spider):
"""插入或更新文档"""
collection_name = item.__class__.__name__.replace('Item', '').lower()
collection = self.db[collection_name]
# 使用 URL 作为唯一标识
url = item.get('url')
if url:
collection.update_one(
{'url': url},
{'$set': ItemAdapter(item).asdict()},
upsert=True
)
else:
collection.insert_one(ItemAdapter(item).asdict())
return item
Request Settings
Headers
# settings.py
# 默认请求头
DEFAULT_REQUEST_HEADERS = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
}
# 自定义 User-Agent
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
Setting headers in a spider:
def start_requests(self):
"""自定义请求头"""
headers = {
'User-Agent': 'Custom User-Agent',
'Referer': 'https://example.com',
'Accept': 'application/json',
}
for url in self.start_urls:
yield scrapy.Request(url, headers=headers, callback=self.parse)
Cookies
# 方式 1:在 settings 中设置
COOKIES_ENABLED = True # 启用 Cookie
COOKIES_DEBUG = True # 调试 Cookie
# 方式 2:在 Request 中设置
def start_requests(self):
yield scrapy.Request(
url='http://example.com',
cookies={'session_id': 'abc123', 'token': 'xyz'},
callback=self.parse
)
# 方式 3:从响应中获取 Cookie 并使用
def parse(self, response):
# 获取 Cookie
cookies = response.headers.get('Set-Cookie')
# 在后续请求中使用
yield scrapy.Request(
url='http://example.com/protected',
cookies=response.request.cookies, # 使用当前 Cookie
callback=self.parse_protected
)
# 方式 4:使用 CookieJar
def parse(self, response):
# 保存所有 Cookie
yield scrapy.Request(
url='http://example.com/page2',
meta={'cookiejar': response.meta.get('cookiejar')},
callback=self.parse_page2
)
Proxies
# settings.py
# 代理设置
PROXY_LIST = [
'http://proxy1.com:8080',
'http://proxy2.com:8080',
'http://proxy3.com:8080',
]
# Download timeout in seconds (some proxies respond slowly)
DOWNLOAD_TIMEOUT = 30
Proxy middleware:
# middlewares.py
import random
class ProxyMiddleware:
"""代理中间件"""
def __init__(self, proxy_list):
self.proxy_list = proxy_list
@classmethod
def from_crawler(cls, crawler):
return cls(
proxy_list=crawler.settings.get('PROXY_LIST', [])
)
def process_request(self, request, spider):
"""为请求设置代理"""
if self.proxy_list:
proxy = random.choice(self.proxy_list)
request.meta['proxy'] = proxy
spider.logger.info(f'Using proxy: {proxy}')
User-Agent
# 方式 1:固定 User-Agent
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
# 方式 2:User-Agent 池
USER_AGENT_LIST = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)',
'Mozilla/5.0 (X11; Linux x86_64)',
]
# Pick a random User-Agent in a middleware
import random

class RandomUserAgentMiddleware:
def __init__(self, user_agents):
self.user_agents = user_agents
@classmethod
def from_crawler(cls, crawler):
return cls(
user_agents=crawler.settings.get('USER_AGENT_LIST', [])
)
def process_request(self, request, spider):
"""设置随机 User-Agent"""
if self.user_agents:
request.headers['User-Agent'] = random.choice(self.user_agents)
Download Delay
# settings.py
# 下载延迟(秒)
DOWNLOAD_DELAY = 2 # 每个请求延迟 2 秒
# 随机延迟
RANDOMIZE_DOWNLOAD_DELAY = True # 在 DOWNLOAD_DELAY 基础上随机 0.5-1.5 倍
# 并发请求数
CONCURRENT_REQUESTS = 16 # 全局并发
CONCURRENT_REQUESTS_PER_DOMAIN = 8 # 每个域名并发
CONCURRENT_REQUESTS_PER_IP = 8 # 每个 IP 并发
# 下载超时
DOWNLOAD_TIMEOUT = 180 # 默认 180 秒
CrawlSpider
Rule
A CrawlSpider crawls a site automatically based on rules:
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor
class MyCrawlSpider(CrawlSpider):
"""自动爬取 Spider"""
name = 'mycrawler'
allowed_domains = ['example.com']
start_urls = ['http://example.com/']
# 定义爬取规则
rules = (
# Rule 1:提取产品页链接,用 parse_item 处理
Rule(
LinkExtractor(allow=r'/product/\d+'), # 允许的 URL 模式
callback='parse_item', # 回调函数
follow=False, # 是否继续跟踪
),
        # Rule 2: follow category links (allow/deny belong to LinkExtractor, not Rule)
        Rule(
            LinkExtractor(allow=r'/category/', deny=r'/admin/'),
            follow=True,            # keep following links found on these pages
        ),
# Rule 3:提取分页链接
Rule(
LinkExtractor(
allow=r'/page/\d+', # 匹配分页
restrict_xpaths=['//div[@class="pagination"]'], # 限定区域
),
follow=True,
),
)
def parse_item(self, response):
"""解析商品页"""
yield {
'name': response.css('h1::text').get(),
'price': response.css('.price::text').get(),
'url': response.url,
}
LinkExtractor
A LinkExtractor extracts links from a page:
from scrapy.linkextractors import LinkExtractor
# 基本用法
le = LinkExtractor()
# 提取所有链接
links = le.extract_links(response)
for link in links:
print(link.url) # 链接 URL
print(link.text) # 链接文本
# 常用参数
le = LinkExtractor(
allow=r'/product/', # 允许的 URL 模式(正则)
deny=r'/admin/', # 排除的 URL 模式
allow_domains=('example.com',), # 允许的域名
deny_domains=('spam.com',), # 排除的域名
restrict_xpaths=('//div[@class="content"]',), # 限定 XPath 区域
restrict_css=('.content',), # 限定 CSS 区域
tags=('a', 'area'), # 考虑的标签
attrs=('href',), # 考虑的属性
canonicalize=True, # 规范化 URL
unique=True, # 去重
process_value=None, # 处理值的函数
)
# 在 CrawlSpider 中使用
rules = (
Rule(
LinkExtractor(
allow=r'/articles/',
restrict_xpaths=['//div[@class="article-list"]'],
),
callback='parse_article',
follow=False,
),
)
Distributed Crawling
scrapy-redis
Install:
pip install scrapy-redis
Configuration:
# settings.py
# 使用 scrapy-redis 调度器
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# 去重(使用 Redis)
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# Redis 配置
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
# 不清除 Redis 队列(支持暂停/恢复)
SCHEDULER_PERSIST = True
# 优先级队列
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
# 序列化
SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"
Spider:
import scrapy
from scrapy_redis.spiders import RedisSpider
class MyRedisSpider(RedisSpider):
"""Redis Spider"""
name = 'myredis'
redis_key = 'mycrawler:start_urls' # Redis 列表键名
# 可选:允许的域名
allowed_domains = ['example.com']
def parse(self, response):
"""解析响应"""
yield {
'url': response.url,
'title': response.css('title::text').get(),
}
# 继续添加 URL 到队列
for link in response.css('a::attr(href)').getall():
yield scrapy.Request(response.urljoin(link), callback=self.parse)
Run:
# 方式 1:运行爬虫(等待从 Redis 获取 URL)
scrapy crawl myredis
# 方式 2:向 Redis 添加起始 URL
redis-cli LPUSH mycrawler:start_urls http://example.com
# 方式 3:使用 Python 添加 URL
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
r.lpush('mycrawler:start_urls', 'http://example.com/page1')
r.lpush('mycrawler:start_urls', 'http://example.com/page2')
Redis Queue
# 手动管理 Redis 队列
import redis
import json
class RedisQueue:
"""Redis 队列管理"""
def __init__(self, host='localhost', port=6379, db=0):
self.redis = redis.StrictRedis(host=host, port=port, db=db)
self.queue_key = 'scrapy:queue'
def push(self, url, meta=None):
"""添加 URL 到队列"""
data = {'url': url, 'meta': meta or {}}
self.redis.rpush(self.queue_key, json.dumps(data))
def pop(self):
"""从队列弹出 URL"""
data = self.redis.lpop(self.queue_key)
if data:
return json.loads(data)
return None
def size(self):
"""队列大小"""
return self.redis.llen(self.queue_key)
# 使用示例
queue = RedisQueue()
queue.push('http://example.com/page1', meta={'priority': 1})
queue.push('http://example.com/page2', meta={'priority': 2})
print(f'Queue size: {queue.size()}')
Anti-Bot Countermeasures
User-Agent Pool
# middlewares.py
import random
USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
]
class RandomUserAgentMiddleware:
"""随机 User-Agent"""
def process_request(self, request, spider):
request.headers['User-Agent'] = random.choice(USER_AGENTS)
Proxy Pool
# middlewares.py
class ProxyPoolMiddleware:
"""代理池中间件"""
def __init__(self, proxy_list):
self.proxy_list = proxy_list
self.current_proxy = None
self.failed_count = 0
@classmethod
def from_crawler(cls, crawler):
return cls(
proxy_list=crawler.settings.get('PROXY_LIST', [])
)
def process_request(self, request, spider):
"""设置代理"""
if self.proxy_list:
self.current_proxy = random.choice(self.proxy_list)
request.meta['proxy'] = self.current_proxy
spider.logger.info(f'Using proxy: {self.current_proxy}')
def process_response(self, request, response, spider):
"""检查响应"""
if response.status != 200:
self.failed_count += 1
if self.failed_count > 3:
# 更换代理
spider.logger.info(f'Proxy failed: {self.current_proxy}')
self.failed_count = 0
else:
self.failed_count = 0
return response
Fetching proxies from an API:
import random
import requests

class DynamicProxyMiddleware:
"""动态代理中间件"""
def __init__(self, proxy_api):
self.proxy_api = proxy_api
self.proxies = []
self.fetch_proxies()
def fetch_proxies(self):
"""从 API 获取代理"""
try:
response = requests.get(self.proxy_api)
self.proxies = response.json().get('proxies', [])
        except Exception as e:
            # middlewares have no self.logger attribute; use the logging module
            import logging
            logging.getLogger(__name__).error(f'Failed to fetch proxies: {e}')
def process_request(self, request, spider):
"""使用动态代理"""
if self.proxies:
proxy = random.choice(self.proxies)
request.meta['proxy'] = f"http://{proxy['ip']}:{proxy['port']}"
IP and Rate Limits
# settings.py
# 并发控制
CONCURRENT_REQUESTS_PER_DOMAIN = 2 # 每个域名的并发数
CONCURRENT_REQUESTS_PER_IP = 2 # 每个 IP 的并发数
DOWNLOAD_DELAY = 3 # 下载延迟
# 自动限速
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 2
AUTOTHROTTLE_MAX_DELAY = 10
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
AUTOTHROTTLE_DEBUG = False
Throttling middleware:
import time

class DomainThrottleMiddleware:
    """Per-domain throttling middleware (note: time.sleep blocks the whole crawler)"""
def __init__(self):
self.domain_requests = {}
self.last_request_time = {}
def process_request(self, request, spider):
"""限制每个域名的请求频率"""
from urllib.parse import urlparse
domain = urlparse(request.url).netloc
# 检查是否需要延迟
if domain in self.last_request_time:
elapsed = time.time() - self.last_request_time[domain]
if elapsed < 2: # 至少间隔 2 秒
time.sleep(2 - elapsed)
self.last_request_time[domain] = time.time()
CAPTCHAs
# Option 1: handle manually (stop the crawler)
import scrapy

class CaptchaMiddleware:
"""验证码中间件"""
def process_response(self, request, response, spider):
"""检测验证码"""
if 'captcha' in response.text.lower():
spider.logger.warning(f'Captcha detected: {request.url}')
# 保存页面供人工查看
with open('captcha.html', 'w') as f:
f.write(response.text)
# 暂停爬虫,等待人工处理
raise scrapy.exceptions.CloseSpider('captcha_found')
return response
# 方式 2:使用第三方服务
import requests
class CaptchaSolver:
"""验证码解决器(示例)"""
def solve_captcha(self, image_url):
"""调用验证码识别 API"""
# 使用 2captcha、Anti-Captcha 等服务
api_url = 'http://captcha-service.com/solve'
response = requests.post(api_url, json={'image': image_url})
return response.json().get('solution')
Logging and Debugging
# Spider 中使用日志
import scrapy
class MySpider(scrapy.Spider):
name = 'myspider'
def parse(self, response):
# 使用 logger
self.logger.info(f'Processing: {response.url}')
self.logger.debug(f'Response status: {response.status}')
self.logger.warning('Possible issue detected')
self.logger.error('Error occurred')
# 使用 print(调试用)
print(f'Debug: {response.url}')
# Scrapy shell 测试
from scrapy.shell import inspect_response
inspect_response(response, self)
Configuration:
# settings.py
# 日志级别
LOG_LEVEL = 'INFO' # DEBUG, INFO, WARNING, ERROR, CRITICAL
# 日志文件
LOG_FILE = 'scrapy.log'
# 日志格式
LOG_FORMAT = '%(asctime)s [%(name)s] %(levelname)s: %(message)s'
LOG_DATEFORMAT = '%Y-%m-%d %H:%M:%S'
# Stats collection (MemoryStatsCollector is the default)
STATS_CLASS = 'scrapy.statscollectors.MemoryStatsCollector'
Deployment
Scrapyd
Install and run:
# 安装
pip install scrapyd
# 启动服务
scrapyd
# 访问
# http://localhost:6800
Deploy a project:
# 安装 scrapyd-client
pip install scrapyd-client
# 部署
scrapyd-deploy
# 部署到指定目标
scrapyd-deploy target -p projectname
Using the API:
# 启动爬虫
curl http://localhost:6800/schedule.json \
-d project=myproject \
-d spider=myspider
# 取消爬虫
curl http://localhost:6800/cancel.json \
-d project=myproject \
-d job=jobid
# 查看状态
curl http://localhost:6800/list_jobs.json?project=myproject
# 查看日志
curl http://localhost:6800/logs/myproject/myspider/jobid.log
Scrapy Cloud
# 安装
pip install shub
# 登录
shub login
# 部署
shub deploy
# 部署到特定项目
shub deploy 12345
# 查看日志
shub logs 12345
# 运行爬虫
shub schedule 12345 myspider
Best Practices
1. Respect robots.txt
# settings.py
ROBOTSTXT_OBEY = True
2. Use a sensible download delay
# settings.py
DOWNLOAD_DELAY = 2
AUTOTHROTTLE_ENABLED = True
3. Use Item Pipelines
# Keep data processing in pipelines, not in the spider.
# Pipelines handle validation, cleaning, and storage; a sketch follows.
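A minimal sketch of that split (selectors and the price format are assumptions): the spider only extracts raw values, and a pipeline normalizes them.
# spider: extract only
def parse(self, response):
    yield {
        'name': response.css('h1::text').get(),
        'price': response.css('.price::text').get(),
    }

# pipelines.py: normalize
class PricePipeline:
    def process_item(self, item, spider):
        if item.get('price'):
            item['price'] = float(item['price'].replace('$', '').strip())
        return item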
4. Error handling
def parse(self, response):
try:
data = response.css('.data::text').get()
yield {'data': data}
except Exception as e:
self.logger.error(f'Error parsing {response.url}: {e}')
5. Throttling and retries
# settings.py
RETRY_ENABLED = True
RETRY_TIMES = 2
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429]
6. Deduplication
# URL-based deduplication (the default filter)
DUPEFILTER_CLASS = 'scrapy.dupefilters.RFPDupeFilter'
# Custom deduplication (to plug in via DUPEFILTER_CLASS it should subclass
# scrapy.dupefilters.BaseDupeFilter)
class CustomDupeFilter:
    def __init__(self):
self.seen = set()
def request_seen(self, request):
fp = request.url + request.meta.get('data', '')
if fp in self.seen:
return True
self.seen.add(fp)
return False
7. Modular project layout
project/
├── spiders/
│   ├── products.py     # product spider
│   ├── articles.py     # article spider
│   └── __init__.py
├── items.py            # Item definitions
├── pipelines.py        # data processing
├── middlewares.py      # middlewares
└── utils.py            # helper functions
8. Monitoring and logging
# Log crawl statistics when the spider closes
def closed(self, reason):
    stats = self.crawler.stats
    self.logger.info(f'Items scraped: {stats.get_value("item_scraped_count")}')
    self.logger.info(f'Total requests: {stats.get_value("downloader/request_count")}')
Legal and Ethical Considerations
Legal compliance
- Respect robots.txt: honor the site's crawling policy
- Copyright: be careful how scraped content is used
- Computer fraud and abuse laws: unauthorized access may be illegal
- Terms of service: comply with the website's terms
Ethical guidelines
- Identify yourself: use a meaningful User-Agent
- Crawl at a reasonable rate: avoid putting load on the server
- Respect privacy: do not scrape sensitive personal information
- Use data responsibly: only for legitimate purposes
Best practices
# 1. Identify your bot
USER_AGENT = 'MyBot (+http://mysite.com/bot)'
# 2. Follow the rules
ROBOTSTXT_OBEY = True
# 3. Control the crawl rate
DOWNLOAD_DELAY = 2
CONCURRENT_REQUESTS = 8
# 4. Respect privacy
# Do not scrape personal information, passwords, or private communications
# 5. Contact the site
# Before large-scale crawling, contact the site owner
With Scrapy you can build powerful, efficient, and scalable crawlers, but always stay within legal and ethical bounds.