Skip to content

Commit

Permalink
spark streaming to structured streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
WonYong-Jang committed Nov 5, 2023
1 parent a53deef commit a69b8f0
Showing 1 changed file with 64 additions and 6 deletions.
70 changes: 64 additions & 6 deletions _posts/spark/2022-03-07-Spark-Streaming-To-Structured-Streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ background: '/img/posts/mac.png'

이번 글에서는 현재 업무에서 사용하던 Spark Streaming을
Structured Streaming 으로 전환 하는 과정에서
Trouble shooting을 정리해 보려고 한다.
trouble shooting을 정리해 보려고 한다.

[Incident Review](https://wonyong-jang.github.io/spark/2023/07/09/Spark-Streaming-Processing-Delay.html)에서
공유한 것처럼 잘못된 구조로 설계되어 있는 부분을 개선하면서
성능 향상 및 DStream의 단점을 보완할 수 있는
[Structured Streaming](https://wonyong-jang.github.io/spark/2022/01/03/Spark-Structured-Streaming.html) 으로 전환하는 작업을 같이 진행하였다.


현재 Spark Streaming은 AWS EMR Cluster(5.33.1 version) 에서
> 현재 Spark Streaming은 AWS EMR Cluster(5.33.1 version) 에서
실행하고 있으며 Spark version은 2.4.7, Scala version 2.11을
사용 중이다.

Expand Down Expand Up @@ -74,7 +74,6 @@ val kinesisStream = KinesisInputDStream.builder
각 batch interval마다 하나의 RDD로 생성된다.`



> Spark Streaming이 Kafka 등에서 여러 파티션을 통해
데이터를 로드하게 되면, 각 batch interval 마다 파티션 개수만큼 RDD가
생성되어 추가로 merge 를 통해 하나의 RDD로 만드는 작업이 필요하다. (Receiver 기반
Expand All @@ -89,7 +88,10 @@ KinesisRecordProcessor of Kinesis Client Library(KCL) 가 shard로 부터

- - -

## 2. Structured Streaming 과 kinesis
## 2. Structured Streaming 과 kinesis

<img width="800" alt="스크린샷 2023-11-05 오전 11 22 14" src="https://github.com/WonYong-Jang/Pharmacy-Recommendation/assets/26623547/833e8554-0c15-4c49-8622-6cd23394c59f">


현재 spark 버전으로 kinesis를 input source로써 사용할 수 있는
라이브러리는 아래와 같다.
Expand Down Expand Up @@ -247,15 +249,71 @@ org.apache.spark.network .client.ChunkFetchFailureException: Failure while fetch
```
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true
```

현재 spark version 2.x 버전에서 spark batch job이 아닌, streaming에서 dynamic resource allocation을 사용했을 때,
side effect가 발생할 수 있음을 확인했다.

> 해당 옵션은 spark batch job에서 사용하기 최적화 되어 있는 것 같다.
따라서 해당 옵션을 false로 변경한 후 해당 에러가 발생하지 않음을 확인했다.
따라서 해당 옵션을 false로 변경한 후 해당 에러가 발생하지 않음을 확인했다.

- - -

## 3. Configuration

#### 3-1) kinesis.client.describeShardInterval

`kinesis의 shard 정보를 확인할 interval을 지정하는 옵션이며, default 1초 이다.`

이때, [DescribeStream API](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html)를 사용한다.

`kinesis의 shard는 throughput 단위이며,
shard의 갯수가 증가 하거나 감소할 때 resharding이 발생한다.`
`따라서, describeShardInterval 마다 어플리케이션은 kinesis stream의 변경을 체크하여,
각 shard들로 부터 읽어올 position을 결정한다.`

> micro batch interval 10초로 지정했을 때, describeShardInterval은
일반적으로 10초 ~ 60초 사이가 적당하다.
> 하지만, 어플리케이션이 얼마나 자주 resharding이 발생하는지,
AWS API rate limit이 어느정도 되는지 또는 어플리케이션의 데이터 특성에 따라 달라질 수 있다.

describeShardInterval 주기를 너무 짧게 설정하면 너무 자주 api를 호출하게 되고, 주기를 너무 길게 설정하면
shard 변경에 대해서 대응이 늦어지게 되므로 데이터 처리 delay가 발생할 수 있다.

> describeShardInterval 주기를 길게 설정하여도, kinesis 데이터 보관주기(default 1 day) 안에서는
data loss는 없고, 처리 delay가 발생할 수 있다.

따라서, 테스트 및 모니터링을 통해 적절한 주기를 설정해야 한다.

```scala
val kinesisDataFrame = spark.readStream
.format("kinesis")
.option("streamName", "my-kinesis-stream")
.option("endpointUrl", "https://kinesis.us-west-2.amazonaws.com")
.option("region", "us-west-2")
.option("awsUseInstanceProfile", "false")
.option("kinesis.client.describeShardInterval", "10000") // Check for shard updates every 10 seconds
.load()
```

#### 3-2) awsUseInstanceProfile

`awsUseInstanceProfile의 default 옵션은 true이며, 이는 AWS EC2 인스턴스 프로파일을 이용하여 인증하는 방식이다.`

```scala
.option("awsUseInstanceProfile", "false")
```

`현재 업무에서는 AWS Credentials file(~/.aws/credentials)을 사용하여 인증하는 방식이기 때문에 fals로 지정하였다.`

#### 3-3) backpressure 관련 옵션

```
.option("kinesis.executor.maxFetchTimeInMs", 1000)
.option("kinesis.executor.maxRecordPerRead", 10000)
.option("kinesis.executor.maxFetchRecordsPerShard", 100000)
```

- - -

Expand Down

0 comments on commit a69b8f0

Please sign in to comment.