From a69b8f0c25250bf0a910c4ca4c4bf2d4f2557003 Mon Sep 17 00:00:00 2001 From: wonyong Date: Sun, 5 Nov 2023 13:14:47 +0900 Subject: [PATCH] spark streaming to structured streaming --- ...Spark-Streaming-To-Structured-Streaming.md | 70 +++++++++++++++++-- 1 file changed, 64 insertions(+), 6 deletions(-) diff --git a/_posts/spark/2022-03-07-Spark-Streaming-To-Structured-Streaming.md b/_posts/spark/2022-03-07-Spark-Streaming-To-Structured-Streaming.md index e83c6f9..bb83a06 100644 --- a/_posts/spark/2022-03-07-Spark-Streaming-To-Structured-Streaming.md +++ b/_posts/spark/2022-03-07-Spark-Streaming-To-Structured-Streaming.md @@ -10,7 +10,7 @@ background: '/img/posts/mac.png' 이번 글에서는 현재 업무에서 사용하던 Spark Streaming을 Structured Streaming 으로 전환 하는 과정에서 -Trouble shooting을 정리해 보려고 한다. +trouble shooting을 정리해 보려고 한다. [Incident Review](https://wonyong-jang.github.io/spark/2023/07/09/Spark-Streaming-Processing-Delay.html)에서 공유한 것처럼 잘못된 구조로 설계되어 있는 부분을 개선하면서 @@ -18,7 +18,7 @@ Trouble shooting을 정리해 보려고 한다. [Structured Streaming](https://wonyong-jang.github.io/spark/2022/01/03/Spark-Structured-Streaming.html) 으로 전환하는 작업을 같이 진행하였다. -현재 Spark Streaming은 AWS EMR Cluster(5.33.1 version) 에서 +> 현재 Spark Streaming은 AWS EMR Cluster(5.33.1 version) 에서 실행하고 있으며 Spark version은 2.4.7, Scala version 2.11을 사용 중이다. @@ -74,7 +74,6 @@ val kinesisStream = KinesisInputDStream.builder 각 batch interval마다 하나의 RDD로 생성된다.` - > Spark Streaming이 Kafka 등에서 여러 파티션을 통해 데이터를 로드하게 되면, 각 batch interval 마다 파티션 개수만큼 RDD가 생성되어 추가로 merge 를 통해 하나의 RDD로 만드는 작업이 필요하다. (Receiver 기반 @@ -89,7 +88,10 @@ KinesisRecordProcessor of Kinesis Client Library(KCL) 가 shard로 부터 - - - -## 2. Structured Streaming 과 kinesis +## 2. Structured Streaming 과 kinesis + +스크린샷 2023-11-05 오전 11 22 14 + 현재 spark 버전으로 kinesis를 input source로써 사용할 수 있는 라이브러리는 아래와 같다. @@ -247,7 +249,6 @@ org.apache.spark.network .client.ChunkFetchFailureException: Failure while fetch ``` spark.dynamicAllocation.enabled=true spark.shuffle.service.enabled=true - ``` 현재 spark version 2.x 버전에서 spark batch job이 아닌, streaming에서 dynamic resource allocation을 사용했을 때, @@ -255,7 +256,64 @@ spark.shuffle.service.enabled=true > 해당 옵션은 spark batch job에서 사용하기 최적화 되어 있는 것 같다. -따라서 해당 옵션을 false로 변경한 후 해당 에러가 발생하지 않음을 확인했다. +따라서 해당 옵션을 false로 변경한 후 해당 에러가 발생하지 않음을 확인했다. + +- - - + +## 3. Configuration + +#### 3-1) kinesis.client.describeShardInterval + +`kinesis의 shard 정보를 확인할 interval을 지정하는 옵션이며, default 1초 이다.` + +이때, [DescribeStream API](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html)를 사용한다. + +`kinesis의 shard는 throughput 단위이며, + shard의 갯수가 증가 하거나 감소할 때 resharding이 발생한다.` +`따라서, describeShardInterval 마다 어플리케이션은 kinesis stream의 변경을 체크하여, + 각 shard들로 부터 읽어올 position을 결정한다.` + +> micro batch interval 10초로 지정했을 때, describeShardInterval은 +일반적으로 10초 ~ 60초 사이가 적당하다. +> 하지만, 어플리케이션이 얼마나 자주 resharding이 발생하는지, + AWS API rate limit이 어느정도 되는지 또는 어플리케이션의 데이터 특성에 따라 달라질 수 있다. + +describeShardInterval 주기를 너무 짧게 설정하면 너무 자주 api를 호출하게 되고, 주기를 너무 길게 설정하면 +shard 변경에 대해서 대응이 늦어지게 되므로 데이터 처리 delay가 발생할 수 있다. + +> describeShardInterval 주기를 길게 설정하여도, kinesis 데이터 보관주기(default 1 day) 안에서는 +data loss는 없고, 처리 delay가 발생할 수 있다. + +따라서, 테스트 및 모니터링을 통해 적절한 주기를 설정해야 한다. + +```scala +val kinesisDataFrame = spark.readStream + .format("kinesis") + .option("streamName", "my-kinesis-stream") + .option("endpointUrl", "https://kinesis.us-west-2.amazonaws.com") + .option("region", "us-west-2") + .option("awsUseInstanceProfile", "false") + .option("kinesis.client.describeShardInterval", "10000") // Check for shard updates every 10 seconds + .load() +``` + +#### 3-2) awsUseInstanceProfile + +`awsUseInstanceProfile의 default 옵션은 true이며, 이는 AWS EC2 인스턴스 프로파일을 이용하여 인증하는 방식이다.` + +```scala +.option("awsUseInstanceProfile", "false") +``` + +`현재 업무에서는 AWS Credentials file(~/.aws/credentials)을 사용하여 인증하는 방식이기 때문에 fals로 지정하였다.` + +#### 3-3) backpressure 관련 옵션 + +``` +.option("kinesis.executor.maxFetchTimeInMs", 1000) +.option("kinesis.executor.maxRecordPerRead", 10000) +.option("kinesis.executor.maxFetchRecordsPerShard", 100000) +``` - - -