网络编程
网络编程是 Python 的重要应用领域,它涵盖了从简单的 HTTP 请求到底层 Socket 通信的各种技术。本章将介绍 HTTP 编程、Socket 编程、WebSocket、API 开发和网络爬虫等内容。
HTTP 编程
requests 库
安装和基础使用
# 安装 requests 库
# pip install requests
import requests
# 发送简单的 GET 请求
response = requests.get('https://httpbin.org/get')
# 查看响应状态码
print(f"状态码: {response.status_code}") # 200
# 查看响应内容
print(f"响应内容: {response.text}")
# 查看响应头
print(f"响应头: {response.headers}")
# 查看 JSON 数据
print(f"JSON数据: {response.json()}")
# 响应对象的主要属性和方法
print(f"URL: {response.url}") # 请求的 URL
print(f"编码: {response.encoding}") # 响应的编码
print(f"Cookies: {response.cookies}") # 响应的 cookies
print(f"耗时: {response.elapsed.total_seconds()}秒") # 请求耗时
GET 请求
import requests
# 基本 GET 请求
response = requests.get('https://httpbin.org/get')
print(response.json())
# 带参数的 GET 请求
params = {
'name': 'Alice',
'age': 25,
'city': 'Beijing'
}
response = requests.get('https://httpbin.org/get', params=params)
print(f"请求URL: {response.url}")
# 实际请求: https://httpbin.org/get?name=Alice&age=25&city=Beijing
# 设置请求头
headers = {
'User-Agent': 'My App/1.0',
'Accept': 'application/json',
'Accept-Language': 'zh-CN'
}
response = requests.get(
'https://httpbin.org/get',
headers=headers
)
print(f"自定义请求头: {response.json()['headers']}")
# 设置超时
try:
# 连接超时5秒,读取超时10秒
response = requests.get(
'https://httpbin.org/get',
timeout=(5, 10)
)
print("请求成功")
except requests.exceptions.Timeout:
print("请求超时")
# 设置超时(统一超时时间)
response = requests.get('https://httpbin.org/get', timeout=5)
# 下载文件
response = requests.get('https://httpbin.org/image/png')
with open('image.png', 'wb') as f:
f.write(response.content)
# 流式下载大文件
response = requests.get(
'https://httpbin.org/stream/20',
stream=True
)
with open('stream.txt', 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
POST 请求
import requests
import json
# 发送表单数据
data = {
'username': 'alice',
'password': '123456'
}
response = requests.post(
'https://httpbin.org/post',
data=data
)
print(f"表单提交: {response.json()['form']}")
# 发送 JSON 数据
json_data = {
'name': 'Bob',
'age': 30,
'hobbies': ['reading', 'swimming']
}
response = requests.post(
'https://httpbin.org/post',
json=json_data # 自动设置 Content-Type: application/json
)
print(f"JSON提交: {response.json()['json']}")
# 手动发送 JSON
response = requests.post(
'https://httpbin.org/post',
data=json.dumps(json_data),
headers={'Content-Type': 'application/json'}
)
# Upload a file -- the with-block guarantees the handle is closed
# (the original left the file object open: a resource leak)
with open('test.txt', 'rb') as fp:
    files = {'file': fp}
    response = requests.post(
        'https://httpbin.org/post',
        files=files
    )
# Upload a file under an explicit name
with open('test.txt', 'rb') as fp:
    files = {'file': ('custom_name.txt', fp)}
    response = requests.post(
        'https://httpbin.org/post',
        files=files
    )
# Upload several files at once
with open('file1.txt', 'rb') as f1, open('file2.txt', 'rb') as f2:
    files = [
        ('files', f1),
        ('files', f2)
    ]
    response = requests.post(
        'https://httpbin.org/post',
        files=files
    )
请求头和响应头
import requests
# 设置请求头
headers = {
'User-Agent': 'Mozilla/5.0',
'Accept': 'text/html,application/xhtml+xml',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Referer': 'https://www.google.com'
}
response = requests.get(
'https://httpbin.org/headers',
headers=headers
)
# 查看响应头
print("所有响应头:")
for key, value in response.headers.items():
print(f" {key}: {value}")
# 访问特定响应头
print(f"\nContent-Type: {response.headers['Content-Type']}")
print(f"Content-Length: {response.headers.get('Content-Length')}")
# 不区分大小写
print(f"content-type: {response.headers['content-type']}")
# 查看服务器返回的特定头
response = requests.get('https://www.example.com')
print(f"\nServer: {response.headers.get('Server')}")
print(f"Date: {response.headers.get('Date')}")
# 常用请求头说明
"""
User-Agent: 浏览器标识
Accept: 接受的内容类型
Accept-Language: 接受的语言
Accept-Encoding: 接受的编码方式
Authorization: 认证信息
Cookie: 客户端存储的 cookie
Referer: 请求来源页
Content-Type: 请求体的内容类型
Content-Length: 请求体的长度
"""
Cookie 处理
import requests
# 获取响应中的 cookies
response = requests.get('https://httpbin.org/cookies/set/name/value')
print(f"响应Cookies: {response.cookies}")
# 访问特定的 cookie
print(f"name cookie: {response.cookies.get('name')}")
# 发送请求时携带 cookies
cookies = {
'name': 'Alice',
'age': '25'
}
response = requests.get(
'https://httpbin.org/cookies',
cookies=cookies
)
print(f"发送的Cookies: {response.json()['cookies']}")
# 使用 CookieJar 对象管理 cookies
jar = requests.cookies.RequestsCookieJar()
jar.set('name', 'Bob', domain='httpbin.org')
jar.set('session', 'abc123', domain='httpbin.org')
response = requests.get(
'https://httpbin.org/cookies',
cookies=jar
)
# 从响应中提取 cookies 并保存到 CookieJar
jar = requests.cookies.RequestsCookieJar()
response = requests.get('https://www.example.com')
jar.update(response.cookies)
# 保存 cookies 到文件
import pickle
with open('cookies.pkl', 'wb') as f:
pickle.dump(jar, f)
# 从文件加载 cookies
with open('cookies.pkl', 'rb') as f:
jar = pickle.load(f)
# 使用加载的 cookies
response = requests.get(
'https://httpbin.org/cookies',
cookies=jar
)
Session 管理
import requests
# 使用 Session 对象自动管理 cookies
session = requests.Session()
# 第一次请求,设置 cookie
session.get('https://httpbin.org/cookies/set/name/Alice')
# 后续请求会自动携带之前的 cookies
response = session.get('https://httpbin.org/cookies')
print(f"Session Cookies: {response.json()['cookies']}")
# Session 会保持连接,提高性能
for i in range(5):
response = session.get('https://httpbin.org/get')
print(f"请求 {i+1}: 状态码 {response.status_code}")
# Configure Session-wide defaults
session.headers.update({
    'User-Agent': 'MySession/1.0'
})
session.verify = False  # skip SSL certificate verification for all requests
# NOTE(review): requests.Session has NO built-in timeout attribute -- the
# assignment below only creates a plain Python attribute that requests never
# reads; a timeout must still be passed per request via timeout=
session.timeout = 10  # intended as a default timeout (not effective, see note)
# 使用 Session 的代理设置
session.proxies = {
'http': 'http://proxy.example.com:8080',
'https': 'https://proxy.example.com:8080'
}
# 清空 Session cookies
session.cookies.clear()
# 关闭 Session
session.close()
# 使用 with 语句自动关闭
with requests.Session() as session:
session.get('https://httpbin.org/cookies/set/name/Bob')
response = session.get('https://httpbin.org/cookies')
print(f"Cookies: {response.json()['cookies']}")
高级功能
异常处理
import requests
from requests.exceptions import (
RequestException,
Timeout,
ConnectionError,
HTTPError,
TooManyRedirects
)
# 完整的异常处理示例
def safe_request(url, **kwargs):
"""安全的请求方法"""
try:
response = requests.get(url, **kwargs)
# 检查 HTTP 错误状态码
response.raise_for_status()
return response
except Timeout:
print(f"请求超时: {url}")
except ConnectionError:
print(f"连接错误: {url}")
except HTTPError as e:
print(f"HTTP错误: {e.response.status_code}")
except TooManyRedirects:
print(f"重定向次数过多: {url}")
except RequestException as e:
print(f"请求异常: {e}")
return None
# 使用
response = safe_request(
'https://httpbin.org/get',
timeout=5
)
if response:
print(response.json())
# 重试机制
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
session = requests.Session()
# Configure the retry strategy
retry_strategy = Retry(
    total=3,  # retry at most 3 times in total
    backoff_factor=1,  # sleep {backoff factor} * (2 ** (retry - 1)) seconds between attempts
    status_forcelist=[429, 500, 502, 503, 504],  # retry on these status codes
    # urllib3 >= 1.26 renamed method_whitelist to allowed_methods; the old
    # keyword was removed in urllib3 2.0 and raises TypeError there
    allowed_methods=["HEAD", "GET", "OPTIONS"]  # only retry these idempotent methods
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
# 使用带重试的 session
response = session.get('https://httpbin.org/get')
代理设置
import requests
# 设置代理
proxies = {
'http': 'http://proxy.example.com:8080',
'https': 'https://proxy.example.com:8080'
}
response = requests.get(
'https://httpbin.org/get',
proxies=proxies
)
# 使用需要认证的代理
proxies = {
'http': 'http://user:pass@proxy.example.com:8080',
'https': 'https://user:pass@proxy.example.com:8080'
}
# 使用 SOCKS 代理
# 需要安装: pip install requests[socks]
proxies = {
'http': 'socks5://proxy.example.com:1080',
'https': 'socks5://proxy.example.com:1080'
}
# 禁用代理
response = requests.get(
'https://httpbin.org/get',
proxies={'http': None, 'https': None}
)
# 从环境变量读取代理
# export HTTP_PROXY=http://proxy.example.com:8080
# export HTTPS_PROXY=https://proxy.example.com:8080
response = requests.get('https://httpbin.org/get')
SSL 证书验证
import requests
import urllib3
# 禁用 SSL 警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 忽略 SSL 证书验证(不推荐)
response = requests.get(
'https://example.com',
verify=False
)
# 指定 CA 证书
response = requests.get(
'https://example.com',
verify='/path/to/cacert.pem'
)
# 使用证书和私钥(客户端认证)
response = requests.get(
'https://example.com',
cert=('/path/to/client.cert', '/path/to/client.key')
)
# Session 级别的 SSL 设置
session = requests.Session()
session.verify = False # 全局禁用验证
session.cert = '/path/to/client.cert' # 全局证书
Socket 编程
Socket 基础
Socket 概念
"""
Socket (套接字) 是网络编程的基础
Socket 地址族:
- AF_INET: IPv4 地址族
- AF_INET6: IPv6 地址族
- AF_UNIX: 本地进程间通信
Socket 类型:
- SOCK_STREAM: TCP (面向连接,可靠)
- SOCK_DGRAM: UDP (无连接,不可靠)
Socket 工作流程:
1. 创建 socket
2. 绑定地址和端口 (bind)
3. 监听连接 (listen) - 服务器端
4. 接受连接 (accept) - 服务器端
5. 发送/接收数据 (send/recv)
6. 关闭 socket (close)
"""
import socket
# 创建 socket
# socket.AF_INET: IPv4
# socket.SOCK_STREAM: TCP
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print(f"Socket家族: {sock.family}")
print(f"Socket类型: {sock.type}")
print(f"Socket协议: {sock.proto}")
# 关闭 socket
sock.close()
TCP 客户端
import socket
# 创建 TCP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 连接服务器
server_host = 'localhost'
server_port = 8888
try:
client_socket.connect((server_host, server_port))
print(f"已连接到服务器 {server_host}:{server_port}")
# 发送数据
message = "Hello, Server!"
client_socket.send(message.encode('utf-8'))
print(f"已发送: {message}")
# 接收数据
data = client_socket.recv(1024)
print(f"收到回复: {data.decode('utf-8')}")
except ConnectionRefusedError:
print("连接被拒绝,请确认服务器是否运行")
except Exception as e:
print(f"发生错误: {e}")
finally:
# 关闭 socket
client_socket.close()
print("连接已关闭")
# 使用 with 语句自动关闭
def tcp_client(host, port, message):
    """Connect to (host, port), send one message and return the decoded reply.

    Args:
        host: server hostname or IP.
        port: server TCP port.
        message: text to send (UTF-8 encoded on the wire).

    Returns:
        The first reply chunk (up to 1024 bytes) decoded as UTF-8.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((host, port))
        # sendall keeps sending until the whole payload is written;
        # plain send() may transmit only part of the buffer
        sock.sendall(message.encode('utf-8'))
        response = sock.recv(1024)
        return response.decode('utf-8')
# 使用
# response = tcp_client('localhost', 8888, 'Hello!')
TCP 服务器
import socket
import threading
def handle_client(client_socket, client_address):
"""处理客户端连接"""
try:
print(f"新连接来自: {client_address}")
# 接收数据
data = client_socket.recv(1024)
if data:
message = data.decode('utf-8')
print(f"收到消息: {message}")
# 发送回复
response = f"Echo: {message}"
client_socket.send(response.encode('utf-8'))
except Exception as e:
print(f"处理客户端时出错: {e}")
finally:
# 关闭连接
client_socket.close()
print(f"连接关闭: {client_address}")
def tcp_server(host='0.0.0.0', port=8888):
"""TCP 服务器"""
# 创建 socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置地址复用
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
# 绑定地址和端口
server_socket.bind((host, port))
print(f"服务器启动在 {host}:{port}")
# 监听连接
server_socket.listen(5)
print("等待客户端连接...")
while True:
# 接受新连接
client_socket, client_address = server_socket.accept()
# 为每个客户端创建新线程
client_thread = threading.Thread(
target=handle_client,
args=(client_socket, client_address)
)
client_thread.start()
except KeyboardInterrupt:
print("\n服务器停止")
except Exception as e:
print(f"服务器错误: {e}")
finally:
server_socket.close()
# 启动服务器
# tcp_server()
UDP Socket
UDP 客户端
import socket
def udp_client(host, port, message):
"""UDP 客户端"""
# 创建 UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# 发送数据 (无需连接)
sock.sendto(message.encode('utf-8'), (host, port))
print(f"已发送到 {host}:{port}: {message}")
# 接收回复
data, addr = sock.recvfrom(1024)
print(f"收到来自 {addr} 的回复: {data.decode('utf-8')}")
except Exception as e:
print(f"发生错误: {e}")
finally:
sock.close()
# 使用
# udp_client('localhost', 9999, 'Hello UDP Server!')
UDP 服务器
import socket
def udp_server(host='0.0.0.0', port=9999):
"""UDP 服务器"""
# 创建 UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# 绑定地址和端口
sock.bind((host, port))
print(f"UDP 服务器启动在 {host}:{port}")
while True:
# 接收数据
data, addr = sock.recvfrom(1024)
message = data.decode('utf-8')
print(f"收到来自 {addr} 的消息: {message}")
# 发送回复
response = f"Echo: {message}"
sock.sendto(response.encode('utf-8'), addr)
print(f"已回复 {addr}")
except KeyboardInterrupt:
print("\n服务器停止")
except Exception as e:
print(f"服务器错误: {e}")
finally:
sock.close()
# 启动服务器
# udp_server()
UDP vs TCP
"""
TCP (Transmission Control Protocol):
- 面向连接的协议
- 可靠传输 (保证数据顺序和完整性)
- 有流量控制和拥塞控制
- 适合传输重要数据
- 速度相对较慢
UDP (User Datagram Protocol):
- 无连接协议
- 不可靠传输 (可能丢包、乱序)
- 没有流量控制和拥塞控制
- 适合实时应用 (视频、音频、游戏)
- 速度快
选择建议:
- 需要可靠性: 使用 TCP
- 需要速度、容忍丢包: 使用 UDP
- 文件传输: TCP
- 视频直播: UDP
- 在线游戏: UDP
"""
Socket 高级
非阻塞 Socket
import socket
import select
def non_blocking_client(host, port):
    """Non-blocking TCP client demonstrating select-based readiness checks."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # switch to non-blocking mode: connect/send/recv return immediately
    sock.setblocking(False)
    try:
        # a non-blocking connect normally raises BlockingIOError while the
        # TCP handshake is still in progress -- that is expected
        try:
            sock.connect((host, port))
        except BlockingIOError:
            pass
        # Wait for the connection to complete: a connecting socket becomes
        # WRITABLE when the handshake finishes, so inspect the second list
        # returned by select. (The original code read index [0], the
        # readable list, which is always empty here, so the success branch
        # could never run.)
        _, writable, _ = select.select([], [sock], [], 5)
        if sock in writable:
            print("连接成功")
            message = "Hello!"
            sock.send(message.encode('utf-8'))
            # wait until data is ready to be read
            readable, _, _ = select.select([sock], [], [], 5)
            if sock in readable:
                data = sock.recv(1024)
                print(f"收到: {data.decode('utf-8')}")
    except Exception as e:
        print(f"错误: {e}")
    finally:
        sock.close()
# 使用
# non_blocking_client('localhost', 8888)
Socket 超时设置
import socket
def timeout_client(host, port, timeout=5):
"""带超时的客户端"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置超时
sock.settimeout(timeout)
try:
sock.connect((host, port))
print("连接成功")
# 发送数据
sock.send(b"Hello!")
# 接收数据 (会应用超时)
data = sock.recv(1024)
print(f"收到: {data.decode('utf-8')}")
except socket.timeout:
print("操作超时")
except Exception as e:
print(f"错误: {e}")
finally:
sock.close()
多路复用
import socket
import select
def multiplexing_server(host='0.0.0.0', port=8888):
"""使用 select 的多路复用服务器"""
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((host, port))
server_socket.listen(5)
# 设置为非阻塞
server_socket.setblocking(False)
# 输入列表 (需要读取的 socket)
inputs = [server_socket]
print(f"服务器启动在 {host}:{port}")
try:
while inputs:
# 使用 select 监听多个 socket
readable, _, exceptional = select.select(inputs, [], inputs, 1)
for sock in readable:
if sock is server_socket:
# 新连接
client_socket, client_address = sock.accept()
client_socket.setblocking(False)
inputs.append(client_socket)
print(f"新连接: {client_address}")
else:
# 客户端数据
data = sock.recv(1024)
if data:
print(f"收到: {data.decode('utf-8')}")
sock.send(b"OK")
else:
# 连接关闭
inputs.remove(sock)
sock.close()
for sock in exceptional:
inputs.remove(sock)
sock.close()
except KeyboardInterrupt:
print("\n服务器停止")
finally:
server_socket.close()
# 启动服务器
# multiplexing_server()
WebSocket
WebSocket 基础
"""
WebSocket 是一种全双工通信协议
- 建立在 TCP 之上
- 允许服务器主动推送消息
- 保持长连接,减少通信开销
- 适合实时应用
使用场景:
- 实时聊天
- 在线游戏
- 股票行情
- 协作编辑
"""
# 安装 websockets 库
# pip install websockets
import asyncio
import websockets
async def websocket_client(uri):
"""WebSocket 客户端"""
async with websockets.connect(uri) as websocket:
# 发送消息
await websocket.send("Hello, WebSocket!")
# 接收消息
response = await websocket.recv()
print(f"收到: {response}")
# 持续接收消息
while True:
try:
message = await asyncio.wait_for(
websocket.recv(),
timeout=1.0
)
print(f"收到: {message}")
except asyncio.TimeoutError:
# 可以在这里发送心跳包
await websocket.ping()
# 运行客户端
# asyncio.run(websocket_client('ws://localhost:8765'))
WebSocket 服务器
import asyncio
import websockets
async def echo_server(websocket, path=None):
    """Echo every received message back to the client.

    ``path`` defaults to None so this handler works both with
    websockets < 11 (which calls handlers with two arguments) and with
    websockets >= 11 (which passes only the connection object).
    """
    print(f"新客户端连接: {websocket.remote_address}")
    try:
        # async-for yields each incoming message until the client disconnects
        async for message in websocket:
            print(f"收到: {message}")
            # send the reply
            await websocket.send(f"Echo: {message}")
    except websockets.exceptions.ConnectionClosed:
        print("客户端断开连接")
async def websocket_server(host='localhost', port=8765):
"""WebSocket 服务器"""
async with websockets.serve(echo_server, host, port):
print(f"WebSocket 服务器启动在 {host}:{port}")
await asyncio.Future() # 永远运行
# 启动服务器
# asyncio.run(websocket_server())
广播消息
import asyncio
import websockets
# Pool of currently connected clients
connected_clients = set()

async def broadcast_server(websocket, path=None):
    """Relay every incoming message to all other connected clients.

    ``path`` defaults to None for compatibility with both the old
    (two-argument) and new (single-argument) websockets handler APIs.
    """
    # register the new client
    connected_clients.add(websocket)
    print(f"客户端连接: {websocket.remote_address}")
    try:
        async for message in websocket:
            print(f"收到: {message}")
            # fan the message out to everyone except the sender
            for client in connected_clients:
                if client != websocket:
                    await client.send(message)
    except websockets.exceptions.ConnectionClosed:
        print("客户端断开连接")
    finally:
        # discard() never raises, even if the client was already removed
        connected_clients.discard(websocket)
async def broadcast_websocket_server(host='localhost', port=8765):
"""广播 WebSocket 服务器"""
async with websockets.serve(broadcast_server, host, port):
print(f"广播服务器启动在 {host}:{port}")
await asyncio.Future()
# 启动服务器
# asyncio.run(broadcast_websocket_server())
API 开发
Flask 基础
安装和快速开始
# 安装 Flask
# pip install flask
from flask import Flask, jsonify, request
# 创建应用
app = Flask(__name__)
# 路由和视图函数
@app.route('/')
def hello():
return 'Hello, World!'
@app.route('/hello/<name>')
def hello_name(name):
return f'Hello, {name}!'
# 返回 JSON
@app.route('/api/data')
def get_data():
data = {
'name': 'Alice',
'age': 25,
'city': 'Beijing'
}
return jsonify(data)
# 处理不同的 HTTP 方法
@app.route('/api/user', methods=['GET', 'POST'])
def user():
if request.method == 'GET':
return jsonify({'message': 'GET user'})
elif request.method == 'POST':
# 获取 JSON 数据
data = request.get_json()
return jsonify({'message': 'User created', 'data': data})
# 获取查询参数
@app.route('/api/search')
def search():
keyword = request.args.get('keyword', '')
page = request.args.get('page', 1, type=int)
return jsonify({
'keyword': keyword,
'page': page
})
# 运行应用
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)
路由和请求处理
from flask import Flask, request, jsonify
app = Flask(__name__)
# 路由变量
@app.route('/user/<username>')
def show_user_profile(username):
return f'User: {username}'
@app.route('/post/<int:post_id>')
def show_post(post_id):
return f'Post: {post_id}'
@app.route('/path/<path:subpath>')
def show_subpath(subpath):
return f'Subpath: {subpath}'
# HTTP 方法
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
# 获取表单数据
username = request.form['username']
password = request.form['password']
# 获取 JSON 数据
# data = request.get_json()
return jsonify({'status': 'success'})
return '''
<form method="post">
<input type="text" name="username" placeholder="用户名">
<input type="password" name="password" placeholder="密码">
<button type="submit">登录</button>
</form>
'''
# 获取请求头
@app.route('/headers')
def headers():
user_agent = request.headers.get('User-Agent')
return f'Your browser: {user_agent}'
# Handle file uploads
@app.route('/upload', methods=['POST'])
def upload_file():
    """Save an uploaded file and report the result as JSON.

    Returns 400 when the 'file' part is missing or no file was selected.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400
    if file:
        filename = file.filename
        # NOTE(review): in production run the name through
        # werkzeug.utils.secure_filename() to block path traversal
        file.save(f'/uploads/{filename}')
        return jsonify({'message': f'File {filename} uploaded'})
# 响应处理
@app.route('/response')
def response():
# 返回 HTML
# return '<h1>Hello World</h1>'
# 返回 JSON
# return jsonify({'key': 'value'})
# 返回自定义状态码
return jsonify({'message': 'Not Found'}), 404
if __name__ == '__main__':
app.run(debug=True)
错误处理和中间件
from flask import Flask, jsonify, request
app = Flask(__name__)
# 错误处理
@app.errorhandler(404)
def not_found(error):
return jsonify({'error': 'Not found'}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({'error': 'Internal server error'}), 500
# Custom API error type
class InvalidUsage(Exception):
    """API exception carrying an HTTP status code and an optional payload.

    The class-level default of 400 (Bad Request) can be overridden per
    instance via the ``status_code`` constructor argument.
    """

    status_code = 400

    def __init__(self, message, status_code=None, payload=None):
        super().__init__()
        self.message = message
        if status_code is not None:
            self.status_code = status_code
        self.payload = payload

    def to_dict(self):
        """Serialize the error into a JSON-ready dict including the message."""
        body = dict(self.payload or ())
        body['message'] = self.message
        return body
@app.errorhandler(InvalidUsage)
def handle_invalid_usage(error):
response = jsonify(error.to_dict())
response.status_code = error.status_code
return response
# 使用自定义错误
@app.route('/api/<int:value>')
def api_endpoint(value):
if value < 0:
raise InvalidUsage('Value must be positive', status_code=400)
return jsonify({'value': value})
# 请求前钩子
@app.before_request
def before_request():
"""每个请求前执行"""
print(f'Before request: {request.method} {request.path}')
# 可以在这里做认证
# token = request.headers.get('Authorization')
# if not token:
# return jsonify({'error': 'Unauthorized'}), 401
# 请求后钩子
@app.after_request
def after_request(response):
"""每个请求后执行"""
print(f'After request: {response.status_code}')
# 添加响应头
response.headers['X-Custom-Header'] = 'Flask'
return response
# 拆卸请求
@app.teardown_request
def teardown_request(exception):
"""请求结束后执行"""
print(f'Teardown request: {exception}')
if __name__ == '__main__':
app.run(debug=True)
FastAPI 基础
安装和快速开始
# 安装 FastAPI
# pip install fastapi uvicorn
from fastapi import FastAPI
from fastapi.responses import JSONResponse
# 创建应用
app = FastAPI()
# 基本路由
@app.get('/')
def read_root():
return {'Hello': 'World'}
# 路径参数
@app.get('/items/{item_id}')
def read_item(item_id: int):
return {'item_id': item_id}
# 带类型的路径参数
@app.get('/users/{user_id}')
def read_user(user_id: int, q: str = None):
return {'user_id': user_id, 'q': q}
# 查询参数
@app.get('/items/')
def read_items(skip: int = 0, limit: int = 10):
return {'skip': skip, 'limit': limit}
# 多个路径和查询参数
@app.get('/users/{user_id}/items/{item_id}')
def read_user_item(
user_id: int,
item_id: str,
q: str = None,
short: bool = False
):
item = {'item_id': item_id, 'owner_id': user_id}
if q:
item.update({'q': q})
if not short:
item.update(
{'description': 'This is an amazing item'}
)
return item
# POST 请求
from pydantic import BaseModel
class Item(BaseModel):
name: str
price: float
is_offer: bool = None
@app.post('/items/')
def create_item(item: Item):
return {
'item_name': item.name,
'item_price': item.price
}
# 运行服务器
# uvicorn main:app --reload
请求体和验证
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
app = FastAPI()
class Item(BaseModel):
name: str
description: str = None
price: float = Field(..., gt=0, description="价格必须大于0")
tax: float = None
tags: list = []
class User(BaseModel):
username: str
full_name: str = None
# 请求体 + 路径参数 + 查询参数
@app.put('/items/{item_id}')
def update_item(
item_id: int,
item: Item,
user: User,
q: str = None
):
results = {'item_id': item_id, 'item': item, 'user': user}
if q:
results.update({'q': q})
return results
# 请求体验证
@app.post('/items/')
async def create_item(item: Item):
# Pydantic 会自动验证
if item.price < 0:
raise HTTPException(
status_code=400,
detail="价格必须为正数"
)
return {
'name': item.name,
'price': item.price,
'description': item.description
}
# Multiple request-body parameters in one endpoint
from fastapi import Body  # Body is used below but was never imported -- without this line the module fails with NameError

@app.post('/items/multiple/')
async def create_multiple_items(
    item: Item,
    user: User,
    importance: int = Body(..., gt=0, le=10)  # a singular value embedded in the JSON body, range 1..10
):
    """Accept two body models plus an embedded integer field and echo them back."""
    results = {
        'item': item,
        'user': user,
        'importance': importance
    }
    return results
# 嵌入请求体
class ItemIn(BaseModel):
name: str
description: str
price: float
class ItemOut(BaseModel):
name: str
price: float
tax: float = 10.5
@app.post('/items/embed/', response_model=ItemOut)
async def create_item_embed(item: ItemIn):
# 计算税费
item_dict = item.dict()
if 'tax' not in item_dict:
item_dict.update({'tax': item.price * 0.1})
return item_dict
异常处理
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
app = FastAPI()
# 抛出 HTTP 异常
@app.get('/items/{item_id}')
def read_item(item_id: int):
if item_id == 0:
raise HTTPException(
status_code=404,
detail='Item not found',
headers={'X-Error': 'There goes my error'}
)
return {'item_id': item_id}
# 自定义异常处理器
class UnicornException(Exception):
def __init__(self, name: str):
self.name = name
@app.exception_handler(UnicornException)
async def unicorn_exception_handler(request: Request, exc: UnicornException):
return JSONResponse(
status_code=418,
content={'message': f'Oops! {exc.name} did something.'}
)
# 使用自定义异常
@app.get('/unicorns/{name}')
def read_unicorn(name: str):
if name == 'yolo':
raise UnicornException(name=name)
return {'unicorn_name': name}
# 全局异常处理器
@app.exception_handler(ValueError)
async def value_error_handler(request: Request, exc: ValueError):
return JSONResponse(
status_code=400,
content={'error': str(exc)}
)
# 覆盖默认异常处理器
@app.exception_handler(HTTPException)
async def custom_http_exception_handler(request: Request, exc: HTTPException):
return JSONResponse(
status_code=exc.status_code,
content={
'custom_error': True,
'detail': exc.detail
}
)
网络爬虫
requests + BeautifulSoup
基础爬虫
# 安装依赖
# pip install requests beautifulsoup4 lxml
import requests
from bs4 import BeautifulSoup
# 获取网页内容
url = 'https://example.com'
response = requests.get(url)
response.encoding = 'utf-8' # 设置编码
# 解析 HTML
soup = BeautifulSoup(response.text, 'lxml')
# 获取标题
title = soup.title.string
print(f"网页标题: {title}")
# 查找元素
# 通过 ID 查找
element = soup.find(id='content')
# 通过类名查找
elements = soup.find_all(class_='article')
# 通过标签查找
links = soup.find_all('a')
for link in links:
href = link.get('href')
text = link.string
print(f"链接: {href} - {text}")
# CSS 选择器
elements = soup.select('div.container > ul li')
for element in elements:
print(element.text)
# 提取文本和属性
for link in soup.select('a'):
href = link.get('href')
text = link.get_text(strip=True)
print(f"{text}: {href}")
实战示例
import requests
from bs4 import BeautifulSoup
import time
def scrape_news():
"""爬取新闻标题"""
url = 'https://news.example.com'
# 设置请求头模拟浏览器
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.text, 'lxml')
# 查找新闻列表
news_items = soup.select('.news-item')
for item in news_items:
title = item.select_one('.title').get_text(strip=True)
link = item.select_one('a')['href']
summary = item.select_one('.summary').get_text(strip=True)
print(f"标题: {title}")
print(f"链接: {link}")
print(f"摘要: {summary}")
print("-" * 50)
# 使用 Session 保持连接
def scrape_with_session(urls):
"""使用 Session 爬取多个页面"""
session = requests.Session()
session.headers.update({
'User-Agent': 'My Bot/1.0'
})
for url in urls:
try:
response = session.get(url, timeout=10)
soup = BeautifulSoup(response.text, 'lxml')
# 处理数据
title = soup.title.string
print(f"页面标题: {title}")
# 礼貌延迟
time.sleep(1)
except Exception as e:
print(f"爬取 {url} 失败: {e}")
session.close()
# 处理分页
def scrape_pages(base_url, pages):
"""爬取多个分页"""
for page in range(1, pages + 1):
url = f"{base_url}?page={page}"
print(f"爬取第 {page} 页: {url}")
response = requests.get(url)
soup = BeautifulSoup(response.text, 'lxml')
# 提取当前页数据
items = soup.select('.item')
for item in items:
title = item.get_text(strip=True)
print(f" - {title}")
time.sleep(1)
# 处理表单和登录
def login_and_scrape():
"""登录后爬取数据"""
session = requests.Session()
# 获取登录页面
login_page = session.get('https://example.com/login')
soup = BeautifulSoup(login_page.text, 'lxml')
# 提取 CSRF token
csrf_token = soup.find('input', {'name': 'csrf_token'})['value']
# 提交登录表单
login_data = {
'username': 'your_username',
'password': 'your_password',
'csrf_token': csrf_token
}
session.post('https://example.com/login', data=login_data)
# 爬取需要登录的页面
response = session.get('https://example.com/protected')
print(response.text)
Scrapy 框架
Scrapy 基础
# 安装 Scrapy
# pip install scrapy
# 创建项目
# scrapy startproject myproject
# 定义 Item
import scrapy
class ArticleItem(scrapy.Item):
"""文章 Item"""
title = scrapy.Field()
link = scrapy.Field()
summary = scrapy.Field()
# 编写 Spider
class ArticleSpider(scrapy.Spider):
"""文章爬虫"""
name = 'articles'
allowed_domains = ['example.com']
start_urls = [
'https://example.com/articles'
]
def parse(self, response):
"""解析响应"""
# 提取文章列表
for article in response.css('.article'):
item = ArticleItem()
item['title'] = article.css('.title::text').get()
item['link'] = article.css('a::attr(href)').get()
item['summary'] = article.css('.summary::text').get()
yield item
# 提取下一页链接
next_page = response.css('a.next::attr(href)').get()
if next_page is not None:
yield response.follow(next_page, callback=self.parse)
# 运行爬虫
# scrapy crawl articles
Scrapy 高级
import scrapy
from scrapy.http import Request
from scrapy.exceptions import CloseSpider
class AdvancedSpider(scrapy.Spider):
    """Spider demonstrating custom start requests, a page limit and link following."""
    name = 'advanced'

    def __init__(self, max_pages=5, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # stop crawling after this many parsed pages
        self.max_pages = int(max_pages)
        self.page_count = 0

    def start_requests(self):
        """Yield the hand-picked start requests."""
        urls = [
            'https://example.com/page1',
            'https://example.com/page2'
        ]
        for url in urls:
            yield Request(url, callback=self.parse, meta={'proxy': None})

    def parse(self, response):
        """Extract list items and follow in-page links."""
        self.page_count += 1
        if self.page_count > self.max_pages:
            raise CloseSpider('Reached maximum page limit')
        # extract the data of the current page
        for item in response.css('.item'):
            yield {
                'title': item.css('.title::text').get(),
                'url': item.css('a::attr(href)').get(),
            }
        # .getall() yields plain strings; the original iterated Selector
        # objects and response.urljoin() expects a string argument
        for href in response.css('a::attr(href)').getall():
            url = response.urljoin(href)
            yield Request(url, callback=self.parse_detail)

    def parse_detail(self, response):
        """Extract the content of a detail page."""
        yield {
            'detail_url': response.url,
            'content': response.css('.content::text').getall()
        }
# 中间件
class ProxyMiddleware:
"""代理中间件"""
def __init__(self, proxy_url):
self.proxy_url = proxy_url
@classmethod
def from_crawler(cls, crawler):
return cls(
proxy_url=crawler.settings.get('PROXY_URL')
)
def process_request(self, request, spider):
"""处理请求"""
request.meta['proxy'] = self.proxy_url
def process_exception(self, request, exception, spider):
"""处理异常"""
if 'proxy' in request.meta:
proxy = request.meta['proxy']
spider.logger.error(f'Proxy error: {proxy}')
# Pipelines
class DataPipeline:
    """Clean, validate and collect scraped items, dumping them to JSON on close."""

    def __init__(self):
        # items accepted so far
        self.items = []

    def process_item(self, item, spider):
        """Normalize one item; drop it when the title is empty.

        Raises:
            DropItem: when the cleaned title is empty.
        """
        # data cleaning
        item['title'] = item['title'].strip()
        # data validation -- DropItem was never imported at module level in
        # the original, which made this raise NameError; import it lazily
        # right where it is needed
        if not item['title']:
            from scrapy.exceptions import DropItem
            raise DropItem(f'Missing title in {item}')
        # keep the item for the final dump
        self.items.append(item)
        return item

    def close_spider(self, spider):
        """Persist the collected items as JSON when the spider closes."""
        import json
        with open('output.json', 'w') as f:
            json.dump(self.items, f)
# settings.py 配置
"""
BOT_NAME = 'myproject'
SPIDER_MODULES = ['myproject.spiders']
NEWSPIDER_MODULE = 'myproject.spiders'
# 遵守 robots.txt
ROBOTSTXT_OBEY = True
# 下载延迟
DOWNLOAD_DELAY = 1
# 并发请求数
CONCURRENT_REQUESTS = 16
# User-Agent
USER_AGENT = 'My Bot (+http://www.yourdomain.com)'
# 启用中间件
DOWNLOADER_MIDDLEWARES = {
'myproject.middlewares.ProxyMiddleware': 400,
}
# 启用 Pipeline
ITEM_PIPELINES = {
'myproject.pipelines.DataPipeline': 300,
}
"""
小结
本章节介绍了 Python 的网络编程:
HTTP 编程
- requests 库: 安装、基础使用、GET/POST 请求
- 请求头和响应头: 设置请求头、读取响应头
- Cookie 处理: Cookie 的读取、设置和管理
- Session 管理: 使用 Session 保持连接、自动管理 cookies
- 高级功能: 异常处理、代理设置、SSL 证书验证
Socket 编程
- Socket 基础: Socket 概念、TCP/UDP 区别
- TCP Socket: TCP 客户端和服务器实现
- UDP Socket: UDP 客户端和服务器实现
- Socket 高级: 非阻塞 Socket、超时设置、多路复用
WebSocket
- WebSocket 基础: WebSocket 概念和使用场景
- WebSocket 客户端: 使用 websockets 库连接服务器
- WebSocket 服务器: 实现 Echo 服务器和广播服务器
API 开发
- Flask 基础: 路由、请求处理、返回 JSON
- Flask 高级: 错误处理、中间件、请求钩子
- FastAPI 基础: 路径参数、查询参数、请求体验证
- FastAPI 高级: 异常处理、自定义异常处理器
网络爬虫
- requests + BeautifulSoup: 基础爬虫、实战示例
- Scrapy 框架: Spider 编写、Item 定义、中间件、Pipeline
掌握 Python 网络编程可以帮助你构建各种网络应用,从简单的 HTTP 请求到复杂的分布式系统。合理选择合适的网络编程技术和框架,可以大大提高开发效率和程序性能。