Chapter 14: SQLite + Agent Persistent Storage

Posted by admin on 2026-06-23 06:23:01 · Category: network security articles · Source: ZONE.CI 全球网

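Summary: This article presents an approach to persistent storage with SQLite in Agent development. It highlights SQLite's strengths (zero configuration, embedded operation, portability) and compares the scenarios where SQLite and PostgreSQL each fit. It lays out a core schema of 5 tables, including sessions, messages, and tasks, provides a complete Python implementation, and focuses on table relationships, JSON column storage, and WAL mode tuning.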



Author: 网络安全民工

June 19, 2026, 18:27, Tianjin


This chapter runs fully standalone (it depends only on sqlite3 from the Python standard library).

Run: python chapter_14_sqlite/14_sqlite_agent_storage.py

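14.1 Why SQLite?

❌ Common misconception: "An Agent project must use PostgreSQL or MongoDB."

✅ Reality: SQLite is the most underrated database in Agent development.

Advantages of SQLite in Agent scenarios:

Zero configuration: there is no database server to install; the file is the database.

Embedded: the database runs in the same process as the application, so queries involve no network round trip (low latency).

Portability: a single .db file can be backed up, migrated, and version-controlled. (In WAL mode, checkpoint or use SQLite's backup API before copying, because recent writes may still sit in the accompanying -wal file.)

WAL mode: supports concurrent readers plus a single writer, which is enough for Agent scenarios.

Full-text search (FTS5): a built-in full-text index, useful for searching conversations.

JSON support: semi-structured data can be stored and queried.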
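The advantages listed above can be tried out with nothing but the standard library. The following is a minimal sketch rather than the chapter's own code: the helper name try_sqlite_features, the notes and notes_fts tables, and the sample rows are assumptions made for illustration. FTS5 and the JSON functions depend on how the underlying SQLite library was compiled, so the FTS5 part is guarded.

```python
import os
import sqlite3
import tempfile


def try_sqlite_features(db_path: str) -> dict:
    """Exercise WAL mode, JSON functions, and (if available) FTS5 on one file."""
    conn = sqlite3.connect(db_path)
    try:
        # WAL needs a file-backed database; the PRAGMA returns the active mode.
        journal_mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]

        # JSON support: keep semi-structured data in a TEXT column and query into it.
        conn.execute("CREATE TABLE notes (id INTEGER PRIMARY KEY, meta_json TEXT)")
        conn.execute(
            "INSERT INTO notes (meta_json) VALUES (?)",
            ('{"model": "gpt-4o-mini", "tags": ["demo"]}',),
        )
        model = conn.execute(
            "SELECT json_extract(meta_json, '$.model') FROM notes"
        ).fetchone()[0]

        # FTS5: full-text index over text content. Guarded, because some
        # SQLite builds are compiled without the FTS5 extension.
        try:
            conn.execute("CREATE VIRTUAL TABLE notes_fts USING fts5(content)")
            conn.executemany(
                "INSERT INTO notes_fts (content) VALUES (?)",
                [("reset my password",), ("weather in Tianjin",)],
            )
            hits = [
                row[0]
                for row in conn.execute(
                    "SELECT content FROM notes_fts WHERE notes_fts MATCH ?",
                    ("password",),
                )
            ]
        except sqlite3.OperationalError:
            hits = None  # FTS5 is not available in this build
        conn.commit()
        return {"journal_mode": journal_mode, "model": model, "fts_hits": hits}
    finally:
        conn.close()


with tempfile.TemporaryDirectory() as tmp:
    features = try_sqlite_features(os.path.join(tmp, "demo.db"))
    print(features)
```

A temporary file is used instead of ":memory:" because an in-memory database cannot switch to WAL.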

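When should you use SQLite, and when should you upgrade to PostgreSQL?

SQLite is a good fit for:

✓ Agent services deployed on a single machine

✓ Prototyping / the MVP stage

✓ Personal Agent assistants

✓ Internal tools for small and medium-sized teams

✓ Conversation history under 10M rows

PostgreSQL is a good fit for:

→ Multi-replica high availability

→ Write throughput above 1000 QPS

→ Row-level access control

→ Geographically distributed deployment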
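The checklist above can be written down as a small decision helper. This is only a sketch of the rule of thumb stated in the text: the Workload class, its field names, and recommend_backend are hypothetical, and the thresholds are the ones quoted above, not measured limits.

```python
from dataclasses import dataclass


@dataclass
class Workload:
    """Rough deployment profile; the field names are illustrative assumptions."""
    write_qps: int
    history_rows: int
    needs_replicas: bool = False
    needs_row_level_security: bool = False
    geo_distributed: bool = False


def recommend_backend(w: Workload) -> str:
    """Return "postgresql" if any upgrade signal from the checklist fires."""
    upgrade_signals = [
        w.needs_replicas,                # multi-replica high availability
        w.write_qps > 1000,              # write throughput above 1000 QPS
        w.needs_row_level_security,      # row-level access control
        w.geo_distributed,               # geographically distributed deployment
        w.history_rows >= 10_000_000,    # history beyond the 10M-row guideline
    ]
    return "postgresql" if any(upgrade_signals) else "sqlite"


small = recommend_backend(Workload(write_qps=20, history_rows=50_000))
busy = recommend_backend(Workload(write_qps=5_000, history_rows=50_000))
print(small, busy)
```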

14.2 Agent database schema design

Core table design (5 tables):

📊 Architecture diagram

  ┌────────────────────────────────────────────────────┐
  │                  SQLite database                   │
  │                                                    │
  │  ┌─────────────┐  ┌──────────────────┐             │
  │  │  sessions   │  │    messages      │             │
  │  │  ─────────  │  │   ────────────   │             │
  │  │  id (PK)    │  │   id (PK)        │             │
  │  │  user_id    │──│   session_id (FK)│             │
  │  │  title      │  │   role           │             │
  │  │  status     │  │   content        │             │
  │  │  created_at │  │   tool_calls_json│             │
  │  │  updated_at │  │   token_count    │             │
  │  └─────────────┘  │   created_at     │             │
  │                   └──────────────────┘             │
  │                                                    │
  │  ┌─────────────┐  ┌──────────────────┐             │
  │  │   tasks     │  │    tool_logs     │             │
  │  │  ─────────  │  │   ────────────   │             │
  │  │  id (PK)    │  │   id (PK)        │             │
  │  │  session_id │  │   session_id     │             │
  │  │  type       │  │   tool_name      │             │
  │  │  payload    │  │   input_json     │             │
  │  │  status     │  │   output_json    │             │
  │  │  result     │  │   elapsed_ms     │             │
  │  │  created_at │  │   success        │             │
  │  └─────────────┘  │   created_at     │             │
  │                   └──────────────────┘             │
  │                                                    │
  │  ┌─────────────┐                                   │
  │  │   users     │  ← for extension                  │
  │  │  ─────────  │  (quota/permissions/preferences)  │
  │  │  id (PK)    │                                   │
  │  │  name       │                                   │
  │  │  quota_total│                                   │
  │  │  quota_used │                                   │
  │  └─────────────┘                                   │
  └────────────────────────────────────────────────────┘

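Design points (a frequent interview topic!):

sessions and messages have a 1:N relationship.

tool_calls are stored in a JSON column of the messages table (no separate table).

→ Fewer JOINs and faster reads.

tool_logs is a standalone audit table that records the details of every tool call.

The tasks table supports asynchronous tasks (pause/resume/cancel).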
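To make the JSON-column design point concrete, here is a minimal sketch of writing and reading tool calls without a JOIN. It uses a cut-down messages table in an in-memory database; the shape of the tool_calls payload, the sample values, and the load_messages helper are assumptions for illustration, not a fixed format.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    """CREATE TABLE messages (
           id INTEGER PRIMARY KEY AUTOINCREMENT,
           session_id TEXT NOT NULL,
           role TEXT NOT NULL,
           content TEXT NOT NULL DEFAULT '',
           tool_calls_json TEXT
       )"""
)

# Hypothetical tool call, serialized into the JSON column of the assistant message.
tool_calls = [{"id": "call_1", "name": "get_weather", "arguments": {"city": "Tianjin"}}]
conn.execute(
    "INSERT INTO messages (session_id, role, content, tool_calls_json) VALUES (?,?,?,?)",
    ("s1", "assistant", "", json.dumps(tool_calls, ensure_ascii=False)),
)
conn.execute(
    "INSERT INTO messages (session_id, role, content) VALUES (?,?,?)",
    ("s1", "user", "thanks"),
)
conn.commit()


def load_messages(session_id: str) -> list:
    """One single-table query returns messages together with their tool calls."""
    rows = conn.execute(
        "SELECT role, content, tool_calls_json FROM messages "
        "WHERE session_id=? ORDER BY id",
        (session_id,),
    ).fetchall()
    messages = []
    for row in rows:
        msg = {"role": row["role"], "content": row["content"]}
        if row["tool_calls_json"] is not None:
            msg["tool_calls"] = json.loads(row["tool_calls_json"])
        messages.append(msg)
    return messages


history = load_messages("s1")
print(history)
```

The trade-off is that individual tool calls are harder to filter or aggregate in SQL, which is one reason a separate tool_logs audit table is still useful.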

📝 Corresponding code implementation

Defined in this listing: get_db, init_database, DB_PATH

import os
import json
import time
import hashlib
import sqlite3
from datetime import datetime
from contextlib import contextmanager
from typing import Optional

DB_PATH = os.path.join(os.path.dirname(__file__), "agent_store.db")


@contextmanager
def get_db():
    """Get a database connection (context manager; commits or rolls back automatically)."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row  # rows allow access by column name; dict(row) converts them
    conn.execute("PRAGMA journal_mode=WAL")  # enable WAL mode
    conn.execute("PRAGMA foreign_keys=ON")   # enforce foreign-key constraints
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def init_database():
    """Initialize the database: create all tables.

    This is the first step of Agent persistence.
    Call it on every startup; IF NOT EXISTS keeps it idempotent.
    """
    with get_db() as conn:
        conn.executescript("""
        -- ===== Users table =====
        CREATE TABLE IF NOT EXISTS users (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT,
            api_key_hash TEXT,
            quota_total INTEGER DEFAULT 100000,
            quota_used INTEGER DEFAULT 0,
            preferences_json TEXT DEFAULT '{}',
            created_at TEXT NOT NULL DEFAULT (datetime('now')),
            updated_at TEXT NOT NULL DEFAULT (datetime('now'))
        );

        -- ===== Sessions table =====
        CREATE TABLE IF NOT EXISTS sessions (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            title TEXT DEFAULT 'New conversation',
            status TEXT DEFAULT 'active'
                CHECK(status IN ('active','paused','completed','cancelled')),
            model TEXT DEFAULT 'gpt-4o-mini',
            total_tokens INTEGER DEFAULT 0,
            total_cost REAL DEFAULT 0.0,
            message_count INTEGER DEFAULT 0,
            metadata_json TEXT DEFAULT '{}',
            created_at TEXT NOT NULL DEFAULT (datetime('now')),
            updated_at TEXT NOT NULL DEFAULT (datetime('now')),
            FOREIGN KEY (user_id) REFERENCES users(id)
        );

        -- ===== Messages table (the core!) =====
        CREATE TABLE IF NOT EXISTS messages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            session_id TEXT NOT NULL,
            role TEXT NOT NULL CHECK(role IN ('system','user','assistant','tool')),
            content TEXT NOT NULL DEFAULT '',
            tool_calls_json TEXT,
            tool_call_id TEXT,
            token_count INTEGER DEFAULT 0,
            created_at TEXT NOT NULL DEFAULT (datetime('now')),
            FOREIGN KEY (session_id) REFERENCES sessions(id)
        );

        -- ===== Tasks table (asynchronous task management) =====
        CREATE TABLE IF NOT EXISTS tasks (
            id TEXT PRIMARY KEY,
            session_id TEXT NOT NULL,
            type TEXT NOT NULL,
            payload_json TEXT NOT NULL DEFAULT '{}',
            status TEXT DEFAULT 'pending'
                CHECK(status IN ('pending','running','paused','completed','failed','cancelled')),
            result_json TEXT,
            priority INTEGER DEFAULT 0,
            max_retries INTEGER DEFAULT 3,
            retry_count INTEGER DEFAULT 0,
            created_at TEXT NOT NULL DEFAULT (datetime('now')),
            started_at TEXT,
            completed_at TEXT,
            FOREIGN KEY (session_id) REFERENCES sessions(id)
        );

        -- ===== Tool call log table (audit + analytics) =====
        CREATE TABLE IF NOT EXISTS tool_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            session_id TEXT NOT NULL,
            message_id INTEGER,
            tool_name TEXT NOT NULL,
            input_json TEXT NOT NULL,
            output_json TEXT,
            elapsed_ms REAL,
            success INTEGER DEFAULT 1,
            error_message TEXT,
            created_at TEXT NOT NULL DEFAULT (datetime('now')),
            FOREIGN KEY (session_id) REFERENCES sessions(id),
            FOREIGN KEY (message_id) REFERENCES messages(id)
        );

        -- ===== Indexes (key to query performance!) =====
        CREATE INDEX IF NOT EXISTS idx_sessions_user
            ON sessions(user_id);
        CREATE INDEX IF NOT EXISTS idx_sessions_updated
            ON sessions(updated_at DESC);
        CREATE INDEX IF NOT EXISTS idx_messages_session
            ON messages(session_id, created_at);
        CREATE INDEX IF NOT EXISTS idx_messages_role
            ON messages(session_id, role);
        CREATE INDEX IF NOT EXISTS idx_tasks_status
            ON tasks(status, priority DESC);
        CREATE INDEX IF NOT EXISTS idx_tool_logs_session
            ON tool_logs(session_id, created_at);
        CREATE INDEX IF NOT EXISTS idx_tool_logs_name
            ON tool_logs(tool_name, created_at);
        """)
    # The script above creates 5 tables and 7 indexes.
    print("  ✅ Database initialized (WAL mode + 5 tables + 7 indexes)")
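The commit-or-rollback behaviour of the get_db context manager, together with PRAGMA foreign_keys=ON, can be checked in isolation. The sketch below is an assumption-laden variant rather than the chapter's code: open_db takes the path as a parameter, the schema is cut down to two tables, and the IDs ('u1', 's1', 'ghost') are made up.

```python
import os
import sqlite3
import tempfile
from contextlib import contextmanager


@contextmanager
def open_db(path):
    """Same commit/rollback pattern as get_db, with the path as a parameter."""
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA foreign_keys=ON")  # must be set per connection
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def run_demo(path):
    with open_db(path) as conn:
        conn.executescript(
            """
            CREATE TABLE users (id TEXT PRIMARY KEY, name TEXT NOT NULL);
            CREATE TABLE sessions (
                id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                FOREIGN KEY (user_id) REFERENCES users(id)
            );
            """
        )
        conn.execute("INSERT INTO users (id, name) VALUES ('u1', 'alice')")

    try:
        with open_db(path) as conn:
            conn.execute("INSERT INTO sessions (id, user_id) VALUES ('s1', 'u1')")
            # 'ghost' is not in users, so the foreign key check rejects this row.
            conn.execute("INSERT INTO sessions (id, user_id) VALUES ('s2', 'ghost')")
    except sqlite3.IntegrityError as exc:
        error = str(exc)
    else:
        error = None

    with open_db(path) as conn:
        # The rollback also discarded the valid 's1' insert from the failed block.
        count = conn.execute("SELECT COUNT(*) FROM sessions").fetchone()[0]
    return error, count


with tempfile.TemporaryDirectory() as tmp:
    error, count = run_demo(os.path.join(tmp, "demo.db"))
    print(error, count)
```

Because each with block is one transaction, a failure anywhere inside it leaves the database as it was before the block started.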

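14.3 Agent storage layer API

Here we wrap the storage in an AgentStorage class that provides CRUD operations over the 5 tables.

Design idea: each stage of an Agent interaction (session, task, message, tool call, user) has its own table.

WAL mode lets reads proceed while a write is in progress, so readers and the writer do not block each other under concurrent load; writes themselves are still serialized, one writer at a time. Every row written carries an audit timestamp.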
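What WAL mode does and does not give you can be observed with three connections to one file. This is a minimal sketch under stated assumptions: the function name wal_concurrency_demo, the one-column messages table, and the 0.1-second busy timeout are chosen for the demonstration only.

```python
import os
import sqlite3
import tempfile


def wal_concurrency_demo(db_path: str) -> tuple:
    connections = []

    def connect(**kwargs):
        # isolation_level=None means autocommit, so transactions are
        # controlled explicitly with BEGIN / COMMIT below.
        conn = sqlite3.connect(db_path, isolation_level=None, **kwargs)
        connections.append(conn)
        return conn

    try:
        writer = connect()
        writer.execute("PRAGMA journal_mode=WAL")
        writer.execute("CREATE TABLE messages (id INTEGER PRIMARY KEY, content TEXT)")
        writer.execute("INSERT INTO messages (content) VALUES ('committed')")

        reader = connect()
        second_writer = connect(timeout=0.1)  # give up quickly when locked

        # The writer opens a write transaction and leaves it uncommitted.
        writer.execute("BEGIN IMMEDIATE")
        writer.execute("INSERT INTO messages (content) VALUES ('pending')")

        # A reader is not blocked; it sees the last committed snapshot.
        seen_during_write = reader.execute(
            "SELECT COUNT(*) FROM messages"
        ).fetchone()[0]

        # A second writer is refused: only one write transaction at a time.
        try:
            second_writer.execute("BEGIN IMMEDIATE")
            second_writer.execute("ROLLBACK")
            second_writer_blocked = False
        except sqlite3.OperationalError:
            second_writer_blocked = True

        writer.execute("COMMIT")
        seen_after_commit = reader.execute(
            "SELECT COUNT(*) FROM messages"
        ).fetchone()[0]
        return seen_during_write, second_writer_blocked, seen_after_commit
    finally:
        for conn in connections:
            conn.close()


with tempfile.TemporaryDirectory() as tmp:
    result = wal_concurrency_demo(os.path.join(tmp, "wal_demo.db"))
    print(result)
```

In a real service the second writer would normally use a longer timeout and simply wait its turn, which is usually acceptable because Agent writes are short.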

📝 Corresponding code implementation

Defined in this listing: create_user, get_user, check_quota, consume_quota, create_session, get_session, list_user_sessions, pause_session, resume_session, save_message, get_conversation_history, log_tool_call, create_task, update_task_status, get_usage_stats, AgentStorage

import os
import json
import time
import hashlib
import sqlite3
from datetime import datetime
from contextlib import contextmanager
from typing import Optional

DB_PATH = os.path.join(os.path.dirname(__file__), "agent_store.db")

# get_db() is the context manager from the listing in section 14.2.


class AgentStorage:
    """Agent persistent storage: wraps every database operation.

    Design principles:
      1. Each method performs one business operation
      2. Transactions are handled internally (callers need not care)
      3. Methods return native Python types (dict/list)
    """

    # ==================== User management ====================

    @staticmethod
    def create_user(name: str, email: str = "",
                    quota_total: int = 100000) -> dict:
        """Create a user.

        Args:
            name: User name.
            email: Email address (optional).
            quota_total: Upper limit of the token quota.

        Returns:
            A dict describing the created user.
        """
        user_id = hashlib.md5(f"{name}-{time.time()}".encode()).hexdigest()[:16]
        with get_db() as conn:
            conn.execute(
                "INSERT INTO users (id, name, email, quota_total) VALUES (?,?,?,?)",
                (user_id, name, email, quota_total),
            )
        return {"id": user_id, "name": name, "quota_total": quota_total}

    @staticmethod
    def get_user(user_id: str) -> Optional[dict]:
        """Fetch a user record."""
        with get_db() as conn:
            row = conn.execute(
                "SELECT * FROM users WHERE id=?", (user_id,)
            ).fetchone()
        return dict(row) if row else None

    @staticmethod
    def check_quota(user_id: str, required_tokens: int = 0) -> bool:
        """Check whether the user has enough token quota left.

        Args:
            user_id: User ID.
            required_tokens: Number of tokens needed for this request.

        Returns:
            True if enough quota remains.
        """
        with get_db() as conn:
            row = conn.execute(
                "SELECT quota_total - quota_used AS remaining FROM users WHERE id=?",
                (user_id,),
            ).fetchone()
        if row is None:
            return False
        return row["remaining"] >= required_tokens

    @staticmethod
    def consume_quota(user_id: str, tokens: int):
        """Consume part of the user's quota."""
        with get_db() as conn:
            conn.execute(
                "UPDATE users SET quota_used=quota_used+?, updated_at=datetime('now') WHERE id=?",
                (tokens, user_id),
            )

    # ==================== Session management ====================

    @staticmethod
    def create_session(user_id: str, title: str = "New conversation",
                       model: str = "gpt-4o-mini") -> dict:
        """Create a new session.

        Args:
            user_id: User ID.
            title: Session title.
            model: Model to use.

        Returns:
            A dict describing the created session.
        """
        session_id = hashlib.md5(
            f"{user_id}-{time.time()}-{os.urandom(4).hex()}".encode()
        ).hexdigest()[:16]
        with get_db() as conn:
            conn.execute(
                """INSERT INTO sessions (id, user_id, title, model)
                   VALUES (?,?,?,?)""",
                (session_id, user_id, title, model),
            )
        return {"id": session_id, "user_id": user_id, "title": title, "model": model}

    @staticmethod
    def get_session(session_id: str) -> Optional[dict]:
        """Fetch session details."""
        with get_db() as conn:
            row = conn.execute(
                "SELECT * FROM sessions WHERE id=?", (session_id,)
            ).fetchone()
        return dict(row) if row else None

    @staticmethod
    def list_user_sessions(user_id: str, limit: int = 20) -> list[dict]:
        """List a user's sessions, most recently updated first.

        Args:
            user_id: User ID.
            limit: Maximum number of sessions to return.

        Returns:
            List of sessions.
        """
        with get_db() as conn:
            rows = conn.execute(
                """SELECT id, title, status, model, message_count,
                          total_tokens, updated_at
                   FROM sessions WHERE user_id=?
                   ORDER BY updated_at DESC LIMIT ?""",
                (user_id, limit),
            ).fetchall()
        return [dict(r) for r in rows]

    @staticmethod
    def pause_session(session_id: str):
        """Pause a session (pauses Agent execution)."""
        with get_db() as conn:
            conn.execute(
                "UPDATE sessions SET status='paused', updated_at=datetime('now') WHERE id=?",
                (session_id,),
            )

    @staticmethod
    def resume_session(session_id: str):
        """Resume a session."""
        with get_db() as conn:
            conn.execute(
                "UPDATE sessions SET status='active', updated_at=datetime('now') WHERE id=?",
                (session_id,),
            )

    # ==================== Message management (the core!) ====================

    @staticmethod
    def save_message(session_id: str, role: str, content: str,
                     tool_calls: Optional[list] = None,
                     tool_call_id: Optional[str] = None,
                     token_count: int = 0):
        """Save one message to the conversation history.

        This is the core operation of Agent storage; call it on every LLM interaction.

        Args:
            session_id: Session ID.
            role: Message role (user/assistant/system/tool).
            content: Message content.
            tool_calls: Tool call info (assistant messages only).
            tool_call_id: Tool call ID (tool messages only).
            token_count: Estimated token count.
        """
        tool_calls_json = json.dumps(tool_calls, ensure_ascii=False) if tool_calls else None
        with get_db() as conn:
            conn.execute(
                """INSERT INTO messages (session_id, role, content,
                   tool_calls_json, tool_call_id, token_count)
                   VALUES (?,?,?,?,?,?)""",
                (session_id, role, content, tool_calls_json,
                 tool_call_id, token_count),
            )
            # Update the session statistics in the same transaction
            conn.execute(
                """UPDATE sessions SET
                   message_count = message_count + 1,
                   total_tokens = total_tokens + ?,
                   updated_at = datetime('now')
                   WHERE id=?""",
                (token_count, session_id),
            )

    @staticmethod
    def get_conversation_history(session_id: str,
                                 max_messages: int = 50) -> list[dict]:
        """Get the most recent N messages of a session (used to build the LLM context).

        This is the core query of the Agent memory system.

        Args:
            session_id: Session ID.
            max_messages: Maximum number of messages to return.

        Returns:
            List of messages in chronological order.
        """
        with get_db() as conn:
            # created_at has one-second resolution, so id is used as a
            # tie-breaker to keep messages written in the same second in order.
            rows = conn.execute(
                """SELECT * FROM (
                       SELECT id, role, content, tool_calls_json, tool_call_id,
                              token_count, created_at
                       FROM messages WHERE session_id=?
                       ORDER BY created_at DESC, id DESC LIMIT ?
                   ) ORDER BY created_at ASC, id ASC""",
                (session_id, max_messages),
            ).fetchall()
        return [dict(r) for r in rows]

    # ==================== Tool call logs ====================

    @staticmethod
    def log_tool_call(session_id: str, tool_name: str,
                      input_data: dict, output_data: dict,
                      elapsed_ms: float, success: bool = True,
                      error_message: Optional[str] = None,
                      message_id: Optional[int] = None):
        """Record one tool call.

        This is the base data for Agent auditing and optimization.

        Args:
            session_id: Session ID.
            tool_name: Tool name.
            input_data: Tool input parameters.
            output_data: Tool output.
            elapsed_ms: Execution time in milliseconds.
            success: Whether the call succeeded.
            error_message: Error message, if any.
            message_id: ID of the associated message.
        """
        with get_db() as conn:
            conn.execute(
                """INSERT INTO tool_logs (session_id, message_id, tool_name,
                   input_json, output_json, elapsed_ms, success, error_message)
                   VALUES (?,?,?,?,?,?,?,?)""",
                (session_id, message_id, tool_name,
                 json.dumps(input_data, ensure_ascii=False),
                 json.dumps(output_data, ensure_ascii=False),
                 elapsed_ms, int(success), error_message),
            )

    # ==================== Task management ====================

    @staticmethod
    def create_task(session_id: str, task_type: str,
                    payload: dict, priority: int = 0) -> dict:
        """Create an asynchronous task.

        Args:
            session_id: Associated session.
            task_type: Task type.
            payload: Task parameters.
            priority: Priority (higher runs first).

        Returns:
            A dict describing the created task.
        """
        task_id = hashlib.md5(
            f"{session_id}-{task_type}-{time.time()}".encode()
        ).hexdigest()[:16]
        with get_db() as conn:
            conn.execute(
                """INSERT INTO tasks (id, session_id, type, payload_json, priority)
                   VALUES (?,?,?,?,?)""",
                (task_id, session_id, task_type,
                 json.dumps(payload, ensure_ascii=False), priority),
            )
        return {"id": task_id, "type": task_type, "status": "pending"}

    @staticmethod
    def update_task_status(task_id: str, status: str,
                           result: Optional[dict] = None):
        """Update a task's status.

        Args:
            task_id: Task ID.
            status: New status.
            result: Task result (when finished).
        """
        # Only fixed SQL fragments are appended; all values go through placeholders.
        updates = ["status=?"]
        params = [status]
        if status == "running":
            updates.append("started_at=datetime('now')")
        elif status in ("completed", "failed", "cancelled"):
            updates.append("completed_at=datetime('now')")
        if result is not None:
            updates.append("result_json=?")
            params.append(json.dumps(result, ensure_ascii=False))
        params.append(task_id)
        with get_db() as conn:
            conn.execute(
                f"UPDATE tasks SET {', '.join(updates)} WHERE id=?",
                params,
            )

    # ==================== Analytics queries ====================

    @staticmethod
    def get_usage_stats(user_id: str, days: int = 7) -> dict:
        """Get usage statistics for a user.

        Args:
            user_id: User ID.
            days: Number of days to cover.

        Returns:
            A dict with the individual statistics.
        """
        with get_db() as conn:
            # Overview
            total = conn.execute(
                """SELECT COUNT(*) as session_count,
                          SUM(total_tokens) as total_tokens,
                          SUM(message_count) as total_messages
                   FROM sessions
                   WHERE user_id=? AND updated_at > datetime('now', ?)""",
                (user_id, f"-{days} days"),
            ).fetchone()
            # Tool call statistics
            tools = conn.execute(
                """SELECT tool_name, COUNT(*) as call_count,
                          AVG(elapsed_ms) as avg_latency,
                          SUM(CASE WHEN success=0 THEN 1 ELSE 0 END) as failures
                   FROM tool_logs
                   WHERE session_id IN (
                       SELECT id FROM sessions WHERE user_id=?
                   ) AND created_at > datetime('now', ?)
                   GROUP BY tool_name ORDER BY call_count DESC""",
                (user_id, f"-{days} days"),
            ).fetchall()
        return {
            "period_days": days,
            "session_count": total["session_count"],
            "total_tokens": total["total_tokens"] or 0,
            "total_messages": total["total_messages"] or 0,
            "tool_usage": [dict(r) for r in tools],
        }

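14.4 Key design decisions explained (a common interview topic)

Decision 1: Why are tool_calls stored in the messages table instead of a separate table?

Option A (separate table): messages table + tool_calls table → JOIN query

Option B (JSON column): the tool_calls_json column of the messages table

Reasons for choosing Option B:

✓ When the Agent reads conversation history, it needs everything in one pass (including tool_calls)

✓ A single-table read avoids the JOIN and is typically cheaper

✓ The Agent does not filter history messages by tool_name when building context (that query is not needed on this path)

✓ The tool_logs table still exists separately (for auditing and analytics queries)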
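A minimal sketch of Option B, under stated assumptions: the table below is a cut-down stand-in for the chapter's messages table, and `save_message` / `load_history` are illustrative helpers rather than the chapter's own methods. It shows that one single-table query returns each message together with its tool calls.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    """CREATE TABLE messages (
           id INTEGER PRIMARY KEY AUTOINCREMENT,
           session_id TEXT NOT NULL,
           role TEXT NOT NULL,
           content TEXT,
           tool_calls_json TEXT      -- NULL when the message has no tool calls
       )"""
)


def save_message(session_id, role, content, tool_calls=None):
    """Serialize tool_calls into the JSON column (hypothetical helper)."""
    payload = json.dumps(tool_calls, ensure_ascii=False) if tool_calls else None
    conn.execute(
        "INSERT INTO messages (session_id, role, content, tool_calls_json) "
        "VALUES (?,?,?,?)",
        (session_id, role, content, payload),
    )


def load_history(session_id):
    """One single-table query returns messages and their tool calls together."""
    rows = conn.execute(
        "SELECT role, content, tool_calls_json FROM messages "
        "WHERE session_id=? ORDER BY id",
        (session_id,),
    ).fetchall()
    history = []
    for row in rows:
        msg = {"role": row["role"], "content": row["content"]}
        if row["tool_calls_json"]:
            msg["tool_calls"] = json.loads(row["tool_calls_json"])
        history.append(msg)
    return history


save_message("s1", "user", "What is an AI Agent?")
save_message("s1", "assistant", "Let me look that up.",
             [{"name": "search", "arguments": {"query": "Agent components"}}])
history = load_history("s1")
```

The trade-off is that anything inside the JSON column is harder to index, which is why a separate tool_logs table remains useful for analytics.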

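Decision 2: Why does WAL mode matter?

WAL = Write-Ahead Logging

Default mode (DELETE, a rollback journal):

A writer locks the whole database file while it commits → reads and writes block each other

Agent A is writing a message → Agent B cannot read the history

WAL mode:

Writes are appended to a separate WAL file → readers are not blocked

Any number of concurrent readers + 1 writer

Agent A writing a message + Agent B reading history = both proceed at the same time ✓

Cost:

The WAL file grows and has to be checkpointed periodically (SQLite does this automatically, by default once the WAL reaches about 1000 pages; a long-running reader can delay it)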
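The reader/writer behaviour described above can be observed with two connections. This is a small sketch, assuming a temporary database file and a simplified messages table; `count_messages` is an illustrative helper. The writer leaves a transaction open while the reader runs a query.

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "wal_demo.db")

# isolation_level=None puts the connection in autocommit mode,
# so transactions start only where BEGIN is written explicitly.
writer = sqlite3.connect(db_path, isolation_level=None)
journal_mode = writer.execute("PRAGMA journal_mode=WAL").fetchone()[0]
writer.execute("CREATE TABLE messages (id INTEGER PRIMARY KEY, content TEXT)")
writer.execute("INSERT INTO messages (content) VALUES ('committed')")

reader = sqlite3.connect(db_path, timeout=0.1)


def count_messages(conn):
    # fetchall() runs the statement to completion, which ends the read
    return conn.execute("SELECT COUNT(*) FROM messages").fetchall()[0][0]


# The writer opens a write transaction and leaves it uncommitted.
writer.execute("BEGIN IMMEDIATE")
writer.execute("INSERT INTO messages (content) VALUES ('pending')")

# The reader is not blocked; it sees the last committed snapshot.
seen_during_write = count_messages(reader)

writer.execute("COMMIT")
seen_after_commit = count_messages(reader)

print(journal_mode, seen_during_write, seen_after_commit)
```

WAL mode is a property of the database file, so it only needs to be set once; it persists across connections and restarts.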

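Decision 3: Why store time as TEXT instead of TIMESTAMP?

SQLite has no dedicated DATE/TIME storage class, and its date functions operate on ISO 8601 text. TEXT in ISO 8601 format is:

✓ Human-readable (easy to debug)

✓ Correctly sortable (lexicographic order matches chronological order, as long as every value uses the same format and time zone)

✓ Consistent across languages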
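A short sketch of why the TEXT representation works, assuming a cut-down sessions table and an illustrative `sqlite_ts` helper. The timestamps use the same `YYYY-MM-DD HH:MM:SS` UTC format that `datetime('now')` produces, so plain string comparison gives chronological results.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sessions (id TEXT PRIMARY KEY, updated_at TEXT NOT NULL)"
)

now = datetime.now(timezone.utc)


def sqlite_ts(dt):
    """Format a UTC datetime the way SQLite's datetime('now') does."""
    return dt.strftime("%Y-%m-%d %H:%M:%S")


conn.executemany(
    "INSERT INTO sessions VALUES (?, ?)",
    [
        ("newest", sqlite_ts(now - timedelta(hours=1))),
        ("old", sqlite_ts(now - timedelta(days=30))),
        ("recent", sqlite_ts(now - timedelta(days=2))),
    ],
)

# ORDER BY on the TEXT column is a string sort, yet it is chronological.
ordered = [r[0] for r in conn.execute(
    "SELECT id FROM sessions ORDER BY updated_at"
)]

# The same comparison style as get_usage_stats: text against datetime('now', ?).
last_week = [r[0] for r in conn.execute(
    "SELECT id FROM sessions WHERE updated_at > datetime('now', ?) "
    "ORDER BY updated_at",
    ("-7 days",),
)]
```

Mixing formats (for example, some rows with a `T` separator or a time zone suffix) would break the string ordering, so one format should be used for every write.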

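14.5 Advanced SQLite usage in Agent scenarios

FTS5 full-text search: searching conversation history

CREATE VIRTUAL TABLE messages_fts USING fts5(
  content,
  content='messages',      -- external content table
  content_rowid='id'       -- rowid column of the external table (must be an INTEGER PRIMARY KEY)
);

-- Search for messages containing "weather"
SELECT * FROM messages_fts WHERE content MATCH 'weather';

An external-content index is not updated automatically: rows written to messages have to be mirrored into messages_fts, usually with triggers. The default unicode61 tokenizer also does not segment Chinese text into words, so Chinese conversations need a tokenizer suited to CJK.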
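A sketch of the external-content setup end to end, assuming the Python build's SQLite has FTS5 compiled in and using a simplified messages table. The trigger names and `search_messages` are illustrative. The triggers keep messages_fts in step with inserts, deletes, and updates on messages.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE messages (
        id INTEGER PRIMARY KEY,   -- integer rowid alias, required by content_rowid
        session_id TEXT NOT NULL,
        content TEXT NOT NULL
    );
    CREATE VIRTUAL TABLE messages_fts USING fts5(
        content, content='messages', content_rowid='id'
    );
    -- Mirror every change on messages into the full-text index.
    CREATE TRIGGER messages_ai AFTER INSERT ON messages BEGIN
        INSERT INTO messages_fts(rowid, content) VALUES (new.id, new.content);
    END;
    CREATE TRIGGER messages_ad AFTER DELETE ON messages BEGIN
        INSERT INTO messages_fts(messages_fts, rowid, content)
        VALUES ('delete', old.id, old.content);
    END;
    CREATE TRIGGER messages_au AFTER UPDATE ON messages BEGIN
        INSERT INTO messages_fts(messages_fts, rowid, content)
        VALUES ('delete', old.id, old.content);
        INSERT INTO messages_fts(rowid, content) VALUES (new.id, new.content);
    END;
    """
)
conn.executemany(
    "INSERT INTO messages (session_id, content) VALUES (?, ?)",
    [
        ("s1", "What is the weather in Tianjin today?"),
        ("s1", "Summarize the design of the tasks table."),
        ("s2", "Weather alerts are handled by the search tool."),
    ],
)


def search_messages(query):
    """Return ids of messages matching an FTS5 query (hypothetical helper)."""
    rows = conn.execute(
        """SELECT m.id
           FROM messages_fts
           JOIN messages AS m ON m.id = messages_fts.rowid
           WHERE messages_fts MATCH ?
           ORDER BY m.id""",
        (query,),
    ).fetchall()
    return [r[0] for r in rows]


hits = search_messages("weather")
```

Joining back to messages makes it possible to filter by session_id or role in the same query.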

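JSON functions: querying semi-structured data

-- Find users whose preferences set theme to dark

SELECT * FROM users

WHERE json_extract(preferences_json, '$.theme') = 'dark';

-- Find messages whose tool_calls include the search tool.
-- json_each expands the array, so the match does not depend on how the JSON text is spaced
-- (a LIKE '%"name":"search"%' pattern misses JSON written by json.dumps, which emits "name": "search").

SELECT DISTINCT m.* FROM messages AS m, json_each(m.tool_calls_json) AS tc

WHERE json_extract(tc.value, '$.name') = 'search';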
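The difference between text matching and JSON-aware matching can be checked from Python. This sketch assumes the JSON1 functions are available in the linked SQLite and that tool calls were serialized with `json.dumps` defaults; the table is a simplified stand-in.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages (id INTEGER PRIMARY KEY, role TEXT, tool_calls_json TEXT)"
)
conn.executemany(
    "INSERT INTO messages (role, tool_calls_json) VALUES (?, ?)",
    [
        ("user", None),
        ("assistant", json.dumps([{"name": "search", "arguments": {"query": "Agent"}}])),
        ("assistant", json.dumps([{"name": "calculator", "arguments": {"expr": "1+1"}}])),
    ],
)

# Text matching: depends on the exact spacing of the serialized JSON.
like_hits = [r[0] for r in conn.execute(
    """SELECT id FROM messages WHERE tool_calls_json LIKE '%"name":"search"%'"""
)]

# JSON-aware matching: json_each yields one row per element of the array,
# and rows with a NULL column simply produce no elements.
json_hits = [r[0] for r in conn.execute(
    """SELECT DISTINCT m.id
       FROM messages AS m, json_each(m.tool_calls_json) AS tc
       WHERE json_extract(tc.value, '$.name') = ?""",
    ("search",),
)]

print(like_hits, json_hits)
```

The LIKE query finds nothing here because `json.dumps` writes a space after each colon, while the `json_each` query matches on the parsed value.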

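Online backup

# VACUUM INTO writes a complete, compacted copy to a new file (a full snapshot rather than an incremental backup; the target file must not already exist)

conn.execute("VACUUM INTO 'agent_store_backup.db'")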
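For a copy that proceeds in small steps while the database stays in use, Python's `sqlite3.Connection.backup()` is an alternative to VACUUM INTO. The sketch below shows both side by side; the file names, row counts, and the `count_rows` helper are illustrative assumptions.

```python
import os
import sqlite3
import tempfile

workdir = tempfile.mkdtemp()
src_path = os.path.join(workdir, "agent_store.db")
backup_path = os.path.join(workdir, "agent_store_backup.db")
vacuum_path = os.path.join(workdir, "agent_store_vacuum.db")

src = sqlite3.connect(src_path)
src.execute("CREATE TABLE messages (id INTEGER PRIMARY KEY, content TEXT)")
src.executemany(
    "INSERT INTO messages (content) VALUES (?)",
    [(f"message {i}",) for i in range(1000)],
)
src.commit()

progress_log = []


def on_progress(status, remaining, total):
    # Called after each step with the number of pages still to copy
    progress_log.append((remaining, total))


# Option 1: online backup API, copying 2 pages per step
dst = sqlite3.connect(backup_path)
src.backup(dst, pages=2, progress=on_progress)
dst.close()

# Option 2: VACUUM INTO, a compacted full copy.
# It cannot run inside an open transaction, hence the commit() above.
src.execute("VACUUM INTO ?", (vacuum_path,))


def count_rows(path):
    conn = sqlite3.connect(path)
    try:
        return conn.execute("SELECT COUNT(*) FROM messages").fetchall()[0][0]
    finally:
        conn.close()


print(count_rows(backup_path), count_rows(vacuum_path))
```

Both produce full copies of the database. Neither tracks changes since a previous backup, so a true incremental scheme would need extra tooling.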

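14.5.1 SQLite connection pooling and concurrency: real problems in production

▍ Connection pools are not only for PostgreSQL

SQLite's WAL mode supports "unlimited readers + 1 writer", but only when connections are managed correctly.

Many people call sqlite3.connect() for every operation. That works at low concurrency, but under FastAPI + async + Agent workloads it leads to:

Frequent file open/close → latency jitter (P99 can exceed 100 ms)

Unbounded connection count → the file descriptor limit is exceeded

Best practice: use a connection pool

Connections can be reused safely in WAL mode (in Python, a connection shared across threads needs check_same_thread=False and must be used by one thread at a time)

Writes have to queue (WAL allows only 1 write transaction at a time)

Reads can run concurrently without limit

Recommended tools:

Python: sqlite3 + WAL + a busy timeout (the timeout argument of sqlite3.connect(), or PRAGMA busy_timeout)

Advanced: aiosqlite (async SQLite)

Not recommended: SQLAlchemy + SQLite (the JOINs an ORM generates are hard to control)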
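A minimal sketch of such a pool using only the standard library. `SQLitePool` and its parameters are hypothetical names, not part of the chapter's code. A `queue.Queue` hands each connection to one thread at a time, which is what makes `check_same_thread=False` safe here.

```python
import os
import queue
import sqlite3
import tempfile
import threading
from contextlib import contextmanager


class SQLitePool:
    """Fixed-size pool of WAL-mode connections (illustrative sketch)."""

    def __init__(self, path, size=4, timeout=5.0):
        self._pool = queue.Queue(maxsize=size)
        for _ in range(size):
            conn = sqlite3.connect(path, timeout=timeout, check_same_thread=False)
            conn.row_factory = sqlite3.Row
            conn.execute("PRAGMA journal_mode=WAL")
            self._pool.put(conn)

    @contextmanager
    def connection(self):
        conn = self._pool.get()      # blocks while every connection is in use
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            self._pool.put(conn)     # always hand the connection back

    def close(self):
        while not self._pool.empty():
            self._pool.get_nowait().close()


db_path = os.path.join(tempfile.mkdtemp(), "pool_demo.db")
pool = SQLitePool(db_path, size=2)

with pool.connection() as conn:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY, content TEXT)"
    )


def worker(n):
    with pool.connection() as conn:
        conn.execute("INSERT INTO messages (content) VALUES (?)", (f"from thread {n}",))


threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

with pool.connection() as conn:
    total = conn.execute("SELECT COUNT(*) FROM messages").fetchall()[0][0]
```

Eight threads share two connections. The pool size caps open file descriptors, and the connection timeout covers the moments when two writers overlap.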

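▍ Busy handler: SQLite's hidden "concurrency lock"

In WAL mode, a write transaction that is blocked by another write transaction gets SQLITE_BUSY immediately when no busy timeout is set.

In Python this surfaces as sqlite3.OperationalError: database is locked, which produces many failures under highly concurrent writes. Note that sqlite3.connect() already defaults to timeout=5.0 seconds, so the immediate failure applies to connections opened with timeout=0 or through drivers that set no timeout.

Fix:

conn.execute("PRAGMA busy_timeout = 5000")  # wait up to 5 seconds (the value is in milliseconds)

Once this is set, SQLite retries internally instead of failing on the first conflict.

This setting is the "first line of configuration" when taking SQLite to production.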
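The effect of the timeout can be reproduced with two writers. In this sketch the second connection is given a deliberately short timeout so the failure shows up quickly; the file name and the `connect` helper are assumptions for illustration. `BEGIN IMMEDIATE` is used so that each writer requests the write lock at the start of its transaction.

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "busy_demo.db")


def connect(timeout_s):
    # timeout is Python's way of setting SQLite's busy timeout (in seconds)
    conn = sqlite3.connect(db_path, timeout=timeout_s, isolation_level=None)
    conn.execute("PRAGMA journal_mode=WAL")
    return conn


writer_a = connect(5.0)
writer_a.execute("CREATE TABLE messages (id INTEGER PRIMARY KEY, content TEXT)")
writer_b = connect(0.2)   # short timeout so the demo gives up quickly

# Writer A takes the write lock and holds it.
writer_a.execute("BEGIN IMMEDIATE")
writer_a.execute("INSERT INTO messages (content) VALUES ('from A')")

# Writer B waits for its timeout, then raises OperationalError.
try:
    writer_b.execute("BEGIN IMMEDIATE")
    b_error = None
except sqlite3.OperationalError as exc:
    b_error = str(exc)

writer_a.execute("COMMIT")

# With the lock released, the same statement succeeds.
writer_b.execute("BEGIN IMMEDIATE")
writer_b.execute("INSERT INTO messages (content) VALUES ('from B')")
writer_b.execute("COMMIT")

total = writer_b.execute("SELECT COUNT(*) FROM messages").fetchall()[0][0]
print(b_error, total)
```

A timeout only helps if write transactions are short. If a writer holds the lock longer than the timeout, the waiting connection still fails.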

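▍ When do you really need to migrate to PostgreSQL?

When an interviewer asks "What do you do when SQLite can't keep up?", don't answer "just switch to PostgreSQL".

First explain where SQLite's limits are:

Signals that SQLite genuinely cannot keep up:

✗ Sustained write QPS > 200 (the single-writer bottleneck of WAL; a rule of thumb that depends on transaction size and disk sync settings)

✗ Database file > 10 GB (VACUUM takes more than 1 minute)

✗ Multi-replica high availability is required (SQLite has no built-in primary/replica replication)

✗ Row-level access control is required (SQLite permissions are file-level)

If none of these 4 conditions applies → SQLite is enough.

If any one applies → consider PostgreSQL / Turso (distributed SQLite)

Migration strategy:

Not "rewrite all the code", but "abstract a Storage interface so that

a SQLite implementation and a PostgreSQL implementation are interchangeable". The AgentStorage class in this chapter

is the first step toward that idea.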
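One way to sketch that interface is with `typing.Protocol`. The names `MessageStore`, `SQLiteMessageStore`, and `build_context` are hypothetical and cover only two operations. The point is that calling code depends on the protocol, so a PostgreSQL-backed class with the same methods could be swapped in later.

```python
import sqlite3
from typing import Dict, List, Protocol


class MessageStore(Protocol):
    """Storage operations the Agent depends on (illustrative subset)."""

    def save_message(self, session_id: str, role: str, content: str) -> int: ...

    def get_history(self, session_id: str, limit: int = 20) -> List[Dict]: ...


class SQLiteMessageStore:
    """SQLite-backed implementation of MessageStore."""

    def __init__(self, path: str = ":memory:"):
        self._conn = sqlite3.connect(path)
        self._conn.row_factory = sqlite3.Row
        self._conn.execute(
            """CREATE TABLE IF NOT EXISTS messages (
                   id INTEGER PRIMARY KEY AUTOINCREMENT,
                   session_id TEXT NOT NULL,
                   role TEXT NOT NULL,
                   content TEXT NOT NULL
               )"""
        )

    def save_message(self, session_id: str, role: str, content: str) -> int:
        cur = self._conn.execute(
            "INSERT INTO messages (session_id, role, content) VALUES (?,?,?)",
            (session_id, role, content),
        )
        self._conn.commit()
        return cur.lastrowid

    def get_history(self, session_id: str, limit: int = 20) -> List[Dict]:
        rows = self._conn.execute(
            "SELECT role, content FROM messages WHERE session_id=? "
            "ORDER BY id DESC LIMIT ?",
            (session_id, limit),
        ).fetchall()
        # Newest rows were selected first; return them in chronological order
        return [dict(r) for r in reversed(rows)]


def build_context(store: MessageStore, session_id: str) -> List[Dict]:
    """Agent code sees only the protocol, not the database behind it."""
    return store.get_history(session_id, limit=10)


store = SQLiteMessageStore()
store.save_message("s1", "user", "What is an AI Agent?")
store.save_message("s1", "assistant", "A system that perceives, decides and acts.")
context = build_context(store, "s1")
```

SQL dialect differences (placeholders, date functions, JSON operators) stay inside each implementation, which keeps a later migration contained.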

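14.6 Chapter summary

Key takeaways:

SQLite is a highly practical database for Agent development

Zero configuration, embedded, portable

WAL mode supports highly concurrent reads

FTS5 + JSON support advanced queries

Core of the schema design

sessions: session management

messages: conversation history (with tool_calls stored as JSON)

tasks: asynchronous tasks (pause/resume/cancel)

tool_logs: tool call auditing

Key decisions (a common interview topic)

tool_calls as a JSON column vs a separate table (JSON is chosen to avoid JOINs)

WAL mode keeps reads and writes from blocking each other

TEXT for timestamps (human-readable + sorts correctly)

Core queries for Agent storage

get_conversation_history: builds the LLM context

log_tool_call: auditing and analytics

get_usage_stats: per-user usage statistics

Interview cheat sheet:

"How would you design the database for an Agent?"

→ SQLite + WAL mode (development / small scale)

→ 5 core tables: users/sessions/messages/tasks/tool_logs

→ tool_calls as a JSON column to avoid JOINs

→ FTS5 for conversation search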

📝 Corresponding code

demo_full_storage_workflow

import os
import json
import time
import hashlib
import sqlite3
from datetime import datetime
from contextlib import contextmanager
from typing import Optional

DB_PATH = os.path.join(os.path.dirname(__file__), "agent_store.db")


def demo_full_storage_workflow():
    """Demonstrate the full Agent storage workflow."""
    print("=" * 60)
    print("  Agent persistent storage: full demo")
    print("=" * 60)

    init_database()
    storage = AgentStorage()

    # 1. Create a user
    print("\n  👤 Create user")
    user = storage.create_user("Alice", "[email protected]")
    print(f"    User: {user['name']} ({user['id'][:8]}...)")

    # 2. Create a session
    print("\n  💬 Create session")
    session = storage.create_session(user["id"], "Learning AI Agents")
    print(f"    Session: {session['title']} ({session['id'][:8]}...)")

    # 3. Simulate a conversation (save messages)
    # Each tuple is (role, content, tool_calls, token_count)
    print("\n  📝 Simulate conversation")
    messages = [
        ("user", "What is an AI Agent?", None, 50),
        ("assistant", "An AI Agent is an intelligent system that perceives, decides and acts autonomously...", None, 120),
        ("user", "What components does it consist of?", None, 40),
        ("assistant", "Mainly four parts: the LLM, a planner, a memory system and tool calling.",
         [{"name": "search", "arguments": {"query": "Agent components"}}], 80),
        ("tool", "Search result: LLM/planner/memory/tools are the core components of an Agent",
         None, 30),
    ]
    for role, content, tool_calls, tokens in messages:
        # Only tool messages carry a tool_call_id
        tc_id = f"call_{hashlib.md5(content.encode()).hexdigest()[:8]}" if role == "tool" else None
        storage.save_message(session["id"], role, content, tool_calls, tc_id, tokens)
        print(f"    [{role:>9s}] {content[:50]}... ({tokens}t)")

    # 4. Log a tool call
    print("\n  🔧 Log tool call")
    storage.log_tool_call(
        session_id=session["id"],
        tool_name="search",
        input_data={"query": "Agent components"},
        output_data={"results": ["LLM", "planner", "memory", "tools"]},
        elapsed_ms=350.5,
        success=True,
    )
    print("    search → success (350ms)")

    # 5. Read conversation history (used by the Agent's memory system)
    print("\n  📖 Read conversation history (build LLM context)")
    history = storage.get_conversation_history(session["id"], max_messages=10)
    print(f"    {len(history)} messages in total")
    for msg in history:
        tc_info = ""
        if msg.get("tool_calls_json"):
            tc = json.loads(msg["tool_calls_json"])
            tc_info = f" [tool_call: {tc[0]['name']}]"
        print(f"    [{msg['role']:>9s}] {msg['content'][:60]}...{tc_info}")

    # 6. Session management
    print("\n  ⏸️ Pause session")
    storage.pause_session(session["id"])
    s = storage.get_session(session["id"])
    print(f"    Status: {s['status']}")
    print("  ▶️ Resume session")
    storage.resume_session(session["id"])
    s = storage.get_session(session["id"])
    print(f"    Status: {s['status']}")

    # 7. Task management
    print("\n  📋 Create async task")
    task = storage.create_task(
        session["id"], "summarize",
        {"max_length": 200}, priority=1,
    )
    print(f"    Task: {task['id'][:8]}... (status: {task['status']})")
    storage.update_task_status(task["id"], "running")
    storage.update_task_status(
        task["id"], "completed",
        {"summary": "This conversation covered the basic concepts and components of AI Agents..."},
    )
    print("    Task finished: status → completed")

    # 8. List the user's sessions
    print("\n  📊 User session list")
    sessions = storage.list_user_sessions(user["id"])
    for s in sessions:
        print(f"    📁 {s['title']} | Messages: {s['message_count']} | Tokens: {s['total_tokens']}")

    # 9. Usage statistics
    print("\n  📈 7-day usage statistics")
    stats = storage.get_usage_stats(user["id"])
    print(f"    Sessions: {stats['session_count']}")
    print(f"    Total tokens: {stats['total_tokens']}")
    print(f"    Total messages: {stats['total_messages']}")
    print("    Tool usage:")
    for tool in stats["tool_usage"]:
        print(f"      {tool['tool_name']}: {tool['call_count']} calls, "
              f"avg {tool['avg_latency']:.0f}ms, {tool['failures']} failures")


if __name__ == "__main__":
    print("╔══════════════════════════════════════════════════════╗")
    print("║  Chapter 14: SQLite + Agent persistent storage       ║")
    print("║  Schema design · WAL mode · Sessions · Task status   ║")
    print("╚══════════════════════════════════════════════════════╝")

    # Remove the old database
    if os.path.exists(DB_PATH):
        os.remove(DB_PATH)
        print("  🧹 Old database removed")

    demo_full_storage_workflow()

    # Show where the database file lives
    print(f"\n  💾 Database file: {DB_PATH}")
    print(f"  📏 File size: {os.path.getsize(DB_PATH):,} bytes")
    print("\n✅ Chapter 14 complete!")


Reposted from: 网络安全民工, "Chapter 14: SQLite + Agent Persistent Storage"
