Multi-Agent Workflows¶
AgentInteractionFlow is the engine that runs plans across multiple agents.
What it does¶
It takes a plan (a list of tasks, each assigned to an agent) and runs the tasks in dependency order. Each task calls its agent and collects the response.
Creating a workflow¶
from abi_core.common.workflow import AgentInteractionFlow, InteractionFlowNode
from config import AGENT_CARD


# Illustrative wrapper (the function name is not part of abi_core):
# `async for` has to run inside a coroutine.
async def run_revenue_analysis(analyst_card):
    # analyst_card is the target agent's AgentCard, obtained from discovery

    # Create the flow
    workflow = AgentInteractionFlow()

    # Add a node (one task = one agent call)
    node = InteractionFlowNode(
        task="Analyze Q4 revenue data",
        source_agent_card=AGENT_CARD,
        target_agent_card=analyst_card,
        node_key="analyze",
        node_label="Revenue Analysis",
    )
    workflow.add_node(node)
    workflow.set_source_card(AGENT_CARD)

    # Execute and stream results
    async for chunk in workflow.run_workflow():
        print(chunk)  # A2A response chunks from the target agent
Execution patterns¶
Sequential¶
Tasks run one after another; each depends on the one before it.
collect_data → analyze_data → write_report
The Planner expresses this with depends_on:
{
  "tasks": [
    {"task_id": "1", "description": "Collect data", "depends_on": []},
    {"task_id": "2", "description": "Analyze", "depends_on": ["1"]},
    {"task_id": "3", "description": "Report", "depends_on": ["2"]}
  ]
}
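As a rough illustration of how a run order can be derived from depends_on, the sketch below uses Python's standard-library graphlib. It is not abi_core's scheduler; the helper name execution_order is hypothetical, and the plan is the one shown above.

```python
from graphlib import TopologicalSorter

# The sequential plan from above, as a Python dict
plan = {
    "tasks": [
        {"task_id": "1", "description": "Collect data", "depends_on": []},
        {"task_id": "2", "description": "Analyze", "depends_on": ["1"]},
        {"task_id": "3", "description": "Report", "depends_on": ["2"]},
    ]
}


def execution_order(plan):
    """Return task ids in an order that respects every depends_on edge."""
    # TopologicalSorter expects a mapping of node -> set of predecessors
    graph = {t["task_id"]: set(t["depends_on"]) for t in plan["tasks"]}
    # static_order() raises graphlib.CycleError if the plan contains a cycle
    return list(TopologicalSorter(graph).static_order())


print(execution_order(plan))  # ['1', '2', '3']
```

Because each task depends on exactly one predecessor, only one order is valid here.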
Parallel¶
Tasks with no dependencies between them run at the same time.
collect_sales ─────┐
collect_costs ─────┼→ analyze_all
collect_inventory ─┘
{
  "tasks": [
    {"task_id": "1", "description": "Collect sales", "depends_on": []},
    {"task_id": "2", "description": "Collect costs", "depends_on": []},
    {"task_id": "3", "description": "Collect inventory", "depends_on": []},
    {"task_id": "4", "description": "Analyze all", "depends_on": ["1", "2", "3"]}
  ]
}
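To make "run at the same time" concrete, here is a minimal asyncio sketch, assuming an acyclic plan. It does not show how AgentInteractionFlow is implemented: call_agent and run_plan are hypothetical names, and asyncio.sleep stands in for a real agent call.

```python
import asyncio

plan = {
    "tasks": [
        {"task_id": "1", "description": "Collect sales", "depends_on": []},
        {"task_id": "2", "description": "Collect costs", "depends_on": []},
        {"task_id": "3", "description": "Collect inventory", "depends_on": []},
        {"task_id": "4", "description": "Analyze all", "depends_on": ["1", "2", "3"]},
    ]
}


async def call_agent(task, log):
    # Hypothetical stand-in for an agent call; records start/end events
    log.append(("start", task["task_id"]))
    await asyncio.sleep(0.01)
    log.append(("end", task["task_id"]))
    return f"result of {task['description']}"


async def run_plan(plan):
    log = []
    running = {}  # task_id -> asyncio.Task

    async def run_one(task):
        # Wait for every dependency to finish before calling the agent
        await asyncio.gather(*(running[dep] for dep in task["depends_on"]))
        return await call_agent(task, log)

    # Schedule every task up front; each one blocks only on its own dependencies
    for task in plan["tasks"]:
        running[task["task_id"]] = asyncio.create_task(run_one(task))

    results = await asyncio.gather(*running.values())
    return dict(zip(running, results)), log


results, log = asyncio.run(run_plan(plan))
print(log)
```

In the printed log, the three collection tasks all start before any of them ends, and task 4 starts only after all three have ended.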
Hybrid¶
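A mix of sequential and parallel: independent branches run at the same time, and a task starts once everything it depends on has finished.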
classify ──→ analyze ──┐
guardian ──────────────┼→ synthesize
                       │
search_context ────────┘
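The diagram above can be written in the same plan shape as the sequential and parallel examples. The plan below is an assumed encoding (the task ids and descriptions are illustrative), and execution_waves is a hypothetical helper that groups tasks into rounds that could run together.

```python
# Assumed encoding of the hybrid diagram in the depends_on format
hybrid_plan = {
    "tasks": [
        {"task_id": "classify", "description": "Classify", "depends_on": []},
        {"task_id": "analyze", "description": "Analyze", "depends_on": ["classify"]},
        {"task_id": "guardian", "description": "Guardian check", "depends_on": []},
        {"task_id": "search_context", "description": "Search context", "depends_on": []},
        {
            "task_id": "synthesize",
            "description": "Synthesize",
            "depends_on": ["analyze", "guardian", "search_context"],
        },
    ]
}


def execution_waves(plan):
    """Group task ids into waves; tasks in one wave have no pending dependencies."""
    remaining = {t["task_id"]: set(t["depends_on"]) for t in plan["tasks"]}
    done = set()
    waves = []
    while remaining:
        # A task is ready when all of its dependencies are already done
        ready = [tid for tid, deps in remaining.items() if deps <= done]
        if not ready:
            raise ValueError("plan has a cycle or an unknown dependency")
        waves.append(ready)
        done.update(ready)
        for tid in ready:
            del remaining[tid]
    return waves


for number, wave in enumerate(execution_waves(hybrid_plan), start=1):
    print(number, wave)
```

Waves are a simplification: a scheduler could also start each task as soon as its own dependencies finish, without waiting for the rest of the wave.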
Workflow state¶
from abi_core.common.workflow import Status
workflow.state # Status.READY → RUNNING → COMPLETED
Each node also has its own state. The workflow tracks which nodes have completed and which are pending.
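One way to picture per-node state next to an overall workflow state is the stand-in below. It is not abi_core's Status or its tracking logic: NodeStatus mirrors only the three states named above, its values are made up, and NodeTracker is a hypothetical class.

```python
from enum import Enum


class NodeStatus(Enum):
    # Stand-in enum; mirrors only READY, RUNNING, COMPLETED from the text
    READY = "ready"
    RUNNING = "running"
    COMPLETED = "completed"


class NodeTracker:
    """Hypothetical tracker: one status per node plus a derived overall state."""

    def __init__(self, node_keys):
        self.states = {key: NodeStatus.READY for key in node_keys}

    def start(self, key):
        self.states[key] = NodeStatus.RUNNING

    def complete(self, key):
        self.states[key] = NodeStatus.COMPLETED

    def completed(self):
        return [k for k, s in self.states.items() if s is NodeStatus.COMPLETED]

    def pending(self):
        return [k for k, s in self.states.items() if s is not NodeStatus.COMPLETED]

    @property
    def state(self):
        values = set(self.states.values())
        if not values or values == {NodeStatus.READY}:
            return NodeStatus.READY
        if values == {NodeStatus.COMPLETED}:
            return NodeStatus.COMPLETED
        return NodeStatus.RUNNING  # at least one node has started, not all done


tracker = NodeTracker(["collect", "analyze"])
tracker.start("collect")
tracker.complete("collect")
print(tracker.state, tracker.completed(), tracker.pending())
```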
Heartbeat¶
Long-running workflows send heartbeat events to keep the SSE connection alive:
# In the Orchestrator's stream()
dag_result, heartbeats = await self._run_with_heartbeat(
    dag_coro, context_id, task_id, "Processing..."
)
for hb in heartbeats:
    yield hb  # Keeps the client connection alive
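The general heartbeat technique can be sketched with plain asyncio: poll a running task with a timeout and emit a heartbeat each time the timeout expires. This is an illustration under assumptions, not the body of _run_with_heartbeat; the function name stream_with_heartbeat and the event dictionaries are hypothetical.

```python
import asyncio


async def stream_with_heartbeat(coro, interval=0.01, message="Processing..."):
    """Yield heartbeat events while coro runs, then yield its result."""
    task = asyncio.ensure_future(coro)
    try:
        while True:
            # asyncio.wait returns after `interval` seconds even if the task
            # is still running; it does not cancel the task on timeout
            done, _ = await asyncio.wait({task}, timeout=interval)
            if done:
                break
            yield {"type": "heartbeat", "message": message}
        # task.result() re-raises any exception raised inside coro
        yield {"type": "result", "data": task.result()}
    finally:
        # If the consumer stops early, do not leave the work running
        if not task.done():
            task.cancel()


async def slow_workflow():
    # Stand-in for a long-running workflow
    await asyncio.sleep(0.05)
    return "done"


async def main():
    return [event async for event in stream_with_heartbeat(slow_workflow())]


events = asyncio.run(main())
print(events[-1])
```

The heartbeat interval should stay shorter than the idle timeout of whatever sits between the client and the server; the 0.01 s default here is only for the demo.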