Skip to content

Commit

Permalink
spark aqe
Browse files Browse the repository at this point in the history
  • Loading branch information
WonYong-Jang committed May 15, 2024
1 parent f882c42 commit c56151d
Showing 1 changed file with 30 additions and 3 deletions.
33 changes: 30 additions & 3 deletions _posts/spark/2024-04-15-Spark-Adaptive-Query-Execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@ rule-based 외에 cost-based 을 포함해 최적화를 실행하였다.
여기서 파티션 개수는 쿼리 성능에 매우 직접적인 연관을 가지고 있다.

`따라서 AQE는 셔플 통계를 보고, 너무 많은 파티션 갯수를 사용할 경우 I/O를
많이 유발할 수 있기 때문에 파티션들을 적절하게 합쳐주는 기능을 제공한다.`
많이 유발할 수 있기 때문에 파티션들을 적절하게 합쳐주는 기능을 제공한다.`

default로 spark.sql.shuffle.partitions 갯수는 200 이기 때문에
기본적으로 shuffle 파티션이 200개가 생성된다.
예를들어, reduceByKey를 할 때 각 key 값이 4개 밖에 없는데
shuffle 파티션을 200개나 만들 필요가 없다.
따라서, AQE를 통해 자동으로 shuffle 파티션을 줄여준다.

아래 예제를 통해 이해해보자.

기존 방식으로 셔플을 진행하면, 아래 그림과 같이 5개의 셔플 파티션이 생기고
각 파티션마다 크기가 달라질 수 있다.
Expand Down Expand Up @@ -57,11 +65,15 @@ Broadcast hash join 을 사용할 경우 shuffle이 발생하지 않기 때문
사용 가능한 경우 이를 적용시켜 준다.`

> RDD를 조인할 때는 Map Side Join 또는 Replicated Join이라고도 부르며, 큰 테이블과 작은 테이블을
join할 때 셔플을 피할 수 있는 방법이다.
join할 때 성능을 향상시킬 수 있는 방법이다.

작은 테이블의 경우는 [Broadcast 변수](https://wonyong-jang.github.io/spark/2021/07/08/Spark-broadcast-accumulator.html)를 driver에서 만들어서
각 executor로 보내주게 되며, 이를 통해 shuffle을 피하여 join을 할 수 있게 된다.
각 executor로 보내주게 되며, 이를 통해 shuffle을 피하여 join을 할 수 있게 된다.

`명시적으로 broadcast hash join을 사용한 쿼리와는 다르게 AQE에서 제공하는
broadcast hash join 은 shuffle이 발생한다.`
`shuffle을 해야 실제 데이터가 얼마나 작은지 확인이 가능하기 때문이며, 대신
sort 단계를 없앨수 있기 때문에 상대적으로 빠르다.`

<img width="700" alt="스크린샷 2024-04-16 오후 3 07 31" src="https://github.com/WonYong-Jang/Pharmacy-Recommendation/assets/26623547/929038a7-4fc3-469a-b1b2-5084666db9cc">

Expand Down Expand Up @@ -169,11 +181,26 @@ spark.sql.adaptive.coalescePartitions.minPartitionSize

### 2-2) skew join 활성화

활성화 옵션은 아래와 같다.

```scala
spark.conf.set("spark.sql.adaptive.enabled",true)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled",true)
```

활성화 되기 위한 조건에 대한 옵션은 아래와 같다.

```
// default
// 다른 파티션들과 비교했을 때 5배 이상 큰 경우 skew join을 사용
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 (compared to medium partition size)
// 파티션이 256MB 보다 클 경우
spark.sql.adaptive.skewJoin.skewedPartitionThresholdingBytes=256MB
```



- - -

**Reference**
Expand Down

0 comments on commit c56151d

Please sign in to comment.