S
STONI
AI
Agentic AI
AWS
Design Patterns
Planning
Routing
HITL
Orchestration
Step Functions
EventBridge

Agentic AI 디자인 패턴 (Part 2): 오케스트레이션과 라우팅

Agentic AI 디자인 패턴 (Part 2): 오케스트레이션과 라우팅

Part 1에서 기본 패턴(ReAct, Reflection, Tool Use)을 다뤘다면, 이번 편에서는 복잡한 워크플로우를 관리하는 고급 오케스트레이션 패턴을 살펴봅니다.

1. Planning Pattern: 작업 분해와 실행

패턴 개요

Planning 패턴은 복잡한 작업을 실행 가능한 하위 작업으로 분해하고, 각 작업을 순차적 또는 병렬로 실행하는 Orchestrator-Workers 아키텍처입니다.

아키텍처

User Request
    ↓
Planner Agent (Orchestrator)
    ↓
[Task 1] [Task 2] [Task 3]  ← 하위 작업 분해
    ↓       ↓       ↓
Worker   Worker  Worker      ← 전문 에이전트
    ↓       ↓       ↓
Results Aggregation
    ↓
Final Output

작동 메커니즘

class PlanningAgent:
    def execute(self, user_request):
        # 1. Planning Phase
        plan = self.create_plan(user_request)
        # plan = [
        #     {"task": "데이터 수집", "agent": "researcher"},
        #     {"task": "데이터 분석", "agent": "analyst"},
        #     {"task": "보고서 작성", "agent": "writer"}
        # ]
        
        # 2. Execution Phase
        results = []
        for step in plan:
            worker = self.get_worker(step['agent'])
            result = worker.execute(step['task'])
            results.append(result)
            
            # 3. Re-planning (optional)
            if self.needs_replanning(result):
                plan = self.update_plan(plan, result)
        
        # 4. Synthesis Phase
        return self.synthesize(results)

적용 케이스

언제 사용해야 하는가:

  • 다단계 워크플로우가 명확한 경우
  • 각 단계가 독립적으로 실행 가능한 경우
  • 작업 진행 상황 추적이 중요한 경우

실제 사용 예시:

1. 소프트웨어 개발 자동화

plan = [
    {"task": "요구사항 분석", "agent": "analyst", "output": "spec.md"},
    {"task": "아키텍처 설계", "agent": "architect", "input": "spec.md"},
    {"task": "코드 생성", "agent": "coder", "parallel": True},
    {"task": "테스트 작성", "agent": "tester", "parallel": True},
    {"task": "문서화", "agent": "documenter"}
]

2. 연구 보고서 생성

plan = [
    {"task": "문헌 검색", "agent": "searcher"},
    {"task": "데이터 추출", "agent": "extractor"},
    {"task": "통계 분석", "agent": "analyst"},
    {"task": "보고서 작성", "agent": "writer"},
    {"task": "품질 검토", "agent": "reviewer"}
]

AWS 구현

AWS Step Functions를 활용한 Planning Pattern:

import boto3
import json

stepfunctions = boto3.client('stepfunctions')

# State Machine Definition
state_machine = {
    "Comment": "Planning Pattern Implementation",
    "StartAt": "CreatePlan",
    "States": {
        "CreatePlan": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:planner",
            "Next": "ExecuteSteps"
        },
        "ExecuteSteps": {
            "Type": "Map",
            "ItemsPath": "$.plan",
            "Iterator": {
                "StartAt": "ExecuteStep",
                "States": {
                    "ExecuteStep": {
                        "Type": "Task",
                        "Resource": "arn:aws:lambda:region:account:function:worker",
                        "End": True
                    }
                }
            },
            "Next": "SynthesizeResults"
        },
        "SynthesizeResults": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:synthesizer",
            "End": True
        }
    }
}

Planner Lambda Function:

def lambda_handler(event, context):
    user_request = event['request']
    
    # Bedrock을 사용한 계획 생성
    bedrock = boto3.client('bedrock-runtime')
    
    prompt = f"""
    다음 요청을 실행 가능한 단계로 분해하세요:
    {user_request}
    
    각 단계는 다음 형식으로 출력:
    - task: 작업 설명
    - agent: 담당 에이전트
    - dependencies: 선행 작업 (있는 경우)
    """
    
    response = bedrock.invoke_model(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 2000
        })
    )
    
    plan = parse_plan(response)
    return {"plan": plan}

동적 재계획 (Re-planning)

def execute_with_replanning(plan):
    for step in plan:
        result = execute_step(step)
        
        # 실행 결과 평가
        if result.status == "failed":
            # 실패 시 대체 계획 생성
            alternative_plan = create_alternative_plan(step, result.error)
            plan = insert_alternative(plan, alternative_plan)
        
        elif result.status == "partial":
            # 부분 성공 시 추가 단계 삽입
            additional_steps = create_additional_steps(step, result)
            plan = insert_steps(plan, additional_steps)
    
    return plan

트레이드오프

장점:

  • 명확성: 각 단계가 명시적으로 정의됨
  • 추적성: 진행 상황 모니터링 용이
  • 병렬화: 독립적인 작업 동시 실행 가능
  • 재사용성: 계획 템플릿 재사용

단점:

  • 오버헤드: 계획 생성에 추가 시간 소요
  • 복잡성: 의존성 관리 필요
  • 유연성 제한: 사전 정의된 구조에 의존

2. Routing Pattern: 지능형 작업 분배

패턴 개요

Routing 패턴은 들어오는 요청을 분석하여 가장 적합한 전문 에이전트로 라우팅하는 패턴입니다. 이는 단일 범용 에이전트보다 여러 전문 에이전트를 사용하는 것이 효율적일 때 사용됩니다.

라우팅 전략

1. 의도 기반 라우팅 (Intent-based Routing)

class IntentRouter:
    def __init__(self):
        self.agents = {
            "technical_support": TechnicalSupportAgent(),
            "billing": BillingAgent(),
            "sales": SalesAgent(),
            "general": GeneralAgent()
        }
    
    def route(self, user_message):
        # LLM을 사용한 의도 분류
        intent = self.classify_intent(user_message)
        
        agent = self.agents.get(intent, self.agents["general"])
        return agent.handle(user_message)
    
    def classify_intent(self, message):
        prompt = f"""
        다음 메시지의 의도를 분류하세요:
        {message}
        
        가능한 의도: technical_support, billing, sales, general
        """
        return llm.classify(prompt)

2. 능력 기반 라우팅 (Capability-based Routing)

class CapabilityRouter:
    def __init__(self):
        self.agent_registry = {
            "data_analysis": {
                "agent": DataAnalystAgent(),
                "capabilities": ["statistics", "visualization", "sql"],
                "cost": 0.05,
                "latency": 2.0
            },
            "code_generation": {
                "agent": CoderAgent(),
                "capabilities": ["python", "javascript", "testing"],
                "cost": 0.03,
                "latency": 1.5
            }
        }
    
    def route(self, task):
        required_capabilities = self.extract_capabilities(task)
        
        # 능력, 비용, 레이턴시를 고려한 최적 에이전트 선택
        best_agent = self.select_best_agent(
            required_capabilities,
            optimize_for="latency"  # or "cost"
        )
        
        return best_agent.execute(task)

3. 계층적 라우팅 (Hierarchical Routing)

class HierarchicalRouter:
    def route(self, request):
        # Level 1: 도메인 분류
        domain = self.classify_domain(request)  # "engineering", "business", etc.
        
        # Level 2: 하위 카테고리 분류
        category = self.classify_category(request, domain)
        
        # Level 3: 전문 에이전트 선택
        agent = self.get_specialist(domain, category)
        
        return agent.handle(request)

적용 케이스

언제 사용해야 하는가:

  • 다양한 유형의 요청을 처리해야 하는 경우
  • 각 도메인에 전문화된 에이전트가 있는 경우
  • 비용과 성능 최적화가 중요한 경우

실제 사용 예시:

고객 지원 시스템

router = CustomerSupportRouter()

# 기술 문의
router.route("로그인이 안 됩니다") 
# → TechnicalSupportAgent

# 결제 문의
router.route("환불 요청합니다")
# → BillingAgent

# 복합 문의
router.route("결제 후 서비스 접속이 안 됩니다")
# → [BillingAgent, TechnicalSupportAgent] (순차 또는 병렬)

AWS 구현

Amazon EventBridge를 활용한 이벤트 기반 라우팅:

import boto3
import json

eventbridge = boto3.client('events')

def route_request(request):
    # LLM을 사용한 요청 분류
    classification = classify_request(request)
    
    # EventBridge로 이벤트 발행
    eventbridge.put_events(
        Entries=[
            {
                'Source': 'agentic.router',
                'DetailType': classification['intent'],
                'Detail': json.dumps({
                    'request': request,
                    'priority': classification['priority'],
                    'capabilities': classification['required_capabilities']
                }),
                'EventBusName': 'agentic-ai-bus'
            }
        ]
    )

EventBridge Rule 정의:

{
  "Rules": [
    {
      "Name": "route-to-technical-support",
      "EventPattern": {
        "source": ["agentic.router"],
        "detail-type": ["technical_support"]
      },
      "Targets": [
        {
          "Arn": "arn:aws:lambda:region:account:function:technical-support-agent",
          "Id": "1"
        }
      ]
    },
    {
      "Name": "route-to-billing",
      "EventPattern": {
        "source": ["agentic.router"],
        "detail-type": ["billing"]
      },
      "Targets": [
        {
          "Arn": "arn:aws:lambda:region:account:function:billing-agent",
          "Id": "1"
        }
      ]
    }
  ]
}

Handoff Pattern (핸드오프)

에이전트 간 작업 전달:

class HandoffRouter:
    def execute(self, request):
        current_agent = self.initial_agent
        context = {"request": request, "history": []}
        
        while not context.get("completed"):
            # 현재 에이전트 실행
            result = current_agent.process(context)
            context["history"].append(result)
            
            # 핸드오프 필요 여부 확인
            if result.needs_handoff:
                next_agent = self.select_next_agent(result.handoff_reason)
                context["handoff_reason"] = result.handoff_reason
                current_agent = next_agent
            else:
                context["completed"] = True
        
        return context["history"]

트레이드오프

장점:

  • 전문화: 각 에이전트가 특정 도메인에 최적화
  • 확장성: 새로운 에이전트 추가 용이
  • 비용 효율: 작업에 맞는 모델 선택 가능

단점:

  • 라우팅 오버헤드: 분류 단계 추가
  • 컨텍스트 손실: 에이전트 간 전환 시 정보 손실 가능
  • 복잡성: 여러 에이전트 관리 필요

3. Human-in-the-Loop Pattern: 인간 검토 통합

패턴 개요

Human-in-the-Loop(HITL) 패턴은 중요한 결정 지점에서 에이전트가 실행을 일시 중지하고 인간의 검토나 승인을 받는 패턴입니다.

작동 메커니즘

class HITLAgent:
    def execute(self, task):
        # 1. 초기 분석
        analysis = self.analyze(task)
        
        # 2. 위험도 평가
        risk_level = self.assess_risk(analysis)
        
        # 3. 인간 개입 필요 여부 결정
        if risk_level > self.threshold:
            # 인간 검토 요청
            approval = self.request_human_review(analysis)
            
            if not approval.approved:
                return self.handle_rejection(approval.feedback)
        
        # 4. 실행
        return self.execute_action(analysis)

개입 지점 결정

1. 위험 기반 개입

def should_request_review(action, context):
    risk_factors = {
        "financial_impact": action.cost > 10000,
        "data_sensitivity": action.accesses_pii,
        "irreversibility": not action.can_rollback,
        "confidence": action.confidence < 0.8
    }
    
    return any(risk_factors.values())

2. 정책 기반 개입

policies = {
    "financial": {
        "threshold": 5000,
        "requires": ["manager_approval"]
    },
    "data_access": {
        "pii": True,
        "requires": ["security_review", "legal_review"]
    }
}

적용 케이스

언제 사용해야 하는가:

  • 높은 위험이나 비용이 수반되는 작업
  • 규제 준수가 필요한 경우 (GDPR, 금융 규제)
  • 에이전트의 신뢰도가 충분하지 않은 경우
  • 최종 결정에 인간의 판단이 필요한 경우

실제 사용 예시:

1. 금융 거래 승인

class FinancialAgent:
    def process_transaction(self, transaction):
        # 자동 처리 가능 여부 확인
        if transaction.amount < 1000:
            return self.auto_approve(transaction)
        
        # 인간 검토 필요
        review_request = {
            "transaction": transaction,
            "risk_analysis": self.analyze_risk(transaction),
            "recommendation": self.get_recommendation(transaction)
        }
        
        approval = self.request_approval(review_request)
        
        if approval.approved:
            return self.execute_transaction(transaction)
        else:
            return self.handle_rejection(approval.reason)

2. 콘텐츠 게시 승인

class ContentPublisher:
    def publish(self, content):
        # 자동 검증
        validation = self.validate_content(content)
        
        if validation.has_issues:
            # 인간 검토 요청
            review = self.request_content_review(content, validation)
            
            if review.requires_changes:
                content = self.apply_changes(content, review.suggestions)
        
        return self.publish_content(content)

AWS 구현

AWS Step Functions의 Task Token을 활용한 구현:

# Step Functions State Machine
{
    "StartAt": "ProcessRequest",
    "States": {
        "ProcessRequest": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:processor",
            "Next": "CheckRisk"
        },
        "CheckRisk": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": "$.riskLevel",
                    "StringEquals": "HIGH",
                    "Next": "RequestHumanApproval"
                }
            ],
            "Default": "ExecuteAction"
        },
        "RequestHumanApproval": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
            "Parameters": {
                "FunctionName": "request-approval",
                "Payload": {
                    "taskToken.$": "$$.Task.Token",
                    "request.$": "$"
                }
            },
            "Next": "ExecuteAction"
        },
        "ExecuteAction": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:region:account:function:executor",
            "End": True
        }
    }
}

승인 요청 Lambda:

def lambda_handler(event, context):
    task_token = event['taskToken']
    request = event['request']
    
    # SNS로 승인 요청 알림
    sns = boto3.client('sns')
    sns.publish(
        TopicArn='arn:aws:sns:region:account:approval-requests',
        Subject='Approval Required',
        Message=json.dumps({
            'taskToken': task_token,
            'request': request,
            'approvalUrl': f'https://approval.example.com/{task_token}'
        })
    )
    
    # 승인/거부는 별도 API를 통해 처리
    # POST /approve with taskToken

승인 처리 API:

def approve_request(task_token, approved, feedback):
    stepfunctions = boto3.client('stepfunctions')
    
    if approved:
        stepfunctions.send_task_success(
            taskToken=task_token,
            output=json.dumps({'approved': True, 'feedback': feedback})
        )
    else:
        stepfunctions.send_task_failure(
            taskToken=task_token,
            error='ApprovalDenied',
            cause=feedback
        )

사용자 경험 최적화

1. 비동기 승인

# 사용자는 즉시 응답 받음
response = {
    "status": "pending_approval",
    "request_id": "req-123",
    "estimated_time": "2-4 hours",
    "notification_channels": ["email", "slack"]
}

# 백그라운드에서 승인 프로세스 진행

2. 승인 컨텍스트 제공

approval_request = {
    "action": "Deploy to Production",
    "context": {
        "changes": ["Updated API endpoint", "Added new feature"],
        "impact": "Affects 10,000 users",
        "rollback_plan": "Automated rollback available",
        "test_results": "All tests passed"
    },
    "recommendation": {
        "approve": True,
        "confidence": 0.92,
        "reasoning": "Low risk deployment with comprehensive tests"
    }
}

트레이드오프

장점:

  • 안전성: 중요한 결정에 인간 판단 추가
  • 규제 준수: 감사 추적 및 승인 기록
  • 신뢰성: 에이전트 오류 방지

단점:

  • 레이턴시: 인간 응답 대기 시간
  • 확장성 제한: 인간 개입이 병목
  • 비용: 인적 자원 필요

패턴 조합 전략

실제 프로덕션 시스템에서는 여러 패턴을 조합합니다:

예시: 엔터프라이즈 문서 처리 시스템

class DocumentProcessingSystem:
    def process(self, document):
        # 1. Routing: 문서 유형 분류
        doc_type = self.router.classify(document)
        
        # 2. Planning: 처리 계획 생성
        plan = self.planner.create_plan(doc_type, document)
        
        # 3. Execution with HITL
        results = []
        for step in plan:
            result = self.execute_step(step)
            
            # 중요 단계에서 인간 검토
            if step.requires_review:
                approval = self.request_review(result)
                if not approval.approved:
                    result = self.handle_feedback(result, approval)
            
            results.append(result)
        
        return self.synthesize(results)

다음 편 예고

Part 3에서는 Multi-Agent 패턴을 다룹니다:

  • Multi-Agent Collaboration: 여러 에이전트의 협업
  • Workflow Orchestration: 복잡한 워크플로우 관리
  • Agent Communication: 에이전트 간 통신 메커니즘

참고 자료

Clickable cat