스파크에서 조인은 아래와 같은 조인 타입을 제공한다.
왼쪽과 오른쪽 데이터셋에 키가 있는 로우를 유지
왼쪽이나 오른쪽 데이터셋에 키가 있는 로우를 유지
왼쪽 데이터셋에 키가 있는 로우를 유지
오른쪽 데이터셋에 키가 있는 로우를 유지
왼쪽 데이터셋의 키가 오른쪽 데이터셋에 있는 경우에는 키가 일치하는 왼쪽 데이터셋만 유지
왼쪽 데이터셋의 키가 오른쪽 데이터셋에 없는 경우에는 키가 일치하지 않는 왼쪽 데이터셋만 유지
두 데이터셋에서 동일한 이름을 가진 컬럼을 암시적으로 결합하는 조인
왼쪽 데이터셋의 모든 로우와 오른쪽 데이터셋의 모든 로우를 조합
일반적으로 join은 동일한 키의 데이터가 동일한 파티션 내에 있어야 하므로 비용이 비싼 작업이다.
조인할 키의 데이터가 동일한 파티션에 있지 않다면 셔플이 필요하고, 이를 통해 동일한
키의 데이터는 동일한 파티션에 위치하게 된다.
즉, 조인의 비용은 키의 개수와 올바른 파티션으로 위치하기 위해 움직이는 규모에 비례해서 커진다.
스파크는 조인 시 크게 두 가지 방식으로 조인을 진행한다.
전체 노드간 네트워크 통신을 유발하는 shuffle join 방식이다.
조인 키로 두 데이터 세트를 섞고 동일한 키를 가진 데이터를 동일한 노드로
이동시킨다.
작은 데이터 세트를 broadcast 변수로 driver에서 생성하여
클러스터의 각 executor 별로 복제해 놓고 join 하는 방식이다.
따라서, broadcast 되는 대상 테이블이 크다면 driver의 메모리가 부족하여 비정상 종료 될 수 있다.
driver에서 broadcast 변수로 생성하여 각 executor로 전송할 때
네트워크 비용이 발생하지만, 그 이후 join을 진행할 때는 네트워크를 통한
데이터 이동이 없기 때문에 join 속도가 매우 빠르다.
먼저, nested loop join, merge join, hash join을 먼저 살펴보고 spark 에서 사용되는 조인 전략에 대해서 살펴보자.
아주 무식하면서 심플한 방법이며, 아래 그림과 같이 왼쪽 테이블과 오른쪽 테이블을 조인한다고 생각해보자.
왼쪽 테이블의 첫번째 row의 join key 값을 들고 오른쪽 테이블의 처음부터 마지막 row까지
돌면서 동일한 key 값이 있으면 매칭한다.
그 다음에는 왼쪽 테이블의 두번째 row의 key값을 들고 오른쪽 테이블을 전수조사하는 방식이다.
이런식으로 하다보면 O(T1 * T2)만큼의 시간복잡도가 걸리는 비효율적인 방식이다.
매칭을 시작하기 전 양 테이블의 키 값을 기준으로 소팅을 한다.
매칭 로직은 nested loop join과 동일하게 시작한다. 왼쪽 테이블의 A값을 들고 오른쪽 테이블의
첫번째 row부터 살펴본다.
매칭이 된다면 매칭을 시키고 다음 row로 넘어간다. 이를 반복하다가 다른 key 값이
나오면 멈춘다.
왜냐하면 이미 소팅이 되어 있기 때문에, 뒤에는 같은 key 값이 없을 것이기 때문이다.
그럼 다음 key 값인 B를 들고 오른쪽 테이블의 처음부터가 아니라 아까 멈춘 곳부터 매칭을 시작한다.
결과적으로 O(T1logT1 + T2logT2) 시간복잡도 만큼의 시간이 걸린다고 볼 수 있다.
hash join은 이름에서도 알 수 있듯이 hash function을 사용하며, 왼쪽 테이블의 모든 key 값을
hash function을 통과시켜 hash 테이블에 배치한다.
그 후 오른쪽 테이블의 모든 row를 매칭시킨다.
hash function은 O(1)의 시간이 들 것이고, 결과적으로 O(T1+ T2) 만큼의 시간 복잡도가 걸릴 것이다.
이제 스파크에서 주로 사용되는 join 전략에 대해 살펴보자.
두 테이블이 모두 큰 경우 사용 되며, 두 테이블 모두 조인 키 기반으로 repartition 이 발생한다.
default shuffle partition은 200 이기 때문에 200개의 셔플 파티션을 사용한다.
spark 2.3 부터 shuffle hash join 보다 더 좋은 성능을 내는 shuffle sort merge join 을 사용한다.
과거에 shuffle hash join 과 달리 memory가 아닌 disk를 이용할 수 있기
때문에 OOM이 발생하지 않는다.
이를 확인하기 위해 아래 옵션을 이용하여 off 시키고 shuffle hash join을 테스트 해볼 수 있다.
spark.conf.set("spark.sql.join.preferSortMergeJoin","false")
shuffle sort merge join 은 조인 작업 전에 조인 키를 기준으로 shuffle 시켜 정렬되며, 조인 키를 기반으로 두 데이터 세트를 병합한다.
위에서 설명한 broadcast 변수를 driver에서 생성하여 각 executor로 복제해 놓고, 해시 조인을 실행하는 방식이다.
// default: 10MB
// -1로 설정하게 되면 broadcast는 비활성화 된다.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", [your threshold in Bytes])
위 설정과 같이 default로 10MB 이하의 데이터 셋이 broadcast 변수로 생성되며, Spark 엔진 또는 Adaptive Query Exectuion에 의해 Broadcast Hash join이 실행되지만 그렇지 못한 경우는 직접 명시를 해주어야 한다.
import org.apache.spark.sql.functions.broadcast
val joinDF = bigDF.join(broadcast(smallDF), "joinKey")
소수의 키에 데이터가 크게 몰려 있어서 메모리에 올릴 수 없는 경우,
몰려 있는 키만 빼고 일반 키들만으로 braodcast join을 하는 방법도
고려해 볼 수 있다.
각 키 별로 필터링하여 broadcast join과 일반적인 join을 나눠서 수행하고 union으로
합치는 방법이다.
이 방법은 다루기 힘든 심하게 skewed 된 데이터를 다룰 때 고려해 볼 수 있을 것이다.
Reference
https://mjs1995.tistory.com/227#article-1-1--%EC%A0%84%EC%B2%B4-%EB%85%B8%EB%93%9C%EA%B0%84-%ED%86%B5%EC%8B%A0%EC%9D%84-%EC%9C%A0%EB%B0%9C-%EC%85%94%ED%94%8C-%EC%A1%B0%EC%9D%B8(shuffle-join)
https://jaemunbro.medium.com/apache-spark-%EC%A1%B0%EC%9D%B8-join-%EC%B5%9C%EC%A0%81%ED%99%94-c9e54d20ae06
https://angel-jinsu.tistory.com/33
https://velog.io/@kimhaggie/spark-join%EC%9D%98-%EC%A2%85%EB%A5%98
https://bertwagner.com/posts/visualizing-nested-loops-joins-and-understanding-their-implications/