Your First Multi-Agent System¶
Build a system where a coordinator agent finds and calls a worker agent through the Semantic Layer.
What you’ll build¶
Two agents:
Coordinator — receives user requests, finds the right agent, delegates work
Researcher — answers research questions when called
Step 1: Create the project¶
abi-core create project multi-demo --with-semantic-layer
cd multi-demo
Step 2: Add the agents¶
abi-core add agent coordinator \
--description "Receives requests and delegates to specialized agents" \
--with-web-interface
# Tasks: coordinate_request
abi-core add agent researcher \
--description "Researches topics and provides detailed answers"
# Tasks: research_topic
Step 3: Write the Researcher¶
Edit agents/researcher/steps.py:
from app import agent
from config import config
from abi_core.agent.llm_provider import invoke
@agent.step(name="research")
async def research(topic):
"""Research a topic in depth."""
prompt = f"""You are a research specialist. Provide a detailed,
well-structured answer about: {topic}
Include key facts, context, and implications. 2-3 paragraphs max."""
result = await invoke(config.LLM_CONFIG, prompt)
return {"answer": result}
Edit agents/researcher/tasks.py:
import json
from app import agent
from abi_core.agent.agent_response import AgentResponse
@agent.task(name="research_topic", task_id="task-research")
async def research_topic(query):
data = json.loads(query) if isinstance(query, str) else query
topic = data.get("text", data.get("topic", ""))
yield AgentResponse.status("Researching...")
result = await agent.execute_step("research", topic=topic)
yield AgentResponse.result(result)
Step 4: Write the Coordinator¶
Edit agents/coordinator/steps.py:
import json
from app import agent
from config import config
from abi_core.common.semantic_tools import tool_find_agent
from abi_core.common.abi_a2a import agent_connection
@agent.step(name="find_worker")
async def find_worker(task_description):
"""Find an agent that can handle this task."""
agent_card = await tool_find_agent.ainvoke(task_description)
return {"worker": agent_card}
@agent.step(name="delegate")
async def delegate(worker, message, source_card):
"""Send work to another agent via A2A."""
payload = {
"message": {
"messageId": "delegate-001",
"role": "user",
"parts": [{"text": json.dumps({"text": message})}],
}
}
response_text = ""
async for chunk in agent_connection(source_card, worker, payload):
if hasattr(chunk, 'root') and hasattr(chunk.root, 'result'):
result = chunk.root.result
if hasattr(result, 'status') and hasattr(result.status, 'message'):
for part in result.status.message.parts:
if hasattr(part, 'text'):
response_text = part.text
return {"response": response_text}
Edit agents/coordinator/tasks.py:
import json
from app import agent
from abi_core.agent.agent_response import AgentResponse
from config import AGENT_CARD
@agent.task(name="coordinate_request", task_id="task-coordinate")
async def coordinate_request(query):
data = json.loads(query) if isinstance(query, str) else query
text = data.get("text", "")
# Find a worker
yield AgentResponse.status("🔍 Finding the right agent...")
discovery = await agent.execute_step("find_worker", task_description=text)
worker = discovery["worker"]
if not worker:
yield AgentResponse.error("No agent found for this task.")
return
yield AgentResponse.status(f"📡 Delegating to {worker.name}...")
# Call the worker via A2A
result = await agent.execute_step(
"delegate", worker=worker, message=text, source_card=AGENT_CARD
)
yield AgentResponse.result({
"delegated_to": worker.name,
"response": result["response"],
})
@agent.task(name="route_to_task", task_id="task-router")
async def route_to_task(query):
async for response in agent.execute_task("coordinate_request", query=query):
yield response
Step 5: Run it¶
docker compose up ollama -d
docker exec multi-demo-ollama ollama pull qwen2.5:3b
docker exec multi-demo-ollama ollama pull nomic-embed-text:v1.5
docker compose up --build -d
Step 6: Test it¶
curl -X POST http://localhost:8002/stream \
-H "Content-Type: application/json" \
-d '{"query": "Research the impact of quantum computing on cryptography"}'
You’ll see:
event: status → "🔍 Finding the right agent..."
event: status → "📡 Delegating to researcher..."
event: result → {"delegated_to": "researcher", "response": "Quantum computing poses..."}
What happened¶
User sent a request to the coordinator’s web interface
Coordinator searched the Semantic Layer for “research” capability
Found the researcher agent’s card (matched by description + skills)
Called the researcher via A2A protocol (HMAC validated)
Researcher executed its
research_topictask, called the LLMResponse streamed back through A2A to the coordinator
Coordinator forwarded the result to the user via SSE