관리 메뉴

A seeker after truth

<Apache Airflow 기반의 데이터 파이프라인> 7, 9단원 메모 본문

Data/데엔 데브코스 TIL

<Apache Airflow 기반의 데이터 파이프라인> 7, 9단원 메모

dr.meteor 2024. 1. 18. 22:18

ch7. 외부 시스템과 통신하기

유익한 내용이 너무 많다. 배운게 너무 많다... 장고를 백엔드, cs, 파이썬에 대한 지식과 숙련도를 올리는 수단으로 썼을 때와 동일한 경험을 했다. 코드 차원서도 그렇고, 비슷한 문제를 이미 다른 프로젝트를 통해 경험해봤으며, 그 과정서 생겼던 은연 중의 의문들을 이 책을 통해 해소할 수 있었단 점에서 그랬다.

 

여기서 외부 시스템 = 에어플로 및 에어플로가 구동되는 시스템 외의 모든 기술.  ex) 퍼블릭 클라우드 플랫폼 내 서비스들, spark 클러스터

 

여기서 다루는 건 데이터 이동 및 변환 작업이 주다. 뿐 아니라 또 MLOps 스러운걸 다룸. 그래서 sagemaker, 포스트그레스, 에어비앤비 데이터(http://insideairbnb.com/)를 사용한다.

 

하루 한 번 최신 데이터를 포스트그레스->S3로 다운로드. 그리고 난 후 도커 컨테이너 안에서 판다스 작업 실행해 가격 변동 확인후 다시 s3에 저장.

 

aws '클라이언트'는 boto3, gcp '클라이언트'는 cloud sdk. 클라이언트는 요청에 필요한 세부 정보 입력하면 요청 및 응답 처리를 내부적으로 처리하는 기능을 제공하는 거다.

 

에어플로에서 프로그래머에게 제공하는 '인터페이스' = 오퍼레이터.

오퍼레이터는 클라우드 서비스에 요청하는 세부 정보를 전달하는 편리한 클래스로서, 기술적 구현을 내부적으로 처리

내부적으론 클라우드의 sdk를 사용해 요청을 보내고, 클라우드 sdk를 감싼 레이어를 제공하는 것과 동일하다. (그림 7.1)

 

 

7.1 클라우드 서비스 연결하기

또 다른 예제로 손글씨 인식 ML 모델 개발 상황을 상정한다. 이 부분을 처음 알게 됐는데, 모델은 온라인과 오프라인 두 부분으로 나뉜다.

오프라인 부분은 데이터 학습 시키고 결과(파라미터들)가 저장된다. 새 데이터 수집될 때마다 이 프로세스가 주기적으로 수행됨.

온라인 부분은 모델 로드하고 분류 예측값 반환.

에어플로는 일반적으로 오프라인 부분을 담당. 데이터 로드, 전처리, 학습 과정, 주기적 재학습 과정이란 절차를 떠올려보면 에어플로 쓰기에 워낙 좋은 거다.

전체 파이프라인은, 샘플 데이터를 개인 s3로 복사 → 기존 이미지 파일(데이터)을 모델서 사용 가능한 형식으로 변환 → 싸지메이커로 학습 → 새로운 사진에 대해 예측값 반환.

이 파이프라인이면, 모델이 변경됐을 때 파이프라인을 재실행해야 함.

태스크로만 보면 4개지만, 실제 코드는 굉장히 길다. 싸지메이커를 위해 구성해야 하는 태스크가 워낙 많아서 그렇다.

https://github.com/K9Ns/data-pipelines-with-apache-airflow/blob/main/chapter07/digit_classifier/dags/chapter9_digit_classifier.py

 

위 코드에서 인상깊게 본 것들은 아래와 같다.

- mnist_obj = s3hook.get_key(bucket_name="your-bucket", key="mnist.pkl.gz") → 이 코드를 's3 객체를 다운로드한다' 고 표현한 점. 메모리 내 바이너리 스트림으로 데이터를 다운로드 하는 것.

- gzip 파일 압축 풀고 데이터셋 추출, 변환 후 버킷에 재업하는 코드 부분 (버퍼 사용 등). 넘파이 배열을 RecordIO 포맷으로 변환하는 게 write_numpy_to_dense_tensor다. io.BytesIO()는 인메모리 바이너리 스트림이다. 이 때문에 데이터가 파일 시스템에 저장되지 않아 작업 후 파일이 남아있지 않다. 대규모 데이터였으면 데이터를 디스크에 저장하고 chunk로 처리하는 게 더 나았을 것.

- 아래 코드는 오퍼레이터가 학습 완료될 때까지 대기하며 학습 중 클라우드워치로 로그 출력하게 하는 옵션이다. 특히 첫번째 옵션은 false로 돼있으면 단순히 명령만 실행하고 그 외건 신경 안씀(boto3 클라이언트 동작의 특성). 이걸 트루로 해두면 싸지메이커 오퍼레이터가 태스크 완료까지 기다리는데, 내부적으론 오퍼레이터가 x초마다 태스크 실행 중인지 확인.

wait_for_completion=True,
    print_log=True,

 

위 파이프라인에서 손글씨 숫자 입력 후 결과 반환하게 만드려면 이에 대한 인ㅍ터페이스 혹은 API 필요함. 이를 위해 AWS에서 인터넷에 액세스 하기 위해 싸지메이커 엔드포인트를 트리거하는 람다를 개발 및 배포하고 API GATEWAY를 생성 및 연결해 외부에서 접속할 수 있는 http 엔드포인트를 만든다.

그럼 이 부분(인프라 배포를 코드로 구성해 배포)을 파이프라인에 통합하지 않는 이유는? 주기적이 아닌 한 번만 배포하면 되기 때문.

 

AWS Chalice를 사용하는 사용자용 API 예시 코드: https://github.com/K9Ns/data-pipelines-with-apache-airflow/blob/main/chapter07/digit_classifier/number-classifier/app.py

위 코드에서

1) 아래 코드는 입력 이미지를 grayscale nparray로 변환하는 것.

img = Image.open(BytesIO(app.current_request.raw_body)).convert("L")
    img_arr = np.array(img, dtype=np.float32)

2) 싸지메이커 응답은 바이트로 반환되기 때문에 Decode를 해주는 것.

 

 

이렇게 외부 서비스 사용 시, 파이프라인에 다양한 컴포넌트를 정확히 통합하는 걸 보장해야 하므로 복잡해지는 경우가 많다. 외부 서비스 끼는 파이프라인 개발 쉬운거 아니다.

에어플로는 위와 같이 그 자체로도 학습을 위한 피처 세트를 관리할 수 있는 범용 오케스트레이션 프레임워크다. 

 

 

7.2 시스템 간 데이터 이동

https://github.com/K9Ns/data-pipelines-with-apache-airflow/blob/main/chapter07/insideairbnb/dags/custom/postgres_to_s3_operator.py

위 코드에 대한 설명

- 버퍼는 메모리에 있으며 처리 후 파일 시스템에 남기지 않기 때문에 편리하다. 하지만 포스트그레스 쿼리 출력값은 메모리에 위치하므로, 쿼리 결과의 크기를 메모리에 맞춰야 하는 걸 유념해야 한다.

- 멱등성을 보장하기 위한 핵심은 replace=True 세팅이다.

 

7.2.2 큰 작업을 외부에서 수행하기

- 에어플로를 오케스트레이션 시스템으로 볼 뿐 아니라 태스크 실행 시스템으로도 봐야 하는지 논의되곤 한다. 또 반대 의견으로 에어플로를 태스크 트리거 시스템으로만 사용해야 한단 생각으로 에어플로가 자체적으로 실제 작업을 수행해선 안되며, 대신 ㅎ스파크 같은 데이터 처리 시스템에서 수행해야 한다고 주장하기도 한다. 

- 도커 오퍼레이터. 왜, 언제 쓸 수 있는지 플젝 리뷰 떄 했으니 생략


9단원: 테스트

- 무결성 테스트: 사이클 존재 검사함. (test_dag_integrity.py). 모든 dag 파일이 들어있는 폴더를 대상으로 함

- 예시 코드 통해 테스트 디렉터리 구조 알 수 ㅇ맀고, tests/ 디렉터리엔 init파일 없음. 이 디ㅂ렉터리는 모듈로 동작하는 구조가 아니기 때문에 테스트 작업은 서로 의존적이지 않으며 독립적으로 실행할 수 있어야 함. 파이테스트는 디렉터리와 파일을 스캔하고 테스트를 자동 검색. 따라서 init파일로 모듈 만들 필요 없음