Agentic AI Design Patterns (Part 3): Multi-Agent Systems

In this final part of the series, we explore multi-agent patterns, in which several agents collaborate on a single problem. These patterns matter most in complex domains that a single agent struggles to handle.

Multi-Agent vs Single-Agent

When Should You Use Multi-Agent?

When Single-Agent is Appropriate:

  • Task belongs to a single domain
  • Sequential processing is sufficient
  • Context sharing is important

When Multi-Agent is Needed:

  • Multiple areas of expertise required
  • Performance improvement through parallel processing
  • Agents can operate independently
  • Scalability and modularity are important

1. Multi-Agent Collaboration Pattern

Pattern Overview

Multi-Agent Collaboration is a pattern in which several autonomous agents, each with its own role and expertise, work together to solve a complex task.

Workflow Agents vs Multi-Agent Collaboration

Characteristic | Workflow Agents                 | Multi-Agent Collaboration
Control        | Centralized coordinator         | Distributed, role-based peers
Interaction    | One agent delegates and tracks  | Multiple agents negotiate, share, adapt
Design         | Predefined task sequence        | Emergent, flexible task distribution
Coordination   | Procedural orchestration        | Collaborative or competitive interactions
Use Cases      | Enterprise process automation   | Complex reasoning, exploration, emergent strategies

Architecture

User Request
    ↓
Manager Agent (initialization)
    ↓
┌─────────┬─────────┬─────────┐
│ Planner │Research │ Executor│  ← Role assignment
│ Agent   │ Agent   │ Agent   │
└────┬────┴────┬────┴────┬────┘
     │         │         │
     └────→ Shared Memory ←────┘  ← Communication and collaboration
              ↓
         Supervisor Agent (optional)
              ↓
         Final Output

Working Mechanism

class MultiAgentSystem:
    def __init__(self):
        self.agents = {
            "planner": PlannerAgent(),
            "researcher": ResearchAgent(),
            "coder": CoderAgent(),
            "tester": TesterAgent(),
            "reviewer": ReviewerAgent()
        }
        self.shared_memory = SharedMemory()
        self.message_queue = MessageQueue()
    
    def execute(self, task):
        # 1. Initialize task
        self.shared_memory.write("task", task, agent_id="manager")
        
        # 2. Assign roles (assign_roles, is_complete, and synthesize_results
        #    are application-specific and not shown here)
        roles = self.assign_roles(task)
        
        # 3. Agent collaboration
        while not self.is_complete():
            for agent_name in roles:
                agent = self.agents[agent_name]
                
                # Read everything written to shared memory so far
                context = self.shared_memory.query(lambda entry: True)
                
                # Execute agent
                result = agent.process(context)
                
                # Store result in shared memory
                self.shared_memory.write(agent_name, result, agent_id=agent_name)
                
                # Send a message to each agent this result is addressed to
                if result.needs_collaboration:
                    for target in result.target_agents:
                        self.message_queue.send(
                            from_agent=agent_name,
                            to_agent=target,
                            message=result.message
                        )
        
        # 4. Synthesize results
        return self.synthesize_results()
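
The loop above leaves role assignment and the completion check abstract. As a rough illustration only, the following self-contained sketch runs the same read, act, write cycle with stub agents. `Blackboard`, `StubAgent`, and `run_collaboration` are hypothetical names, and the rule that an agent acts once its prerequisites are on the board is an assumption made for this example, not part of the pattern definition.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Blackboard:
    """Hypothetical stand-in for the shared memory all agents read and write."""
    entries: Dict[str, str] = field(default_factory=dict)


@dataclass
class StubAgent:
    """An agent that acts once the entries it needs are on the blackboard."""
    name: str
    needs: List[str]
    work: Callable[[Dict[str, str]], str]

    def ready(self, board: Blackboard) -> bool:
        already_done = self.name in board.entries
        return not already_done and all(k in board.entries for k in self.needs)


def run_collaboration(task: str, agents: List[StubAgent],
                      max_rounds: int = 10) -> Blackboard:
    board = Blackboard({"task": task})
    for _ in range(max_rounds):
        progressed = False
        for agent in agents:
            if agent.ready(board):
                # Read context, act, and publish the result for other agents
                board.entries[agent.name] = agent.work(board.entries)
                progressed = True
        if not progressed:  # nobody can act: finished or stuck
            break
    return board


# Registration order does not matter; the coder waits for its inputs.
agents = [
    StubAgent("coder", ["planner", "researcher"],
              lambda e: f"code for [{e['planner']}] using [{e['researcher']}]"),
    StubAgent("planner", ["task"], lambda e: f"plan: {e['task']}"),
    StubAgent("researcher", ["planner"], lambda e: f"notes on {e['planner']}"),
]
board = run_collaboration("build a CLI", agents)
print(board.entries["coder"])
```

In a real system each `work` function would wrap an LLM call, and the `max_rounds` cap guards against agents that never converge.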

Communication Mechanisms

1. Shared Memory

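import threading
import time

class SharedMemory:
    def __init__(self):
        self.data = {}
        self.lock = threading.Lock()
    
    def write(self, key, value, agent_id):
        with self.lock:
            self.data[key] = {
                "value": value,
                "author": agent_id,
                "timestamp": time.time()
            }
    
    def read(self, key):
        with self.lock:
            return self.data.get(key)
    
    def query(self, filter_fn):
        # Snapshot under the lock so a concurrent write cannot
        # change the dict while it is being iterated
        with self.lock:
            entries = list(self.data.values())
        return [v for v in entries if filter_fn(v)]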

2. Message Passing

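import queue
import time

class MessageQueue:
    def __init__(self):
        self.queue = queue.Queue()
    
    def send(self, from_agent, to_agent, message):
        event = {
            "from": from_agent,
            "to": to_agent,
            "message": message,
            "timestamp": time.time()
        }
        self.queue.put(event)
    
    def receive(self, agent_id):
        messages, others = [], []
        # Drain the queue, keeping only this agent's messages
        while not self.queue.empty():
            event = self.queue.get()
            if event["to"] == agent_id:
                messages.append(event)
            else:
                others.append(event)
        # Put back messages addressed to other agents so they are not lost
        for event in others:
            self.queue.put(event)
        return messages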
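
With a single shared queue, every `receive` call has to touch messages meant for other agents. One alternative, sketched here as an assumption rather than as the article's design, is to give each agent its own mailbox. The `Mailboxes` class is a hypothetical name; it keeps the same `send` and `receive` signatures as the queue above.

```python
import queue
import threading
import time
from typing import Any, Dict, List


class Mailboxes:
    """One FIFO mailbox per agent, created on first use."""

    def __init__(self) -> None:
        self._boxes: Dict[str, queue.Queue] = {}
        self._lock = threading.Lock()

    def _box(self, agent_id: str) -> queue.Queue:
        # Guard creation so two threads never create two boxes for one agent
        with self._lock:
            return self._boxes.setdefault(agent_id, queue.Queue())

    def send(self, from_agent: str, to_agent: str, message: Any) -> None:
        self._box(to_agent).put({
            "from": from_agent,
            "to": to_agent,
            "message": message,
            "timestamp": time.time(),
        })

    def receive(self, agent_id: str) -> List[dict]:
        box = self._box(agent_id)
        messages = []
        while True:
            try:
                messages.append(box.get_nowait())
            except queue.Empty:
                return messages


boxes = Mailboxes()
boxes.send("planner", "coder", "implement step 1")
boxes.send("reviewer", "coder", "fix naming")
boxes.send("coder", "tester", "run unit tests")
coder_inbox = boxes.receive("coder")
tester_inbox = boxes.receive("tester")
```

Each agent only ever drains its own box, so receiving never reorders or delays another agent's messages.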

3. Prompt Chaining

def prompt_chain_collaboration(task):
    # Agent 1: Create plan
    plan = planner_agent.create_plan(task)
    
    # Agent 2: Conduct research
    research = researcher_agent.research(plan)
    
    # Agent 3: Write code
    code = coder_agent.write_code(plan, research)
    
    # Agent 4: Test
    test_results = tester_agent.test(code)
    
    # Agent 5: Review
    review = reviewer_agent.review(code, test_results)
    
    return review

Use Cases

1. Software Development Team

from concurrent.futures import ThreadPoolExecutor

class SoftwareDevelopmentTeam:
    def develop_feature(self, requirements):
        # Planner: Decompose tasks
        tasks = self.planner.decompose(requirements)
        
        # Architect: Design
        design = self.architect.design(tasks)
        
        # Coder: Implementation (parallel, results kept in task order)
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(self.coder.implement, task)
                for task in tasks
            ]
            code_results = [f.result() for f in futures]
        
        # Tester: Testing
        test_results = self.tester.test_all(code_results)
        
        # Reviewer: Code review
        review = self.reviewer.review(code_results, test_results)
        
        # Documenter: Documentation
        docs = self.documenter.document(code_results, design)
        
        return {
            "code": code_results,
            "tests": test_results,
            "review": review,
            "documentation": docs
        }

2. Research Team

class ResearchTeam:
    def conduct_research(self, topic):
        # Searcher: Literature search
        papers = self.searcher.find_papers(topic)
        
        # Summarizer: Summarize (parallel)
        summaries = self.parallel_summarize(papers)
        
        # Analyst: Analysis
        analysis = self.analyst.analyze(summaries)
        
        # Validator: Validation
        validation = self.validator.validate(analysis)
        
        # Writer: Report writing
        report = self.writer.write_report(analysis, validation)
        
        return report
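
The `parallel_summarize` step is not shown in the class above. A minimal sketch of what it could look like follows, assuming that summarizing one paper is an independent, I/O-bound call (such as an LLM request). The standalone function signature and the `first_sentence` summarizer are hypothetical stand-ins for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def parallel_summarize(papers: List[str],
                       summarize: Callable[[str], str],
                       max_workers: int = 4) -> List[str]:
    """Summarize papers concurrently; results keep the input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in the order of `papers`,
        # regardless of which call finishes first
        return list(executor.map(summarize, papers))


def first_sentence(text: str) -> str:
    """Toy summarizer standing in for an LLM call."""
    return text.split(".")[0].strip() + "."


papers = ["Agents plan. Then they act.", "Memory matters. A lot."]
summaries = parallel_summarize(papers, first_sentence)
print(summaries)
```

Threads suit this step because the work is dominated by waiting on network calls; CPU-bound summarization would call for processes instead.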

3. Business Scenario Modeling

class BusinessAnalysisTeam:
    def analyze_scenario(self, scenario):
        # Each agent analyzes from different perspectives
        finance_view = self.finance_agent.analyze(scenario)
        policy_view = self.policy_agent.analyze(scenario)
        compliance_view = self.compliance_agent.analyze(scenario)
        
        # Agent discussion
        discussion = self.facilitate_discussion([
            finance_view,
            policy_view,
            compliance_view
        ])
        
        # Reach consensus
        consensus = self.reach_consensus(discussion)
        
        return consensus
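
How `reach_consensus` works is left open above. One simple possibility, offered here as an assumption rather than as the article's method, is a confidence-weighted vote that returns no decision when no option clears a quorum. The `View` dataclass and the `quorum` parameter are hypothetical.

```python
from collections import Counter
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class View:
    agent: str
    recommendation: str  # for example "approve" or "reject"
    confidence: float    # self-reported, between 0.0 and 1.0


def reach_consensus(views: List[View], quorum: float = 0.5) -> Optional[str]:
    """Return the option holding more than `quorum` of total confidence."""
    weights: Counter = Counter()
    for view in views:
        weights[view.recommendation] += view.confidence
    total = sum(weights.values())
    if total == 0:
        return None  # no views, or nobody is confident
    best, weight = weights.most_common(1)[0]
    return best if weight / total > quorum else None


views = [
    View("finance", "approve", 0.9),
    View("policy", "approve", 0.6),
    View("compliance", "reject", 0.8),
]
decision = reach_consensus(views)
split = reach_consensus([View("a", "approve", 0.5), View("b", "reject", 0.5)])
```

Returning `None` on a split gives the caller a clear signal to run another discussion round or escalate to a human.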

AWS Implementation

Amazon Bedrock Multi-Agent Collaboration

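import boto3

bedrock_agent = boto3.client('bedrock-agent')

MODEL_ID = 'anthropic.claude-3-sonnet-20240229-v1:0'
# Placeholder: IAM role that Bedrock assumes when running the agents
AGENT_ROLE_ARN = 'arn:aws:iam::account:role/bedrock-agent-role'

# Create Supervisor Agent (create_agent returns its data under the 'agent' key)
supervisor = bedrock_agent.create_agent(
    agentName='supervisor-agent',
    foundationModel=MODEL_ID,
    agentResourceRoleArn=AGENT_ROLE_ARN,
    agentCollaboration='SUPERVISOR',
    instruction='''
    You are a supervisor coordinating multiple expert agents.
    Analyze tasks and delegate to appropriate agents.
    '''
)['agent']

# Create Collaborator Agents
researcher = bedrock_agent.create_agent(
    agentName='researcher-agent',
    foundationModel=MODEL_ID,
    agentResourceRoleArn=AGENT_ROLE_ARN,
    instruction='You are an expert in information retrieval and research.'
)['agent']

coder = bedrock_agent.create_agent(
    agentName='coder-agent',
    foundationModel=MODEL_ID,
    agentResourceRoleArn=AGENT_ROLE_ARN,
    instruction='You are an expert in writing, debugging, and explaining code.'
)['agent']

# Configure Multi-Agent Collaboration.
# A collaborator is referenced by its alias ARN, so each one is prepared and
# given an alias first. Creation and preparation are asynchronous; production
# code should poll get_agent until each step has finished.
for name, agent in [('researcher', researcher), ('coder', coder)]:
    bedrock_agent.prepare_agent(agentId=agent['agentId'])
    alias = bedrock_agent.create_agent_alias(
        agentId=agent['agentId'],
        agentAliasName='live'
    )['agentAlias']
    bedrock_agent.associate_agent_collaborator(
        agentId=supervisor['agentId'],
        agentVersion='DRAFT',
        agentDescriptor={'aliasArn': alias['agentAliasArn']},
        collaboratorName=name,
        collaborationInstruction=f'Delegate {name} tasks to this agent.'
    )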

Inter-Agent Communication with Amazon SQS

import json
import time

import boto3

sqs = boto3.client('sqs')

class AgentCommunication:
    def __init__(self, agent_id):
        self.agent_id = agent_id
        self.queue_url = f'https://sqs.region.amazonaws.com/account/agent-{agent_id}'
    
    def send_message(self, to_agent, message):
        sqs.send_message(
            QueueUrl=f'https://sqs.region.amazonaws.com/account/agent-{to_agent}',
            MessageBody=json.dumps({
                'from': self.agent_id,
                'message': message,
                'timestamp': time.time()
            })
        )
    
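    def receive_messages(self):
        response = sqs.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=10  # long polling reduces empty responses
        )
        messages = response.get('Messages', [])
        # Delete received messages; otherwise SQS redelivers them once the
        # visibility timeout expires. In production, delete only after the
        # message has been handled successfully.
        for msg in messages:
            sqs.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=msg['ReceiptHandle']
            )
        return messages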

Shared Memory with DynamoDB

import time

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('agent-shared-memory')

class SharedMemoryDynamoDB:
    def write(self, key, value, agent_id):
        table.put_item(
            Item={
                'key': key,
                'value': value,
                'agent_id': agent_id,
                'timestamp': int(time.time())
            }
        )
    
    def read(self, key):
        response = table.get_item(Key={'key': key})
        return response.get('Item')
    
    def query_by_agent(self, agent_id):
        response = table.query(
            IndexName='agent-index',
            KeyConditionExpression='agent_id = :agent_id',
            ExpressionAttributeValues={':agent_id': agent_id}
        )
        return response['Items']

Trade-offs

Advantages:

  • Specialization: Each agent optimized for specific domains
  • Parallelism: Concurrent execution of independent tasks
  • Scalability: Easy to add new agents
  • Resilience: A failing agent can be isolated, as long as other agents do not depend on its output
  • Emergence: Unexpected solutions through agent interactions

Disadvantages:

  • Complexity: Managing and coordinating multiple agents
  • Communication overhead: Cost of inter-agent message passing
  • Consistency: Difficulty in managing shared state
  • Debugging: Hard to trace distributed system issues
  • Cost: Increased costs from multiple LLM calls
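
The consistency problem listed above shows up as soon as two agents update the same key from stale reads. A common mitigation is optimistic concurrency: every write states which version it was based on and is rejected if the entry has changed since. The sketch below is an in-memory illustration under that assumption; `VersionedMemory` and `write_if_unchanged` are hypothetical names.

```python
import threading
from typing import Any, Dict, Tuple


class VersionedMemory:
    """Shared memory where each key carries a version counter."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[int, Any]] = {}
        self._lock = threading.Lock()

    def read(self, key: str) -> Tuple[int, Any]:
        """Return (version, value); a missing key reads as version 0."""
        with self._lock:
            return self._data.get(key, (0, None))

    def write_if_unchanged(self, key: str, expected_version: int,
                           value: Any) -> bool:
        """Write only if nobody else has written since `expected_version`."""
        with self._lock:
            current_version, _ = self._data.get(key, (0, None))
            if current_version != expected_version:
                return False  # stale read: caller must re-read and retry
            self._data[key] = (current_version + 1, value)
            return True


memory = VersionedMemory()
version, _ = memory.read("plan")  # both agents start from version 0
first = memory.write_if_unchanged("plan", version, "plan from planner")
second = memory.write_if_unchanged("plan", version, "plan from researcher")
print(first, second, memory.read("plan"))
```

The second writer learns that its view was stale instead of silently overwriting the first result. A conditional write in a database can serve the same purpose in a distributed deployment.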

2. Workflow Orchestration Agents

Pattern Overview

Workflow Orchestration Agents coordinate and manage multi-step tasks and processes. Unlike Multi-Agent Collaboration, they use centralized coordination.

Architecture

User Input / System Event
    ↓
Workflow Orchestrator
    ↓
┌─────────────────────────┐
│ Context Retrieval       │
│ - Knowledge Base        │
│ - Agent Registry        │
└─────────────────────────┘
    ↓
LLM-based Agent Selection
    ↓
┌──────────┬──────────┬──────────┐
│ Worker 1 │ Worker 2 │ Worker 3 │
└──────────┴──────────┴──────────┘
    ↓
State Tracking & Results

Working Mechanism

import json

class WorkflowOrchestrator:
    def __init__(self):
        self.worker_agents = {}
        self.state_store = StateStore()
        self.agent_registry = AgentRegistry()
    
    def orchestrate(self, task):
        # 1. Create workflow plan
        workflow = self.create_workflow(task)
        
        # 2. Initialize state
        execution_id = self.state_store.initialize(workflow)
        
        # 3. Execute step by step
        for step in workflow.steps:
            # Select appropriate worker agent
            worker = self.select_worker(step)
            
            # Prepare context
            context = self.state_store.get_context(execution_id)
            
            # Execute worker
            try:
                result = worker.execute(step, context)
                
                # Retry logic: retry before recording, so the state store
                # holds the final outcome of the step
                if result.status == "failed" and step.retryable:
                    result = self.retry_with_backoff(worker, step, context)
                
                self.state_store.update(execution_id, step.id, result)
                
            except Exception as e:
                self.handle_failure(execution_id, step, e)
        
        # 4. Return results
        return self.state_store.get_final_result(execution_id)
    
    def select_worker(self, step):
        # Dynamic worker selection using LLM
        candidates = self.agent_registry.find_capable_agents(step)
        
        selection_prompt = f"""
        Select the most suitable agent for the following task:
        Task: {step.description}
        
        Candidate agents:
        {json.dumps(candidates, indent=2)}
        
        Selection criteria: capability, cost, latency, success rate
        """
        
        selected = llm.select(selection_prompt)
        return self.worker_agents[selected]
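
The orchestrator calls `retry_with_backoff` without showing it. A minimal standalone sketch follows, assuming exponential backoff with full jitter. The signature here takes a zero-argument callable rather than `(worker, step, context)`, and the `sleep` parameter is a hypothetical hook that makes the function testable without real delays.

```python
import random
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def retry_with_backoff(fn: Callable[[], T],
                       max_attempts: int = 3,
                       base_delay: float = 0.5,
                       max_delay: float = 8.0,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn until it succeeds or max_attempts is exhausted."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Exponential cap (0.5s, 1s, 2s, ...) with full jitter
            cap = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, cap))
    raise AssertionError("unreachable")


calls: List[int] = []


def flaky() -> str:
    """Fails twice, then succeeds; stands in for a worker call."""
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("transient failure")
    return "ok"


delays: List[float] = []
outcome = retry_with_backoff(flaky, max_attempts=3, sleep=delays.append)
```

Inside the orchestrator this could be invoked as `retry_with_backoff(lambda: worker.execute(step, context))`. Jitter spreads out retries so that many workers failing together do not all retry at the same instant.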

Use Cases

1. Customer Service Routing

class CustomerServiceOrchestrator:
    def handle_request(self, customer_request):
        # 1. Classify request
        classification = self.classify_request(customer_request)
        
        # 2. Determine workflow
        if classification.type == "simple":
            # Handle with single agent
            return self.faq_agent.handle(customer_request)
        
        elif classification.type == "complex":
            # Multi-step workflow
            workflow = [
                ("gather_info", self.info_agent),
                ("analyze", self.analysis_agent),
                ("resolve", self.resolution_agent),
                ("follow_up", self.followup_agent)
            ]
            
            return self.execute_workflow(workflow, customer_request)
        
        elif classification.type == "escalation":
            # Escalate to human agent
            return self.escalate_to_human(customer_request)

2. Data Pipeline Orchestration

class DataPipelineOrchestrator:
    def process_data(self, data_source):
        workflow = [
            {
                "step": "extract",
                "agent": self.extractor,
                "retry": 3,
                "timeout": 300
            },
            {
                "step": "transform",
                "agent": self.transformer,
                "parallel": True,
                "chunks": 10
            },
            {
                "step": "validate",
                "agent": self.validator,
                "critical": True
            },
            {
                "step": "load",
                "agent": self.loader,
                "retry": 5
            }
        ]
        
        return self.execute_pipeline(workflow, data_source)
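
The workflow above is declarative, so something has to interpret the `retry` and `critical` fields. The sketch below shows one possible interpreter. It makes several assumptions that the original does not state: each agent is a plain callable, `retry` counts extra attempts after the first, a failed non-critical step is skipped, and a failed critical step aborts the run. The `parallel`, `chunks`, and `timeout` fields are ignored here.

```python
from typing import Any, Dict, List


def execute_pipeline(workflow: List[Dict[str, Any]], data: Any) -> Dict[str, Any]:
    log = []
    for spec in workflow:
        attempts = spec.get("retry", 0) + 1
        for attempt in range(1, attempts + 1):
            try:
                data = spec["agent"](data)  # output feeds the next step
                log.append((spec["step"], "ok", attempt))
                break
            except Exception:
                if attempt < attempts:
                    continue  # try the same step again
                log.append((spec["step"], "failed", attempt))
                if spec.get("critical", False):
                    return {"status": "aborted", "data": data, "log": log}
    return {"status": "completed", "data": data, "log": log}


def validate(rows: List[int]) -> List[int]:
    if any(r < 0 for r in rows):
        raise ValueError("negative value")
    return rows


workflow = [
    {"step": "extract", "agent": lambda s: s.split(","), "retry": 3},
    {"step": "transform", "agent": lambda rows: [int(r) for r in rows]},
    {"step": "validate", "agent": validate, "critical": True},
    {"step": "load", "agent": sum, "retry": 5},
]
good = execute_pipeline(workflow, "1,2,3")
bad = execute_pipeline(workflow, "1,-2")
```

Keeping the step log alongside the result makes it easy to see which step failed and on which attempt.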

AWS Implementation

Orchestration with AWS Step Functions

import boto3
import json

stepfunctions = boto3.client('stepfunctions')

# State Machine definition
state_machine_definition = {
    "Comment": "Workflow Orchestration Pattern",
    "StartAt": "ClassifyRequest",
    "States": {
        "ClassifyRequest": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:classifier",
            "Next": "RouteToWorker"
        },
        "RouteToWorker": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": "$.category",
                    "StringEquals": "technical",
                    "Next": "TechnicalWorker"
                },
                {
                    "Variable": "$.category",
                    "StringEquals": "billing",
                    "Next": "BillingWorker"
                }
            ],
            "Default": "GeneralWorker"
        },
        "TechnicalWorker": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:technical-worker",
            "Retry": [
                {
                    "ErrorEquals": ["States.TaskFailed"],
                    "IntervalSeconds": 2,
                    "MaxAttempts": 3,
                    "BackoffRate": 2.0
                }
            ],
            "Catch": [
                {
                    "ErrorEquals": ["States.ALL"],
                    "Next": "HandleFailure"
                }
            ],
            "Next": "AggregateResults"
        },
        "BillingWorker": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:billing-worker",
            "Next": "AggregateResults"
        },
        "GeneralWorker": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:general-worker",
            "Next": "AggregateResults"
        },
        "AggregateResults": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:aggregator",
            "End": True
        },
        "HandleFailure": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:failure-handler",
            "End": True
        }
    }
}

# Create State Machine
response = stepfunctions.create_state_machine(
    name='workflow-orchestrator',
    definition=json.dumps(state_machine_definition),
    roleArn='arn:aws:iam::account:role/step-functions-role'
)

# Execute
execution = stepfunctions.start_execution(
    stateMachineArn=response['stateMachineArn'],
    input=json.dumps({
        "request": "Customer request content",
        "customer_id": "12345"
    })
)
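
`start_execution` returns immediately, so a caller that needs the result has to wait for the execution to finish. A simple way to do that is to poll `describe_execution`, as sketched below. The helper name `wait_for_completion`, its parameters, and the `FakeStepFunctions` class are hypothetical; the fake exists only so the example runs without AWS credentials.

```python
import json
import time

TERMINAL_FAILURES = {"FAILED", "TIMED_OUT", "ABORTED"}


def wait_for_completion(client, execution_arn, poll_seconds=2.0,
                        timeout_seconds=300.0, sleep=time.sleep):
    """Poll describe_execution until the execution reaches a terminal state."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        description = client.describe_execution(executionArn=execution_arn)
        status = description["status"]
        if status == "SUCCEEDED":
            return json.loads(description.get("output", "null"))
        if status in TERMINAL_FAILURES:
            raise RuntimeError(f"execution ended with status {status}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still {status} after {timeout_seconds}s")
        sleep(poll_seconds)


class FakeStepFunctions:
    """Local stand-in for boto3.client('stepfunctions'), for demonstration."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    def describe_execution(self, executionArn):
        status = next(self._statuses)
        response = {"executionArn": executionArn, "status": status}
        if status == "SUCCEEDED":
            response["output"] = json.dumps({"resolved": True})
        return response


fake = FakeStepFunctions(["RUNNING", "RUNNING", "SUCCEEDED"])
waits = []
result = wait_for_completion(fake, "arn:placeholder", sleep=waits.append)
```

With a real client, the call would be `wait_for_completion(stepfunctions, execution['executionArn'])`. For long-running workflows, an event-driven notification is usually a better fit than polling.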

Trade-offs

Advantages:

  • Clear control flow
  • Easy state tracking and monitoring
  • Easy to implement error handling and retry logic
  • Predictable execution path

Disadvantages:

  • Potential centralized bottleneck
  • Limited flexibility (predefined workflows)
  • Difficult to manage complex workflows

Pattern Selection Guide

Multi-Agent Collaboration vs Workflow Orchestration

def select_pattern(task):
    if task.requires_creativity or task.is_open_ended:
        return "Multi-Agent Collaboration"
    
    if task.has_clear_steps and task.is_predictable:
        return "Workflow Orchestration"
    
    if task.needs_diverse_perspectives:
        return "Multi-Agent Collaboration"
    
    if task.requires_strict_control:
        return "Workflow Orchestration"
    
    # Default
    return "Workflow Orchestration"

Real-World Implementation Example

End-to-End Multi-Agent System

from concurrent.futures import ThreadPoolExecutor

class EnterpriseMultiAgentSystem:
    def __init__(self):
        # Initialize agents
        self.agents = {
            "supervisor": SupervisorAgent(),
            "planner": PlannerAgent(),
            "researcher": ResearchAgent(),
            "analyst": AnalystAgent(),
            "coder": CoderAgent(),
            "tester": TesterAgent(),
            "reviewer": ReviewerAgent()
        }
        
        # Initialize infrastructure
        self.shared_memory = DynamoDBSharedMemory()
        self.message_queue = SQSMessageQueue()
        self.state_machine = StepFunctionsOrchestrator()
    
    def execute_complex_task(self, task):
        # 1. Supervisor analyzes task
        analysis = self.agents["supervisor"].analyze(task)
        
        # 2. Determine execution strategy
        if analysis.complexity == "high":
            # Multi-Agent Collaboration
            return self.collaborative_execution(task, analysis)
        else:
            # Workflow Orchestration
            return self.orchestrated_execution(task, analysis)
    
    def collaborative_execution(self, task, analysis):
        # Assign roles
        roles = analysis.required_roles
        
        # Initialize shared memory
        session_id = self.shared_memory.create_session(task)
        
        # Parallel agent execution
        with ThreadPoolExecutor() as executor:
            futures = []
            for role in roles:
                agent = self.agents[role]
                future = executor.submit(
                    self.run_agent_with_collaboration,
                    agent,
                    session_id
                )
                futures.append(future)
            
            results = [f.result() for f in futures]
        
        # Synthesize results
        return self.agents["supervisor"].synthesize(results)
    
    def orchestrated_execution(self, task, analysis):
        # Create Step Functions workflow
        workflow = self.create_workflow(analysis)
        
        # Execute
        execution_arn = self.state_machine.start_execution(workflow, task)
        
        # Wait for results
        return self.state_machine.wait_for_completion(execution_arn)
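
In `collaborative_execution`, collecting results with `f.result()` re-raises the first agent exception and discards everything the other agents produced. If partial results are acceptable, failures can be captured per agent instead. The sketch below illustrates that idea under the assumption that each agent is a plain callable; `run_isolated` is a hypothetical helper name.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, Tuple


def run_isolated(agents: Dict[str, Callable[[str], str]],
                 task: str) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Run all agents in parallel; return (results, errors) keyed by agent."""
    results: Dict[str, str] = {}
    errors: Dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=max(1, len(agents))) as executor:
        future_to_name = {
            executor.submit(fn, task): name for name, fn in agents.items()
        }
        for future in as_completed(future_to_name):
            name = future_to_name[future]
            try:
                results[name] = future.result()
            except Exception as exc:
                # Record the failure and keep collecting other results
                errors[name] = repr(exc)
    return results, errors


def broken_coder(task: str) -> str:
    raise RuntimeError("model timeout")


results, errors = run_isolated(
    {
        "researcher": lambda task: f"findings for {task}",
        "coder": broken_coder,
    },
    "migrate billing service",
)
```

The supervisor can then decide whether the surviving results are enough to synthesize an answer or whether the failed role must be retried.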

Monitoring and Observability

import boto3
from aws_xray_sdk.core import xray_recorder  # pip install aws-xray-sdk

class AgentMonitoring:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
    
    def track_agent_execution(self, agent_id, execution_data):
        # Send metrics
        self.cloudwatch.put_metric_data(
            Namespace='MultiAgentSystem',
            MetricData=[
                {
                    'MetricName': 'AgentExecutionTime',
                    'Value': execution_data['duration'],
                    'Unit': 'Milliseconds',
                    'Dimensions': [
                        {'Name': 'AgentId', 'Value': agent_id},
                        {'Name': 'TaskType', 'Value': execution_data['task_type']}
                    ]
                },
                {
                    'MetricName': 'AgentTokenUsage',
                    'Value': execution_data['tokens'],
                    'Unit': 'Count',
                    'Dimensions': [
                        {'Name': 'AgentId', 'Value': agent_id}
                    ]
                }
            ]
        )
        
        # X-Ray tracing: segments are created through the X-Ray SDK recorder;
        # the boto3 'xray' client has no begin_segment/end_segment methods
        segment = xray_recorder.begin_segment(f'agent-{agent_id}')
        segment.put_annotation('task_type', execution_data['task_type'])
        segment.put_metadata('execution_details', execution_data)
        xray_recorder.end_segment()

Series Conclusion

Key patterns covered in this series:

Part 1: Foundational Patterns

  • ReAct: Combining reasoning and action
  • Reflection: Self-improvement mechanism
  • Tool Use: Integrating external capabilities

Part 2: Orchestration

  • Planning: Task decomposition and execution
  • Routing: Intelligent task distribution
  • Human-in-the-Loop: Integrating human review

Part 3: Multi-Agent

  • Multi-Agent Collaboration: Distributed collaboration
  • Workflow Orchestration: Centralized coordination

Pattern Combination Strategy

Production systems usually combine several of these patterns:

# Example: Enterprise AI System
system = {
    "base": "ReAct",  # Base reasoning pattern
    "quality": "Reflection",  # Quality assurance
    "capabilities": "Tool Use",  # External integration
    "complexity": "Planning",  # Complex task decomposition
    "specialization": "Routing",  # Specialized agent utilization
    "safety": "Human-in-the-Loop",  # Critical decision review
    "scale": "Multi-Agent"  # Large-scale parallel processing
}
