Multi-Agent AI Systems: Architecture Patterns for Building Reliable Agent Pipelines
Introduction
2025 was the year AI agents went from demos to production deployments. By 2026, multi-agent systems are running in production at companies of all sizes — automating code review, customer support, data analysis, and complex research pipelines. But building agent systems that are reliable at scale is a fundamentally different challenge than building a chatbot.
This post covers the architectural patterns that separate production-grade agent systems from fragile prototypes.
What Makes Multi-Agent Systems Hard
Single agents are hard enough. Multi-agent systems multiply the challenges:
- Non-determinism compounds. Each LLM call introduces variance. Chained agents multiply it.
- Failures propagate. One agent’s bad output becomes another’s bad input.
- Debugging is opaque. “Why did the system produce this output?” requires tracing through multiple model calls.
- Cost and latency multiply. A 5-agent pipeline with 3 tool calls each = 15+ LLM calls per user request.
- Alignment is hard to maintain. Agents operating autonomously can drift from user intent.
Understanding these challenges shapes every architectural decision.
Core Architecture Patterns
1. Orchestrator-Worker Pattern
The most common and reliable pattern. One orchestrator LLM plans and delegates; worker agents execute specific tasks.
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

class AgentState(TypedDict):
    task: str
    plan: list[str]
    results: Annotated[list[str], operator.add]  # reducer: workers append
    final_output: str

def orchestrator(state: AgentState) -> dict:
    """Breaks the task into subtasks"""
    response = llm.invoke(f"""
Task: {state['task']}
Break this into 3-5 concrete subtasks that can be executed independently.
Return as a JSON list of task descriptions.
""")
    plan = parse_json(response.content)
    return {"plan": plan}

def research_worker(state: AgentState) -> dict:
    """Executes the next research subtask"""
    current_task = state['plan'][0]  # Take next task
    result = research_agent.run(current_task)
    # Consume the task from the plan so the loop terminates
    return {"plan": state['plan'][1:], "results": [result]}

def synthesizer(state: AgentState) -> dict:
    """Combines all results into final output"""
    response = llm.invoke(f"""
Original task: {state['task']}
Research results: {state['results']}
Synthesize these into a comprehensive, well-structured response.
""")
    return {"final_output": response.content}

# Build the graph
graph = StateGraph(AgentState)
graph.add_node("orchestrator", orchestrator)
graph.add_node("worker", research_worker)
graph.add_node("synthesizer", synthesizer)
graph.set_entry_point("orchestrator")
graph.add_edge("orchestrator", "worker")
graph.add_conditional_edges(
    "worker",
    lambda state: "worker" if state['plan'] else "synthesizer",
)
graph.add_edge("synthesizer", END)
app = graph.compile()
2. Reflection Pattern
An agent produces output; a critic agent evaluates and improves it. This dramatically improves output quality, at the cost of roughly double the LLM calls (more if several revision rounds run).
class ReflectionSystem:
    def __init__(self, generator_model: str, critic_model: str):
        self.generator = LLM(model=generator_model)
        self.critic = LLM(model=critic_model)
        self.max_iterations = 3

    def generate_with_reflection(self, task: str) -> str:
        draft = self.generator.invoke(task)
        for i in range(self.max_iterations):
            critique = self.critic.invoke(f"""
Task: {task}
Draft output: {draft.content}
Evaluate this output. Identify:
1. Factual errors or unsupported claims
2. Missing important information
3. Logical inconsistencies
4. Tone or format issues
Rate severity: MINOR / MAJOR / CRITICAL
If all issues are MINOR or none, respond with: APPROVED
Otherwise, provide specific improvement instructions.
""")
            if "APPROVED" in critique.content:
                break
            # Regenerate with critique as context
            draft = self.generator.invoke(f"""
Task: {task}
Previous attempt: {draft.content}
Critic feedback: {critique.content}
Revise your response addressing the critic's feedback.
""")
        return draft.content
3. Parallel Fan-Out Pattern
Execute multiple specialized agents simultaneously, then merge results. Reduces latency compared to sequential chains.
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def parallel_research(query: str) -> dict:
    """Fan out to multiple specialized agents simultaneously"""

    async def run_agent(agent_fn, task):
        loop = asyncio.get_event_loop()
        with ThreadPoolExecutor() as pool:
            result = await loop.run_in_executor(pool, agent_fn, task)
        return result

    # Run all agents in parallel
    results = await asyncio.gather(
        run_agent(web_research_agent, query),
        run_agent(database_agent, query),
        run_agent(document_agent, query),
        run_agent(code_analysis_agent, query),
        return_exceptions=True,  # Don't fail all if one fails
    )

    # Handle partial failures gracefully
    successful_results = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            logger.warning(f"Agent {i} failed: {result}")
        else:
            successful_results.append(result)
    return merge_results(successful_results)
4. Hierarchical Multi-Agent Pattern
For complex long-running tasks: a top-level manager, mid-level team leads, and ground-level workers.
Manager Agent
├── Research Lead
│   ├── Web Search Agent
│   ├── Document Analysis Agent
│   └── Data Extraction Agent
├── Analysis Lead
│   ├── Statistics Agent
│   ├── Visualization Agent
│   └── Validation Agent
└── Writing Lead
    ├── Draft Agent
    ├── Edit Agent
    └── Format Agent
Each level communicates via structured messages, not raw text, enabling type checking and validation at handoffs.
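One way to make those handoffs concrete is a typed message envelope validated at each boundary. A minimal sketch using Pydantic, as the rest of this post does; the `TaskMessage` schema and its field names are illustrative, not from any particular framework:

```python
from datetime import datetime, timezone
from typing import Literal
from pydantic import BaseModel, Field

class TaskMessage(BaseModel):
    """Typed envelope passed between hierarchy levels."""
    sender: str                      # e.g. "research_lead"
    recipient: str                   # e.g. "web_search_agent"
    intent: Literal["delegate", "report", "escalate"]
    payload: dict                    # task-specific content
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc)
    )

def handoff(raw: dict) -> TaskMessage:
    """Validate a handoff at the boundary; raises ValidationError on bad structure."""
    return TaskMessage(**raw)
```

Validating at the handoff means a malformed delegation fails loudly at the boundary instead of silently corrupting a downstream agent's context.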
Tool Design: The Foundation of Agent Reliability
The quality of an agent system is heavily determined by its tools. Poorly designed tools are the #1 source of agent failures.
Principles of Good Agent Tools
from pydantic import BaseModel, Field
from typing import Literal

# ✅ Good tool: clear purpose, typed I/O, explicit error handling
class SearchWebInput(BaseModel):
    query: str = Field(description="Search query. Be specific and include key terms.")
    max_results: int = Field(default=5, ge=1, le=20, description="Number of results")
    recency: Literal["any", "week", "month", "year"] = Field(
        default="any",
        description="Filter by recency. Use 'week' for current events."
    )

def search_web(input: SearchWebInput) -> str:
    """
    Search the web for information.

    Returns: Formatted search results with titles, URLs, and snippets.
    Best for: Current information, facts, documentation, news.
    Not for: Private data, real-time prices, internal documents.
    """
    try:
        results = brave_search(input.query, input.max_results, input.recency)
        return format_results(results)
    except RateLimitError:
        return "ERROR: Search rate limit exceeded. Wait 60 seconds before retrying."
    except Exception as e:
        return f"ERROR: Search failed: {str(e)}. Try rephrasing the query."
Key tool design rules:
- Include what it’s NOT for — helps the agent avoid misuse
- Return actionable error messages — agents can retry or pivot
- Return structured, consistent formats — agents parse results more reliably
- Be idempotent where possible — retries shouldn’t cause side effects
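The idempotency rule above can be approximated with a result cache keyed on the tool name and arguments. A minimal sketch, assuming an in-memory dict; a production system would typically use a persistent store with TTLs and explicit idempotency keys:

```python
import hashlib
import json
from typing import Callable

_idempotency_cache: dict[str, object] = {}

def idempotent(tool_fn: Callable) -> Callable:
    """Return the cached result when the same call is retried,
    so retries don't repeat side effects."""
    def wrapper(*args, **kwargs):
        # Stable key from tool name + arguments
        key_material = json.dumps(
            {"tool": tool_fn.__name__, "args": args, "kwargs": kwargs},
            sort_keys=True, default=str,
        )
        key = hashlib.sha256(key_material.encode()).hexdigest()
        if key not in _idempotency_cache:
            _idempotency_cache[key] = tool_fn(*args, **kwargs)
        return _idempotency_cache[key]
    return wrapper
```

An agent that retries a `@idempotent`-wrapped `send_email` after a timeout gets the original result back instead of sending a duplicate.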
Memory Architecture
Agents need different types of memory for different purposes:
from datetime import datetime

class AgentMemorySystem:
    def __init__(self):
        # Working memory: current conversation/task context
        self.working_memory: list[Message] = []
        self.max_working_memory = 20  # Keep recent context

        # Episodic memory: past interactions and outcomes
        self.episodic_store = VectorStore()  # Semantic search

        # Semantic memory: world knowledge and facts
        self.knowledge_base = RAGSystem()

        # Procedural memory: successful strategies and workflows
        self.strategy_store = SQLiteDB("strategies.db")

    def remember(self, interaction: Interaction, outcome: str):
        """Store an interaction for future recall"""
        self.episodic_store.add({
            "interaction": interaction.to_text(),
            "outcome": outcome,
            "timestamp": datetime.now().isoformat(),
            "success": outcome != "failed"
        })

    def recall_relevant(self, current_task: str, n: int = 5) -> list[str]:
        """Find similar past interactions"""
        similar = self.episodic_store.similarity_search(current_task, k=n)
        return [item["interaction"] for item in similar if item["success"]]

    def build_context(self, task: str) -> str:
        """Construct agent context from memory"""
        recent = self.working_memory[-10:]  # Last 10 messages
        relevant_past = self.recall_relevant(task, n=3)
        knowledge = self.knowledge_base.retrieve(task, k=3)
        return f"""
Recent context: {format_messages(recent)}
Relevant past experience: {format_list(relevant_past)}
Relevant knowledge: {format_list(knowledge)}
"""
Failure Handling and Reliability
Production agent systems need multiple failure handling strategies:
1. Circuit Breaker for External Tools
import requests
from circuitbreaker import circuit, CircuitBreakerError

class AgentToolkit:
    @circuit(failure_threshold=5, recovery_timeout=60)
    def call_external_api(self, endpoint: str, data: dict) -> dict:
        response = requests.post(endpoint, json=data, timeout=10)
        response.raise_for_status()
        return response.json()

    def safe_tool_call(self, tool_fn, *args, **kwargs):
        try:
            return tool_fn(*args, **kwargs)
        except CircuitBreakerError:
            return {"error": "Tool temporarily unavailable. Proceed with available information."}
        except (TimeoutError, requests.Timeout):
            return {"error": "Tool timed out. Try a simpler query or skip this step."}
2. Structured Output Validation
LLMs hallucinate structure. Always validate:
import json
import re
from pydantic import BaseModel, ValidationError

class ResearchPlan(BaseModel):
    subtasks: list[str]
    estimated_steps: int
    requires_external_data: bool

def parse_agent_output(raw_output: str, schema: type[BaseModel]) -> BaseModel | None:
    """Parse and validate LLM output against a schema"""
    try:
        # Try direct JSON parse
        data = json.loads(raw_output)
        return schema(**data)
    except (json.JSONDecodeError, ValidationError):
        pass
    # Try to extract JSON from a markdown code fence
    json_match = re.search(r'```json\n(.*?)\n```', raw_output, re.DOTALL)
    if json_match:
        try:
            data = json.loads(json_match.group(1))
            return schema(**data)
        except (json.JSONDecodeError, ValidationError):
            pass
    # Validation failed: return None and log for debugging
    logger.error(f"Failed to parse agent output: {raw_output[:500]}")
    return None
3. Human-in-the-Loop for High-Stakes Actions
HIGH_STAKES_ACTIONS = {"delete_records", "send_email", "execute_payment", "modify_production"}

async def execute_agent_action(action: AgentAction) -> ActionResult:
    if action.tool_name in HIGH_STAKES_ACTIONS:
        # Require human approval
        approval = await request_human_approval(
            action=action,
            timeout_seconds=300,
            default_on_timeout="reject"  # Fail safe
        )
        if not approval.approved:
            return ActionResult(success=False, reason=f"Rejected by human: {approval.reason}")
    return await execute_action(action)
Observability for Agent Systems
Standard APM is insufficient for agents. You need agent-specific observability:
from opentelemetry import trace
import json

tracer = trace.get_tracer("agent-system")

class ObservableAgent:
    def run(self, task: str) -> AgentResult:
        with tracer.start_as_current_span("agent.run") as span:
            span.set_attribute("agent.task", task[:200])
            span.set_attribute("agent.model", self.model)
            # Return counters from the callee; ints passed as arguments
            # can't be mutated in place in Python
            result, llm_calls, tool_calls = self._run_with_tracking(task, span)
            span.set_attribute("agent.llm_calls", llm_calls)
            span.set_attribute("agent.tool_calls", json.dumps(tool_calls))
            span.set_attribute("agent.success", result.success)
            return result
Track these metrics per agent system:
- LLM calls per task (efficiency)
- Tool call success rate (reliability)
- Task completion rate (effectiveness)
- P95 task latency (performance)
- Token cost per task (economics)
- Human intervention rate (autonomy health)
The LangGraph vs CrewAI vs Custom Decision
LangGraph (LangChain): Graph-based, explicit state management, best for complex conditional workflows. Steep learning curve but highly controllable.
CrewAI: Role-based agents, easier to get started, opinionated about how agents collaborate. Great for team-simulation patterns.
Custom: Maximum control, no framework overhead. Recommended for teams with specific requirements or when frameworks don’t fit the mental model.
By 2026, LangGraph has largely won the framework wars for complex systems, but CrewAI remains popular for simpler agent teams.
Conclusion
Multi-agent systems in production require engineering discipline that goes far beyond prompt engineering:
- Choose patterns based on failure modes, not capability demos
- Design tools first — they determine agent reliability more than model choice
- Build observability in from day one — you cannot debug what you cannot see
- Plan for failure — circuit breakers, validation, and human-in-the-loop aren’t optional at scale
- Start simple — reflection on a single agent often beats complex multi-agent pipelines
The gap between “impressive demo” and “production system” in AI agents is larger than in most software domains. Close it with engineering rigor.
Related Posts:
- LangGraph Deep Dive: Building Stateful Agent Workflows
- RAG Architecture Patterns: Beyond Naive Retrieval in 2026
