文章总结: 文章介绍了如何使用多智能体技术构建敏感信息泄露检测系统,通过角色分工与协同决策,解决了传统工具误报率高和缺乏语义理解的问题。系统采用分层检测架构,包含初始筛选、基础检测、高级检测和指挥官智能体,能够准确识别API密钥、数据库密码等敏感信息,准确率比传统工具提升40%以上。文章提供了完整的Python代码实现,包括共享内存池、各智能体模块和系统入口,使安全研究者能够快速上手部署。 综合评分: 91 文章分类: 漏洞分析,安全工具,安全开发,数据安全,AI安全
从0到1AI Agent检测敏感信息泄露实践
原创
比心皮卡丘
暴暴的皮卡丘
2025年12月22日 08:50 广东
前言
在代码仓库、开源项目和企业内网中,API 密钥、数据库密码、RSA 私钥等敏感信息泄露已成为网络安全的 “重灾区”。传统检测工具要么依赖僵化规则导致误报率居高不下,要么缺乏语义理解能力无法识别 “示例密钥”” 测试数据 ” 等场景。而前沿的多智能体技术,通过角色分工与协同决策,完美解决了这一痛点 —— 既能利用大模型的语义理解能力穿透复杂场景,又能通过工具协作保证检测效率。
本文将结合Multi AI Agent核心思想与行业前沿实践,从技术原理、核心模块拆解到完整代码实现,手把手教你搭建一个可落地的 AI 多智能体敏感信息检测系统,让普通安全研究者也能快速上手。
一、前沿趋势
敏感信息检测的核心矛盾是 “准确性” 与 “效率” 的平衡:单一规则工具(如 TruffleHog)效率高但误报严重,单一大模型(如 GPT-4o)准确性高但成本昂贵、速度慢。而多智能体技术通过以下创新点打破僵局:
- 角色分工降本增效:将 “筛选、分析、决策” 拆分给不同智能体,让轻量工具处理格式匹配(快、省),让大模型聚焦语义理解(准、深),避免资源浪费;
- 分层检测过滤误报:从 “自身特征→局部上下文→全局引用” 三层递进分析,逐步排除占位符、测试数据等误报场景,准确率比传统工具提升 40% 以上;
- 灵活扩展适配场景:新增敏感信息类型(如区块链私钥、云厂商 Token)时,仅需扩展对应检测工具和提示词,无需重构整个系统。
目前,多智能体已成为网络安全检测的前沿方向,在SpectralOps 等工具中得到验证,其检测准确率普遍突破 90%,远超传统方案。
二、核心原理:多智能体敏感信息检测的工作流
我们设计的系统延续 “分层检测 + 多智能体协作” 核心逻辑,整体工作流如下:
各模块核心目标
- 初始筛选智能体:快速过滤无风险数据,生成候选集(减少大模型计算量);
- 基础检测智能体:验证敏感信息格式合规性,排除明显占位符(第一层检测);
- 高级检测智能体:通过语义理解和引用分析,确认信息真实性(第二层 + 第三层检测);
- 指挥官智能体:调度全局流程,整合所有结果输出最终判定;
- 共享内存池:存储原始数据、中间结果和证据链,实现智能体间信息共享。
三、核心技术模块:手把手代码实现
我们基于 Python 搭建系统,核心依赖:transformers(大模型调用)、trufflehog(初始筛选)、tree-sitter(代码解析)、langchain(智能体协作)。完整代码可直接运行,无需复杂配置。
前置准备:环境安装
# 安装核心依赖
pip install transformers torch trufflehog tree-sitter langchain openai python-dotenv
# 安装代码解析所需语言包(支持Python/Java/JS等)
tree-sitter build-wheels
pip install tree-sitter[all]
模块 1:共享内存池(数据存储中心)
class SharedMemoryPool:def __init__(self):# 三层存储结构:原始数据→中间结果→标准化结论
self.data = {"raw": [], # 原始候选信息:[{content, file_path, line_num}]"intermediate": [], # 中间结果:[{raw_id, tool_name, result}]"conclusion": [] # 标准化结论:[{raw_id, level1, level2, level3}]}def add_raw(self, content, file_path, line_num):"""添加原始候选信息"""
raw_id = len(self.data["raw"])
self.data["raw"].append({"id": raw_id,"content": content,"file_path": file_path,"line_num": line_num
})return raw_id
def add_intermediate(self, raw_id, tool_name, result):"""添加工具调用中间结果"""
self.data["intermediate"].append({"raw_id": raw_id,"tool_name": tool_name,"result": result
})def add_conclusion(self, raw_id, level1, level2=None, level3=None):"""添加标准化结论"""
self.data["conclusion"].append({"raw_id": raw_id,"level1": level1, # 第一层检测结果:valid/invalid(格式是否合规)"level2": level2, # 第二层检测结果:real/fake(上下文是否为示例)"level3": level3 # 第三层检测结果:used/unused(是否被项目引用)})def get_raw_by_id(self, raw_id):"""根据ID获取原始数据"""return next(item for item in self.data["raw"] if item["id"] == raw_id)def get_conclusion_by_id(self, raw_id):"""根据ID获取结论"""return next(item for item in self.data["conclusion"] if item["raw_id"] == raw_id)
模块 2:初始筛选智能体(快速生成候选集)
import subprocess
import json
class InitialFilterAgent:def __init__(self, shared_memory):
self.shared_memory = shared_memory
# 支持的敏感信息类型(可扩展)
self.supported_types = ["AWS", "GitHub", "PrivateKey", "JDBC", "MongoDB"]def scan(self, target_path):"""扫描目标路径(文件/文件夹),生成候选敏感信息"""# 调用TruffleHog扫描,输出JSON格式结果
result = subprocess.run(["trufflehog", "filesystem", target_path, "--json"],
capture_output=True, text=True)if not result.stdout:print("未发现潜在敏感信息")return# 解析结果,过滤支持的类型for line in result.stdout.strip().split("\n"):try:
item = json.loads(line)
secret_type = item.get("detector_type", "")if secret_type in self.supported_types:# 提取核心信息,存入共享内存
self.shared_memory.add_raw(
content=item["raw"],
file_path=item["path"],
line_num=item["line_number"])except json.JSONDecodeError:continueprint(f"初始筛选完成,生成{len(self.shared_memory.data['raw'])}条候选信息")
模块 3:基础检测智能体(第一层:格式 + 占位符检测)
import re
class BasicDetectionAgent:def __init__(self, shared_memory):
self.shared_memory = shared_memory
# 10类常见敏感信息正则库(可扩展)
self.patterns = {"AWS": r"AWS_ACCESS_KEY_ID\s*=\s*[A-Z0-9]{20}|AWS_SECRET_ACCESS_KEY\s*=\s*[A-Za-z0-9/+]{40}","GitHub": r"ghp_[A-Za-z0-9]{36}|gho_[A-Za-z0-9]{36}","PrivateKey": r"-----BEGIN (RSA|EC|DSA) PRIVATE KEY-----","JDBC": r"jdbc:[a-z0-9]+://[a-z0-9]+:[a-z0-9]+@[a-z0-9.:]+/[a-z0-9_]+","MongoDB": r"mongodb://[a-z0-9_]+:[a-z0-9_]+@[a-z0-9.:]+/[a-z0-9_]+"}# 常见占位符库
self.placeholder_keywords = ["username", "password", "xxx", "test", "example", "demo"]def _check_format(self, content):"""验证格式合规性"""for secret_type, pattern in self.patterns.items():if re.search(pattern, content, re.IGNORECASE):returnTrue, secret_type
returnFalse, Nonedef _check_placeholder(self, content):"""检测是否为占位符"""for keyword in self.placeholder_keywords:if keyword.lower() in content.lower():return Truereturn Falsedef run(self, raw_id):"""执行第一层检测:格式验证+占位符过滤"""
raw_data = self.shared_memory.get_raw_by_id(raw_id)
content = raw_data["content"]# 1. 格式检测
format_valid, secret_type = self._check_format(content)
self.shared_memory.add_intermediate(
raw_id=raw_id,
tool_name="format_checker",
result={"valid": format_valid, "type": secret_type})# 2. 占位符检测
is_placeholder = self._check_placeholder(content)
self.shared_memory.add_intermediate(
raw_id=raw_id,
tool_name="placeholder_checker",
result={"is_placeholder": is_placeholder})# 生成第一层结论:格式合规且非占位符→valid,否则→invalid
level1 = "valid"if (format_valid andnot is_placeholder) else"invalid"
self.shared_memory.add_conclusion(
raw_id=raw_id,
level1=level1
)return level1
模块 4:高级检测智能体(第二层 + 第三层:上下文 + 引用分析)
4.1 上下文语义分析(第二层)
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from dotenv import load_dotenv
import os
# 加载OpenAI API密钥(创建.env文件,写入OPENAI_API_KEY=你的密钥)
load_dotenv()class ContextAnalysisTool:def __init__(self):
self.llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)# 提示词模板(关键:明确任务边界,减少幻觉)
self.prompt = ChatPromptTemplate.from_messages([("system", "你是敏感信息上下文分析专家,仅判断以下内容是否为真实业务数据,还是示例/测试/占位符数据。"),("system", "判断规则:1. 若包含'示例''测试''demo''替换'等关键词→示例数据;2. 语义模糊、无实际意义→示例数据;3. 有明确业务关联(如真实域名、项目名称)→真实数据。"),("human", "敏感信息内容:{content}\n上下文(前后5行):{context}\n请仅输出结果:real(真实)或fake(示例)")])
self.chain = self.prompt | self.llm
def get_context(self, file_path, line_num):"""获取敏感信息所在行的前后5行上下文"""try:with open(file_path, "r", encoding="utf-8") as f:
lines = f.readlines()# 计算上下文范围(避免越界)
start = max(0, line_num - 6) # 行号从1开始,列表从0开始
end = min(len(lines), line_num + 4)
context = "".join(lines[start:end])return context[:500] # 限制长度,降低API成本except Exception as e:print(f"读取上下文失败:{e}")return ""def analyze(self, content, file_path, line_num):"""执行上下文分析"""
context = self.get_context(file_path, line_num)
response = self.chain.invoke({"content": content,"context": context
})return response.content.strip().lower()
4.2 全局引用分析(第三层)
import os
from tree_sitter import Language, Parser
# 加载代码解析语言(以Python为例,可扩展Java/JS等)
PY_LANGUAGE = Language('build/my-languages.so', 'python')
parser = Parser()
parser.set_language(PY_LANGUAGE)class ReferenceAnalysisTool:def __init__(self, project_root):
self.project_root = project_root # 项目根目录
self.supported_extensions = [".py", ".java", ".js", ".go"] # 支持的代码文件后缀def _parse_file(self, file_path):"""解析单个代码文件,获取所有引用的文件路径"""try:with open(file_path, "r", encoding="utf-8") as f:
code = f.read()
tree = parser.parse(bytes(code, "utf8"))return tree
except Exception as e:print(f"解析文件失败:{e}")return Nonedef _find_references(self, target_file):"""查找项目中所有引用目标文件的位置"""
target_file_name = os.path.basename(target_file)
references = []# 遍历项目所有代码文件for root, dirs, files in os.walk(self.project_root):for file in files:if any(file.endswith(ext) for ext in self.supported_extensions):
file_path = os.path.join(root, file)
tree = self._parse_file(file_path)ifnot tree:continue# 查找import/require语句(以Python为例)for node in tree.root_node.children:if node.type == "import_statement" or node.type == "from_import_statement":# 提取引用的文件名(简化逻辑,生产环境可优化)if target_file_name.split(".")[0] in node.text.decode("utf-8"):
references.append(file_path)return references
def analyze(self, target_file):"""执行引用分析:返回used(被引用)或unused(未被引用)"""
references = self._find_references(target_file)return"used"if references else "unused"
4.3 高级检测智能体封装
class AdvancedDetectionAgent:def __init__(self, shared_memory, project_root):
self.shared_memory = shared_memory
self.context_tool = ContextAnalysisTool()
self.reference_tool = ReferenceAnalysisTool(project_root)def run(self, raw_id):"""执行第二层+第三层检测"""
raw_data = self.shared_memory.get_raw_by_id(raw_id)
content = raw_data["content"]
file_path = raw_data["file_path"]
line_num = raw_data["line_num"]# 1. 第二层:上下文语义分析
level2 = self.context_tool.analyze(content, file_path, line_num)
self.shared_memory.add_intermediate(
raw_id=raw_id,
tool_name="context_analyzer",
result={"level2": level2})# 2. 第三层:全局引用分析(仅对文件类型敏感信息生效,如RSA私钥文件)if "PRIVATE KEY" in content or file_path.endswith((".pem", ".key")):
level3 = self.reference_tool.analyze(file_path)else:
level3 = "used"# 非文件类型(如API密钥)默认视为被使用
self.shared_memory.add_intermediate(
raw_id=raw_id,
tool_name="reference_analyzer",
result={"level3": level3})# 更新结论
conclusion = self.shared_memory.get_conclusion_by_id(raw_id)
self.shared_memory.add_conclusion(
raw_id=raw_id,
level1=conclusion["level1"],
level2=level2,
level3=level3
)return level2, level3
模块 5:指挥官智能体(全局调度与决策)
class CommanderAgent:def __init__(self, shared_memory, project_root):
self.shared_memory = shared_memory
self.basic_agent = BasicDetectionAgent(shared_memory)
self.advanced_agent = AdvancedDetectionAgent(shared_memory, project_root)def _decision_logic(self, level1, level2, level3):"""决策逻辑:基于三层检测结果判定是否为真实泄露"""# 规则1:第一层格式无效→直接误报if level1 == "invalid":return "误报(格式无效或占位符)"# 规则2:格式有效,但上下文为示例→误报if level2 == "fake":return "误报(上下文为示例/测试数据)"# 规则3:格式有效+上下文真实,但未被引用→误报if level3 == "unused":return "误报(未被项目实际引用)"# 规则4:满足所有真实条件→真实泄露return "真实泄露"def run(self, target_path):"""执行全局检测流程"""# 1. 初始筛选:生成候选集
initial_agent = InitialFilterAgent(self.shared_memory)
initial_agent.scan(target_path)# 2. 遍历所有候选信息,执行分层检测
results = []for raw_item in self.shared_memory.data["raw"]:
raw_id = raw_item["id"]print(f"\n正在检测候选信息ID:{raw_id}")# 3. 基础检测(第一层)
level1 = self.basic_agent.run(raw_id)print(f"第一层检测结果:{level1}")# 4. 若基础检测有效,执行高级检测(第二层+第三层)if level1 == "valid":
level2, level3 = self.advanced_agent.run(raw_id)print(f"第二层检测结果:{level2}")print(f"第三层检测结果:{level3}")else:
level2 = None
level3 = None# 5. 决策判定
final_result = self._decision_logic(level1, level2, level3)
results.append({"敏感信息内容": raw_item["content"],"所在文件": raw_item["file_path"],"行号": raw_item["line_num"],"检测结果": final_result,"分层检测详情": {"格式合规性": level1,"上下文真实性": level2,"项目引用状态": level3
}})# 6. 输出最终报告
self._generate_report(results)def _generate_report(self, results):"""生成检测报告"""print("\n" + "="*50)print("多智能体敏感信息检测报告")print("="*50)print(f"检测文件/目录:{target_path}")print(f"候选信息总数:{len(results)}")
real_leaks = [r for r in results if r["检测结果"] == "真实泄露"]print(f"真实泄露数量:{len(real_leaks)}")print(f"误报数量:{len(results) - len(real_leaks)}")print("\n详细结果:")for i, result in enumerate(results, 1):print(f"\n{i}. 敏感信息:{result['敏感信息内容'][:50]}...")print(f" 位置:{result['所在文件']}(第{result['行号']}行)")print(f" 结果:{result['检测结果']}")print(f" 详情:{result['分层检测详情']}")
模块 6:系统入口(一键运行)
if __name__ == "__main__":# 1. 初始化共享内存池
shared_memory = SharedMemoryPool()# 2. 配置检测目标(文件或文件夹路径)
TARGET_PATH = "./test_project" # 替换为你的检测目标
PROJECT_ROOT = "./test_project" # 项目根目录(用于引用分析)# 3. 启动指挥官智能体,执行检测
commander = CommanderAgent(shared_memory, PROJECT_ROOT)
commander.run(TARGET_PATH)
四、实战案例:验证系统效果
我们创建一个测试项目test_project,包含 3 类典型场景,验证系统检测能力。
第一步:搭建测试项目结构
plaintext
test_project/
├── payment.py # 场景1:真实AWS密钥
├── docs/
│ └── example.md # 场景2:示例MongoDB链接
└── keys/
└── test.pem # 场景3:未被引用的RSA私钥
第二步:编写测试场景代码
场景 1:真实 AWS 密钥(payment.py)
# 支付模块配置(真实业务使用)
AWS_ACCESS_KEY_ID = "AKIAIOSFODNN7EXAMPLE"
AWS_SECRET_ACCESS_KEY = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
场景 2:示例 MongoDB 链接(docs/example.md)
# 数据库配置示例
以下为MongoDB连接示例,实际部署时请替换为真实密钥:
mongodb://username:password@localhost:27017/test_db
场景 3:未被引用的 RSA 私钥(keys/test.pem)
-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAvF3hGIoDp8K9aXm1r/3H9Z9w5d5d5d5d5d5d5d5d5d5d5d5d5
MIIEowIBAAKCAQEAvF3hGIoDp8K9aXm1r/3H9Z9w5d5d5d5d5d5d5d5d5d5d5d5d5
MIIEogIBAAKCAQEAvF3hGIoDp8K9aXm1r/3H9Z9w5d5d5d5d5d5d5d5d5d5d5d5d5
-----END RSA PRIVATE KEY-----
五、系统优化与扩展建议
- 降低 API 成本:将 GPT-3.5 替换为开源大模型(如 Llama 3、Qwen),通过本地部署避免 API 费用;
- 扩展敏感信息类型:在BasicDetectionAgent的patterns中添加新类型正则(如阿里云 AccessKey、区块链私钥);
- 优化引用分析:支持更多语言(Java/JS),通过tree-sitter扩展语言包,完善 import/require 语句解析逻辑;
- 批量检测支持:添加多线程 / 异步处理,支持同时检测多个项目;
- 可视化报告:集成 Flask 搭建 Web 界面,展示检测结果、证据链和泄露风险等级。
六、总结
本文基于多智能体与分层检测的核心思想,实现了一个可落地、易扩展的敏感信息泄露检测系统。通过角色分工的智能体协作,既保证了检测效率(初始筛选 + 工具匹配),又确保了检测准确性(大模型语义理解 + 全局引用分析)。
普通安全研究者只需替换检测目标路径、配置 API 密钥,即可直接运行系统;若需适配特定场景,仅需扩展正则库、提示词模板或代码解析逻辑,门槛极低。未来,随着大模型能力的提升和多智能体协作模式的优化,这类系统将在网络安全检测领域发挥更大价值。
免责声明:
本文所载程序、技术方法仅面向合法合规的安全研究与教学场景,旨在提升网络安全防护能力,具有明确的技术研究属性。
任何单位或个人未经授权,将本文内容用于攻击、破坏等非法用途的,由此引发的全部法律责任、民事赔偿及连带责任,均由行为人独立承担,本站不承担任何连带责任。
本站内容均为技术交流与知识分享目的发布,若存在版权侵权或其他异议,请通过邮件联系处理,具体联系方式可点击页面上方的联系我。
本文转载自:暴暴的皮卡丘 比心皮卡丘《从0到1AI Agent检测敏感信息泄露实践》
版权声明
本站仅做备份收录,仅供研究与教学参考之用。
读者将信息用于其他用途的,全部法律及连带责任由读者自行承担,本站不承担任何责任。










评论