티스토리 뷰
- 본 포스팅은 워크플로우 관리 도구인 'apache-airflow'의 개념을 설명하고, DAG을 구현해본 포스팅입니다.
- 정리할 내용은 다음과 같습니다 .
- 에어플로우의 개념
- 에어플로우의 주요 구성 요소 [dag,operator,task,hook]
- DAG 구현
- SQL을 활용한 데이터 이관
- PythonOperator
- SSH 연결
- Airflow API
1) Airflow 란?
- 파이썬 코드로 워크플로우를 작성하고, 스케쥴링, 모니터링하는 플랫폼
- ETL 작업을 자동화하고, DAG형태의 워크플로우 작성이 가능
- Aws,gcp 모두 airflow managed service 를 제공, 전세계 데이터팀들이 널리 사용하고 있으며, 폭넓은 커뮤니티 형성
2) Airflow를 선택한 이유
- python으로 유연한 프로그래밍 가능(데이터엔지니어,데이터분석가,데이터사이언티스트 활용 가능)
- 스케쥴러 기능 제공
- 웹 대시보드 UI를 통한 모니터링 기능 제공
-
Airflow 커뮤니티 활성화 (IT 회사 사용 케이스 多)
- 필자는 ETL 오픈 소스 툴인 talend open studio(TOS)를 이용하여 데이터 파이프라인을 개발했습니다. TOS는 사용자가 따로 코드 구현을 하지 않고, 컴포넌트를 드래그 앤 드롭하여 데이터 이관 순서대로 컴포넌트를 연결하여 데이터 파이프라인을 구성하였습니다. 코드 구현을 하지 않는다는 장점을 가지고 있지만, 사용목적에 따라 컴포넌트 소스를 수정하는데 어려움이 있었고, 모니터링 기능을 제공하지 않아, 별도로 모니터링 대시보드를 구현해야했습니다. 유연한 프로그래밍 어려움, 스케쥴러 기능, 모니터링 기능 부재 등을 이유로 airflow를 선택하게 되었습니다.
<기존 오픈소스 ETL 툴과 airflow 비교>
구 분 | TOS | Airflow |
오픈소스 | 오픈소스 ETL SW | 오픈소스(Airbnb에서 개발) |
주요 기능 | 데이터를 추출하고, 가공하여 적재할 수 있는 일련의 데이터 처리 과정을 지원하는 ETL SW | 워크플로우를 작성하고, 스케쥴링하고 모니터링 할 수 있는 플랫폼 |
언어 | Java(JDK 1.8 버전 설치 권장) | Python |
UI | 이클립스 기반의 그래픽 환경(GUI) 제공 | 웹 UI 제공 (트리, 그래프, 변수, 간트 차트 등 다양한 뷰 제공) DAG의 실행, 로그 변수 등 여러 정보 확인 |
스케쥴러 | 스케쥴러 기능 미제공 배포 후, crontab에 적용 | 스케쥴러 기능 제공 |
배포 | 배포 | 배포 |
소스 관리 | 소스 관리 어려움 | 소스 관리 용이 |
Managed Service | 없음 | Airflow managed Service 제공 (AWS, GCP) |
3) Airflow의 주요 구성 요소
출처 : https://airflow.apache.org/docs/apache-airflow/stable/concepts.html

1. DAG (Directed Acyclic Graph)
- task들 간의 관계와 dependency를 표현하고 있는 task들의 모음
- 어떤 순서와 어떤 dependency를 실행할지, 어떤 스케쥴로 실행할지 등의 정보를 가지고 있음
- 방향성 비순환 그래프 :화살표 방향성의 끝점을 포함하되, 반복이나 순환을 허용하지 않음(이전 태스크가 완료되면, 다음 태스크 실행)

2. Airflow Operator
- Airflow DAG은 여러 태스크로 이루어져있으며, 다양한 오퍼레이터를 제공
- BashOperator: bash command를 실행
- PythonOperator : python 함수를 실행
- MysqlOperator : sql 쿼리를 수행
- Sensor : 시간,파일, db row 등을 기다리는 센서
기타 등등 (커뮤니케이션에서 만든 Operator 다수
4) DAG 구현
1. sql을 이용한 dag 구현
- Ui에서 sql connetion 추가 필요 (sql_conn_id)
<조회 하기> Ui > admin > connetion

from datetime import timedelta
from airflow import DAG
import airflow.utils.dates
from urllib import request
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
# dag 정의
dag = DAG(
dag_id = "mssql_connection",
start_date = airflow.utils.dates.days_ago(1),
schedule_interval=None
)
start = DummyOperator(task_id="start")
end = DummyOperator(task_id="end")
# sql 입력
# mssql_create_sql = "CREATE TABLE airflow_test (a int)"
mssql_select_sql = "SELECT * FROM [dbo].[KT_DKT_INPUT_TRAIN]"
# mssql operator task 정의
create_table_mssql_task = MsSqlOperator(
task_id='mssql_select_sql',
mssql_conn_id='reco_mart',
database='AIDO_RECO_KT',
sql=mssql_select_sql,
params={"table":"airflow_test"},
dag=dag
)
start >> create_table_mssql_task >> end
2. pythonoperator를 이용한 dag 구현
from pathlib import Path
from airflow import DAG
import datetime as dt
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pandas as pd
# DAG 정의
dag = DAG (
dag_id = "01_unscheduled",
start_date = dt.datetime(2022,4,18),
schedule_interval="@daily" # 매일 자정
)
fetch_events = BashOperator(
task_id="fetch_events",
bash_command=(
"mkdir -p /data/events && "
"curl -o /data/events.json http://events_api:5000/events"
),
dag=dag,
)
# 파이썬 함수 정의
def _calculate_stats(input_path,output_path):
"""
이벤트 통계 계산하기
"""
events = pd.read_json(input_path)
stats = events.groupby(["date","user"]).size().reset_index()
Path(output_path).parent.mkdir(exist_ok = True)
stats.to_csv(output_path,index=False)
# Task 정의 - Pythonoperator
calculate_stats = PythonOperator(
task_id="calculate_stats",
python_callable=_calculate_stats,
op_kwargs = {
"input_path":"data/events.json",
"output_path":"data/stats.csv",
},
dag=dag
)
fetch_events >> calculate_stats
3. ssh operator를 이용한 ssh 연결
- 원격지에 있는 서버에 접근해서 파일을 실행할 때
from airflow.providers.ssh.hooks.ssh import SSHHook
import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.models import Variable
# DAG 정의
dag = DAG(
dag_id="ssh_call",
description="sshcall",
start_date=airflow.utils.dates.days_ago(1),
schedule_interval=None,
)
# 원격 아이피 주소 입력
REMOTE_BIND_IP = Variable.get('SERVER_REMOTE_BIND_IP')
# ssh_hook 정의
ssh_hook = SSHHook(ssh_conn_id=None,remote_host=REMOTE_BIND_IP,username='user',password='passwd')
# ssh_opertor
ssh_opertor= SSHOperator(
ssh_hook = ssh_hook,
task_id = 'ssh_call',
command = 'python airflow_pipeline/ssh_test.py',
dag = dag
)
5) airflow api
- 외부에서 airflow 서버 내 dag 실행이 필요할 경우 사용
- airflow.cfg 파일에서 auth_backend = airflow.api.auth.backend.basic_auth 으로 수정 필요
- airflow webserver 서버 재시작 (sudo systemctl start airflow-webserver
- 호출 Url : http://서버아이피주소:8080/api/v1/dags/dagid(호출하려고 하는 dagid 입력) /dagRuns
<참고 사이트>
https://www.aladin.co.kr/shop/wproduct.aspx?ItemId=290091682 (apache airflow 기반의 데이터 파이프라인)
https://www.bucketplace.co.kr/post/2021-04-13-%EB%B2%84%ED%82%B7%ED%94%8C%EB%A0%88%EC%9D%B4%EC%8A%A4-airflow-%EB%8F%84%EC%9E%85%EA%B8%B0/ (오늘의 집, airflow 도입기)
'TIL' 카테고리의 다른 글
[논문 리뷰] Attention is all you need(Transformer) 논문 리뷰 (0) | 2022.09.18 |
---|---|
# [다짐] 글또 7기를 시작하며. (0) | 2022.05.15 |