[Spark] Join Strategies 과 Shuffle

shuffle join, broadcast join / shuffle sort merge join, broadcast hash join

Posted by Wonyong Jang on April 20, 2024 · 7 mins read

1. 스파크의 조인 타입

스파크에서 조인은 아래와 같은 조인 타입을 제공한다.

1-1) inner join(내부 조인)

왼쪽과 오른쪽 데이터셋에 키가 있는 로우를 유지

1-2) outer join(외부 조인)

왼쪽이나 오른쪽 데이터셋에 키가 있는 로우를 유지

1-3) left outer join(왼쪽 외부 조인)

왼쪽 데이터셋에 키가 있는 로우를 유지

1-4) right outer join(오른쪽 외부 조인)

오른쪽 데이터셋에 키가 있는 로우를 유지

1-5) left semi join

왼쪽 데이터셋의 키가 오른쪽 데이터셋에 있는 경우에는 키가 일치하는 왼쪽 데이터셋만 유지

1-6 left anti join

왼쪽 데이터셋의 키가 오른쪽 데이터셋에 없는 경우에는 키가 일치하지 않는 왼쪽 데이터셋만 유지

1-7) natural join

두 데이터셋에서 동일한 이름을 가진 컬럼을 암시적으로 결합하는 조인

1-8) cross join / cartesian join

왼쪽 데이터셋의 모든 로우와 오른쪽 데이터셋의 모든 로우를 조합


2. 스파크의 조인 수행 방식

일반적으로 join은 동일한 키의 데이터가 동일한 파티션 내에 있어야 하므로 비용이 비싼 작업이다.
조인할 키의 데이터가 동일한 파티션에 있지 않다면 셔플이 필요하고, 이를 통해 동일한 키의 데이터는 동일한 파티션에 위치하게 된다.

즉, 조인의 비용은 키의 개수와 올바른 파티션으로 위치하기 위해 움직이는 규모에 비례해서 커진다.

스파크는 조인 시 크게 두 가지 방식으로 조인을 진행한다.

2-1) Shuffle join

전체 노드간 네트워크 통신을 유발하는 shuffle join 방식이다.
조인 키로 두 데이터 세트를 섞고 동일한 키를 가진 데이터를 동일한 노드로 이동시킨다.

스크린샷 2024-04-29 오후 3 16 31

2-2) Broadcast join

작은 데이터 세트를 broadcast 변수로 driver에서 생성하여 클러스터의 각 executor 별로 복제해 놓고 join 하는 방식이다.

따라서, broadcast 되는 대상 테이블이 크다면 driver의 메모리가 부족하여 비정상 종료 될 수 있다.

driver에서 broadcast 변수로 생성하여 각 executor로 전송할 때 네트워크 비용이 발생하지만, 그 이후 join을 진행할 때는 네트워크를 통한 데이터 이동이 없기 때문에 join 속도가 매우 빠르다.

스크린샷 2024-04-29 오후 3 16 37


3. 조인 전략

먼저, nested loop join, merge join, hash join을 먼저 살펴보고 spark 에서 사용되는 조인 전략에 대해서 살펴보자.

3-1) Nested Loop Join

아주 무식하면서 심플한 방법이며, 아래 그림과 같이 왼쪽 테이블과 오른쪽 테이블을 조인한다고 생각해보자.

스크린샷 2024-04-30 오후 3 57 58

왼쪽 테이블의 첫번째 row의 join key 값을 들고 오른쪽 테이블의 처음부터 마지막 row까지 돌면서 동일한 key 값이 있으면 매칭한다.
그 다음에는 왼쪽 테이블의 두번째 row의 key값을 들고 오른쪽 테이블을 전수조사하는 방식이다.
이런식으로 하다보면 O(T1 * T2)만큼의 시간복잡도가 걸리는 비효율적인 방식이다.

3-2) merge join

매칭을 시작하기 전 양 테이블의 키 값을 기준으로 소팅을 한다.
매칭 로직은 nested loop join과 동일하게 시작한다. 왼쪽 테이블의 A값을 들고 오른쪽 테이블의 첫번째 row부터 살펴본다.
매칭이 된다면 매칭을 시키고 다음 row로 넘어간다. 이를 반복하다가 다른 key 값이 나오면 멈춘다.
왜냐하면 이미 소팅이 되어 있기 때문에, 뒤에는 같은 key 값이 없을 것이기 때문이다.
그럼 다음 key 값인 B를 들고 오른쪽 테이블의 처음부터가 아니라 아까 멈춘 곳부터 매칭을 시작한다.

스크린샷 2024-04-30 오후 4 00 56

결과적으로 O(T1logT1 + T2logT2) 시간복잡도 만큼의 시간이 걸린다고 볼 수 있다.

3-3) Hash Join

hash join은 이름에서도 알 수 있듯이 hash function을 사용하며, 왼쪽 테이블의 모든 key 값을 hash function을 통과시켜 hash 테이블에 배치한다.
그 후 오른쪽 테이블의 모든 row를 매칭시킨다.
hash function은 O(1)의 시간이 들 것이고, 결과적으로 O(T1+ T2) 만큼의 시간 복잡도가 걸릴 것이다.

스크린샷 2024-04-30 오후 4 05 50

이제 스파크에서 주로 사용되는 join 전략에 대해 살펴보자.

3-4) Shuffle Sort Merge Join

두 테이블이 모두 큰 경우 사용 되며, 두 테이블 모두 조인 키 기반으로 repartition 이 발생한다.

default shuffle partition은 200 이기 때문에 200개의 셔플 파티션을 사용한다.

spark 2.3 부터 shuffle hash join 보다 더 좋은 성능을 내는 shuffle sort merge join 을 사용한다.

과거에 shuffle hash join 과 달리 memory가 아닌 disk를 이용할 수 있기 때문에 OOM이 발생하지 않는다.

이를 확인하기 위해 아래 옵션을 이용하여 off 시키고 shuffle hash join을 테스트 해볼 수 있다.

spark.conf.set("spark.sql.join.preferSortMergeJoin","false")

shuffle sort merge join 은 조인 작업 전에 조인 키를 기준으로 shuffle 시켜 정렬되며, 조인 키를 기반으로 두 데이터 세트를 병합한다.

스크린샷 2024-04-20 오후 12 32 28

스크린샷 2024-04-20 오후 12 32 48

3-5) Broadcast Hash Join

위에서 설명한 broadcast 변수를 driver에서 생성하여 각 executor로 복제해 놓고, 해시 조인을 실행하는 방식이다.

스크린샷 2024-04-21 오후 3 38 11

// default: 10MB
// -1로 설정하게 되면 broadcast는 비활성화 된다.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", [your threshold in Bytes])

위 설정과 같이 default로 10MB 이하의 데이터 셋이 broadcast 변수로 생성되며, Spark 엔진 또는 Adaptive Query Exectuion에 의해 Broadcast Hash join이 실행되지만 그렇지 못한 경우는 직접 명시를 해주어야 한다.

import org.apache.spark.sql.functions.broadcast
  
val joinDF = bigDF.join(broadcast(smallDF), "joinKey")

3-6) Partial Manual Broadcast Hash Join

소수의 키에 데이터가 크게 몰려 있어서 메모리에 올릴 수 없는 경우, 몰려 있는 키만 빼고 일반 키들만으로 braodcast join을 하는 방법도 고려해 볼 수 있다.
각 키 별로 필터링하여 broadcast join과 일반적인 join을 나눠서 수행하고 union으로 합치는 방법이다.
이 방법은 다루기 힘든 심하게 skewed 된 데이터를 다룰 때 고려해 볼 수 있을 것이다.

3-7) Broadcast Nested Loop Join

Broadcast hash 조인과 유사하게 작은 데이터 셋이 전체 워커 노드로 전달 되지만, hash 기반 조인이 아닌, nested loop join이 진행된다.

데이터 셋이 크다면 굉장히 비효율적인 방식이다.

3-8) Cartesian Product Join

Shuffle and Replication Nested Loop Join 이기도 하며 데이터 셋이 Broadcast 되지 않는다는 점을 제외하면 Broadcast Nested Loop Join과 매우 유사하게 동작한다.


Reference

https://mjs1995.tistory.com/227#article-1-1--%EC%A0%84%EC%B2%B4-%EB%85%B8%EB%93%9C%EA%B0%84-%ED%86%B5%EC%8B%A0%EC%9D%84-%EC%9C%A0%EB%B0%9C-%EC%85%94%ED%94%8C-%EC%A1%B0%EC%9D%B8(shuffle-join)
https://jaemunbro.medium.com/apache-spark-%EC%A1%B0%EC%9D%B8-join-%EC%B5%9C%EC%A0%81%ED%99%94-c9e54d20ae06
https://angel-jinsu.tistory.com/33
https://velog.io/@kimhaggie/spark-join%EC%9D%98-%EC%A2%85%EB%A5%98
https://bertwagner.com/posts/visualizing-nested-loops-joins-and-understanding-their-implications/