diff --git a/_posts/spark/2024-04-15-Spark-Adaptive-Query-Execution.md b/_posts/spark/2024-04-15-Spark-Adaptive-Query-Execution.md index 1bff3d2..2664db7 100644 --- a/_posts/spark/2024-04-15-Spark-Adaptive-Query-Execution.md +++ b/_posts/spark/2024-04-15-Spark-Adaptive-Query-Execution.md @@ -28,7 +28,15 @@ rule-based 외에 cost-based 을 포함해 최적화를 실행하였다. 여기서 파티션 개수는 쿼리 성능에 매우 직접적인 연관을 가지고 있다. `따라서 AQE는 셔플 통계를 보고, 너무 많은 파티션 갯수를 사용할 경우 I/O를 -많이 유발할 수 있기 때문에 파티션들을 적절하게 합쳐주는 기능을 제공한다.` +많이 유발할 수 있기 때문에 파티션들을 적절하게 합쳐주는 기능을 제공한다.` + +default로 spark.sql.shuffle.partitions 갯수는 200 이기 때문에 +기본적으로 shuffle 파티션이 200개가 생성된다. +예를들어, reduceByKey를 할 때 각 key 값이 4개 밖에 없는데 +shuffle 파티션을 200개나 만들 필요가 없다. +따라서, AQE를 통해 자동으로 shuffle 파티션을 줄여준다. + +아래 예제를 통해 이해해보자. 기존 방식으로 셔플을 진행하면, 아래 그림과 같이 5개의 셔플 파티션이 생기고 각 파티션마다 크기가 달라질 수 있다. @@ -57,11 +65,15 @@ Broadcast hash join 을 사용할 경우 shuffle이 발생하지 않기 때문 사용 가능한 경우 이를 적용시켜 준다.` > RDD를 조인할 때는 Map Side Join 또는 Replicated Join이라고도 부르며, 큰 테이블과 작은 테이블을 -join할 때 셔플을 피할 수 있는 방법이다. +join할 때 성능을 향상시킬 수 있는 방법이다. 작은 테이블의 경우는 [Broadcast 변수](https://wonyong-jang.github.io/spark/2021/07/08/Spark-broadcast-accumulator.html)를 driver에서 만들어서 -각 executor로 보내주게 되며, 이를 통해 shuffle을 피하여 join을 할 수 있게 된다. +각 executor로 보내주게 되며, 이를 통해 shuffle을 피하여 join을 할 수 있게 된다. +`명시적으로 broadcast hash join을 사용한 쿼리와는 다르게 AQE에서 제공하는 +broadcast hash join 은 shuffle이 발생한다.` +`shuffle을 해야 실제 데이터가 얼마나 작은지 확인이 가능하기 때문이며, 대신 +sort 단계를 없앨수 있기 때문에 상대적으로 빠르다.` 스크린샷 2024-04-16 오후 3 07 31 @@ -169,11 +181,26 @@ spark.sql.adaptive.coalescePartitions.minPartitionSize ### 2-2) skew join 활성화 +활성화 옵션은 아래와 같다. + ```scala spark.conf.set("spark.sql.adaptive.enabled",true) spark.conf.set("spark.sql.adaptive.skewJoin.enabled",true) ``` +활성화 되기 위한 조건에 대한 옵션은 아래와 같다. + +``` +// default +// 다른 파티션들과 비교했을 때 5배 이상 큰 경우 skew join을 사용 +spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 (compared to medium partition size) + +// 파티션이 256MB 보다 클 경우 +spark.sql.adaptive.skewJoin.skewedPartitionThresholdingBytes=256MB +``` + + + - - - **Reference**