RocketMQ¶
agentscope-extensions-rocketmq 提供基于 Apache RocketMQ 的 A2A 传输层:调用方把 A2A 请求发到 RocketMQ topic,Agent 服务端从 topic 消费、处理后再通过 MQ 反向回应(含流式)。
何时使用¶
Agent 服务部署在内网,不希望直接暴露 HTTP。
调用流量需要削峰填谷、回放、审计。
多个 Agent 服务希望以”消费组”形式做负载均衡。
添加依赖¶
<dependency>
<groupId>io.agentscope</groupId>
<artifactId>agentscope-extensions-rocketmq</artifactId>
<version>${agentscope.version}</version>
</dependency>
服务端:把 Agent 暴露到 RocketMQ¶
import io.agentscope.core.a2a.server.AgentScopeA2aServer;
import io.agentscope.extensions.rocketmq.a2a.config.RocketMQA2aConfig;
import io.agentscope.extensions.rocketmq.a2a.wrapper.RocketMQTransportWrapperBuilder;
RocketMQA2aConfig config = new RocketMQA2aConfig();
config.setRocketMQEndpoint("localhost:8081");
config.setRocketMQNamespace("agentscope");
config.setBizTopic("a2a-request");
config.setBizConsumerGroup("a2a-server-cg");
config.setReplyTopic("a2a-reply");
config.setAccessKey("AK");
config.setSecretKey("SK");
// 1) 用 RocketMQ 包一层 A2A Transport
TransportWrapper rocketMqTransport = RocketMQTransportWrapperBuilder.create(config)
.agentBuilder(ReActAgent.builder().name("backend-agent").model(model))
.build();
// 2) 注册到 AgentScopeA2aServer
AgentScopeA2aServer server = AgentScopeA2aServer.builder()
.agentBuilder(ReActAgent.builder().name("backend-agent").model(model))
.transportWrapper("ROCKETMQ", rocketMqTransport)
.build();
启动后,业务 topic(bizTopic)会订阅 A2A JSON-RPC 请求;处理结果通过 replyTopic 反向投递。
客户端:通过 RocketMQ 调用远端 Agent¶
调用方使用 RocketMQ 客户端发送 RocketMQRequest:
// 调用方业务代码
Producer producer = ...; // RocketMQ Producer
producer.send(buildMessageForRequest(jsonRpcRequest));
// 在 replyTopic 上消费 RocketMQResponse
推荐通过
agentscope-spring-boot-starter-a2a-server+agentscope-extensions-rocketmq一并使用:starter 会把上面的 transport 自动注册到 server。
关键配置项¶
字段 |
说明 |
|---|---|
|
RocketMQ 接入地址 |
|
命名空间(多租户隔离) |
|
业务请求 topic |
|
服务端消费组 |
|
反向回应 topic |
|
鉴权凭证 |
流式响应¶
服务端返回流式时,每个增量都会被打包成 RocketMQResponse 投递到 replyTopic,调用方按 requestId 聚合即可得到 Flux<JSONRPCResponse>。