아파치 스파크 변환
IT 위키
아파치 스파크(Apache Spark)에서 변환(Transformation)은 기존 RDD(Resilient Distributed Dataset)에서 새로운 RDD를 생성하는 연산을 의미한다. 변환 연산은 지연 실행(lazy evaluation)을 기반으로 동작하며, 액션이 호출될 때까지 실행되지 않는다.
변환(Transformation)의 특징[편집 | 원본 편집]
- Lazy Evaluation(지연 실행) - 변환 연산은 즉시 실행되지 않고, 후속 액션이 호출될 때 실행된다.
- Immutable(불변성) - 기존 RDD를 변경하지 않고 새로운 RDD를 생성한다.
- Narrow Transformation(좁은 변환)과 Wide Transformation(넓은 변환)으로 구분된다.
Transformation의 종류[편집 | 원본 편집]
스파크에서는 다양한 변환 연산을 제공하며, 주요 연산을 아래와 같이 정리할 수 있다.
Narrow Transformation(좁은 변환)[편집 | 원본 편집]
Narrow Transformation은 각 파티션에서 독립적으로 수행될 수 있는 연산으로, Shuffling이 발생하지 않는다.
연산 | 설명 | 예제 |
---|---|---|
map | 각 요소에 함수를 적용하여 새로운 RDD 생성 | rdd.map(lambda x: x * 2) |
flatMap | 각 요소를 여러 개의 출력 요소로 변환 | rdd.flatMap(lambda x: x.split(" ")) |
filter | 조건을 만족하는 요소만 포함하는 RDD 생성 | rdd.filter(lambda x: x % 2 == 0) |
distinct | 중복 요소를 제거한 새로운 RDD 생성 | rdd.distinct() |
mapValues | (키, 값) 형태의 RDD에서 값에 함수 적용 | rdd.mapValues(lambda x: x * 2) |
sample | 임의의 샘플 데이터를 포함하는 RDD 생성 | rdd.sample(False, 0.5) |
coalesce | 파티션 개수를 줄여서 새로운 RDD 생성 | rdd.coalesce(2) |
Wide Transformation(넓은 변환)[편집 | 원본 편집]
Wide Transformation은 여러 파티션에서 데이터를 교환(Shuffling)해야 하는 연산으로, 네트워크 비용이 크다.
연산 | 설명 | 예제 |
---|---|---|
groupByKey | 동일한 키를 가진 값을 그룹화 (Shuffling 발생) | rdd.groupByKey() |
reduceByKey | 동일한 키를 가진 값을 연산하여 하나의 값으로 변환 | rdd.reduceByKey(lambda a, b: a + b) |
sortByKey | 키를 기준으로 정렬 | rdd.sortByKey() |
join | 두 개의 RDD를 키를 기준으로 조인 | rdd1.join(rdd2) |
cogroup | 여러 개의 RDD를 그룹화 | rdd1.cogroup(rdd2) |
combineByKey | 값을 조합하는 고급 연산 | rdd.combineByKey(lambda x: (x, 1), ...) |
repartition | 파티션 개수를 변경하여 새로운 RDD 생성 | rdd.repartition(4) |
Narrow vs. Wide Transformation 비교[편집 | 원본 편집]
구분 | Narrow Transformation | Wide Transformation |
---|---|---|
데이터 이동 | 같은 파티션에서 처리 | 여러 파티션 간 데이터 이동(Shuffling 발생) |
실행 비용 | 낮음 | 높음 |
예제 연산 | map, filter, flatMap, mapValues, sample | groupByKey, reduceByKey, join, cogroup, repartition |
Transformation의 실행 과정[편집 | 원본 편집]
Transformation은 즉시 실행되지 않으며, 액션이 호출될 때 실행된다. 예를 들어, 아래의 코드를 실행하면 변환 연산이 즉시 수행되지 않는다.
rdd = sc.parallelize([1, 2, 3, 4, 5])
mapped_rdd = rdd.map(lambda x: x * 2)
filtered_rdd = mapped_rdd.filter(lambda x: x > 5)
이때, filtered_rdd는 변환 연산만 기록된 상태이며, 아래의 액션이 실행될 때 연산이 수행된다.
result = filtered_rdd.collect()
print(result)
변환 연산의 최적화[편집 | 원본 편집]
아파치 스파크는 변환 연산을 최적화하기 위해 여러 가지 기법을 제공한다.
- Lazy Evaluation - 불필요한 연산을 최소화하여 최적화된 실행 계획을 수립.
- Pipeline Execution - 여러 Narrow Transformation을 한 번에 실행하여 성능 최적화.
- Shuffling 최소화 - Wide Transformation 사용을 최소화하여 네트워크 비용 감소.
- Cache & Persist - 반복적으로 사용되는 RDD를 캐싱하여 성능 향상.