文章总结: 文章讲解基于DAG的任务系统实现,将任务存为JSON文件并建立依赖图,支持并行执行和状态持久化,为多智能体协作提供协调框架。 综合评分: 88 文章分类: AI安全,实战经验,安全开发
Agent开发|从0实现Agent(四):构建基于DAG图的任务系统(复杂任务协同篇)
原创
Real返璞归真 Real返璞归真
Real返璞归真
2026年3月25日 20:26 山东
公众号
欢迎关注公众号【Real返璞归真】,我们将不定期分享CTF竞赛、二进制安全、JS/安卓逆向、AI安全等领域的前沿知识与技术内容。
前言
简介
❝
Agent = LLM + 工具 + 循环
模型就是智能体,我们的工作就是给它工具,然后让开。
前面三次实验,我们完成了基础Agent运行、工具调用、任务规划、SKill封装和上下文压缩。
本篇文章将通过1个实验,实现更完善的基于DAG图的Agent任务系统。
参考资料
learn-claude-code:https://github.com/shareAI-lab/learn-claude-code
❝
12 个递进式课程, 从简单循环到隔离化的自治执行。每个课程添加一个机制。每个机制有一句格言。
s01“One loop & Bash is all you need” — 一个工具 + 一个循环 = 一个智能体
s02“加一个工具, 只加一个 handler” — 循环不用动, 新工具注册进 dispatch map 就行
s03“没有计划的 agent 走哪算哪” — 先列步骤再动手, 完成率翻倍
s04“大任务拆小, 每个小任务干净的上下文” — 子智能体用独立 messages[], 不污染主对话
s05“用到什么知识, 临时加载什么知识” — 通过 tool_result 注入, 不塞 system prompt
s06“上下文总会满, 要有办法腾地方” — 三层压缩策略, 换来无限会话
s07“大目标要拆成小任务, 排好序, 记在磁盘上” — 文件持久化的任务图, 为多 agent 协作打基础
s08“慢操作丢后台, agent 继续想下一步” — 后台线程跑命令, 完成后注入通知
s09“任务太大一个人干不完, 要能分给队友” — 持久化队友 + 异步邮箱
s10“队友之间要有统一的沟通规矩” — 一个 request-response 模式驱动所有协商
s11“队友自己看看板, 有活就认领” — 不需要领导逐个分配, 自组织
s12“各干各的目录, 互不干扰” — 任务管目标, worktree 管目录, 按 ID 绑定
s07:任务系统 – Task Graph + Dependencies
❝
A file-based task graph with ordering, parallelism, and dependencies — the coordination backbone for multi-agent work
基于文件的任务图,具备排序、并行和依赖管理——多智能体工作的协调核心。
问题
s03 的 TodoManager 只是内存中的扁平清单: 没有顺序、没有依赖,状态只有是否完成。
真实目标是有结构的 — 任务 B 依赖任务 A,任务 C 和 D 可以并行,任务 E 要等 C 和 D 都完成。
没有显式的关系,智能体分不清什么能做、什么被卡住、什么能同时跑。而且清单只活在内存里,上下文压缩 (s06) 以后就没了。
解决方案
把扁平清单升级为持久化到磁盘的任务图。每个任务是一个 JSON 文件,有状态、前置依赖 (blockedBy) 和后置依赖 (blocks)。
任务图随时回答三个问题:
- 什么可以做?状态为 pending 且 blockedBy 为空的任务。
- 什么被卡住?等待前置任务完成的任务。
- 什么做完了?状态为 completed 的任务, 完成时自动解锁后续任务。
.tasks/
task_1.json {"id":1, "status":"completed"}
task_2.json {"id":2, "blockedBy":[1], "status":"pending"}
task_3.json {"id":3, "blockedBy":[1], "status":"pending"}
task_4.json {"id":4, "blockedBy":[2,3], "status":"pending"}
任务图 (DAG):
+----------+
+--> | task 2 | --+
| | pending | |
+----------+ +----------+ +--> +----------+
| task 1 | | task 4 |
| completed| --> +----------+ +--> | blocked |
+----------+ | task 3 | --+ +----------+
| pending |
+----------+
顺序: task 1 必须先完成, 才能开始 2 和 3
并行: task 2 和 3 可以同时执行
依赖: task 4 要等 2 和 3 都完成
状态: pending -> in_progress -> completed
这个任务图是 s07 之后所有机制的协调骨架:后台执行 (s08)、多 agent 团队 (s09+)、worktree 隔离 (s12) 都读写这同一个结构。
工作原理
- 修改
TaskManager,每个任务一个 JSON 文件,定义CRUD + 依赖图:
class TaskManager:
def __init__(self, tasks_dir: Path):
self.dir = tasks_dir
self.dir.mkdir(exist_ok=True)
self._next_id = self._max_id() + 1
def create(self, subject, description=""):
task = {"id": self._next_id, "subject": subject,
"status": "pending", "blockedBy": [],
"blocks": [], "owner": ""}
self._save(task)
self._next_id += 1
return json.dumps(task, indent=2)
- 定义依赖解除功能,完成任务时,自动将其 ID 从其他任务的 blockedBy 中移除,解锁后续任务:
def _clear_dependency(self, completed_id):
for f in self.dir.glob("task_*.json"):
task = json.loads(f.read_text())
if completed_id in task.get("blockedBy", []):
task["blockedBy"].remove(completed_id)
self._save(task)
- 状态变更 + 依赖关联,使用update 处理状态转换和依赖边:
def update(self, task_id, status=None,
add_blocked_by=None, add_blocks=None):
task = self._load(task_id)
if status:
task["status"] = status
if status == "completed":
self._clear_dependency(task_id)
self._save(task)
- 将四个任务工具加入
dispatch map:
TOOL_HANDLERS = {
# ...base tools...
"task_create": lambda **kw: TASKS.create(kw["subject"]),
"task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status")),
"task_list": lambda **kw: TASKS.list_all(),
"task_get": lambda **kw: TASKS.get(kw["task_id"]),
}
从 s07 起, 任务图是多步工作的默认选择。s03 的 Todo 仍可用于单次会话内的快速清单。
完整代码
#!/usr/bin/env python3
"""
s07_task_system.py - Tasks
Tasks persist as JSON files in .tasks/ so they survive context compression.
Each task has a dependency graph (blockedBy/blocks).
.tasks/
task_1.json {"id":1, "subject":"...", "status":"completed", ...}
task_2.json {"id":2, "blockedBy":[1], "status":"pending", ...}
task_3.json {"id":3, "blockedBy":[2], "blocks":[], ...}
Dependency resolution:
+----------+ +----------+ +----------+
| task 1 | --> | task 2 | --> | task 3 |
| complete | | blocked | | blocked |
+----------+ +----------+ +----------+
|
+--- completing task 1 removes it from task 2's blockedBy
Key insight: "State that survives compression -- because it's outside the conversation."
"""
import json
import os
import subprocess
from pathlib import Path
from anthropic import Anthropic
from dotenv import load_dotenv
load_dotenv(override=True)
if os.getenv("ANTHROPIC_BASE_URL"):
os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)
WORKDIR = Path.cwd()
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]
TASKS_DIR = WORKDIR / ".tasks"
SYSTEM = f"You are a coding agent at {WORKDIR}. Use task tools to plan and track work."
# -- TaskManager: CRUD with dependency graph, persisted as JSON files --
class TaskManager:
def __init__(self, tasks_dir: Path):
self.dir = tasks_dir
self.dir.mkdir(exist_ok=True)
self._next_id = self._max_id() + 1
def _max_id(self) -> int:
ids = [int(f.stem.split("_")[1]) for f in self.dir.glob("task_*.json")]
return max(ids) if ids else 0
def _load(self, task_id: int) -> dict:
path = self.dir / f"task_{task_id}.json"
if not path.exists():
raise ValueError(f"Task {task_id} not found")
return json.loads(path.read_text())
def _save(self, task: dict):
path = self.dir / f"task_{task['id']}.json"
path.write_text(json.dumps(task, indent=2))
def create(self, subject: str, description: str = "") -> str:
task = {
"id": self._next_id,
"subject": subject,
"description": description,
"status": "pending",
"blockedBy": [],
"blocks": [],
"owner": "",
}
self._save(task)
self._next_id += 1
return json.dumps(task, indent=2)
def get(self, task_id: int) -> str:
return json.dumps(self._load(task_id), indent=2)
def update(
self,
task_id: int,
status: str = None,
add_blocked_by: list = None,
add_blocks: list = None,
) -> str:
task = self._load(task_id)
if status:
if status not in ("pending", "in_progress", "completed"):
raise ValueError(f"Invalid status: {status}")
task["status"] = status
if status == "completed":
self._clear_dependency(task_id)
if add_blocked_by:
task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))
if add_blocks:
task["blocks"] = list(set(task["blocks"] + add_blocks))
for blocked_id in add_blocks:
try:
blocked = self._load(blocked_id)
if task_id not in blocked["blockedBy"]:
blocked["blockedBy"].append(task_id)
self._save(blocked)
except ValueError:
pass
self._save(task)
return json.dumps(task, indent=2)
def _clear_dependency(self, completed_id: int):
"""Remove completed_id from all other tasks' blockedBy lists."""
for f in self.dir.glob("task_*.json"):
task = json.loads(f.read_text())
if completed_id in task.get("blockedBy", []):
task["blockedBy"].remove(completed_id)
self._save(task)
def list_all(self) -> str:
tasks = []
for f in sorted(self.dir.glob("task_*.json")):
tasks.append(json.loads(f.read_text()))
if not tasks:
return "No tasks."
lines = []
for t in tasks:
marker = {
"pending": "[ ]",
"in_progress": "[>]",
"completed": "[x]",
}.get(t["status"], "[?]")
blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else ""
lines.append(f"{marker} #{t['id']}: {t['subject']}{blocked}")
return "\n".join(lines)
TASKS = TaskManager(TASKS_DIR)
# -- Base tool implementations --
def safe_path(p: str) -> Path:
path = (WORKDIR / p).resolve()
if not path.is_relative_to(WORKDIR):
raise ValueError(f"Path escapes workspace: {p}")
return path
def run_bash(command: str) -> str:
dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
if any(d in command for d in dangerous):
return "Error: Dangerous command blocked"
try:
r = subprocess.run(
command,
shell=True,
cwd=WORKDIR,
capture_output=True,
text=True,
timeout=120,
)
out = (r.stdout + r.stderr).strip()
return out[:50000] if out else "(no output)"
except subprocess.TimeoutExpired:
return "Error: Timeout (120s)"
def run_read(path: str, limit: int = None) -> str:
try:
lines = safe_path(path).read_text().splitlines()
if limit and limit < len(lines):
lines = lines[:limit] + [f"... ({len(lines) - limit} more)"]
return "\n".join(lines)[:50000]
except Exception as e:
return f"Error: {e}"
def run_write(path: str, content: str) -> str:
try:
fp = safe_path(path)
fp.parent.mkdir(parents=True, exist_ok=True)
fp.write_text(content)
return f"Wrote {len(content)} bytes"
except Exception as e:
return f"Error: {e}"
def run_edit(path: str, old_text: str, new_text: str) -> str:
try:
fp = safe_path(path)
c = fp.read_text()
if old_text not in c:
return f"Error: Text not found in {path}"
fp.write_text(c.replace(old_text, new_text, 1))
return f"Edited {path}"
except Exception as e:
return f"Error: {e}"
TOOL_HANDLERS = {
"bash": lambda **kw: run_bash(kw["command"]),
"read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
"write_file": lambda **kw: run_write(kw["path"], kw["content"]),
"edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
"task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")),
"task_update": lambda **kw: TASKS.update(
kw["task_id"],
kw.get("status"),
kw.get("addBlockedBy"),
kw.get("addBlocks"),
),
"task_list": lambda **kw: TASKS.list_all(),
"task_get": lambda **kw: TASKS.get(kw["task_id"]),
}
TOOLS = [
{
"name": "bash",
"description": "Run a shell command.",
"input_schema": {
"type": "object",
"properties": {"command": {"type": "string"}},
"required": ["command"],
},
},
{
"name": "read_file",
"description": "Read file contents.",
"input_schema": {
"type": "object",
"properties": {
"path": {"type": "string"},
"limit": {"type": "integer"},
},
"required": ["path"],
},
},
{
"name": "write_file",
"description": "Write content to file.",
"input_schema": {
"type": "object",
"properties": {
"path": {"type": "string"},
"content": {"type": "string"},
},
"required": ["path", "content"],
},
},
{
"name": "edit_file",
"description": "Replace exact text in file.",
"input_schema": {
"type": "object",
"properties": {
"path": {"type": "string"},
"old_text": {"type": "string"},
"new_text": {"type": "string"},
},
"required": ["path", "old_text", "new_text"],
},
},
{
"name": "task_create",
"description": "Create a new task.",
"input_schema": {
"type": "object",
"properties": {
"subject": {"type": "string"},
"description": {"type": "string"},
},
"required": ["subject"],
},
},
{
"name": "task_update",
"description": "Update a task's status or dependencies.",
"input_schema": {
"type": "object",
"properties": {
"task_id": {"type": "integer"},
"status": {
"type": "string",
"enum": ["pending", "in_progress", "completed"],
},
"addBlockedBy": {
"type": "array",
"items": {"type": "integer"},
},
"addBlocks": {
"type": "array",
"items": {"type": "integer"},
},
},
"required": ["task_id"],
},
},
{
"name": "task_list",
"description": "List all tasks with status summary.",
"input_schema": {"type": "object", "properties": {}},
},
{
"name": "task_get",
"description": "Get full details of a task by ID.",
"input_schema": {
"type": "object",
"properties": {"task_id": {"type": "integer"}},
"required": ["task_id"],
},
},
]
def agent_loop(messages: list):
while True:
response = client.messages.create(
model=MODEL,
system=SYSTEM,
messages=messages,
tools=TOOLS,
max_tokens=8000,
)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
return
results = []
for block in response.content:
if block.type == "tool_use":
handler = TOOL_HANDLERS.get(block.name)
try:
output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
except Exception as e:
output = f"Error: {e}"
print(f"> {block.name}: {str(output)[:200]}")
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": str(output),
})
messages.append({"role": "user", "content": results})
if __name__ == "__main__":
history = []
while True:
try:
query = input("\033[36ms07 >> \033[0m")
except (EOFError, KeyboardInterrupt):
break
if query.strip().lower() in ("q", "exit", ""):
break
history.append({"role": "user", "content": query})
agent_loop(history)
response_content = history[-1]["content"]
if isinstance(response_content, list):
for block in response_content:
if hasattr(block, "text"):
print(block.text)
print()
效果演示
cd learn-claude-code
python3 agents/s07_task_system.py
尝试提示词:
Create 3 tasks: "Setup project", "Write code", "Write tests". Make them depend on each other in order.
List all tasks and show the dependency graph
Complete task 1 and then list tasks to see task 2 unblocked
Create a task board for refactoring: parse -> transform -> emit -> test, where transform and emit can run in parallel after parse
深入探索
❝
Q:Task为主线,Todo是否还有应用场景?
A:TaskManager 延续了 Todo 的模型,并在本课程 s07 之后成为默认主线。两者都管理带状态的任务项,但 TaskManager 增加了文件持久化(崩溃后可恢复)、依赖追踪(blocks/blockedBy)、owner 字段与多进程协作能力。Todo 仍适合短、线性、一次性的轻量跟踪。
总结
本节通过引入基于DAG图的任务系统,可以处理更复杂的任务关系(依赖、并行等)。
并且,使用CRUD磁盘存储,有效避免了因上下文压缩导致Todo任务清单被清理的问题。
免责声明:
本文所载程序、技术方法仅面向合法合规的安全研究与教学场景,旨在提升网络安全防护能力,具有明确的技术研究属性。
任何单位或个人未经授权,将本文内容用于攻击、破坏等非法用途的,由此引发的全部法律责任、民事赔偿及连带责任,均由行为人独立承担,本站不承担任何连带责任。
本站内容均为技术交流与知识分享目的发布,若存在版权侵权或其他异议,请通过邮件联系处理,具体联系方式可点击页面上方的联系我。
本文转载自:Real返璞归真 Real返璞归真 Real返璞归真《Agent开发|从0实现Agent(四):构建基于DAG图的任务系统(复杂任务协同篇)》
版权声明
本站仅做备份收录,仅供研究与教学参考之用。
读者将信息用于其他用途的,全部法律及连带责任由读者自行承担,本站不承担任何责任。










评论