Agentic AI Design Patterns (Part 3): Multi-Agent Systems
In the final part of this series, we'll explore multi-agent patterns, in which several agents collaborate on one problem. These patterns matter most in domains that need more areas of expertise, or more parallel work, than a single agent can handle well.
Multi-Agent vs Single-Agent
When Should You Use Multi-Agent?
When Single-Agent is Appropriate:
- Task belongs to a single domain
- Sequential processing is sufficient
- Context sharing is important
When Multi-Agent is Needed:
- Multiple areas of expertise required
- Performance improvement through parallel processing
- Agents can operate independently
- Scalability and modularity are important
1. Multi-Agent Collaboration Pattern
Pattern Overview
Multi-Agent Collaboration is a pattern where multiple autonomous agents with their own roles and expertise collaborate to solve complex tasks.
Workflow Agents vs Multi-Agent Collaboration
| Characteristic | Workflow Agents | Multi-Agent Collaboration |
|------|----------------|---------------------------|
| Control | Centralized coordinator | Distributed, role-based peers |
| Interaction | One agent delegates and tracks | Multiple agents negotiate, share, adapt |
| Design | Predefined task sequence | Emergent, flexible task distribution |
| Coordination | Procedural orchestration | Collaborative or competitive interactions |
| Use Cases | Enterprise process automation | Complex reasoning, exploration, emergent strategies |
Architecture
User Request
      ↓
Manager Agent (initialization)
      ↓
┌─────────┬─────────┬─────────┐
│ Planner │Research │ Executor│  ← Role assignment
│  Agent  │  Agent  │  Agent  │
└────┬────┴────┬────┴────┬────┘
     │         │         │
     └───→ Shared Memory ←───┘    ← Communication and collaboration
               ↓
   Supervisor Agent (optional)
               ↓
         Final Output
Working Mechanism
class MultiAgentSystem:
    # The agent classes, assign_roles(), is_complete(), and
    # synthesize_results() are application-specific and not shown here.
    def __init__(self):
        self.agents = {
            "planner": PlannerAgent(),
            "researcher": ResearchAgent(),
            "coder": CoderAgent(),
            "tester": TesterAgent(),
            "reviewer": ReviewerAgent()
        }
        self.shared_memory = SharedMemory()
        self.message_queue = MessageQueue()

    def execute(self, task):
        # 1. Initialize task
        self.shared_memory.set("task", task)

        # 2. Assign roles
        roles = self.assign_roles(task)

        # 3. Agent collaboration
        while not self.is_complete():
            for agent_name in roles:
                agent = self.agents[agent_name]

                # Read context from shared memory
                context = self.shared_memory.get_context()

                # Execute agent
                result = agent.process(context)

                # Store result in shared memory
                self.shared_memory.update(agent_name, result)

                # Send one message per target agent
                if result.needs_collaboration:
                    for target in result.target_agents:
                        self.message_queue.send(
                            from_agent=agent_name,
                            to_agent=target,
                            message=result.message
                        )

        # 4. Synthesize results
        return self.synthesize_results()
Communication Mechanisms
1. Shared Memory
import threading
import time

class SharedMemory:
    def __init__(self):
        self.data = {}
        self.lock = threading.Lock()

    def write(self, key, value, agent_id):
        with self.lock:
            self.data[key] = {
                "value": value,
                "author": agent_id,
                "timestamp": time.time()
            }

    def read(self, key):
        # Take the lock for reads too, so a reader never observes a
        # partially applied write
        with self.lock:
            return self.data.get(key)

    def query(self, filter_fn):
        with self.lock:
            return [v for v in self.data.values() if filter_fn(v)]
2. Message Passing
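import queue
import time

class MessageQueue:
    def __init__(self):
        self.queue = queue.Queue()

    def send(self, from_agent, to_agent, message):
        event = {
            "from": from_agent,
            "to": to_agent,
            "message": message,
            "timestamp": time.time()
        }
        self.queue.put(event)

    def receive(self, agent_id):
        messages = []
        others = []
        while not self.queue.empty():
            event = self.queue.get()
            if event["to"] == agent_id:
                messages.append(event)
            else:
                others.append(event)
        # Put back messages addressed to other agents so they are not lost
        for event in others:
            self.queue.put(event)
        return messages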
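As an alternative to scanning one shared queue, message passing can keep a separate inbox per recipient, so that one agent reading its mail never touches another agent's messages. The following is a minimal sketch of that idea; the `Mailbox` class and the agent names are hypothetical and not part of the original design.

```python
import time
from collections import defaultdict, deque


class Mailbox:
    """Hypothetical in-process message bus with one inbox per agent."""

    def __init__(self):
        self._inboxes = defaultdict(deque)

    def send(self, from_agent, to_agent, message):
        self._inboxes[to_agent].append({
            "from": from_agent,
            "to": to_agent,
            "message": message,
            "timestamp": time.time(),
        })

    def receive(self, agent_id):
        """Drain and return only the messages addressed to agent_id."""
        inbox = self._inboxes[agent_id]
        messages = list(inbox)
        inbox.clear()
        return messages


mailbox = Mailbox()
mailbox.send("planner", "coder", "implement step 1")
mailbox.send("planner", "tester", "prepare test plan")

coder_mail = mailbox.receive("coder")    # only the coder's message
tester_mail = mailbox.receive("tester")  # still available for the tester
```

This sketch is single-threaded; if agents run in parallel threads, the inbox access would need a lock, as in the shared memory example.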
3. Prompt Chaining
def prompt_chain_collaboration(task):
    # Agent 1: Create plan
    plan = planner_agent.create_plan(task)

    # Agent 2: Conduct research
    research = researcher_agent.research(plan)

    # Agent 3: Write code
    code = coder_agent.write_code(plan, research)

    # Agent 4: Test
    test_results = tester_agent.test(code)

    # Agent 5: Review
    review = reviewer_agent.review(code, test_results)

    return review
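To make the chaining idea concrete without any LLM dependency, here is a small runnable sketch in which each step is a plain function that receives the accumulated state and returns an extended copy. The step functions (`plan`, `research`, `write_code`) and `run_chain` are hypothetical stand-ins for real agent calls.

```python
def plan(state):
    # Stand-in for a planner agent: derive steps from the task
    task = state["task"]
    return {**state, "plan": [f"research {task}", f"implement {task}"]}


def research(state):
    # Stand-in for a research agent: attach notes to every planned step
    notes = {step: f"notes on '{step}'" for step in state["plan"]}
    return {**state, "research": notes}


def write_code(state):
    # Stand-in for a coder agent: consume the plan and the research
    return {**state, "code": f"# code covering {len(state['plan'])} planned steps"}


def run_chain(task, steps):
    """Run steps in order; each one sees everything produced before it."""
    state = {"task": task}
    trace = []
    for step in steps:
        state = step(state)
        trace.append(step.__name__)
    return state, trace


final_state, trace = run_chain("login form", [plan, research, write_code])
```

Because every step only reads from and adds to `state`, steps can be reordered or swapped without changing the runner.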
Use Cases
1. Software Development Team
from concurrent.futures import ThreadPoolExecutor

class SoftwareDevelopmentTeam:
    def develop_feature(self, requirements):
        # Planner: Decompose tasks
        tasks = self.planner.decompose(requirements)

        # Architect: Design
        design = self.architect.design(tasks)

        # Coder: Implementation (parallel)
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(self.coder.implement, task)
                for task in tasks
            ]
            # Collect results in task order
            code_results = [f.result() for f in futures]

        # Tester: Testing
        test_results = self.tester.test_all(code_results)

        # Reviewer: Code review
        review = self.reviewer.review(code_results, test_results)

        # Documenter: Documentation
        docs = self.documenter.document(code_results, design)

        return {
            "code": code_results,
            "tests": test_results,
            "review": review,
            "documentation": docs
        }
2. Research Team
class ResearchTeam:
    def conduct_research(self, topic):
        # Searcher: Literature search
        papers = self.searcher.find_papers(topic)

        # Summarizer: Summarize (parallel)
        summaries = self.parallel_summarize(papers)

        # Analyst: Analysis
        analysis = self.analyst.analyze(summaries)

        # Validator: Validation
        validation = self.validator.validate(analysis)

        # Writer: Report writing
        report = self.writer.write_report(analysis, validation)

        return report
3. Business Scenario Modeling
class BusinessAnalysisTeam:
    def analyze_scenario(self, scenario):
        # Each agent analyzes from different perspectives
        finance_view = self.finance_agent.analyze(scenario)
        policy_view = self.policy_agent.analyze(scenario)
        compliance_view = self.compliance_agent.analyze(scenario)

        # Agent discussion
        discussion = self.facilitate_discussion([
            finance_view,
            policy_view,
            compliance_view
        ])

        # Reach consensus
        consensus = self.reach_consensus(discussion)

        return consensus
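The consensus step is left abstract above. One simple way it might work is a majority vote over each agent's recommendation, with the dissenting agents recorded for follow-up. The sketch below assumes each view can be reduced to a single recommendation string; that reduction, the `quorum` parameter, and the return shape are all assumptions for illustration.

```python
from collections import Counter


def reach_consensus(views, quorum=0.5):
    """Majority vote over agent recommendations.

    views maps an agent name to its recommendation string.
    A decision is accepted only if its share of votes exceeds quorum.
    """
    counts = Counter(views.values())
    recommendation, votes = counts.most_common(1)[0]
    if votes / len(views) > quorum:
        dissent = [agent for agent, view in views.items() if view != recommendation]
        return {"decision": recommendation, "agreed": True, "dissent": dissent}
    # No recommendation has enough support: leave the decision open
    return {"decision": None, "agreed": False, "dissent": list(views)}


majority = reach_consensus({
    "finance": "approve",
    "policy": "approve",
    "compliance": "reject",
})
split = reach_consensus({"finance": "approve", "compliance": "reject"})
```

In a real system an unresolved vote would more likely trigger another discussion round or a human review than a silent `None`.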
AWS Implementation
Amazon Bedrock Multi-Agent Collaboration
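import boto3

bedrock_agent = boto3.client('bedrock-agent')

# Create Supervisor Agent
# create_agent returns the new agent under the 'agent' key. A real call
# also needs agentResourceRoleArn (the IAM role the agent assumes).
supervisor = bedrock_agent.create_agent(
    agentName='supervisor-agent',
    foundationModel='anthropic.claude-3-sonnet-20240229-v1:0',
    agentCollaboration='SUPERVISOR',
    instruction='''
    You are a supervisor coordinating multiple expert agents.
    Analyze tasks and delegate to appropriate agents.
    '''
)['agent']

# Create Collaborator Agents
researcher = bedrock_agent.create_agent(
    agentName='researcher-agent',
    foundationModel='anthropic.claude-3-sonnet-20240229-v1:0',
    instruction='You are an expert in information retrieval and research.'
)['agent']

coder = bedrock_agent.create_agent(
    agentName='coder-agent',
    foundationModel='anthropic.claude-3-sonnet-20240229-v1:0',
    instruction='You are an expert in code writing.'
)['agent']

# Collaborators are referenced by alias ARN, not by agent ID, so each one
# must be prepared and published under an alias first. prepare_agent is
# asynchronous: wait until the agent is PREPARED before creating the alias.
def publish_alias(agent, alias_name):
    bedrock_agent.prepare_agent(agentId=agent['agentId'])
    alias = bedrock_agent.create_agent_alias(
        agentId=agent['agentId'],
        agentAliasName=alias_name
    )
    return alias['agentAlias']['agentAliasArn']

# Configure Multi-Agent Collaboration
bedrock_agent.associate_agent_collaborator(
    agentId=supervisor['agentId'],
    agentVersion='DRAFT',
    agentDescriptor={'aliasArn': publish_alias(researcher, 'researcher-alias')},
    collaboratorName='researcher',
    collaborationInstruction='Delegate information retrieval and research tasks.'
)

bedrock_agent.associate_agent_collaborator(
    agentId=supervisor['agentId'],
    agentVersion='DRAFT',
    agentDescriptor={'aliasArn': publish_alias(coder, 'coder-alias')},
    collaboratorName='coder',
    collaborationInstruction='Delegate code writing tasks.'
)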
Inter-Agent Communication with Amazon SQS
import boto3
import json
import time

sqs = boto3.client('sqs')

class AgentCommunication:
    def __init__(self, agent_id):
        self.agent_id = agent_id
        # 'region' and 'account' are placeholders for real values
        self.queue_url = f'https://sqs.region.amazonaws.com/account/agent-{agent_id}'

    def send_message(self, to_agent, message):
        sqs.send_message(
            QueueUrl=f'https://sqs.region.amazonaws.com/account/agent-{to_agent}',
            MessageBody=json.dumps({
                'from': self.agent_id,
                'message': message,
                'timestamp': time.time()
            })
        )

    def receive_messages(self):
        response = sqs.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=10
        )
        # The 'Messages' key is absent when the queue is empty
        return response.get('Messages', [])
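One detail the class above leaves out is that SQS does not remove a message when it is received: the consumer has to delete it, otherwise it reappears after the visibility timeout. The sketch below shows one way to receive, handle, and delete a batch. It takes the client as a parameter and uses only `receive_message` and `delete_message`; the `drain_queue` helper and the `FakeSQS` stand-in (included so the example runs without AWS) are hypothetical.

```python
import json


def drain_queue(sqs_client, queue_url, handler, wait_seconds=10):
    """Receive one batch, handle each message, then delete it."""
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=wait_seconds,  # long polling reduces empty responses
    )
    handled = 0
    for msg in response.get("Messages", []):
        handler(json.loads(msg["Body"]))
        # Delete only after the handler succeeds; otherwise SQS redelivers
        # the message once its visibility timeout expires.
        sqs_client.delete_message(
            QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"]
        )
        handled += 1
    return handled


class FakeSQS:
    """In-memory stand-in exposing the two calls used above."""

    def __init__(self, bodies):
        self.pending = {f"rh-{i}": body for i, body in enumerate(bodies)}

    def receive_message(self, QueueUrl, MaxNumberOfMessages, WaitTimeSeconds):
        batch = list(self.pending.items())[:MaxNumberOfMessages]
        return {"Messages": [{"Body": b, "ReceiptHandle": rh} for rh, b in batch]}

    def delete_message(self, QueueUrl, ReceiptHandle):
        del self.pending[ReceiptHandle]


fake = FakeSQS([json.dumps({"from": "planner", "message": "start"})])
seen = []
handled_count = drain_queue(fake, "queue-url-placeholder", seen.append)
```

With a real client, `drain_queue(boto3.client('sqs'), queue_url, handler)` would be called in a loop by each agent.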
Shared Memory with DynamoDB
import boto3
import time

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('agent-shared-memory')

class SharedMemoryDynamoDB:
    def write(self, key, value, agent_id):
        table.put_item(
            Item={
                'key': key,
                'value': value,
                'agent_id': agent_id,
                'timestamp': int(time.time())
            }
        )

    def read(self, key):
        response = table.get_item(Key={'key': key})
        # 'Item' is absent when the key does not exist
        return response.get('Item')

    def query_by_agent(self, agent_id):
        # Requires a global secondary index named 'agent-index'
        # with agent_id as its partition key
        response = table.query(
            IndexName='agent-index',
            KeyConditionExpression='agent_id = :agent_id',
            ExpressionAttributeValues={':agent_id': agent_id}
        )
        return response['Items']
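A practical wrinkle when agents write arbitrary results through the boto3 resource interface is that Python `float` values are rejected; numbers have to be passed as `Decimal`. A small conversion helper, applied to `value` before `put_item`, might look like the following sketch. The name `to_dynamodb_safe` is hypothetical.

```python
from decimal import Decimal


def to_dynamodb_safe(value):
    """Recursively replace floats with Decimal so the item can be stored."""
    if isinstance(value, float):
        # Go through str() to avoid binary floating point noise
        return Decimal(str(value))
    if isinstance(value, dict):
        return {k: to_dynamodb_safe(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_dynamodb_safe(v) for v in value]
    return value


agent_result = {"score": 0.75, "tags": ["draft", 1.5], "retries": 2}
safe_result = to_dynamodb_safe(agent_result)
```

Integers, strings, and booleans pass through unchanged, so the helper can be applied to every value without checking its type first.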
Trade-offs
Advantages:
- Specialization: Each agent optimized for specific domains
- Parallelism: Concurrent execution of independent tasks
- Scalability: Easy to add new agents
- Resilience: One agent's failure can be contained instead of stopping the entire system
- Emergence: Unexpected solutions through agent interactions
Disadvantages:
- Complexity: Managing and coordinating multiple agents
- Communication overhead: Cost of inter-agent message passing
- Consistency: Difficulty in managing shared state
- Debugging: Hard to trace distributed system issues
- Cost: Increased costs from multiple LLM calls
2. Workflow Orchestration Agents
Pattern Overview
Workflow Orchestration Agents coordinate and manage multi-step tasks and processes. Unlike Multi-Agent Collaboration, they use centralized coordination.
Architecture
User Input / System Event
↓
Workflow Orchestrator
↓
┌─────────────────────────┐
│ Context Retrieval │
│ - Knowledge Base │
│ - Agent Registry │
└─────────────────────────┘
↓
LLM-based Agent Selection
↓
┌──────────┬──────────┬──────────┐
│ Worker 1 │ Worker 2 │ Worker 3 │
└──────────┴──────────┴──────────┘
↓
State Tracking & Results
Working Mechanism
import json

class WorkflowOrchestrator:
    # StateStore, AgentRegistry, create_workflow(), retry_with_backoff(),
    # handle_failure(), and the llm client are not shown here.
    def __init__(self):
        self.worker_agents = {}
        self.state_store = StateStore()
        self.agent_registry = AgentRegistry()

    def orchestrate(self, task):
        # 1. Create workflow plan
        workflow = self.create_workflow(task)

        # 2. Initialize state
        execution_id = self.state_store.initialize(workflow)

        # 3. Execute step by step
        for step in workflow.steps:
            # Select appropriate worker agent
            worker = self.select_worker(step)

            # Prepare context
            context = self.state_store.get_context(execution_id)

            # Execute worker
            try:
                result = worker.execute(step, context)

                # Retry before recording, so the stored result
                # reflects the final attempt
                if result.status == "failed" and step.retryable:
                    result = self.retry_with_backoff(worker, step, context)

                self.state_store.update(execution_id, step.id, result)
            except Exception as e:
                self.handle_failure(execution_id, step, e)

        # 4. Return results
        return self.state_store.get_final_result(execution_id)

    def select_worker(self, step):
        # Dynamic worker selection using LLM
        candidates = self.agent_registry.find_capable_agents(step)

        selection_prompt = f"""
        Select the most suitable agent for the following task:
        Task: {step.description}

        Candidate agents:
        {json.dumps(candidates, indent=2)}

        Selection criteria: capability, cost, latency, success rate
        """

        selected = llm.select(selection_prompt)
        return self.worker_agents[selected]
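The orchestrator calls `retry_with_backoff` without defining it. A minimal sketch of what such a helper could look like is given below, assuming that a worker exposes `execute(step, context)` and that a result carries a `status` field, as in the code above. The default of three attempts, the doubling delay, and the injectable `sleep` parameter are assumptions made for illustration; `FlakyWorker` exists only to exercise the helper.

```python
import time
from types import SimpleNamespace


def retry_with_backoff(worker, step, context, max_attempts=3,
                       base_delay=1.0, sleep=time.sleep):
    """Re-run a worker until it stops failing or attempts run out."""
    result = None
    for attempt in range(max_attempts):
        result = worker.execute(step, context)
        if result.status != "failed":
            return result
        if attempt < max_attempts - 1:
            # Exponential backoff: base_delay, 2x, 4x, ...
            sleep(base_delay * 2 ** attempt)
    return result  # still failed after the last attempt


class FlakyWorker:
    """Hypothetical worker that fails a fixed number of times first."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def execute(self, step, context):
        self.calls += 1
        status = "failed" if self.calls <= self.failures else "ok"
        return SimpleNamespace(status=status)


delays = []  # record the delays instead of actually sleeping
worker = FlakyWorker(failures=2)
outcome = retry_with_backoff(worker, step="transform", context={},
                             sleep=delays.append)
```

Passing `sleep` as a parameter keeps the helper testable; production code would leave the default `time.sleep` and often add random jitter to the delay.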
Use Cases
1. Customer Service Routing
class CustomerServiceOrchestrator:
    def handle_request(self, customer_request):
        # 1. Classify request
        classification = self.classify_request(customer_request)

        # 2. Determine workflow
        if classification.type == "simple":
            # Handle with single agent
            return self.faq_agent.handle(customer_request)

        elif classification.type == "complex":
            # Multi-step workflow
            workflow = [
                ("gather_info", self.info_agent),
                ("analyze", self.analysis_agent),
                ("resolve", self.resolution_agent),
                ("follow_up", self.followup_agent)
            ]
            return self.execute_workflow(workflow, customer_request)

        elif classification.type == "escalation":
            # Escalate to human agent
            return self.escalate_to_human(customer_request)
2. Data Pipeline Orchestration
class DataPipelineOrchestrator:
    def process_data(self, data_source):
        workflow = [
            {
                "step": "extract",
                "agent": self.extractor,
                "retry": 3,
                "timeout": 300
            },
            {
                "step": "transform",
                "agent": self.transformer,
                "parallel": True,
                "chunks": 10
            },
            {
                "step": "validate",
                "agent": self.validator,
                "critical": True
            },
            {
                "step": "load",
                "agent": self.loader,
                "retry": 5
            }
        ]

        return self.execute_pipeline(workflow, data_source)
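How `execute_pipeline` interprets the step options is not shown. The sketch below is one possible reading, and every semantic in it is an assumption: `retry` is the number of extra attempts, a failing `critical` step aborts the pipeline, and a failing non-critical step is recorded while the pipeline continues with the previous data. Agents are modeled as plain callables, and the `parallel`, `chunks`, and `timeout` options are ignored to keep the example short.

```python
def execute_pipeline(workflow, data):
    """Run steps in order, feeding each step's output to the next."""
    errors = []
    for step in workflow:
        attempts = step.get("retry", 0) + 1
        for attempt in range(attempts):
            try:
                data = step["agent"](data)
                break
            except Exception as exc:
                if attempt == attempts - 1:
                    if step.get("critical"):
                        raise  # critical failure stops the pipeline
                    errors.append((step["step"], str(exc)))
    return data, errors


calls = {"extract": 0}


def extract(source):
    calls["extract"] += 1
    if calls["extract"] == 1:
        raise ConnectionError("transient failure")  # first call fails
    return [1, 2, 3]


def transform(rows):
    return [r * 2 for r in rows]


def validate(rows):
    if any(r < 0 for r in rows):
        raise ValueError("negative value")
    return rows


def load(rows):
    return {"loaded": len(rows)}


workflow = [
    {"step": "extract", "agent": extract, "retry": 3},
    {"step": "transform", "agent": transform},
    {"step": "validate", "agent": validate, "critical": True},
    {"step": "load", "agent": load, "retry": 5},
]
result, errors = execute_pipeline(workflow, "source-placeholder")
```

Here the extractor fails once and succeeds on its first retry, so the pipeline completes with no recorded errors.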
AWS Implementation
Orchestration with AWS Step Functions
import boto3
import json

stepfunctions = boto3.client('stepfunctions')

# State Machine definition
# ('region' and 'account' in the ARNs are placeholders)
state_machine_definition = {
    "Comment": "Workflow Orchestration Pattern",
    "StartAt": "ClassifyRequest",
    "States": {
        "ClassifyRequest": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:classifier",
            "Next": "RouteToWorker"
        },
        "RouteToWorker": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": "$.category",
                    "StringEquals": "technical",
                    "Next": "TechnicalWorker"
                },
                {
                    "Variable": "$.category",
                    "StringEquals": "billing",
                    "Next": "BillingWorker"
                }
            ],
            "Default": "GeneralWorker"
        },
        "TechnicalWorker": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:technical-worker",
            "Retry": [
                {
                    "ErrorEquals": ["States.TaskFailed"],
                    "IntervalSeconds": 2,
                    "MaxAttempts": 3,
                    "BackoffRate": 2.0
                }
            ],
            "Catch": [
                {
                    "ErrorEquals": ["States.ALL"],
                    "Next": "HandleFailure"
                }
            ],
            "Next": "AggregateResults"
        },
        "BillingWorker": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:billing-worker",
            "Next": "AggregateResults"
        },
        "GeneralWorker": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:general-worker",
            "Next": "AggregateResults"
        },
        "AggregateResults": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:aggregator",
            "End": True
        },
        "HandleFailure": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:failure-handler",
            "End": True
        }
    }
}

# Create State Machine
response = stepfunctions.create_state_machine(
    name='workflow-orchestrator',
    definition=json.dumps(state_machine_definition),
    roleArn='arn:aws:iam::account:role/step-functions-role'
)

# Execute
execution = stepfunctions.start_execution(
    stateMachineArn=response['stateMachineArn'],
    input=json.dumps({
        "request": "Customer request content",
        "customer_id": "12345"
    })
)
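`start_execution` returns immediately, so a caller that needs the result has to wait for the execution to finish. One simple approach is to poll `describe_execution` until the status is terminal, as sketched below. The helper name, the polling interval, and the timeout are assumptions; `FakeStepFunctions` is a hypothetical stand-in so the example runs without AWS.

```python
import json
import time

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def wait_for_completion(sfn_client, execution_arn, poll_seconds=2.0,
                        timeout_seconds=300.0, sleep=time.sleep):
    """Poll an execution until it ends; return its parsed output."""
    waited = 0.0
    while True:
        description = sfn_client.describe_execution(executionArn=execution_arn)
        status = description["status"]
        if status == "SUCCEEDED":
            return json.loads(description["output"])
        if status in TERMINAL_STATES:
            raise RuntimeError(f"Execution ended with status {status}")
        if waited >= timeout_seconds:
            raise TimeoutError(f"Still {status} after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds


class FakeStepFunctions:
    """Returns a scripted sequence of statuses, one per call."""

    def __init__(self, statuses, output):
        self._statuses = iter(statuses)
        self._output = output

    def describe_execution(self, executionArn):
        status = next(self._statuses)
        response = {"status": status}
        if status == "SUCCEEDED":
            response["output"] = self._output
        return response


fake_client = FakeStepFunctions(
    ["RUNNING", "RUNNING", "SUCCEEDED"], '{"resolved": true}'
)
naps = []  # record the waits instead of sleeping
final_output = wait_for_completion(fake_client, "arn-placeholder",
                                   sleep=naps.append)
```

With the real client this would be called as `wait_for_completion(stepfunctions, execution['executionArn'])`. For long-running workflows, an event-driven notification is usually a better fit than polling.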
Trade-offs
Advantages:
- Clear control flow
- Easy state tracking and monitoring
- Easy to implement error handling and retry logic
- Predictable execution path
Disadvantages:
- Potential centralized bottleneck
- Limited flexibility (predefined workflows)
- Difficult to manage complex workflows
Pattern Selection Guide
Multi-Agent Collaboration vs Workflow Orchestration
def select_pattern(task):
    if task.requires_creativity or task.is_open_ended:
        return "Multi-Agent Collaboration"

    if task.has_clear_steps and task.is_predictable:
        return "Workflow Orchestration"

    if task.needs_diverse_perspectives:
        return "Multi-Agent Collaboration"

    if task.requires_strict_control:
        return "Workflow Orchestration"

    # Default
    return "Workflow Orchestration"
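To try the selection logic, the task needs some concrete shape. The sketch below repeats `select_pattern` so that it runs on its own, and pairs it with a hypothetical `Task` dataclass whose fields mirror the attributes the function reads; all of them default to `False`.

```python
from dataclasses import dataclass


@dataclass
class Task:
    # Hypothetical task descriptor; field names match select_pattern
    requires_creativity: bool = False
    is_open_ended: bool = False
    has_clear_steps: bool = False
    is_predictable: bool = False
    needs_diverse_perspectives: bool = False
    requires_strict_control: bool = False


def select_pattern(task):
    if task.requires_creativity or task.is_open_ended:
        return "Multi-Agent Collaboration"
    if task.has_clear_steps and task.is_predictable:
        return "Workflow Orchestration"
    if task.needs_diverse_perspectives:
        return "Multi-Agent Collaboration"
    if task.requires_strict_control:
        return "Workflow Orchestration"
    return "Workflow Orchestration"  # default


exploration = select_pattern(Task(is_open_ended=True))
invoice_run = select_pattern(Task(has_clear_steps=True, is_predictable=True))
# The first matching rule wins, so creativity outranks strict control
mixed = select_pattern(Task(requires_creativity=True,
                            requires_strict_control=True))
```

The last case shows that rule order matters: a task that is both creative and tightly controlled is routed to collaboration, which may or may not be what a given system wants.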
Real-World Implementation Example
End-to-End Multi-Agent System
from concurrent.futures import ThreadPoolExecutor

class EnterpriseMultiAgentSystem:
    # The agent classes, the three infrastructure wrappers,
    # run_agent_with_collaboration(), and create_workflow() are
    # application-specific and not shown here.
    def __init__(self):
        # Initialize agents
        self.agents = {
            "supervisor": SupervisorAgent(),
            "planner": PlannerAgent(),
            "researcher": ResearchAgent(),
            "analyst": AnalystAgent(),
            "coder": CoderAgent(),
            "tester": TesterAgent(),
            "reviewer": ReviewerAgent()
        }

        # Initialize infrastructure
        self.shared_memory = DynamoDBSharedMemory()
        self.message_queue = SQSMessageQueue()
        self.state_machine = StepFunctionsOrchestrator()

    def execute_complex_task(self, task):
        # 1. Supervisor analyzes task
        analysis = self.agents["supervisor"].analyze(task)

        # 2. Determine execution strategy
        if analysis.complexity == "high":
            # Multi-Agent Collaboration
            return self.collaborative_execution(task, analysis)
        else:
            # Workflow Orchestration
            return self.orchestrated_execution(task, analysis)

    def collaborative_execution(self, task, analysis):
        # Assign roles
        roles = analysis.required_roles

        # Initialize shared memory
        session_id = self.shared_memory.create_session(task)

        # Parallel agent execution
        with ThreadPoolExecutor() as executor:
            futures = []
            for role in roles:
                agent = self.agents[role]
                future = executor.submit(
                    self.run_agent_with_collaboration,
                    agent,
                    session_id
                )
                futures.append(future)

            results = [f.result() for f in futures]

        # Synthesize results
        return self.agents["supervisor"].synthesize(results)

    def orchestrated_execution(self, task, analysis):
        # Create Step Functions workflow
        workflow = self.create_workflow(analysis)

        # Execute
        execution_arn = self.state_machine.start_execution(workflow, task)

        # Wait for results
        return self.state_machine.wait_for_completion(execution_arn)
Monitoring and Observability
import boto3
from aws_xray_sdk.core import xray_recorder

class AgentMonitoring:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')

    def track_agent_execution(self, agent_id, execution_data):
        # Send metrics
        self.cloudwatch.put_metric_data(
            Namespace='MultiAgentSystem',
            MetricData=[
                {
                    'MetricName': 'AgentExecutionTime',
                    'Value': execution_data['duration'],
                    'Unit': 'Milliseconds',
                    'Dimensions': [
                        {'Name': 'AgentId', 'Value': agent_id},
                        {'Name': 'TaskType', 'Value': execution_data['task_type']}
                    ]
                },
                {
                    'MetricName': 'AgentTokenUsage',
                    'Value': execution_data['tokens'],
                    'Unit': 'Count',
                    'Dimensions': [
                        {'Name': 'AgentId', 'Value': agent_id}
                    ]
                }
            ]
        )

        # X-Ray tracing: segments are created with the X-Ray SDK recorder
        # (aws-xray-sdk package), not with the boto3 'xray' client
        segment = xray_recorder.begin_segment(f'agent-{agent_id}')
        segment.put_annotation('task_type', execution_data['task_type'])
        segment.put_metadata('execution_details', execution_data)
        xray_recorder.end_segment()
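The monitoring class expects an `execution_data` dictionary with `duration`, `task_type`, and `tokens`, but does not show where those values come from. One lightweight way to collect them is a context manager wrapped around each agent call, as in this sketch. The `timed_execution` name and the injectable `clock` parameter are hypothetical.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_execution(task_type, clock=time.perf_counter):
    """Collect the fields track_agent_execution expects."""
    data = {"task_type": task_type, "tokens": 0}
    start = clock()
    try:
        yield data
    finally:
        # Duration in milliseconds, matching the CloudWatch unit
        data["duration"] = (clock() - start) * 1000.0


with timed_execution("research") as execution_data:
    # An agent call would go here; it reports its token usage
    execution_data["tokens"] += 120
```

After the block exits, `execution_data` can be passed straight to `track_agent_execution`. Because the duration is written in `finally`, it is recorded even when the agent call raises.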
Series Conclusion
Key patterns covered in this series:
Part 1: Foundational Patterns
- ReAct: Combining reasoning and action
- Reflection: Self-improvement mechanism
- Tool Use: Integrating external capabilities
Part 2: Orchestration
- Planning: Task decomposition and execution
- Routing: Intelligent task distribution
- Human-in-the-Loop: Integrating human review
Part 3: Multi-Agent
- Multi-Agent Collaboration: Distributed collaboration
- Workflow Orchestration: Centralized coordination
Pattern Combination Strategy
Real production systems combine multiple patterns:
# Example: Enterprise AI System
system = {
    "base": "ReAct",                 # Base reasoning pattern
    "quality": "Reflection",         # Quality assurance
    "capabilities": "Tool Use",      # External integration
    "complexity": "Planning",        # Complex task decomposition
    "specialization": "Routing",     # Specialized agent utilization
    "safety": "Human-in-the-Loop",   # Critical decision review
    "scale": "Multi-Agent"           # Large-scale parallel processing
}
