관리 메뉴

A seeker after truth

10주차: 데이터 파이프라인, Airflow (3) 본문

Data/데엔 데브코스 TIL

10주차: 데이터 파이프라인, Airflow (3)

dr.meteor 2023. 12. 13. 17:25

코드 분석

1,2 일차에 봤던건 더미오퍼레이터랑 배쉬오퍼~ 프로그램. 이번엔 PythonOperator. 자유도가 높은 대신 할 일이 많음. 그에 걸맞는 걸 구현해야 할 경우 이걸 사용한다. 그냥 헬로월드 출력하는 거라 배쉬오퍼레이터 써도 상관없는데 이걸로 해보는 거다.

 

만약 맨 아래줄처럼 순서를 정해주지 않으면? 저 둘이 동시에 실행된다.

 

max_active_runs: 동시에 실행될 수 있는 dag 개수. max_active_runs max_active_tasks 둘 다 아무리 큰 값을 지정해도 Airflow 워커에 있는 CPU의 총합이 max가 된다.

또 full refresh인 경우엔 catchup 하나 마나 의미 없다고 했다.

 

아까 하드코딩된 연결 정보, s3 파일 주소 등의 문제는 무엇인가? 이 경로 혹은 값들이 바뀌면 일일이 바꿔줘야 한다는 문제다.

이걸 해결하는 방법이 Connections, Variables로, 이번 2차 플젝에서 했던 것처럼 환경 변수로 설정해주고 (자동) import 하도록 만드는 작업.

따라서 3번째 줄이 생겼다.

 

인자가 **context로 바꼈다. 또 34번째 줄의 의미는, extract란 코드의 리턴값을 읽어오란 의미. 이를 호출하기 위해 task_instance라는 값 사용.

 

엑스컴은 테스크 간 너무 큰 데이터를 주고받아야 할 경우 사용할 수 없다.

 

위와 같이 디폴트 인자에 지정된 환경 설정은 모든 arg에 적용되는 설정

 

 

 

PythonOperator를 쓸 땐 @task 사용하지만, 다른 오퍼레이터 쓸 땐 안 쓸거다.


실습

airflow-setup/dags/ 밑으로 지금까지 설명한 5개 버전의 dag 파이썬 코드를 모두 실행해 볼 것. 여기로 5개가 복사돼야 하는 거다 -> 

airflow-setup 밑으로 클론.

-r 옵션은 recursive하게 하위 폴더?파일?까지 다 복사하라는 말과 동일하다.

 

그럼 이런 에러와

dag 목록 생성된 것들. 위 에러들은 커넥션 에러 난거다 설정 바꿔야 하는데 안바꾼게 있어서 저렇다.

 

여기서 Log 하늘색 박스 표시 된거 보이지? 저거 누르면 실패 원인 알 수 있음

데웨 연결 설정이 반영 안됐던거라 고치고 와서 성공.

admin/connections 메뉴에서 연결 정보 설정해 id 부여한걸 db hook으로 갖고 와서 실행하는 거... 비번 입력 후 'test' 버튼 누른 뒤 connections success 가 떠야 넘어갈 수 있고, 그렇게 해서 겨우 성공함

얘도 성공.

 

 

1번 질문.

PostgresHook은 레드시프트에 연결하기 위한 것임. 얘의 오토커밋 디폴트값은 false라는 질문 가끔 나옴. 즉 자동 커밋 안하고 쓰는 작업 일일이 호출 및 실행해줘야 진짜로 그걸 하는 거임. 이때 비긴은 아무 영향이 없지

 

2번 질문.

Task를 너무 적게 만들면 모듈화가 되지 않고 재실행의 시간이 오래 걸린다: 이 말은 예를들어 한 개짜리 테스크를 3개로 나눈 상태라 쳐봐 근데 1,2번까지 성공하고 3번은 실패했으면 재실행 시 3번부터 하면 되는 식이라서 나은 것 (나는 데이터플로우 쓸 때 이 이점을 못 살렸군..?)

그래서 기용쌤의 기준은: Task의 수를 너무 늘리지는 않되 재실행의 이슈가 발생했을 때 어떻게 하면 재실행 시간을 줄일 수 있을지의 관점에서 고려해 보아야 한다. 물론 정답 있는건 아니고.

 

3번 질문. Airflow의 Variable 관리 vs. 코드 관리

SQL 같은 것도 에어플로 변수로 관리하는 경우가 있다. 변수로 빼놓고 필요할 때마다 고쳐 쓰는 식. 그것도 나쁜 방법은 아니나, 이게 크리티컬한 SQL이라면 이를 어느 정도 테스트 하고 나서 변수로 관리하는 습관 필요하고, 이것도 어떤 형태로든 기본기가 남아야지만 나중에 문제 생겼을 때 디버깅 이슈해결 가능. 깃헙에 이런게 기록 되면 커밋 되돌리는 등의 일을 할 수 도 있고.

그래서 기용쌤 같으면, 엄청 중요한 거라면 변수로 분리해 관리할 것 같진 않으시다고 함

 

숙제: airflow.cfg 파일

이 파일 이씨투 직접 설치할 땐 이 파일 직접 수정하기도 했었다. 에어플로 환경설정과 관련된 다양한 정보들이 이 안에 있는데, 여기 있는 항목들을 풀어 숙제로 제출해달라

1번. 에어플로 대그들 키 지정해주는 폴더? 있는데 그게 어딘지. 근데 2번 설명하면서 이미 답을 알려드렸다. 기본이 5분이란거. 근데 이 5분이란 게 어디 기록돼있는지?를 볼 것.


Yahoo Finance API DAG

이번엔 Yahoo Finance API DAG 작성을 Full Refresh, 인크리멘탈 업뎃 두 방법으로 작성해보자! 전반적ㅇ로 앞 예제랑 거의 비슷하다. 먼저 설치부터 하자. 스캐ㅔ줄러 안 터미널 접속 후 설치 명령

원래 위 명령어 입력 후 airflow tasks list UpdateSymbol 실행해야 하는데 실패한다. 이떄 아래 사진과 같이 해결 가능하고, 특히 1번 명령어를 잘 기억해 둘 필요 있다.

 

위 명령 성공 뒤

이 명령 입력 후

마침내 성공.

 

 

2번째 방법으로 만드는 경우, ETL 중 로드 부분만 바뀜. 원래는 원본 테이블 삭제하고 새로 만든 다음 읽어온 걸 적재하는형태로, 매일 새로운 지난 30일 간의 주식 정보만 저장. 그러나 이번엔 삭제 안하고 읽어온 것들 다 적재한 후 중복 레코드를 걸러내는 식으로 구현! 늘 배워보고 싶었던 부분이군. 이렇게 하면 전날 주식시장 열렸단 전제 하에 하루씩 레코드가 늘어나지.

또, 이번엔 이전과 다르게 테이브을 새로 만들거다. 스키마는 동일하나 원래 테이블인 stock_info에 _v2라는 어...미?를 추가로 붙인 테이블 만들어 인크리멘탈하게 업뎃된 레코드를 적재한다!

 stock_info_v2는 유니크 레코드들로 구성된 테이블인데, 여기에 바로 적재를 하기 힘들기 때문에 코드상으로는 임시 테이블을 만들어 거기에 적재한 뒤 그 임시 테이블에서 stock_info_v2로 다시 로드하는 식임. 이는 레드시프트 세션 끝나면 사라지므로 굳이 드롭해주지 않아도 ㄱㅊ.

 

이 스탭을 트랜잭션으로 만드는 것. 임시 테이블 만드는 부분은 에러 난다 해도 큰 문제 없다 - 원본 테이블 정합성에 문제가 생기진 않기 때문. 트랜잭션은 정확히 3,4번째에 걸려야 한다. 물론 전체에 다 걸려도 상관없긴 하나 트랜잭션이 걸리는 영역은 최소화하는게 좋기 때문에 다 걸어도 되고 3,4번째 스텝만 트랜잭션으로 걸어줘도 된다.

 

이런게 ctas.

 

이때 UpdateSymbol_v2 과 버전1 둘다 실행 안되는 문제가 있는데, 도커컴포즈 야믈파일의 _PIP_ADDITIONAL_REQUIREMENTS 부분 중 우리 실습 파일 기준 63번째에 원래 _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}

이렇게만 돼있던거 뒤에 우리가 설치해야 하는 모듈인 pymysql yfinance를 띄어쓰기 잘 준수해서(이 한끝 차이로 코드 색깔이 바뀜..) 다시 해보면 된다고 하는데...?

 

와 진짜 된다.


숙제 리뷰

여기 잘 찾아보면 요일 세팅하는 부분 있음

 

 

여기 5번에서, 많은 사람들이 DB init하면 되지 않느냐고 생각하지만 ... 그럼 메타데이터를 처음부터 다시 만들고 불러와 버린다. 이건 백엔드 디비 설정 같은게 바껴서 그걸 불러와 준다던지, 하는 경우에만 실행해줘야 한다.

 

airflow.cfg에는 두 종류의 Timezone 관련 키가 존재하는데 default_timezone, default_ui_timezone이 있다.

start_date (시작일), end_date(종료일), schedule은 default_timezone에 지정된 Timezone을 따른다.

후자는 말그대로 그냥 UI 화면서 보이는 타임존 설정이다. execution_date은 전, 후자의 설정과 관계 없이 항상 UTC를 따르게 돼있다.

결국 서로 너무 혼란이라서, 기용쌤은 그냥 전부 UTC를 쓰신다 한다.

 

맨 아래 2줄이 무슨 소리냐, 점 2개에 적은 이유 때문에 만약 그 테스트 코드 안에 혹시라도 저런 delete 어쩌고 하는 코드가 있는 경우... 귀신이 곡할 노릇. 이것떔에 크게 고생해본 적 있음

 

CountryInfo DAG 코드 리뷰. extract_transform 부분 제외 나머진 Update_symbol 1,2 .py 코드랑 똑같으니 이 부분만 리뷰! 단 name = country['name']['common'] 이 부분 common->official로 바꿔 달라.

요일 파라미터는 scledule = '* * * * *' 중 맨 마지막 칸이었다! 0부터 6사이 값 가지며, 일~토요일로 매칭된다.

 

airflow.cfg가 웹 서버 밑에 있는 줄 알았는데, 스케줄러 밑에 들어가자마자 바로있네...?

CountryInfo.py 우리 파일엔 없지만...

 

여기서 except 의 롤백 코드 뒤에 raise 붙여서 데엔들이 여기서 에러났단 사실 알게 해주는 게 좋다! 커리큘럼 뒷부분에 따로 올려두신다 함

 

 

 

*과제정답: https://hongcana.tistory.com/122