Skip to content

Commit

Permalink
Change docs for flux based adapter
Browse files Browse the repository at this point in the history
Signed-off-by: Taeik Lim <[email protected]>
  • Loading branch information
acktsap committed Apr 14, 2024
1 parent 98c2023 commit 4d819e9
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# ItemStreamReaderProcessorWriter
# ItemStreamFluxReaderProcessorWriter

- [Create a tasklet with a processor](#create-a-tasklet-with-a-processor)
- [Java](#java)
Expand All @@ -14,9 +14,9 @@ Spring uses a reactive library called [Reactor](https://projectreactor.io/), whi

A chunk-oriented step in Spring Batch consists of `ItemReader`, `ItemProcessor`, and `ItemWriter`, which are usually defined separately and then assembled to define a step. However, there are some issues with this approach: it is difficult to share data between `ItemReader`, `ItemProcessor`, and `ItemWriter`, and you need to see each respective file to understand the batch flow. Also, if the classes are not reused, they can make the elements of a job less coherent.

To resolve such issues, `ItemStreamReaderProcessorWriter` provides an adapter to use `Flux` and helps you define `ItemReader`, `ItemProcessor`, and `ItemWriter` in a single file.
To resolve such issues, `ItemStreamFluxReaderProcessorWriter` provides an adapter to use `Flux` and helps you define `ItemReader`, `ItemProcessor`, and `ItemWriter` in a single file.

Because Spring Batch Plus has compileOnly dependencies on Reactor, you need to manually add them to use `ItemStreamReaderProcessorWriter`.
Because Spring Batch Plus has compileOnly dependencies on Reactor, you need to manually add them to use `ItemStreamFluxReaderProcessorWriter`.

```kotlin
dependencies {
Expand All @@ -26,16 +26,16 @@ dependencies {

## Create a tasklet with a processor

You can use `ItemStreamReaderProcessorWriter` to define `ItemStreamReader`, `ItemProcessor`, and `ItemStreamWriter` in a single class.
You can use `ItemStreamFluxReaderProcessorWriter` to define `ItemStreamReader`, `ItemProcessor`, and `ItemStreamWriter` in a single class.

### Java

In Java, you can convert a tasklet defined using `AdaptorFactory` to `ItemStreamReader`, `ItemProcessor`, and `ItemStreamWriter`.
In Java, you can convert a tasklet defined using `AdapterFactory` to `ItemStreamReader`, `ItemProcessor`, and `ItemStreamWriter`.

```java
@Component
@StepScope
class SampleTasklet implements ItemStreamReaderProcessorWriter<Integer, String> {
class SampleTasklet implements ItemStreamFluxReaderProcessorWriter<Integer, String> {

@Value("#{jobParameters['totalCount']}")
private long totalCount;
Expand Down Expand Up @@ -92,12 +92,12 @@ public class TestJobConfig {
}
```

You can statically import the method of `AdaptorFactory` for better readability.
You can statically import the method of `AdapterFactory` for better readability.

```java
import static com.navercorp.spring.batch.plus.item.AdaptorFactory.itemProcessor;
import static com.navercorp.spring.batch.plus.item.AdaptorFactory.itemStreamReader;
import static com.navercorp.spring.batch.plus.item.AdaptorFactory.itemStreamWriter;
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemProcessor;
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader;
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter;

...

Expand Down Expand Up @@ -133,7 +133,7 @@ In Kotlin, you can convert a tasklet defined using an extension function to `Ite
@StepScope
open class SampleTasklet(
@Value("#{jobParameters['totalCount']}") private var totalCount: Long
) : ItemStreamReaderProcessorWriter<Int, String> {
) : ItemStreamFluxReaderProcessorWriter<Int, String> {
private var count = 0

override fun readFlux(executionContext: ExecutionContext): Flux<Int> {
Expand Down Expand Up @@ -183,16 +183,16 @@ open class TestJobConfig(

## Create a tasklet without a processor

If you need only `ItemStreamReader` and `ItemStreamWriter` without a processor, you can inherit `ItemStreamReaderWriter` to define `ItemStreamReader` and `ItemStreamWriter` in a single class.
If you need only `ItemStreamReader` and `ItemStreamWriter` without a processor, you can inherit `ItemStreamFluxReaderWriter` to define `ItemStreamReader` and `ItemStreamWriter` in a single class.

### Java

In Java, you can convert a tasklet defined using `AdaptorFactory` to `ItemStreamReader` and `ItemStreamWriter`.
In Java, you can convert a tasklet defined using `AdapterFactory` to `ItemStreamReader` and `ItemStreamWriter`.

```java
@Component
@StepScope
class SampleTasklet implements ItemStreamReaderWriter<Integer> {
class SampleTasklet implements ItemStreamFluxReaderWriter<Integer> {

@Value("#{jobParameters['totalCount']}")
private long totalCount;
Expand Down Expand Up @@ -234,20 +234,20 @@ public class TestJobConfig {
.start(
new StepBuilder("testStep", jobRepository)
.<Integer, Integer>chunk(3, transactionManager)
.reader(AdaptorFactory.itemStreamReader(sampleTasklet))
.writer(AdaptorFactory.itemStreamWriter(sampleTasklet))
.reader(AdapterFactory.itemStreamReader(sampleTasklet))
.writer(AdapterFactory.itemStreamWriter(sampleTasklet))
.build()
)
.build();
}
}
```

You can statically import the method of `AdaptorFactory` for better readability.
You can statically import the method of `AdapterFactory` for better readability.

```java
import static com.navercorp.spring.batch.plus.item.AdaptorFactory.itemStreamReader;
import static com.navercorp.spring.batch.plus.item.AdaptorFactory.itemStreamWriter;
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader;
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter;

...

Expand Down Expand Up @@ -282,7 +282,7 @@ In Kotlin, you can easily convert a tasklet defined using an extension function
@StepScope
open class SampleTasklet(
@Value("#{jobParameters['totalCount']}") private var totalCount: Long
) : ItemStreamReaderWriter<Int> {
) : ItemStreamFluxReaderWriter<Int> {
private var count = 0

override fun readFlux(executionContext: ExecutionContext): Flux<Int> {
Expand Down Expand Up @@ -328,14 +328,14 @@ open class TestJobConfig(

## Use a callback

You can define a callback method for `ItemStream` of `ItemStreamWriter` in `ItemStreamReaderProcessorWriter` and `ItemStreamReaderWriter`. You can selectively define a callback method.
You can define a callback method for `ItemStream` of `ItemStreamWriter` in `ItemStreamFluxReaderProcessorWriter` and `ItemStreamFluxReaderWriter`. You can selectively define a callback method.

### Java

```java
@Component
@StepScope
public class SampleTasklet implements ItemStreamReaderProcessorWriter<Integer, String> {
public class SampleTasklet implements ItemStreamFluxReaderProcessorWriter<Integer, String> {

@Value("#{jobParameters['totalCount']}")
private long totalCount;
Expand Down Expand Up @@ -430,7 +430,7 @@ public class TestJobConfig {
@StepScope
open class SampleTasklet(
@Value("#{jobParameters['totalCount']}") private var totalCount: Long
) : ItemStreamReaderProcessorWriter<Int, String> {
) : ItemStreamFluxReaderProcessorWriter<Int, String> {
private var count = 0

override fun onOpenRead(executionContext: ExecutionContext) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# ItemStreamReaderProcessorWriter
# ItemStreamFluxReaderProcessorWriter

- [processor를 포함하여 Tasklet 작성하기](#processor를-포함하여-tasklet-작성하기)
- [Java](#java)
Expand All @@ -14,9 +14,9 @@ Spring 진영에서는 Reactive library로 [Reactor](https://projectreactor.io/)

Spring Batch의 Chunk-oriented Step은 `ItemReader`, `ItemProcessor`, `ItmeWriter`로 구성됩니다. Spring Batch에서는 일반적으로 `ItemReader`, `ItemProcessor`, `ItmeWriter`를 각각 정의하고 이를 Step을 정의할 때 조립해서 사용합니다. 그런데 이 경우 `ItemReader`, `ItemProcessor`, `ItmeWriter`간에 데이터 공유가 힘들고 배치의 흐름을 알기 위해서는 `ItemReader`, `ItemProcessor`, `ItmeWriter`파일 각각을 살펴봐야 한다는 문제점이 있습니다. 또한 해당 클래스들이 재활용 되지 않는 케이스라면 Job의 응집도를 해치는 요소가 될 수 있습니다.

이 두가지 이슈를 해결하기 위해 `ItemStreamReaderProcessorWriter``Flux`를 사용할 수 있는 Adaptor와 단일 파일에서 `ItemReader`, `ItemProcessor`, `ItmeWriter` 정의할 수 있는 기능을 제공핣니다.
이 두가지 이슈를 해결하기 위해 `ItemStreamFluxReaderProcessorWriter``Flux`를 사용할 수 있는 Adaptor와 단일 파일에서 `ItemReader`, `ItemProcessor`, `ItmeWriter` 정의할 수 있는 기능을 제공핣니다.

Spring Batch Plus에서는 reactor를 compileOnly로 의존하기 때문에 `ItemStreamReaderProcessorWriter`를 사용하기 위해서는 직접 Reactor 의존성을 추가해야 합니다.
Spring Batch Plus에서는 reactor를 compileOnly로 의존하기 때문에 `ItemStreamFluxReaderProcessorWriter`를 사용하기 위해서는 직접 Reactor 의존성을 추가해야 합니다.

```kotlin
dependencies {
Expand All @@ -26,16 +26,16 @@ dependencies {

## processor를 포함하여 Tasklet 작성하기

`ItemStreamReaderProcessorWriter`를 사용해서 단일 class에서 `ItemStreamReader`, `ItemProcessor`, `ItemStreamWriter`를 정의할 수 있습니다.
`ItemStreamFluxReaderProcessorWriter`를 사용해서 단일 class에서 `ItemStreamReader`, `ItemProcessor`, `ItemStreamWriter`를 정의할 수 있습니다.

### Java

Java의 경우 `AdaptorFactory`를 이용해서 정의한 Tasklet을 `ItemStreamReader`, `ItemProcessor`, `ItemStreamWriter`로 변환하여 사용할 수 있습니다.
Java의 경우 `AdaptorFactery`를 이용해서 정의한 Tasklet을 `ItemStreamReader`, `ItemProcessor`, `ItemStreamWriter`로 변환하여 사용할 수 있습니다.

```java
@Component
@StepScope
class SampleTasklet implements ItemStreamReaderProcessorWriter<Integer, String> {
class SampleTasklet implements ItemStreamFluxReaderProcessorWriter<Integer, String> {

@Value("#{jobParameters['totalCount']}")
private long totalCount;
Expand Down Expand Up @@ -92,12 +92,12 @@ public class TestJobConfig {
}
```

이 경우 `AdaptorFactory`의 method를 static import를 해서 사용하는게 미관상 보기 더 좋습니다.
이 경우 `AdapterFactory`의 method를 static import를 해서 사용하는게 미관상 보기 더 좋습니다.

```java
import static com.navercorp.spring.batch.plus.item.AdaptorFactory.itemProcessor;
import static com.navercorp.spring.batch.plus.item.AdaptorFactory.itemStreamReader;
import static com.navercorp.spring.batch.plus.item.AdaptorFactory.itemStreamWriter;
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemProcessor;
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader;
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter;

...

Expand Down Expand Up @@ -133,7 +133,7 @@ Kotlin에서는 extension function을 이용하여 정의한 Tasklet을 `ItemStr
@StepScope
open class SampleTasklet(
@Value("#{jobParameters['totalCount']}") private var totalCount: Long
) : ItemStreamReaderProcessorWriter<Int, String> {
) : ItemStreamFluxReaderProcessorWriter<Int, String> {
private var count = 0

override fun readFlux(executionContext: ExecutionContext): Flux<Int> {
Expand Down Expand Up @@ -183,16 +183,16 @@ open class TestJobConfig(

## Processor 없이 Tasklet 작성하기

Process 과정이 불필요하고 `ItemStreamReader``ItemStreamWriter` 만 필요하다면 `ItemStreamReaderWriter`를 상속하여 단일 class에서 `ItemStreamReader`, `ItemStreamWriter`를 정의할 수 있습니다.
Process 과정이 불필요하고 `ItemStreamReader``ItemStreamWriter` 만 필요하다면 `ItemStreamFluxReaderWriter`를 상속하여 단일 class에서 `ItemStreamReader`, `ItemStreamWriter`를 정의할 수 있습니다.

### Java

Java의 경우 `AdaptorFactory`를 이용해서 정의한 Tasklet을 `ItemStreamReader`, `ItemStreamWriter`로 변환하여 사용할 수 있습니다.
Java의 경우 `AdapterFactory`를 이용해서 정의한 Tasklet을 `ItemStreamReader`, `ItemStreamWriter`로 변환하여 사용할 수 있습니다.

```java
@Component
@StepScope
class SampleTasklet implements ItemStreamReaderWriter<Integer> {
class SampleTasklet implements ItemStreamFluxReaderWriter<Integer> {

@Value("#{jobParameters['totalCount']}")
private long totalCount;
Expand Down Expand Up @@ -234,20 +234,20 @@ public class TestJobConfig {
.start(
new StepBuilder("testStep", jobRepository)
.<Integer, Integer>chunk(3, transactionManager)
.reader(AdaptorFactory.itemStreamReader(sampleTasklet))
.writer(AdaptorFactory.itemStreamWriter(sampleTasklet))
.reader(AdapterFactory.itemStreamReader(sampleTasklet))
.writer(AdapterFactory.itemStreamWriter(sampleTasklet))
.build()
)
.build();
}
}
```

이 경우 `AdaptorFactory`의 method를 static import를 해서 사용하는게 미관상 보기 더 좋습니다.
이 경우 `AdapterFactory`의 method를 static import를 해서 사용하는게 미관상 보기 더 좋습니다.

```java
import static com.navercorp.spring.batch.plus.item.AdaptorFactory.itemStreamReader;
import static com.navercorp.spring.batch.plus.item.AdaptorFactory.itemStreamWriter;
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader;
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter;

...

Expand Down Expand Up @@ -282,7 +282,7 @@ Kotlin 사용시에는 Spring Batch Plus가 제공하는 extension function 을
@StepScope
open class SampleTasklet(
@Value("#{jobParameters['totalCount']}") private var totalCount: Long
) : ItemStreamReaderWriter<Int> {
) : ItemStreamFluxReaderWriter<Int> {
private var count = 0

override fun readFlux(executionContext: ExecutionContext): Flux<Int> {
Expand Down Expand Up @@ -328,14 +328,14 @@ open class TestJobConfig(

## Callback 사용하기

`ItemStreamReaderProcessorWriter`, `ItemStreamReaderWriter` 에는 `ItemStreamReader`, `ItemStreamWriter``ItemStream`에 대한 callback method도 같이 정의할 수 있습니다. Callback method는 선택적으로 정의할 수 있습니다.
`ItemStreamFluxReaderProcessorWriter`, `ItemStreamFluxReaderWriter` 에는 `ItemStreamReader`, `ItemStreamWriter``ItemStream`에 대한 callback method도 같이 정의할 수 있습니다. Callback method는 선택적으로 정의할 수 있습니다.

### Java

```java
@Component
@StepScope
public class SampleTasklet implements ItemStreamReaderProcessorWriter<Integer, String> {
public class SampleTasklet implements ItemStreamFluxReaderProcessorWriter<Integer, String> {

@Value("#{jobParameters['totalCount']}")
private long totalCount;
Expand Down Expand Up @@ -430,7 +430,7 @@ public class TestJobConfig {
@StepScope
open class SampleTasklet(
@Value("#{jobParameters['totalCount']}") private var totalCount: Long
) : ItemStreamReaderProcessorWriter<Int, String> {
) : ItemStreamFluxReaderProcessorWriter<Int, String> {
private var count = 0

override fun onOpenRead(executionContext: ExecutionContext) {
Expand Down Expand Up @@ -502,4 +502,4 @@ open class TestJobConfig(
}
}
}
```
```

0 comments on commit 4d819e9

Please sign in to comment.