长时运行 Agent 的有效控制框架

长时运行 Agent 的有效控制框架

来源: Anthropic Engineering Blog
作者: Anthropic Engineering Team
发布日期: 2025 年 11 月 21 日
类型: 技术架构
阅读时间: 约 13 分钟

概述

本文探讨了长时运行 AI Agent 的有效控制框架设计。随着 AI Agent 在生产环境中的广泛应用,如何管理和控制长时间运行的 Agent 成为关键挑战。我们分享了状态管理、错误恢复、任务调度、资源监控等方面的实践经验和架构设计。


长时运行 Agent 的挑战

什么是长时运行 Agent

定义:执行时间超过数分钟至数小时的 AI Agent 任务

典型场景

  • 大型代码库重构(1-4 小时)
  • 全面数据分析报告(30 分钟 -2 小时)
  • 复杂研究任务(2-8 小时)
  • 自动化工作流(持续运行)

主要挑战

挑战 描述 影响
状态管理 保持长时间的状态一致性 状态丢失导致任务失败
错误恢复 从中间状态恢复执行 需要重新开始,浪费资源
资源管理 长时间占用计算资源 资源耗尽风险
进度追踪 监控长任务进度 用户无法了解进展
超时处理 处理执行超时 任务可能被中断

控制框架设计

整体架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
┌─────────────────────────────────────────────────────────────┐
│ Agent Controller │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 状态管理器 │ │ 任务调度器 │ │ 错误处理器 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 资源监控器 │ │ 进度追踪器 │ │ 检查点管理 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ Agent Worker │
│ - 执行具体任务 │
│ - 报告状态和进度 │
│ - 处理检查点保存 │
└─────────────────────────────────────────────────────────────┘

状态管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class AgentStateManager:
"""Agent 状态管理器"""

def __init__(self, storage_backend):
self.storage = storage_backend
self.state_cache = {}

async def save_state(self, agent_id: str, state: AgentState):
"""保存 Agent 状态"""
state_data = {
'agent_id': agent_id,
'current_task': state.current_task,
'completed_tasks': state.completed_tasks,
'context': state.context,
'metadata': {
'timestamp': datetime.utcnow().isoformat(),
'version': state.version
}
}
await self.storage.set(f"agent:{agent_id}:state", state_data)

async def load_state(self, agent_id: str) -> AgentState:
"""加载 Agent 状态"""
state_data = await self.storage.get(f"agent:{agent_id}:state")
if state_data is None:
raise StateNotFoundError(f"Agent {agent_id} state not found")

return AgentState(
current_task=state_data['current_task'],
completed_tasks=state_data['completed_tasks'],
context=state_data['context']
)

async def delete_state(self, agent_id: str):
"""清理 Agent 状态"""
await self.storage.delete(f"agent:{agent_id}:state")

检查点机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class CheckpointManager:
"""检查点管理器"""

def __init__(self, storage_backend, checkpoint_interval=300):
self.storage = storage_backend
self.checkpoint_interval = checkpoint_interval # 秒
self.last_checkpoint = {}

async def save_checkpoint(self, agent_id: str, execution_state: dict):
"""保存检查点"""
checkpoint = {
'agent_id': agent_id,
'timestamp': datetime.utcnow().isoformat(),
'execution_state': execution_state,
'sequence_number': self._get_next_sequence(agent_id)
}

# 保存到持久化存储
await self.storage.set(
f"agent:{agent_id}:checkpoint:{checkpoint['sequence_number']}",
checkpoint
)

# 保留最近 N 个检查点
await self._prune_old_checkpoints(agent_id, keep=5)

self.last_checkpoint[agent_id] = time.time()

async def restore_from_checkpoint(self, agent_id: str) -> dict:
"""从检查点恢复"""
# 获取最新的检查点
checkpoint_keys = await self.storage.keys(f"agent:{agent_id}:checkpoint:*")
if not checkpoint_keys:
raise CheckpointNotFoundError(f"No checkpoints found for {agent_id}")

latest_key = max(checkpoint_keys)
checkpoint = await self.storage.get(latest_key)

return checkpoint['execution_state']

async def _prune_old_checkpoints(self, agent_id: str, keep: int):
"""删除旧检查点,只保留最近的 keep 个"""
checkpoint_keys = sorted(await self.storage.keys(
f"agent:{agent_id}:checkpoint:*"
))

# 删除多余的检查点
for key in checkpoint_keys[:-keep]:
await self.storage.delete(key)

错误恢复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class ErrorRecoveryStrategy:
"""错误恢复策略"""

def __init__(self, max_retries=3, backoff_factor=2):
self.max_retries = max_retries
self.backoff_factor = backoff_factor

async def execute_with_recovery(self, task: Task, context: ExecutionContext):
"""带恢复机制的执行"""
retries = 0
last_error = None

while retries <= self.max_retries:
try:
return await task.execute(context)

except TransientError as e:
# 临时错误,可以重试
last_error = e
wait_time = self.backoff_factor ** retries
logger.warning(f"临时错误,{wait_time}秒后重试:{e}")
await asyncio.sleep(wait_time)
retries += 1

except CheckpointError as e:
# 检查点错误,尝试从检查点恢复
logger.error(f"检查点错误:{e}")
context = await self._restore_from_last_checkpoint(context.agent_id)
retries += 1

except FatalError as e:
# 致命错误,无法恢复
logger.error(f"致命错误,无法恢复:{e}")
raise

# 所有重试失败
raise MaxRetriesExceededError(
f"任务失败,已重试{self.max_retries}次",
last_error
)

任务调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class TaskScheduler:
"""任务调度器"""

def __init__(self):
self.task_queues = defaultdict(asyncio.Queue)
self.running_tasks = {}
self.max_concurrent = 10

async def schedule(self, task: Task, priority: int = 0):
"""调度任务"""
# 根据优先级放入队列
await self.task_queues[priority].put(task)

async def run_scheduler(self):
"""运行调度器"""
semaphore = asyncio.Semaphore(self.max_concurrent)

while True:
# 检查所有优先级队列
for priority in sorted(self.task_queues.keys()):
queue = self.task_queues[priority]

if queue.empty():
continue

async with semaphore:
task = await queue.get()
asyncio.create_task(self._execute_task(task))

await asyncio.sleep(0.1)

async def _execute_task(self, task: Task):
"""执行任务"""
self.running_tasks[task.id] = task

try:
await task.run()
finally:
del self.running_tasks[task.id]

资源监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class ResourceMonitor:
"""资源监控器"""

def __init__(self, thresholds: ResourceThresholds):
self.thresholds = thresholds
self.metrics = {}

async def check_resources(self, agent_id: str) -> ResourceStatus:
"""检查资源使用情况"""
cpu_percent = await self._get_cpu_usage()
memory_mb = await self._get_memory_usage()
disk_mb = await self._get_disk_usage()

status = ResourceStatus(
cpu_percent=cpu_percent,
memory_mb=memory_mb,
disk_mb=disk_mb,
is_healthy=True,
warnings=[]
)

# 检查阈值
if cpu_percent > self.thresholds.cpu_warning:
status.warnings.append(f"CPU 使用率高:{cpu_percent}%")
status.is_healthy = False

if memory_mb > self.thresholds.memory_warning:
status.warnings.append(f"内存使用率高:{memory_mb}MB")
status.is_healthy = False

self.metrics[agent_id] = status
return status

async def enforce_limits(self, agent_id: str):
"""执行资源限制"""
status = await self.check_resources(agent_id)

if not status.is_healthy:
logger.warning(f"Agent {agent_id} 资源使用异常:{status.warnings}")

# 可以采取限流措施
if status.cpu_percent > self.thresholds.cpu_critical:
await self._throttle_cpu(agent_id)

进度追踪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class ProgressTracker:
"""进度追踪器"""

def __init__(self):
self.progress_state = {}

async def update_progress(
self,
agent_id: str,
current_step: int,
total_steps: int,
description: str = ""
):
"""更新进度"""
progress = ProgressState(
current_step=current_step,
total_steps=total_steps,
percent_complete=current_step / total_steps * 100,
description=description,
updated_at=datetime.utcnow()
)

self.progress_state[agent_id] = progress

# 通知订阅者
await self._notify_subscribers(agent_id, progress)

async def get_progress(self, agent_id: str) -> ProgressState:
"""获取进度"""
if agent_id not in self.progress_state:
raise ProgressNotFoundError(f"Agent {agent_id} progress not found")

return self.progress_state[agent_id]

async def _notify_subscribers(self, agent_id: str, progress: ProgressState):
"""通知订阅者进度更新"""
# 可以通过 WebSocket、SSE 等方式推送给客户端
pass

实际应用

场景 1:大型代码库重构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class CodeRefactorAgent:
"""代码重构 Agent"""

def __init__(self, controller: AgentController):
self.controller = controller
self.checkpoint_manager = CheckpointManager()

async def refactor_codebase(self, repo_path: str, changes: list):
"""重构代码库"""
agent_id = f"refactor-{uuid.uuid4()}"

# 初始化状态
await self.controller.state_manager.save_state(
agent_id,
AgentState(current_task="initializing", completed_tasks=[])
)

for i, change in enumerate(changes):
# 保存检查点
await self.checkpoint_manager.save_checkpoint(agent_id, {
'completed_changes': i,
'total_changes': len(changes),
'current_file': change.file
})

# 执行变更
try:
await self._apply_change(repo_path, change)
except Exception as e:
# 从检查点恢复
state = await self.checkpoint_manager.restore_from_checkpoint(agent_id)
await self._retry_change(change, state)

# 更新进度
await self.controller.progress_tracker.update_progress(
agent_id, i + 1, len(changes), f"应用变更:{change.file}"
)

场景 2:数据分析报告

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class AnalysisAgent:
"""数据分析 Agent"""

async def generate_report(self, data_sources: list) -> Report:
"""生成分析报告"""
agent_id = f"analysis-{uuid.uuid4()}"

# 阶段 1: 数据收集
await self._collect_data(data_sources)
await self._save_checkpoint('data_collected')

# 阶段 2: 数据清洗
await self._clean_data()
await self._save_checkpoint('data_cleaned')

# 阶段 3: 数据分析
await self._analyze_data()
await self._save_checkpoint('analysis_complete')

# 阶段 4: 报告生成
report = await self._generate_report()
await self._save_checkpoint('report_generated')

return report

关键要点总结

  1. 状态管理:持久化存储 Agent 状态,支持恢复
  2. 检查点机制:定期保存执行状态,支持从失败中恢复
  3. 错误恢复:分层错误处理,临时错误重试,致命错误报告
  4. 任务调度:优先级队列,并发控制
  5. 资源监控:实时监控,阈值告警,自动限流

个人评价

长时运行 Agent 的控制框架是生产级 AI 系统的关键:

优点

  1. 可靠性:支持从失败中恢复
  2. 可观测性:实时追踪进度和状态
  3. 资源效率:合理分配和监控资源

总体评价

这是构建生产级 AI Agent 系统的必备能力。通过良好的控制框架,可以显著提高长时运行任务的可靠性和可管理性。


本文内容翻译自 Anthropic Engineering Blog 官方博客。

© 2026 Generative AI Discovery All Rights Reserved.
Theme by hiero