[Spark] Dynamic Partition Pruning / Speculative Execution

filter push down / dimension 테이블과 fact 테이블 조인시 쿼리 성능 최적화

Posted by Wonyong Jang on May 15, 2024 · 7 mins read

이번 글에서는 Spark 3.0 부터 제공되는 최적화 기법 중 하나인 Dynamic Partition Pruning에 대해서 살펴보자.


1. Dynamic Partition Pruning

Dynamic Partition Pruning 의 핵심은 full scan을 피하기 위해 내가 필요한 데이터만 골라서(pruning) 읽는 것이다.

// default: true 로 되어 있다.   
spark.sql.optimizer.dynamicPartitionPruning.enabled=true

여기서 pruning의 의미는 내가 필요로 하지 않는 데이터를 읽지 않도록 가지치기 하여 스킵하는 것이다.

쿼리를 최적화를 하는 여러가지 방법(column pruning, constant folding, filter push down)이 있는데, Dynamic Partition Pruning은 filter push down을 사용한 최적화 기법이다.

filter push down을 아래 쿼리로 이해해보자.

select * from student where subject = 'English';

이러한 쿼리를 처리할 때 보통 데이터 베이스 > full scan > filter 순으로 처리를 한다.

스크린샷 2024-05-18 오후 12 33 52

filter push down 이란 필터를 scan 보다 먼저 둬서, 미리 필터링한 값을 scan 하기 때문에 full scan을 피하고 원하는 값만 로드하여 성능향상을 시킨다.

스크린샷 2024-05-18 오후 12 33 57

1-1) Static Partition Pruning

먼저, Static Partition Pruning(SPP)는 말그대로 정적으로 파티션을 pruning 하는 것이다.
아래와 같은 이미지에서 좌측은 scan이 먼저 발생하여 모든 데이터를 스캔하는 반면에, 필터를 pushdown한 우측과 같은 경우 원하는 데이터를 가지고 있는 부분만 추린 이후에 스캔하기 때문에 후자는 전자에 비해 적은 데이터를 스캔한다.
이는 성능으로 이어지게 된다.

스크린샷 2024-05-18 오후 1 10 17

위에서 간략히 그린 Data라는 부분을 스파크의 환경에서 구체적으로 살펴보면, 스파크는 데이터를 논리적으로 여러 파티션으로 인식한다.

따라서 pruning은 파티션 단위로 결정된다.

스크린샷 2024-05-18 오후 1 12 06

위와 같이 단일한 형태라면 SPP로도 충분하다. 하지만 실제 데이터 환경에서는 아래와 같이 join이 추가된 형태의 쿼리가 자주 발생하게 된다.

select * from sales join date on sales.day = date.day 
where date.day_of_week = 'Mon'

스크린샷 2024-05-18 오후 1 20 33

위와 같은 경우 date 테이블에 대해서는 진행된 SPP가 join으로 이어진 sales에는 적용되지 못하는 것을 알 수 있다.
이러한 Fact - Dimension 테이블 간의 조인은 데이터 분석에서 흔한 패턴인데, Fact 테이블의 사이즈가 보통 매우 크기 때문에 위와 같은 Fact 테이블의 SPP의 부재는 많은 양의 데이터 스캔을 발생 시킨다.

1-2) Dynamic Partition Pruning

DPP는 위와 같은 상황에서 아래 이미지와 같이 런타임 시에 동적으로 생성된 Partition Pruning이 적용되도록 하는 기능이다.

스크린샷 2024-05-18 오후 1 28 07

즉, dimension table(작은 테이블)의 필터링 결과를 fact table(큰 테이블) 에 적용하여 큰 테이블에서 full scan을 하지 않고 필요한 데이터만 필터링하여 읽도록 하는 방법이다.

아래 그림과 같이 파티셔닝 된 데이터 셋(큰 크기의 fact 테이블)과 파티셔닝 되지 않은 데이터 셋(작은 크기의 dimension 테이블) 이 있다고 가정해보자.

왼쪽 fact 테이블은 분할된 파티션들을 색으로 표현했다.

스크린샷 2024-05-18 오후 10 28 51

조인에 필요한 파티션이 빨간색과 노란색이라고 했을 때, 위 그림처럼 필터를 적용하여 full scan을 피하게 된다.

이 때 broadcast hash join을 이용하게 된다.
dimension 테이블의 값들을 hash 테이블로 변환하고 각 executor로 전달 후 fact 테이블과 조인을 하여 shuffle 없이 성능을 최적화 한다.


2. Speculative Execution

task가 fault worker node로 인해 완료되지 않는 것으로 의심되면 다른 node에서 해당 작업을 동시에 실행한다.
둘 중 하나의 task가 완료되면 나머지 task는 kill 시킨다.

하지만, Speculative Execution은 overhead를 동반하기 때문에 대다수의 경우 선호되지는 않지만 LinkedIn이 공개한 자료에 따르면 10,000대 이상 머신에서 40,000개 이상의 Spark 앱이 동시에 돌아가는 환경에서는 노드 성능 편차가 워낙 크기 때문에 speculation이 의미가 있었다고 한다.

단 기본값 그대로가 아니라, 보수적으로 튜닝하여서 사용

스크린샷 2024-06-14 오후 10 54 36

이는 특정 노드의 하드웨어적 결함으로 인해 해당 노드에 배치된 task가 오래 걸려 전체 작업이 지연될 수 있다. 이러한 상황에서는 Speculative Execution 옵션을 활성화 하여 slow tasks들에 대해서 복제 실행시켜서 전체 실행시간을 줄일 수 있다.

또한, data skew, insufficient memory에 대해서는 해결할 수 없다.

주로 특정 노드의 네트워크 지연, 디스크 I/O 병목 등이 발생하여 해당 task가 지연이 발생하는 케이스를 예로 들 수 있다.

# default: false
spark.speculation=true

# slow tasks 를 확인하는 주기, default: 100ms
spark.speculation.interval

# 해당 task 중 이 비율만큼 완료되어야 speculation을 시작한다. default=0.75
# 예를 들면 0.9로 했을 때 전체 task 중 90% 완료가 되었을 때만 speculation이 발동된다.  
# 이 조건이 충족되지 않으면 multiplier 조건과 무관하게 speulation은 발동되지 않는다.   
spark.speculation.quantile

# quantile 조건이 충족된 이후, 완료된 task 들의 중앙값(median)을 기준으로 판단한다.  
# 실행 중인 모든 task의 중앙값(median) 대비 이 배수보다 느린 task를 speculation 대상으로 판단  
# 임계값 = median 실행시간 x multiplier  
# 즉, 현재 실행 중인 task의 실행 시간이 이 임계값을 초과하면 해당 task를 다른 node에서 복제 실행한다.   
spark.speculation.multiplier 

# 30 초 미만 task는 제외 
spark.speculation.minTaskRuntime=30s

추가적으로 Spark 4.0 부터는 speculation이 덜 공격적으로 동작하도록 spark.speculation.multiplier=3, spark.speculation.quantile=0.9 로 default가 변경되었다.

Spark 4.0 에서 두 값이 모두 올라간 이유는 기존 기본값이 너무 공격적이여서 불필요한 speculation으로 리소스가 낭비되는 사례가 많았기 때문이다.


Reference

https://www.slideshare.net/slideshow/dynamic-partition-pruning-in-apache-spark/188385762
https://kadensungbincho.tistory.com/88
https://eyeballs.tistory.com/248
https://medium.com/@nethaji.bhuma/spark-speculative-execution-02e8bcb03f39