流式输出(Streaming)¶
Agent.stream() 返回一条 Flux<Event> 响应式事件流,让调用方可以实时看到每一步推理、工具结果和最终回复,而不必等 call() 返回整体 Msg。
基础用法¶
// ReActAgent / HarnessAgent 均可使用
Flux<Event> events = agent.stream(
List.of(Msg.builder().role(MsgRole.USER).textContent("分析这段日志").build()),
StreamOptions.defaults());
// 订阅并打印每个事件
events.subscribe(event ->
System.out.printf("[%s|last=%s] %s%n",
event.getType(), event.isLast(),
event.getMessage().getTextContent()));
// 阻塞收集(测试 / 批处理场景)
List<Event> all = events.collectList().block();
带 RuntimeContext(harness 场景):
RuntimeContext ctx = RuntimeContext.builder()
.sessionId("my-session").userId("alice").build();
Flux<Event> events = agent.stream(msgs, StreamOptions.defaults(), ctx);
StreamOptions.defaults() 开启全部事件类型、增量模式。
事件类型(EventType)¶
类型 |
触发时机 |
典型内容 |
|---|---|---|
|
每个推理轮次(可含多个 chunk) |
模型文本 / 思考链 / tool_use 调用 |
|
每次工具执行完毕后 |
|
|
RAG / 记忆检索注入后 |
注入模型的上下文文本 |
|
达到 |
迭代摘要文本 |
|
最终回复就绪 |
与 |
|
占位符,代表全部(不含 |
— |
只订阅指定类型¶
StreamOptions options = StreamOptions.builder()
.eventTypes(EventType.REASONING, EventType.TOOL_RESULT)
.build();
区分 chunk 与最终结果¶
同一条 REASONING 消息会先推若干增量 chunk,最后推带完整文本的最终事件。
用 Event.isLast() 区分:
events.filter(e -> e.getType() == EventType.REASONING)
.subscribe(e -> {
if (e.isLast()) {
System.out.println("推理完成: " + e.getMessage().getTextContent());
} else {
System.out.print(e.getMessage().getTextContent()); // 实时打印 delta
}
});
增量模式 vs. 全量模式¶
|
行为 |
|---|---|
|
每个 chunk 只含新增文本(delta),消费方自行拼接 |
|
每个 chunk 携带截至当前的全量文本,可直接渲染 |
StreamOptions options = StreamOptions.builder()
.incremental(false) // UI 直接渲染场景推荐
.build();
推理 / 摘要内容过滤¶
某些模型在 REASONING 事件中会同时包含思考过程 chunk 和最终推理结论,可分别控制:
StreamOptions options = StreamOptions.builder()
.eventTypes(EventType.REASONING)
.includeReasoningChunk(false) // 不要中间 delta,只要最终结论
.includeReasoningResult(true)
.build();
摘要类似:
StreamOptions options = StreamOptions.builder()
.eventTypes(EventType.SUMMARY)
.includeSummaryChunk(false)
.includeSummaryResult(true)
.build();
错误处理¶
stream() 遵循标准 Reactor 语义,使用 onErrorResume 捕获:
events.onErrorResume(e -> {
log.error("流出错", e);
return Flux.empty();
}).subscribe(...);
工具执行失败时,框架将异常转为 TOOL_RESULT 错误文本,不会向流发送 onError,父流不会被提前终止。
与 Spring WebFlux / SSE 集成¶
Flux<Event> 可直接桥接 Server-Sent Events:
@GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chat(@RequestParam String message) {
return agent.stream(
List.of(Msg.builder().role(MsgRole.USER).textContent(message).build()),
StreamOptions.defaults())
.map(event -> ServerSentEvent.<String>builder()
.event(event.getType().name().toLowerCase())
.data(event.getMessage().getTextContent())
.build());
}
如果使用 HarnessAgent 并需要向前端传递子 agent 来源信息,可将 event.getSource() 一并序列化(详见 Harness 子 Agent 流式)。
StreamOptions 完整参数¶
StreamOptions options = StreamOptions.builder()
// 订阅的事件类型(默认 ALL,不含 AGENT_RESULT)
.eventTypes(EventType.REASONING, EventType.TOOL_RESULT, EventType.AGENT_RESULT)
// 增量模式(默认 true)
.incremental(true)
// 推理内容过滤(默认均 true)
.includeReasoningChunk(true)
.includeReasoningResult(true)
// 摘要内容过滤(默认均 true)
.includeSummaryChunk(true)
.includeSummaryResult(true)
.build();
相关文档¶
Hook — 在推理 / 工具调用各阶段插入自定义逻辑
模型(Model) — 底层流式模型配置
Harness 子 Agent 流式 — HarnessAgent
agent_spawn/agent_send触发的子 agent 事件转发与EventSource字段