Agentic AI 디자인 패턴 (Part 3): Multi-Agent 시스템
Agentic AI 디자인 패턴 (Part 3): Multi-Agent 시스템
시리즈의 마지막 편에서는 여러 에이전트가 협력하여 복잡한 문제를 해결하는 Multi-Agent 패턴을 다룹니다. 이는 단일 에이전트로는 해결하기 어려운 복잡한 도메인에서 필수적입니다.
Multi-Agent vs Single-Agent
언제 Multi-Agent를 사용해야 하는가?
Single-Agent가 적합한 경우:
- 작업이 단일 도메인에 속함
- 순차적 처리로 충분
- 컨텍스트 공유가 중요
Multi-Agent가 필요한 경우:
- 여러 전문 영역이 필요
- 병렬 처리로 성능 향상 가능
- 각 에이전트가 독립적으로 작동 가능
- 확장성과 모듈성이 중요
1. Multi-Agent Collaboration Pattern
패턴 개요
Multi-Agent Collaboration은 여러 자율 에이전트가 각자의 역할과 전문성을 가지고 협력하여 복잡한 작업을 해결하는 패턴입니다.
Workflow Agents vs Multi-Agent Collaboration
| 특성 | Workflow Agents | Multi-Agent Collaboration | |------|----------------|---------------------------| | 제어 | 중앙 집중식 코디네이터 | 분산형, 역할 기반 피어 | | 상호작용 | 하나의 에이전트가 위임 및 추적 | 여러 에이전트가 협상, 공유, 적응 | | 설계 | 사전 정의된 작업 순서 | 창발적, 유연한 작업 분배 | | 조정 | 절차적 오케스트레이션 | 협력적 또는 경쟁적 상호작용 | | 사용 사례 | 엔터프라이즈 프로세스 자동화 | 복잡한 추론, 탐색, 창발적 전략 |
아키텍처
User Request
↓
Manager Agent (초기화)
↓
┌─────────┬─────────┬─────────┐
│ Planner │Research │ Executor│ ← 역할 할당
│ Agent │ Agent │ Agent │
└────┬────┴────┬────┴────┬────┘
│ │ │
└────→ Shared Memory ←────┘ ← 통신 및 협업
↓
Supervisor Agent (선택적)
↓
Final Output
작동 메커니즘
class MultiAgentSystem:
def __init__(self):
self.agents = {
"planner": PlannerAgent(),
"researcher": ResearchAgent(),
"coder": CoderAgent(),
"tester": TesterAgent(),
"reviewer": ReviewerAgent()
}
self.shared_memory = SharedMemory()
self.message_queue = MessageQueue()
def execute(self, task):
# 1. 작업 초기화
self.shared_memory.set("task", task)
# 2. 역할 할당
roles = self.assign_roles(task)
# 3. 에이전트 간 협업
while not self.is_complete():
for agent_name in roles:
agent = self.agents[agent_name]
# 공유 메모리에서 컨텍스트 읽기
context = self.shared_memory.get_context()
# 에이전트 실행
result = agent.process(context)
# 결과를 공유 메모리에 저장
self.shared_memory.update(agent_name, result)
# 다른 에이전트에게 메시지 전송
if result.needs_collaboration:
self.message_queue.send(
from_agent=agent_name,
to_agents=result.target_agents,
message=result.message
)
# 4. 결과 통합
return self.synthesize_results()
통신 메커니즘
1. Shared Memory (공유 메모리)
class SharedMemory:
def __init__(self):
self.data = {}
self.lock = threading.Lock()
def write(self, key, value, agent_id):
with self.lock:
self.data[key] = {
"value": value,
"author": agent_id,
"timestamp": time.time()
}
def read(self, key):
return self.data.get(key)
def query(self, filter_fn):
return [v for v in self.data.values() if filter_fn(v)]
2. Message Passing (메시지 전달)
class MessageQueue:
def send(self, from_agent, to_agent, message):
event = {
"from": from_agent,
"to": to_agent,
"message": message,
"timestamp": time.time()
}
self.queue.put(event)
def receive(self, agent_id):
messages = []
while not self.queue.empty():
event = self.queue.get()
if event["to"] == agent_id:
messages.append(event)
return messages
3. Prompt Chaining (프롬프트 체이닝)
def prompt_chain_collaboration(task):
# Agent 1: 계획 수립
plan = planner_agent.create_plan(task)
# Agent 2: 연구 수행
research = researcher_agent.research(plan)
# Agent 3: 코드 작성
code = coder_agent.write_code(plan, research)
# Agent 4: 테스트
test_results = tester_agent.test(code)
# Agent 5: 리뷰
review = reviewer_agent.review(code, test_results)
return review
적용 케이스
1. 소프트웨어 개발 팀
class SoftwareDevelopmentTeam:
def develop_feature(self, requirements):
# Planner: 작업 분해
tasks = self.planner.decompose(requirements)
# Architect: 설계
design = self.architect.design(tasks)
# Coder: 구현 (병렬)
code_results = []
with ThreadPoolExecutor() as executor:
futures = [
executor.submit(self.coder.implement, task)
for task in tasks
]
code_results = [f.result() for f in futures]
# Tester: 테스트
test_results = self.tester.test_all(code_results)
# Reviewer: 코드 리뷰
review = self.reviewer.review(code_results, test_results)
# Documenter: 문서화
docs = self.documenter.document(code_results, design)
return {
"code": code_results,
"tests": test_results,
"review": review,
"documentation": docs
}
2. 연구 팀
class ResearchTeam:
def conduct_research(self, topic):
# Searcher: 문헌 검색
papers = self.searcher.find_papers(topic)
# Summarizer: 요약 (병렬)
summaries = self.parallel_summarize(papers)
# Analyst: 분석
analysis = self.analyst.analyze(summaries)
# Validator: 검증
validation = self.validator.validate(analysis)
# Writer: 보고서 작성
report = self.writer.write_report(analysis, validation)
return report
3. 비즈니스 시나리오 모델링
class BusinessAnalysisTeam:
def analyze_scenario(self, scenario):
# 각 에이전트가 다른 관점에서 분석
finance_view = self.finance_agent.analyze(scenario)
policy_view = self.policy_agent.analyze(scenario)
compliance_view = self.compliance_agent.analyze(scenario)
# 에이전트 간 토론
discussion = self.facilitate_discussion([
finance_view,
policy_view,
compliance_view
])
# 합의 도출
consensus = self.reach_consensus(discussion)
return consensus
AWS 구현
Amazon Bedrock Multi-Agent Collaboration
import boto3
bedrock_agent = boto3.client('bedrock-agent')
# Supervisor Agent 생성
supervisor = bedrock_agent.create_agent(
agentName='supervisor-agent',
foundationModel='anthropic.claude-3-sonnet-20240229-v1:0',
instruction='''
당신은 여러 전문 에이전트를 조정하는 슈퍼바이저입니다.
작업을 분석하고 적절한 에이전트에게 위임하세요.
'''
)
# Collaborator Agents 생성
researcher = bedrock_agent.create_agent(
agentName='researcher-agent',
foundationModel='anthropic.claude-3-sonnet-20240229-v1:0',
instruction='당신은 정보 검색 및 연구 전문가입니다.'
)
coder = bedrock_agent.create_agent(
agentName='coder-agent',
foundationModel='anthropic.claude-3-sonnet-20240229-v1:0',
instruction='당신은 코드 작성 전문가입니다.'
)
# Multi-Agent Collaboration 설정
bedrock_agent.associate_agent_collaborator(
agentId=supervisor['agentId'],
agentVersion='DRAFT',
collaboratorName='researcher',
collaboratorId=researcher['agentId']
)
bedrock_agent.associate_agent_collaborator(
agentId=supervisor['agentId'],
agentVersion='DRAFT',
collaboratorName='coder',
collaboratorId=coder['agentId']
)
Amazon SQS를 활용한 에이전트 간 통신
import boto3
import json
sqs = boto3.client('sqs')
class AgentCommunication:
def __init__(self, agent_id):
self.agent_id = agent_id
self.queue_url = f'https://sqs.region.amazonaws.com/account/agent-{agent_id}'
def send_message(self, to_agent, message):
sqs.send_message(
QueueUrl=f'https://sqs.region.amazonaws.com/account/agent-{to_agent}',
MessageBody=json.dumps({
'from': self.agent_id,
'message': message,
'timestamp': time.time()
})
)
def receive_messages(self):
response = sqs.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=10
)
return response.get('Messages', [])
DynamoDB를 활용한 공유 메모리
import boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('agent-shared-memory')
class SharedMemoryDynamoDB:
def write(self, key, value, agent_id):
table.put_item(
Item={
'key': key,
'value': value,
'agent_id': agent_id,
'timestamp': int(time.time())
}
)
def read(self, key):
response = table.get_item(Key={'key': key})
return response.get('Item')
def query_by_agent(self, agent_id):
response = table.query(
IndexName='agent-index',
KeyConditionExpression='agent_id = :agent_id',
ExpressionAttributeValues={':agent_id': agent_id}
)
return response['Items']
트레이드오프
장점:
- 전문화: 각 에이전트가 특정 영역에 최적화
- 병렬성: 독립적인 작업 동시 실행
- 확장성: 새로운 에이전트 추가 용이
- 복원력: 한 에이전트 실패가 전체 시스템에 영향 최소화
- 창발성: 에이전트 간 상호작용으로 예상치 못한 해결책 도출
단점:
- 복잡성: 여러 에이전트 관리 및 조정
- 통신 오버헤드: 에이전트 간 메시지 전달 비용
- 일관성: 공유 상태 관리의 어려움
- 디버깅: 분산 시스템 문제 추적 어려움
- 비용: 여러 LLM 호출로 인한 비용 증가
2. Workflow Orchestration Agents
패턴 개요
Workflow Orchestration Agents는 다단계 작업과 프로세스를 조정하고 관리하는 에이전트입니다. Multi-Agent Collaboration과 달리 중앙 집중식 조정을 사용합니다.
아키텍처
User Input / System Event
↓
Workflow Orchestrator
↓
┌─────────────────────────┐
│ Context Retrieval │
│ - Knowledge Base │
│ - Agent Registry │
└─────────────────────────┘
↓
LLM-based Agent Selection
↓
┌──────────┬──────────┬──────────┐
│ Worker 1 │ Worker 2 │ Worker 3 │
└──────────┴──────────┴──────────┘
↓
State Tracking & Results
작동 메커니즘
class WorkflowOrchestrator:
def __init__(self):
self.worker_agents = {}
self.state_store = StateStore()
self.agent_registry = AgentRegistry()
def orchestrate(self, task):
# 1. 워크플로우 계획 생성
workflow = self.create_workflow(task)
# 2. 상태 초기화
execution_id = self.state_store.initialize(workflow)
# 3. 단계별 실행
for step in workflow.steps:
# 적절한 워커 에이전트 선택
worker = self.select_worker(step)
# 컨텍스트 준비
context = self.state_store.get_context(execution_id)
# 워커 실행
try:
result = worker.execute(step, context)
self.state_store.update(execution_id, step.id, result)
# 재시도 로직
if result.status == "failed" and step.retryable:
result = self.retry_with_backoff(worker, step, context)
except Exception as e:
self.handle_failure(execution_id, step, e)
# 4. 결과 반환
return self.state_store.get_final_result(execution_id)
def select_worker(self, step):
# LLM을 사용한 동적 워커 선택
candidates = self.agent_registry.find_capable_agents(step)
selection_prompt = f"""
다음 작업에 가장 적합한 에이전트를 선택하세요:
작업: {step.description}
후보 에이전트:
{json.dumps(candidates, indent=2)}
선택 기준: 능력, 비용, 레이턴시, 성공률
"""
selected = llm.select(selection_prompt)
return self.worker_agents[selected]
적용 케이스
1. 고객 서비스 라우팅
class CustomerServiceOrchestrator:
def handle_request(self, customer_request):
# 1. 요청 분류
classification = self.classify_request(customer_request)
# 2. 워크플로우 결정
if classification.type == "simple":
# 단일 에이전트로 처리
return self.faq_agent.handle(customer_request)
elif classification.type == "complex":
# 다단계 워크플로우
workflow = [
("gather_info", self.info_agent),
("analyze", self.analysis_agent),
("resolve", self.resolution_agent),
("follow_up", self.followup_agent)
]
return self.execute_workflow(workflow, customer_request)
elif classification.type == "escalation":
# 인간 에이전트로 에스컬레이션
return self.escalate_to_human(customer_request)
2. 데이터 파이프라인 오케스트레이션
class DataPipelineOrchestrator:
def process_data(self, data_source):
workflow = [
{
"step": "extract",
"agent": self.extractor,
"retry": 3,
"timeout": 300
},
{
"step": "transform",
"agent": self.transformer,
"parallel": True,
"chunks": 10
},
{
"step": "validate",
"agent": self.validator,
"critical": True
},
{
"step": "load",
"agent": self.loader,
"retry": 5
}
]
return self.execute_pipeline(workflow, data_source)
AWS 구현
AWS Step Functions를 활용한 오케스트레이션
import boto3
import json
stepfunctions = boto3.client('stepfunctions')
# State Machine 정의
state_machine_definition = {
"Comment": "Workflow Orchestration Pattern",
"StartAt": "ClassifyRequest",
"States": {
"ClassifyRequest": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:classifier",
"Next": "RouteToWorker"
},
"RouteToWorker": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.category",
"StringEquals": "technical",
"Next": "TechnicalWorker"
},
{
"Variable": "$.category",
"StringEquals": "billing",
"Next": "BillingWorker"
}
],
"Default": "GeneralWorker"
},
"TechnicalWorker": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:technical-worker",
"Retry": [
{
"ErrorEquals": ["States.TaskFailed"],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "HandleFailure"
}
],
"Next": "AggregateResults"
},
"BillingWorker": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:billing-worker",
"Next": "AggregateResults"
},
"GeneralWorker": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:general-worker",
"Next": "AggregateResults"
},
"AggregateResults": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:aggregator",
"End": True
},
"HandleFailure": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:failure-handler",
"End": True
}
}
}
# State Machine 생성
response = stepfunctions.create_state_machine(
name='workflow-orchestrator',
definition=json.dumps(state_machine_definition),
roleArn='arn:aws:iam::account:role/step-functions-role'
)
# 실행
execution = stepfunctions.start_execution(
stateMachineArn=response['stateMachineArn'],
input=json.dumps({
"request": "고객 요청 내용",
"customer_id": "12345"
})
)
트레이드오프
장점:
- 명확한 제어 흐름
- 상태 추적 및 모니터링 용이
- 오류 처리 및 재시도 로직 구현 용이
- 예측 가능한 실행 경로
단점:
- 중앙 집중식 병목 가능성
- 유연성 제한 (사전 정의된 워크플로우)
- 복잡한 워크플로우 관리 어려움
패턴 선택 가이드
Multi-Agent Collaboration vs Workflow Orchestration
def select_pattern(task):
if task.requires_creativity or task.is_open_ended:
return "Multi-Agent Collaboration"
if task.has_clear_steps and task.is_predictable:
return "Workflow Orchestration"
if task.needs_diverse_perspectives:
return "Multi-Agent Collaboration"
if task.requires_strict_control:
return "Workflow Orchestration"
# 기본값
return "Workflow Orchestration"
실전 구현 예시
엔드-투-엔드 Multi-Agent 시스템
class EnterpriseMultiAgentSystem:
def __init__(self):
# 에이전트 초기화
self.agents = {
"supervisor": SupervisorAgent(),
"planner": PlannerAgent(),
"researcher": ResearchAgent(),
"analyst": AnalystAgent(),
"coder": CoderAgent(),
"tester": TesterAgent(),
"reviewer": ReviewerAgent()
}
# 인프라 초기화
self.shared_memory = DynamoDBSharedMemory()
self.message_queue = SQSMessageQueue()
self.state_machine = StepFunctionsOrchestrator()
def execute_complex_task(self, task):
# 1. Supervisor가 작업 분석
analysis = self.agents["supervisor"].analyze(task)
# 2. 실행 전략 결정
if analysis.complexity == "high":
# Multi-Agent Collaboration
return self.collaborative_execution(task, analysis)
else:
# Workflow Orchestration
return self.orchestrated_execution(task, analysis)
def collaborative_execution(self, task, analysis):
# 역할 할당
roles = analysis.required_roles
# 공유 메모리 초기화
session_id = self.shared_memory.create_session(task)
# 에이전트 병렬 실행
with ThreadPoolExecutor() as executor:
futures = []
for role in roles:
agent = self.agents[role]
future = executor.submit(
self.run_agent_with_collaboration,
agent,
session_id
)
futures.append(future)
results = [f.result() for f in futures]
# 결과 통합
return self.agents["supervisor"].synthesize(results)
def orchestrated_execution(self, task, analysis):
# Step Functions 워크플로우 생성
workflow = self.create_workflow(analysis)
# 실행
execution_arn = self.state_machine.start_execution(workflow, task)
# 결과 대기
return self.state_machine.wait_for_completion(execution_arn)
모니터링 및 관찰성
class AgentMonitoring:
def __init__(self):
self.cloudwatch = boto3.client('cloudwatch')
self.xray = boto3.client('xray')
def track_agent_execution(self, agent_id, execution_data):
# 메트릭 전송
self.cloudwatch.put_metric_data(
Namespace='MultiAgentSystem',
MetricData=[
{
'MetricName': 'AgentExecutionTime',
'Value': execution_data['duration'],
'Unit': 'Milliseconds',
'Dimensions': [
{'Name': 'AgentId', 'Value': agent_id},
{'Name': 'TaskType', 'Value': execution_data['task_type']}
]
},
{
'MetricName': 'AgentTokenUsage',
'Value': execution_data['tokens'],
'Unit': 'Count',
'Dimensions': [
{'Name': 'AgentId', 'Value': agent_id}
]
}
]
)
# X-Ray 트레이싱
segment = self.xray.begin_segment(f'agent-{agent_id}')
segment.put_annotation('task_type', execution_data['task_type'])
segment.put_metadata('execution_details', execution_data)
self.xray.end_segment()
시리즈 마무리
이 시리즈에서 다룬 주요 패턴들:
Part 1: 기초 패턴
- ReAct: 추론과 행동의 결합
- Reflection: 자기 개선 메커니즘
- Tool Use: 외부 능력 통합
Part 2: 오케스트레이션
- Planning: 작업 분해와 실행
- Routing: 지능형 작업 분배
- Human-in-the-Loop: 인간 검토 통합
Part 3: Multi-Agent
- Multi-Agent Collaboration: 분산 협업
- Workflow Orchestration: 중앙 집중식 조정
패턴 조합 전략
실제 프로덕션 시스템은 여러 패턴을 조합합니다:
# 예시: 엔터프라이즈 AI 시스템
system = {
"base": "ReAct", # 기본 추론 패턴
"quality": "Reflection", # 품질 보장
"capabilities": "Tool Use", # 외부 통합
"complexity": "Planning", # 복잡한 작업 분해
"specialization": "Routing", # 전문 에이전트 활용
"safety": "Human-in-the-Loop", # 중요 결정 검토
"scale": "Multi-Agent" # 대규모 병렬 처리
}
