Agent开发|从0实现Agent(四):构建基于DAG图的任务系统(复杂任务协同篇)

admin 2026-03-30 00:26:19 网络安全文章 来源:ZONE.CI 全球网 0 阅读模式

文章总结: 文章讲解基于DAG的任务系统实现,将任务存为JSON文件并建立依赖图,支持并行执行和状态持久化,为多智能体协作提供协调框架。 综合评分: 88 文章分类: AI安全,实战经验,安全开发


cover_image

Agent开发|从0实现Agent(四):构建基于DAG图的任务系统(复杂任务协同篇)

原创

Real返璞归真 Real返璞归真

Real返璞归真

2026年3月25日 20:26 山东

公众号

欢迎关注公众号【Real返璞归真】,我们将不定期分享CTF竞赛、二进制安全、JS/安卓逆向、AI安全等领域的前沿知识与技术内容。

前言

简介

Agent = LLM + 工具 + 循环

模型就是智能体,我们的工作就是给它工具,然后让开。

前面三次实验,我们完成了基础Agent运行工具调用任务规划SKill封装上下文压缩

本篇文章将通过1个实验,实现更完善的基于DAG图的Agent任务系统。

参考资料

learn-claude-code:https://github.com/shareAI-lab/learn-claude-code

12 个递进式课程, 从简单循环到隔离化的自治执行。每个课程添加一个机制。每个机制有一句格言。

s01“One loop & Bash is all you need” — 一个工具 + 一个循环 = 一个智能体

s02“加一个工具, 只加一个 handler” — 循环不用动, 新工具注册进 dispatch map 就行

s03“没有计划的 agent 走哪算哪” — 先列步骤再动手, 完成率翻倍

s04“大任务拆小, 每个小任务干净的上下文” — 子智能体用独立 messages[], 不污染主对话

s05“用到什么知识, 临时加载什么知识” — 通过 tool_result 注入, 不塞 system prompt

s06“上下文总会满, 要有办法腾地方” — 三层压缩策略, 换来无限会话

s07“大目标要拆成小任务, 排好序, 记在磁盘上” — 文件持久化的任务图, 为多 agent 协作打基础

s08“慢操作丢后台, agent 继续想下一步” — 后台线程跑命令, 完成后注入通知

s09“任务太大一个人干不完, 要能分给队友” — 持久化队友 + 异步邮箱

s10“队友之间要有统一的沟通规矩” — 一个 request-response 模式驱动所有协商

s11“队友自己看看板, 有活就认领” — 不需要领导逐个分配, 自组织

s12“各干各的目录, 互不干扰” — 任务管目标, worktree 管目录, 按 ID 绑定

s07:任务系统 – Task Graph + Dependencies

A file-based task graph with ordering, parallelism, and dependencies — the coordination backbone for multi-agent work

基于文件的任务图,具备排序、并行和依赖管理——多智能体工作的协调核心。

问题

s03 的 TodoManager 只是内存中的扁平清单: 没有顺序、没有依赖,状态只有是否完成。

真实目标是有结构的 — 任务 B 依赖任务 A,任务 C 和 D 可以并行,任务 E 要等 C 和 D 都完成。

没有显式的关系,智能体分不清什么能做、什么被卡住、什么能同时跑。而且清单只活在内存里,上下文压缩 (s06) 以后就没了。

解决方案

把扁平清单升级为持久化到磁盘的任务图。每个任务是一个 JSON 文件,有状态、前置依赖 (blockedBy) 和后置依赖 (blocks)。

任务图随时回答三个问题:

  • 什么可以做?状态为 pending 且 blockedBy 为空的任务。
  • 什么被卡住?等待前置任务完成的任务。
  • 什么做完了?状态为 completed 的任务, 完成时自动解锁后续任务。
.tasks/
  task_1.json  {"id":1, "status":"completed"}
  task_2.json  {"id":2, "blockedBy":[1], "status":"pending"}
  task_3.json  {"id":3, "blockedBy":[1], "status":"pending"}
  task_4.json  {"id":4, "blockedBy":[2,3], "status":"pending"}

任务图 (DAG):
                 +----------+
            +--> | task 2   | --+
            |    | pending  |   |
+----------+     +----------+    +--> +----------+
| task 1   |                          | task 4   |
| completed| --> +----------+    +--> | blocked  |
+----------+     | task 3   | --+     +----------+
                 | pending  |
                 +----------+

顺序:   task 1 必须先完成, 才能开始 2 和 3
并行:   task 2 和 3 可以同时执行
依赖:   task 4 要等 2 和 3 都完成
状态:   pending -> in_progress -> completed

这个任务图是 s07 之后所有机制的协调骨架:后台执行 (s08)、多 agent 团队 (s09+)、worktree 隔离 (s12) 都读写这同一个结构。

工作原理

  1. 修改TaskManager,每个任务一个 JSON 文件,定义CRUD + 依赖图:
   class TaskManager:
       def __init__(self, tasks_dir: Path):
           self.dir = tasks_dir
           self.dir.mkdir(exist_ok=True)
           self._next_id = self._max_id() + 1

       def create(self, subject, description=""):
           task = {"id": self._next_id, "subject": subject,
                   "status": "pending", "blockedBy": [],
                   "blocks": [], "owner": ""}
           self._save(task)
           self._next_id += 1
           return json.dumps(task, indent=2)
  1. 定义依赖解除功能,完成任务时,自动将其 ID 从其他任务的 blockedBy 中移除,解锁后续任务:
   def _clear_dependency(self, completed_id):
       for f in self.dir.glob("task_*.json"):
           task = json.loads(f.read_text())
           if completed_id in task.get("blockedBy", []):
               task["blockedBy"].remove(completed_id)
               self._save(task)
  1. 状态变更 + 依赖关联,使用update 处理状态转换和依赖边:
   def update(self, task_id, status=None,
              add_blocked_by=None, add_blocks=None):
       task = self._load(task_id)
       if status:
           task["status"] = status
           if status == "completed":
               self._clear_dependency(task_id)
       self._save(task)
  1. 将四个任务工具加入dispatch map
   TOOL_HANDLERS = {
       # ...base tools...
       "task_create": lambda **kw: TASKS.create(kw["subject"]),
       "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status")),
       "task_list":   lambda **kw: TASKS.list_all(),
       "task_get":    lambda **kw: TASKS.get(kw["task_id"]),
   }

从 s07 起, 任务图是多步工作的默认选择。s03 的 Todo 仍可用于单次会话内的快速清单。

完整代码

#!/usr/bin/env python3
"""
s07_task_system.py - Tasks

Tasks persist as JSON files in .tasks/ so they survive context compression.

Each task has a dependency graph (blockedBy/blocks).

    .tasks/
      task_1.json  {"id":1, "subject":"...", "status":"completed", ...}
      task_2.json  {"id":2, "blockedBy":[1], "status":"pending", ...}
      task_3.json  {"id":3, "blockedBy":[2], "blocks":[], ...}

    Dependency resolution:

    +----------+     +----------+     +----------+
    | task 1   | --> | task 2   | --> | task 3   |
    | complete |     | blocked  |     | blocked  |
    +----------+     +----------+     +----------+

         |
         +--- completing task 1 removes it from task 2's blockedBy

Key insight: "State that survives compression -- because it's outside the conversation."
"""

import json
import os
import subprocess
from pathlib import Path
from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)

if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]
TASKS_DIR = WORKDIR / ".tasks"

SYSTEM = f"You are a coding agent at {WORKDIR}. Use task tools to plan and track work."

# -- TaskManager: CRUD with dependency graph, persisted as JSON files --

class TaskManager:

    def __init__(self, tasks_dir: Path):
        self.dir = tasks_dir
        self.dir.mkdir(exist_ok=True)
        self._next_id = self._max_id() + 1

    def _max_id(self) -> int:
        ids = [int(f.stem.split("_")[1]) for f in self.dir.glob("task_*.json")]
        return max(ids) if ids else 0

    def _load(self, task_id: int) -> dict:
        path = self.dir / f"task_{task_id}.json"
        if not path.exists():
            raise ValueError(f"Task {task_id} not found")
        return json.loads(path.read_text())

    def _save(self, task: dict):
        path = self.dir / f"task_{task['id']}.json"
        path.write_text(json.dumps(task, indent=2))

    def create(self, subject: str, description: str = "") -> str:
        task = {
            "id": self._next_id,
            "subject": subject,
            "description": description,
            "status": "pending",
            "blockedBy": [],
            "blocks": [],
            "owner": "",
        }
        self._save(task)
        self._next_id += 1
        return json.dumps(task, indent=2)

    def get(self, task_id: int) -> str:
        return json.dumps(self._load(task_id), indent=2)

    def update(
        self,
        task_id: int,
        status: str = None,
        add_blocked_by: list = None,
        add_blocks: list = None,
    ) -> str:

        task = self._load(task_id)

        if status:
            if status not in ("pending", "in_progress", "completed"):
                raise ValueError(f"Invalid status: {status}")

            task["status"] = status

            if status == "completed":
                self._clear_dependency(task_id)

        if add_blocked_by:
            task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))

        if add_blocks:
            task["blocks"] = list(set(task["blocks"] + add_blocks))

            for blocked_id in add_blocks:
                try:
                    blocked = self._load(blocked_id)
                    if task_id not in blocked["blockedBy"]:
                        blocked["blockedBy"].append(task_id)
                        self._save(blocked)
                except ValueError:
                    pass

        self._save(task)
        return json.dumps(task, indent=2)

    def _clear_dependency(self, completed_id: int):
        """Remove completed_id from all other tasks' blockedBy lists."""
        for f in self.dir.glob("task_*.json"):
            task = json.loads(f.read_text())
            if completed_id in task.get("blockedBy", []):
                task["blockedBy"].remove(completed_id)
                self._save(task)

    def list_all(self) -> str:
        tasks = []
        for f in sorted(self.dir.glob("task_*.json")):
            tasks.append(json.loads(f.read_text()))

        if not tasks:
            return "No tasks."

        lines = []
        for t in tasks:
            marker = {
                "pending": "[ ]",
                "in_progress": "[>]",
                "completed": "[x]",
            }.get(t["status"], "[?]")

            blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else ""

            lines.append(f"{marker} #{t['id']}: {t['subject']}{blocked}")

        return "\n".join(lines)

TASKS = TaskManager(TASKS_DIR)

# -- Base tool implementations --

def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path

def run_bash(command: str) -> str:
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]

    if any(d in command for d in dangerous):
        return "Error: Dangerous command blocked"

    try:
        r = subprocess.run(
            command,
            shell=True,
            cwd=WORKDIR,
            capture_output=True,
            text=True,
            timeout=120,
        )

        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"

    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"

def run_read(path: str, limit: int = None) -> str:
    try:
        lines = safe_path(path).read_text().splitlines()

&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;limit&nbsp;and&nbsp;limit < len(lines):
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; lines = lines[:limit] + [f"... ({len(lines) - limit}&nbsp;more)"]

&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;"\n".join(lines)[:50000]

&nbsp; &nbsp;&nbsp;except&nbsp;Exception&nbsp;as&nbsp;e:
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;f"Error:&nbsp;{e}"

def&nbsp;run_write(path: str, content: str)&nbsp;-> str:
&nbsp; &nbsp;&nbsp;try:
&nbsp; &nbsp; &nbsp; &nbsp; fp = safe_path(path)
&nbsp; &nbsp; &nbsp; &nbsp; fp.parent.mkdir(parents=True, exist_ok=True)
&nbsp; &nbsp; &nbsp; &nbsp; fp.write_text(content)
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;f"Wrote&nbsp;{len(content)}&nbsp;bytes"

&nbsp; &nbsp;&nbsp;except&nbsp;Exception&nbsp;as&nbsp;e:
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;f"Error:&nbsp;{e}"

def&nbsp;run_edit(path: str, old_text: str, new_text: str)&nbsp;-> str:
&nbsp; &nbsp;&nbsp;try:
&nbsp; &nbsp; &nbsp; &nbsp; fp = safe_path(path)
&nbsp; &nbsp; &nbsp; &nbsp; c = fp.read_text()

&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;old_text&nbsp;not&nbsp;in&nbsp;c:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;f"Error: Text not found in&nbsp;{path}"

&nbsp; &nbsp; &nbsp; &nbsp; fp.write_text(c.replace(old_text, new_text,&nbsp;1))
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;f"Edited&nbsp;{path}"

&nbsp; &nbsp;&nbsp;except&nbsp;Exception&nbsp;as&nbsp;e:
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return&nbsp;f"Error:&nbsp;{e}"

TOOL_HANDLERS = {
&nbsp; &nbsp;&nbsp;"bash":&nbsp;lambda&nbsp;**kw: run_bash(kw["command"]),
&nbsp; &nbsp;&nbsp;"read_file":&nbsp;lambda&nbsp;**kw: run_read(kw["path"], kw.get("limit")),
&nbsp; &nbsp;&nbsp;"write_file":&nbsp;lambda&nbsp;**kw: run_write(kw["path"], kw["content"]),
&nbsp; &nbsp;&nbsp;"edit_file":&nbsp;lambda&nbsp;**kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
&nbsp; &nbsp;&nbsp;"task_create":&nbsp;lambda&nbsp;**kw: TASKS.create(kw["subject"], kw.get("description",&nbsp;"")),
&nbsp; &nbsp;&nbsp;"task_update":&nbsp;lambda&nbsp;**kw: TASKS.update(
&nbsp; &nbsp; &nbsp; &nbsp; kw["task_id"],
&nbsp; &nbsp; &nbsp; &nbsp; kw.get("status"),
&nbsp; &nbsp; &nbsp; &nbsp; kw.get("addBlockedBy"),
&nbsp; &nbsp; &nbsp; &nbsp; kw.get("addBlocks"),
&nbsp; &nbsp; ),
&nbsp; &nbsp;&nbsp;"task_list":&nbsp;lambda&nbsp;**kw: TASKS.list_all(),
&nbsp; &nbsp;&nbsp;"task_get":&nbsp;lambda&nbsp;**kw: TASKS.get(kw["task_id"]),
}

TOOLS = [
&nbsp; &nbsp; {
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"name":&nbsp;"bash",
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"description":&nbsp;"Run a shell command.",
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"input_schema": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"type":&nbsp;"object",
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"properties": {"command": {"type":&nbsp;"string"}},
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"required": ["command"],
&nbsp; &nbsp; &nbsp; &nbsp; },
&nbsp; &nbsp; },
&nbsp; &nbsp; {
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"name":&nbsp;"read_file",
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"description":&nbsp;"Read file contents.",
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"input_schema": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"type":&nbsp;"object",
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"properties": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"path": {"type":&nbsp;"string"},
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"limit": {"type":&nbsp;"integer"},
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; },
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"required": ["path"],
&nbsp; &nbsp; &nbsp; &nbsp; },
&nbsp; &nbsp; },
&nbsp; &nbsp; {
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"name":&nbsp;"write_file",
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"description":&nbsp;"Write content to file.",
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"input_schema": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"type":&nbsp;"object",
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"properties": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"path": {"type":&nbsp;"string"},
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"content": {"type":&nbsp;"string"},
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; },
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"required": ["path",&nbsp;"content"],
&nbsp; &nbsp; &nbsp; &nbsp; },
&nbsp; &nbsp; },
&nbsp; &nbsp; {
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"name":&nbsp;"edit_file",
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"description":&nbsp;"Replace exact text in file.",
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"input_schema": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"type":&nbsp;"object",
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"properties": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"path": {"type":&nbsp;"string"},
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"old_text": {"type":&nbsp;"string"},
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"new_text": {"type":&nbsp;"string"},
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; },
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"required": ["path",&nbsp;"old_text",&nbsp;"new_text"],
&nbsp; &nbsp; &nbsp; &nbsp; },
&nbsp; &nbsp; },
&nbsp; &nbsp; {
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"name":&nbsp;"task_create",
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"description":&nbsp;"Create a new task.",
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"input_schema": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"type":&nbsp;"object",
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"properties": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"subject": {"type":&nbsp;"string"},
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"description": {"type":&nbsp;"string"},
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; },
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"required": ["subject"],
&nbsp; &nbsp; &nbsp; &nbsp; },
&nbsp; &nbsp; },
&nbsp; &nbsp; {
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"name":&nbsp;"task_update",
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"description":&nbsp;"Update a task's status or dependencies.",
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"input_schema": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"type":&nbsp;"object",
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"properties": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"task_id": {"type":&nbsp;"integer"},
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"status": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"type":&nbsp;"string",
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"enum": ["pending",&nbsp;"in_progress",&nbsp;"completed"],
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; },
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"addBlockedBy": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"type":&nbsp;"array",
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"items": {"type":&nbsp;"integer"},
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; },
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"addBlocks": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"type":&nbsp;"array",
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"items": {"type":&nbsp;"integer"},
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; },
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; },
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"required": ["task_id"],
&nbsp; &nbsp; &nbsp; &nbsp; },
&nbsp; &nbsp; },
&nbsp; &nbsp; {
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"name":&nbsp;"task_list",
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"description":&nbsp;"List all tasks with status summary.",
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"input_schema": {"type":&nbsp;"object",&nbsp;"properties": {}},
&nbsp; &nbsp; },
&nbsp; &nbsp; {
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"name":&nbsp;"task_get",
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"description":&nbsp;"Get full details of a task by ID.",
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"input_schema": {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"type":&nbsp;"object",
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"properties": {"task_id": {"type":&nbsp;"integer"}},
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"required": ["task_id"],
&nbsp; &nbsp; &nbsp; &nbsp; },
&nbsp; &nbsp; },
]

def&nbsp;agent_loop(messages: list):
&nbsp; &nbsp;&nbsp;while&nbsp;True:
&nbsp; &nbsp; &nbsp; &nbsp; response = client.messages.create(
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; model=MODEL,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; system=SYSTEM,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; messages=messages,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; tools=TOOLS,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; max_tokens=8000,
&nbsp; &nbsp; &nbsp; &nbsp; )

&nbsp; &nbsp; &nbsp; &nbsp; messages.append({"role":&nbsp;"assistant",&nbsp;"content": response.content})

&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;response.stop_reason !=&nbsp;"tool_use":
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;return

&nbsp; &nbsp; &nbsp; &nbsp; results = []

&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;block&nbsp;in&nbsp;response.content:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;block.type ==&nbsp;"tool_use":

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; handler = TOOL_HANDLERS.get(block.name)

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;try:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; output = handler(**block.input)&nbsp;if&nbsp;handler&nbsp;else&nbsp;f"Unknown tool:&nbsp;{block.name}"
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;except&nbsp;Exception&nbsp;as&nbsp;e:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; output =&nbsp;f"Error:&nbsp;{e}"

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print(f">&nbsp;{block.name}:&nbsp;{str(output)[:200]}")

&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; results.append({
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"type":&nbsp;"tool_result",
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"tool_use_id": block.id,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;"content": str(output),
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; })

&nbsp; &nbsp; &nbsp; &nbsp; messages.append({"role":&nbsp;"user",&nbsp;"content": results})

if&nbsp;__name__ ==&nbsp;"__main__":
&nbsp; &nbsp; history = []
&nbsp; &nbsp;&nbsp;while&nbsp;True:
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;try:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; query = input("\033[36ms07 >> \033[0m")
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;except&nbsp;(EOFError, KeyboardInterrupt):
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;break
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;query.strip().lower()&nbsp;in&nbsp;("q",&nbsp;"exit",&nbsp;""):
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;break
&nbsp; &nbsp; &nbsp; &nbsp; history.append({"role":&nbsp;"user",&nbsp;"content": query})
&nbsp; &nbsp; &nbsp; &nbsp; agent_loop(history)
&nbsp; &nbsp; &nbsp; &nbsp; response_content = history[-1]["content"]
&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;isinstance(response_content, list):
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;for&nbsp;block&nbsp;in&nbsp;response_content:
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;if&nbsp;hasattr(block,&nbsp;"text"):
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print(block.text)
&nbsp; &nbsp; &nbsp; &nbsp; print()

效果演示

cd&nbsp;learn-claude-code
python3 agents/s07_task_system.py

尝试提示词:

Create 3 tasks:&nbsp;"Setup project",&nbsp;"Write code",&nbsp;"Write tests". Make them depend on each other&nbsp;in&nbsp;order.
List all tasks and show the dependency graph
Complete task 1 and&nbsp;then&nbsp;list tasks to see task 2 unblocked
Create a task board&nbsp;for&nbsp;refactoring: parse -> transform -> emit ->&nbsp;test,&nbsp;where&nbsp;transform and emit can run&nbsp;in&nbsp;parallel after parse

深入探索

Q:Task为主线,Todo是否还有应用场景?

A:TaskManager 延续了 Todo 的模型,并在本课程 s07 之后成为默认主线。两者都管理带状态的任务项,但 TaskManager 增加了文件持久化(崩溃后可恢复)、依赖追踪(blocks/blockedBy)、owner 字段与多进程协作能力。Todo 仍适合短、线性、一次性的轻量跟踪。

总结

本节通过引入基于DAG图的任务系统,可以处理更复杂的任务关系(依赖、并行等)。

并且,使用CRUD磁盘存储,有效避免了因上下文压缩导致Todo任务清单被清理的问题。


免责声明:

本文所载程序、技术方法仅面向合法合规的安全研究与教学场景,旨在提升网络安全防护能力,具有明确的技术研究属性。

任何单位或个人未经授权,将本文内容用于攻击、破坏等非法用途的,由此引发的全部法律责任、民事赔偿及连带责任,均由行为人独立承担,本站不承担任何连带责任。

本站内容均为技术交流与知识分享目的发布,若存在版权侵权或其他异议,请通过邮件联系处理,具体联系方式可点击页面上方的联系我

本文转载自:Real返璞归真 Real返璞归真 Real返璞归真《Agent开发|从0实现Agent(四):构建基于DAG图的任务系统(复杂任务协同篇)》

评论:0   参与:  0