MsgHub(消息中心)¶
MsgHub 是 AgentScope 中用于多智能体对话的消息广播中心。它管理一组智能体之间的消息分发,无需手动编写消息传递代码。
概述¶
在构建多智能体应用时,您经常需要让智能体相互通信。如果不使用 MsgHub,您需要手动在智能体之间传递消息:
// 不使用 MsgHub(冗长且容易出错)
Msg aliceReply = alice.call().block();
bob.observe(aliceReply).block();
charlie.observe(aliceReply).block();
Msg bobReply = bob.call().block();
alice.observe(bobReply).block();
charlie.observe(bobReply).block();
使用 MsgHub,这变得简单多了:
// 使用 MsgHub(简洁且自动化)
try (MsgHub hub = MsgHub.builder()
.participants(alice, bob, charlie)
.build()) {
hub.enter().block();
alice.call().block(); // Bob 和 Charlie 自动收到
bob.call().block(); // Alice 和 Charlie 自动收到
}
核心特性¶
自动广播:任何参与者的消息都会自动广播给所有其他参与者
动态参与者:可以在对话过程中添加或移除智能体
公告消息:进入 Hub 时发送初始消息
手动广播:需要时可以手动广播消息
生命周期管理:使用 try-with-resources 自动清理
基本用法¶
创建和使用 MsgHub¶
import io.agentscope.core.ReActAgent;
import io.agentscope.core.formatter.dashscope.DashScopeMultiAgentFormatter;
import io.agentscope.core.memory.InMemoryMemory;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
import io.agentscope.core.message.TextBlock;
import io.agentscope.core.model.DashScopeChatModel;
import io.agentscope.core.pipeline.MsgHub;
// 创建模型,使用 MultiAgentFormatter
DashScopeChatModel model = DashScopeChatModel.builder()
.apiKey(System.getenv("DASHSCOPE_API_KEY"))
.modelName("qwen-plus")
.formatter(new DashScopeMultiAgentFormatter())
.build();
// 创建智能体
ReActAgent alice = ReActAgent.builder()
.name("Alice")
.sysPrompt("你是 Alice,一位友好的老师。回答请简洁。")
.model(model)
.memory(new InMemoryMemory())
.build();
ReActAgent bob = ReActAgent.builder()
.name("Bob")
.sysPrompt("你是 Bob,一位好奇的学生。回答请简洁。")
.model(model)
.memory(new InMemoryMemory())
.build();
ReActAgent charlie = ReActAgent.builder()
.name("Charlie")
.sysPrompt("你是 Charlie,一位深思熟虑的观察者。回答请简洁。")
.model(model)
.memory(new InMemoryMemory())
.build();
// 创建公告消息
Msg announcement = Msg.builder()
.name("system")
.role(MsgRole.SYSTEM)
.content(TextBlock.builder()
.text("欢迎来到讨论!请简短地介绍一下自己。")
.build())
.build();
// 使用 try-with-resources 管理 MsgHub
try (MsgHub hub = MsgHub.builder()
.name("Introduction")
.participants(alice, bob, charlie)
.announcement(announcement)
.enableAutoBroadcast(true) // 默认为 true
.build()) {
// 进入 Hub(向所有参与者广播公告)
hub.enter().block();
// 每个智能体自我介绍
// 他们的回复会自动广播给其他人
Msg aliceReply = alice.call().block();
System.out.println("Alice: " + aliceReply.getTextContent());
Msg bobReply = bob.call().block();
System.out.println("Bob: " + bobReply.getTextContent());
Msg charlieReply = charlie.call().block();
System.out.println("Charlie: " + charlieReply.getTextContent());
}
// Hub 自动关闭,订阅者被清理
重要提示:使用 MsgHub 时,请使用
DashScopeMultiAgentFormatter(或其他提供商的对应格式化器)而不是标准格式化器。这种格式化器可以正确处理来自不同名称的多个智能体的消息。
生命周期方法¶
MsgHub 遵循 enter/exit 生命周期:
MsgHub hub = MsgHub.builder()
.participants(alice, bob)
.build();
// Enter:设置订阅并广播公告
hub.enter().block();
// ... 对话进行 ...
// Exit:清理订阅
hub.exit().block();
使用 try-with-resources 时,close() 会被自动调用,它内部会调用 exit()。
动态参与者管理¶
您可以在对话过程中添加或移除参与者:
添加参与者¶
try (MsgHub hub = MsgHub.builder()
.participants(alice, bob)
.build()) {
hub.enter().block();
// Alice 和 Bob 对话
alice.call().block();
bob.call().block();
// 对话中途添加 Charlie
hub.add(charlie).block();
// 现在 Charlie 也会收到消息
alice.call().block(); // Charlie 会收到
charlie.call().block(); // Alice 和 Bob 会收到
}
移除参与者¶
try (MsgHub hub = MsgHub.builder()
.participants(alice, bob, charlie)
.build()) {
hub.enter().block();
// 三人对话
alice.call().block();
bob.call().block();
charlie.call().block();
// 将 Bob 从对话中移除
hub.delete(bob).block();
// Bob 不会收到这些消息
alice.call().block();
charlie.call().block();
}
注意:新添加的参与者不会收到之前的消息。他们只会收到加入后的消息。
手动广播¶
您可以禁用自动广播并手动控制消息分发:
禁用自动广播¶
try (MsgHub hub = MsgHub.builder()
.participants(alice, bob, charlie)
.enableAutoBroadcast(false) // 禁用自动广播
.build()) {
hub.enter().block();
// Alice 说话,但其他人不会自动收到
Msg aliceReply = alice.call().block();
// 手动广播给所有参与者
hub.broadcast(aliceReply).block();
// 现在 Bob 和 Charlie 已经收到 Alice 的消息
bob.call().block();
}
切换自动广播¶
您可以在对话过程中切换自动广播模式:
try (MsgHub hub = MsgHub.builder()
.participants(alice, bob)
.enableAutoBroadcast(true)
.build()) {
hub.enter().block();
// 自动广播开启
alice.call().block(); // Bob 自动收到
// 关闭自动广播
hub.setAutoBroadcast(false);
Msg bobReply = bob.call().block();
// Alice 不会自动收到 Bob 的回复
// 手动广播特定消息
hub.broadcast(bobReply).block();
// 重新开启自动广播
hub.setAutoBroadcast(true);
}
广播多条消息¶
List<Msg> messages = List.of(msg1, msg2, msg3);
hub.broadcast(messages).block();
响应式编程风格¶
MsgHub 支持使用 Project Reactor 进行完全响应式编程:
MsgHub hub = MsgHub.builder()
.participants(alice, bob, charlie)
.announcement(announcement)
.build();
// 完全响应式链
hub.enter()
.then(alice.call())
.doOnSuccess(msg -> System.out.println("Alice: " + msg.getTextContent()))
.then(bob.call())
.doOnSuccess(msg -> System.out.println("Bob: " + msg.getTextContent()))
.then(charlie.call())
.doOnSuccess(msg -> System.out.println("Charlie: " + msg.getTextContent()))
.then(hub.exit())
.block(); // 只在最后阻塞一次
API 参考¶
Builder 方法¶
方法 |
描述 |
默认值 |
|---|---|---|
|
设置 Hub 名称 |
UUID |
|
设置参与者(必需) |
- |
|
从列表设置参与者 |
- |
|
设置公告消息 |
无 |
|
从列表设置公告 |
无 |
|
启用/禁用自动广播 |
|
实例方法¶
方法 |
返回类型 |
描述 |
|---|---|---|
|
|
进入 Hub 上下文,设置订阅 |
|
|
退出 Hub 上下文,清理订阅 |
|
|
AutoCloseable 实现 |
|
|
添加新参与者 |
|
|
从列表添加参与者 |
|
|
移除参与者 |
|
|
从列表移除参与者 |
|
|
广播单条消息 |
|
|
广播多条消息 |
|
|
切换自动广播 |
|
|
获取 Hub 名称 |
|
|
获取当前参与者 |
|
|
检查自动广播状态 |