Pipeline¶
The pipeline example uses Spring AI Alibaba flow agents (SequentialAgent, ParallelAgent, LoopAgent) with AgentScopeAgent sub-agents and AgentScope DashScopeChatModel (Model). Each pipeline is built from ReActAgent-based AgentScopeAgents and invoked via PipelineService.
Location: agentscope-examples/multiagent-patterns/pipeline/
Prerequisites¶
JDK 17+
Maven 3.6+
DashScope API key:
export AI_DASHSCOPE_API_KEY=your-keyor setspring.ai.dashscope.api-keyinapplication.yml
Model configuration¶
The example uses a single Model bean (DashScopeChatModel) shared by all pipeline sub-agents:
package com.alibaba.cloud.ai.examples.multiagents.pipeline;
import io.agentscope.core.model.DashScopeChatModel;
import io.agentscope.core.model.Model;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class PipelineModelConfig {
@Bean
public Model dashScopeChatModel() {
String key = System.getenv("AI_DASHSCOPE_API_KEY");
return DashScopeChatModel.builder()
.apiKey(key)
.modelName("qwen-plus")
.build();
}
}
1. SequentialAgent: natural language → SQL → score¶
Scenario: User describes a query in natural language. The pipeline (1) SQL Generator converts it to MySQL SQL, (2) SQL Rater scores how well the SQL matches user intent (0–1). Sub-agents run in sequence; each output feeds the next.
Example input: “List all orders from the last 30 days with total amount greater than 500.”
Implementation¶
package com.alibaba.cloud.ai.examples.multiagents.pipeline.sequential;
import com.alibaba.cloud.ai.agent.agentscope.AgentScopeAgent;
import com.alibaba.cloud.ai.graph.agent.flow.agent.SequentialAgent;
import io.agentscope.core.ReActAgent;
import io.agentscope.core.memory.InMemoryMemory;
import io.agentscope.core.model.Model;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
@Configuration
public class SequentialPipelineConfig {
private static final String SQL_GENERATOR_PROMPT = """
You are a MySQL database expert. Given the user's natural language request, output the corresponding SQL statement.
Only output valid MySQL SQL. Do not include explanations.
""";
private static final String SQL_RATER_PROMPT = """
You are a SQL quality reviewer. Given the user's natural language request and the generated SQL,
output a single float score between 0 and 1. The score indicates how well the SQL matches the user intent.
Output ONLY the number, no other text. Example: 0.85
""";
@Bean("sequentialSqlAgent")
public SequentialAgent sequentialSqlAgent(Model dashScopeChatModel) {
ReActAgent.Builder sqlGenBuilder = ReActAgent.builder()
.name("sql_generator")
.model(dashScopeChatModel)
.description("Converts natural language to MySQL SQL")
.sysPrompt(SQL_GENERATOR_PROMPT)
.memory(new InMemoryMemory());
AgentScopeAgent sqlGenerateAgent = AgentScopeAgent.fromBuilder(sqlGenBuilder)
.name("sql_generator")
.description("Converts natural language to MySQL SQL")
.instruction("{input}")
.includeContents(false)
.outputKey("sql")
.build();
ReActAgent.Builder sqlRaterBuilder = ReActAgent.builder()
.name("sql_rater")
.model(dashScopeChatModel)
.description("Scores SQL against user intent")
.sysPrompt(SQL_RATER_PROMPT)
.memory(new InMemoryMemory());
AgentScopeAgent sqlRatingAgent = AgentScopeAgent.fromBuilder(sqlRaterBuilder)
.name("sql_rater")
.description("Scores SQL against user intent")
.instruction("Here's the generated SQL:\n {sql}.\n\n Here's the original user request:\n {input}.")
.includeContents(false)
.outputKey("score")
.build();
return SequentialAgent.builder()
.name("sequential_sql_agent")
.description("Natural language to SQL pipeline: generates SQL and scores its quality")
.subAgents(List.of(sqlGenerateAgent, sqlRatingAgent))
.build();
}
}
SQL Generator:
instruction("{input}"),outputKey("sql")— receives user input, writes generated SQL into state keysql.SQL Rater:
instruction("... {sql} ... {input} ..."),outputKey("score")— receives previoussqland originalinput, writes score into state keyscore.
2. ParallelAgent: multi-angle research¶
Scenario: User provides a topic; the pipeline researches it from three angles in parallel: technology, finance/business, and market/industry. Results are merged into a single report (research_report).
Example input: “Research the current state of large language models.” (Demo uses “AI agents in enterprise software”.)
Implementation¶
package com.alibaba.cloud.ai.examples.multiagents.pipeline.parallel;
import com.alibaba.cloud.ai.agent.agentscope.AgentScopeAgent;
import com.alibaba.cloud.ai.graph.agent.flow.agent.ParallelAgent;
import io.agentscope.core.ReActAgent;
import io.agentscope.core.memory.InMemoryMemory;
import io.agentscope.core.model.Model;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
@Configuration
public class ParallelPipelineConfig {
private static final String TECH_RESEARCH_PROMPT = """
You are a technology analyst. Research the given topic from a technology perspective.
Provide a concise 2-3 paragraph analysis covering: key technologies, trends, and innovations.
Focus on technical aspects only.
""";
private static final String FINANCE_RESEARCH_PROMPT = """
You are a financial analyst. Research the given topic from a finance and business perspective.
Provide a concise 2-3 paragraph analysis covering: market size, investment trends, business models.
Focus on financial and business aspects only.
""";
private static final String MARKET_RESEARCH_PROMPT = """
You are a market analyst. Research the given topic from an industry and market perspective.
Provide a concise 2-3 paragraph analysis covering: competitive landscape, growth drivers, challenges.
Focus on market and industry aspects only.
""";
@Bean("parallelResearchAgent")
public ParallelAgent parallelResearchAgent(Model dashScopeChatModel) {
ReActAgent.Builder techBuilder = ReActAgent.builder()
.name("tech_researcher")
.model(dashScopeChatModel)
.description("Researches from technology perspective")
.sysPrompt(TECH_RESEARCH_PROMPT)
.memory(new InMemoryMemory());
AgentScopeAgent techResearcher = AgentScopeAgent.fromBuilder(techBuilder)
.name("tech_researcher")
.description("Researches from technology perspective")
.instruction("Research the following topic: {input}.")
.includeContents(false)
.outputKey("tech_analysis")
.build();
ReActAgent.Builder financeBuilder = ReActAgent.builder()
.name("finance_researcher")
.model(dashScopeChatModel)
.description("Researches from finance perspective")
.sysPrompt(FINANCE_RESEARCH_PROMPT)
.memory(new InMemoryMemory());
AgentScopeAgent financeResearcher = AgentScopeAgent.fromBuilder(financeBuilder)
.name("finance_researcher")
.description("Researches from finance perspective")
.instruction("Research the following topic: {input}.")
.includeContents(false)
.outputKey("finance_analysis")
.build();
ReActAgent.Builder marketBuilder = ReActAgent.builder()
.name("market_researcher")
.model(dashScopeChatModel)
.description("Researches from market perspective")
.sysPrompt(MARKET_RESEARCH_PROMPT)
.memory(new InMemoryMemory());
AgentScopeAgent marketResearcher = AgentScopeAgent.fromBuilder(marketBuilder)
.name("market_researcher")
.description("Researches from market perspective")
.instruction("Research the following topic: {input}.")
.outputKey("market_analysis")
.build();
return ParallelAgent.builder()
.name("parallel_research_agent")
.description("Multi-topic research: analyzes a topic from tech, finance, and market angles in parallel")
.subAgents(List.of(techResearcher, financeResearcher, marketResearcher))
.mergeStrategy(new ParallelAgent.DefaultMergeStrategy())
.mergeOutputKey("research_report")
.maxConcurrency(3)
.build();
}
}
Each sub-agent has
instruction("Research the following topic: {input}.")and its ownoutputKey(tech_analysis,finance_analysis,market_analysis).DefaultMergeStrategy merges sub-agent outputs into one; merged result is written to mergeOutputKey
research_report.
3. LoopAgent: SQL refinement until quality threshold¶
Scenario: Generate SQL from natural language and iteratively refine until the quality score exceeds 0.5. Each iteration runs an inner SequentialAgent: SQL Generator → SQL Rater. Loop continues until score > 0.5 or max iterations.
Example input: “Find customers who placed more than 3 orders in 2024.”
Implementation¶
LoopPipelineConfig builds the same SQL Generator and SQL Rater AgentScopeAgents as SequentialPipelineConfig (same prompts, instruction, outputKey). It then wraps them in a SequentialAgent and that in a LoopAgent with a condition-based loop strategy:
SequentialAgent sqlAgent = SequentialAgent.builder()
.name("sql_agent")
.description("Generates SQL and scores its quality")
.subAgents(List.of(sqlGenerateAgent, sqlRatingAgent))
.build();
return LoopAgent.builder()
.name("loop_sql_refinement_agent")
.description("Iteratively refines SQL until quality score exceeds " + QUALITY_THRESHOLD)
.subAgent(sqlAgent)
.loopStrategy(LoopMode.condition(messages -> {
if (messages == null || messages.isEmpty()) return false;
String text = messages.get(messages.size() - 1).getText();
if (text == null || text.isBlank()) return false;
try {
double score = Double.parseDouble(text.trim());
return score > QUALITY_THRESHOLD;
} catch (NumberFormatException e) {
return false;
}
}))
.build();
LoopAgent wraps a single SequentialAgent (
sql_agent) that runs SQL Generator then SQL Rater each iteration.loopStrategy:
LoopMode.condition(...)— receives the last turn’s messages, reads the last message (rater output), parses it as a number; returnstruewhen score > 0.5 to stop the loop.
Invoking pipelines: PipelineService¶
PipelineService is wired with the three agents and exposes runSequential, runParallel, and runLoop. Each method invokes the corresponding agent with a string input and returns a result record.
// Inject in your code
@Autowired
PipelineService pipelineService;
// Sequential: natural language → SQL → score
PipelineService.SequentialResult seq = pipelineService.runSequential(
"List all orders from the last 30 days with total amount greater than 500");
// seq.input(), seq.sql(), seq.score()
// Parallel: one topic → merged research report
PipelineService.ParallelResult par = pipelineService.runParallel(
"AI agents in enterprise software");
// par.input(), par.researchReport()
// Loop: SQL refinement until score > 0.5
PipelineService.LoopResult loop = pipelineService.runLoop(
"Find customers who placed more than 3 orders in 2024");
// loop.input(), loop.sql(), loop.score()
Result types:
Method |
Return type |
Keys used |
|---|---|---|
|
|
|
|
|
|
|
|
|
Service implementation (extract of how results are read from state):
public SequentialResult runSequential(String userInput) throws GraphRunnerException {
Optional<OverAllState> resultOpt = sequentialSqlAgent.invoke(userInput);
if (resultOpt.isEmpty()) {
return new SequentialResult(userInput, null, null);
}
OverAllState state = resultOpt.get();
String sql = extractText(state.value(SQL_KEY));
String score = extractText(state.value(SCORE_KEY));
return new SequentialResult(userInput, sql, score);
}
Optional demo runner¶
When pipeline.runner.enabled=true, PipelineCommandRunner runs a demo for each pipeline on startup with sample inputs and logs the results:
@Component
@ConditionalOnProperty(name = "pipeline.runner.enabled", havingValue = "true")
public class PipelineCommandRunner implements ApplicationRunner {
private final PipelineService pipelineService;
@Override
public void run(ApplicationArguments args) throws Exception {
runSequentialDemo(); // "List all orders from the last 30 days..."
runParallelDemo(); // "AI agents in enterprise software"
runLoopDemo(); // "Find customers who placed more than 3 orders in 2024"
}
// ...
}
Build and run¶
From the repo root:
./mvnw -pl agentscope-examples/multiagent-patterns/pipeline -am -B package -DskipTests
./mvnw -pl agentscope-examples/multiagent-patterns/pipeline spring-boot:run
With demo runner:
export pipeline.runner.enabled=true
./mvnw -pl agentscope-examples/multiagent-patterns/pipeline spring-boot:run
Or in application.yml: pipeline.runner.enabled: true.
Configuration¶
Property |
Default |
Description |
|---|---|---|
|
(env |
DashScope API key for the model |
|
|
If |
Project layout¶
agentscope-examples/multiagent-patterns/pipeline/
├── README.md
├── pom.xml
└── src/main/
├── java/.../pipeline/
│ ├── PipelineApplication.java
│ ├── PipelineModelConfig.java # Model (DashScopeChatModel) bean
│ ├── PipelineService.java # runSequential, runParallel, runLoop
│ ├── PipelineCommandRunner.java # optional demo (pipeline.runner.enabled)
│ ├── PipelineRunnerConfig.java # PipelineService bean
│ ├── sequential/
│ │ └── SequentialPipelineConfig.java
│ ├── parallel/
│ │ └── ParallelPipelineConfig.java
│ └── loop/
│ └── LoopPipelineConfig.java
└── resources/
└── application.yml