Agentic AI Design Patterns (Part 2): Orchestration and Routing

While Part 1 covered foundational patterns (ReAct, Reflection, Tool Use), this installment explores advanced orchestration patterns for managing complex workflows.

1. Planning Pattern: Task Decomposition and Execution

Pattern Overview

The Planning pattern follows an Orchestrator-Workers architecture: a planner agent decomposes a complex task into executable subtasks, and specialized workers run those subtasks sequentially or in parallel.

Architecture

User Request
    ↓
Planner Agent (Orchestrator)
    ↓
[Task 1] [Task 2] [Task 3]  ← Subtask decomposition
    ↓       ↓       ↓
Worker   Worker  Worker      ← Specialized agents
    ↓       ↓       ↓
Results Aggregation
    ↓
Final Output

Mechanism

class PlanningAgent:
    def execute(self, user_request):
        # 1. Planning Phase
        plan = self.create_plan(user_request)
        # plan = [
        #     {"task": "Data collection", "agent": "researcher"},
        #     {"task": "Data analysis", "agent": "analyst"},
        #     {"task": "Report writing", "agent": "writer"}
        # ]

        # 2. Execution Phase
        results = []
        index = 0
        # Index-based loop: a plain "for step in plan" would keep iterating
        # over the old list after the plan is replaced below
        while index < len(plan):
            step = plan[index]
            worker = self.get_worker(step['agent'])
            result = worker.execute(step['task'])
            results.append(result)

            # 3. Re-planning (optional); update_plan is assumed to leave the
            # already executed steps (plan[:index + 1]) in place
            if self.needs_replanning(result):
                plan = self.update_plan(plan, result)
            index += 1

        # 4. Synthesis Phase
        return self.synthesize(results)

Use Cases

When to Use:

  • When multi-step workflows are clear
  • When each step can be executed independently
  • When tracking task progress is important

Real-World Examples:

1. Software Development Automation

plan = [
    {"task": "Requirements analysis", "agent": "analyst", "output": "spec.md"},
    {"task": "Architecture design", "agent": "architect", "input": "spec.md"},
    {"task": "Code generation", "agent": "coder", "parallel": True},
    {"task": "Test writing", "agent": "tester", "parallel": True},
    {"task": "Documentation", "agent": "documenter"}
]
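
The "parallel": True flags in this plan are not consumed by the PlanningAgent loop shown earlier, which runs every step in order. A minimal sketch of one way to honor them, assuming that adjacent steps flagged parallel form one concurrent batch; `run_step` and `execute_plan` are hypothetical names, and `run_step` merely stands in for dispatching to a worker:

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import groupby


def run_step(step):
    # Hypothetical stand-in for worker.execute(step["task"]).
    return f"{step['agent']} finished: {step['task']}"


def execute_plan(plan, run=run_step):
    """Run steps in order; adjacent steps flagged parallel run concurrently."""
    results = []
    # groupby batches *adjacent* steps that share the same "parallel" flag.
    for is_parallel, batch in groupby(plan, key=lambda s: s.get("parallel", False)):
        batch = list(batch)
        if is_parallel and len(batch) > 1:
            with ThreadPoolExecutor(max_workers=len(batch)) as pool:
                # pool.map returns results in input order.
                results.extend(pool.map(run, batch))
        else:
            results.extend(run(step) for step in batch)
    return results
```

Threads are a reasonable fit here because agent calls are typically I/O-bound (waiting on a model or an API); CPU-heavy workers would call for a different executor.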

2. Research Report Generation

plan = [
    {"task": "Literature search", "agent": "searcher"},
    {"task": "Data extraction", "agent": "extractor"},
    {"task": "Statistical analysis", "agent": "analyst"},
    {"task": "Report writing", "agent": "writer"},
    {"task": "Quality review", "agent": "reviewer"}
]

AWS Implementation

Planning Pattern using AWS Step Functions:

import boto3
import json

stepfunctions = boto3.client('stepfunctions')

# State Machine Definition
state_machine = {
    "Comment": "Planning Pattern Implementation",
    "StartAt": "CreatePlan",
    "States": {
        "CreatePlan": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:planner",
            "Next": "ExecuteSteps"
        },
        "ExecuteSteps": {
            "Type": "Map",
            "ItemsPath": "$.plan",
            "Iterator": {
                "StartAt": "ExecuteStep",
                "States": {
                    "ExecuteStep": {
                        "Type": "Task",
                        "Resource": "arn:aws:lambda:region:account:function:worker",
                        "End": True
                    }
                }
            },
            "Next": "SynthesizeResults"
        },
        "SynthesizeResults": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:synthesizer",
            "End": True
        }
    }
}

Planner Lambda Function:

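import json

import boto3

# Create the client once so warm invocations can reuse it
bedrock = boto3.client('bedrock-runtime')

def lambda_handler(event, context):
    user_request = event['request']

    # Plan generation using Bedrock
    prompt = f"""
    Decompose the following request into executable steps:
    {user_request}

    Output each step in the following format:
    - task: Task description
    - agent: Responsible agent
    - dependencies: Prerequisites (if any)
    """

    response = bedrock.invoke_model(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 2000
        })
    )

    # The response body is a stream; the Messages API returns a list of content blocks
    body = json.loads(response['body'].read())
    plan_text = body['content'][0]['text']

    # parse_plan is a helper that is not shown here; it is assumed to turn
    # the bulleted reply into a list of step dictionaries
    plan = parse_plan(plan_text)
    return {"plan": plan}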
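
The planner relies on a `parse_plan` helper that the article does not define. A minimal sketch of what it might look like, assuming the model follows the bullet format requested in the prompt and that the helper receives the model's reply as plain text; the dictionary layout it returns is also an assumption:

```python
def parse_plan(text):
    """Parse '- task:' / '- agent:' / '- dependencies:' lines into step dicts."""
    steps, current = [], None
    for raw in text.splitlines():
        line = raw.strip().lstrip("-").strip()
        if ":" not in line:
            continue  # skip blank lines and free-form commentary
        key, value = (part.strip() for part in line.split(":", 1))
        key = key.lower()
        if key == "task":
            # Each "task" line starts a new step.
            current = {"task": value, "agent": None, "dependencies": []}
            steps.append(current)
        elif current is not None and key == "agent":
            current["agent"] = value
        elif current is not None and key == "dependencies":
            if value.lower() not in ("", "none"):
                current["dependencies"] = [d.strip() for d in value.split(",") if d.strip()]
    return steps
```

Model output does not always follow the requested format, so a production planner would likely validate the parsed steps (for example, rejecting steps without an agent) before handing them to the state machine.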

Dynamic Re-planning

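def execute_with_replanning(plan):
    plan = list(plan)  # work on a copy so the caller's list is untouched
    index = 0
    # Index-based loop so steps inserted below are actually executed;
    # the insert helpers are assumed to add steps after the current one
    while index < len(plan):
        step = plan[index]
        result = execute_step(step)

        # Evaluate execution result
        if result.status == "failed":
            # Generate alternative plan on failure
            alternative_plan = create_alternative_plan(step, result.error)
            plan = insert_alternative(plan, alternative_plan)

        elif result.status == "partial":
            # Insert additional steps on partial success
            additional_steps = create_additional_steps(step, result)
            plan = insert_steps(plan, additional_steps)

        index += 1

    return plan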

Trade-offs

Advantages:

  • Clarity: Each step is explicitly defined
  • Traceability: Easy to monitor progress
  • Parallelization: Independent tasks can be executed concurrently
  • Reusability: Plan templates can be reused

Disadvantages:

  • Overhead: Additional time required for plan generation
  • Complexity: Dependency management needed
  • Limited flexibility: Relies on predefined structure
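
The dependency-management cost can be kept small by letting a topological sort decide what may run together. A minimal sketch using Python's standard `graphlib` module (Python 3.9+), assuming each step carries a `task` name and a `dependencies` list of task names; `execution_batches` is a hypothetical helper name:

```python
from graphlib import TopologicalSorter


def execution_batches(plan):
    """Group steps into batches; every step's dependencies sit in an earlier batch."""
    graph = {step["task"]: set(step.get("dependencies", [])) for step in plan}
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # steps whose dependencies are all done
        batches.append(ready)
        sorter.done(*ready)
    return batches
```

Steps within one batch have no dependencies on each other, so each batch is a candidate for concurrent execution.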

2. Routing Pattern: Intelligent Task Distribution

Pattern Overview

The Routing pattern analyzes each incoming request and forwards it to the most suitable specialized agent. It pays off when several specialized agents handle their own request types better, faster, or more cheaply than a single general-purpose agent would.

Routing Strategies

1. Intent-based Routing

class IntentRouter:
    def __init__(self):
        self.agents = {
            "technical_support": TechnicalSupportAgent(),
            "billing": BillingAgent(),
            "sales": SalesAgent(),
            "general": GeneralAgent()
        }
    
    def route(self, user_message):
        # Intent classification using LLM
        intent = self.classify_intent(user_message)
        
        agent = self.agents.get(intent, self.agents["general"])
        return agent.handle(user_message)
    
    def classify_intent(self, message):
        prompt = f"""
        Classify the intent of the following message:
        {message}
        
        Possible intents: technical_support, billing, sales, general
        """
        return llm.classify(prompt)
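
An LLM classifier rarely returns a bare label; replies such as "Billing." or "Intent: technical support" would miss the exact-match lookup in `route` and fall through to the general agent. A minimal sketch of a normalization step, assuming the intent names listed in the prompt above; `normalize_intent` is a hypothetical helper, not part of the original router:

```python
ALLOWED_INTENTS = ("technical_support", "billing", "sales", "general")


def normalize_intent(raw_label, allowed=ALLOWED_INTENTS, fallback="general"):
    """Map free-form classifier output onto a known intent, else the fallback."""
    cleaned = raw_label.strip().lower().replace(" ", "_").strip(".\"'`")
    if cleaned in allowed:
        return cleaned
    # Tolerate replies such as "Intent: billing" by searching for a known label.
    matches = [intent for intent in allowed if intent in cleaned]
    # Ambiguous replies (several labels) go to the fallback rather than a guess.
    return matches[0] if len(matches) == 1 else fallback
```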

2. Capability-based Routing

class CapabilityRouter:
    def __init__(self):
        self.agent_registry = {
            "data_analysis": {
                "agent": DataAnalystAgent(),
                "capabilities": ["statistics", "visualization", "sql"],
                "cost": 0.05,
                "latency": 2.0
            },
            "code_generation": {
                "agent": CoderAgent(),
                "capabilities": ["python", "javascript", "testing"],
                "cost": 0.03,
                "latency": 1.5
            }
        }
    
    def route(self, task):
        required_capabilities = self.extract_capabilities(task)
        
        # Select optimal agent considering capabilities, cost, and latency
        best_agent = self.select_best_agent(
            required_capabilities,
            optimize_for="latency"  # or "cost"
        )
        
        return best_agent.execute(task)
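
The router above calls `select_best_agent` without showing it. One possible reading, sketched as a standalone function: keep only the registry entries that cover every required capability, then minimize the field named by `optimize_for`. The signature (registry passed in explicitly, name returned alongside the agent) is an assumption made to keep the example self-contained:

```python
def select_best_agent(registry, required_capabilities, optimize_for="latency"):
    """Pick the cheapest or fastest entry that covers every required capability."""
    required = set(required_capabilities)
    candidates = [
        (name, entry)
        for name, entry in registry.items()
        if required <= set(entry["capabilities"])
    ]
    if not candidates:
        raise LookupError(f"no agent covers {sorted(required)}")
    # optimize_for names the registry field to minimize: "latency" or "cost".
    name, entry = min(candidates, key=lambda item: item[1][optimize_for])
    return name, entry["agent"]
```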

3. Hierarchical Routing

class HierarchicalRouter:
    def route(self, request):
        # Level 1: Domain classification
        domain = self.classify_domain(request)  # "engineering", "business", etc.
        
        # Level 2: Subcategory classification
        category = self.classify_category(request, domain)
        
        # Level 3: Specialist selection
        agent = self.get_specialist(domain, category)
        
        return agent.handle(request)
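
The third level, `get_specialist`, can be as simple as a nested lookup with a fallback at each level. A minimal sketch, assuming a hypothetical registry in which agents are represented by name and each domain declares a `_default` generalist:

```python
SPECIALISTS = {
    # Hypothetical registry; domain and category names are illustrative only.
    "engineering": {
        "backend": "backend_agent",
        "frontend": "frontend_agent",
        "_default": "engineering_generalist",
    },
    "business": {
        "finance": "finance_agent",
        "_default": "business_generalist",
    },
}


def get_specialist(domain, category, registry=SPECIALISTS, fallback="general_agent"):
    """Resolve (domain, category) to a specialist, falling back level by level."""
    domain_agents = registry.get(domain)
    if domain_agents is None:
        return fallback  # unknown domain
    return domain_agents.get(category, domain_agents["_default"])
```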

Use Cases

When to Use:

  • When handling various types of requests
  • When specialized agents exist for each domain
  • When cost and performance optimization is important

Real-World Examples:

Customer Support System

router = CustomerSupportRouter()

# Technical inquiry
router.route("I can't log in") 
# → TechnicalSupportAgent

# Billing inquiry
router.route("I want a refund")
# → BillingAgent

# Complex inquiry
router.route("I can't access the service after payment")
# → [BillingAgent, TechnicalSupportAgent] (sequential or parallel)

AWS Implementation

Event-driven routing using Amazon EventBridge:

import boto3
import json

eventbridge = boto3.client('events')

def route_request(request):
    # Request classification using LLM
    classification = classify_request(request)
    
    # Publish event to EventBridge
    eventbridge.put_events(
        Entries=[
            {
                'Source': 'agentic.router',
                'DetailType': classification['intent'],
                'Detail': json.dumps({
                    'request': request,
                    'priority': classification['priority'],
                    'capabilities': classification['required_capabilities']
                }),
                'EventBusName': 'agentic-ai-bus'
            }
        ]
    )

EventBridge Rule Definition:

{
  "Rules": [
    {
      "Name": "route-to-technical-support",
      "EventPattern": {
        "source": ["agentic.router"],
        "detail-type": ["technical_support"]
      },
      "Targets": [
        {
          "Arn": "arn:aws:lambda:region:account:function:technical-support-agent",
          "Id": "1"
        }
      ]
    },
    {
      "Name": "route-to-billing",
      "EventPattern": {
        "source": ["agentic.router"],
        "detail-type": ["billing"]
      },
      "Targets": [
        {
          "Arn": "arn:aws:lambda:region:account:function:billing-agent",
          "Id": "1"
        }
      ]
    }
  ]
}
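
The JSON above describes the rules conceptually; with boto3 each rule is created through one `put_rule` call and one `put_targets` call. A minimal sketch, assuming the bus name used by the publisher above and taking the EventBridge client as a parameter so the function can be exercised without AWS access; `create_routing_rule` is a hypothetical helper name:

```python
import json


def create_routing_rule(events_client, intent, target_arn, bus_name="agentic-ai-bus"):
    """Create one rule on the custom bus that forwards a single intent to a target."""
    rule_name = f"route-to-{intent.replace('_', '-')}"
    events_client.put_rule(
        Name=rule_name,
        EventBusName=bus_name,  # a rule only matches events on its own bus
        EventPattern=json.dumps({
            "source": ["agentic.router"],
            "detail-type": [intent],
        }),
        State="ENABLED",
    )
    events_client.put_targets(
        Rule=rule_name,
        EventBusName=bus_name,
        Targets=[{"Id": "1", "Arn": target_arn}],
    )
    return rule_name
```

In real use the client would be `boto3.client('events')`. A Lambda target additionally needs a resource-based permission that allows EventBridge to invoke it.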

Handoff Pattern

Task transfer between agents:

class HandoffRouter:
    def execute(self, request, max_handoffs=5):
        current_agent = self.initial_agent
        context = {"request": request, "history": []}
        handoffs = 0

        while not context.get("completed"):
            # Execute current agent
            result = current_agent.process(context)
            context["history"].append(result)

            # Check if handoff is needed
            if result.needs_handoff:
                handoffs += 1
                # Guard against agents handing off to each other indefinitely
                # (max_handoffs is an assumed safety limit)
                if handoffs > max_handoffs:
                    raise RuntimeError("Handoff limit exceeded")
                next_agent = self.select_next_agent(result.handoff_reason)
                context["handoff_reason"] = result.handoff_reason
                current_agent = next_agent
            else:
                context["completed"] = True

        return context["history"]

Trade-offs

Advantages:

  • Specialization: Each agent optimized for specific domain
  • Scalability: Easy to add new agents
  • Cost efficiency: Can select appropriate model for task

Disadvantages:

  • Routing overhead: Additional classification step
  • Context loss: Information may be lost during agent transitions
  • Complexity: Need to manage multiple agents

3. Human-in-the-Loop Pattern: Human Review Integration

Pattern Overview

The Human-in-the-Loop (HITL) pattern has agents pause execution at critical decision points to receive human review or approval.

Mechanism

class HITLAgent:
    def execute(self, task):
        # 1. Initial analysis
        analysis = self.analyze(task)
        
        # 2. Risk assessment
        risk_level = self.assess_risk(analysis)
        
        # 3. Determine if human intervention is needed
        if risk_level > self.threshold:
            # Request human review
            approval = self.request_human_review(analysis)
            
            if not approval.approved:
                return self.handle_rejection(approval.feedback)
        
        # 4. Execute
        return self.execute_action(analysis)

Determining Intervention Points

1. Risk-based Intervention

def should_request_review(action, context):
    risk_factors = {
        "financial_impact": action.cost > 10000,
        "data_sensitivity": action.accesses_pii,
        "irreversibility": not action.can_rollback,
        "confidence": action.confidence < 0.8
    }
    
    return any(risk_factors.values())
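
A reviewer usually wants to know why a request was escalated, not only that it was. A minimal variant that returns the names of the factors that fired, assuming a hypothetical `Action` container whose fields mirror the attributes used above and the same thresholds:

```python
from dataclasses import dataclass


@dataclass
class Action:
    # Hypothetical container; field names follow the attributes used above.
    cost: float
    accesses_pii: bool
    can_rollback: bool
    confidence: float


def triggered_risk_factors(action, cost_limit=10000, min_confidence=0.8):
    """Return the names of the risk factors that fire for this action."""
    checks = {
        "financial_impact": action.cost > cost_limit,
        "data_sensitivity": action.accesses_pii,
        "irreversibility": not action.can_rollback,
        "confidence": action.confidence < min_confidence,
    }
    return [name for name, fired in checks.items() if fired]
```

An empty list means no review is needed; a non-empty list can be attached to the review request as the reason for escalation.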

2. Policy-based Intervention

policies = {
    "financial": {
        "threshold": 5000,
        "requires": ["manager_approval"]
    },
    "data_access": {
        "pii": True,
        "requires": ["security_review", "legal_review"]
    }
}
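
The policy table only declares rules; something still has to evaluate an action against it. A minimal sketch, assuming the action is a dictionary with hypothetical `amount` and `accesses_pii` keys and that the financial threshold is exclusive:

```python
POLICIES = {
    "financial": {"threshold": 5000, "requires": ["manager_approval"]},
    "data_access": {"pii": True, "requires": ["security_review", "legal_review"]},
}


def required_approvals(action, policies=POLICIES):
    """Collect the approvals an action needs under the policy table."""
    approvals = []
    financial = policies.get("financial")
    if financial and action.get("amount", 0) > financial["threshold"]:
        approvals.extend(financial["requires"])
    data_access = policies.get("data_access")
    if data_access and data_access.get("pii") and action.get("accesses_pii", False):
        approvals.extend(data_access["requires"])
    return approvals
```

Keeping the thresholds in data rather than in code means compliance staff can adjust them without touching the agent logic.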

Use Cases

When to Use:

  • For tasks with high risk or cost
  • When regulatory compliance is required (GDPR, financial regulations)
  • When agent reliability is insufficient
  • When human judgment is needed for final decisions

Real-World Examples:

1. Financial Transaction Approval

class FinancialAgent:
    def process_transaction(self, transaction):
        # Check if automatic processing is possible
        if transaction.amount < 1000:
            return self.auto_approve(transaction)
        
        # Human review required
        review_request = {
            "transaction": transaction,
            "risk_analysis": self.analyze_risk(transaction),
            "recommendation": self.get_recommendation(transaction)
        }
        
        approval = self.request_approval(review_request)
        
        if approval.approved:
            return self.execute_transaction(transaction)
        else:
            return self.handle_rejection(approval.reason)

2. Content Publishing Approval

class ContentPublisher:
    def publish(self, content):
        # Automatic validation
        validation = self.validate_content(content)
        
        if validation.has_issues:
            # Request human review
            review = self.request_content_review(content, validation)
            
            if review.requires_changes:
                content = self.apply_changes(content, review.suggestions)
        
        return self.publish_content(content)

AWS Implementation

Implementation using AWS Step Functions Task Token:

# Step Functions State Machine (Python dict; serialize with json.dumps before deploying)
hitl_state_machine = {
    "StartAt": "ProcessRequest",
    "States": {
        "ProcessRequest": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:processor",
            "Next": "CheckRisk"
        },
        "CheckRisk": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": "$.riskLevel",
                    "StringEquals": "HIGH",
                    "Next": "RequestHumanApproval"
                }
            ],
            "Default": "ExecuteAction"
        },
        "RequestHumanApproval": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
            "Parameters": {
                "FunctionName": "request-approval",
                "Payload": {
                    "taskToken.$": "$$.Task.Token",
                    "request.$": "$"
                }
            },
            "Next": "ExecuteAction"
        },
        "ExecuteAction": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:executor",
            "End": True
        }
    }
}

Approval Request Lambda:

import json
from urllib.parse import quote

import boto3

sns = boto3.client('sns')

def lambda_handler(event, context):
    task_token = event['taskToken']
    request = event['request']

    # Send approval request notification via SNS
    sns.publish(
        TopicArn='arn:aws:sns:region:account:approval-requests',
        Subject='Approval Required',
        Message=json.dumps({
            'taskToken': task_token,
            'request': request,
            # Task tokens may contain characters that are not URL-safe, so encode them
            'approvalUrl': f'https://approval.example.com/{quote(task_token, safe="")}'
        })
    )

    # Approval/rejection handled through separate API
    # POST /approve with taskToken

Approval Processing API:

import json

import boto3

def approve_request(task_token, approved, feedback):
    stepfunctions = boto3.client('stepfunctions')

    if approved:
        # Resumes the waiting state; the output becomes that state's result
        stepfunctions.send_task_success(
            taskToken=task_token,
            output=json.dumps({'approved': True, 'feedback': feedback})
        )
    else:
        # Fails the waiting state with a named error
        stepfunctions.send_task_failure(
            taskToken=task_token,
            error='ApprovalDenied',
            cause=feedback
        )
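
Because a rejection is reported with `send_task_failure`, the waiting state fails with the error `ApprovalDenied`; the state machine shown earlier has no `Catch` for it, so a rejection would fail the whole execution. A sketch of a variant of the `RequestHumanApproval` state that handles rejection and bounds the wait. The `HandleRejection` state and the four-hour timeout are assumptions, not part of the original definition:

```python
# Variant of the RequestHumanApproval state with a timeout and a rejection branch.
request_human_approval = {
    "Type": "Task",
    "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
    "Parameters": {
        "FunctionName": "request-approval",
        "Payload": {"taskToken.$": "$$.Task.Token", "request.$": "$"},
    },
    "TimeoutSeconds": 14400,  # assumed 4-hour review window
    "Catch": [
        {
            # Matches the error name passed to send_task_failure.
            "ErrorEquals": ["ApprovalDenied"],
            "ResultPath": "$.rejection",
            "Next": "HandleRejection",  # hypothetical state, not in the original
        },
        {
            # Raised by Step Functions when nobody responds in time.
            "ErrorEquals": ["States.Timeout"],
            "ResultPath": "$.rejection",
            "Next": "HandleRejection",
        },
    ],
    "Next": "ExecuteAction",
}
```

Setting `ResultPath` keeps the original input and attaches the error details under `rejection`, so the follow-up state can see both the request and the reviewer's reason.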

User Experience Optimization

1. Asynchronous Approval

# User receives immediate response
response = {
    "status": "pending_approval",
    "request_id": "req-123",
    "estimated_time": "2-4 hours",
    "notification_channels": ["email", "slack"]
}

# Approval process proceeds in background

2. Providing Approval Context

approval_request = {
    "action": "Deploy to Production",
    "context": {
        "changes": ["Updated API endpoint", "Added new feature"],
        "impact": "Affects 10,000 users",
        "rollback_plan": "Automated rollback available",
        "test_results": "All tests passed"
    },
    "recommendation": {
        "approve": True,
        "confidence": 0.92,
        "reasoning": "Low risk deployment with comprehensive tests"
    }
}

Trade-offs

Advantages:

  • Safety: Adds human judgment to critical decisions
  • Compliance: Audit trail and approval records
  • Reliability: Catches agent errors before they take effect

Disadvantages:

  • Latency: Wait time for human response
  • Scalability limitation: Human intervention becomes bottleneck
  • Cost: Human resources required

Pattern Combination Strategies

Real production systems combine multiple patterns:

Example: Enterprise Document Processing System

class DocumentProcessingSystem:
    def process(self, document):
        # 1. Routing: Classify document type
        doc_type = self.router.classify(document)
        
        # 2. Planning: Generate processing plan
        plan = self.planner.create_plan(doc_type, document)
        
        # 3. Execution with HITL
        results = []
        for step in plan:
            result = self.execute_step(step)
            
            # Human review at critical steps
            if step.requires_review:
                approval = self.request_review(result)
                if not approval.approved:
                    result = self.handle_feedback(result, approval)
            
            results.append(result)
        
        return self.synthesize(results)

Coming Up Next

Part 3 will cover Multi-Agent patterns:

  • Multi-Agent Collaboration: Collaboration among multiple agents
  • Workflow Orchestration: Complex workflow management
  • Agent Communication: Communication mechanisms between agents
