관리 메뉴

A seeker after truth

<Apache Airflow 기반의 데이터 파이프라인> 1~5단원 메모 본문

Data/데엔 데브코스 TIL

<Apache Airflow 기반의 데이터 파이프라인> 1~5단원 메모

dr.meteor 2024. 1. 15. 15:47

1단원

- 그래프 기반 표현은 전체 작업을 하나의 모놀리식(단일) 스크립트 또는 프로세스로 구성되는 게 아니라 파이프라인을 작은 점진적 태스크로 명확히 분리할 수 있다. 전자가 구현 초기엔 그닥 문제 안되지만, 파이프라인 중간 태스크가 실패하면 전체 스크립트를 재실행해야 하기 때문에 비효율적. 그래프 기반 표현은 그 반대.

 

- 에어플로 덕에 여러 시스템 간 데이터 프로세스를 결합할 수 있는 복잡한 데이터 파이프라인 구축이 가능해졌다.

 

- 그림 1.8 12쪽 암기..

 

- 오퍼레이터와 태스크는 동일한 개념(용어)이다.

 

 

2단원

- 도커 컨테이너 활용해 운영 수준에서 격리하면 파이썬 패키지 세트 뿐 아니라 데베 드라이버, gcc 컴파일러 등 의존성 포함해 도커 컨테이너 생성 가능하단 점에 의의.

 

아, 글고 elt를 현실에서도 이번 플젝에 나온 것처럼 구현하는지 알아보기.

 

 

3단원

- 스케줄링 시작 날짜와  끝날짜는 당연히 API 요청에 보내는 시작, 끝날짜와는 다름. 헷갈릴 이유가 없는데도, 플젝 기간 내내 헷갈렸다.

 

- schedule interval을 timedelta 이용해 정의하면 3일에 한번 과 같은 식의 설정을 할 수 있음  ex) dt.timedelta(3)

 

- https://github.com/K9Ns/data-pipelines-with-apache-airflow/blob/main/chapter03/dags/06_templated_query.py

https://github.com/K9Ns/data-pipelines-with-apache-airflow/blob/main/chapter03/dags/07_templated_query_ds.py

이 두 코드의 start_date, end_date 부분을 비교해 템플릿에서 축약어 옵션 사용하는 법 보자.

 

- 서림님이 하신 것처럼 날짜별로 개별 파일에 이벤트 데이터를 쓰는 작업이라면, 그 자체가 데이터 세트를 일일 배치로 나누는 일이자 파티셔닝이 된다.

 

- **context 이렇게 하면 모든 콘텍스트 변수를 수신. input_path = context["templates_dict"]["input_path"] <- 이런 코드라면 templates_dict 개체에서 템플릿 값을 검색하는 게 되고, 위위 항목에 7번 파이썬 코드에서 본 ds 같은 경우 템플릿되는 값을 전달하는 게 된다.

정리하자면 PythonOperator에서 템플릿을 구현하려면 오퍼레이터의 templates_dict 매개변수를 사용해 템플릿화 해야 하는 모든 인수 전달해야 함. 그런 다음 함수를 통해 전달된 콘텍스트 개체에서 함수 내부의 템플릿 값을 확인할 수 있다.

 

- 지금까지 본 에어플로의 간격 기반 스케줄링과 cron 차이점: cron은 시점 기반(point-based) 스케줄링 시스템으로, 작업이 전날 실행됐다 가정하고 이전 실행이 중단된 위치를 계산하거나 추측해야 한다.

 

- 54~55쪽 메모해야 하는데 피곤하고 의욕이 없어 미룸..

 

- 백필 시행할 때의 과거 시작/끝날짜가 그렇게 헷갈렸다. 스케줄링을 과거 시점에 대해 하는 건 불가능하잖아? 그치만 그게 아니고, 과거 "데이터에 대한" 이다. 결국 이게 정말 의미가 있어지려면, 과거 시점의 특성을 가진 시작&끝값을 데이터 요청 보낼 떄 사용하거나 해야 제대로 사용하는 거라 볼 수 있을 듯. 또, catchup=False로 한 채 시작과 끝 모두를 과거 시점으로 해버리면, 아마.. 실행이 아예 안되는 게 아닐지? 이건 자료를 찾아보거나 직접 실행해보자.

 

- 백필, 재실행 태스크를 포함하려거든(인크리멘탈(증분) 수집/업데이트가 이거) 원자성, 멱등성 지켜 설계할 것. 원자성인 즉 태스크 중 하나 실패하면 중간 저장 결과 없게 하는 거. 난 완전 반대로 생각했는데...? 중간 저장 결과 았음 그 이후 것부터 실행하게 하면 돼서 오히려 이게 나은 게 아냐..? 자료 더 찾아보거나 강의 자료 다시 찾아보자.

원자성이 지켜진 작업과 아닌 작업의 차이는?

https://github.com/K9Ns/data-pipelines-with-apache-airflow/blob/main/chapter03/dags/10_non_atomic_send.py

https://github.com/K9Ns/data-pipelines-with-apache-airflow/blob/main/chapter03/dags/11_atomic_send.py

10번 파일 코드는 한 태스크 안에 2개의 작업이 있다. 여기서 _send_stats 함수가 실패해도(이멜 서버가 불안정한 경우 등) 로직상 이미 output_path 경로에 통계에 대한 출력 파일이 저장돼 있기 때문에 통계 발송이 실패했음에도 작업이 성공한 것처럼 보이게 된다. 그래서 이멜 기능을 별도의 태스크로 분할해야 원자성이 유지된다. <- 우리 코드에서 개선 포인트로 갖고 갈 수도 있지 않을까?

반대로 데이터 갖고오는 API 호출 전에 로그인을 해야 하는 경우는? 인증 토큰을 갖고 오기 위해 추가적 API 호출이 필요하며 그 이후 데이터 수집 API 호출하는 식으로 해야함. 이런 경우는 원자성 유지하겠다고 모든 작업을 개별적 태스크로 분리하는 건 옳지 않단 예시에 해당.

대부분의 오퍼레이터가 내부적으로 인증 등의 밀접한 연결 작업을 수행하는 옵션이 포함된 채 개발된 이유가 이 원자성 유지를 위한 거다.

 

- ⭐️멱등성 문제. 날짜별로 파일 분할해 데이터 쓰기 작업한다 생각해보자. 특정 날짜에 이 태스크를 다시 실행하면 이전 실행과 동일한 이벤트 데이터 세트를 갖고 오고, 기존 JSON 파일에 동일한 결과를 덮어쓰게 된다. 따라서 이벤트 갖고오기 테스크는 효력이 없게 된다. (그림 3.10 참고)

비멱등성 태스크의 예를 위해선 단일 JSON 사용하고, 이 파일에 이벤트 추가하는 게 좋다. 이 경우 태스크를 다시 실행하면 이벤트가 기존 데이터 세트에 단순히 추가되어 그날 이벤트가 복제된다. 따라서 태스크를 추가로 실행하면 결과가 변경되므로 비멱등성 구현이 된다.

=> 멱등성은 실행횟수 관계 없이 동일한 결과를 생성. 일관성와 장애 처리를 보장한다.

 

결론적으로 데이터 쓰기 작업은 기존 결과를 확인하거나 이전 태스크 결과를 덮어 쓸지 여부를 확인해 멱등성 유지 가능. 시간 파티션 데이터 세트로 저장이 되는 경우, 파티션 범위로 결과를 덮어쓸 수 있기 때문에 비교적 간단. 데베는 upsert로 기존 행 덮어쓸 수 있고.

 

 

4단원

- 64쪽 그림 4.2는 유닉스 명령어로 파일 분석하는 모습 보여주고 있음. 회사에서 익히 봤던 모습...

 

- (진자) 템플릿 작성은 프로그래머로서 코드 작성 시점엔 값을 알기 어렵지만 런타임 시 값을 할당하기 위해 사용. 태스크 콘텍스트 내 런타임에 사용 가능한 여러 변수들 중 execution_date는 pendulum 라이브러리의 datetime 객체에 해당. 네이티브 파이썬의 datetime의 호환(drop-in replacement) 객체이므로 파썬에 적용할 수 있는 모든 메서드를 pendulum에도 적용 가능.

ex) datetime.now().year 와 pendulum.now().year 는 동일한 결과 반환.

 

- {{ '{:02}'.format(execution_date.hour) }}  <- 패딩 문자열 형식으로, 예를 들어 오전 7시 같은 경우 '07' 과 같이 표시해주게 만든다.

심지어는 이런 식으로도 가능 https://github.com/K9Ns/data-pipelines-with-apache-airflow/blob/main/chapter04/dags/listing_4_5.py

 

- 여기에도 나오네. 에어플로 태스크 간 데이터 전달 방법은 원래 메타스토어를 사용해 태스크 간 결과를 읽고쓰는 XCom과 디스크 또는 데베에 결과 드롭하는 것이라고. 그리고 이렇게 되는 이유는, 에어플로 태스크는 설정에 따라 물리적으로 서로 다른 컴에서 독립적으로 실행되므로 메모리에서 데이터를 공유할 수 없기 때문임. 그래서 한 태스크 완료 시 다른 태스크가 읽을 수 있는 위치에 데ㅐ이터가 유지돼야 하는 거다.

 

XCom은 피클링 해서 저장. 파이썬의 직렬화 프로토콜. 직렬화는 메모리 개체를 나중에 다시 읽을 수 있도록 디스크에 저장할 수 있는 형식으로 변환하는 것. 기본 파썬 타입인 스트링 등등에서 빌드된 모든 객체를 피클링 가능. 반면 데베 커넥션, 파일핸들러는 못함. 그래서 크기가 작은 오브젝트는 엑스컴 이용한 피클링이 적합하다.

 

메타스토어는 일반적으로 마이시퀄 또는 포스트그레스인데, 이는 크기가 한정돼있고 피클링된 객체는 메타스토어의 blob에 저장된다. 그래서 작은 크기의 데이터 전송시에 적용하는게 좋은 거다.

 

 

5단원

이 부분은 ML 꼈을 때 태스크 설계에 대해 다루고 있기 때문에 좋다. 좋은 참고 자료가 되는 부분이다. 이 정도는 알아야 에어플로로 파이프라인 설계했구나! 의 가치를 느낄 수 있다 생각함.

뿐아니라, 데이터셋이 2개 이상으로 다양할 때, 이번 플젝처럼 DAG를 2개 이상 만드는 게 아니라 하나의 DAG 안에서 어떻게 태스크 설계를 할지 적절한 예시와 함께 알려주는 데 큰 의미가 있는 챕터.

 

1. 의존성 유형

ML 꼈을 때 이 의존성 유형에 따라 태스크 스타일을 다양하게 보여줌

- 선형 체인: 연속적으로 실행됨. 지금까지 플젝서 봤던 모든 게 다 요걸로 짠거.

- 팬인/아웃: 하나의 태스크가 여러 다운스트림 태스크에 연결되거나 반대 동작 수행. [태스크1, 태스크2] -> [태스크] 요런 식으로 쓴 게 다 이것에 해당.

그리고 이걸 의존성 관점에서 봤을 때, 자칫 옵션을 잘못 설정하면 뒷부분 태스크가 실행이 안될 수 있는 경우도 있는 것. 이 때 파썬 브랜치 오퍼레이터 써서 어떻게 해결하는지 제시해 줌.

 

2) 팬인아웃 의존성

그림 5.2. 데이터 종류 불문 수집과 정제 태스크는 선형 의존성, 그러나 서로 다른 종류의 데이터 간엔 의존성 없. 이런 애들은 병렬로 실행 가능.

 

 

2. 브랜치하기

관리자가 ERP 시스템 전환을 결정해, 판매 데이터가 1~2주 내에 다른 소스에서 제공될 예정. 이렇더라도 모델 학습이 중단돼선 안되는 상황. 또 향후 분석에서 과거 판매 데이터를 계속 사용할 수 있도록 이전 시스템과 새 시스템 모두 정상 동작하길 바람.

기존 코드에서 바뀐 코드: https://github.com/K9Ns/data-pipelines-with-apache-airflow/blob/main/chapter05/dags/02_branch_task.py

 

그러나 이 접근 방식은 코드로 분기가 가능한 유사한 태스크로 구성된 경우에만 작동. 그래서 새로운 데이터 소스가 완전히 다른 태스크 체인이 필요하면 데이터 수집을 2개의 개별 태스크 세트로 분할하는 게 나을 수 있다 (그림 5.5). 이 방법은 에어플로에서 어떤 코드 분기를 사용하고 있는지 알기 어렵다는 것.

 

브랜치 오퍼레이터를 사용한다면? https://github.com/K9Ns/data-pipelines-with-apache-airflow/blob/main/chapter05/dags/03_branch_dag.py

트리거 규칙도 설정할 수 있다. none-failed 설정으로 의존성 있는 다운스트림 태스크들 실행할 수 있게 한다.

 

 

3. 조건부 태스크

예시가 기가 막히다. 가장 최근 실행된 dag에 대해서만 모델 배포하도록 dag를 변경해, 모델의 한 버전 즉 가장 최근 데이터 세트에 대해 학습된 모델 중 특정 버전만 배포가 가능하다. 파썬 오퍼레이터를 사용해 배포를 구현하고 배포 함수 내에서 대그 실행 날짜를 명시적으로 확인한다. : https://github.com/K9Ns/data-pipelines-with-apache-airflow/blob/main/chapter05/dags/05_condition_task.py

그러나 이렇게 구현하면 배포로직조건이 혼용됨. 말인 즉 deploy_model 태스크 내에서 내부적으로 확인되기 때문에 이 보기에서 모델이 실제로 배포됐는지 여부 식별할 수 없.

 

아 이하로는 더이상 못쓰겠다.. 이해를 못하고 있음 ㅠ 분명 중요한 챕턴데..

 

마지막으로 가장 궁금했던 부분인, 공유 변수 XCom의 사용. 이번에 우리가 했던 프로젝트에 도입한다 생각하면, 한 태스크 안에 너무 많은 작업이 있는 바람에, 응답 데이터 받는 부분 /  json을 pd df로 변환하는 부분 둘로 나누고, 그 결과인 pd df를 다른 태스크로 넘겨주는 작업을 하고 싶은 게다.

근데 에어플로가 실행하는 이 태스크들-특히 파썬오퍼레이터로 실행되는 경우- 함수형과의 동질감을 느꼈다. 인자로 뭘 받긴 하니 완전 함수형은 아닌데, 반환을 해서 어디에 넘겨주는 걸 볼 수가 없어... 이걸 하려면 어떻게 해야하지? 하다 본게 XCom.

이를 사용하는 방법은 크게 2가지임. 특별히 모듈을 import 하진 않고, 그냥 냅다 관련 메서드를 쓸 수도 있다.

혹은 이를 백엔드스럽게 코드 작성, 정의하는 게 에어플로 2버전 이후로 출시되어 이거 사용하면 의존성 문제 등에서 더 자유로워진다고 한다.

그런데 이 공유변수도 아무때나 함부로 쓸 수 있는 게 아니었다. 쓸 때 원자성이 무너질 수 있는 원인이 되기도 하고, 의존성이 겉으로 드러나지 않고 암묵적/묵시적이므로 이를 어떻게든 알 수 있게 해야 한다는 것. 이 때 쓸 수 있는 게 workflow API라는 게 있다고 한다.

 

여기까지가 1회독 끝에 얻은 지식을 정리한 거고, 더 세세한 설명을 할 수 있는 단계는 아직 아님