이번 글에서는 RDD가 제공하는 여러 연산들에 대해 살펴보자.
RDD는 기본적으로 네트워크를 통해 전달이 가능해야 하기 때문에, serializable type이면 생성이 가능하다.
RDD 내에 한건 한건을 element 또는 record라고 부른다.
RDD의 처음 생성은 디스크에서 데이터를 메모리로 로딩할 때 처음 생성된다. 그 후 코드에서 생성되는 데이터를 저장할 때 생성된다.
위에서 스파크 컨텍스트를 만들었다면 이제 RDD를 생성할 수 있다.
스파크는 크게 두 종류의 RDD 생성 방법을 제공한다.
첫번째 방법은 드라이버 프로그램의 컬렉션 객체를 이용하는 것이다.
val rdd1 = sc.parallelize(List("a", "b", "c"))
문자열을 포함한 컬렉션 객체를 생성하고 스파크 컨텍스트의 parallelize() 메서드를 이용해 RDD를 생성했다.
parallelize() 메서드는 생성될 RDD의 파티션 수를 지정하는 옵션을 가지고 있다.
val rdd1 = sc.parallelize(1 to 1000, 10) // 10개 파티션 지정
위의 예제는 1 부터 1000까지의 숫자를 담은 컬렉션 객체를 생성했고 파티션 크기를 10으로 주었다. 만약 RDD에 포함된 전체 요소의 크기보다 파티션의 수가 더 크다면 생성된 파티션 중 일부는 요소를 하나도 포함하지 않는 빈 파티션이 된다.
또는, 아래와 같이 rdd를 생성할 수도 있다.
val rdd2 = sc.makeRDD(1 to 100, numSlices = 10) // 1 ~ 100 이며 파티션 10개로 지정
val rdd3 = sc.range(1 , 100, step = 2, numSlices = 10) // 1 ~ 99 까지이며, 1, 3, 5...
두번째 방법은 파일이나 데이터베이스 같은 외부 데이터를 읽어서 새로운 RDD를
생성하는 방법이다.
val rdd1 = sc.textFile("<spark_home_dir>/README.md")
이때 파일 각 한줄은 한개의 RDD 구성요소가 된다.
RDD의 연산은 트랜스포메이션과 액션 연산으로 나눌 수 있으며,
두 연산을 구분하는 기준은 연산의 수행 결과가 RDD인지 아닌지를 확인해보면
구분이 가능하다.
collect는 RDD의 모든 원소를 모아서 배열로 돌려준다. 반환 타입이 RDD가 아닌 배열이므로
연산은 액션에 속하는 연산이다.
collect 연산을 수행하면 RDD에 있는 모든 요소들이 collect 연산을 호출한 서버의 메모리에 수집된다. 따라서 전체 데이터를 모두 담을 수 있을 정도의 충분한 메모리 공간이 확보돼 있는 상태에서만 사용해야 한다.
val rdd1 = sc.parallelize(1 to 5)
val result = rdd1.collect
println(result.mkString(", "))
// print : 1, 2, 3, 4, 5
mkString은 리스트에 담긴 요소를 하나의 문자열로 표현하는 메서드이다.
count는 RDD를 구성하는 전체 요소의 개수를 반환한다.
val rdd1 = sc.parallelize(1 to 5)
val result = rdd1.count
println(result) // 5
그 외에 first, foreach, top(n), take, saveAsTextFile 연산등이 존재한다.
트랜스포메이션은 기존 RDD를 이용해 새로운 RDD를 생성하는 연산이다. 이러한 연산에는 각 요소의 타입을 문자열에서 숫자로 바꾸거나 불필요한 요소를 제외하거나 기존 요소의 값에 특정 값을 더하는 등의 작업이 모두 포함된다.
연산은 크게 2가지로 구분 된다.
부모 RDD의 하나의 파티션에만 의존하여 새로운 RDD를 생성하게 된다.
즉 shuffle이 일어나지 않으며 하나의 stage로 묶이게 된다.
부모의 여러 파티션에 의존하여 새로운 RDD가 생성된다.
즉 shuffle이 발생하게 되며, 새로운 stage가 생성된다.
map은 스파크를 이용한 데이터 처리 작업에서 흔히 사용되는 대표적인 연산 중 하나이다.
아래 예제는 1부터 5까지의 수로 구성된 RDD의 각 요소에 1을 더하는 함수를 적용해서
2부터 6까지의 숫자로 구성된 새로운 RDD를 생성하는 예제이다.
val rdd = sc.parallelize(1 to 5).map(_ + 1)
println(rdd.collect.mkString(", "))
// 2, 3, 4 ,5 ,6
def map[U: ClassTag](f: T => U): RDD[U]
T 타입을 U 타입으로 변환하는 함수 f를 이용해서 RDD[T] 타입의 RDD를 RDD[U] 타입으로
변환하는 메서드 라는 의미이다.
예를 들어, row의 개수가 1개라면, 반환값도 1개가 반환된다.
아래 예제를 보면 fruits는 모두 3개의 단어가 포함되어 있고, ‘ , ‘ 기준으로 분리하여 과일 리스트를 생성하려고 한다.
val fruits = List("apple,orange", "grape,apple,mango", "blueberry,tomato,orange")
val rdd1 = sc.parallelize(fruits); // 단어 3개를 가진 List
map을 이용해서 분리를 한다면 아래와 같을 것이다.
우리는 이런 결과 말고 과일 리스트만 배열로 얻기를 원한 때 flatMap을 사용할 수 있다.
val rdd2 = rdd1.map(_.split(","))
println(rdd2.collect.map(_.mkString("{",", ", "}")).mkString("{",", ", "}"))
// [{apple, orange}, {grape, apple, mango}, {blueberry, tomato, orange}]
위의 경우 ‘apple,orange’ 라는 문자열이 apple과 orange 포함한 배열로 변환되는데
배열에 포함된 요소를 모두 밖으로 끄집어 내는 작업이 필요하다.
flatMap() 연산은 하나의 입력값에 대응하는 반환값이 여러 개 일 때 유용하게
사용 할 수 있다.
주로 nested array를 flatten 하는데 사용된다.
예를 들어 row 개수가 1개라면, 결과값은 1개 이상일 수 있다.
val fruits = List("apple,orange", "grape,apple,mango", "blueberry,tomato,orange")
val rdd1 = sc.parallelize(fruits);
val rdd2 = rdd1.flatMap(_.split(","))
println(rdd2.collect.mkString(", "))
// apple, orange, grape, apple, mango, blueberry, tomato, orange
RDD의 요소가 키와 값의 쌍을 이루고 있는 경우 페어RDD(PairRDD)라는 용어를
사용한다.
mapValues()는 RDD의 모든 요소들이 키와 값의 쌍을 이루고 있는 경우에만
사용 가능한 메서드이며, 인자로 전달받은 함수를 값에 해당하는 요소에만
적용하고 그 결과로 구성된 새로운 RDD를 생성한다.
즉, 키에 해당하는 부분은 그대로 두고 값에만 map() 연산을 적용한
것과 같다.
val rdd = sc.parallelize(List("a", "b", "c")).map((_, 1))
val result = rdd.mapValues(i => i+1)
println(result.collect.mkString(", "))
// (a,2), (b,2), (c,2)
마찬가지로 RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에만 사용 할 수 있는 메서드이다.
val rdd = sc.parallelize(Seq((1,"a,b"), (2,"a,c"), (1,"d,e")))
val result = rdd.flatMapValues(_.split(","))
println(result.collect.mkString("\t"))
// (1,a) (1,b) (2,a) (2,c) (1,d) (1,e)
pair rdd를 사용할 때, key 값의 연산이 필요 없다면 map, flatMap을
사용하는 것보다 mapValues, flatMapValues를 사용하는 것이 네트워크 이동 비용을 아낄 수
있기 때문에 성능상 이점이 있다.
zip() 연산은 두 개의 서로 다른 RDD를 각 요소의 인덱스에 따라 하나의 (키, 값) 쌍으로 묶어 준다.
val rdd1 = sc.parallelize(List("a", "b", "c"))
val rdd2 = sc.parallelize(List(1, 2, 3))
val result = rdd1.zip(rdd2)
println(result.collect.mkString(", "))
// (a,1), (b,2), (c,3)
주의해야할 점은 서로 크기가 다른 RDD 간에는 zip() 메서드를 사용할 수 없다.
즉, RDD간의 파티션 개수와 element개수도 동일해야 한다.
zipPartitions는 파티션에 포함된 요소를 반복문으로 처리한다는 점에서 mapPartitions와
유사하지만, 여러 RDD의 파티션을 결합하는데 사용한다는 점은 다르다.
연산자를 호출하는 RDD를 포함해 최대 RDD를 네 개까지 결합할 수 있다.
모든 RDD는 파티션 개수가 동일해야 하지만, 파티션에 포함된 요소 개수가 반드시 같을 필요는 없다.
zipPartitions는 인자 목록을 두 개 받는다.
첫 번째 목록에는 zipPartitions로 결합할 RDD를 전달하며, 두 번째 목록에는
조인 함수를 정의해 전달한다.
조인 함수는 입력 RDD 별로 각 파티션의 요소를 담은 Iterator 객체들을 받고
결과 RDD 값을 담은 새로운 Iterator를 반환해야 한다.
rdd1.zipPartitions(rdd2, true)((iter1, iter2) => {
iter1.zipAll(iter2, -1, "empty")
.map({case(x1, x2)=>x1+"-"+x2})
}).collect()
groupBy()는 RDD의 요소를 일정한 기준에 따라 여러 개의 그룹으로 나누고 이 그룹으로 구성된 새로운 RDD를 생성한다.
val rdd = sc.parallelize(1 to 10)
val result = rdd.groupBy{
case i: Int if(i % 2 == 0) => "even"
case _ => "odd"
}
result.foreach {
v => println(s"${v._1}, [${v._2.mkString(", ")}]")
}
// even, [2, 4, 6, 8, 10]
// odd, [1, 3, 5, 7, 9]
groupBy() 메서드가 요소의 키를 생성하는 작업과 그룹으로 분류하는 작업을
동시에 수행한다면 groupByKey()는 이미 RDD의 구성요소가 키와 값으로 쌍으로
이루어진 경우에 사용 가능한 메서드이다.
val rdd = sc.parallelize(List("a","b","c","c","b")).map((_,1))
val result = rdd.groupByKey
result.collect.foreach {
v => println(s"${v._1}, [${v._2.mkString(",")}]")
}
// a, [1]
// b, [1,1]
// c, [1,1]
참고로, GroupByKey를 사용하게 되면 Spark에서 가장 기피해야 하지만 어쩔 수 없이 발생하는 data shuffling이 모든 node 사이에서 일어나게 된다.
reduceByKey를 사용해도 동일하게 shuffling은 일어나지만, 두 함수의 가장 큰 차이점은 reduceByKey는 shuffle 하기 전에 먼저 reduce 연산을 수행해서 네트워크를 타는 데이터를 현저히 줄여준다.
그래서 가급적이면 reduceByKey나 aggregateByKey 등 shuffling 이전에 데이터 크기를 줄여줄 수 있는 함수를 먼저 고려해야 한다. 똑같은 Wide transformation 함수라도 성능 차이가 많이 날 수 있다.
즉, 스파크 성능에 영향을 미칠 수 있는 요소 중 하나가 groupByKey 대신에
reduceByKey로 해결할 수 있는 문제 상황에서는 무조건 reduceByKey를
사용했을 때 성능에 이점을 볼 수 있다.
distinct()는 RDD의 원소에서 중복을 제외한 요소로만 구성된 새로운 RDD를 생성하는 메서드이다.
val rdd = sc.parallelize(List(1,2,3,1,2,3,1,2,3))
val result = rdd.distinct()
println(result.collect.mkString(", "))
// 1, 2, 3
cartesian()은 두 RDD 요소의 카테시안곱을 구하고 그 결과를 요소로 하는 새로운 RDD을 생성하는 메서드이다.
val rdd1 = sc.parallelize(List(1,2,3))
val rdd2 = sc.parallelize(List("a","b","c"))
val result = rdd1.cartesian(rdd2)
println(result.collect.mkString(", "))
// (1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (3,a), (3,b), (3,c)
아래와 같이 rdd1, rdd2을 합쳐서 새로운 RDD를 생성하는 메서드이다.
val rdd1 = sc.parallelize(List("a","b","c"))
val rdd2 = sc.parallelize(List("d","e","f"))
val result = rdd1.union(rdd2)
println(result.collect.mkString(", "))
// a, b, c, d, e, f
join()은 RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는
메서드이다. 같은 키를 가지고 있는 요소를 모아서 그룹을 형성하고,
이 결과로 구성된 새로운 RDD를 생성하는 메서드이다.
join() 메서드의 수행 결과로 생성된 RDD는 튜플 타입의 요소를 가지며, Tuple(키, Tuple(첫번째 RDD, 두번째 RDD)) 형태로 구성된다.
val rdd1 = sc.parallelize(List("a","b","c","d","e")).map((_, 1))
val rdd2 = sc.parallelize(List("b","c")).map((_,2))
val result = rdd1.join(rdd2)
println(result.collect.mkString("\n"))
// (b,(1,2))
// (c,(1,2))
RDD의 구성요소가 키와 값의 쌍으로 구성된 경우에 사용 할 수 있는 메서드이며, sql 사용하는것과 비슷하게 왼쪽 외부조인과 오른쪽 외부 조인을 수행한다.
val rdd1 = sc.parallelize(List("a","b","c")).map((_, 1))
val rdd2 = sc.parallelize(List("b","c")).map((_,2))
val result1 = rdd1.leftOuterJoin(rdd2)
val result2 = rdd1.rightOuterJoin(rdd2)
println("Left: " + result1.collect.mkString("\n"))
println("Right: "+ result2.collect.mkString("\n"))
// Left: (a,(1,None))
// (b,(1,Some(2)))
// (c,(1,Some(2)))
// Right: (b,(Some(1),2))
// (c,(Some(1),2))
RRD의 구성 요소가 키와 값의 쌍으로 구성된 경우에 사용할 수 있는
메서드이다.
rdd1의 요소 중에서 rdd2에 같은 키가 존재하는 요소를 제외한
나머지로 구성된 새로운 RDD를 생성한다.
val rdd1 = sc.parallelize(List("c","b")).map((_, 1))
val rdd2 = sc.parallelize(List("b","a")).map((_,2))
val result = rdd1.subtractByKey(rdd2)
println(result.collect.mkString("\n"))
// (c,1)
RDD에서 병렬로 처리 되는 과정을 살펴보자. 먼저, 아래 소스를 보면 txt 파일을 RDD로 생성하고 default partition개수를 확인했다. default로 RDD를 생성할 수도 있고 직접 지정할 수도 있다. 아래는 5개로 지정해주었다. 마지막 줄에는 각 partition 갯수만큼 for each를 순회한다.
기본적으로 파티션 1개의 task 1개가 수행된다.
task는 기본적으로 1개의 스레드가 할당되어 처리된다.
또한, Data Locality 를 고려하여 task는 최대한 저장된 데이터와 가까운 워커 노드에서 실행되도록 진행한다.
//load a text file from current directory
val flistRDD = sc.textFile("flist.txt")
//Check number of defaults partitions
flistRDD.getNumPartitions
//Reload with five partitions
val flistRDD = sc.textFile("flist.txt", 5)
//Count the number of elements in each partition
flistRDD.foreachPartition(
p => println("Items in partition-" + p.count(y => true))
)
위처럼 RDD를 생성할 때 default로 파티션을 사용하거나, 직접 지정하는 방법이 있고 또 다른 방법은 wide operation을 사용할 때 지정할 수도 있다.
pairs.reduceByKey(a,b => a+b, 15)
// 파티션 개수를 15개로 지정했으며, 데이터에서 key의 분포를 고려하여 지정해야 한다.
대표적인 wide operation은 reduceByKey, join, repartition, coalesce 등이 있다.
Reference
https://wikidocs.net/book/2350
https://www.learningjournal.guru/article/apache-spark/apache-spark-parallel-processing/
https://artist-developer.tistory.com/7
https://subscription.packtpub.com/book/big_data_and_business_intelligence/9781787126497/7/ch07lvl1sec46/rdd-partitioning