STONI

Agentic AI Design Patterns (Part 3): Multi-Agent Systems

In this final part of the series, we explore Multi-Agent patterns, in which several agents collaborate to solve a problem. These patterns are most useful in domains that are too broad or too complex for a single agent to handle well.

Multi-Agent vs Single-Agent

When Should You Use Multi-Agent?

When Single-Agent is Appropriate:

  • Task belongs to a single domain
  • Sequential processing is sufficient
  • A single context shared across all steps is important

When Multi-Agent is Needed:

  • Multiple areas of expertise required
  • Parallel processing would improve performance
  • Agents can operate independently
  • Scalability and modularity are important

1. Multi-Agent Collaboration Pattern

Pattern Overview

Multi-Agent Collaboration is a pattern where multiple autonomous agents with their own roles and expertise collaborate to solve complex tasks.

Workflow Agents vs Multi-Agent Collaboration

| Characteristic | Workflow Agents | Multi-Agent Collaboration |
|----------------|-----------------|---------------------------|
| Control | Centralized coordinator | Distributed, role-based peers |
| Interaction | One agent delegates and tracks | Multiple agents negotiate, share, adapt |
| Design | Predefined task sequence | Emergent, flexible task distribution |
| Coordination | Procedural orchestration | Collaborative or competitive interactions |
| Use Cases | Enterprise process automation | Complex reasoning, exploration, emergent strategies |

Architecture

User Request
    ↓
Manager Agent (initialization)
    ↓
┌─────────┬─────────┬─────────┐
│ Planner │Research │ Executor│  ← Role assignment
│ Agent   │ Agent   │ Agent   │
└────┬────┴────┬────┴────┬────┘
     │         │         │
     └────→ Shared Memory ←────┘  ← Communication and collaboration
              ↓
         Supervisor Agent (optional)
              ↓
         Final Output

Working Mechanism

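class MultiAgentSystem:
    """Coordinates role-based agents through shared memory and a message queue.

    PlannerAgent, ResearchAgent, etc. are placeholders for your own agents;
    each is assumed to expose process(context) -> result.
    """

    def __init__(self, max_rounds=5):
        self.agents = {
            "planner": PlannerAgent(),
            "researcher": ResearchAgent(),
            "coder": CoderAgent(),
            "tester": TesterAgent(),
            "reviewer": ReviewerAgent()
        }
        self.shared_memory = SharedMemory()
        self.message_queue = MessageQueue()
        self.max_rounds = max_rounds  # guard against agents that never converge

    def execute(self, task):
        # 1. Initialize task
        self.shared_memory.write("task", task, agent_id="manager")

        # 2. Assign roles (which agents take part in this task)
        roles = self.assign_roles(task)

        # 3. Agent collaboration: round-robin until complete or out of rounds
        for _ in range(self.max_rounds):
            if self.is_complete():
                break
            for agent_name in roles:
                agent = self.agents[agent_name]

                # Read context: all shared-memory entries plus this agent's inbox
                context = {
                    "memory": self.shared_memory.query(lambda entry: True),
                    "inbox": self.message_queue.receive(agent_name),
                }

                # Execute agent
                result = agent.process(context)

                # Store result in shared memory
                self.shared_memory.write(agent_name, result, agent_id=agent_name)

                # Send message to other agents
                if result.needs_collaboration:
                    for target in result.target_agents:
                        self.message_queue.send(
                            from_agent=agent_name,
                            to_agent=target,
                            message=result.message
                        )

        # 4. Synthesize results
        return self.synthesize_results()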
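To see the loop run end to end, here is a toy version in which each agent is a plain function over a shared dictionary (a "blackboard"). The three agent functions and the completion rule (stop once a review exists) are hypothetical stand-ins for LLM-backed agents; the point is the control flow: agents read shared state, contribute when they can, and the loop ends on a completion condition or a round limit.

```python
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical agent: inspects the board, returns (key, value) to write, or None
Agent = Callable[[Dict[str, str]], Optional[Tuple[str, str]]]

def planner(board: Dict[str, str]) -> Optional[Tuple[str, str]]:
    if "plan" not in board:
        return ("plan", f"steps for: {board['task']}")
    return None

def coder(board: Dict[str, str]) -> Optional[Tuple[str, str]]:
    if "plan" in board and "code" not in board:
        return ("code", f"code implementing {board['plan']}")
    return None

def reviewer(board: Dict[str, str]) -> Optional[Tuple[str, str]]:
    if "code" in board and "review" not in board:
        return ("review", "approved")
    return None

def run_collaboration(task: str, agents: List[Agent], max_rounds: int = 5):
    board = {"task": task}
    for round_no in range(1, max_rounds + 1):
        # Each round, every agent gets one chance to contribute
        for agent in agents:
            contribution = agent(board)
            if contribution is not None:
                key, value = contribution
                board[key] = value
        if "review" in board:  # completion condition
            return board, round_no
    return board, max_rounds  # gave up without a review
```

Putting the reviewer first shows why the loop needs rounds: it has nothing to review until the planner and coder have written to the board, so `run_collaboration("add login", [reviewer, coder, planner])` finishes in round 3, while the order `[planner, coder, reviewer]` finishes in round 1.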

Communication Mechanisms

1. Shared Memory

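import threading
import time

class SharedMemory:
    """Thread-safe key-value store that all agents read from and write to."""

    def __init__(self):
        self.data = {}
        self.lock = threading.Lock()

    def write(self, key, value, agent_id):
        # Record who wrote the value and when, so agents can judge provenance
        with self.lock:
            self.data[key] = {
                "value": value,
                "author": agent_id,
                "timestamp": time.time()
            }

    def read(self, key):
        with self.lock:
            return self.data.get(key)

    def query(self, filter_fn):
        # Take a snapshot under the lock, then filter outside it
        with self.lock:
            entries = list(self.data.values())
        return [v for v in entries if filter_fn(v)]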

2. Message Passing

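import queue
import threading
import time

class MessageQueue:
    """In-process message bus with one inbox per agent."""

    def __init__(self):
        self.inboxes = {}
        self.lock = threading.Lock()

    def _inbox(self, agent_id):
        # Create each inbox lazily; the lock stops two threads creating it twice
        with self.lock:
            return self.inboxes.setdefault(agent_id, queue.Queue())

    def send(self, from_agent, to_agent, message):
        event = {
            "from": from_agent,
            "to": to_agent,
            "message": message,
            "timestamp": time.time()
        }
        self._inbox(to_agent).put(event)

    def receive(self, agent_id):
        # Drain only this agent's inbox; messages for other agents stay queued
        inbox = self._inbox(agent_id)
        messages = []
        while True:
            try:
                messages.append(inbox.get_nowait())
            except queue.Empty:
                return messages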

3. Prompt Chaining

def prompt_chain_collaboration(task):
    # Agent 1: Create plan
    plan = planner_agent.create_plan(task)
    
    # Agent 2: Conduct research
    research = researcher_agent.research(plan)
    
    # Agent 3: Write code
    code = coder_agent.write_code(plan, research)
    
    # Agent 4: Test
    test_results = tester_agent.test(code)
    
    # Agent 5: Review
    review = reviewer_agent.review(code, test_results)
    
    return review

Use Cases

1. Software Development Team

from concurrent.futures import ThreadPoolExecutor

class SoftwareDevelopmentTeam:
    def develop_feature(self, requirements):
        # Planner: Decompose tasks
        tasks = self.planner.decompose(requirements)
        
        # Architect: Design
        design = self.architect.design(tasks)
        
        # Coder: Implementation (parallel)
        code_results = []
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(self.coder.implement, task)
                for task in tasks
            ]
            code_results = [f.result() for f in futures]
        
        # Tester: Testing
        test_results = self.tester.test_all(code_results)
        
        # Reviewer: Code review
        review = self.reviewer.review(code_results, test_results)
        
        # Documenter: Documentation
        docs = self.documenter.document(code_results, design)
        
        return {
            "code": code_results,
            "tests": test_results,
            "review": review,
            "documentation": docs
        }

2. Research Team

class ResearchTeam:
    def conduct_research(self, topic):
        # Searcher: Literature search
        papers = self.searcher.find_papers(topic)
        
        # Summarizer: Summarize (parallel)
        summaries = self.parallel_summarize(papers)
        
        # Analyst: Analysis
        analysis = self.analyst.analyze(summaries)
        
        # Validator: Validation
        validation = self.validator.validate(analysis)
        
        # Writer: Report writing
        report = self.writer.write_report(analysis, validation)
        
        return report
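`parallel_summarize` is not defined above. One way to write it, assuming a `summarize(paper)` callable (hypothetical here, typically an LLM call), is to fan out with `ThreadPoolExecutor.map`, which returns results in input order so each summary stays aligned with its paper.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List

def parallel_summarize(papers: List[str],
                       summarize: Callable[[str], str],
                       max_workers: int = 4) -> List[str]:
    # Threads suit this fan-out: each call mostly waits on a remote API (I/O-bound)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() yields results in the same order as `papers`, whatever the finish order
        return list(executor.map(summarize, papers))
```

Inside `ResearchTeam`, the method would call this with the summarizer agent's method, e.g. `parallel_summarize(papers, self.summarizer.summarize)`.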

3. Business Scenario Modeling

class BusinessAnalysisTeam:
    def analyze_scenario(self, scenario):
        # Each agent analyzes from different perspectives
        finance_view = self.finance_agent.analyze(scenario)
        policy_view = self.policy_agent.analyze(scenario)
        compliance_view = self.compliance_agent.analyze(scenario)
        
        # Agent discussion
        discussion = self.facilitate_discussion([
            finance_view,
            policy_view,
            compliance_view
        ])
        
        # Reach consensus
        consensus = self.reach_consensus(discussion)
        
        return consensus
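`facilitate_discussion` and `reach_consensus` are left abstract above. A deliberately simple stand-in for the consensus step is a majority vote over each agent's recommendation that also keeps the dissenting views. The view format (`agent`, `recommendation`, `rationale`) and the quorum rule are assumptions for illustration; an LLM-moderated debate over several rounds is a richer alternative.

```python
from collections import Counter
from typing import Any, Dict, List

def reach_consensus(views: List[Dict[str, str]], quorum: float = 0.5) -> Dict[str, Any]:
    """Majority vote over agent recommendations, keeping dissenting views."""
    if not views:
        raise ValueError("no views to reconcile")
    votes = Counter(v["recommendation"] for v in views)
    leading, count = votes.most_common(1)[0]
    support = count / len(views)
    return {
        # Adopt the leading option only if it has strictly more than `quorum` of the votes
        "decision": leading if support > quorum else None,
        "leading_option": leading,
        "support": support,
        # Dissent is measured against the leading option, decided or not
        "dissent": [v for v in views if v["recommendation"] != leading],
    }
```

Returning `None` when no option clears the quorum gives the caller a natural hook to run another discussion round or escalate to a human reviewer.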

AWS Implementation

Amazon Bedrock Multi-Agent Collaboration

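import time
import boto3

bedrock_agent = boto3.client('bedrock-agent')

# Placeholders: an IAM service role Bedrock can assume, and the model to use
AGENT_ROLE_ARN = 'arn:aws:iam::account:role/bedrock-agent-role'
MODEL_ID = 'anthropic.claude-3-sonnet-20240229-v1:0'

def wait_until(get_status, targets, delay=2):
    # Agent and alias operations are asynchronous; poll until a target status
    while (status := get_status()) not in targets:
        if status == 'FAILED':
            raise RuntimeError('Bedrock agent resource entered FAILED state')
        time.sleep(delay)

def agent_status(agent_id):
    return bedrock_agent.get_agent(agentId=agent_id)['agent']['agentStatus']

def create_collaborator(name, instruction):
    # Collaborators are referenced by alias ARN: create, prepare, then alias
    agent_id = bedrock_agent.create_agent(
        agentName=name,
        agentResourceRoleArn=AGENT_ROLE_ARN,
        foundationModel=MODEL_ID,
        instruction=instruction
    )['agent']['agentId']
    wait_until(lambda: agent_status(agent_id), {'NOT_PREPARED'})
    bedrock_agent.prepare_agent(agentId=agent_id)
    wait_until(lambda: agent_status(agent_id), {'PREPARED'})
    alias = bedrock_agent.create_agent_alias(
        agentId=agent_id, agentAliasName='live'
    )['agentAlias']
    wait_until(lambda: bedrock_agent.get_agent_alias(
        agentId=agent_id, agentAliasId=alias['agentAliasId']
    )['agentAlias']['agentAliasStatus'], {'PREPARED'})
    return alias['agentAliasArn']

# Create Supervisor Agent (collaboration mode must be enabled)
supervisor_id = bedrock_agent.create_agent(
    agentName='supervisor-agent',
    agentResourceRoleArn=AGENT_ROLE_ARN,
    foundationModel=MODEL_ID,
    agentCollaboration='SUPERVISOR',
    instruction='''
    You are a supervisor coordinating multiple expert agents.
    Analyze tasks and delegate to appropriate agents.
    '''
)['agent']['agentId']
wait_until(lambda: agent_status(supervisor_id), {'NOT_PREPARED'})

# Create Collaborator Agents and configure Multi-Agent Collaboration
# (instructions have a minimum length of 40 characters)
collaborators = [
    ('researcher', 'researcher-agent',
     'You are an expert in information retrieval and research.',
     'Use for finding and summarizing information.'),
    ('coder', 'coder-agent',
     'You are an expert in writing clear, correct, well-tested code.',
     'Use for writing or modifying code.'),
]
for name, agent_name, instruction, when_to_use in collaborators:
    bedrock_agent.associate_agent_collaborator(
        agentId=supervisor_id,
        agentVersion='DRAFT',
        agentDescriptor={'aliasArn': create_collaborator(agent_name, instruction)},
        collaboratorName=name,
        collaborationInstruction=when_to_use
    )

# Prepare the supervisor so the new collaborators take effect
bedrock_agent.prepare_agent(agentId=supervisor_id)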
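Once the supervisor has an alias, requests go through the separate `bedrock-agent-runtime` client. Its `invoke_agent` call returns the answer as an event stream under `completion`, where `chunk` events carry bytes; the helper below concatenates them. The function name, alias ID, and session handling are illustrative assumptions, and the client is passed in so the helper can be exercised without AWS credentials.

```python
import uuid

def ask_supervisor(runtime_client, agent_id, alias_id, question, session_id=None):
    """Send one turn to a Bedrock agent and return its full text answer."""
    response = runtime_client.invoke_agent(
        agentId=agent_id,
        agentAliasId=alias_id,
        # Reuse the same session ID across turns to keep conversation context
        sessionId=session_id or str(uuid.uuid4()),
        inputText=question,
    )
    parts = []
    for event in response["completion"]:
        # Skip non-chunk events (e.g. trace events when tracing is enabled)
        if "chunk" in event:
            parts.append(event["chunk"]["bytes"].decode("utf-8"))
    return "".join(parts)

# With a real client (requires AWS credentials and a prepared alias):
# runtime = boto3.client('bedrock-agent-runtime')
# print(ask_supervisor(runtime, 'AGENT_ID', 'ALIAS_ID', 'Build a CSV parser'))
```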

Inter-Agent Communication with Amazon SQS

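import boto3
import json
import time

sqs = boto3.client('sqs')

# Placeholder URL pattern: one queue per agent, named agent-<id>
QUEUE_URL_TEMPLATE = 'https://sqs.region.amazonaws.com/account/agent-{}'

class AgentCommunication:
    def __init__(self, agent_id):
        self.agent_id = agent_id
        self.queue_url = QUEUE_URL_TEMPLATE.format(agent_id)

    def send_message(self, to_agent, message):
        sqs.send_message(
            QueueUrl=QUEUE_URL_TEMPLATE.format(to_agent),
            MessageBody=json.dumps({
                'from': self.agent_id,
                'message': message,
                'timestamp': time.time()
            })
        )

    def receive_messages(self):
        # Long polling (up to 20 s) avoids busy-waiting on an empty queue
        response = sqs.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20
        )
        messages = []
        for msg in response.get('Messages', []):
            messages.append(json.loads(msg['Body']))
            # Delete after reading; otherwise SQS redelivers the message
            # once its visibility timeout expires
            sqs.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=msg['ReceiptHandle']
            )
        return messages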

Shared Memory with DynamoDB

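import time
import boto3

dynamodb = boto3.resource('dynamodb')
# Assumed schema: partition key 'key' (string) and a GSI 'agent-index' on 'agent_id'
table = dynamodb.Table('agent-shared-memory')

class SharedMemoryDynamoDB:
    def write(self, key, value, agent_id):
        # The boto3 resource API rejects Python floats; use Decimal for numbers in `value`
        table.put_item(
            Item={
                'key': key,
                'value': value,
                'agent_id': agent_id,
                'timestamp': int(time.time())
            }
        )

    def read(self, key):
        # Strongly consistent read: returns the latest write, not a possibly stale copy
        response = table.get_item(Key={'key': key}, ConsistentRead=True)
        return response.get('Item')

    def query_by_agent(self, agent_id):
        # GSI queries are eventually consistent and return at most 1 MB per page,
        # so follow LastEvaluatedKey to collect every item
        kwargs = dict(
            IndexName='agent-index',
            KeyConditionExpression='agent_id = :agent_id',
            ExpressionAttributeValues={':agent_id': agent_id}
        )
        items = []
        while True:
            response = table.query(**kwargs)
            items.extend(response['Items'])
            if 'LastEvaluatedKey' not in response:
                return items
            kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']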

Trade-offs

Advantages:

  • Specialization: Each agent optimized for specific domains
  • Parallelism: Concurrent execution of independent tasks
  • Scalability: Easy to add new agents
  • Resilience: The failure of one agent has limited impact on the system as a whole
  • Emergence: Agent interactions can produce solutions no single agent would reach

Disadvantages:

  • Complexity: Managing and coordinating multiple agents
  • Communication overhead: Cost of inter-agent message passing
  • Consistency: Difficulty in managing shared state
  • Debugging: Hard to trace distributed system issues
  • Cost: Increased costs from multiple LLM calls
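One common way to contain the consistency problem is optimistic concurrency: every shared-memory entry carries a version number, and a write succeeds only if the writer saw the latest version. The class below is a hypothetical in-memory illustration; in DynamoDB the same check is usually expressed as a conditional write (`ConditionExpression`) on a version attribute.

```python
import threading
from typing import Any, Dict, Tuple

class VersionedMemory:
    """Shared memory with compare-and-set writes (optimistic concurrency)."""

    def __init__(self):
        self._data: Dict[str, Tuple[int, Any]] = {}
        self._lock = threading.Lock()

    def read(self, key: str) -> Tuple[int, Any]:
        # Version 0 means the key has never been written
        with self._lock:
            return self._data.get(key, (0, None))

    def write(self, key: str, value: Any, expected_version: int) -> bool:
        with self._lock:
            current_version, _ = self._data.get(key, (0, None))
            if current_version != expected_version:
                return False  # another agent wrote first; re-read and retry
            self._data[key] = (current_version + 1, value)
            return True
```

If two agents both read version 0 of `"plan"`, the first write succeeds and the second returns `False`; the second agent must re-read the new plan and decide whether its change still applies, instead of silently overwriting it.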

2. Workflow Orchestration Agents

Pattern Overview

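Workflow Orchestration Agents coordinate and manage multi-step tasks and processes. Unlike Multi-Agent Collaboration, they rely on centralized coordination: a single orchestrator decides which worker handles each step and tracks the state of the whole workflow.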

Architecture

User Input / System Event
    ↓
Workflow Orchestrator
    ↓
┌─────────────────────────┐
│ Context Retrieval       │
│ - Knowledge Base        │
│ - Agent Registry        │
└─────────────────────────┘
    ↓
LLM-based Agent Selection
    ↓
┌──────────┬──────────┬──────────┐
│ Worker 1 │ Worker 2 │ Worker 3 │
└──────────┴──────────┴──────────┘
    ↓
State Tracking & Results

Working Mechanism

import json

class WorkflowOrchestrator:
    def __init__(self):
        self.worker_agents = {}
        self.state_store = StateStore()
        self.agent_registry = AgentRegistry()

    def orchestrate(self, task):
        # 1. Create workflow plan
        workflow = self.create_workflow(task)

        # 2. Initialize state
        execution_id = self.state_store.initialize(workflow)

        # 3. Execute step by step
        for step in workflow.steps:
            # Select appropriate worker agent
            worker = self.select_worker(step)

            # Prepare context (outputs of earlier steps)
            context = self.state_store.get_context(execution_id)

            # Execute worker
            try:
                result = worker.execute(step, context)

                # Retry logic: retry before recording, so the stored result is final
                if result.status == "failed" and step.retryable:
                    result = self.retry_with_backoff(worker, step, context)

                self.state_store.update(execution_id, step.id, result)

            except Exception as e:
                # handle_failure decides whether to abort, compensate, or continue
                self.handle_failure(execution_id, step, e)

        # 4. Return results
        return self.state_store.get_final_result(execution_id)

    def select_worker(self, step):
        # Dynamic worker selection using an LLM (`llm` is a placeholder client)
        candidates = self.agent_registry.find_capable_agents(step)

        selection_prompt = f"""
        Select the most suitable agent for the following task.
        Reply with the agent name only.
        Task: {step.description}

        Candidate agents:
        {json.dumps(candidates, indent=2)}

        Selection criteria: capability, cost, latency, success rate
        """

        selected = llm.select(selection_prompt).strip()
        # Guard against the model naming an agent that is not registered
        if selected not in self.worker_agents:
            raise ValueError(f"LLM selected unknown agent: {selected!r}")
        return self.worker_agents[selected]
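`retry_with_backoff` is referenced but not shown. A minimal sketch, assuming the worker returns an object with a `status` attribute as in the loop above: retry up to `max_attempts` times, doubling the delay ceiling each time and drawing a random delay below it ("full jitter") so that many failing workers do not retry in lockstep. It is written as a standalone function over a zero-argument `call`; the parameter names and defaults are illustrative.

```python
import random
import time
from typing import Any, Callable

def retry_with_backoff(call: Callable[[], Any], max_attempts: int = 3,
                       base_delay: float = 1.0, max_delay: float = 30.0) -> Any:
    """Re-run `call` until its result's status is not "failed" or attempts run out."""
    result = None
    for attempt in range(max_attempts):
        result = call()
        if getattr(result, "status", None) != "failed":
            return result
        if attempt < max_attempts - 1:
            # Delay ceiling grows as base, 2x base, 4x base, ... capped at max_delay
            ceiling = min(max_delay, base_delay * (2 ** attempt))
            time.sleep(random.uniform(0, ceiling))
    return result  # still failed after all attempts; the caller decides what to do
```

Inside the orchestrator, the method could delegate to it as `retry_with_backoff(lambda: worker.execute(step, context))`.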

Use Cases

1. Customer Service Routing

class CustomerServiceOrchestrator:
    def handle_request(self, customer_request):
        # 1. Classify request
        classification = self.classify_request(customer_request)
        
        # 2. Determine workflow
        if classification.type == "simple":
            # Handle with single agent
            return self.faq_agent.handle(customer_request)
        
        elif classification.type == "complex":
            # Multi-step workflow
            workflow = [
                ("gather_info", self.info_agent),
                ("analyze", self.analysis_agent),
                ("resolve", self.resolution_agent),
                ("follow_up", self.followup_agent)
            ]
            
            return self.execute_workflow(workflow, customer_request)
        
        else:
            # "escalation" or an unrecognized type: hand off to a human agent
            return self.escalate_to_human(customer_request)
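`execute_workflow` can be as simple as running the `(step_name, agent)` pairs in order and handing each agent the original request plus everything earlier steps produced. This sketch assumes each agent is either an object with a hypothetical `handle(request, context)` method or a plain callable with that signature, and it stops at the first step that raises, reporting which step failed.

```python
from typing import Any, Dict, List, Tuple

def execute_workflow(workflow: List[Tuple[str, Any]], request: Any) -> Dict[str, Any]:
    """Run (step_name, agent) pairs in order, accumulating each step's output."""
    context: Dict[str, Any] = {}
    for step_name, agent in workflow:
        # Accept an object with handle(request, context) or a plain callable
        run = getattr(agent, "handle", agent)
        try:
            context[step_name] = run(request, context)
        except Exception as exc:
            # Stop at the first failing step: later steps depend on earlier outputs
            return {"status": "failed", "failed_step": step_name,
                    "error": str(exc), "completed": context}
    return {"status": "succeeded", "results": context}
```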

2. Data Pipeline Orchestration

class DataPipelineOrchestrator:
    def process_data(self, data_source):
        workflow = [
            {
                "step": "extract",
                "agent": self.extractor,
                "retry": 3,
                "timeout": 300
            },
            {
                "step": "transform",
                "agent": self.transformer,
                "parallel": True,
                "chunks": 10
            },
            {
                "step": "validate",
                "agent": self.validator,
                "critical": True
            },
            {
                "step": "load",
                "agent": self.loader,
                "retry": 5
            }
        ]
        
        return self.execute_pipeline(workflow, data_source)
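`execute_pipeline` has to interpret the step options. The sketch below is one possible reading, with several assumptions: each agent is a callable that takes and returns data (pass a bound method for objects); `retry` means extra attempts after the first; `parallel` with `chunks` splits a list input into contiguous chunks processed on threads and re-joined in order; and `critical` means "abort the pipeline if this step fails", while non-critical failures are recorded and the data passes through unchanged. `timeout` is not enforced here.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List

def _run_with_retries(fn: Callable[[Any], Any], data: Any, retries: int) -> Any:
    # One initial attempt plus `retries` additional attempts
    for attempt in range(retries + 1):
        try:
            return fn(data)
        except Exception:
            if attempt == retries:
                raise

def _split(items: List[Any], n: int) -> List[List[Any]]:
    # At most n contiguous, roughly equal chunks (ceiling division for size)
    size = max(1, -(-len(items) // n))
    return [items[i:i + size] for i in range(0, len(items), size)]

def execute_pipeline(workflow: List[Dict[str, Any]], data: Any) -> Dict[str, Any]:
    errors = []
    for spec in workflow:
        agent, retries = spec["agent"], spec.get("retry", 0)
        try:
            if spec.get("parallel"):
                # Fan out over chunks of a list input, then concatenate in order
                chunks = _split(data, spec.get("chunks", 4))
                with ThreadPoolExecutor() as pool:
                    parts = pool.map(lambda c: _run_with_retries(agent, c, retries), chunks)
                    data = [item for part in parts for item in part]
            else:
                data = _run_with_retries(agent, data, retries)
        except Exception as exc:
            if spec.get("critical"):
                return {"status": "aborted", "step": spec["step"], "error": str(exc)}
            errors.append((spec["step"], str(exc)))  # non-critical: keep going
    return {"status": "completed", "data": data, "errors": errors}
```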

AWS Implementation

Orchestration with AWS Step Functions

import boto3
import json

stepfunctions = boto3.client('stepfunctions')

# State Machine definition
state_machine_definition = {
    "Comment": "Workflow Orchestration Pattern",
    "StartAt": "ClassifyRequest",
    "States": {
        "ClassifyRequest": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:classifier",
            "Next": "RouteToWorker"
        },
        "RouteToWorker": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": "$.category",
                    "StringEquals": "technical",
                    "Next": "TechnicalWorker"
                },
                {
                    "Variable": "$.category",
                    "StringEquals": "billing",
                    "Next": "BillingWorker"
                }
            ],
            "Default": "GeneralWorker"
        },
        "TechnicalWorker": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:technical-worker",
            "Retry": [
                {
                    "ErrorEquals": ["States.TaskFailed"],
                    "IntervalSeconds": 2,
                    "MaxAttempts": 3,
                    "BackoffRate": 2.0
                }
            ],
            "Catch": [
                {
                    "ErrorEquals": ["States.ALL"],
                    "Next": "HandleFailure"
                }
            ],
            "Next": "AggregateResults"
        },
        "BillingWorker": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:billing-worker",
            "Next": "AggregateResults"
        },
        "GeneralWorker": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:general-worker",
            "Next": "AggregateResults"
        },
        "AggregateResults": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:aggregator",
            "End": True
        },
        "HandleFailure": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:failure-handler",
            "End": True
        }
    }
}

# Create State Machine
response = stepfunctions.create_state_machine(
    name='workflow-orchestrator',
    definition=json.dumps(state_machine_definition),
    roleArn='arn:aws:iam::account:role/step-functions-role'
)

# Execute
execution = stepfunctions.start_execution(
    stateMachineArn=response['stateMachineArn'],
    input=json.dumps({
        "request": "Customer request content",
        "customer_id": "12345"
    })
)
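`start_execution` returns immediately with an `executionArn`; the workflow itself runs asynchronously. For a Standard state machine like the one above, a minimal way to get the result is to poll `describe_execution` until the status is terminal and then parse its JSON `output` (Express state machines can instead use `start_sync_execution`). The helper name and polling limits are assumptions; the client is passed in so the function can be tested without AWS.

```python
import json
import time

TERMINAL_FAILURES = {"FAILED", "TIMED_OUT", "ABORTED"}

def wait_for_execution(sfn_client, execution_arn, poll_seconds=2.0, max_polls=300):
    """Poll describe_execution until the run finishes; return parsed output on success."""
    for _ in range(max_polls):
        desc = sfn_client.describe_execution(executionArn=execution_arn)
        status = desc["status"]
        if status == "SUCCEEDED":
            # `output` is the JSON string produced by the final state
            return json.loads(desc.get("output", "null"))
        if status in TERMINAL_FAILURES:
            raise RuntimeError(f"Execution ended with status {status}")
        time.sleep(poll_seconds)
    raise TimeoutError(f"Execution still running after {max_polls} polls")

# Usage: result = wait_for_execution(stepfunctions, execution['executionArn'])
```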

Trade-offs

Advantages:

  • Clear control flow
  • Easy state tracking and monitoring
  • Easy to implement error handling and retry logic
  • Predictable execution path

Disadvantages:

  • Potential centralized bottleneck
  • Limited flexibility (predefined workflows)
  • Difficult to manage complex workflows

Pattern Selection Guide

Multi-Agent Collaboration vs Workflow Orchestration

def select_pattern(task):
    if task.requires_creativity or task.is_open_ended:
        return "Multi-Agent Collaboration"
    
    if task.has_clear_steps and task.is_predictable:
        return "Workflow Orchestration"
    
    if task.needs_diverse_perspectives:
        return "Multi-Agent Collaboration"
    
    if task.requires_strict_control:
        return "Workflow Orchestration"
    
    # Default
    return "Workflow Orchestration"

Real-World Implementation Example

End-to-End Multi-Agent System

from concurrent.futures import ThreadPoolExecutor

class EnterpriseMultiAgentSystem:
    def __init__(self):
        # Initialize agents
        self.agents = {
            "supervisor": SupervisorAgent(),
            "planner": PlannerAgent(),
            "researcher": ResearchAgent(),
            "analyst": AnalystAgent(),
            "coder": CoderAgent(),
            "tester": TesterAgent(),
            "reviewer": ReviewerAgent()
        }
        
        # Initialize infrastructure
        self.shared_memory = DynamoDBSharedMemory()
        self.message_queue = SQSMessageQueue()
        self.state_machine = StepFunctionsOrchestrator()
    
    def execute_complex_task(self, task):
        # 1. Supervisor analyzes task
        analysis = self.agents["supervisor"].analyze(task)
        
        # 2. Determine execution strategy
        if analysis.complexity == "high":
            # Multi-Agent Collaboration
            return self.collaborative_execution(task, analysis)
        else:
            # Workflow Orchestration
            return self.orchestrated_execution(task, analysis)
    
    def collaborative_execution(self, task, analysis):
        # Assign roles
        roles = analysis.required_roles
        
        # Initialize shared memory
        session_id = self.shared_memory.create_session(task)
        
        # Parallel agent execution
        with ThreadPoolExecutor() as executor:
            futures = []
            for role in roles:
                agent = self.agents[role]
                future = executor.submit(
                    self.run_agent_with_collaboration,
                    agent,
                    session_id
                )
                futures.append(future)
            
            results = [f.result() for f in futures]
        
        # Synthesize results
        return self.agents["supervisor"].synthesize(results)
    
    def orchestrated_execution(self, task, analysis):
        # Create Step Functions workflow
        workflow = self.create_workflow(analysis)
        
        # Execute
        execution_arn = self.state_machine.start_execution(workflow, task)
        
        # Wait for results
        return self.state_machine.wait_for_completion(execution_arn)

Monitoring and Observability

import boto3
from aws_xray_sdk.core import xray_recorder

class AgentMonitoring:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')

    def track_agent_execution(self, agent_id, execution_data):
        # Send metrics
        self.cloudwatch.put_metric_data(
            Namespace='MultiAgentSystem',
            MetricData=[
                {
                    'MetricName': 'AgentExecutionTime',
                    'Value': execution_data['duration'],
                    'Unit': 'Milliseconds',
                    'Dimensions': [
                        {'Name': 'AgentId', 'Value': agent_id},
                        {'Name': 'TaskType', 'Value': execution_data['task_type']}
                    ]
                },
                {
                    'MetricName': 'AgentTokenUsage',
                    'Value': execution_data['tokens'],
                    'Unit': 'Count',
                    'Dimensions': [
                        {'Name': 'AgentId', 'Value': agent_id}
                    ]
                }
            ]
        )

        # X-Ray tracing: boto3's 'xray' client has no begin_segment(); segments are
        # created with the X-Ray SDK recorder. Inside AWS Lambda the service owns the
        # segment, so use begin_subsegment() there. To capture real timing, begin the
        # segment before the agent runs rather than after it finishes.
        segment = xray_recorder.begin_segment(f'agent-{agent_id}')
        segment.put_annotation('task_type', execution_data['task_type'])
        segment.put_metadata('execution_details', execution_data)
        xray_recorder.end_segment()
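`track_agent_execution` expects `execution_data` with `duration` in milliseconds, `task_type`, and `tokens`. A small context manager can assemble that dictionary around an agent call. It is an illustrative helper: the token count has to come from whatever usage field your model client returns, so the usage lines below use placeholder names.

```python
import time
from contextlib import contextmanager

@contextmanager
def measure_agent_call(task_type):
    """Yield a dict that receives the elapsed time when the block exits."""
    data = {"task_type": task_type, "tokens": 0}
    start = time.perf_counter()
    try:
        yield data
    finally:
        # Milliseconds, matching the 'Milliseconds' unit used for AgentExecutionTime
        data["duration"] = (time.perf_counter() - start) * 1000.0

# Usage (agent, result.usage_tokens, and monitor are placeholders):
# with measure_agent_call("research") as data:
#     result = agent.run(task)
#     data["tokens"] = result.usage_tokens
# monitor.track_agent_execution("researcher", data)
```

Because `duration` is set in `finally`, failed agent calls are timed as well, which is useful when slow failures are the problem being investigated.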

Series Conclusion

Key patterns covered in this series:

Part 1: Foundational Patterns

  • ReAct: Combining reasoning and action
  • Reflection: Self-improvement mechanism
  • Tool Use: Integrating external capabilities

Part 2: Orchestration

  • Planning: Task decomposition and execution
  • Routing: Intelligent task distribution
  • Human-in-the-Loop: Integrating human review

Part 3: Multi-Agent

  • Multi-Agent Collaboration: Distributed collaboration
  • Workflow Orchestration: Centralized coordination

Pattern Combination Strategy

Real production systems combine multiple patterns:

# Example: Enterprise AI System
system = {
    "base": "ReAct",  # Base reasoning pattern
    "quality": "Reflection",  # Quality assurance
    "capabilities": "Tool Use",  # External integration
    "complexity": "Planning",  # Complex task decomposition
    "specialization": "Routing",  # Specialized agent utilization
    "safety": "Human-in-the-Loop",  # Critical decision review
    "scale": "Multi-Agent"  # Large-scale parallel processing
}
