관리 메뉴

A seeker after truth

10주차: 데이터 파이프라인, Airflow (1, 2) 본문

Data/데엔 데브코스 TIL

10주차: 데이터 파이프라인, Airflow (1, 2)

dr.meteor 2023. 12. 12. 14:59

<ETL 코드 실습, 개선과 SQL transaction>

웨ㅂ상에 있는 csv 를 복제해 온 담에 redshift에 적재하는 과정 그 자체를 자동화한다. 이걸 API 등 백엔드 활용해서 하는 건 다른 단계에서 한다...?

 

앞에서 나왔듯 dw는 프라이머리키의 고유성 보장 안하니 같은 값 갖는 레코드들이 다수 존재할 수 있다. 이걸 어케 지켜주느냐 그게 결국 데엔의 책임이고, 이게 멱등성 보장함에 있어 중요한 포인트 된다는 거.

 

실습 문제 해결 포인트

1. 헤더도 데이터로 적재돼버린 것

2. full refresh (delete 한번 수행 후 insert 반복) 문제: 테이블 커지면 사용 불가능한 방법(단순해서 장점이긴 함), 중간에 에러나면 데이터 정합성 깨져버림, 이 일련의 과정을 트랜잭션으로 정의

<-> incremental update: 데이터 소스 커지면 쓸 수밖에 없는 방법이지만 복잡성 증가하며, 특정 날짜 이후 레코드를 읽을 수 있는 기능이 지원되지 않으면 불가능한 사용법, 이걸 사용하는 순간 backfill 이슈 생긴다. 이럴 땐 에어플로우의 execution_date라는 변수? 인자? 사용함.

또 하나 생각해볼 것은, 레코드 삭제 시 delete from 과 truncate 중 뭐 쓸건지. 후자는 조건 거는 게 가능하고, 전자는 where 걸어서 조건에 맞는 레코드만 삭제. 또 하나의 차이는 트랜잭션을 전자는 준수하는데 후자는 무시한다. 즉 트랜잭션 유무와 상관없이 후자는 무조건 다 레코드 삭제, 전자는 트랜잭션 준수해서 삭제. 이게 무슨 차이인지는 뒤에 가서 설명

 

결국 트랜잭션은 한 작업이 불완전한 상태에 놓이게 되는 경우 이를 해결하기 위한 방법인 것.

 

auto commit=True란? 테이블 변경 사항을 바로바로 반영하는 것을 의미한다.

반면 False면 모든걸 트랜잭션 단위로 다룸을 의미. 즉 트루일 땐 그게 아니고 그냥 바로바로 반영한다는 소리인 거고, 이걸 막고 싶으면 begin&end를 써서 막아야 하는 것.

그럼 둘 중 뭘 쓰는게 더 좋냐는 개인/팀의 선호도! 여러 사람이 하면 합의사항 정해서 그걸 따를 것.

 

이거는 만약, 계속 에러 및 롤백 실행될 경우, 트라이 이하 코드는 제대로 실행이 안되고 있단 얘기일텐데, 외부에 어떤 식으로든 알람을 안보낼텐데, 데엔이 이걸 모르고 넘어가게 될 것이다. 따라서 겉으론 편안하게 아무 일 없는 것처럼 보이지만, 사실 try 블록서 실행하고 싶던 코드들이 실행이 안되는 불상사가...

한동안 모르다가, 누군가 데이터 소비하는 쪽에서 그 데이터의 문제가 보이며 업뎃이 안된다, 할 때 이를 파고 들어가 보면 에러가 있단 걸 알게 된다는 것.

 

그래서 이렇게 하면 생겼던 에러가 계속 위로 전파돼서 그 프로그램이 fail할 것.

만약 에어플로에서 이런 데서 문제 생기면, 보통 슬랙 등으로 메시지 가게 만들어 놨기 때문에 여기 문제가 있단 걸 데엔이 알게 된다.

 


<airflow 실습>

팀플젝에선 ec2 쓸 수 밖에 없다.

 

에어플로우에서 task = operator고, 이를 지정한 뒤 이에 맞는 파라미터 채워준다.

중요한 건 몇 개의 task로 구성할 것이며, 할 일 뭔지 명확히 하는 것. 그리고나면 task별로 사용할 오퍼레이터가 뭔지 명확해질 것.

retries 인자 = 재시도 몇 번 해볼지, retry_delay=재시도 딜레이

on_failure_callback on_success_callback을 통해 실패, 성공 여부에 따라 행할 이벤트(이메일로 알림을 보내는 등)를 설정해 줄 수 있다. 이를테면 슬랙/이멜로 메시지 보내는 함수 하나 만들어 이 함수를 on_failure_callback이나 나머지 하나에에 지정, 할당해주는 식.

 

여기서 schedule 인자 해석: 매 시간. hourly 스케줄링. 만약 주기적으로 안하고 싶으면 None, @once로 세팅하고 다른 dag가 끝났을 때 모든 dag 를 trigger하는 형태로 세팅할 수 ㅇ

catchup 인자 의미 이해하는게 중요!

 

여기서 맨 우측 하단은 t1 끝나고 2,3 실행하라는 말임

 

 

이건 도커로의 실행 결과. 원래 스케줄러에다가 터미널 접속해서 하는거 맞고,

이런 식으로 말이다.