多智能体协作
约 1912 字大约 6 分钟
Agent虾学智能体入门
2026-03-08
多智能体协作(Multi-Agent)
本系列第八篇,深入讲解多智能体系统的设计与协作模式。
为什么需要多智能体?
单个智能体能力有限,而复杂任务往往需要多种专业能力。多智能体系统通过协作实现:
- 专业化分工 - 每个 Agent 专注自己擅长的领域
- 并行处理 - 多个 Agent 同时工作,提高效率
- 相互校验 - Agent 之间互相检查,减少错误
- 角色扮演 - 模拟真实团队协作
单 Agent vs 多 Agent
| 维度 | 单 Agent | 多 Agent |
|---|---|---|
| 专业性 | 通用 | 专业化 |
| 效率 | 串行 | 并行 |
| 可靠性 | 单点故障 | 冗余容错 |
| 复杂度 | 低 | 高 |
| 适用场景 | 简单任务 | 复杂项目 |
1. 基本协作模式
顺序协作(Sequential)
Agent 按顺序执行,前一个的输出是后一个的输入。
from typing import List, Callable
from dataclasses import dataclass
@dataclass
class Agent:
"""智能体定义"""
name: str
role: str
execute: Callable
class SequentialPipeline:
"""顺序协作管道"""
def __init__(self, agents: List[Agent]):
self.agents = agents
def run(self, input_text: str) -> str:
"""顺序执行"""
current_input = input_text
for agent in self.agents:
print(f"\n[Agent: {agent.name}] 执行中...")
current_input = agent.execute(current_input)
print(f"[Agent: {agent.name}] 输出: {current_input[:100]}...")
return current_input
# 使用示例
# researcher = Agent("研究员", "收集信息", lambda x: f"研究: {x}")
# writer = Agent("写手", "撰写文章", lambda x: f"文章: {x}")
# editor = Agent("编辑", "审核修改", lambda x: f"最终: {x}")
#
# pipeline = SequentialPipeline([researcher, writer, editor])
# result = pipeline.run("AI Agent 技术趋势")并行协作(Parallel)
多个 Agent 同时执行相同或不同任务。
import asyncio
from typing import List
class ParallelOrchestrator:
"""并行协作编排器"""
def __init__(self, agents: List[Agent]):
self.agents = agents
async def run(self, input_text: str) -> List[str]:
"""并行执行"""
tasks = [
self._run_agent(agent, input_text)
for agent in self.agents
]
results = await asyncio.gather(*tasks)
return list(results)
async def _run_agent(self, agent: Agent, input_text: str) -> str:
"""执行单个 Agent"""
print(f"[Agent: {agent.name}] 开始执行...")
result = agent.execute(input_text)
print(f"[Agent: {agent.name}] 完成")
return result
# 使用示例
# async def main():
# agents = [
# Agent("分析师A", "技术分析", lambda x: f"技术: {x}"),
# Agent("分析师B", "市场分析", lambda x: f"市场: {x}"),
# Agent("分析师C", "竞品分析", lambda x: f"竞品: {x}")
# ]
#
# orchestrator = ParallelOrchestrator(agents)
# results = await orchestrator.run("AI Agent 市场")
# print(results)
#
# asyncio.run(main())分层协作(Hierarchical)
管理者 Agent 协调多个工作者 Agent。
class HierarchicalSystem:
"""分层协作系统"""
def __init__(self, manager: Agent, workers: List[Agent]):
self.manager = manager
self.workers = workers
def run(self, task: str) -> str:
"""分层执行"""
# 1. 管理者分解任务
print(f"[管理者] 分析任务...")
subtasks = self._decompose_task(task)
# 2. 分配给工作者
results = []
for i, subtask in enumerate(subtasks):
worker = self.workers[i % len(self.workers)]
print(f"[管理者] 分配给 {worker.name}: {subtask}")
result = worker.execute(subtask)
results.append(result)
# 3. 管理者整合结果
print(f"[管理者] 整合结果...")
final_result = self._aggregate_results(results)
return final_result
def _decompose_task(self, task: str) -> List[str]:
"""分解任务"""
# 简化:按句号分割
# 实际应用中用 LLM 分解
return [f"子任务{i+1}: {task}" for i in range(3)]
def _aggregate_results(self, results: List[str]) -> str:
"""整合结果"""
return "\n".join([f"- {r}" for r in results])
# 使用示例
# manager = Agent("项目经理", "协调", lambda x: x)
# workers = [
# Agent("开发", "编码", lambda x: f"代码: {x}"),
# Agent("测试", "测试", lambda x: f"测试: {x}"),
# Agent("文档", "文档", lambda x: f"文档: {x}")
# ]
#
# system = HierarchicalSystem(manager, workers)
# result = system.run("开发一个新功能")2. 主流多智能体框架
CrewAI
from crewai import Agent, Task, Crew
# 定义 Agent
researcher = Agent(
role="研究员",
goal="收集和分析信息",
backstory="你是一位经验丰富的研究员",
allow_delegation=False
)
writer = Agent(
role="技术写作",
goal="撰写清晰的技术文章",
backstory="你是一位专业的技术作家",
allow_delegation=True
)
# 定义任务
research_task = Task(
description="研究 AI Agent 的最新进展",
agent=researcher
)
write_task = Task(
description="根据研究结果撰写博客文章",
agent=writer
)
# 组建团队
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task]
)
# 执行
# result = crew.kickoff()AutoGen
from autogen import AssistantAgent, UserProxyAgent
# 创建助手
assistant = AssistantAgent(
name="assistant",
llm_config={"model": "gpt-4"}
)
# 创建用户代理
user_proxy = UserProxyAgent(
name="user_proxy",
human_input_mode="NEVER",
code_execution_config={"work_dir": "coding"}
)
# 开始对话
# user_proxy.initiate_chat(
# assistant,
# message="帮我写一个 Python 爬虫"
# )LangGraph
from langgraph.graph import StateGraph, END
from typing import TypedDict
class State(TypedDict):
input: str
research: str
draft: str
review: str
final: str
def research_node(state: State) -> State:
"""研究节点"""
state["research"] = f"研究: {state['input']}"
return state
def draft_node(state: State) -> State:
"""起草节点"""
state["draft"] = f"草稿: {state['research']}"
return state
def review_node(state: State) -> State:
"""审核节点"""
state["review"] = f"审核: {state['draft']}"
return state
def finalize_node(state: State) -> State:
"""完成节点"""
state["final"] = f"最终: {state['review']}"
return state
# 构建工作流
workflow = StateGraph(State)
workflow.add_node("research", research_node)
workflow.add_node("draft", draft_node)
workflow.add_node("review", review_node)
workflow.add_node("finalize", finalize_node)
workflow.set_entry_point("research")
workflow.add_edge("research", "draft")
workflow.add_edge("draft", "review")
workflow.add_edge("review", "finalize")
workflow.add_edge("finalize", END)
# 编译并运行
# app = workflow.compile()
# result = app.invoke({"input": "AI Agent 教程"})3. 协作模式详解
辩论模式(Debate)
多个 Agent 从不同角度讨论问题。
class DebateSystem:
"""辩论系统"""
def __init__(self, agents: List[Agent], llm, rounds: int = 3):
self.agents = agents
self.llm = llm
self.rounds = rounds
def debate(self, topic: str) -> str:
"""进行辩论"""
history = []
for round_num in range(self.rounds):
print(f"\n=== 第 {round_num + 1} 轮 ===")
for agent in self.agents:
# 构建上下文
context = f"主题: {topic}\n历史: {history[-3:] if history else '无'}"
# 生成观点
opinion = agent.execute(context)
history.append(f"{agent.name}: {opinion}")
print(f"[{agent.name}] {opinion[:100]}...")
# 总结
return self._summarize(topic, history)
def _summarize(self, topic: str, history: List[str]) -> str:
"""总结辩论"""
return f"关于 '{topic}' 的辩论总结:\n" + "\n".join(history[-3:])
# 使用示例
# agents = [
# Agent("乐观派", "积极视角", lambda x: f"积极观点: {x}"),
# Agent("悲观派", "风险视角", lambda x: f"风险观点: {x}"),
# Agent("中立派", "客观视角", lambda x: f"客观观点: {x}")
# ]
#
# debate = DebateSystem(agents, llm, rounds=2)
# result = debate.debate("AI 会取代人类工作吗?")投票模式(Voting)
多个 Agent 投票决策。
from collections import Counter
class VotingSystem:
"""投票系统"""
def __init__(self, agents: List[Agent]):
self.agents = agents
def vote(self, question: str, options: List[str]) -> str:
"""投票决策"""
votes = []
for agent in self.agents:
# 每个 Agent 选择一个选项
choice = agent.execute(f"{question}\n选项: {options}")
votes.append(choice)
print(f"[{agent.name}] 投票: {choice}")
# 统计票数
vote_count = Counter(votes)
winner = vote_count.most_common(1)[0]
print(f"\n投票结果: {winner[0]} ({winner[1]} 票)")
return winner[0]
# 使用示例
# agents = [Agent(f"评委{i}", "评审", lambda x: "选项A" if i % 2 == 0 else "选项B") for i in range(5)]
# voting = VotingSystem(agents)
# result = voting.vote("最佳方案?", ["选项A", "选项B", "选项C"])教师学生模式(Teacher-Student)
一个 Agent 指导另一个学习。
class TeacherStudentSystem:
"""教师学生系统"""
def __init__(self, teacher: Agent, student: Agent, llm):
self.teacher = teacher
self.student = student
self.llm = llm
def teach(self, topic: str, iterations: int = 3) -> str:
"""教学过程"""
learned = []
for i in range(iterations):
print(f"\n=== 教学轮次 {i + 1} ===")
# 教师讲解
lesson = self.teacher.execute(f"讲解: {topic}")
print(f"[教师] {lesson[:100]}...")
# 学生尝试
attempt = self.student.execute(f"根据以下内容回答: {topic}\n{lesson}")
print(f"[学生] {attempt[:100]}...")
# 教师反馈
feedback = self.teacher.execute(f"评价学生回答: {attempt}")
print(f"[教师反馈] {feedback[:100]}...")
learned.append({
"lesson": lesson,
"attempt": attempt,
"feedback": feedback
})
return f"学习完成: {topic}"4. 通信机制
消息传递
from dataclasses import dataclass
from typing import Optional
@dataclass
class Message:
"""消息结构"""
sender: str
receiver: str
content: str
message_type: str # request, response, broadcast
metadata: dict = None
class MessageBus:
"""消息总线"""
def __init__(self):
self.queues = {} # agent_name -> message_queue
self.subscribers = {} # topic -> [agent_names]
def register(self, agent_name: str):
"""注册 Agent"""
self.queues[agent_name] = []
def send(self, message: Message):
"""发送消息"""
if message.receiver == "broadcast":
# 广播给所有
for queue in self.queues.values():
queue.append(message)
elif message.receiver in self.queues:
self.queues[message.receiver].append(message)
def receive(self, agent_name: str) -> Optional[Message]:
"""接收消息"""
if self.queues[agent_name]:
return self.queues[agent_name].pop(0)
return None
def subscribe(self, topic: str, agent_name: str):
"""订阅主题"""
if topic not in self.subscribers:
self.subscribers[topic] = []
self.subscribers[topic].append(agent_name)
# 使用示例
# bus = MessageBus()
# bus.register("agent1")
# bus.register("agent2")
#
# bus.send(Message("agent1", "agent2", "你好", "request"))
# msg = bus.receive("agent2")
# print(msg.content) # 输出: 你好小结
- 多智能体协作通过分工合作处理复杂任务
- 三种基本模式:顺序、并行、分层
- 主流框架:CrewAI、AutoGen、LangGraph
- 高级模式:辩论、投票、教师学生
- 通信机制是协作的基础
下一篇
评估与安全 - 学习智能体的评估方法和安全可控性