S
STONI
AI
Agentic AI
AWS
Design Patterns
Planning
Routing
HITL
Orchestration
Step Functions
EventBridge

Agentic AI 디자인 패턴 (Part 3): Multi-Agent 시스템

Agentic AI 디자인 패턴 (Part 3): Multi-Agent 시스템

시리즈의 마지막 편에서는 여러 에이전트가 협력하여 복잡한 문제를 해결하는 Multi-Agent 패턴을 다룹니다. 이는 단일 에이전트로는 해결하기 어려운 복잡한 도메인에서 필수적입니다.

Multi-Agent vs Single-Agent

언제 Multi-Agent를 사용해야 하는가?

Single-Agent가 적합한 경우:

  • 작업이 단일 도메인에 속함
  • 순차적 처리로 충분
  • 컨텍스트 공유가 중요

Multi-Agent가 필요한 경우:

  • 여러 전문 영역이 필요
  • 병렬 처리로 성능 향상 가능
  • 각 에이전트가 독립적으로 작동 가능
  • 확장성과 모듈성이 중요

1. Multi-Agent Collaboration Pattern

패턴 개요

Multi-Agent Collaboration은 여러 자율 에이전트가 각자의 역할과 전문성을 가지고 협력하여 복잡한 작업을 해결하는 패턴입니다.

Workflow Agents vs Multi-Agent Collaboration

| 특성 | Workflow Agents | Multi-Agent Collaboration | |------|----------------|---------------------------| | 제어 | 중앙 집중식 코디네이터 | 분산형, 역할 기반 피어 | | 상호작용 | 하나의 에이전트가 위임 및 추적 | 여러 에이전트가 협상, 공유, 적응 | | 설계 | 사전 정의된 작업 순서 | 창발적, 유연한 작업 분배 | | 조정 | 절차적 오케스트레이션 | 협력적 또는 경쟁적 상호작용 | | 사용 사례 | 엔터프라이즈 프로세스 자동화 | 복잡한 추론, 탐색, 창발적 전략 |

아키텍처

User Request
    ↓
Manager Agent (초기화)
    ↓
┌─────────┬─────────┬─────────┐
│ Planner │Research │ Executor│  ← 역할 할당
│ Agent   │ Agent   │ Agent   │
└────┬────┴────┬────┴────┬────┘
     │         │         │
     └────→ Shared Memory ←────┘  ← 통신 및 협업
              ↓
         Supervisor Agent (선택적)
              ↓
         Final Output

작동 메커니즘

class MultiAgentSystem:
    def __init__(self):
        self.agents = {
            "planner": PlannerAgent(),
            "researcher": ResearchAgent(),
            "coder": CoderAgent(),
            "tester": TesterAgent(),
            "reviewer": ReviewerAgent()
        }
        self.shared_memory = SharedMemory()
        self.message_queue = MessageQueue()
    
    def execute(self, task):
        # 1. 작업 초기화
        self.shared_memory.set("task", task)
        
        # 2. 역할 할당
        roles = self.assign_roles(task)
        
        # 3. 에이전트 간 협업
        while not self.is_complete():
            for agent_name in roles:
                agent = self.agents[agent_name]
                
                # 공유 메모리에서 컨텍스트 읽기
                context = self.shared_memory.get_context()
                
                # 에이전트 실행
                result = agent.process(context)
                
                # 결과를 공유 메모리에 저장
                self.shared_memory.update(agent_name, result)
                
                # 다른 에이전트에게 메시지 전송
                if result.needs_collaboration:
                    self.message_queue.send(
                        from_agent=agent_name,
                        to_agents=result.target_agents,
                        message=result.message
                    )
        
        # 4. 결과 통합
        return self.synthesize_results()

통신 메커니즘

1. Shared Memory (공유 메모리)

class SharedMemory:
    def __init__(self):
        self.data = {}
        self.lock = threading.Lock()
    
    def write(self, key, value, agent_id):
        with self.lock:
            self.data[key] = {
                "value": value,
                "author": agent_id,
                "timestamp": time.time()
            }
    
    def read(self, key):
        return self.data.get(key)
    
    def query(self, filter_fn):
        return [v for v in self.data.values() if filter_fn(v)]

2. Message Passing (메시지 전달)

class MessageQueue:
    def send(self, from_agent, to_agent, message):
        event = {
            "from": from_agent,
            "to": to_agent,
            "message": message,
            "timestamp": time.time()
        }
        self.queue.put(event)
    
    def receive(self, agent_id):
        messages = []
        while not self.queue.empty():
            event = self.queue.get()
            if event["to"] == agent_id:
                messages.append(event)
        return messages

3. Prompt Chaining (프롬프트 체이닝)

def prompt_chain_collaboration(task):
    # Agent 1: 계획 수립
    plan = planner_agent.create_plan(task)
    
    # Agent 2: 연구 수행
    research = researcher_agent.research(plan)
    
    # Agent 3: 코드 작성
    code = coder_agent.write_code(plan, research)
    
    # Agent 4: 테스트
    test_results = tester_agent.test(code)
    
    # Agent 5: 리뷰
    review = reviewer_agent.review(code, test_results)
    
    return review

적용 케이스

1. 소프트웨어 개발 팀

class SoftwareDevelopmentTeam:
    def develop_feature(self, requirements):
        # Planner: 작업 분해
        tasks = self.planner.decompose(requirements)
        
        # Architect: 설계
        design = self.architect.design(tasks)
        
        # Coder: 구현 (병렬)
        code_results = []
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(self.coder.implement, task)
                for task in tasks
            ]
            code_results = [f.result() for f in futures]
        
        # Tester: 테스트
        test_results = self.tester.test_all(code_results)
        
        # Reviewer: 코드 리뷰
        review = self.reviewer.review(code_results, test_results)
        
        # Documenter: 문서화
        docs = self.documenter.document(code_results, design)
        
        return {
            "code": code_results,
            "tests": test_results,
            "review": review,
            "documentation": docs
        }

2. 연구 팀

class ResearchTeam:
    def conduct_research(self, topic):
        # Searcher: 문헌 검색
        papers = self.searcher.find_papers(topic)
        
        # Summarizer: 요약 (병렬)
        summaries = self.parallel_summarize(papers)
        
        # Analyst: 분석
        analysis = self.analyst.analyze(summaries)
        
        # Validator: 검증
        validation = self.validator.validate(analysis)
        
        # Writer: 보고서 작성
        report = self.writer.write_report(analysis, validation)
        
        return report

3. 비즈니스 시나리오 모델링

class BusinessAnalysisTeam:
    def analyze_scenario(self, scenario):
        # 각 에이전트가 다른 관점에서 분석
        finance_view = self.finance_agent.analyze(scenario)
        policy_view = self.policy_agent.analyze(scenario)
        compliance_view = self.compliance_agent.analyze(scenario)
        
        # 에이전트 간 토론
        discussion = self.facilitate_discussion([
            finance_view,
            policy_view,
            compliance_view
        ])
        
        # 합의 도출
        consensus = self.reach_consensus(discussion)
        
        return consensus

AWS 구현

Amazon Bedrock Multi-Agent Collaboration

import boto3

bedrock_agent = boto3.client('bedrock-agent')

# Supervisor Agent 생성
supervisor = bedrock_agent.create_agent(
    agentName='supervisor-agent',
    foundationModel='anthropic.claude-3-sonnet-20240229-v1:0',
    instruction='''
    당신은 여러 전문 에이전트를 조정하는 슈퍼바이저입니다.
    작업을 분석하고 적절한 에이전트에게 위임하세요.
    '''
)

# Collaborator Agents 생성
researcher = bedrock_agent.create_agent(
    agentName='researcher-agent',
    foundationModel='anthropic.claude-3-sonnet-20240229-v1:0',
    instruction='당신은 정보 검색 및 연구 전문가입니다.'
)

coder = bedrock_agent.create_agent(
    agentName='coder-agent',
    foundationModel='anthropic.claude-3-sonnet-20240229-v1:0',
    instruction='당신은 코드 작성 전문가입니다.'
)

# Multi-Agent Collaboration 설정
bedrock_agent.associate_agent_collaborator(
    agentId=supervisor['agentId'],
    agentVersion='DRAFT',
    collaboratorName='researcher',
    collaboratorId=researcher['agentId']
)

bedrock_agent.associate_agent_collaborator(
    agentId=supervisor['agentId'],
    agentVersion='DRAFT',
    collaboratorName='coder',
    collaboratorId=coder['agentId']
)

Amazon SQS를 활용한 에이전트 간 통신

import boto3
import json

sqs = boto3.client('sqs')

class AgentCommunication:
    def __init__(self, agent_id):
        self.agent_id = agent_id
        self.queue_url = f'https://sqs.region.amazonaws.com/account/agent-{agent_id}'
    
    def send_message(self, to_agent, message):
        sqs.send_message(
            QueueUrl=f'https://sqs.region.amazonaws.com/account/agent-{to_agent}',
            MessageBody=json.dumps({
                'from': self.agent_id,
                'message': message,
                'timestamp': time.time()
            })
        )
    
    def receive_messages(self):
        response = sqs.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=10
        )
        return response.get('Messages', [])

DynamoDB를 활용한 공유 메모리

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('agent-shared-memory')

class SharedMemoryDynamoDB:
    def write(self, key, value, agent_id):
        table.put_item(
            Item={
                'key': key,
                'value': value,
                'agent_id': agent_id,
                'timestamp': int(time.time())
            }
        )
    
    def read(self, key):
        response = table.get_item(Key={'key': key})
        return response.get('Item')
    
    def query_by_agent(self, agent_id):
        response = table.query(
            IndexName='agent-index',
            KeyConditionExpression='agent_id = :agent_id',
            ExpressionAttributeValues={':agent_id': agent_id}
        )
        return response['Items']

트레이드오프

장점:

  • 전문화: 각 에이전트가 특정 영역에 최적화
  • 병렬성: 독립적인 작업 동시 실행
  • 확장성: 새로운 에이전트 추가 용이
  • 복원력: 한 에이전트 실패가 전체 시스템에 영향 최소화
  • 창발성: 에이전트 간 상호작용으로 예상치 못한 해결책 도출

단점:

  • 복잡성: 여러 에이전트 관리 및 조정
  • 통신 오버헤드: 에이전트 간 메시지 전달 비용
  • 일관성: 공유 상태 관리의 어려움
  • 디버깅: 분산 시스템 문제 추적 어려움
  • 비용: 여러 LLM 호출로 인한 비용 증가

2. Workflow Orchestration Agents

패턴 개요

Workflow Orchestration Agents는 다단계 작업과 프로세스를 조정하고 관리하는 에이전트입니다. Multi-Agent Collaboration과 달리 중앙 집중식 조정을 사용합니다.

아키텍처

User Input / System Event
    ↓
Workflow Orchestrator
    ↓
┌─────────────────────────┐
│ Context Retrieval       │
│ - Knowledge Base        │
│ - Agent Registry        │
└─────────────────────────┘
    ↓
LLM-based Agent Selection
    ↓
┌──────────┬──────────┬──────────┐
│ Worker 1 │ Worker 2 │ Worker 3 │
└──────────┴──────────┴──────────┘
    ↓
State Tracking & Results

작동 메커니즘

class WorkflowOrchestrator:
    def __init__(self):
        self.worker_agents = {}
        self.state_store = StateStore()
        self.agent_registry = AgentRegistry()
    
    def orchestrate(self, task):
        # 1. 워크플로우 계획 생성
        workflow = self.create_workflow(task)
        
        # 2. 상태 초기화
        execution_id = self.state_store.initialize(workflow)
        
        # 3. 단계별 실행
        for step in workflow.steps:
            # 적절한 워커 에이전트 선택
            worker = self.select_worker(step)
            
            # 컨텍스트 준비
            context = self.state_store.get_context(execution_id)
            
            # 워커 실행
            try:
                result = worker.execute(step, context)
                self.state_store.update(execution_id, step.id, result)
                
                # 재시도 로직
                if result.status == "failed" and step.retryable:
                    result = self.retry_with_backoff(worker, step, context)
                
            except Exception as e:
                self.handle_failure(execution_id, step, e)
        
        # 4. 결과 반환
        return self.state_store.get_final_result(execution_id)
    
    def select_worker(self, step):
        # LLM을 사용한 동적 워커 선택
        candidates = self.agent_registry.find_capable_agents(step)
        
        selection_prompt = f"""
        다음 작업에 가장 적합한 에이전트를 선택하세요:
        작업: {step.description}
        
        후보 에이전트:
        {json.dumps(candidates, indent=2)}
        
        선택 기준: 능력, 비용, 레이턴시, 성공률
        """
        
        selected = llm.select(selection_prompt)
        return self.worker_agents[selected]

적용 케이스

1. 고객 서비스 라우팅

class CustomerServiceOrchestrator:
    def handle_request(self, customer_request):
        # 1. 요청 분류
        classification = self.classify_request(customer_request)
        
        # 2. 워크플로우 결정
        if classification.type == "simple":
            # 단일 에이전트로 처리
            return self.faq_agent.handle(customer_request)
        
        elif classification.type == "complex":
            # 다단계 워크플로우
            workflow = [
                ("gather_info", self.info_agent),
                ("analyze", self.analysis_agent),
                ("resolve", self.resolution_agent),
                ("follow_up", self.followup_agent)
            ]
            
            return self.execute_workflow(workflow, customer_request)
        
        elif classification.type == "escalation":
            # 인간 에이전트로 에스컬레이션
            return self.escalate_to_human(customer_request)

2. 데이터 파이프라인 오케스트레이션

class DataPipelineOrchestrator:
    def process_data(self, data_source):
        workflow = [
            {
                "step": "extract",
                "agent": self.extractor,
                "retry": 3,
                "timeout": 300
            },
            {
                "step": "transform",
                "agent": self.transformer,
                "parallel": True,
                "chunks": 10
            },
            {
                "step": "validate",
                "agent": self.validator,
                "critical": True
            },
            {
                "step": "load",
                "agent": self.loader,
                "retry": 5
            }
        ]
        
        return self.execute_pipeline(workflow, data_source)

AWS 구현

AWS Step Functions를 활용한 오케스트레이션

import boto3
import json

stepfunctions = boto3.client('stepfunctions')

# State Machine 정의
state_machine_definition = {
    "Comment": "Workflow Orchestration Pattern",
    "StartAt": "ClassifyRequest",
    "States": {
        "ClassifyRequest": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:classifier",
            "Next": "RouteToWorker"
        },
        "RouteToWorker": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": "$.category",
                    "StringEquals": "technical",
                    "Next": "TechnicalWorker"
                },
                {
                    "Variable": "$.category",
                    "StringEquals": "billing",
                    "Next": "BillingWorker"
                }
            ],
            "Default": "GeneralWorker"
        },
        "TechnicalWorker": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:technical-worker",
            "Retry": [
                {
                    "ErrorEquals": ["States.TaskFailed"],
                    "IntervalSeconds": 2,
                    "MaxAttempts": 3,
                    "BackoffRate": 2.0
                }
            ],
            "Catch": [
                {
                    "ErrorEquals": ["States.ALL"],
                    "Next": "HandleFailure"
                }
            ],
            "Next": "AggregateResults"
        },
        "BillingWorker": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:billing-worker",
            "Next": "AggregateResults"
        },
        "GeneralWorker": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:general-worker",
            "Next": "AggregateResults"
        },
        "AggregateResults": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:aggregator",
            "End": True
        },
        "HandleFailure": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:failure-handler",
            "End": True
        }
    }
}

# State Machine 생성
response = stepfunctions.create_state_machine(
    name='workflow-orchestrator',
    definition=json.dumps(state_machine_definition),
    roleArn='arn:aws:iam::account:role/step-functions-role'
)

# 실행
execution = stepfunctions.start_execution(
    stateMachineArn=response['stateMachineArn'],
    input=json.dumps({
        "request": "고객 요청 내용",
        "customer_id": "12345"
    })
)

트레이드오프

장점:

  • 명확한 제어 흐름
  • 상태 추적 및 모니터링 용이
  • 오류 처리 및 재시도 로직 구현 용이
  • 예측 가능한 실행 경로

단점:

  • 중앙 집중식 병목 가능성
  • 유연성 제한 (사전 정의된 워크플로우)
  • 복잡한 워크플로우 관리 어려움

패턴 선택 가이드

Multi-Agent Collaboration vs Workflow Orchestration

def select_pattern(task):
    if task.requires_creativity or task.is_open_ended:
        return "Multi-Agent Collaboration"
    
    if task.has_clear_steps and task.is_predictable:
        return "Workflow Orchestration"
    
    if task.needs_diverse_perspectives:
        return "Multi-Agent Collaboration"
    
    if task.requires_strict_control:
        return "Workflow Orchestration"
    
    # 기본값
    return "Workflow Orchestration"

실전 구현 예시

엔드-투-엔드 Multi-Agent 시스템

class EnterpriseMultiAgentSystem:
    def __init__(self):
        # 에이전트 초기화
        self.agents = {
            "supervisor": SupervisorAgent(),
            "planner": PlannerAgent(),
            "researcher": ResearchAgent(),
            "analyst": AnalystAgent(),
            "coder": CoderAgent(),
            "tester": TesterAgent(),
            "reviewer": ReviewerAgent()
        }
        
        # 인프라 초기화
        self.shared_memory = DynamoDBSharedMemory()
        self.message_queue = SQSMessageQueue()
        self.state_machine = StepFunctionsOrchestrator()
    
    def execute_complex_task(self, task):
        # 1. Supervisor가 작업 분석
        analysis = self.agents["supervisor"].analyze(task)
        
        # 2. 실행 전략 결정
        if analysis.complexity == "high":
            # Multi-Agent Collaboration
            return self.collaborative_execution(task, analysis)
        else:
            # Workflow Orchestration
            return self.orchestrated_execution(task, analysis)
    
    def collaborative_execution(self, task, analysis):
        # 역할 할당
        roles = analysis.required_roles
        
        # 공유 메모리 초기화
        session_id = self.shared_memory.create_session(task)
        
        # 에이전트 병렬 실행
        with ThreadPoolExecutor() as executor:
            futures = []
            for role in roles:
                agent = self.agents[role]
                future = executor.submit(
                    self.run_agent_with_collaboration,
                    agent,
                    session_id
                )
                futures.append(future)
            
            results = [f.result() for f in futures]
        
        # 결과 통합
        return self.agents["supervisor"].synthesize(results)
    
    def orchestrated_execution(self, task, analysis):
        # Step Functions 워크플로우 생성
        workflow = self.create_workflow(analysis)
        
        # 실행
        execution_arn = self.state_machine.start_execution(workflow, task)
        
        # 결과 대기
        return self.state_machine.wait_for_completion(execution_arn)

모니터링 및 관찰성

class AgentMonitoring:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.xray = boto3.client('xray')
    
    def track_agent_execution(self, agent_id, execution_data):
        # 메트릭 전송
        self.cloudwatch.put_metric_data(
            Namespace='MultiAgentSystem',
            MetricData=[
                {
                    'MetricName': 'AgentExecutionTime',
                    'Value': execution_data['duration'],
                    'Unit': 'Milliseconds',
                    'Dimensions': [
                        {'Name': 'AgentId', 'Value': agent_id},
                        {'Name': 'TaskType', 'Value': execution_data['task_type']}
                    ]
                },
                {
                    'MetricName': 'AgentTokenUsage',
                    'Value': execution_data['tokens'],
                    'Unit': 'Count',
                    'Dimensions': [
                        {'Name': 'AgentId', 'Value': agent_id}
                    ]
                }
            ]
        )
        
        # X-Ray 트레이싱
        segment = self.xray.begin_segment(f'agent-{agent_id}')
        segment.put_annotation('task_type', execution_data['task_type'])
        segment.put_metadata('execution_details', execution_data)
        self.xray.end_segment()

시리즈 마무리

이 시리즈에서 다룬 주요 패턴들:

Part 1: 기초 패턴

  • ReAct: 추론과 행동의 결합
  • Reflection: 자기 개선 메커니즘
  • Tool Use: 외부 능력 통합

Part 2: 오케스트레이션

  • Planning: 작업 분해와 실행
  • Routing: 지능형 작업 분배
  • Human-in-the-Loop: 인간 검토 통합

Part 3: Multi-Agent

  • Multi-Agent Collaboration: 분산 협업
  • Workflow Orchestration: 중앙 집중식 조정

패턴 조합 전략

실제 프로덕션 시스템은 여러 패턴을 조합합니다:

# 예시: 엔터프라이즈 AI 시스템
system = {
    "base": "ReAct",  # 기본 추론 패턴
    "quality": "Reflection",  # 품질 보장
    "capabilities": "Tool Use",  # 외부 통합
    "complexity": "Planning",  # 복잡한 작업 분해
    "specialization": "Routing",  # 전문 에이전트 활용
    "safety": "Human-in-the-Loop",  # 중요 결정 검토
    "scale": "Multi-Agent"  # 대규모 병렬 처리
}

참고 자료

Clickable cat