관리 메뉴

A seeker after truth

한 페이지로 정리하는 spark 개념 본문

Data/hadoop ecosystem

한 페이지로 정리하는 spark 개념

dr.meteor 2022. 6. 22. 09:58

https://datacadamia.com/db/spark/spark
위 문서 번역/참고해서 작성한 내용입니다. 지금까지 봤던 스파크 아키텍처 관련 자료 중 가장 정리가 잘된 자료였습니다.

 

 

https://blog.knoldus.com/cluster-vs-client-execution-modes-for-a-spark-application/

 

1. 스파크 클러스터 구성 요소 

스파크 클러스터는
- 스파크 애플리케이션 ⊃ 드라이버 ⊃ (SparkContext OR SparkSession - 클러스터매니저와 연결되어있음)
- 클러스터 매니저 (executor를 갖고온다/얻을 수 있다. 책에선 드라이버(=마스터) 와 워커 개념을 갖고 있다고 함)
- worker node ⊃ executors ⊃ tasks, cache
⊃ daemon process(얜 위치가 어딘지 모르겠 - 개별 워커 노드를 실행하고 관리한다)
로 구상되어있다. 자세한건 그림 보면서 외우자

각각의 역할은?

1) 스파크 애플리케이션: an instance of a driver. 한 애플리케이션 내에 있더라도 서로 다른 스레드에서 잡이 제출됐다면 한번에 여러개의 job이 동시에 실행될 수 있다. 글고 애플리케이션 내에 애플리케이션을 포함할 수도 있다!
클러스터와는 독립적인 프로세스 집합이며, main program(a.k.a. driver program) 의 SC 객체에 의해 조정된다.
➡︎ 드라이버(main() 실행되는 곳)
- 개념: S.C.(connection)를 get 했을 떄 만들어지는 데몬(또는 서비스)의 wrapper. 익스큐터(JVM) 위에서 다양한 병렬(parallel) 연산을 launch 한다. 이 병렬 연산은 클러스터 안에서만 실행되거나 locally on the same machine 에서 실행된다.
- 하는 일: 자체 데몬(서비스)를 시작함, 클러스터 매니저에 연결해줌, worker(executor) 를 get 하고 관리함, 그 worker(executor)들에서 오는 연결 요청을 listen 및 accept 해줌(따라서 드라이버 프로그램은 워커 노드들로부터 네트워크 주소로 접근 가능해야만 한다!).
- 관리
• spark.driver.memory
• spark.driver.cores: 스레드 개수 설정!
• 서비스 포트

• UI: 4040 포트로 들가면됨
• 머신: 드라이버가 실행되는 곳이며, 스파크 잡을 초기화하고 summary results 가 collected 된다. client(로컬 머신), cluster(리소스 매니저) 중 하나로 선택할 수 있다.
- 책 내용: 스파크애플리케이션의 실행을 제어하고 클러스터(익스큐터의 상태와 태스크)의 모든 상태 정보를 유지한다. 물리적 컴퓨팅 자원 확보 및 익스큐터 실행을 위해 클러스터 매니저와 통신.
- 여담: YARN의 맥락에선 어플리케이션 매니저와 동의어다. 모든 드라이버는 single SC object를 갖고있다.
*데몬(또는 서비스): 익스큐터를 시작하는 드라이버인데, 핵심은!!!!!!
스파크 데몬 == 스레드를 실행 중인 JVM == core == slot = the number of available threads for each executor
즉 여기서 말하는 코어는 cpu 코어를 말하는게 아니고, 스파크 안에서 병렬 처리를 수행할 수 있는 스레드를 지칭한다. 그걸 core, slot 두 단어를 혼용해서 쓰고있다!
드라이버(익스큐터) 1개 == (1 JVM & many core)
As the JVM has to startup and initialize certain data structures before it can begin running tasks, running more core is preferable as running more executors.

https://datacadamia.com/db/spark/cluster/core

앞&뒤에서 나온 spark.driver.cores, spark.executor.cores 말고 전체 클러스터에 대해 설정하고 싶으면 spark.cores.max 설정을 하면 된다.
➡︎ SC
- 클러스터 매니저에 연결되어 얘로부터 자원을 할당받는다.
- 클러스터 안에서 노드에 있는 익스큐터를 요구한다
- SC를 거쳐 익스큐터로 application code 를 전송/전달해준다.
- 익스큐터가 run 하도록 tasks를 보내준다.
*SparkSession: connection 객체로, 애플~이ㅡ 엔트리 포인트며 dataset/dataframe 을 만드는데 쓰인다.

2) 클러스터 매니저
Spark is agnostic to the underlying cluster manager. 대충 스파크란건 클 매 없이 존재할수없다 정도로 이해
- 책 내용: 프로세스가 아닌 물리적 머신에 연결되는 개념이다!
(1) standalone
a simple cluster manager included with Spark that makes it easy to set up a cluster. spark://hostnameMaster:port 로 접속 가능. 마스터는 8080, 워커는 8081, 서비스는 7077 포트로 접속 가능하다.
- 마스터: 하둡의 마스터 노드(헤드 노드) 와 동일. 워커 노드를 관리하는 녀석. hdfs 에선 여기에 namenode 서비스가 설치되고 yarn 에서는 리소스 매니저 서비스가 설치된다.

3) 워커 노드(executor)
스파크 애플~ 을 위해 연산을 실행하고 데이터를 저장하는 프로세스들이다. 클러스터 안 각 스파크 앱들은 단지 테스크를 실행하고 데이터를 저장하는 독립적인 익스큐터 JVM 집합을 갖고 있다.
- 책 내용: 드라이버가 할당한 태스크를 수행하는 프로세스. 태스크의 상태&결과(성공/실패)를 드라이버에 보고한다. 모든 스파크 애플~~션은 개별 익스큐터 프로세스를 사용한다.
- 워커 간엔 통신이 없다. 독립적이라구~
- 함수를 사용하는 트랜스포메이션/액션을 실행할 때, 스파크는 그 함수를 포함하고 있는 클로져가 워커에서 실행될 수 있게 자동으로 워커로 push 할 것이다. 하나의 클로저는 모든 task에 대해 각 워커마다 send 될 것이다.
- 워커에 있는 전역 변수가 수정되는 어떠한 경우라도 그 수정 사항이 다른 워커들이나 다른 드라이버로 전송되지 않는다.

이런 식으로 나뉘어져 들어간다

- spark.executor.memory, spark.executor.cores 옵션이 있다.

4) 기타
NODE란? a physical computer that logically run in a cluster (종류마다 다 메모할거면 하둡의 node 링크 들어가서 참고: https://datacadamia.com/db/hadoop/node)

2. 

1) Job
스파크 앱 의 작업 단위다. 액션에 의해 트리거된다. 긍까 액션 하나당 하나의 스파크 잡이 생성되며 액션은 항상 결과를 반환한다.
2) Stage
다수의 머신에서 동일한 연산을 수행하는 task의 그룹. 스파크는 가능한 한 많은 task(transformation)를 동일한 스테이지로 묶으려 한다. 스파크는 스테이지 수는 셔플 작업이 얼마나 많이 발생하는지에 따라 다르다. 셔플이 일어난담에는 반드시 새로운 스테이지를 시작한다.
파티션을 재분배하는 과정은 데이터를 이동시키는 작업이므로 익스큐터 간의 조정이 필요해서 이와 같은 이유로 새 스테이지 시작하며 최종 결과 계산을 위해 스테이지 실행 순서를 계속 추적한다.
3) Task
슬롯(코어) 위에서 실행된 스레드를 일컫는다. (때론 병렬로) 워커(익스큐터)에 의해 실행된다. The total number of slot is the number of thread available. The number of Partitions dictate the number of tasks that are launched.
- 책 내용: 각 task는 단일 익스큐터에서 실행할 데이터의 블록과 다수의 트랜스포메이션 조합으로 볼 수 있다. 만약 데이터셋이 거대한 하나의 파티션인 경우 하나의 task만 생성된다. 파티션이 1000개면 1000개의 task를 만들어 병렬 실행 가능. 즉 task란 데이터 단위(파티션)에 적용되는 연산 단위 의미.

3. 스파크 엔진

트랜스포메이션 & 액션, 파티션, 셔플, 클로저, 공유변수
1) 트랜스포메이션: 특별한 내용 없고 예시도 없군...
2) 액션: 한개 이상의 잡으로 구성돼있다.
3) 파티션: 아까 나왔듯 파티션 개수는 launched 된 테스크 개수를 가리킨다. 파티션 개수는 spark.sql.files.maxPartitionBytes 옵션에 의해 정해진다 저거 디폴트값이 128MB = 134217728 옵션에 의해 정해지는거있잖아 그거임. 이를테면 1기가 사이즈의 파일 하나를 읽으면 10개의 파티션이 생김. 이는 Hdfs 블럭 사이즈와도 동일한 거 알지? 그리고 repartition 관련 연산들은 모두 spark.sql.shuffle.partitions 옵션으로부터 설정된 파티션 개수를 알아내 사용한다. 이 값은 클러스터 코어 수에 맞춰 설정한다!! 자세한건 챕터19에 나온다고함! 보통 클러스터의 익스큐터 수보다 파티션 수를 더 크게 지정하는게 좋다. 로컬 머신서 코드 실행하는 경우엔 병렬 처리 가능 task 수가 제한적이므로 이 값을 작게 설정해야함. 이 설정은 더 많은 익스큐터 코어를 사용할 수 있는 클러스터 환경을 위한 기본값. 최종 스테이지서는 결과를 단일 파티션으로 모으는 작업을 한다.
파티션 수를 늘리면 더 높은 병렬성을 얻을 수 있으며 만병통치약은 아니지만 최적화를 위한 가장 간단한 방법이다.
4) 셔플: moving data rows by rows between partition.
spark.sql.shuffle.partitions 옵션은 조인, aggregation을 위해 데이터를 셔플링할 때 사용할 파티션 개수를 설정한다.
5) 클로저: 일단 시험 나오는 애는 아니니 걍 읽는 정도로- https://datacadamia.com/db/spark/pyspark/closure
6) 어큐뮬레이터
Accumulators can only be written by workers and read by the driver program. 그래서 워커에서 값들을 aggregate 한담에 드라이버로 돌려준다. 기본적으로 테스크 안에서 write-only고, 오직 드라이버만이 어큐뮬레이터 값에 접근할 수 있다. Use to count errors seen in RDD across workers.
7) 브로드캐스트: https://datacadamia.com/db/spark/rdd/broadcast

3,4,6,7 은 공식문서: https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#shuffle-operations

8) 조인: 조인 전엔 셔플이 일어난다.
9) optimizer(catalyst)
https://datacadamia.com/db/spark/engine/optimizer

10) https://datacadamia.com/data/type/relation/engine/physical_plan
https://datacadamia.com/db/spark/engine/physical_plan
https://datacadamia.com/db/spark/engine/plan
11) cache(): 함수는 persist(storageLevel = MEMORY_AND_DISK) 함수와 동일 (참고 : Storage Level )

 

 

 

 

4. 실행 모드: application 실행 시 요청한 자원의 물리적 위치를 결정한다.

https://eyeballs.tistory.com/269?category=785117
1) 로컬 모드
2) 클라이언트 모드
- 책 내용: 클라이언트 머신에 스파크 드라이버 프로세스, 클러스터 매니저에 익스큐터 프로세스가 있다 이거 셤문제 나온거니까 꼭... 이런 머신을 게이트웨이 머신 또는 에지 노드라 한다.
- 공식 문서: the driver runs in the client process, and the application master is only used for requesting resources from YARN
- 검색 내용: 특정 잡 상태를 모니터링하고 싶을 때 사용한다. 드라이버가 로컬에 있기 땜에 전체 애플리케이션이 로컬 머신에 의존하고 있다. 하지만 문제 생기면 로컬의 드라이버->애플리케이션 순으로 꺼지고 말테니 프로덕션 레벨에선 사용하면 안된다. 하지만 디버깅이나 테스팅 용으로는 좋다
3) 클러스터 모드
- 책 내용: 컴파일된 jar 파일, 파썬 스크립트 등을 클러스터 매니저에게 전달해야 한다. 얘는 파일 받은 다음 워커 노드에 드라이버&익스큐터 프로세스 실행한다. 이를테면 하나의 워커 노드에는 스파크 드라이버를 할당하고 다른 워커 노드에는 익스큐터를 할당한다.
- 공식 문서: the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application
- 검색 내용: 워커 머신과 애플리케이션이 서로 멀리 있을때 (이를테면 서로 다른 컴퓨터에 있을떄) 사용하면 좋다.