Agentic AI Design Patterns (Part 2): Orchestration and Routing
While Part 1 covered foundational patterns (ReAct, Reflection, Tool Use), this installment explores advanced orchestration patterns for managing complex workflows.
1. Planning Pattern: Task Decomposition and Execution
Pattern Overview
The Planning pattern is an Orchestrator-Workers architecture that decomposes complex tasks into executable subtasks and executes each task sequentially or in parallel.
Architecture
User Request
     ↓
Planner Agent (Orchestrator)
     ↓
[Task 1]  [Task 2]  [Task 3]   ← Subtask decomposition
   ↓         ↓         ↓
 Worker    Worker    Worker    ← Specialized agents
   ↓         ↓         ↓
Results Aggregation
     ↓
Final Output
Mechanism
class PlanningAgent:
    def execute(self, user_request):
        # 1. Planning Phase
        plan = self.create_plan(user_request)
        # plan = [
        #     {"task": "Data collection", "agent": "researcher"},
        #     {"task": "Data analysis", "agent": "analyst"},
        #     {"task": "Report writing", "agent": "writer"}
        # ]

        # 2. Execution Phase
        results = []
        index = 0
        # Walk the plan by index: re-planning may replace the plan mid-run,
        # which a plain `for step in plan` loop would never pick up.
        while index < len(plan):
            step = plan[index]
            worker = self.get_worker(step['agent'])
            result = worker.execute(step['task'])
            results.append(result)

            # 3. Re-planning (optional)
            # update_plan is assumed to leave already-executed steps in place
            if self.needs_replanning(result):
                plan = self.update_plan(plan, result)
            index += 1

        # 4. Synthesis Phase
        return self.synthesize(results)
Use Cases
When to Use:
- When the task breaks down into a clear sequence of steps
- When steps can be executed independently once their inputs are available
- When tracking task progress is important
Real-World Examples:
1. Software Development Automation
plan = [
{"task": "Requirements analysis", "agent": "analyst", "output": "spec.md"},
{"task": "Architecture design", "agent": "architect", "input": "spec.md"},
{"task": "Code generation", "agent": "coder", "parallel": True},
{"task": "Test writing", "agent": "tester", "parallel": True},
{"task": "Documentation", "agent": "documenter"}
]
2. Research Report Generation
plan = [
{"task": "Literature search", "agent": "searcher"},
{"task": "Data extraction", "agent": "extractor"},
{"task": "Statistical analysis", "agent": "analyst"},
{"task": "Report writing", "agent": "writer"},
{"task": "Quality review", "agent": "reviewer"}
]
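The `"parallel": True` flags in the software development plan suggest that adjacent flagged steps can run together. A minimal sketch of one way to honor them, assuming consecutive flagged steps form a single concurrent batch. `group_steps`, `run_plan`, and the `run_step` callable are hypothetical names used for illustration, not part of any framework:

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import groupby


def group_steps(plan):
    """Split a plan into batches; consecutive parallel steps share a batch."""
    batches = []
    for is_parallel, steps in groupby(plan, key=lambda s: bool(s.get("parallel"))):
        steps = list(steps)
        if is_parallel:
            batches.append(steps)  # these steps run together
        else:
            batches.extend([s] for s in steps)  # one batch per sequential step
    return batches


def run_plan(plan, run_step):
    """Run batches in order; steps inside a batch run concurrently."""
    results = []
    with ThreadPoolExecutor() as pool:
        for batch in group_steps(plan):
            # pool.map returns results in input order
            results.extend(pool.map(run_step, batch))
    return results


plan = [
    {"task": "Architecture design", "agent": "architect"},
    {"task": "Code generation", "agent": "coder", "parallel": True},
    {"task": "Test writing", "agent": "tester", "parallel": True},
    {"task": "Documentation", "agent": "documenter"},
]
batches = group_steps(plan)
results = run_plan(plan, lambda step: f"{step['agent']} done")
```

Here the coder and tester steps land in the same batch, while the architect and documenter steps each run alone. Threads suit this case because agent calls are typically I/O-bound.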
AWS Implementation
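Planning Pattern using AWS Step Functions. A Map state runs its iterations concurrently by default, so add "MaxConcurrency": 1 to ExecuteSteps if the plan's steps must run in order: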
import boto3
import json
stepfunctions = boto3.client('stepfunctions')
# State Machine Definition
state_machine = {
"Comment": "Planning Pattern Implementation",
"StartAt": "CreatePlan",
"States": {
"CreatePlan": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:planner",
"Next": "ExecuteSteps"
},
"ExecuteSteps": {
"Type": "Map",
"ItemsPath": "$.plan",
"Iterator": {
"StartAt": "ExecuteStep",
"States": {
"ExecuteStep": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:worker",
"End": True
}
}
},
"Next": "SynthesizeResults"
},
"SynthesizeResults": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:synthesizer",
"End": True
}
}
}
Planner Lambda Function:
import json

import boto3


def lambda_handler(event, context):
    user_request = event['request']
    # Plan generation using Bedrock
    bedrock = boto3.client('bedrock-runtime')
    prompt = f"""
    Decompose the following request into executable steps:
    {user_request}
    Output each step in the following format:
    - task: Task description
    - agent: Responsible agent
    - dependencies: Prerequisites (if any)
    """
    response = bedrock.invoke_model(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 2000
        })
    )
    # The response body is a stream: read and decode it before parsing
    response_body = json.loads(response['body'].read())
    plan_text = response_body['content'][0]['text']
    # parse_plan turns the model's text into a list of step dicts
    plan = parse_plan(plan_text)
    return {"plan": plan}
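The planner calls `parse_plan` without defining it. A minimal sketch of what such a helper might look like, assuming it receives the model's text output and that the model follows the `- task:` / `- agent:` / `- dependencies:` format requested in the prompt. The parsing rules here are an assumption, not the author's implementation:

```python
import re


def parse_plan(text):
    """Parse '- task: / - agent: / - dependencies:' lines into step dicts."""
    steps = []
    current = None
    for line in text.splitlines():
        match = re.match(r"\s*-\s*(task|agent|dependencies)\s*:\s*(.*)", line, re.IGNORECASE)
        if not match:
            continue  # skip any prose the model adds around the list
        key, value = match.group(1).lower(), match.group(2).strip()
        if key == "task":
            # Each "task" line starts a new step
            current = {"task": value, "agent": None, "dependencies": []}
            steps.append(current)
        elif current is not None:
            if key == "agent":
                current["agent"] = value
            elif value.lower() not in ("", "none"):
                current["dependencies"] = [d.strip() for d in value.split(",")]
    return steps


sample = """Here is the plan:
- task: Data collection
- agent: researcher
- dependencies: none
- task: Data analysis
- agent: analyst
- dependencies: Data collection
"""
plan = parse_plan(sample)
```

Line-based parsing is fragile when the model drifts from the format. Asking the model for JSON and validating it is likely a sturdier choice in production.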
Dynamic Re-planning
def execute_with_replanning(plan):
    # Work on a copy and walk it by index so that inserted steps are actually executed
    plan = list(plan)
    index = 0
    while index < len(plan):
        step = plan[index]
        result = execute_step(step)
        # Evaluate execution result
        if result.status == "failed":
            # Generate alternative plan on failure and run it next
            alternative_plan = create_alternative_plan(step, result.error)
            plan[index + 1:index + 1] = alternative_plan
        elif result.status == "partial":
            # Insert additional steps on partial success
            additional_steps = create_additional_steps(step, result)
            plan[index + 1:index + 1] = additional_steps
        index += 1
    return plan
Trade-offs
Advantages:
- Clarity: Each step is explicitly defined
- Traceability: Easy to monitor progress
- Parallelization: Independent tasks can be executed concurrently
- Reusability: Plan templates can be reused
Disadvantages:
- Overhead: Additional time required for plan generation
- Complexity: Dependency management needed
- Limited flexibility: Relies on predefined structure
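The dependency management mentioned above can often be delegated to a topological sort. A minimal sketch using the standard library's `graphlib` (Python 3.9+), assuming each step names its prerequisites in a `dependencies` list. The `execution_batches` function and the sample plan are illustrative:

```python
from graphlib import TopologicalSorter


def execution_batches(plan):
    """Return lists of task names; each batch depends only on earlier batches."""
    graph = {step["task"]: set(step.get("dependencies", [])) for step in plan}
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only to keep output stable
        batches.append(ready)
        sorter.done(*ready)
    return batches


plan = [
    {"task": "spec", "dependencies": []},
    {"task": "design", "dependencies": ["spec"]},
    {"task": "code", "dependencies": ["design"]},
    {"task": "tests", "dependencies": ["design"]},
    {"task": "docs", "dependencies": ["code", "tests"]},
]
batches = execution_batches(plan)
```

Tasks within one batch have no dependencies on each other, so they are candidates for concurrent execution.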
2. Routing Pattern: Intelligent Task Distribution
Pattern Overview
The Routing pattern analyzes incoming requests and routes them to the most suitable specialized agent. This is used when multiple specialized agents are more efficient than a single general-purpose agent.
Routing Strategies
1. Intent-based Routing
class IntentRouter:
    def __init__(self):
        self.agents = {
            "technical_support": TechnicalSupportAgent(),
            "billing": BillingAgent(),
            "sales": SalesAgent(),
            "general": GeneralAgent()
        }

    def route(self, user_message):
        # Intent classification using LLM
        intent = self.classify_intent(user_message)
        # Unknown intents fall back to the general agent
        agent = self.agents.get(intent, self.agents["general"])
        return agent.handle(user_message)

    def classify_intent(self, message):
        prompt = f"""
        Classify the intent of the following message:
        {message}
        Possible intents: technical_support, billing, sales, general
        """
        # `llm` is a placeholder for your model client
        return llm.classify(prompt)
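LLM classifiers rarely return a bare label, so the router benefits from normalizing the reply before the dictionary lookup. A minimal sketch, assuming the model answers in free text. `normalize_intent` and its matching rules are hypothetical, not part of the router above:

```python
ALLOWED_INTENTS = ("technical_support", "billing", "sales", "general")


def normalize_intent(raw_output, allowed=ALLOWED_INTENTS, default="general"):
    """Map free-form classifier output onto exactly one allowed label."""
    cleaned = raw_output.strip().lower().replace(" ", "_")
    if cleaned in allowed:
        return cleaned
    # Accept a label embedded in a longer reply only if it is unambiguous
    mentioned = [label for label in allowed if label in cleaned]
    return mentioned[0] if len(mentioned) == 1 else default
```

Ambiguous or unrecognized replies fall back to `general`, which mirrors the `.get(intent, self.agents["general"])` default in the router.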
2. Capability-based Routing
class CapabilityRouter:
    def __init__(self):
        self.agent_registry = {
            "data_analysis": {
                "agent": DataAnalystAgent(),
                "capabilities": ["statistics", "visualization", "sql"],
                "cost": 0.05,
                "latency": 2.0
            },
            "code_generation": {
                "agent": CoderAgent(),
                "capabilities": ["python", "javascript", "testing"],
                "cost": 0.03,
                "latency": 1.5
            }
        }

    def route(self, task):
        required_capabilities = self.extract_capabilities(task)
        # Select optimal agent considering capabilities, cost, and latency
        best_agent = self.select_best_agent(
            required_capabilities,
            optimize_for="latency"  # or "cost"
        )
        return best_agent.execute(task)
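The selection step is left undefined in the router. A minimal sketch of one possible `select_best_agent`, written as a standalone function that returns the registry key rather than the agent object. The `notebook` entry and the rule "filter by capability, then minimize one metric" are assumptions for illustration:

```python
def select_best_agent(registry, required_capabilities, optimize_for="latency"):
    """Pick the cheapest or fastest entry whose capabilities cover the requirement."""
    required = set(required_capabilities)
    candidates = [
        (name, entry) for name, entry in registry.items()
        if required <= set(entry["capabilities"])
    ]
    if not candidates:
        raise LookupError(f"No agent covers {sorted(required)}")
    # Lower is better for both "cost" and "latency"
    name, _ = min(candidates, key=lambda item: item[1][optimize_for])
    return name


registry = {
    "data_analysis": {"capabilities": ["statistics", "visualization", "sql"], "cost": 0.05, "latency": 2.0},
    "code_generation": {"capabilities": ["python", "javascript", "testing"], "cost": 0.03, "latency": 1.5},
    # Hypothetical extra entry so that the two metrics disagree
    "notebook": {"capabilities": ["python", "statistics"], "cost": 0.04, "latency": 3.0},
}
fastest_python = select_best_agent(registry, ["python"], optimize_for="latency")
cheapest_stats = select_best_agent(registry, ["statistics"], optimize_for="cost")
```

Raising on an empty candidate list forces the caller to decide on a fallback explicitly instead of silently routing to an unsuitable agent.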
3. Hierarchical Routing
class HierarchicalRouter:
    def route(self, request):
        # Level 1: Domain classification
        domain = self.classify_domain(request)  # "engineering", "business", etc.
        # Level 2: Subcategory classification
        category = self.classify_category(request, domain)
        # Level 3: Specialist selection
        agent = self.get_specialist(domain, category)
        return agent.handle(request)
Use Cases
When to Use:
- When handling various types of requests
- When specialized agents exist for each domain
- When cost and performance optimization is important
Real-World Examples:
Customer Support System
router = CustomerSupportRouter()
# Technical inquiry
router.route("I can't log in")
# → TechnicalSupportAgent
# Billing inquiry
router.route("I want a refund")
# → BillingAgent
# Complex inquiry
router.route("I can't access the service after payment")
# → [BillingAgent, TechnicalSupportAgent] (sequential or parallel)
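The complex inquiry above needs more than one agent, which means the classifier has to return a list of intents rather than a single label. A minimal sketch of sequential multi-intent routing. The keyword table is a deliberately simple stand-in for an LLM call, and `classify_intents`, `route`, and the handler functions are hypothetical:

```python
KEYWORDS = {
    "billing": ("refund", "payment", "invoice", "charge"),
    "technical_support": ("log in", "login", "access", "error"),
}


def classify_intents(message):
    """Return every matching intent in KEYWORDS order (keyword stand-in for an LLM)."""
    text = message.lower()
    intents = [intent for intent, words in KEYWORDS.items() if any(w in text for w in words)]
    return intents or ["general"]


def route(message, handlers):
    """Run each matching handler in sequence, passing earlier replies along."""
    replies = []
    for intent in classify_intents(message):
        replies.append(handlers[intent](message, list(replies)))
    return replies


handlers = {
    "billing": lambda msg, prior: "billing: payment verified",
    "technical_support": lambda msg, prior: f"technical_support: access restored (saw {len(prior)} prior reply)",
    "general": lambda msg, prior: "general: forwarded to a human",
}
replies = route("I can't access the service after payment", handlers)
```

Passing the earlier replies to each handler lets the technical agent see that billing already verified the payment, which reduces repeated questions to the user.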
AWS Implementation
Event-driven routing using Amazon EventBridge:
import boto3
import json
eventbridge = boto3.client('events')
def route_request(request):
    # Request classification using LLM
    classification = classify_request(request)
    # Publish event to EventBridge
    response = eventbridge.put_events(
        Entries=[
            {
                'Source': 'agentic.router',
                'DetailType': classification['intent'],
                'Detail': json.dumps({
                    'request': request,
                    'priority': classification['priority'],
                    'capabilities': classification['required_capabilities']
                }),
                'EventBusName': 'agentic-ai-bus'
            }
        ]
    )
    # put_events reports per-entry failures in the response instead of raising
    if response['FailedEntryCount'] > 0:
        raise RuntimeError(f"Failed to publish routing event: {response['Entries']}")
EventBridge Rule Definition:
{
"Rules": [
{
"Name": "route-to-technical-support",
"EventPattern": {
"source": ["agentic.router"],
"detail-type": ["technical_support"]
},
"Targets": [
{
"Arn": "arn:aws:lambda:region:account:function:technical-support-agent",
"Id": "1"
}
]
},
{
"Name": "route-to-billing",
"EventPattern": {
"source": ["agentic.router"],
"detail-type": ["billing"]
},
"Targets": [
{
"Arn": "arn:aws:lambda:region:account:function:billing-agent",
"Id": "1"
}
]
}
]
}
Handoff Pattern
Task transfer between agents:
class HandoffRouter:
    def execute(self, request):
        current_agent = self.initial_agent
        context = {"request": request, "history": []}
        while not context.get("completed"):
            # Execute current agent
            result = current_agent.process(context)
            context["history"].append(result)
            # Check if handoff is needed
            if result.needs_handoff:
                next_agent = self.select_next_agent(result.handoff_reason)
                context["handoff_reason"] = result.handoff_reason
                current_agent = next_agent
            else:
                context["completed"] = True
        return context["history"]
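A handoff loop like this one can run forever if two agents keep passing the request back and forth. A minimal sketch of a bounded variant, assuming agents are plain callables that name their handoff target. `StepResult`, `run_with_handoffs`, and the `max_handoffs` limit are hypothetical additions:

```python
from dataclasses import dataclass


@dataclass
class StepResult:
    output: str
    needs_handoff: bool = False
    handoff_target: str = ""


def run_with_handoffs(agents, start, request, max_handoffs=3):
    """Run agents until one finishes; give up after max_handoffs transfers."""
    context = {"request": request, "history": []}
    current = start
    for _ in range(max_handoffs + 1):
        result = agents[current](context)
        context["history"].append((current, result.output))
        if not result.needs_handoff:
            return context
        current = result.handoff_target
    raise RuntimeError(f"Gave up after {max_handoffs} handoffs")


agents = {
    "triage": lambda ctx: StepResult("needs billing", True, "billing"),
    "billing": lambda ctx: StepResult("refund issued"),
    # Two agents that only ever hand off to each other
    "ping": lambda ctx: StepResult("over to pong", True, "pong"),
    "pong": lambda ctx: StepResult("over to ping", True, "ping"),
}
done = run_with_handoffs(agents, "triage", "I want a refund")
```

Recording the agent name alongside each output in `history` also gives a simple audit trail of who handled the request.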
Trade-offs
Advantages:
- Specialization: Each agent is optimized for a specific domain
- Scalability: Easy to add new agents
- Cost efficiency: An appropriately sized model can be selected for each task
Disadvantages:
- Routing overhead: Additional classification step
- Context loss: Information may be lost during agent transitions
- Complexity: Need to manage multiple agents
3. Human-in-the-Loop Pattern: Human Review Integration
Pattern Overview
The Human-in-the-Loop (HITL) pattern has agents pause execution at critical decision points to receive human review or approval.
Mechanism
class HITLAgent:
    def execute(self, task):
        # 1. Initial analysis
        analysis = self.analyze(task)
        # 2. Risk assessment
        risk_level = self.assess_risk(analysis)
        # 3. Determine if human intervention is needed
        if risk_level > self.threshold:
            # Request human review
            approval = self.request_human_review(analysis)
            if not approval.approved:
                return self.handle_rejection(approval.feedback)
        # 4. Execute
        return self.execute_action(analysis)
Determining Intervention Points
1. Risk-based Intervention
def should_request_review(action, context):
    risk_factors = {
        "financial_impact": action.cost > 10000,
        "data_sensitivity": action.accesses_pii,
        "irreversibility": not action.can_rollback,
        "confidence": action.confidence < 0.8
    }
    return any(risk_factors.values())
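A reviewer usually wants to know why a request was escalated, not only that it was. A minimal sketch of a variant that returns the names of the triggered factors, using the same thresholds as above. The `Action` dataclass and `triggered_risks` are hypothetical names:

```python
from dataclasses import dataclass


@dataclass
class Action:
    cost: float
    accesses_pii: bool
    can_rollback: bool
    confidence: float


def triggered_risks(action):
    """Return the names of the risk factors that fire for this action."""
    risk_factors = {
        "financial_impact": action.cost > 10000,
        "data_sensitivity": action.accesses_pii,
        "irreversibility": not action.can_rollback,
        "confidence": action.confidence < 0.8,
    }
    return [name for name, fired in risk_factors.items() if fired]


routine = Action(cost=250, accesses_pii=False, can_rollback=True, confidence=0.95)
risky = Action(cost=250, accesses_pii=True, can_rollback=False, confidence=0.95)
```

An empty list means the action can proceed automatically. A non-empty list can be attached to the review request as the escalation reason.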
2. Policy-based Intervention
policies = {
"financial": {
"threshold": 5000,
"requires": ["manager_approval"]
},
"data_access": {
"pii": True,
"requires": ["security_review", "legal_review"]
}
}
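The policy table only declares rules, so something has to evaluate it against a concrete action. A minimal sketch of such an evaluator, assuming the financial threshold is exclusive and that `"pii": True` means the policy applies whenever an action touches PII. `required_approvals` and the action fields are hypothetical:

```python
policies = {
    "financial": {"threshold": 5000, "requires": ["manager_approval"]},
    "data_access": {"pii": True, "requires": ["security_review", "legal_review"]},
}


def required_approvals(action, policies):
    """Collect the approvals demanded by every policy the action triggers."""
    approvals = []
    financial = policies.get("financial")
    if financial and action.get("amount", 0) > financial["threshold"]:
        approvals.extend(financial["requires"])
    data_access = policies.get("data_access")
    if data_access and data_access.get("pii") and action.get("accesses_pii", False):
        approvals.extend(data_access["requires"])
    return approvals
```

Keeping the rules in data and the evaluation in one function means compliance teams can adjust thresholds without touching agent code.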
Use Cases
When to Use:
- For tasks with high risk or cost
- When regulatory compliance is required (GDPR, financial regulations)
- When agent reliability is insufficient
- When human judgment is needed for final decisions
Real-World Examples:
1. Financial Transaction Approval
class FinancialAgent:
    def process_transaction(self, transaction):
        # Check if automatic processing is possible
        if transaction.amount < 1000:
            return self.auto_approve(transaction)
        # Human review required
        review_request = {
            "transaction": transaction,
            "risk_analysis": self.analyze_risk(transaction),
            "recommendation": self.get_recommendation(transaction)
        }
        approval = self.request_approval(review_request)
        if approval.approved:
            return self.execute_transaction(transaction)
        else:
            return self.handle_rejection(approval.reason)
2. Content Publishing Approval
class ContentPublisher:
    def publish(self, content):
        # Automatic validation
        validation = self.validate_content(content)
        if validation.has_issues:
            # Request human review
            review = self.request_content_review(content, validation)
            if review.requires_changes:
                content = self.apply_changes(content, review.suggestions)
        return self.publish_content(content)
AWS Implementation
Implementation using AWS Step Functions Task Token:
# Step Functions State Machine (Python dict; serialize with json.dumps before deployment)
hitl_state_machine = {
"StartAt": "ProcessRequest",
"States": {
"ProcessRequest": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:processor",
"Next": "CheckRisk"
},
"CheckRisk": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.riskLevel",
"StringEquals": "HIGH",
"Next": "RequestHumanApproval"
}
],
"Default": "ExecuteAction"
},
"RequestHumanApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
"Parameters": {
"FunctionName": "request-approval",
"Payload": {
"taskToken.$": "$$.Task.Token",
"request.$": "$"
}
},
"Next": "ExecuteAction"
},
"ExecuteAction": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:executor",
"End": True
}
}
}
Approval Request Lambda:
import json
from urllib.parse import quote

import boto3


def lambda_handler(event, context):
    task_token = event['taskToken']
    request = event['request']
    # Send approval request notification via SNS
    sns = boto3.client('sns')
    sns.publish(
        TopicArn='arn:aws:sns:region:account:approval-requests',
        Subject='Approval Required',
        Message=json.dumps({
            'taskToken': task_token,
            'request': request,
            # Task tokens may contain characters that are not URL-safe, so encode them
            'approvalUrl': f"https://approval.example.com/{quote(task_token, safe='')}"
        })
    )
    # Approval/rejection handled through separate API
    # POST /approve with taskToken
Approval Processing API:
def approve_request(task_token, approved, feedback):
    stepfunctions = boto3.client('stepfunctions')
    if approved:
        stepfunctions.send_task_success(
            taskToken=task_token,
            output=json.dumps({'approved': True, 'feedback': feedback})
        )
    else:
        stepfunctions.send_task_failure(
            taskToken=task_token,
            error='ApprovalDenied',
            cause=feedback
        )
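Because `send_task_failure` fails the waiting state with the error name `ApprovalDenied`, the whole execution fails unless that state catches the error. A sketch of one way to handle it, assuming a hypothetical `HandleRejection` state and a one-day timeout. `Catch`, `ErrorEquals`, `ResultPath`, and `TimeoutSeconds` are standard Amazon States Language fields, while the state name and the timeout value are illustrative:

```python
import json

request_human_approval = {
    "Type": "Task",
    "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
    "Parameters": {
        "FunctionName": "request-approval",
        "Payload": {"taskToken.$": "$$.Task.Token", "request.$": "$"},
    },
    # Fail the wait with States.Timeout if nobody responds within a day (assumed value)
    "TimeoutSeconds": 86400,
    "Catch": [
        {
            # "ApprovalDenied" matches the error name passed to send_task_failure
            "ErrorEquals": ["ApprovalDenied", "States.Timeout"],
            "ResultPath": "$.rejection",
            "Next": "HandleRejection",  # hypothetical state, not in the original machine
        }
    ],
    "Next": "ExecuteAction",
}

definition_fragment = json.dumps({"RequestHumanApproval": request_human_approval}, indent=2)
```

Writing the error details to `$.rejection` keeps the original input available to the rejection handler, so it can report what was denied and why.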
User Experience Optimization
1. Asynchronous Approval
# User receives immediate response
response = {
"status": "pending_approval",
"request_id": "req-123",
"estimated_time": "2-4 hours",
"notification_channels": ["email", "slack"]
}
# Approval process proceeds in background
2. Providing Approval Context
approval_request = {
"action": "Deploy to Production",
"context": {
"changes": ["Updated API endpoint", "Added new feature"],
"impact": "Affects 10,000 users",
"rollback_plan": "Automated rollback available",
"test_results": "All tests passed"
},
"recommendation": {
"approve": True,
"confidence": 0.92,
"reasoning": "Low risk deployment with comprehensive tests"
}
}
Trade-offs
Advantages:
- Safety: Adds human judgment to critical decisions
- Compliance: Audit trail and approval records
- Reliability: Catches agent errors before they take effect
Disadvantages:
- Latency: Wait time for human response
- Scalability limitation: Human intervention becomes a bottleneck
- Cost: Human resources required
Pattern Combination Strategies
Real production systems combine multiple patterns:
Example: Enterprise Document Processing System
class DocumentProcessingSystem:
    def process(self, document):
        # 1. Routing: Classify document type
        doc_type = self.router.classify(document)
        # 2. Planning: Generate processing plan
        plan = self.planner.create_plan(doc_type, document)
        # 3. Execution with HITL
        results = []
        for step in plan:
            result = self.execute_step(step)
            # Human review at critical steps
            if step.requires_review:
                approval = self.request_review(result)
                if not approval.approved:
                    result = self.handle_feedback(result, approval)
            results.append(result)
        return self.synthesize(results)
Coming Up Next
Part 3 will cover Multi-Agent patterns:
- Multi-Agent Collaboration: Collaboration among multiple agents
- Workflow Orchestration: Complex workflow management
- Agent Communication: Communication mechanisms between agents
