Module 11

Airflow로 MLOps 파이프라인 자동화

MLOps 과정 | 드리프트 감지 → 재학습 → 배포를 자동으로

Section 1

왜 Airflow인가?

지금까지의 문제점

매번 터미널에서 수동으로 실행해야 합니다

# 매번 이걸 직접 쳐야 함
python detect_drift_evidently.py    # 드리프트 확인
python retrain_mlflow.py            # 재학습
python deploy_model.py              # 배포

  • 새벽에 드리프트 발생하면? → 다음 날 출근해서야 발견
  • 주말에 발생하면? → 월요일까지 모름
  • 실행 순서를 잘못 하면? → 재학습 전에 배포할 수도

Airflow = 작업 스케줄러

수동 (지금까지)Airflow (자동)
터미널에서 직접 실행매일 정해진 시간에 자동 실행
결과 보고 다음 명령 실행조건에 따라 자동 분기
실패하면 알아채기 어려움자동 재시도 + 알림
실행 이력 없음웹 UI에서 이력 관리

공식 사이트: airflow.apache.org
GitHub: github.com/apache/airflow (38,000+ Stars)

전체 흐름에서의 위치

Module 9
Evidently
Module 10
MLflow
Module 11
Airflow

Module 9: 드리프트 감지
Module 10: 실험 기록
Module 11: 전부 자동으로 연결

Section 2

핵심 개념 3가지

3가지만 알면 됩니다

DAG
작업 계획서
전체 흐름을 정의한
Python 파일
(Directed Acyclic Graph)
Task
개별 작업
detect_drift.py
retrain_mlflow.py
deploy_model.py
Operator
실행 방법
PythonOperator
BashOperator
BranchOperator

>> 연산자 = "다음 작업"

detect_drift >> check_drift >> retrain_model >> deploy_model

detect_drift
check_drift
retrain_model
deploy_model

>>는 원래 비트 시프트 연산자인데,
Airflow가 "다음 작업" 의미로 재정의한 것입니다.

Section 3

설치 & 실행

설치 & 실행

# 설치
pip install apache-airflow

# 초기화 (최초 1회)
airflow db init

# 관리자 계정 생성
airflow users create \
  --username admin --password admin \
  --firstname Admin --lastname User \
  --role Admin --email admin@example.com

# 터미널 1: 웹 서버
airflow webserver --port 8080

# 터미널 2: 스케줄러
airflow scheduler

http://localhost:8080 → Airflow UI (admin/admin)

Section 4

우리 파이프라인을 DAG로

자동화할 흐름

Task 1
드리프트 감지
Task 2
재학습+MLflow
Task 3
S3 배포

터미널에서 치던 명령 3개를
BashOperator로 그대로 실행하면 됩니다!

DAG 전체 코드 (이게 전부!)

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

PROJECT_DIR = "/home/사용자/01-loan-api-server"

with DAG(
    dag_id="mlops_drift_retrain_deploy",
    schedule="0 0 * * *",   # 매일 자정(UTC) = 오전 9시
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={"retries": 1, "retry_delay": timedelta(minutes=5)},
) as dag:

    detect_drift = BashOperator(
        task_id="detect_drift",
        bash_command=f"cd {PROJECT_DIR} && python detect_drift_evidently.py",
    )
    retrain = BashOperator(
        task_id="retrain",
        bash_command=f"cd {PROJECT_DIR} && python retrain_mlflow.py",
    )
    deploy = BashOperator(
        task_id="deploy",
        bash_command=f"cd {PROJECT_DIR} && echo 'y' | python deploy_model.py",
    )

    detect_drift >> retrain >> deploy

BashOperator = 터미널 명령 그대로

수동 (터미널에서)
cd 01-loan-api-server
python detect_drift_evidently.py
python retrain_mlflow.py
python deploy_model.py
Airflow (자동)
동일한 명령을
BashOperator에 넣으면
매일 자동 실행!

새로운 문법을 배울 필요 없습니다.
터미널에서 치던 명령을 그대로 넣으면 됩니다.

실행 순서: 이 한 줄이면 끝

# 감지 끝나면 → 재학습, 재학습 끝나면 → 배포
detect_drift >> retrain >> deploy

detect_drift
retrain
deploy

앞 단계가 실패하면 다음 단계는 실행 안 됨
실패 시 자동 재시도 1번 (5분 후)

Section 5

실행 시나리오

시나리오 1: 전부 성공

detect_drift
retrain
deploy

3개 Task 모두 초록(성공). 새 모델이 S3에 배포됩니다.

시나리오 2: 중간에 실패

detect_drift
retrain (실패!)
deploy (실행 안 됨)

retrain 실패 → 5분 후 자동 재시도 1번
재시도도 실패 → deploy는 실행 안 됨
UI의 Logs에서 에러 원인 확인

판단 로직은 어디에?

DAG는 단순하게, 판단은 각 스크립트 안에!

스크립트내부 판단 로직
detect_drift_evidently.py"3개 이상 드리프트면 재학습 필요" 출력
retrain_mlflow.py"새 모델이 더 좋은지" S3 모델과 비교
deploy_model.py정확도 확인 후 S3 업로드

DAG는 "순서대로 실행"만 담당
복잡한 분기 로직은 이미 우리 Python 코드 안에 있습니다!

Section 6

Airflow UI 모니터링

Airflow UI에서 볼 수 있는 것

설명
GraphTask 간 연결을 시각적 그래프로 표시
Grid날짜별 실행 이력 (과거 기록 한눈에)
Logs각 Task의 실행 로그 (디버깅용)
XComTask 간 전달된 데이터 확인

Task 색상:
초록 = 성공 | 빨강 = 실패 | 분홍 = 건너뜀 | 하늘 = 실행 중

테스트: 수동 실행

# UI에서: "Trigger DAG" 버튼 (재생 아이콘) 클릭

# 또는 CLI에서:
airflow dags trigger mlops_drift_retrain_deploy

스케줄(매일 자정)을 기다리지 않고
바로 테스트할 수 있습니다.

Section 7

전체 정리

완성된 MLOps 파이프라인

Airflow
매일 자동
Evidently
드리프트
retrain
+ MLflow
S3 배포
Lambda
→ ECS

Module 9 (Evidently) + Module 10 (MLflow) + Module 11 (Airflow)
= 완전한 MLOps 자동화 파이프라인

Module 11 정리

항목수동Airflow
실행터미널에서 직접매일 자동
조건 분기눈으로 판단XCom + Branch 자동
실패 처리못 알아챔재시도 + 알림
이력없음Grid로 날짜별 관리
모니터링터미널웹 UI

실습

# 1. Airflow 설치 & 초기화
pip install apache-airflow
airflow db init

# 2. DAG 파일 복사
cp mlops_pipeline_dag.py ~/airflow/dags/

# 3. Airflow 실행 (터미널 2개)
airflow webserver --port 8080
airflow scheduler

# 4. UI에서 DAG 확인 & 수동 실행
# http://localhost:8080

확인 포인트:
- DAG가 UI에 나타나는가?
- 수동 실행 후 Graph에서 Task 흐름이 보이는가?
- 드리프트 없을 때 skip이 정상 동작하는가?
- Logs에서 각 Task 실행 결과를 확인했는가?