[Spark] 아파치 스파크(spark) 시작하기

Driver, Executor, Node, Job, Stage, Task, Cluster Manager/ RDD, Fault tolerance / Hadoop

Posted by Wonyong Jang on April 11, 2021 · 16 mins read

아파치 스파크를 한마디로 정의하자면 '빅데이터 처리를 위한 오픈소스 분산 처리 플랫폼' 또는 '빅데이터 분산 처리 엔진' 정도로 표현할 수 있다.

여기서 말하는 분산 프로그래밍은 무엇을 의미할까?

간단히 말해 여러 디스크로부터 데이터를 한번에 읽는 것을 말한다.
사실 하드디스크 드라이브 저장 용량이 엄청나게 증가한데 반해 액세스 속도는 그에 훨씬 미치지 못하여 드라이브를 읽는데도 시간이 굉장히 오래 걸렸다. 그래서 이 시간을 줄이기 위해 하둡을 사용하게 되었는데, 예를 들어 100개의 드라이브가 있고, 각 드라이브에 1/100만큼 데이터를 저장했다고 가정했을 때 이것이 병렬로 동작한다면 더 빠른 시간 내에 데이터 읽기가 가능하다고 한다.

빅데이터 개념이 등장했을 당시, ‘빅데이터 처리 = 하둡(Hadoop)’ 이라고 할 정도로 하둡 에코시스템이 시장을 지배했다.
하둡은 HDFS(Hadoop Distributed File System)라고 불리는, 분산형 파일 시스템을 기반으로 만들어졌다.

하둡은 여러 대의 서버를 이용해서 하나의 클러스터를 구성하여, 이렇게 클러스터로 묶인 서버의 자원을 하나의 서버처럼 사용할 수 있는 클러스터 환경을 제공한다.
기본적인 동작방식은 분석할 데이터를 하둡 파일 시스템인 HDFS에 저장해 두고 HDFS 상에서 맵리듀스 프로그램을 이용해 데이터 처리를 수행하는 방식이다.

문제는 하둡의 맵리듀스가 DISK I/O를 기반으로 동작한다는 것에 있었다.
실시간성 데이터에 대한 니즈(NEEDS)가 급격하게 증가하면서, 하둡으로 처리하기에는 속도 측면에서 부적합한 상황이 발생하였다.

이 때 등장한 것이 아파치 스파크이다.
최근에는 경쟁 관계를 넘어서, 하둡+스파크 라는 둘의 연계가 하나의 큰 흐름으로 자리 잡았다.

즉, 스파크는 하둡을 대체하는 것이 아니라 MapReduce를 대체하는 존재다. 예를 들어, 분산 파일 시스템인 HDFS나 리소스 관리자인 YARN 등은 Spark에서도 그대로 사용 할 수 있다. Hadoop을 이용하지 않는 구성도 가능하며, 분산 스토리지로 Amazon S3를 이용하거나 분산 데이터베이스인 카산드라(cassandra)에서 데이터를 읽어 들이는 것도 가능하다.

스크린샷 2021-02-22 오후 11 41 28


1. Spark 란?

현재 빅데이터 분석기술 중에서 가장 주목 받는 기술은 아파치 스파크(spark)이다.
스파크는 인메모리(In-Memory) 기반으로 동작하기 때문에, 반복적인 처리가 필요한 작업에서 속도가 최소 100배 이상 빠르다.

스파크는 스칼라로 작성되어 자바 보다 간결한 코드로 같은 작업을 표현할 수 있다. 또한 JVM에서 동작하기 때문에 기존에 사용하던 자바 라이브러리를 모두 사용할 수 있다.

또한, 다양한 방식으로 배포가 가능하며 Java, Scala, Python, R 프로그래밍 언어를 위한 네이티브 바인딩을 제공하고 SQL, 스트리밍 데이터, 머신러닝, 그래프 프로세싱을 지원한다.

1-1) Spark vs Multiprocessing

분산 환경을 사용하는 가장 큰 이유는 유연한 확장성에 있다.(다양한 하둡 에코시스템을 통한 효율적인 빅데이터 저장/분석이 가능한 부분도 있다)
데이터가 정적으로 고정되어 있지 않고 지속적으로 증가한다고 생각해보자.

이를 대응하기 위해서 단일 서버는 내부의 하드웨어를 교체하는 등의 고비용이 요구되고, 이를 처리하기 위한 복잡한 저수준의 프로그래밍이 필요하다. 분산환경에서는 값싼 서버를 하나씩 늘리면 될 뿐이다.

병렬 연산을 꼭 분산 클러스터를 구성해서 해결해야 하는지를 먼저 따져봐야 한다. 단일 서버에서 많은 메모리와 멀티 코어 수와 함께 multiprocessing을 사용하면 웬만한 크기의 병렬은 해결할 수 있다. 분산 환경에서 실행 계획, 리소스 할당 등 테스크를 돌리는 과정이 있기에 단일 서버보다 성능이 저하되는 경우도 있다.

컴퓨팅 성능에서 가장 느린 부분은 디스크 입출력이다. 대용량 데이터일수록 디스크 입출력은 굉장히 느려진다. 분산 환경은 대용량 파일을 잘게 나눠서 저장하고 여러곳에서 동시에 입출력할 수 있다. 즉, 디스크 I/O를 병렬로 할 수 있는 것인데 이는 데이터가 많아질수록 유리한 구조이다.

이렇듯 문제를 잘 파악하고 단일 서버에서 multiprocessing으로 돌릴 것인지 분산 환경에서 spark로 돌릴 것인지 결정해야 한다. 만일 대용량 데이터가 크지 않고 고정되어 있다면 단일 서버도 고려해볼 필요가 있다.


2. Spark 구조

스파크는 마스터-슬래이브 구조로 실행 되며, 작업을 관장하는 드라이버(driver)와 실제 작업이 동작하는 익스큐터(executor)로 구성된다.

Driver와 Executor는 각각 독립된 JVM 프로세스이다.

드라이버는 스파크 컨텍스트 객체를 생성하여 클러스터 매니저와 통신하면서 애플리케이션 라이프 사이클을 관리한다.

스크린샷 2021-02-22 오후 10 36 59

스파크 실행 프로그램(Application)으로 드라이버와 익스큐터 프로세스로 실행되는 프로그램을 말한다.

클러스터 매니저는 스파크의 클러스터 모드를 구성하는 컴포넌트 중 하나로 여러 대의 서버로 구성된 클러스터 환경에서 다수의 애플리케이션이 함께 구동 될 수 있게 애플리케이션 간의 CPU나 메모리, 디스크와 같은 컴퓨팅 자원을 관리해 주는 역할을 담당하며, 클러스터 매니저의 종류는 하둡의 Yarn이나 아파치 메소스(Mesos), 스파크에서 자체적으로 제공하는 스탠드얼론(Standalone), 쿠버네티스 등을 예로 들 수 있다.

즉, Spark Cluster 모드 사용시 Spark Application은 아래 과정을 거쳐 Driver와 Executor가 생성 및 실행된다.

  • spark-submit을 이용해 Application을 클러스터에 제출한다.

    spark-submit은 Cluster Manager에게 Spark Application을 제출하는 툴이며, 쉘 스크립트로 되어 있다.

  • spark-submit은 Driver 프로그램을 실행하여 main() 함수를 호출하고, Spark Context를 만든다.
  • Driver 내의 Spark Context는 Cluster Manager와 통신하여(Yarn, Kubernetes 등) Executor를 할당 받는다.
  • Driver 내의 Spark Context는 사용자의 코드를 바탕으로 실행 계획을 만들어 최종적으로는 Task를 Executor에게 전달한다.
  • 이 과정은 Cluster Manager가 Yarn, Kubernetes 상관 없이 동일하다.
  • Application이 종료되면 Cluster Manager에게 리소스를 반납한다.

Spark Driver와 Executor가 초기화 된 후 사용자가 요청한 Task가 실행된다. 이 때 Driver는 Executor로 부터 데이터를 직접 받거나(collect) Driver로 부터 Executor로 실행될 코드 등을 전송할 수 있다.

또한 Executor 간에는 데이터 이동(Shuffle) 발생할 수 있으므로 같은 네트워크 영역에 위치하는 편이 좋다. 따라서, EMR을 만들 경우 단일 AZ 내에서만 사용할 수 있다.(AZ-a 만 사용하는 등)

아래 각 용어에 대해 더 자세히 살펴보자.

2-1) Spark Cluster

클러스터라고 하면 일반적으로 여러 대의 서버가 마치 한 대의 서버처럼 동작하는 것을 의미한다. 스파크 역시 클러스터 환경에서 동작하며, 대량의 데이터를 여러 서버로 나누어 병렬로 처리한다.

따라서 항상 클러스터 환경에서 동작하는 프로그램을 작성할 때는 데이터가 여러 서버에 나눠져 병렬로 처리되고 있다는 사실을 기억하고 있어야 한다.

2-2) Driver

스파크에서는 잡을 실행하는 프로그램, 즉 메인 함수를 가지고 있는 프로그램을 가리켜 드라이버라고 한다. 더 정확하게 표현하자면 드라이버란 스파크 컨텍스트를 생성하고 그 인스턴스를 포함하고 있는 프로그램을 의미한다.

드라이버 프로그램은 자신을 실행한 서버에서 동작하면서 스파크 컨텍스트를 생성해 클러스터의 각 워커 노드들에게 작업을 지시하고 결과를 취합하는 역할을 수행한다.

즉, 실제 수행 단위인 Task로 변환 해 Executor에게 전달한다.

2-3) Spark Context

스파크 컨텍스트는 스파크 어플리케이션과 클러스터의 연결을 관리하는 객체로서 모든 스파크 어플리케이션은 반드시 스파크 컨텍스트를 생성해야 한다.
RDD를 비롯해 스파크에서 사용하는 주요 객체는 스파크 컨텍스트를 이용해 생성할 수 있다.

스파크 어플리케이션에 반드시 1개의 스파크 컨텍스트만 존재해야 한다.
스파크 컨텍스트로 부터 생성한 RDD는 다른 어플리케이션 내에 스파크 컨텍스트에서 참조는 불가능하다.

val conf = new SparkConf().setMaster("local[*]").setAppName("RDDSample")   
val sc = new SparkContext(conf)   

스파크 컨텍스트를 생성할 때는 스파크 동작에 필요한 여러 설정 정보를 지정할 수 있다. 이 가운데 클러스터 마스터 정보와 어플리케이션 이름은 반드시 지정해야 하는 필수 정보이다.

즉, 스파크 컨텍스트는 executor의 memory, core, parallelism, compression 등의 모든 설정들을 관리하고 전달하게 된다.
또한, RDD 생성 및 accumulator, broadcast 공유 변수 또한 스파크 컨텍스트가 생성하게 된다.

마스터 정보는 스파크가 동작할 클러스터의 마스터 서버를 의미하는 것으로 로컬 모드에서는 local 또는 local[3], local[*] 같이 사용된다.

위처럼 Local 모드에서 여러 스레드로 실행하면, driver가 동시에 executor 역할도 같이 하게 된다.

local[3]은 3개의 스레드를 의미하고 local[*] 은 가용한 만큼의 Core 갯수로 실행을 의미한다.

2-4) Executor 와 Node

먼저, Node는 AWS EC2와 같은 머신이라고 생각하면 된다.
하나의 EC2 머신 내에서는 여러개의 Process가 실행될 수 있다.

Spark Executor는 JVM Process이다.
즉, Task 실행을 담당하며, 실제 작업을 진행하는 Process이다.
Task 단위로 작업을 실행하고 결과를 Driver에 알려준다.
Executor가 동작 중 오류가 발생하면 다시 재작업을 진행한다.

여기서 Worker Node는 Cluster Manager가 컨트롤 하며, Cluster Manager가 Worker Node에게 Executor의 cpu, memory등을 할당하여 지시한다.

executor가 정상적으로 실행되면, driver에게 등록되었다고 전달하며, driver는 지속적으로 executor의 상태를 체크한다.

정리해보면, EC2 = Node 내에는 여러개의 Spark Executor가 실행될 수 있다.
여러개의 Executor 실행시 EC2 = Node의 Resource(CPU, Memory, Disk, Network)를 나눠 쓸 수 있다.

스크린샷 2023-01-08 오후 6 34 22

정리해보면, EC2 = Node 내에는 여러개의 Spark Executor가 실행될 수 있다. 다만 여러개의 Executor 실행시 EC2 = Node의 Resource(CPU, Memory, Disk, Network)를 나눠 쓸 수 있다.

  • 일반적으로 CPU, Memory는 쉽게 증설이 가능하지만 Disk IO나 Network Bandwidth는 비용대비 효과를 보기 어렵거나 증설이 쉽지 않다. EC2에서는 Disk IO는 사이즈나 타입마다 다르며, 증설은 시간이 걸리고 Network Bandwidth는 머신 타입을 바꾸지 않으면 높이기 어렵다.
  • Cloud 환경에서는 머신을 생성하는 노동 비용이 거의 없다시피 하므로, 큰 머신을 너무 잘게 쓰는 것보다 머신 숫자를 늘리는게 나을 경우가 많다.

참고로 Kubernetes를 쓸 경우, Node(EC2) 내의 1개의 Pod가 Executor가 된다.

스크린샷 2023-01-08 오후 6 34 37

2-5) Task

익스큐터에서 실행되는 실제 작업이다. 익스큐터의 캐쉬를 공유하여 작업의 속도를 높일 수 있다.
일반적으로 하나의 Task가 하나의 스레드로 실행된다.
즉, 우리가 작성한 코드를 Driver가 Task로 직렬화 해서 네트워크를 통해 Executor에게 전달하여 처리한다.

하나의 Executor에 core가 여러개라면 병렬처리 또한 가능하다.
즉, executor의 core가 3개라면, 최대 3개의 task가 병렬로 실행된다.

2-6) Job

Spark에서 데이터를 RDD로 읽어서 여러 Transformation 작업을 거쳐서 최종적으로 action(save, collect 등)작업을 진행한다.
action 작업 발생하면 그때서야 lazy하게 각 task마다 병렬로 연산이 이루어 지는데, 해당 단위가 Job이다.

스크린샷 2023-01-12 오후 4 45 54

2-7) Stage

위의 각 Job을 최적화를 위해서 나누는 단위가 Stage이다.

아래 코드를 통해 stage가 생성된 예제를 살펴보자.

스크린샷 2023-01-23 오후 3 30 24

각 파티션은 task 1개가 할당되어 처리되며, stage 0이 모든 작업이 끝나면 stage1 이 시작된다.
groupByKey 연산을 실행하면, repartition이 발생하기 때문에 작업의 최적화를 위해 stage가 나뉘게 된다.
즉, 파티션의 변화가 있다면 stage를 나누게 된다.

스크린샷 2023-01-23 오후 3 29 02


3. RDD, DataFrame, DataSet

스파크 어플리케이션을 구현 방법은 스파크 v1에서 발표한 RDD를 이용하는 방법과 스파크 v2에서 RDD의 단점으로 개선하여 발표한 DataFrame과 DataSet을 이용하는 방법 두가지가 있다.

DataFrameDataSet 는 해당 링크를 참고하자.

스크린샷 2021-02-23 오후 7 19 53

스파크가 사용하는 기본 데이터 모델로써 RDD를 잘 이해하고 다루는 것은 스파크 어플리케이션을 작성하고 이해하는 데 기본이라 할 수 있다.

왜냐하면 DataFrame 도 내부를 살펴보면 RDD로 되어 있기 때문에 RDD에 대해서 자세히 살펴보자.


4. RDD(Resilient Distributed Datasets)

RDD는 문자 그대로 해석하면 회복력을 가진 분산 데이터 집합 정도가 될 것이다.

또한, 클러스터에 분산된 메모리를 활용하여 계산되는 List라고도 표현 할 수 있을 것 같다.

여기서 회복력이 있다는 말은 데이터를 처리하는 과정에서 일부 문제가 발생하더라도 스스로 복구할 수 있다는 의미이다.
단, 복구의 의미는 스파크 어플리케이션이 정상적으로 동작하고 있는 상황을 가정한 것으로 작업 수행 도중 서버나 네트워크, 자원 할당 등에 일시적/부분적 문제가 발생했을 때 RDD의 작업 히스토리를 이용한 재시도를 수행함으로써 복구를 수행할 수 있다는 뜻이다.

따라서, 어플리케이션 코드 자체에 버그가 있거나 드라이버 프로그램이 오류로 종료되어 스파크 어플리케이션과 서버 프로세스 간 연결이 끊어지는 등의 영구적 장애 상황은 RDD에서 말하는 복구 대상이 아니라는 점을 알고 있어야 한다.

위에서 언급한 것처럼 데이터에 문제가 생겨도 원래 상태로 복구가 가능한 것은 RDD는 불변성을 띠기 때문이다.

스파크는 데이터의 일부가 유실되면 어딘가에 백업해둔 데이터를 다시 불러오는 것이 아니고 데이터를 다시 만들어내는 방식으로 복구를 수행한다. 한번 만들어진 RDD는 어떤 경우에도 그 내용이 변경되지 않기 때문에 같은 방법으로 만든 RDD는 항상 같은 데이터를 갖게 된다.
따라서, RDD를 만드는 방법만 기억하고 있으면 언제든 똑같은 데이터를 다시 만들어 낼 수 있게 된다.

또한, RDD 데이터는 클러스터를 구성하는 여러 서버에 나누어 저장한다. 스파크는 이렇게 분할 된 데이터를 파티션이라는 단위로 관리한다.

즉 RDD는 Driver에 위치하지 않고, 파티션 단위로 분리되어 다수의 executor에 분산되어 저장된다.

스크린샷 2021-02-23 오후 11 13 07

RDD는 트랜스포메이션(transformation), 액션(action) 두가지 타입의 연산을 가지고 있다.
트랜스포메이션은 필터링 같은 작업으로 RDD에서 새로운 RDD를 반환한다.
액션은 RDD로 작업을 처리하여 결과를 반환한다. 주로 드라이버로 연산 결과를 보낼 때 이루어 진다.
스파크는 지연처리(lazy evalution)를 지원하여 트랜스포메이션을 호출할 때는 작업을 처리하지 않고, 액션을 호출하는 시점에 작업을 처리하여 효율성을 제공한다.

스크린샷 2021-04-13 오후 11 33 38

이러한 동작 방식의 차이로 인한 가장 큰 장점은 실행 계획의 최적화가 가능하다는 점이다. 사용자가 입력한 변환 연산들을 즉시 수행하지 않고 모아뒀다가 한번에 실행함으로써 불필요한 네트워크 통신 비용을 줄일 수 있기 때문이다.

RDD는 액션이 실행될 때마다 새로운 연산을 처리한다. 작업의 처리 결과를 재사용하고 싶으면 캐시를 사용하여 결과를 메모리 또는 디스크에 유지하여 성능향상을 할 수 있다.

4-1) 장애시 복구 계획( Resilent )

스파는 RDD가 생성되어 변경되는 모든 과정을 일일이 기억하는 대신 RDD를 한번 생성되면 변경되지 않는 읽기 전용 모델로 만든 후 RDD 생성과 관련된 내용만 기억하고 있다가 장애가 발생하면 이전에 RDD를 만들 때 수행했던 작업을 똑같이 실행해(똑같은 데이터를 가진 새로운 RDD를 만들어) 데이터를 복구하는 방식을 사용한다.

이처럼 스파크에서 RDD 생성 작업을 기록해 두는 것을 리니지(lineage)라고 한다.

이를 DAG(Directed Acyclic Graph)로 표현하여 기록한다.

데이터를 일단 RDD로 만든 후 데이터 변형이 필요하면 그 RDD로 부터 변형된 새로운 RDD를 만들고 그것으로부터 또 다른 RDD를 생성해서 최종적인 모습의 RDD를 만들어 가는 형태로 데이터를 처리한다.

이때 기존 RDD는 변형되지 않고 매번 새로운 RDD가 재 생성 되기 때문에 클러스터 중 일부가 장애가 발생하면 문제가 발생했던 구간의 작업만 수행해서 RDD를 재빨리 복원할 수 있는 것이다.

스크린샷 2021-04-13 오후 11 33 09


Reference

https://1ambda.blog/2022/01/02/practical-spark-12/
https://wikidocs.net/book/2350
https://www.learningjournal.guru/article/apache-spark/apache-spark-parallel-processing/
https://artist-developer.tistory.com/7
https://subscription.packtpub.com/book/big_data_and_business_intelligence/9781787126497/7/ch07lvl1sec46/rdd-partitioning