文章总结: 本文针对AI渗透测试中的灾难性遗忘、命令行沉迷、逻辑断层等核心痛点,提出了多层防遗忘架构、P-E-R协作架构、命令防沉迷机制及Meta-Tooling层等系统性解决方案。通过持久化上下文、智能压缩、分层归因、相似度检测等技术手段,构建了从规划到反思的完整闭环,有效解决了AIAgent在长周期渗透任务中的稳定性和效率问题。 综合评分: 88 文章分类: 渗透测试,AI安全,实战经验,安全建设,安全工具
告别“自嗨”与烧钱:如何为“AI渗透测试”构建可靠的实战闭环?
一寸灰 一寸灰
锦岳智慧
2026年3月4日 18:38 北京
开源的AI渗透项目,基本都有一些问题。
1.灾难性遗忘:在长时间的渗透中,随着对话轮次的增加,AI会忘记了自己要干嘛、自己做出来的成果、目标的特征、甚至是失败的路径。然后他就会捏造成果或者是重复失败的渗透路径。
2.命令行沉迷:这个是最有意思的,我们调用nmap去扫描整个网段,扫描会需要很长时间,工具不返回结果,AI就会一直等着,然后超过了一定时间之后,AI会认为工具执行失败,又进行尝试,然后又失败,又尝试,陷入一个死循环,曾经因为这个烧了很多很多的token。
3.逻辑断层:AI调用MCP工具时需要传参,有时候传参报错,他就会直接停止任务,然后说XXX工具报错,但是按照已经规定的参数进行传参,工具是可以正常调用的。也就是说,他缺少自我纠错的能力。
4.token消耗巨大:很多工具都将工具执行的过程注入到上下文中,使得上下文资源和token资源消耗很快。曾经有一个靶场测了几百万token的伟大创举。
5.检索效率瓶颈:知识库和渗透测试经验搜索杂,静态知识库和渗透测试经验重复太多,搜一个会出来很多,很浪费token,也影响效率。
下面是我们针对性的一些解决方案。
一、防止灾难性遗忘(Catastrophic Forgetting Prevention)
1.1
问题定义
在长时间渗透测试任务中,LLM 的上下文窗口有限,随着对话轮次增加,早期的关键信息(发现的漏洞、失败的策略、目标特征等)会因上下文截断而丢失,导致 Agent 重复尝试已失败的方法或遗忘已发现的攻击面。
1.2
多层防遗忘架构
项目通过 4 层机制 协同工作来对抗灾难性遗忘:
1.2.1持久化上下文对象(Persistent Context Objects)
定义了两个关键的持久化上下文对象:
classPlannerContext(BaseModel):"""规划器上下文对象,保存历史规划信息"""planning_history: List[Dict[str, Any]] = [] # 所有规划尝试的完整记录rejected_strategies: Dict[str, str] = {} # 被拒绝的策略及原因long_term_objectives: List[str] = [] # 长期战略目标latest_reflection_report: Optional[Dict[str, Any]] # 最新反思报告previous_planning_session: Optional[Dict[str, Any]] # 上一次规划会话
classReflectorContext(BaseModel):"""反思器上下文对象,保存历史反思信息"""reflection_log: List[Dict[str, Any]] = [] # 反思日志validated_patterns: List[Dict[str, Any]] = [] # 已验证的攻击模式persistent_insights: List[Dict[str, Any]] = [] # 持久化洞察
这些对象独立于 LLM 对话历史存在,即使对话被压缩,这些结构化数据仍然保留。
关键代码:update_planner_context() 方法持续追加规划历史、被拒策略和反思报告。被拒策略使用 Dict[str, str] 存储,key 为策略名,value 为拒绝原因,确保 Planner 不会重复提出已被否决的方案。
1.2.2:智能上下文压缩(LLM-based Context Compression)
实现了三策略触发的压缩机制:
classContextCompressor:"""基于三种策略进行智能压缩:1. 消息数量阈值(默认 50 条)2. 执行轮次阈值(每 10 轮)3. 估算 token 超限(默认 100000)"""
触发判断:
def should_compress(self, messages, execution_count) -> tuple[bool, str]:# 策略 1: 消息数量 > 50iflen(messages) > self.message_threshold:return True, f"消息数量过多"# 策略 2: 每 10 轮定期压缩if execution_count > 0 and execution_count % self.compress_interval == 0:return True, f"定期压缩"# 策略 3: 估算 token > 100000estimated_tokens = total_chars // 4if estimated_tokens > self.token_threshold:return True, f"估算 token 超限"
压缩策略:
压缩后的消息列表 = 系统提示词(不变) + LLM 生成的历史摘要 + 最近 10 条消息(不变)
核心是 压缩提示词,要求 LLM 在摘要中保留:
- 所有重要的决策和行动
- 关键的工具调用和结果
- 失败原因和教训
- 时间顺序
这确保了即使历史消息被压缩,关键的决策轨迹和失败教训不会丢失。
1.2.3:双后端知识库(Long-term Memory)
知识存储采用 OpenViking + ChromaDB 双后端架构(详见第五章),按数据性质自动路由:
- OpenViking(静态知识):攻击载荷库(PayloadsAllTheThings, 136 文件)和漏洞挖掘方法论(HowToHunt, 88 文件),支持 L0/L1/L2 渐进加载,避免一次性加载长文档浪费上下文窗口
- ChromaDB(动态经验):Agent 执行渗透测试时积累的 STE 经验(experience)、工具使用记录、任务记忆(task_memory)
知识库在 MCP Server 启动时自动预加载,包括 ChromaDB 数据加载和 OpenViking 后端初始化,确保 Agent 在任何新任务开始时都能检索到历史积累的知识。
1.2.4:STE 经验提取(Strategy-Tactics-Example)
reflect_global() 方法在任务结束时执行全局反思,提取可复用的 STE 经验:
STE 结构:├── 战略原则(Strategic Principle):最高层次的攻击原则├── 战术手册(Tactical Manual):有序的、抽象的步骤列表└── 适用场景(Applicable Scenarios):标签列表,定义复用场景
这些 STE 经验存储到ReflectorContext.persistent_insights 中,为后续任务提供可复用的攻击模式。
1.3
信息流转闭环
Executor 执行 → 结果存入 message_history↓Reflector 反思 → validated_patterns / persistent_insights 存入 ReflectorContext↓compress_if_needed() → 历史消息压缩为摘要,但 PlannerContext/ReflectorContext 不受影响↓Planner 动态规划 → 从 get_planner_summary() 获取完整历史↓rejected_strategies 防止重复 → 新规划避免已失败方法
二、逻辑断层解决方案(Logic Gap Resolution)
2.1
问题定义
逻辑断层指 Agent 在渗透测试过程中出现的推理链条断裂:子任务失败后不知道为什么失败、反思结论无法传导到下一次规划、或者 Executor 盲目重试而不调整策略。
2.2
P-E-R 协作架构
实现了 Planner-Executor-Reflector (P-E-R) 三角协作架构来消除逻辑断层:
┌─────────┐│ Planner │ ← rejected_strategies, latest_reflection_report└────┬────┘│ initial_plan / dynamic_plan▼┌──────────────────────┐│ Execute-Reflect │ ← _execute_reflect_loop (max 100 iterations)│ Loop ││ ││ Executor ──→ Reflector│ ↑ │ ││ └───────────┘ ││ (VETO / intelligence)│└──────────────────────┘
2.3
分层失败归因
(L0-L5 Failure Attribution)
这是消除逻辑断层的核心机制。定义了严格的 6 层递进归因体系:
printf("helloL0 (Observation) → 工具原始输出(最基础层)L1 (Tool Failure) → 工具执行失败(超时、命令不存在、权限不足)L2 (Prerequisite) → 前提条件失败(会话过期、认证失败、依赖缺失)L3 (Environment) → 环境阻断(WAF拦截、防火墙、速率限制)L4 (Hypothesis) → 假设被证伪(需 3+ 次失败才能归因到此层)L5 (Strategy) → 战略缺陷(需 3+ 个 L4 失败形成模式) world!");
严格的递进原则:
def attribute(self, ...):# 必须逐层排查,低层未排除不能归因到高层l1_result = self._check_l1_tool_failure(...) # 先排查工具问题if l1_result: return l1_result
l2_result = self._check_l2_prerequisite_failure(...) # 再排查前提条件if l2_result: return l2_result
l3_result = self._check_l3_environment(...) # 再排查环境因素if l3_result: return l3_result
# 只有排除 L0-L3 后,才能归因到 L4if hypothesis and evidence:l4_result = self._check_l4_hypothesis(...)# L4 需要 failed_attempts >= 3 才触发
# 只有多个 L4 失败形成模式(l4_failure_count >= 3),才归因到 L5if strategy:l5_result = self._check_l5_strategy(...)
这防止了最常见的逻辑断层:Agent 错误地将工具级失败归因为策略失败,从而过早放弃正确的攻击路径。
2.4
Reflector 的 VETO 权力
# Reflector 返回的 rejected_staged_nodes 列表rejected_nodes = reflection_result.get("rejected_staged_nodes", [])if rejected_nodes:self.veto_count += 1# 直接移除被拒绝的节点tasks = [t for t in tasks if t.get("id") not in rejected_nodes]
Reflector 拥有否决 Executor 提交的因果图节点的权力。这防止了 Executor 在执行中产生的错误推理被无条件接受,形成逻辑链条上的”坏节点”。
2.5
动态规划闭环
实现了反思驱动的动态规划:
# 基于 Reflector 的情报摘要触发动态规划if reflection_result.get("intelligence_summary"):dynamic_plan_result = await self.planner.dynamic_plan(goal=goal,graph_summary=graph_summary,intelligence_summary=reflection_result.get("intelligence_summary"),failure_patterns_summary=reflection_result.get("attribution_result"),failed_nodes=[t for t in tasks if t.get("status") == "failed"])
动态规划的提示词明确要求:
- 失败驱动:优先处理失败或阻塞的任务
- 诊断优先:为失败任务设计诊断或替代方案
- 避免重复:不要重复已失败的方法
同时,Planner 通过
get_planner_summary() 获取包含 rejected_strategies 的完整历史,确保新规划不会重蹈覆辙。
2.6
逻辑断层消除的完整链条
Executor 执行失败↓FailureAttributor 进行 L0-L5 递进归因 → 精确定位失败根因↓Reflector 审核执行结果:├── 验证/否决 Executor 的发现(VETO 权力)├── 生成 intelligence_summary(攻击情报)└── 生成 attribution_result(归因结果)↓intelligence_summary 触发 Planner 动态规划├── 输入:失败归因 + 失败节点 + 情报摘要├── 参考:rejected_strategies(避免重复)└── 输出:诊断任务或替代攻击路径↓新任务注入 execute_reflect_loop → 继续执行
三、命令执行防沉迷机制(Anti-Addiction / Loop Prevention)
3.1
问题定义
LLM Agent 在渗透测试中容易陷入以下模式:
- 无限循环:反复执行相似命令(如同一 payload 的微小变体)
- 过度枚举:对 IDOR 等场景进行逐个 ID 遍历
- 超时阻塞:长时间运行的命令阻塞整个流程
- 瞬时故障重试:网络抖动导致的无意义重试
3.2
三大防护机制
机制一:SequenceMatcher 相似度检测
核心参数:
classAntiAddiction:def __init__(self,similarity_threshold: float = 0.85, # 相似度阈值max_similar_count: int = 18, # 最大连续相似次数command_timeout: int = 300, # 命令超时 (秒)max_retries: int = 3 # 最大重试次数):
检测流程:
def _is_similar_to_last_command(self, current_cmd: str) -> bool:# 1. 命令太短 (< 10 字符) 不检测iflen(current_cmd) < self._min_cmd_length:return False# 2. 用 SequenceMatcher 计算与上一条命令的相似度similarity = SequenceMatcher(None, current_cmd, last_cmd).ratio()# 3. 相似度 >= 0.85 判定为相似命令return similarity >= self.similarity_threshold
滑动窗口历史:
class CommandHistory(BaseModel):commands: List[str] = []max_length: int = 18 # 18 条命令的滑动窗口
def add(self, command: str) -> None:self.commands.append(command)if len(self.commands) > self.max_length:self.commands.pop(0) # FIFO 弹出最早的命令
触发警告:当连续相似命令达到 18 次时,返回一个结构化的反思提示:
⚠️ 检测到可能陷入循环(已连续执行 18 次相似命令)
请先思考以下问题来重新制定计划:1. 我的核心假设是什么?2. 过去 18 次的尝试,是否证明了这个假设是错误的?3. 除了当前的方法,还有哪些其他的可能性?4. 是否有更高效的方式(如批量处理、自动化脚本)?
关键设计:警告触发后,计数器重置为 0,允许 Agent 在反思后继续执行。执行不相似的命令也会重置计数器,即只检测连续相似。
机制二:指数退避重试(Exponential Backoff with Jitter)
使用 tenacity 库创建重试装饰器:
def create_retry_decorator(self, func_name: str = "") -> Callable:returnretry(retry=retry_if_exception_type(Exception),wait=wait_exponential_jitter(initial=0.3, # 首次重试等待 0.3 秒max=10, # 最大等待 10 秒jitter=0.5 # ±0.5 秒随机抖动),stop=stop_after_attempt(self.max_retries), # 最多重试 3 次reraise=True,)
等待序列大约为:0.3s → 0.6s → 1.2s(加上 ±0.5s 随机抖动),最多 3 次。
可重试错误判断:
@staticmethoddef is_retryable_error(exception) -> bool:# ConnectionError → 可重试# TimeoutError → 可重试# HTTP 408, 404, 429, 500-526 → 可重试
机制三:异步超时控制
async def execute_with_timeout(self, coro, timeout=None, func_name=""):timeout = timeout or self.command_timeout # 默认 300 秒return await asyncio.wait_for(coro, timeout=timeout)
任何工具调用超过 300 秒将被强制终止,抛出 TimeoutError。
3.3
在 Executor 中的集成
Executor 初始化时创建 AntiAddiction 实例。
在每次 shell 命令执行前进行防沉迷检查:
if tool_name == "execute_shell_command":command = arguments.get("command", "")warning = self.anti_addiction.check_and_record(command)if warning:logger.warning(f"检测到循环模式: {warning[:100]}...")
# 所有工具调用都包装重试装饰器retry_decorator = self.anti_addiction.create_retry_decorator(func_name=tool_name)
@retry_decoratorasync def _execute():# 实际执行逻辑
3.4
防护层次总结
| | | | | | — | — | — | — | | 层级 | 机制 | 触发条件 | 行为 | | L1 | 相似度检测 | 连续 18 次相似度 >= 0.85 | 返回反思提示,重置计数器 | | L2 | 指数退避重试 | Exception 异常 | 0.3s→0.6s→1.2s+jitter, 最多 3 次 | | L3 | 超时控制 | 单命令超过 300s | 强制终止,抛出 TimeoutError | | L4 | 循环迭代上限 | execute_reflect_loop > 100 | 标记任务为 FAILED |
四、Meta-Tooling 层实现
4.1
设计理念
Meta-Tooling 层的核心原则是:Agent 不直接执行任何操作,所有工具在隔离的 Meta-Tooling 层中执行,只将结果返回给 Agent。这实现了 Agent 推理与工具执行的解耦。
4.2
架构概览
┌─────────────────────────────────────────────────────┐│ LLM Agent ││ (Planner / Executor / Reflector) │└──────────────────────┬──────────────────────────────┘│ MCP Protocol(JSON-RPC)▼┌─────────────────────────────────────────────────────┐│ MCP Server(mcp_server.py) ││ ┌────────────────────────────────┐ ││ │ Tool Router / Dispatcher │ ││ └────────────┬───────────────────┘ ││ │ ││ ┌──────────┬───────┼───────┬──────────┐ ││ ▼ ▼ ▼ ▼ ▼ ││ ┌──────┐ ┌───────┐ ┌──────┐ ┌─────┐ ┌──────┐ ││ │Python│ │Browser│ │Termi-│ │Proxy│ │Recon │ ││ │Execu-│ │Automa-│ │nal │ │ │ │Tools │ ││ │tor │ │tion │ │ │ │ │ │ │ ││ └──────┘ └───────┘ └──────┘ └─────┘ └──────┘ │└─────────────────────────────────────────────────────┘
4.3
MCP Server 作为中间层
定义了 AIPentestMCPServer 类:
classAIPentestMCPServer:def __init__(self):self.server = Server("ai-pentest-server")self.python_executor: PythonExecutor | None = Noneself.browser: BrowserAutomation | None = Noneself.terminal: Terminal | None = Noneself.proxy: Proxy | None = Noneself.knowledge_base: KnowledgeBase | None = Noneself.notes_storage: NoteStorage | None = Noneself.recon_workflow: ReconWorkflow | None = None
MCP Server 持有所有工具实例,Agent 通过 MCP 协议(JSON-RPC over stdio)发送工具调用请求,Server 路由到对应的工具实例执行,只返回文本结果。
4.4
五大 Meta-Tooling 组件
4.4.1 PythonExecutor(代码执行沙箱)
`self.python_executor = PythonExecutor(path="scripts")`
在 Executor 中的调用:
if tool_name == "execute_python_code":@retry_decoratorasync def _execute():outputs = self.python_executor.execute_code(session_name=arguments.get("session_name", "default"),code=arguments.get("code", ""),timeout=arguments.get("timeout", 120))return"\n\n".join(f"Output: {output.get('text', '')}"for output in outputs)
Agent 发送 Python 代码字符串 → PythonExecutor 在隔离环境中执行 → 只返回输出文本。
4.4.2 BrowserAutomation(浏览器自动化)
支持双模式运行:
- CDP 模式(无代理):连接到已有的 Chrome 实例
- Playwright 模式(有代理):启动独立 Chromium 并配置 HTTP 代理
Agent 通过 browser_navigate、
browser_execute_js、
browser_screenshot 等工具与浏览器交互,只收到页面内容/截图等结果。
4.4.3 Terminal(终端会话管理)
shell 命令执行:
elif tool_name == "execute_shell_command":@retry_decoratorasync def _execute():session_id = self.terminal.new_session() # 创建新终端会话self.terminal.send_keys(session_id=session_id, # 发送命令keys=arguments.get("command", ""), enter=True)await asyncio.sleep(2) # 等待执行output = self.terminal.get_output(session_id) # 获取输出return output
Terminal 封装了 tmux 会话管理,Agent 不直接接触 shell,只通过 send_keys → get_output 的模式交互。
4.4.4 Proxy(HTTP 流量代理)
与 Caido 代理集成
self.proxy = Proxy(url=config.proxy_caido_url,token=config.proxy_caido_token) if config.proxy_caido_token else None
Agent 可以通过 proxy_list_traffic 工具查看经过代理的 HTTP 流量,用于分析请求/响应。
4.4.5 Recon Tools(侦察工具集)
导出的侦察工具:
- CyberspaceMapper # 空间测绘(FOFA/Quake)- FingerprintScanner # 指纹识别- DirectoryScanner # 目录扫描- JSAnalyzer # JS 分析- AuthBypassFuzzer # 鉴权绕过 Fuzz- ReconWorkflow # 侦察工作流编排
这些工具都在 MCP Server 层实例化和执行,Agent 只收到结构化的扫描结果。
4.5
工具执行的隔离保障
Executor 中的工具调用流程:
Agent LLM 输出 tool_call↓_execute_tool_call() 路由↓┌─ 防沉迷检查 (anti_addiction.check_and_record)├─ 重试装饰器包装 (create_retry_decorator)├─ Meta-Tool 执行 (PythonExecutor/Terminal/Browser/Proxy)└─ 返回 ToolExecutionResult(tool_name, success, output, error, execution_time)↓结果存入 context_manager.add_message(role="tool", content=result.output)↓失败归因 _attribute_failure() → L0-L5 分析↓漏洞自动检测 _check_and_save_vulnerability() → 保存笔记
关键点:
- Agent 永远不直接执行命令,只通过 Meta-Tooling 层间接操作
- 每次工具调用都经过防沉迷检查和重试包装
- 工具输出经过 ToolExecutionResult 标准化后才返回
- 失败会自动进行 L0-L5 归因
- 漏洞相关输出自动保存到笔记系统
4.6
Meta-Tooling 层的安全边界
Agent 层(纯推理)↑ 只接收文本结果│├── 不直接访问文件系统├── 不直接执行 shell 命令├── 不直接操作浏览器 DOM└── 不直接发送网络请求│↓ 只发送工具调用请求Meta-Tooling 层(隔离执行)├── PythonExecutor:沙箱化代码执行├── Terminal:tmux 会话隔离├── Browser:CDP/Playwright 封装├── Proxy:HTTP 流量只读访问└── Recon:封装的扫描工具
五、双后端知识架构(OpenViking + ChromaDB)
5.1
问题定义
项目知识库存在两类性质截然不同的数据:
-
静态知识:攻击载荷
(PayloadsAllTheThings)和漏洞挖掘方法论(HowToHunt),更新频率低、单条内容长(完整 Markdown 文档)
-
动态经验:Agent 执行渗透测试时积累的 STE 经验、工具使用记录、任务记忆,持续增长
此前两者共存于 ChromaDB 平坦向量空间,带来两个问题:
- 检索噪声:搜索「SQL 注入」时,静态 payload 文档和动态 experience 条目混排,Agent 需要大量 token 才能筛选出目标类型
- 无法渐进加载:ChromaDB 只支持「全文命中→返回全文」,对于长文档(如 SQL_Injection/README.md 有数千行)会浪费 LLM 上下文窗口
5.2
解决方案:OpenViking 层级知识管理
引入 OpenViking 作为静态知识专用后端,利用其L0/L1/L2 渐进加载 和AGFS 层级目录结构替代 ChromaDB 的平坦搜索。
knowledge_search(query, category?)│▼┌─────────────────┐│ 路由判断层 ││ (mcp_server.py) │└────────┬────────┘┌──────────────────┼──────────────────┐▼ ▼ ▼category ∈ {payloads, category 为其他 category 为空howtohunt} (experience 等) (无指定)│ │ │▼ ▼ ▼┌──────────────┐ ┌──────────────┐ 两者合并搜索│ OpenViking │ │ ChromaDB │ 按 similarity│ (降级ChromaDB)│ │ │ 降序取 top N└──────────────┘ └──────────────┘
L0/L1/L2 渐进加载
OpenViking 的核心优势在于三级内容粒度:
| | | | | | — | — | — | — | | 层级 | 内容 | Token 量 | 用途 | | L0 | 自动生成的摘要(abstract) | ~100 tokens | knowledge_search 结果列表中的预览 | | L1 | 结构化概览(overview) | ~2k tokens | 中等粒度浏览,判断是否需要全文 | | L2 | 完整原始文档 | 原文长度 | knowledge_get_detail 返回完整内容 |
这使得 Agent 可以先通过 L0 摘要快速筛选,再对感兴趣的条目调用 knowledge_get_detail 获取 L2 全文,避免一次性加载大量无关内容。
5.3
路由策略实现
路由表
| | | | | — | — | — | | category | 后端 | 说明 | | payloads / howtohunt | OpenViking(不可用时降级 ChromaDB) | 静态攻击载荷和方法论 | | experience / task_memory / general | ChromaDB | 动态数据 | | 不填 | 两者合并,按 similarity 排序取 top N | 跨后端搜索 |
路由核心代码
实现了三分支路由:
async def _knowledge_search(self, arguments):viking = self._ensure_viking_backend()
if category and category in VIKING_CATEGORIES:# 静态知识类别 → 优先 Viking,Viking 不可用时降级 ChromaDBif viking:all_results = viking.search(query, n_results=limit, category=category)else:all_results = self.knowledge_base.search(query=query, category=category, ...)
elif category:# 非静态知识类别 → 仅搜 ChromaDBall_results = self.knowledge_base.search(query=query, category=category, ...)
else:# 无 category → 搜两者合并,按分数排序chroma_results = self.knowledge_base.search(query=query, ...)all_results = list(chroma_results)if viking:viking_results = viking.search(query, n_results=limit)all_results.extend(viking_results)all_results.sort(key=lambda e: e.metadata.get("similarity", 0), reverse=True)all_results = all_results[:limit]
详情路由
按 ID 前缀路由:
async def _knowledge_get_detail(self, arguments):entry_id = arguments["id"]if entry_id.startswith("viking://"):# viking:// URI → OpenViking L2 全文result = viking.get_detail(entry_id)else:# ChromaDB UUID → 原有逻辑entry = self.knowledge_base.get_entry(entry_id)
5.4
OpenViking 客户端封装
ai_pentest/knowledge/viking.py
封装了 OpenViking SDK,提供两个核心类:
VikingKnowledgeResult(兼容对象)
@dataclassclassVikingKnowledgeResult:"""兼容 KnowledgeEntry 的轻量结果对象"""id: str = ""# viking:// URI(作为路由信号)title: str = ""content: str = "" # L0 abstract(搜索时)或 L2 完整内容(详情时)category: str = "general"type: str = "general"severity: str = "info"metadata: dict = field(default_factory=dict) # {"similarity": score, "source": "openviking"}
id使用viking://前缀,
使得 _knowledge_get_detail 可以通过前缀判断路由到 OpenViking 而非 ChromaDB。metadata[“source”] = “openviking” 用于搜索结果中标记 [Viking] 来源标签。
VikingKnowledgeBackend(客户端)
classVikingKnowledgeBackend:def initialize(self) -> bool:# 确保配置文件路径可被 root 用户找到(MCP HTTP Server 以 root 运行)config_file = os.environ.get("OPENVIKING_CONFIG_FILE")ifnot config_file:default_conf = Path.home() / ".openviking" / "ov.conf"if default_conf.exists():os.environ["OPENVIKING_CONFIG_FILE"] = str(default_conf)
self._client = ov.OpenViking(path=self._data_path)self._client.initialize()
def search(self, query, n_results=5, category=None) -> list[VikingKnowledgeResult]:# category → target_uri 映射: "payloads" → "viking://resources/payloads/"target_uri = _CATEGORY_URI_MAP.get(category, "viking://resources/")find_result = self._client.find(query=query, target_uri=target_uri, limit=n_results)# 返回包含 L0 abstract 的结果列表
def get_detail(self, uri) -> VikingKnowledgeResult:# 调用 client.read(uri) 获取 L2 完整内容content = self._client.read(uri)
def get_overview(self, uri) -> str:# 调用 client.overview(uri) 获取 L1 概览(~2k tokens)return self._client.overview(uri)
5.5
Embedding 配置
OpenViking 使用豆包(Doubao)多模态向量模型生成语义向量:
~/.openviking/ov.conf 核心配置:
{"embedding": {"dense": {"provider": "volcengine","model": "ep-20260303110733-zxxfw","dimension": 2048,"input": "multimodal"}},"storage": {"workspace": "/home/kali/Desktop/ai/data/openviking","agfs": {"backend": "local"},"vectordb": {"backend": "local"}},"auto_generate_l0": true,"auto_generate_l1": true}
-
provider: volcengine — 使用火山引擎API
(/api/v3/embeddings/multimodal)
-
dimension: 2048 维多模态向量
-
auto_generate_l0/l1: 资源入库时自动生成 L0 摘要和 L1 概览
5.6
降级策略
系统设计了四级降级保障,确保 OpenViking 不可用时不影响核心功能:
| | | | | — | — | — | | 场景 | 行为 | 影响 | | OPENVIKING_ENABLED=false(默认) | 不导入 openviking,路由全部走 ChromaDB | 零影响 | | openviking 包未安装 | initialize() 捕获 ImportError,返回 False | 降级到 ChromaDB | | Viking 搜索异常 | 捕获异常返回空列表,ChromaDB 结果不受影响 | 部分结果缺失 | | knowledge_get_detail(viking://…) 但 Viking 不可用 | 返回明确错误信息 | 用户可从 ChromaDB 获取替代内容 |
initialize() 方法:
def initialize(self) -> bool:try:import openviking as ov # 包不存在 → ImportError → 返回 Falseexcept ImportError:return False
try:self._client = ov.OpenViking(path=self._data_path)self._client.initialize() # 配置错误/端口冲突 → Exception → 返回 Falseself._available = Truereturn Trueexcept Exception as e:self._available = Falsereturn False
5.7
MCP Server 集成方式
stdio 模式(单会话)
AIPentestMCPServer.__init__ 接受可选的 shared_viking_backend 参数。_initialize_knowledge_base() 末尾调用 _ensure_viking_backend() 懒初始化。
HTTP 模式(多会话)
在 main() 中预加载共享 Viking 后端,注入到所有会话的 AIPentestMCPServer 实例中,避免每个会话重复初始化 AGFS 服务(AGFS 监听端口 1833,只能启动一个实例)。
#### stdio 模式(单会话)
`AIPentestMCPServer.__init__` 接受可选的 `shared_viking_backend` 参数。`_initialize_knowledge_base()` 末尾调用 `_ensure_viking_backend()` 懒初始化。
#### HTTP 模式(多会话)
在 `main()` 中预加载共享 Viking 后端,注入到所有会话的 `AIPentestMCPServer` 实例中,避免每个会话重复初始化 AGFS 服务(AGFS 监听端口 1833,只能启动一个实例)。
5.8
数据迁移
迁移通过两个脚本完成:
迁移脚本
tools_scripts/migrate_to_openviking.py
将data/knowledge/payloads/(136 个 Markdown 文件)
和 data/knowledge/howtohunt/(88 个 Markdown 文件)
导入 OpenViking:
client = ov.OpenViking(path=data_path)client.add_resource(path=payloads_dir, target="viking://resources/payloads/",reason="渗透测试攻击载荷(PayloadsAllTheThings)")client.add_resource(path=howtohunt_dir, target="viking://resources/howtohunt/",reason="漏洞挖掘方法论(HowToHunt)")client.wait_processed() # 等待 L0/L1 自动生成
清理脚本
tools_scripts/cleanup_chroma_static.py
从 ChromaDB 删除已迁移到 OpenViking 的 447 条静态知识(271 payloads + 176 howtohunt),使 ChromaDB 只保留动态数据(experience:1294 + general:15 + task_memory:6)。
5.9
知识库分工
┌────────────────────────────────────────────────────────────────────┐│ 知识检索层(MCP Server) ││ knowledge_search / knowledge_get_detail — 按 category 路由 │└───────────────────────────┬────────────────────────────────────────┘│┌───────────────┴───────────────┐▼ ▼┌─────────────────────────┐ ┌─────────────────────────────────────┐│ OpenViking │ │ ChromaDB ││ (静态知识 · 224 文件) │ │ (动态数据 · 1315 条) ││ │ │ ││ payloads/ (136) │ │ experience (1294) ││ - SQL_Injection │ │ - STE 攻击经验 ││ - XSS │ │ - 工具使用记录 ││ - SSRF │ │ ││ - File Upload │ │ general (15) ││ - ... │ │ - 通用知识 ││ │ │ ││ howtohunt/ (88) │ │ task_memory (6) ││ - IDOR │ │ - 任务记忆 ││ - Authentication │ │ ││ - CORS │ │ ── 动态增长 ── ││ - ... │ │ _record_experience() ││ │ │ _try_merge_experience() ││ ── L0/L1/L2 渐进加载 ── │ │ ││ ── 2048 维向量检索 ── │ │ ── 平坦向量检索 ── ││ ── AGFS 层级目录 ── │ │ │└─────────────────────────┘ └─────────────────────────────────────┘
本文内容均来自【守望者实验室】团队成员【一寸灰】原创作品
《AI渗透MCP工具开发思路分享》已取得小伙伴授权转载。
免责声明:
本文所载程序、技术方法仅面向合法合规的安全研究与教学场景,旨在提升网络安全防护能力,具有明确的技术研究属性。
任何单位或个人未经授权,将本文内容用于攻击、破坏等非法用途的,由此引发的全部法律责任、民事赔偿及连带责任,均由行为人独立承担,本站不承担任何连带责任。
本站内容均为技术交流与知识分享目的发布,若存在版权侵权或其他异议,请通过邮件联系处理,具体联系方式可点击页面上方的联系我。
本文转载自:锦岳智慧 一寸灰 一寸灰《告别“自嗨”与烧钱:如何为“AI渗透测试”构建可靠的实战闭环?》
版权声明
本站仅做备份收录,仅供研究与教学参考之用。
读者将信息用于其他用途的,全部法律及连带责任由读者自行承担,本站不承担任何责任。








评论