데이터 엔지니어링 과정에서 join, group by 와 같은 shuffle이 발생하는 연산을 사용할 때, 특정 key에 데이터가 집중되는 경우가 발생할 수 있다.
즉, Data Skew는 데이터가 파티션들 간에 고르게 분산되지 않은 상황을 말한다.
이런 현상을 데이터 skew라고 하며, skew가 발생하게 되면 분산처리 성능에 큰 영향을 끼치므로 반드시 해결해야 하는 문제 중 하나이다.
Salting은 데이터 skew에 대한 대응책으로 자주 쓰이는 데이터 엔지니어링 기법 중 하나이다.
Spark 3.0 부터는 Adaptive Query Execution을
제공하면서 설정만으로 skew 문제를 해결할 수 있게 되었지만, 해당 설정만으로 해결되지 않는 케이스에는
해당 기법과 함께 사용이 필요할 수 있다.
Data Skew가 발생한 예시를 살펴보자.
서울에서 전국 달리기 대회를 개최했고, 총 190 여명의 선수가 참가했으며 다음과 같은 선수 데이터가 만들어 졌다.
아무래도 서울에서 개최된 경기이다보니, 서울 근처에서 가장 많이들 참가했다.
지역별 참가자 수는 다음과 같다.
데이터 분석을 위해, 여기에 각 지역의 평균 연령 데이터를 join 한다고 가정해보자.
우선 salting 없이 그냥 join하려면 다음과 같이 하면 된다.
joined_df = runner.join(city, on='city', how='left')
data skew는 join할 때 유니크 키가 아닌 키로 조인할 경우 skew 현상이 더 두드러지게 나타난다.
위에서 선수 데이터와 광역단체 데이터는 각각 runner, city 의
spark DataFrame으로 정의되어 있다고 가정하자.
아래 spark UI를 통해 데이터 분산 처리 내역을 확인해보면,
Data Skew가 발생하고 있음을 확인할 수 있다.
특정 2개의 파티션에만 데이터가 skew 되었고, 데이터 사이즈가 커질 수록
다른 파티션은 이미 작업이 완료 되었지만 특정 2개의 파티션만 작업이 늦게 끝나게 되어
전체 성능에 영향을 끼칠 것이다.
이렇게 되는 이유는 선수들이 서울과 경기도에 몰려있고, city를 key로 join 할 경우 서울이나 경기도를 처리하는 join 파티션에 이 데이터들이 전부 몰려가기 때문이다.
요리를 할 때 소금을 뿌림으로써 재료의 맛과 향을 더해줄 수 있다. spark의 salting 기법도 요리에서 소금을 뿌리는 것과 비슷하게 데이터가 더 잘 처리될 수 있도록 도와주는 역할을 한다.
이제 salting 기법을 이용해 Data Skew 를 해결해보자.
salt_size = 10
runner_2 = runner.withColumn("salt", lit(rand() * salt_size).cast("int"))
city_2 = city.withColumn("salt", explode(array([lit(i) for i in range(salt_size)])))
joined_2 = runner_2.join(city_2, on=['city', 'salt'], how='left')
소금(salt) 컬럼이 추가된 두 개 데이터를 join 하는데, key 로는 city, salt 둘 다 사용한다.
salt 컬럼이 추가된 선수 데이터 프레임은 아래와 같다.
salt 컬림이 추가된 지역 데이터 프레임은 아래와 같으며, 각 행이 10개씩 복제되어
데이터 크기가 10배로 커졌다.
spark UI를 통해 join 과정을 살펴보면, Data Skew 가 상당히 완화된 것을 확인할 수 있다.
salting을 한마디로 말하면, 새로운 join key를 추가하여 데이터를 더 잘개 쪼개주는 것이다.
그 새로운 join key가 소위 소금(salt)이며, 데이터를 얼마나 잘게 더 쪼갤지를 결정하는 것이
소금의 크기(salt_size)이다.
하지만 salting은 마냥 좋기만 한 것은 아니다, 데이터가 쏠리는 파티션들이 줄어드는 대신,
한쪽의 데이터가 소금의 크기 만큼 복제되어야 하기 때문에 그만큼의 네트워크 및
메모리 추가를 일으킨다.
이 때문에, salt_size를 적정한 수준으로 결정할 필요가 있다.
앞에서, 적절한 salt size를 설정함으로써 salting은 꽤 효과적으로
동작할 수 있음을 확인했다.
다만, 실무에서 이 적절한 소금 크기를 결정하는 것이 만만치 않은 경우도 있다.
예를 들어, 복제되어야 하는 쪽의 데이터가 매우 클 경우 아무리 작은
소금 크기라도 부담스로울 수 있다.
이럴 때는 소금 크기를 데이터에 맞춰 동적으로 설정함으로써 데이터
복제 부하를 최소화 할 수도 있다.
이런 과정까지를 포함한 심화된 버전의 코드는 다음과 같다.
최종 join 결과에서 각 지역마다 적절한 소금 크기가 사용된 것을 확인할 수 있다.
Reference
https://mesh.dev/20220130-dev-notes-008-salting-method-and-examples/
https://gyuhoonk.github.io/spark-salting
https://suminii.tistory.com/entry/Spark%EC%97%90%EC%84%9C-Data-Skew-%ED%95%B4%EA%B2%B0%ED%95%98%EA%B8%B0-%EB%8D%B0%EC%9D%B4%ED%84%B0%EA%B0%80-Even%ED%95%98%EA%B2%8C-%EB%B6%84%EC%82%B0%EB%90%98%EC%A7%80-%EC%95%8A%EC%95%98%EC%96%B4%EC%9A%94
https://medium.com/@suffyan.asad1/handling-data-skew-in-apache-spark-techniques-tips-and-tricks-to-improve-performance-e2934b00b021