Architecture

System overview

User (curl / Postman / Open WebUI)
  │
  ▼
┌─────────────────────────────────────────────────────┐
│  Web Interface (FastAPI)                            │
│  SSE streaming, REST, webhooks                      │
└──────────────────────┬──────────────────────────────┘
                       │
                       ▼
┌─────────────────────────────────────────────────────┐
│  Orchestrator                                       │
│  DAG: classify → guardian_validate → gate_decision  │
│       → call_planner → build_workflow → execute     │
└──────────────────────┬──────────────────────────────┘
                       │ A2A
          ┌────────────┼────────────┐
          ▼            ▼            ▼
     ┌─────────┐ ┌─────────┐ ┌─────────┐
     │ Planner │ │ Builder │ │ Agents  │
     │         │ │         │ │ (yours) │
     └─────────┘ └─────────┘ └─────────┘
          │            │            │
          └────────────┼────────────┘
                       │ MCP
                       ▼
┌─────────────────────────────────────────────────────┐
│  Semantic Layer                                     │
│  Weaviate (vectors) + MCP Server (tools)            │
│  Agent cards, tool cards, document storage           │
└─────────────────────────────────────────────────────┘
                       │
                       ▼
┌─────────────────────────────────────────────────────┐
│  Security Layer                                     │
│  Guardian (gate) + OPA (policy engine)              │
│  HMAC auth, risk scoring, audit logs                │
└─────────────────────────────────────────────────────┘

Communication protocols

Protocol

Between

Purpose

A2A (JSON-RPC/HTTP)

Agent ↔ Agent

Task delegation, streaming responses

MCP

Agent → Semantic Layer

Tool calls, agent discovery

HTTP/SSE

User → Web Interface

Queries, streaming responses

REST

Any → Health endpoints

Health checks, metrics

Inside an agent

AbiCore (app runner)
  │
  ├── @agent.step → ToolExecutionGraph (DAG)
  ├── @agent.task → Programmatic orchestration
  ├── @agent.tool → LLM-invocable functions
  └── @agent.mcp_tool → Remote tools via MCP
  │
  ▼
AbiAgent (base class)
  ├── stream() → Main execution path
  ├── LLM (via create_llm + LLM_CONFIG)
  ├── Checkpointer (conversation memory)
  └── A2A Server (auto-started by agent_factory)

Execution paths in stream()

Path

Condition

Behavior

Path 0

Tasks registered (@agent.task)

Execute first task as entry point

Path A

tool_graph exists (steps/tools)

Execute DAG deterministically

Path B

No graph, no tasks

Use LangChain agent with LLM

Data flow for a request

1. HTTP POST /stream {"query": "..."}
2. web_interface._wrap_user_query() → {"route": "...", "text": "..."}
3. agent.stream(query=json_string)
4. _resolve_task(task_id) → finds the right task function
5. task function runs:
   - yield AgentResponse.status("...")  → SSE event to client
   - await agent.execute_step("name")   → runs step function
   - step calls invoke(LLM_CONFIG, prompt) → LLM response
   - yield AgentResponse.result({...})  → SSE event to client
6. SSE stream closes

Project structure

my-project/
├── agents/
│   ├── my-agent/
│   │   ├── app.py           ← AbiCore instance
│   │   ├── agent_*.py       ← AbiAgent subclass
│   │   ├── steps.py         ← @agent.step functions
│   │   ├── tasks.py         ← @agent.task functions
│   │   ├── tools.py         ← @agent.tool functions
│   │   ├── prompts.py       ← All prompts
│   │   ├── config/config.py ← LLM_CONFIG, ports, env vars
│   │   ├── web_interface.py ← FastAPI endpoints
│   │   ├── agent_cards/     ← Agent card JSON
│   │   ├── main.py          ← Entry point
│   │   └── Dockerfile
│   └── ...
├── services/
│   ├── semantic_layer/      ← Weaviate + MCP + embeddings
│   ├── guardian/            ← Security + OPA policies
│   └── web_api/             ← Optional frontend
├── compose.yaml
└── .abi/runtime.yaml