Streaming
Agent.stream() returns a reactive Flux<Event> that lets callers observe every reasoning step, tool-call result, and final reply in real time, rather than waiting for call() to return a single Msg.
Basic Usage
// Works with both ReActAgent and HarnessAgent
Flux<Event> events = agent.stream(
        List.of(Msg.builder().role(MsgRole.USER).textContent("Analyze this log").build()),
        StreamOptions.defaults());
// Option 1: subscribe and print each event as it arrives
events.subscribe(event ->
        System.out.printf("[%s|last=%s] %s%n",
                event.getType(), event.isLast(),
                event.getMessage().getTextContent()));
// Option 2: blocking collect (for testing / batch pipelines)
// Note: a Reactor Flux is usually cold, so each subscription (including this one)
// may start a separate agent run. Use one option or the other, not both.
List<Event> all = events.collectList().block();
With RuntimeContext (HarnessAgent):
RuntimeContext ctx = RuntimeContext.builder()
.sessionId("my-session").userId("alice").build();
Flux<Event> events = agent.stream(msgs, StreamOptions.defaults(), ctx);
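StreamOptions.defaults() subscribes to EventType.ALL (every event type except AGENT_RESULT) in incremental mode, with all reasoning and summary chunk/result filters enabled.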
Event Types (EventType)
| Type | When | Typical content |
|---|---|---|
| REASONING | Each reasoning turn (may have multiple chunks) | Model text / thinking chain / tool_use calls |
| TOOL_RESULT | After each tool execution | … |
| … | After RAG / memory retrieval | Context text injected into the model |
| SUMMARY | When … | Iteration summary text |
| AGENT_RESULT | Final reply ready | Same as … |
| ALL | Placeholder for all the above (except AGENT_RESULT) | — |
Subscribe to specific types only
StreamOptions options = StreamOptions.builder()
.eventTypes(EventType.REASONING, EventType.TOOL_RESULT)
.build();
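The subscription rules can be modeled with a small EnumSet sketch (Java 16+). Kind is a hypothetical stand-in that mirrors only the EventType constants named on this page (the RAG / memory-retrieval type is left out). The rule it encodes, that ALL expands to every type except AGENT_RESULT, comes from the StreamOptions reference at the end of this section. Treat it as an illustration of the semantics, not the library's resolution code; in particular, taking the union when ALL is combined with AGENT_RESULT is an assumption.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical mirror of EventType, for illustration only (not the real enum).
enum Kind { REASONING, TOOL_RESULT, SUMMARY, AGENT_RESULT, ALL }

final class TypeSubscription {
    /** Expands a requested set: ALL means every concrete type except AGENT_RESULT. */
    static Set<Kind> resolve(Set<Kind> requested) {
        EnumSet<Kind> out = EnumSet.noneOf(Kind.class);
        for (Kind k : requested) {
            if (k == Kind.ALL) {
                // Everything except the placeholder itself and AGENT_RESULT.
                out.addAll(EnumSet.complementOf(EnumSet.of(Kind.ALL, Kind.AGENT_RESULT)));
            } else {
                out.add(k); // explicitly named types are always included (assumed union)
            }
        }
        return out;
    }

    /** Would an event of this type reach a subscriber that asked for `requested`? */
    static boolean delivers(Set<Kind> requested, Kind type) {
        return resolve(requested).contains(type);
    }

    public static void main(String[] args) {
        System.out.println(resolve(Set.of(Kind.ALL)));
        System.out.println(resolve(Set.of(Kind.REASONING, Kind.TOOL_RESULT)));
    }
}
```

Under this model, a subscriber that wants the final reply alongside everything else has to name AGENT_RESULT explicitly, which matches the eventTypes(...) call in the reference example.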
Distinguish chunks from final results
For a single REASONING message, the model first streams several incremental chunks and then emits a final event carrying the complete text. Use Event.isLast() to tell them apart:
events.filter(e -> e.getType() == EventType.REASONING)
.subscribe(e -> {
if (e.isLast()) {
System.out.println("Reasoning done: " + e.getMessage().getTextContent());
} else {
System.out.print(e.getMessage().getTextContent()); // live delta
}
});
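The same chunk/final distinction can be sketched without the framework, which is handy for unit-testing UI code. ChunkEvent is a hypothetical record holding only the two fields the logic needs (the text and the isLast() flag); the sketch assumes incremental mode (the default), where non-final chunks are deltas and the final event carries the complete text.

```java
import java.util.List;

// Hypothetical stand-in for an Event: just the text and the isLast() flag.
record ChunkEvent(String text, boolean last) {}

final class ReasoningAssembler {
    private final StringBuilder deltas = new StringBuilder(); // deltas seen so far
    private String finalText;                                // null until the isLast() event arrives

    void accept(ChunkEvent e) {
        if (e.last()) {
            finalText = e.text();   // final event: complete text, replaces the live buffer
        } else {
            deltas.append(e.text()); // incremental chunk: append the delta
        }
    }

    /** Best text available: the final result if seen, else the deltas concatenated so far. */
    String currentText() {
        return finalText != null ? finalText : deltas.toString();
    }

    boolean isDone() {
        return finalText != null;
    }

    public static void main(String[] args) {
        ReasoningAssembler a = new ReasoningAssembler();
        for (ChunkEvent e : List.of(
                new ChunkEvent("Check", false),
                new ChunkEvent(" the log", false),
                new ChunkEvent("Check the log", true))) {
            a.accept(e);
            System.out.println(a.currentText() + (a.isDone() ? " [done]" : ""));
        }
    }
}
```

Preferring the final event's text over the concatenated deltas means a dropped or reordered chunk cannot corrupt what is shown once the turn completes.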
Incremental vs. Full-text Mode
| Setting | Behaviour |
|---|---|
| incremental(true) (default) | Each chunk contains only the new text delta; the caller concatenates. |
| incremental(false) | Each chunk carries the full accumulated text up to that point, which is convenient for direct rendering. |
StreamOptions options = StreamOptions.builder()
.incremental(false) // recommended for UI rendering
.build();
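The two modes carry the same information: full-text snapshots are prefix sums of the deltas, and deltas are what remains after stripping the previous snapshot. The sketch below shows both conversions on plain strings (StreamModes is a hypothetical helper, not part of the library), assuming, as the table describes, that each full-text snapshot extends the previous one.

```java
import java.util.ArrayList;
import java.util.List;

final class StreamModes {
    /** incremental(true) -> incremental(false): accumulate deltas into snapshots. */
    static List<String> toFullText(List<String> deltas) {
        List<String> out = new ArrayList<>();
        StringBuilder acc = new StringBuilder();
        for (String d : deltas) {
            acc.append(d);
            out.add(acc.toString());
        }
        return out;
    }

    /** incremental(false) -> incremental(true): strip the previously seen prefix. */
    static List<String> toDeltas(List<String> snapshots) {
        List<String> out = new ArrayList<>();
        String prev = "";
        for (String s : snapshots) {
            // Assumption: every snapshot extends the previous one.
            if (!s.startsWith(prev)) {
                throw new IllegalArgumentException("snapshot does not extend previous text: " + s);
            }
            out.add(s.substring(prev.length()));
            prev = s;
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> full = toFullText(List.of("Ana", "lyz", "ing"));
        System.out.println(full);           // [Ana, Analyz, Analyzing]
        System.out.println(toDeltas(full)); // [Ana, lyz, ing]
    }
}
```

Full-text mode trades bandwidth for simplicity: a renderer can replace its view with each event instead of keeping its own buffer, which is why it suits UI rendering.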
Reasoning / Summary Content Filtering
Some models emit both intermediate delta chunks and a final consolidated result
inside the same REASONING event group. Control them independently:
StreamOptions options = StreamOptions.builder()
.eventTypes(EventType.REASONING)
.includeReasoningChunk(false) // skip intermediate deltas
.includeReasoningResult(true) // keep only the final reasoning result
.build();
Similarly for summaries:
StreamOptions options = StreamOptions.builder()
.eventTypes(EventType.SUMMARY)
.includeSummaryChunk(false)
.includeSummaryResult(true)
.build();
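One way to picture how the chunk and result flags combine is as a predicate over isLast(): non-final events count as chunks, the final event counts as the result. This is a simplified model inferred from the isLast() description above, not the framework's filtering code; FilterEvent and ChunkResultFilter are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event shape: just the text and the isLast() flag.
record FilterEvent(String text, boolean last) {}

final class ChunkResultFilter {
    private final boolean includeChunk;  // models includeReasoningChunk / includeSummaryChunk
    private final boolean includeResult; // models includeReasoningResult / includeSummaryResult

    ChunkResultFilter(boolean includeChunk, boolean includeResult) {
        this.includeChunk = includeChunk;
        this.includeResult = includeResult;
    }

    /** A final (isLast) event is a "result"; every earlier event is a "chunk". */
    boolean keep(FilterEvent e) {
        return e.last() ? includeResult : includeChunk;
    }

    List<String> apply(List<FilterEvent> events) {
        List<String> kept = new ArrayList<>();
        for (FilterEvent e : events) {
            if (keep(e)) {
                kept.add(e.text());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<FilterEvent> evs = List.of(
                new FilterEvent("Ana", false),
                new FilterEvent("lyzing", false),
                new FilterEvent("Analyzing", true));
        System.out.println(new ChunkResultFilter(false, true).apply(evs)); // [Analyzing]
    }
}
```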
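Error Handling
stream() follows standard Reactor error semantics: a failure terminates the Flux with an onError signal. Use onErrorResume to recover from it:
events.onErrorResume(e -> {
    log.error("Stream error", e);  // log: your application's logger (e.g. SLF4J)
    return Flux.empty();           // complete normally instead of propagating the error
}).subscribe(event -> System.out.println(event.getMessage().getTextContent()));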
When a tool execution fails, the framework converts the exception into an error string and delivers it as a TOOL_RESULT event. No onError signal is propagated to the parent stream, so a failing tool does not terminate the overall stream prematurely.
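The pattern described above (catch the tool's exception and hand back an error string as an ordinary result) can be sketched in plain Java. SafeToolCall and the "Error: " prefix are hypothetical; the framework's actual message format is not specified here.

```java
import java.util.function.Function;

final class SafeToolCall {
    /**
     * Runs a tool and converts any runtime exception into an error string,
     * so the caller always receives a normal result instead of a failure signal.
     */
    static String invoke(Function<String, String> tool, String input) {
        try {
            return tool.apply(input);
        } catch (RuntimeException ex) {
            // Hypothetical message format; the framework's wording may differ.
            return "Error: " + ex.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(invoke(s -> s.toUpperCase(), "ok"));
        System.out.println(invoke(s -> { throw new IllegalStateException("disk full"); }, "x"));
    }
}
```

Returning the error as data lets the model see the failure and decide how to continue (retry, pick another tool, or explain), which is the behaviour the stream relies on to stay open.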
Integration with Spring WebFlux / SSE
Flux<Event> bridges directly into Server-Sent Events:
@GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chat(@RequestParam String message) {
return agent.stream(
List.of(Msg.builder().role(MsgRole.USER).textContent(message).build()),
StreamOptions.defaults())
.map(event -> ServerSentEvent.<String>builder()
.event(event.getType().name().toLowerCase())
.data(event.getMessage().getTextContent())
.build());
}
If you use HarnessAgent and need to forward child-agent source metadata to the frontend, also serialize event.getSource() (see Harness Subagent Streaming).
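Spring builds each SSE frame from the ServerSentEvent for you; the sketch below only shows roughly what reaches the browser per Event, which helps when debugging multi-line model output. It follows the Server-Sent Events text format: an optional event: line, one data: line per line of payload, and a blank line ending the message. SseFrame is a hypothetical helper, and Spring's encoder may differ in details such as the space after the colon (a single leading space is stripped by SSE parsers either way).

```java
final class SseFrame {
    /** Formats one SSE message: optional event name, one data: line per text line, blank-line terminator. */
    static String format(String eventName, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventName != null) {
            sb.append("event: ").append(eventName).append('\n');
        }
        String payload = data == null ? "" : data;
        // Multi-line payloads must be split: each line becomes its own data: field.
        for (String line : payload.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString(); // blank line dispatches the message
    }

    public static void main(String[] args) {
        System.out.print(format("reasoning", "Step 1\nStep 2"));
    }
}
```

On the client, the browser's EventSource rejoins the data: lines with newlines, so the model's line breaks survive the trip.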
StreamOptions Reference
StreamOptions options = StreamOptions.builder()
// Event types to receive (default: ALL, excluding AGENT_RESULT)
.eventTypes(EventType.REASONING, EventType.TOOL_RESULT, EventType.AGENT_RESULT)
// Incremental mode: true = push deltas (default), false = push full accumulated text
.incremental(true)
// Reasoning content filters (both default true)
.includeReasoningChunk(true)
.includeReasoningResult(true)
// Summary content filters (both default true)
.includeSummaryChunk(true)
.includeSummaryResult(true)
.build();