Module 11
Airflow로 MLOps 파이프라인 자동화
MLOps 과정 | 드리프트 감지 → 재학습 → 배포를 자동으로
지금까지의 문제점
매번 터미널에서 수동으로 실행해야 합니다
# 매번 이걸 직접 쳐야 함
python detect_drift_evidently.py # 드리프트 확인
python retrain_mlflow.py # 재학습
python deploy_model.py # 배포
- 새벽에 드리프트 발생하면? → 다음 날 출근해서야 발견
- 주말에 발생하면? → 월요일까지 모름
- 실행 순서를 잘못 하면? → 재학습 전에 배포할 수도
Airflow = 작업 스케줄러
| 수동 (지금까지) | Airflow (자동) |
| 터미널에서 직접 실행 | 매일 정해진 시간에 자동 실행 |
| 결과 보고 다음 명령 실행 | 조건에 따라 자동 분기 |
| 실패하면 알아채기 어려움 | 자동 재시도 + 알림 |
| 실행 이력 없음 | 웹 UI에서 이력 관리 |
전체 흐름에서의 위치
Module 9
Evidently
→
Module 10
MLflow
→
Module 11
Airflow
Module 9: 드리프트 감지
Module 10: 실험 기록
Module 11: 전부 자동으로 연결
3가지만 알면 됩니다
DAG
작업 계획서
전체 흐름을 정의한
Python 파일
(Directed Acyclic Graph)
Task
개별 작업
detect_drift.py
retrain_mlflow.py
deploy_model.py
Operator
실행 방법
PythonOperator
BashOperator
BranchOperator
>> 연산자 = "다음 작업"
detect_drift >> check_drift >> retrain_model >> deploy_model
detect_drift
→
check_drift
→
retrain_model
→
deploy_model
>>는 원래 비트 시프트 연산자인데,
Airflow가 "다음 작업" 의미로 재정의한 것입니다.
설치 & 실행
# 설치
pip install apache-airflow
# 초기화 (최초 1회)
airflow db init
# 관리자 계정 생성
airflow users create \
--username admin --password admin \
--firstname Admin --lastname User \
--role Admin --email admin@example.com
# 터미널 1: 웹 서버
airflow webserver --port 8080
# 터미널 2: 스케줄러
airflow scheduler
http://localhost:8080 → Airflow UI (admin/admin)
자동화할 흐름
Task 1
드리프트 감지
→
Task 2
재학습+MLflow
→
Task 3
S3 배포
터미널에서 치던 명령 3개를
BashOperator로 그대로 실행하면 됩니다!
DAG 전체 코드 (이게 전부!)
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
PROJECT_DIR = "/home/사용자/01-loan-api-server"
with DAG(
dag_id="mlops_drift_retrain_deploy",
schedule="0 0 * * *", # 매일 자정(UTC) = 오전 9시
start_date=datetime(2026, 1, 1),
catchup=False,
default_args={"retries": 1, "retry_delay": timedelta(minutes=5)},
) as dag:
detect_drift = BashOperator(
task_id="detect_drift",
bash_command=f"cd {PROJECT_DIR} && python detect_drift_evidently.py",
)
retrain = BashOperator(
task_id="retrain",
bash_command=f"cd {PROJECT_DIR} && python retrain_mlflow.py",
)
deploy = BashOperator(
task_id="deploy",
bash_command=f"cd {PROJECT_DIR} && echo 'y' | python deploy_model.py",
)
detect_drift >> retrain >> deploy
BashOperator = 터미널 명령 그대로
수동 (터미널에서)
cd 01-loan-api-server
python detect_drift_evidently.py
python retrain_mlflow.py
python deploy_model.py
Airflow (자동)
동일한 명령을
BashOperator에 넣으면
매일 자동 실행!
새로운 문법을 배울 필요 없습니다.
터미널에서 치던 명령을 그대로 넣으면 됩니다.
실행 순서: 이 한 줄이면 끝
# 감지 끝나면 → 재학습, 재학습 끝나면 → 배포
detect_drift >> retrain >> deploy
detect_drift
→
retrain
→
deploy
앞 단계가 실패하면 다음 단계는 실행 안 됨
실패 시 자동 재시도 1번 (5분 후)
시나리오 1: 전부 성공
detect_drift
→
retrain
→
deploy
3개 Task 모두 초록(성공). 새 모델이 S3에 배포됩니다.
시나리오 2: 중간에 실패
detect_drift
→
retrain (실패!)
→
deploy (실행 안 됨)
retrain 실패 → 5분 후 자동 재시도 1번
재시도도 실패 → deploy는 실행 안 됨
UI의 Logs에서 에러 원인 확인
판단 로직은 어디에?
DAG는 단순하게, 판단은 각 스크립트 안에!
| 스크립트 | 내부 판단 로직 |
detect_drift_evidently.py | "3개 이상 드리프트면 재학습 필요" 출력 |
retrain_mlflow.py | "새 모델이 더 좋은지" S3 모델과 비교 |
deploy_model.py | 정확도 확인 후 S3 업로드 |
DAG는 "순서대로 실행"만 담당
복잡한 분기 로직은 이미 우리 Python 코드 안에 있습니다!
Section 6
Airflow UI 모니터링
Airflow UI에서 볼 수 있는 것
| 뷰 | 설명 |
| Graph | Task 간 연결을 시각적 그래프로 표시 |
| Grid | 날짜별 실행 이력 (과거 기록 한눈에) |
| Logs | 각 Task의 실행 로그 (디버깅용) |
| XCom | Task 간 전달된 데이터 확인 |
Task 색상:
초록 = 성공 | 빨강 = 실패 | 분홍 = 건너뜀 | 하늘 = 실행 중
테스트: 수동 실행
# UI에서: "Trigger DAG" 버튼 (재생 아이콘) 클릭
# 또는 CLI에서:
airflow dags trigger mlops_drift_retrain_deploy
스케줄(매일 자정)을 기다리지 않고
바로 테스트할 수 있습니다.
완성된 MLOps 파이프라인
Airflow
매일 자동
→
Evidently
드리프트
→
retrain
+ MLflow
→
S3 배포
→
Lambda
→ ECS
Module 9 (Evidently) + Module 10 (MLflow) + Module 11 (Airflow)
= 완전한 MLOps 자동화 파이프라인
Module 11 정리
| 항목 | 수동 | Airflow |
| 실행 | 터미널에서 직접 | 매일 자동 |
| 조건 분기 | 눈으로 판단 | XCom + Branch 자동 |
| 실패 처리 | 못 알아챔 | 재시도 + 알림 |
| 이력 | 없음 | Grid로 날짜별 관리 |
| 모니터링 | 터미널 | 웹 UI |
실습
# 1. Airflow 설치 & 초기화
pip install apache-airflow
airflow db init
# 2. DAG 파일 복사
cp mlops_pipeline_dag.py ~/airflow/dags/
# 3. Airflow 실행 (터미널 2개)
airflow webserver --port 8080
airflow scheduler
# 4. UI에서 DAG 확인 & 수동 실행
# http://localhost:8080
확인 포인트:
- DAG가 UI에 나타나는가?
- 수동 실행 후 Graph에서 Task 흐름이 보이는가?
- 드리프트 없을 때 skip이 정상 동작하는가?
- Logs에서 각 Task 실행 결과를 확인했는가?