자주 사용하는 RDD는 메모리 또는 디스크에 캐싱하는게 성능 향상에 유리하다.
캐싱을 하게 되면 반복적으로 action을 실행할 때마다 원본 파일로 부터 RDD 생성 및
수 많은 transformation을 진행하는 것이 아니라, 캐싱되어 있는 RDD 부터
연산을 수행하게 된다.
즉, 반복적으로 사용되는 RDD가 있다면 메모리 및 디스크에 캐싱을 해 놓는게 여러 action 연산을 수행할 때 성능 향상을 할 수 있다.
또한, cache는 fault-tolerant 하다.
장애가 발생하여 캐시된 RDD가 손실되어도, 자동으로 다시 recomputation하여 캐싱 진행 및
연산을 수행한다.
아래 예시를 통해 자세히 이해해 보자.
driver 1대와 worker 노드 3대로 구성된 클러스터에서 log mining 작업을 진행하려고 한다.
각 worker당 1개의 executor가 실행되도록 설정하였고,
데이터는 hdfs에서 가져와서 작업을 한다.
기본적으로 hdfs는 block 단위로 파일을 분산하여 저장한다.
우선 driver는 대상 데이터의 위치와 크기를 확인한다.
위 예제의 hdfs의 경우 block 단위로 3개가 저장되어 있는 것을 확인하고,
block 당 1개의 task를 생성하여 각 executor에게 전달한다.
최종적으로 executor마다 task를 처리하고 결과값을 driver에게 전달한다.
위의 예시에는 count action이 실행되면, 각 executor마다 분산 처리하여 count 값을 driver에게 전달하게 된다.
그럼 이번에는 아래와 같이 count action을 한번 더 실행하면 어떻게 될까?
Spark는 action을 수행할 때마다 새로운 job을 실행한다.
해당 job은 데이터를 처음부터 hdfs에서 읽어와서 순차적으로 처리하게 된다.
이 글에서는 hdfs에 원본 데이터가 존재하지만 s3, 외부 db 등이 될 수도 있다.
즉, 위의 동일한 transformation 작업이 반복적으로 발생하게 된다.
그럼 이번에는 성능향상을 위해 RDD 데이터를 메모리에 캐싱
해서 동일한 작업을 해보자.
위와 같이 반복적으로 사용하는 rdd를 cache() 함수를 통해서 캐싱하였다.
각 executor마다 rdd가 캐싱되며, 반복적으로 action이 발생하였을 때 hdfs에서
읽어 처음부터 연산을 하는 것이 아닌 캐싱된 rdd 부터 작업을 시작하여 성능 향상을 할 수 있다.
1TB 데이터 기준으로 캐싱을 사용하지 않는 경우 170초가 걸리는 반면, 캐싱을 했을 경우 5~7초가 소요된다.
만약 executor의 메모리가 부족하다면 어떻게 될까?
꼭 기억해야할 부분은 캐시는 RDD의 파티션 단위로 처리 된다.
하나의 파티션 데이터를 메모리에 모두 캐싱하기에 부족하다면,
일부 메모리 여유가 있더라도 파티션 데이터 모두 캐싱되지 않는다.
즉, 하나의 파티션 데이터 모두가 캐싱되거나 되지 않거나 이다.
캐시되지 않는 파티션 데이터는 원본 데이터를 읽어서
처음부터 연산을 수행한다.
메모리에 여유가 생겨 해당 파티션을 캐시하기 전까지는
action 수행시마다 원본 데이터를 읽어 반복적으로 다시 처리된다.
초창기 Spark은 메모리에만 캐싱을 진행했지만, 현재는 다양한 옵션을 제공한다.
각 storage level은 아래와 같이 persist 함수를 통해 추가할 수 있다.
또는 cache() 함수를 통해 가능하며, rdd의 default 값은 MEMORY_ONLY 이며,
dataframe 또는 dataset의 경우는 default 값으로 MEMORY_AND_DISK이다.
import org.apache.spark.storage.StorageLevel
val rdd = sc.makeRDD(1 to 10000, 10)
rdd.setName("persistedRDD") // rdd 의 이름을 주게 되면 web ui에서 확인이 편하다.
rdd.persist(StorageLevel.DISK_ONLY) // disk에 캐싱
// storageLevel 확인
rdd.getStorageLevel
// Output : StorageLevel(disk, 1 replicas) => disk 저장이며, 따로 copy하지 않았기 때문에 1개의 replica만 존재
캐시된 내용을 제거하기 위해서는 아래와 같이 진행한다.
unpersist() 메서드를 하게되면, 메모리 또는 디스크에 캐싱해 두었던
데이터를 제거하게 된다.
rdd.unpersist()
default로 spark는 역직렬화된 형태로 RDD 데이터를 메모리에 캐싱하게 된다.
단, Spark Streaming의 DStream은 default로 StorageLevel.MEMORY_ONLY_SER 로 설정되어 있다.
만약 메모리가 부족하다면 메모리에 캐싱을 하지 않는다.
만약 RDD내에 파티션이 10개로 나뉘어져 있다고 가정해보고 예를 들어보자.
이 중에서 메모리에 8개 파티션은 캐싱을 했고, 2개는 메모리가 부족하여
캐싱을 진행하지 않았다.
그렇다면 메모리에 올리지 않은 2개의 파티션은
원본 파일로 부터 직접 계산을 수행하고 캐싱한 8개 파티션은
캐시된 형태로 재사용하게 된다.
1차적으로 메모리에 캐싱을 진행하며, 메모리가 부족한 경우는 디스크에 캐싱을 한다.
여기서 디스크는 로컬 디스크라고 생각하면 된다.
java와 scala에서만 사용 가능하며, 파티션 마다 byte 배열의 직렬화 형태로 RDD 데이터를 저장한다.
위에서 역직렬화 형태로 RDD를 저장하는 것보다 효율적인 공간을 사용할 수 있게 되는 장점이 있다.
단지, 캐싱된 데이터를 다시 연산을 할 때 역직렬화를 해주어야 함으로 더 많은 CPU를 쓰게 된다.
직렬화하여 메모리에 저장하게 되면 GC의 시간을 줄이는데 유용하기 때문에
Spark Streaming의 DStream은 default StorageLevel은 MEMORY_ONLY_SER로 설정되어 있다.
메모리 공간을 훨씬 적게 사용하므로 상황에 따라 유용하게 사용 될 수 있다.
위와 마찬가지로 직렬화 형태로 RDD 데이터를 저장하며, 부족할 경우 디스크에 저장한다.
오직 디스크에만 캐싱을 진행하며, 보통 데이터가 TB, PB 이상으로 큰 경우 사용을 고려해보자.
각 executor의 로컬 디스크에 캐싱을 진행한다.
로컬 디스크의 저장 위치는 /conf/spark-env.sh 에서 아래와 같이 저장되며, 지정되어 있지 않은 경우는 /tmp 경로에 생성된다.
SPARK_LOCAL_DIRS=/저장경로
web ui에서는 아래와 같이 disk에 저장된 것을 확인할 수 있다.
위에서 캐시도 fault-tolerant 를 지원하여 데이터가 손실되어도, recomputation 된다고 언급했다.
하지만 데이터 복구하는 과정에서 약간의 지연 시간이 발생할 수 있다.
따라서 이를 방지하기 위해 처음부터 캐싱 데이터를 copy 하여서 장애 발생시
recomputation 되는 것을 방지할 수 있다.
Spark Streaming의 Input DStream은 해당 StorageLevel을 사용한다.
외부에서 Receiver가 데이터를 받아온 데이터를 Input DStream이라고 한다.
직렬화 후 메모리에 저장하게 되면 메모리도 적게 사용하고 GC 시간도 단축시킬 수 있다고 언급했다.
그럼 왜 디스크에 저장하며 copy까지 해서 사용할까?
스트리밍 데이터가 갑작스럽게 증가하여 메모리가 부족한 경우 데이터 유실이 발생할 수 있기 때문에,
외부에서 최초 받아온 데이터를 저장할 때 메모리가 부족한 경우 디스크에 저장하여 유실을 방지한다.
이렇게 디스크에 저장하더라도 장애로 인해 데이터 유실이 발생할 수 있기 때문에 copy까지 해서
fault tolerance를 보장한다.
보통 캐싱되는 곳은 executor의 메모리를 사용한다.
하지만, 해당 옵션은 executor의 메모리가 아닌 별도의 메모리 관련 서비스를 이용하기 위한 옵션이다.
캐싱을 하고 모니터링을 위해 Spark Web UI에서 확인해보자.
기본적으로 캐시는 Storage 탭
에서 확인할 수 있다.
위 그림은 55개 파티션에 대해서 100% 캐시가 이루어진 것을 확인할 수 있으며,
캐시된 데이터 크기도 확인할 수 있다.
이번에는 jobs탭에서 job의 DAG도 확인해보자.
위 그림에서 연두색 점이 캐싱이 되었다는 표시
이며, 해당 파티션과 input 크기를
확인할 수 있다.
stoarage 탭에서 살펴봤던 메모리 크기와 동일한 것을 확인할 수 있다.
즉, 해당 job은 캐싱된 데이터를 이용하여 연산을 수행
하였고 수행 시간이 단축되었다.
이번에는, executor의 메모리 부족으로 100% 캐시가 이루어 지지 않는 경우
Web UI에 어떻게 나타나는지 살펴보자.
위처럼 55개 파티션 중 1개의 파티션이 캐싱이 안된 것을 확인할 수 있다.
jobs 탭에서 job의 DAG를 확인해보자.
위 그림에서 input 크기를 살펴보면, 전체 캐시했을 때보다 조금 증가한 것을 확인할 수 있다.
해당 크기는
캐시된 데이터와 캐시되지 않은 1개 파티션은 hdfs에서 읽어온 데이터의 합이다.
마지막으로 빨간색 박스에 있는 링크를 클릭하여 task 별로도 살펴보자.
Locality level이 PROCESS_LOCAL로 표기된 것은 task가 돌고 있는 같은 executor 메모리 캐시를 읽은 것이다.
RACK_LOCAL 은 메모리에서 읽지 않고, 같은 RACK 장비에 있는 다른 노드의 디스크를 읽었다는 의미이다.
즉, 131.3MB는 hdfs에 다시 읽어서 연산했다는 의미이다.
추가적으로 NODE_LOCAL은 같은 노드의 디스크에서 데이터를 읽었다는 의미이다.
위에서 언급한 locality level에 대해서 조금 더 자세히 살펴보자.
Data Locality는 Spark job의 퍼포먼스에 많은 영향을 주는 요소 중 하나이다.
Spark는 Data Locality를 고려하여 task를 저장된 데이터와 최대한 가까운 executor에서
실행되도록 Scheduling한다.
Data Locality는 데이터를 처리하는 노드가 얼마나 가까운 거리에 있는지를 나타낸다.
데이터의 현재 위치를 기반으로 Locality는 여러 Level로 정의된다.
가장 가까운 것에서 가장 먼 순으로 Level을 살펴보자.
Reference
https://blog.devgenius.io/a-neglected-fact-about-apache-spark-performance-comparison-of-coalesce-1-and-repartition-1-80bb4e30aae4
https://jaemunbro.medium.com/apache-spark-partition-%EA%B0%9C%EC%88%98%EC%99%80-%ED%81%AC%EA%B8%B0-%EC%A0%95%ED%95%98%EA%B8%B0-3a790bd4675d
https://m.blog.naver.com/syung1104/221103154997
https://thebook.io/006908/part01/ch04/02-01/
https://fastcampus.co.kr/data_online_spkhdp