数据库操作
数据库操作是 Python 应用开发中的重要组成部分。本章将介绍 SQLite、MySQL、Redis 和 MongoDB 的使用方法,以及数据库连接池等内容。
数据库基础
数据库类型
"""
常见数据库类型:
关系型数据库 (RDBMS):
- SQLite: 轻量级嵌入式数据库
- MySQL: 开源关系型数据库
- PostgreSQL: 功能强大的开源数据库
- Oracle: 商业数据库
非关系型数据库 (NoSQL):
- Redis: 键值对存储,内存数据库
- MongoDB: 文档型数据库
- Cassandra: 列族数据库
选择建议:
- 小型应用: SQLite
- Web 应用: MySQL/PostgreSQL
- 缓存: Redis
- 大数据: MongoDB/Cassandra
"""
数据库基本操作
"""
CRUD 操作:
- Create: 创建数据
- Read: 读取数据
- Update: 更新数据
- Delete: 删除数据
SQL 基础语句:
- SELECT: 查询数据
- INSERT: 插入数据
- UPDATE: 更新数据
- DELETE: 删除数据
- CREATE: 创建表
- DROP: 删除表
事务 (ACID):
- Atomicity: 原子性
- Consistency: 一致性
- Isolation: 隔离性
- Durability: 持久性
"""
SQLite
sqlite3 模块
连接数据库
import sqlite3
# 连接数据库(不存在会自动创建)
conn = sqlite3.connect('example.db')
# 创建游标
cursor = conn.cursor()
print(f"数据库连接成功: {conn}")
# 使用 with 语句自动关闭连接
with sqlite3.connect('example.db') as conn:
cursor = conn.cursor()
print("使用 with 语句自动管理连接")
# 内存数据库
conn_memory = sqlite3.connect(':memory:')
print("内存数据库创建成功")
# 设置隔离级别
conn.isolation_level = None # 自动提交模式
# 关闭连接
conn.close()
执行 SQL
import sqlite3
def create_table():
"""创建表"""
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
# 创建用户表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
age INTEGER,
email TEXT UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
print("表创建成功")
# 创建表
create_table()
# 插入数据
def insert_data():
"""插入数据"""
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
# 单条插入
cursor.execute(
"INSERT INTO users (name, age, email) VALUES (?, ?, ?)",
('Alice', 25, 'alice@example.com')
)
# 多条插入
users = [
('Bob', 30, 'bob@example.com'),
('Charlie', 35, 'charlie@example.com'),
('David', 28, 'david@example.com')
]
cursor.executemany(
"INSERT INTO users (name, age, email) VALUES (?, ?, ?)",
users
)
conn.commit()
print(f"插入了 {cursor.rowcount} 条数据")
conn.close()
# 执行插入
insert_data()
# 查询数据
def query_data():
"""查询数据"""
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
# 查询所有数据
cursor.execute("SELECT * FROM users")
rows = cursor.fetchall()
print("所有用户:")
for row in rows:
print(f" ID: {row[0]}, 姓名: {row[1]}, 年龄: {row[2]}, 邮箱: {row[3]}")
# 查询特定数据
cursor.execute("SELECT * FROM users WHERE age > ?", (25,))
rows = cursor.fetchall()
print("\n年龄大于25的用户:")
for row in rows:
print(f" {row[1]}: {row[2]}岁")
# 使用 fetchone 获取单条数据
cursor.execute("SELECT * FROM users WHERE name = ?", ('Alice',))
row = cursor.fetchone()
if row:
print(f"\nAlice的信息: {row}")
conn.close()
# 执行查询
query_data()
# 更新数据
def update_data():
"""更新数据"""
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
# 更新单个字段
cursor.execute(
"UPDATE users SET age = ? WHERE name = ?",
(26, 'Alice')
)
# 批量更新
cursor.execute(
"UPDATE users SET age = age + 1 WHERE age < 30"
)
conn.commit()
print(f"更新了 {cursor.rowcount} 条数据")
conn.close()
# 执行更新
update_data()
# 删除数据
def delete_data():
"""删除数据"""
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
# 删除特定数据
cursor.execute("DELETE FROM users WHERE name = ?", ('David',))
conn.commit()
print(f"删除了 {cursor.rowcount} 条数据")
conn.close()
# 执行删除
delete_data()
事务处理
import sqlite3
def transaction_example():
"""事务示例"""
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
try:
# 开始事务
conn.execute("BEGIN TRANSACTION")
# 执行多个 SQL 操作
cursor.execute("INSERT INTO users (name, age, email) VALUES (?, ?, ?)",
('Eve', 27, 'eve@example.com'))
cursor.execute("UPDATE users SET age = age + 1 WHERE name = 'Eve'")
# 提交事务
conn.commit()
print("事务提交成功")
except Exception as e:
# 回滚事务
conn.rollback()
print(f"事务回滚: {e}")
finally:
conn.close()
# 上下文管理器方式
def transaction_with_context():
"""使用上下文管理器的事务"""
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
try:
# 执行多个操作
cursor.execute("INSERT INTO users (name, age, email) VALUES (?, ?, ?)",
('Frank', 32, 'frank@example.com'))
cursor.execute("UPDATE users SET age = age + 2")
# 提交
conn.commit()
print("操作成功")
except Exception as e:
conn.rollback()
print(f"操作失败: {e}")
finally:
conn.close()
# 执行事务示例
transaction_example()
高级操作
import sqlite3
def advanced_operations():
"""高级操作示例"""
conn = sqlite3.connect('example.db')
conn.row_factory = sqlite3.Row # 使用 Row 工厂
cursor = conn.cursor()
# 1. 使用 Row 对象访问字段
cursor.execute("SELECT * FROM users LIMIT 3")
for row in cursor.fetchall():
# 可以通过列名访问
print(f"{row['name']}: {row['age']}岁")
# 2. 分页查询
page = 1
page_size = 2
offset = (page - 1) * page_size
cursor.execute(
"SELECT * FROM users LIMIT ? OFFSET ?",
(page_size, offset)
)
print(f"\n第{page}页数据:")
for row in cursor.fetchall():
print(f" {row['name']}")
# 3. 排序
cursor.execute("SELECT * FROM users ORDER BY age DESC")
print("\n按年龄降序:")
for row in cursor.fetchall():
print(f" {row['name']}: {row['age']}")
# 4. 聚合函数
cursor.execute("SELECT COUNT(*), AVG(age), MAX(age), MIN(age) FROM users")
count, avg_age, max_age, min_age = cursor.fetchone()
print(f"\n统计信息:")
print(f" 总数: {count}")
print(f" 平均年龄: {avg_age:.2f}")
print(f" 最大年龄: {max_age}")
print(f" 最小年龄: {min_age}")
# 5. 分组查询
cursor.execute("SELECT age, COUNT(*) FROM users GROUP BY age")
print("\n年龄分组:")
for row in cursor.fetchall():
print(f" {row[0]}岁: {row[1]}人")
# 6. 连接查询
# 先创建订单表
cursor.execute('''
CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
amount REAL,
FOREIGN KEY (user_id) REFERENCES users(id)
)
''')
# 插入订单数据
cursor.execute("INSERT INTO orders (user_id, amount) VALUES (?, ?)", (1, 100.0))
cursor.execute("INSERT INTO orders (user_id, amount) VALUES (?, ?)", (2, 200.0))
conn.commit()
# 连接查询
cursor.execute('''
SELECT users.name, orders.amount
FROM users
INNER JOIN orders ON users.id = orders.user_id
''')
print("\n用户订单:")
for row in cursor.fetchall():
print(f" {row[0]}: {row[1]}元")
conn.close()
# 执行高级操作
advanced_operations()
MySQL
PyMySQL
安装和连接
# 安装 PyMySQL
# pip install PyMySQL
import pymysql
# 连接 MySQL 数据库
connection = pymysql.connect(
host='localhost',
port=3306,
user='root',
password='password',
database='test_db',
charset='utf8mb4'
)
print(f"MySQL 连接成功: {connection}")
# 使用上下文管理器
with pymysql.connect(
host='localhost',
user='root',
password='password',
database='test_db'
) as conn:
print("使用 with 语句管理连接")
# 关闭连接
connection.close()
基本操作
import pymysql
def mysql_operations():
"""MySQL 基本操作"""
conn = pymysql.connect(
host='localhost',
user='root',
password='password',
database='test_db'
)
try:
with conn.cursor() as cursor:
# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
price DECIMAL(10, 2) NOT NULL,
stock INT DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
''')
# 插入数据
cursor.execute(
"INSERT INTO products (name, price, stock) VALUES (%s, %s, %s)",
('iPhone', 5999.00, 100)
)
# 批量插入
products = [
('iPad', 3299.00, 50),
('MacBook', 12999.00, 20),
('AirPods', 1299.00, 200)
]
cursor.executemany(
"INSERT INTO products (name, price, stock) VALUES (%s, %s, %s)",
products
)
# 提交事务
conn.commit()
print(f"插入了 {cursor.rowcount} 条数据")
with conn.cursor() as cursor:
# 查询数据
cursor.execute("SELECT * FROM products")
results = cursor.fetchall()
print("\n产品列表:")
for row in results:
print(f" ID: {row[0]}, 名称: {row[1]}, 价格: {row[2]}, 库存: {row[3]}")
# 使用 WHERE 条件
cursor.execute(
"SELECT * FROM products WHERE price > %s",
(1000,)
)
print("\n价格大于1000的产品:")
for row in cursor.fetchall():
print(f" {row[1]}: {row[2]}元")
with conn.cursor(pymysql.cursors.DictCursor) as cursor:
# 使用字典游标(字段名访问)
cursor.execute("SELECT * FROM products LIMIT 2")
results = cursor.fetchall()
print("\n使用字典游标:")
for row in results:
print(f" {row['name']}: {row['price']}元")
except Exception as e:
# 回滚事务
conn.rollback()
print(f"操作失败: {e}")
finally:
conn.close()
# 执行操作
# mysql_operations()
ORM 操作
SQLAlchemy 基础
# 安装 SQLAlchemy
# pip install SQLAlchemy
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 创建引擎
engine = create_engine(
'sqlite:///example_orm.db', # SQLite
# 'mysql+pymysql://root:password@localhost/test_db', # MySQL
echo=True # 打印 SQL 语句
)
# 创建基类
Base = declarative_base()
# 定义模型
class User(Base):
"""用户模型"""
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(50), nullable=False)
age = Column(Integer)
email = Column(String(100), unique=True)
def __repr__(self):
return f"<User(id={self.id}, name='{self.name}')>"
class Product(Base):
"""产品模型"""
__tablename__ = 'products'
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
price = Column(Float, nullable=False)
stock = Column(Integer, default=0)
def __repr__(self):
return f"<Product(id={self.id}, name='{self.name}', price={self.price})>"
# 创建表
Base.metadata.create_all(engine)
# 创建 Session
Session = sessionmaker(bind=engine)
session = Session()
# 插入数据
user1 = User(name='Alice', age=25, email='alice@example.com')
user2 = User(name='Bob', age=30, email='bob@example.com')
session.add(user1)
session.add(user2)
session.commit()
# 批量插入
users = [
User(name=f'User{i}', age=20 + i, email=f'user{i}@example.com')
for i in range(3, 6)
]
session.add_all(users)
session.commit()
# 查询数据
print("\n查询所有用户:")
all_users = session.query(User).all()
for user in all_users:
print(f" {user.name}: {user.age}岁")
# 条件查询
print("\n年龄大于25的用户:")
users_over_25 = session.query(User).filter(User.age > 25).all()
for user in users_over_25:
print(f" {user.name}: {user.age}岁")
# 查询单个对象
alice = session.query(User).filter_by(name='Alice').first()
if alice:
print(f"\n找到用户: {alice.name}")
# 更新数据
alice.age = 26
session.commit()
# 删除数据
user_to_delete = session.query(User).filter_by(name='Bob').first()
if user_to_delete:
session.delete(user_to_delete)
session.commit()
# 统计查询
count = session.query(User).count()
print(f"\n用户总数: {count}")
avg_age = session.query(User.age).all()
print(f"平均年龄: {sum(avg_age) / len(avg_age):.2f}")
# 关闭 Session
session.close()
关系映射
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class Author(Base):
"""作者模型"""
__tablename__ = 'authors'
id = Column(Integer, primary_key=True)
name = Column(String(50), nullable=False)
# 关系: 一个作者有多本书
books = relationship('Book', back_populates='author')
def __repr__(self):
return f"<Author(id={self.id}, name='{self.name}')>"
class Book(Base):
"""书籍模型"""
__tablename__ = 'books'
id = Column(Integer, primary_key=True)
title = Column(String(100), nullable=False)
author_id = Column(Integer, ForeignKey('authors.id'))
# 关系: 一本书属于一个作者
author = relationship('Author', back_populates='books')
def __repr__(self):
return f"<Book(id={self.id}, title='{self.title}')>"
# 创建表
Base.metadata.create_all(engine)
# 使用关系
session = Session()
# 创建作者和书籍
author = Author(name='J.K. Rowling')
book1 = Book(title='Harry Potter 1', author=author)
book2 = Book(title='Harry Potter 2', author=author)
session.add(author)
session.commit()
# 查询关系
print("\n查询作者及其书籍:")
authors = session.query(Author).all()
for author in authors:
print(f"\n作者: {author.name}")
for book in author.books:
print(f" 书籍: {book.title}")
# 反向查询
print("\n查询书籍及其作者:")
books = session.query(Book).all()
for book in books:
print(f"{book.title} - {book.author.name}")
session.close()
Redis
redis-py
安装和连接
# 安装 redis-py
# pip install redis
import redis
# 连接 Redis
r = redis.Redis(
host='localhost',
port=6379,
db=0,
password=None, # 如果有密码
decode_responses=True # 自动解码为字符串
)
# 测试连接
try:
r.ping()
print("Redis 连接成功")
except redis.ConnectionError:
print("Redis 连接失败")
# 使用连接池
pool = redis.ConnectionPool(
host='localhost',
port=6379,
db=0,
decode_responses=True
)
r = redis.Redis(connection_pool=pool)
数据类型操作
String 操作
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 设置和获取
r.set('name', 'Alice')
value = r.get('name')
print(f"name: {value}")
# 设置过期时间
r.setex('session:alice', 3600, 'active') # 1小时后过期
r.set('temp', 'data', ex=60) # 60秒后过期
# 批量设置
r.mset('key1', 'value1', 'key2', 'value2')
values = r.mget('key1', 'key2')
print(f"批量获取: {values}")
# 计数器
r.set('counter', 0)
r.incr('counter') # 加1
r.incrby('counter', 5) # 加5
r.decr('counter') # 减1
counter = r.get('counter')
print(f"计数器: {counter}")
# 追加操作
r.append('name', ' Smith') # 追加
print(r.get('name')) # 'Alice Smith'
# 字符串长度
print(f"长度: {r.strlen('name')}")
# 删除键
r.delete('name', 'key1', 'key2')
Hash 操作
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 设置字段
r.hset('user:1', 'name', 'Alice')
r.hset('user:1', 'age', 25)
r.hset('user:1', 'email', 'alice@example.com')
# 批量设置
user_data = {
'name': 'Bob',
'age': 30,
'email': 'bob@example.com'
}
r.hset('user:2', mapping=user_data)
# 获取字段
name = r.hget('user:1', 'name')
print(f"name: {name}")
# 获取所有字段
user1 = r.hgetall('user:1')
print(f"user1 所有数据: {user1}")
# 获取所有字段名
keys = r.hkeys('user:1')
print(f"字段名: {keys}")
# 获取所有值
values = r.hvals('user:1')
print(f"字段值: {values}")
# 字段是否存在
print(r.hexists('user:1', 'name')) # True
print(r.hexists('user:1', 'address')) # False
# 字段数量
print(f"字段数: {r.hlen('user:1')}")
# 删除字段
r.hdel('user:1', 'email')
# 增加数值
r.hincrby('user:1', 'age', 1) # age + 1
# 删除整个 hash
r.delete('user:2')
List 操作
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 左推入和弹出
r.lpush('tasks', 'task1', 'task2', 'task3')
task = r.lpop('tasks') # 从左边弹出
print(f"执行任务: {task}")
# 右推入和弹出
r.rpush('tasks', 'task4')
task = r.rpop('tasks') # 从右边弹出
print(f"最后任务: {task}")
# 获取列表元素
tasks = r.lrange('tasks', 0, -1) # 获取所有
print(f"所有任务: {tasks}")
# 获取列表长度
length = r.llen('tasks')
print(f"任务数: {length}")
# 索引访问
first_task = r.lindex('tasks', 0) # 第一个
second_task = r.lindex('tasks', 1) # 第二个
print(f"前两个任务: {first_task}, {second_task}")
# 保留范围内的元素
r.ltrim('tasks', 0, 2) # 只保留前3个
# 阻塞操作
# 从右边推入,如果列表为空则等待
# task = r.brpop('tasks', timeout=5)
# 删除列表
r.delete('tasks')
Set 操作
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 添加成员
r.sadd('tags', 'python', 'java', 'golang', 'python')
# 获取所有成员
tags = r.smembers('tags')
print(f"标签: {tags}")
# 判断成员是否存在
print(r.sismember('tags', 'python')) # True
print(r.sismember('tags', 'ruby')) # False
# 获取成员数量
count = r.scard('tags')
print(f"标签数: {count}")
# 随机获取成员
tag = r.srandmember('tags')
print(f"随机标签: {tag}")
# 随机获取多个成员
random_tags = r.srandmember('tags', 2)
print(f"随机标签: {random_tags}")
# 移除成员
r.srem('tags', 'java')
# 集合运算
r.sadd('set1', 1, 2, 3, 4)
r.sadd('set2', 3, 4, 5, 6)
# 交集
intersection = r.sinter('set1', 'set2')
print(f"交集: {intersection}")
# 并集
union = r.sunion('set1', 'set2')
print(f"并集: {union}")
# 差集
difference = r.sdiff('set1', 'set2')
print(f"差集: {difference}")
# 删除集合
r.delete('tags', 'set1', 'set2')
Sorted Set 操作
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 添加成员
r.zadd('rankings', {'Alice': 100, 'Bob': 85, 'Charlie': 95})
# 获取成员分数
score = r.zscore('rankings', 'Alice')
print(f"Alice 的分数: {score}")
# 获取排名
rank = r.zrank('rankings', 'Alice') # 从0开始
print(f"Alice 的排名: {rank}")
# 获取排名(倒序)
reverse_rank = r.zrevrank('rankings', 'Alice')
print(f"Alice 的倒序排名: {reverse_rank}")
# 获取范围内的成员
members = r.zrange('rankings', 0, -1)
print(f"所有成员: {members}")
# 按分数排序
members_with_scores = r.zrange('rankings', 0, -1, withscores=True)
print(f"成员和分数: {members_with_scores}")
# 倒序获取
top_members = r.zrevrange('rankings', 0, 2, withscores=True)
print(f"前3名: {top_members}")
# 获取范围内的成员
members = r.zrangebyscore('rankings', 90, 100)
print(f"90-100分的成员: {members}")
# 增加分数
r.zincrby('rankings', 5, 'Alice') # Alice +5分
# 获取成员数量
count = r.zcard('rankings')
print(f"成员数: {count}")
# 删除成员
r.zrem('rankings', 'Charlie')
管道与事务
Pipeline
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 使用 Pipeline 批量执行命令
pipe = r.pipeline()
# 添加多个命令(不会立即执行)
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.set('key3', 'value3')
pipe.incr('counter')
# 批量执行
results = pipe.execute()
print(f"执行结果: {results}")
# 使用 transaction 确保原子性
pipe = r.pipeline(transaction=True)
pipe.multi()
pipe.set('balance', 100)
pipe.decrby('balance', 30)
pipe.execute()
balance = r.get('balance')
print(f"余额: {balance}")
事务
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 使用 MULTI/EXEC 执行事务
pipe = r.pipeline()
try:
# 开始事务
pipe.multi()
# 监视键
pipe.watch('account:alice', 'account:bob')
alice_balance = int(pipe.get('account:alice'))
bob_balance = int(pipe.get('account:bob'))
# 转账操作
pipe.set('account:alice', alice_balance - 100)
pipe.set('account:bob', bob_balance + 100)
# 执行事务
pipe.execute()
print("转账成功")
except redis.WatchError:
print("事务失败: 键已被修改")
pipe.reset()
# 初始化测试账户
r.set('account:alice', 1000)
r.set('account:bob', 500)
MongoDB
PyMongo
安装和连接
# 安装 PyMongo
# pip install pymongo
from pymongo import MongoClient
# 连接 MongoDB
client = MongoClient('localhost', 27017)
# 或使用连接字符串
# client = MongoClient('mongodb://localhost:27017/')
# 访问数据库
db = client['test_db']
# 访问集合
collection = db['users']
print(f"数据库列表: {client.list_database_names()}")
print(f"集合列表: {db.list_collection_names()}")
文档操作
from pymongo import MongoClient
from datetime import datetime
client = MongoClient('localhost', 27017)
db = client['test_db']
collection = db['users']
# 插入文档
def insert_documents():
"""插入文档"""
# 单个文档
user1 = {
'name': 'Alice',
'age': 25,
'email': 'alice@example.com',
'tags': ['developer', 'python'],
'created_at': datetime.now()
}
result = collection.insert_one(user1)
print(f"插入的文档ID: {result.inserted_id}")
# 批量插入
users = [
{
'name': 'Bob',
'age': 30,
'email': 'bob@example.com',
'tags': ['manager', 'java'],
'created_at': datetime.now()
},
{
'name': 'Charlie',
'age': 35,
'email': 'charlie@example.com',
'tags': ['designer', 'ui/ux'],
'created_at': datetime.now()
}
]
result = collection.insert_many(users)
print(f"插入了 {len(result.inserted_ids)} 条文档")
# 查询文档
def query_documents():
"""查询文档"""
# 查询所有文档
print("所有用户:")
for user in collection.find():
print(f" {user['name']}: {user.get('age', 'N/A')}岁")
# 条件查询
print("\n年龄大于25的用户:")
for user in collection.find({'age': {'$gt': 25}}):
print(f" {user['name']}: {user['age']}岁")
# 查询单个文档
alice = collection.find_one({'name': 'Alice'})
if alice:
print(f"\n找到用户: {alice['name']}")
# 字段过滤
print("\n只返回姓名和邮箱:")
for user in collection.find({}, {'name': 1, 'email': 1, '_id': 0}):
print(f" {user['name']}: {user['email']}")
# 排序和限制
print("\n年龄最大的3个用户:")
for user in collection.find().sort('age', -1).limit(3):
print(f" {user['name']}: {user['age']}岁")
# 更新文档
def update_documents():
"""更新文档"""
# 更新单个字段
collection.update_one(
{'name': 'Alice'},
{'$set': {'age': 26}}
)
# 更新多个文档
collection.update_many(
{'age': {'$lt': 30}},
{'$inc': {'age': 1}} # age + 1
)
# 数组操作
collection.update_one(
{'name': 'Alice'},
{'$push': {'tags': 'team lead'}} # 添加到数组
)
collection.update_one(
{'name': 'Bob'},
{'$pull': {'tags': 'java'}} # 从数组删除
)
print("文档更新成功")
# 删除文档
def delete_documents():
"""删除文档"""
# 删除单个文档
result = collection.delete_one({'name': 'Charlie'})
print(f"删除了 {result.deleted_count} 条文档")
# 删除多个文档
result = collection.delete_many({'age': {'$gt': 30}})
print(f"删除了 {result.deleted_count} 条文档")
# 聚合操作
def aggregation():
"""聚合操作"""
# 统计
pipeline = [
{'$group': {'_id': None, 'count': {'$sum': 1}}}
]
result = list(collection.aggregate(pipeline))
print(f"总用户数: {result[0]['count']}")
# 分组统计
pipeline = [
{'$group': {'_id': '$age', 'count': {'$sum': 1}}},
{'$sort': {'_id': 1}}
]
print("\n按年龄分组:")
for doc in collection.aggregate(pipeline):
print(f" {doc['_id']}岁: {doc['count']}人")
# 标签统计
pipeline = [
{'$unwind': '$tags'},
{'$group': {'_id': '$tags', 'count': {'$sum': 1}}},
{'$sort': {'count': -1}}
]
print("\n标签统计:")
for doc in collection.aggregate(pipeline):
print(f" {doc['_id']}: {doc['count']}人")
# 创建索引
collection.create_index([('name', 1)])
collection.create_index([('age', -1)])
# 执行操作
# insert_documents()
# query_documents()
# update_documents()
# delete_documents()
# aggregation()
数据库连接池
SQLite 连接池
import sqlite3
import threading
from queue import Queue
class SQLitePool:
"""SQLite 连接池"""
def __init__(self, database, max_connections=5):
self.database = database
self.max_connections = max_connections
self.pool = Queue(max_connections)
self.local = threading.local()
# 初始化连接池
for _ in range(max_connections):
conn = sqlite3.connect(database)
self.pool.put(conn)
def get_connection(self):
"""获取连接"""
# 线程本地存储
if not hasattr(self.local, 'conn') or self.local.conn is None:
self.local.conn = self.pool.get()
return self.local.conn
def return_connection(self):
"""归还连接"""
if hasattr(self.local, 'conn') and self.local.conn is not None:
self.pool.put(self.local.conn)
self.local.conn = None
def close_all(self):
"""关闭所有连接"""
while not self.pool.empty():
conn = self.pool.get()
conn.close()
def __enter__(self):
"""上下文管理器入口"""
self.conn = self.get_connection()
return self.conn
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器出口"""
if exc_type is None:
self.return_connection()
else:
# 发生异常时不归还连接
pass
# 使用连接池
pool = SQLitePool('example.db')
def use_pool():
"""使用连接池"""
with pool as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
results = cursor.fetchall()
print(f"查询到 {len(results)} 条记录")
# 使用
# use_pool()
MySQL 连接池
from pymysql import connect
from dbutils.pooled_db import PooledDB
# 创建连接池
pool = PooledDB(
creator=pymysql, # 使用 PyMySQL
maxconnections=10, # 最大连接数
mincached=2, # 最小空闲连接数
maxcached=5, # 最大空闲连接数
maxshared=3, # 最大共享连接数
blocking=True, # 连接池满时是否阻塞等待
host='localhost',
port=3306,
user='root',
password='password',
database='test_db',
charset='utf8mb4'
)
def query_with_pool():
"""使用连接池查询"""
conn = pool.connection()
try:
with conn.cursor() as cursor:
cursor.execute("SELECT * FROM products")
results = cursor.fetchall()
for row in results:
print(f"产品: {row[1]}")
finally:
pool.close(conn) # 归还连接到池
# 使用
# query_with_pool()
SQLAlchemy 连接池
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 创建引擎(带连接池)
engine = create_engine(
'mysql+pymysql://root:password@localhost/test_db',
pool_size=10, # 连接池大小
max_overflow=20, # 超出 pool_size 后最多可以创建的连接数
pool_recycle=3600, # 连接回收时间(秒)
pool_pre_ping=True, # 从池获取连接时先测试连接
echo=False
)
# 创建 Session
Session = sessionmaker(bind=engine)
def use_session():
"""使用 Session"""
session = Session()
try:
# 执行数据库操作
users = session.query(User).limit(5).all()
for user in users:
print(f"用户: {user.name}")
session.commit()
except Exception as e:
session.rollback()
print(f"错误: {e}")
finally:
session.close()
# 使用
# use_session()
# 关闭连接池
engine.dispose()
小结
本章节介绍了 Python 的数据库操作:
数据库基础
- 数据库类型: 关系型数据库和非关系型数据库
- 基本操作: CRUD 操作、SQL 语句、事务概念
SQLite
- sqlite3 模块: 连接数据库、创建表
- 执行 SQL: INSERT、SELECT、UPDATE、DELETE 操作
- 事务处理: 事务提交、回滚、上下文管理器
- 高级操作: 分页、排序、聚合函数、连接查询
MySQL
- PyMySQL: 连接数据库、基本 CRUD 操作
- ORM 操作: SQLAlchemy 模型定义、关系映射
- 关系映射: 一对多、多对多关系
Redis
- redis-py: 连接 Redis、连接池
- 数据类型: String、Hash、List、Set、Sorted Set
- 管道与事务: Pipeline 批量执行、事务保证原子性
MongoDB
- PyMongo: 连接 MongoDB、访问数据库和集合
- 文档操作: 插入、查询、更新、删除文档
- 聚合操作: 统计、分组、聚合管道
数据库连接池
- SQLite 连接池: 自定义连接池实现
- MySQL 连接池: 使用 DBUtils
- SQLAlchemy 连接池: 配置连接池参数
掌握 Python 数据库操作可以让你轻松实现数据持久化,构建功能完整的应用程序。合理选择数据库类型和操作方式,可以大大提高应用性能和开发效率。