From 56aa802bba104c406898b6ddc199e94eb59d425f Mon Sep 17 00:00:00 2001 From: Taeik Lim Date: Sun, 14 Apr 2024 23:57:32 +0900 Subject: [PATCH] Add docs for new adapters Signed-off-by: Taeik Lim --- doc/en/README.md | 5 +- ...stream-iterable-reader-processor-writer.md | 513 ++++++++++++++++++ ...stream-iterator-reader-processor-writer.md | 507 +++++++++++++++++ ...m-stream-simple-reader-processor-writer.md | 468 ++++++++++++++++ doc/ko/README.md | 5 +- ...tem-stream-flux-reader-processor-writer.md | 6 +- ...stream-iterable-reader-processor-writer.md | 513 ++++++++++++++++++ ...stream-iterator-reader-processor-writer.md | 507 +++++++++++++++++ ...m-stream-simple-reader-processor-writer.md | 468 ++++++++++++++++ 9 files changed, 2987 insertions(+), 5 deletions(-) create mode 100644 doc/en/step/item-stream-iterable-reader-processor-writer.md create mode 100644 doc/en/step/item-stream-iterator-reader-processor-writer.md create mode 100644 doc/en/step/item-stream-simple-reader-processor-writer.md create mode 100644 doc/ko/step/item-stream-iterable-reader-processor-writer.md create mode 100644 doc/ko/step/item-stream-iterator-reader-processor-writer.md create mode 100644 doc/ko/step/item-stream-simple-reader-processor-writer.md diff --git a/doc/en/README.md b/doc/en/README.md index 6a607a3..65a41a9 100644 --- a/doc/en/README.md +++ b/doc/en/README.md @@ -4,12 +4,15 @@ Spring Batch Plus provides useful classes available in [Spring Batch](https://gi ## User guide -The Kotlin DSL helps you declaratively declare a `Job`, `Step`, and `Flow` by using Kotlin’s [type-safe builders](https://kotlinlang.org/docs/type-safe-builders.html), without using `JobBuilder`, `StepBuilder`, or `FlowBuilder`. `ClearRunIdIncrementer` is a class that can replace the `RunIdIncrementer` of Spring Batch which reuses JobParameters in the previous JobExecution. `DeleteMetadataJob` is a `Job` that deletes old metadata. `ItemStreamFluxReaderProcessorWriter` helps you implement `ItemStreamReader`, `ItemProcessor`, and `ItemStreamWriter` as a single class. +The Kotlin DSL helps you declaratively declare a `Job`, `Step`, and `Flow` by using Kotlin’s [type-safe builders](https://kotlinlang.org/docs/type-safe-builders.html), without using `JobBuilder`, `StepBuilder`, or `FlowBuilder`. `ClearRunIdIncrementer` is a class that can replace the `RunIdIncrementer` of Spring Batch which reuses JobParameters in the previous JobExecution. `DeleteMetadataJob` is a `Job` that deletes old metadata. `ItemStreamFluxReaderProcessorWriter`, `ItemStreamIterableReaderProcessorWriter`, `ItemStreamIteratorReaderProcessorWriter` and `ItemStreamSimpleReaderProcessorWriter` helps you implement `ItemStreamReader`, `ItemProcessor`, and `ItemStreamWriter` as a single class. - [Kotlin DSL](./configuration/kotlin-dsl/README.md) - [ClearRunIdIncrementer](./job/clear-run-id-incrementer.md) - [DeleteMetadataJob](./job/delete-metadata-job.md) - [ItemStreamFluxReaderProcessorWriter](./step/item-stream-flux-reader-processor-writer.md) +- [ItemStreamIterableReaderProcessorWriter](./step/item-stream-iterable-reader-processor-writer.md) +- [ItemStreamIteratorReaderProcessorWriter](./step/item-stream-iterator-reader-processor-writer.md) +- [ItemStreamSimpleReaderProcessorWriter](./step/item-stream-simple-reader-processor-writer.md) ## Code samples diff --git a/doc/en/step/item-stream-iterable-reader-processor-writer.md b/doc/en/step/item-stream-iterable-reader-processor-writer.md new file mode 100644 index 0000000..bd40ebb --- /dev/null +++ b/doc/en/step/item-stream-iterable-reader-processor-writer.md @@ -0,0 +1,513 @@ +# ItemStreamIterableReaderProcessorWriter + +- [Create a tasklet with a processor](#create-a-tasklet-with-a-processor) + - [Java](#java) + - [Kotlin](#kotlin) +- [Create a tasklet without a processor](#create-a-tasklet-without-a-processor) + - [Java](#java-1) + - [Kotlin](#kotlin-1) +- [Use a callback](#use-a-callback) + - [Java](#java-2) + - [Kotlin](#kotlin-2) + +A chunk-oriented step in Spring Batch consists of `ItemReader`, `ItemProcessor`, and `ItemWriter`, which are usually defined separately and then assembled to define a step. However, there are some issues with this approach: it is difficult to share data between `ItemReader`, `ItemProcessor`, and `ItemWriter`, and you need to see each respective file to understand the batch flow. Also, if the classes are not reused, they can make the elements of a job less coherent. + +To resolve such issues, `ItemStreamIterableReaderProcessorWriter` provides an `Iterable` baesd adapter to helps you define `ItemReader`, `ItemProcessor`, and `ItemWriter` in a single file. + +## Create a tasklet with a processor + +You can use `ItemStreamIterableReaderProcessorWriter` to define `ItemStreamReader`, `ItemProcessor`, and `ItemStreamWriter` in a single class. + +### Java + +In Java, you can convert a tasklet defined using `AdapterFactory` to `ItemStreamReader`, `ItemProcessor`, and `ItemStreamWriter`. + +```java +@Component +@StepScope +class SampleTasklet implements ItemStreamIterableReaderProcessorWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @NonNull + @Override + public Iterable readIterable(@NonNull ExecutionContext executionContext) { + System.out.println("totalCount: " + totalCount); + return () -> new Iterator<>() { + @Override + public boolean hasNext() { + return count < totalCount; + } + + @Override + public Integer next() { + return count++; + } + }; + } + + @Override + public String process(@NonNull Integer item) { + return "'" + item.toString() + "'"; + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(AdapterFactory.itemStreamReader(sampleTasklet)) + .processor(AdapterFactory.itemProcessor(sampleTasklet)) + .writer(AdapterFactory.itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +You can statically import the method of `AdapterFactory` for better readability. + +```java +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemProcessor; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter; + +... + +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .processor(itemProcessor(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +In Kotlin, you can convert a tasklet defined using an extension function to `ItemStreamReader`, `ItemProcessor`, and `ItemStreamWriter`. + +```kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamIterableReaderProcessorWriter { + private var count = 0 + + override fun readIterable(executionContext: ExecutionContext): Iterable { + println("totalCount: $totalCount") + return Iterable { + object : Iterator { + override fun hasNext(): Boolean { + return count < totalCount + } + + override fun next(): Int { + return count++ + } + } + } + } + + override fun process(item: Int): String? { + return "'$item'" + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } +} +``` + +```kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + processor(sampleTasklet.asItemProcessor()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +``` + +## Create a tasklet without a processor + +If you need only `ItemStreamReader` and `ItemStreamWriter` without a processor, you can inherit `ItemStreamIterableReaderWriter` to define `ItemStreamReader` and `ItemStreamWriter` in a single class. + +### Java + +In Java, you can convert a tasklet defined using `AdapterFactory` to `ItemStreamReader` and `ItemStreamWriter`. + +```java +@Component +@StepScope +class SampleTasklet implements ItemStreamIterableReaderWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @NonNull + @Override + public Iterable readIterable(@NonNull ExecutionContext executionContext) { + System.out.println("totalCount: " + totalCount); + return () -> new Iterator<>() { + @Override + public boolean hasNext() { + return count < totalCount; + } + + @Override + public Integer next() { + return count++; + } + }; + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(AdapterFactory.itemStreamReader(sampleTasklet)) + .writer(AdapterFactory.itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +You can statically import the method of `AdapterFactory` for better readability. + +```java +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter; + +... + +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +In Kotlin, you can easily convert a tasklet defined using an extension function in Spring Batch Plus to `ItemStreamReader` and `ItemStreamWriter`. + +```Kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamIterableReaderWriter { + private var count = 0 + + override fun readIterable(executionContext: ExecutionContext): Iterable { + println("totalCount: $totalCount") + return Iterable { + object : Iterator { + override fun hasNext(): Boolean { + return count < totalCount + } + + override fun next(): Int { + return count++ + } + } + } + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } +} +``` + +```Kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +``` + +## Use a callback + +You can define a callback method for `ItemStream` of `ItemStreamWriter` in `ItemStreamIterableReaderProcessorWriter` and `ItemStreamIterableReaderWriter`. You can selectively define a callback method. + +### Java + +```java +@Component +@StepScope +public class SampleTasklet implements ItemStreamIterableReaderProcessorWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @Override + public void onOpenRead(@NonNull ExecutionContext executionContext) { + System.out.println("onOpenRead"); + } + + @NonNull + @Override + public Iterable readIterable(@NonNull ExecutionContext executionContext) { + System.out.println("totalCount: " + totalCount); + return () -> new Iterator<>() { + @Override + public boolean hasNext() { + return count < totalCount; + } + + @Override + public Integer next() { + return count++; + } + }; + } + + @Override + public void onUpdateRead(@NonNull ExecutionContext executionContext) { + System.out.println("onUpdateRead"); + } + + @Override + public void onCloseRead() { + System.out.println("onCloseRead"); + } + + @Override + public String process(@NonNull Integer item) { + return "'" + item.toString() + "'"; + } + + @Override + public void onOpenWrite(@NonNull ExecutionContext executionContext) { + System.out.println("onOpenWrite"); + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } + + @Override + public void onUpdateWrite(@NonNull ExecutionContext executionContext) { + System.out.println("onUpdateWrite"); + executionContext.putString("samplekey", "samplevlaue"); + } + + @Override + public void onCloseWrite() { + System.out.println("onCloseWrite"); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .processor(itemProcessor(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +```kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamIterableReaderProcessorWriter { + private var count = 0 + + override fun onOpenRead(executionContext: ExecutionContext) { + println("onOpenRead") + } + + override fun readIterable(executionContext: ExecutionContext): Iterable { + println("totalCount: $totalCount") + return Iterable { + object : Iterator { + override fun hasNext(): Boolean { + return count < totalCount + } + + override fun next(): Int { + return count++ + } + } + } + } + + override fun onUpdateRead(executionContext: ExecutionContext) { + println("onUpdateRead") + } + + override fun onCloseRead() { + println("onCloseRead") + } + + override fun process(item: Int): String? { + return "'$item'" + } + + override fun onOpenWrite(executionContext: ExecutionContext) { + println("onOpenWrite") + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } + + override fun onUpdateWrite(executionContext: ExecutionContext) { + println("onUpdateWrite") + executionContext.putString("samplekey", "samplevalue") + } + + override fun onCloseWrite() { + println("onCloseWrite") + } +} +``` + +```kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + processor(sampleTasklet.asItemProcessor()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +``` diff --git a/doc/en/step/item-stream-iterator-reader-processor-writer.md b/doc/en/step/item-stream-iterator-reader-processor-writer.md new file mode 100644 index 0000000..7d02503 --- /dev/null +++ b/doc/en/step/item-stream-iterator-reader-processor-writer.md @@ -0,0 +1,507 @@ +# ItemStreamIteratorReaderProcessorWriter + +- [Create a tasklet with a processor](#create-a-tasklet-with-a-processor) + - [Java](#java) + - [Kotlin](#kotlin) +- [Create a tasklet without a processor](#create-a-tasklet-without-a-processor) + - [Java](#java-1) + - [Kotlin](#kotlin-1) +- [Use a callback](#use-a-callback) + - [Java](#java-2) + - [Kotlin](#kotlin-2) + +A chunk-oriented step in Spring Batch consists of `ItemReader`, `ItemProcessor`, and `ItemWriter`, which are usually defined separately and then assembled to define a step. However, there are some issues with this approach: it is difficult to share data between `ItemReader`, `ItemProcessor`, and `ItemWriter`, and you need to see each respective file to understand the batch flow. Also, if the classes are not reused, they can make the elements of a job less coherent. + +To resolve such issues, `ItemStreamIteratorReaderProcessorWriter` provides an `Iterator` based adapter to helps you define `ItemReader`, `ItemProcessor`, and `ItemWriter` in a single file. + +## Create a tasklet with a processor + +You can use `ItemStreamIteratorReaderProcessorWriter` to define `ItemStreamReader`, `ItemProcessor`, and `ItemStreamWriter` in a single class. + +### Java + +In Java, you can convert a tasklet defined using `AdapterFactory` to `ItemStreamReader`, `ItemProcessor`, and `ItemStreamWriter`. + +```java +@Component +@StepScope +class SampleTasklet implements ItemStreamIteratorReaderProcessorWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @NonNull + @Override + public Iterator readIterator(@NonNull ExecutionContext executionContext) { + System.out.println("totalCount: " + totalCount); + return new Iterator<>() { + @Override + public boolean hasNext() { + return count < totalCount; + } + + @Override + public Integer next() { + return count++; + } + }; + } + + @Override + public String process(@NonNull Integer item) { + return "'" + item.toString() + "'"; + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(AdapterFactory.itemStreamReader(sampleTasklet)) + .processor(AdapterFactory.itemProcessor(sampleTasklet)) + .writer(AdapterFactory.itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +You can statically import the method of `AdapterFactory` for better readability. + +```java +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemProcessor; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter; + +... + +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .processor(itemProcessor(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +In Kotlin, you can convert a tasklet defined using an extension function to `ItemStreamReader`, `ItemProcessor`, and `ItemStreamWriter`. + +```kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamIteratorReaderProcessorWriter { + private var count = 0 + + override fun readIterator(executionContext: ExecutionContext): Iterator { + println("totalCount: $totalCount") + return object : Iterator { + override fun hasNext(): Boolean { + return count < totalCount + } + + override fun next(): Int { + return count++ + } + } + } + + override fun process(item: Int): String? { + return "'$item'" + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } +} +``` + +```kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + processor(sampleTasklet.asItemProcessor()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +``` + +## Create a tasklet without a processor + +If you need only `ItemStreamReader` and `ItemStreamWriter` without a processor, you can inherit `ItemStreamIteratorReaderWriter` to define `ItemStreamReader` and `ItemStreamWriter` in a single class. + +### Java + +In Java, you can convert a tasklet defined using `AdapterFactory` to `ItemStreamReader` and `ItemStreamWriter`. + +```java +@Component +@StepScope +class SampleTasklet implements ItemStreamIteratorReaderWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @NonNull + @Override + public Iterator readIterator(@NonNull ExecutionContext executionContext) { + System.out.println("totalCount: " + totalCount); + return new Iterator<>() { + @Override + public boolean hasNext() { + return count < totalCount; + } + + @Override + public Integer next() { + return count++; + } + }; + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(AdapterFactory.itemStreamReader(sampleTasklet)) + .writer(AdapterFactory.itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +You can statically import the method of `AdapterFactory` for better readability. + +```java +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter; + +... + +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +In Kotlin, you can easily convert a tasklet defined using an extension function in Spring Batch Plus to `ItemStreamReader` and `ItemStreamWriter`. + +```Kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamIteratorReaderWriter { + private var count = 0 + + override fun readIterator(executionContext: ExecutionContext): Iterator { + println("totalCount: $totalCount") + return object : Iterator { + override fun hasNext(): Boolean { + return count < totalCount + } + + override fun next(): Int { + return count++ + } + } + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } +} +``` + +```Kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +``` + +## Use a callback + +You can define a callback method for `ItemStream` of `ItemStreamWriter` in `ItemStreamIteratorReaderProcessorWriter` and `ItemStreamIteratorReaderWriter`. You can selectively define a callback method. + +### Java + +```java +@Component +@StepScope +public class SampleTasklet implements ItemStreamIteratorReaderProcessorWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @Override + public void onOpenRead(@NonNull ExecutionContext executionContext) { + System.out.println("onOpenRead"); + } + + @NonNull + @Override + public Iterator readIterator(@NonNull ExecutionContext executionContext) { + System.out.println("totalCount: " + totalCount); + return new Iterator<>() { + @Override + public boolean hasNext() { + return count < totalCount; + } + + @Override + public Integer next() { + return count++; + } + }; + } + + @Override + public void onUpdateRead(@NonNull ExecutionContext executionContext) { + System.out.println("onUpdateRead"); + } + + @Override + public void onCloseRead() { + System.out.println("onCloseRead"); + } + + @Override + public String process(@NonNull Integer item) { + return "'" + item.toString() + "'"; + } + + @Override + public void onOpenWrite(@NonNull ExecutionContext executionContext) { + System.out.println("onOpenWrite"); + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } + + @Override + public void onUpdateWrite(@NonNull ExecutionContext executionContext) { + System.out.println("onUpdateWrite"); + executionContext.putString("samplekey", "samplevlaue"); + } + + @Override + public void onCloseWrite() { + System.out.println("onCloseWrite"); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .processor(itemProcessor(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +```kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamIteratorReaderProcessorWriter { + private var count = 0 + + override fun onOpenRead(executionContext: ExecutionContext) { + println("onOpenRead") + } + + override fun readIterator(executionContext: ExecutionContext): Iterator { + println("totalCount: $totalCount") + return object : Iterator { + override fun hasNext(): Boolean { + return count < totalCount + } + + override fun next(): Int { + return count++ + } + } + } + + override fun onUpdateRead(executionContext: ExecutionContext) { + println("onUpdateRead") + } + + override fun onCloseRead() { + println("onCloseRead") + } + + override fun process(item: Int): String? { + return "'$item'" + } + + override fun onOpenWrite(executionContext: ExecutionContext) { + println("onOpenWrite") + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } + + override fun onUpdateWrite(executionContext: ExecutionContext) { + println("onUpdateWrite") + executionContext.putString("samplekey", "samplevalue") + } + + override fun onCloseWrite() { + println("onCloseWrite") + } +} +``` + +```kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + processor(sampleTasklet.asItemProcessor()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +``` diff --git a/doc/en/step/item-stream-simple-reader-processor-writer.md b/doc/en/step/item-stream-simple-reader-processor-writer.md new file mode 100644 index 0000000..ea3d52f --- /dev/null +++ b/doc/en/step/item-stream-simple-reader-processor-writer.md @@ -0,0 +1,468 @@ +# ItemStreamSimpleReaderProcessorWriter + +- [Create a tasklet with a processor](#create-a-tasklet-with-a-processor) + - [Java](#java) + - [Kotlin](#kotlin) +- [Create a tasklet without a processor](#create-a-tasklet-without-a-processor) + - [Java](#java-1) + - [Kotlin](#kotlin-1) +- [Use a callback](#use-a-callback) + - [Java](#java-2) + - [Kotlin](#kotlin-2) + +A chunk-oriented step in Spring Batch consists of `ItemReader`, `ItemProcessor`, and `ItemWriter`, which are usually defined separately and then assembled to define a step. However, there are some issues with this approach: it is difficult to share data between `ItemReader`, `ItemProcessor`, and `ItemWriter`, and you need to see each respective file to understand the batch flow. Also, if the classes are not reused, they can make the elements of a job less coherent. + +To resolve such issues, `ItemStreamSimpleReaderProcessorWriter` provides an adapter to helps you define `ItemReader`, `ItemProcessor`, and `ItemWriter` in a single file. + +## Create a tasklet with a processor + +You can use `ItemStreamSimpleReaderProcessorWriter` to define `ItemStreamReader`, `ItemProcessor`, and `ItemStreamWriter` in a single class. + +### Java + +In Java, you can convert a tasklet defined using `AdapterFactory` to `ItemStreamReader`, `ItemProcessor`, and `ItemStreamWriter`. + +```java +@Component +@StepScope +class SampleTasklet implements ItemStreamSimpleReaderProcessorWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @Override + public Integer read() { + if (count < totalCount) { + return count++; + } else { + return null; + } + } + + @Override + public String process(@NonNull Integer item) { + return "'" + item.toString() + "'"; + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(AdapterFactory.itemStreamReader(sampleTasklet)) + .processor(AdapterFactory.itemProcessor(sampleTasklet)) + .writer(AdapterFactory.itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +You can statically import the method of `AdapterFactory` for better readability. + +```java +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemProcessor; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter; + +... + +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .processor(itemProcessor(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +In Kotlin, you can convert a tasklet defined using an extension function to `ItemStreamReader`, `ItemProcessor`, and `ItemStreamWriter`. + +```kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamSimpleReaderProcessorWriter { + private var count = 0 + + override fun read(): Int? { + return if (count < totalCount) { + count++ + } else { + null + } + } + + override fun process(item: Int): String? { + return "'$item'" + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } +} +``` + +```kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + processor(sampleTasklet.asItemProcessor()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +``` + +## Create a tasklet without a processor + +If you need only `ItemStreamReader` and `ItemStreamWriter` without a processor, you can inherit `ItemStreamSimpleReaderWriter` to define `ItemStreamReader` and `ItemStreamWriter` in a single class. + +### Java + +In Java, you can convert a tasklet defined using `AdapterFactory` to `ItemStreamReader` and `ItemStreamWriter`. + +```java +@Component +@StepScope +class SampleTasklet implements ItemStreamSimpleReaderWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @Override + public Integer read() { + if (count < totalCount) { + return count++; + } else { + return null; + } + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(AdapterFactory.itemStreamReader(sampleTasklet)) + .writer(AdapterFactory.itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +You can statically import the method of `AdapterFactory` for better readability. + +```java +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter; + +... + +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +In Kotlin, you can easily convert a tasklet defined using an extension function in Spring Batch Plus to `ItemStreamReader` and `ItemStreamWriter`. + +```Kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamSimpleReaderWriter { + private var count = 0 + + override fun read(): Int? { + return if (count < totalCount) { + count++ + } else { + null + } + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } +} +``` + +```Kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +``` + +## Use a callback + +You can define a callback method for `ItemStream` of `ItemStreamWriter` in `ItemStreamSimpleReaderProcessorWriter` and `ItemStreamSimpleReaderWriter`. You can selectively define a callback method. + +### Java + +```java +@Component +@StepScope +public class SampleTasklet implements ItemStreamSimpleReaderProcessorWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @Override + public void onOpenRead(@NonNull ExecutionContext executionContext) { + System.out.println("onOpenRead"); + } + + @Override + public Integer read() { + if (count < totalCount) { + return count++; + } else { + return null; + } + } + + @Override + public void onUpdateRead(@NonNull ExecutionContext executionContext) { + System.out.println("onUpdateRead"); + } + + @Override + public void onCloseRead() { + System.out.println("onCloseRead"); + } + + @Override + public String process(@NonNull Integer item) { + return "'" + item.toString() + "'"; + } + + @Override + public void onOpenWrite(@NonNull ExecutionContext executionContext) { + System.out.println("onOpenWrite"); + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } + + @Override + public void onUpdateWrite(@NonNull ExecutionContext executionContext) { + System.out.println("onUpdateWrite"); + executionContext.putString("samplekey", "samplevlaue"); + } + + @Override + public void onCloseWrite() { + System.out.println("onCloseWrite"); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .processor(itemProcessor(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +```kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamSimpleReaderProcessorWriter { + private var count = 0 + + override fun onOpenRead(executionContext: ExecutionContext) { + println("onOpenRead") + } + + override fun read(): Int? { + return if (count < totalCount) { + count++ + } else { + null + } + } + + override fun onUpdateRead(executionContext: ExecutionContext) { + println("onUpdateRead") + } + + override fun onCloseRead() { + println("onCloseRead") + } + + override fun process(item: Int): String? { + return "'$item'" + } + + override fun onOpenWrite(executionContext: ExecutionContext) { + println("onOpenWrite") + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } + + override fun onUpdateWrite(executionContext: ExecutionContext) { + println("onUpdateWrite") + executionContext.putString("samplekey", "samplevalue") + } + + override fun onCloseWrite() { + println("onCloseWrite") + } +} +``` + +```kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + processor(sampleTasklet.asItemProcessor()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +``` diff --git a/doc/ko/README.md b/doc/ko/README.md index 0f814f5..861a678 100644 --- a/doc/ko/README.md +++ b/doc/ko/README.md @@ -4,12 +4,15 @@ Spring Batch Plus는 [Spring Batch](https://github.com/spring-projects/spring-ba ## 사용자 가이드 -Kotlin DSL은 `JobBuilder`, `StepBuilder`, `FlowBuilder`를 이용하지 않고 Kotlin의 [Type-safe builder](https://kotlinlang.org/docs/type-safe-builders.html)를 이용하여 선언적으로 `Job`, `Step`, `Flow`를 선언할 수 있는 기능을 제공합니다. `ClearRunIdIncrementer`는 Spring Batch에서 제공하는 `RunIdIncrementer`를 대신하는 class로 `RunIdIncrementer`가 이전의 JobExecution에 있는 JobParameter를 재사용하는 문제를 해결한 class 입니다. `DeleteMetadataJob`은 오래된 metadata를 삭제하는 기능을 제공해주는 `Job` 입니다. 그리고 `ItemStreamFluxReaderProcessorWriter`는 `ItemStreamReader`, `ItemProcessor`, `ItemStreamWriter`를 단일 class로 구현할 수 있게 합니다. +Kotlin DSL은 `JobBuilder`, `StepBuilder`, `FlowBuilder`를 이용하지 않고 Kotlin의 [Type-safe builder](https://kotlinlang.org/docs/type-safe-builders.html)를 이용하여 선언적으로 `Job`, `Step`, `Flow`를 선언할 수 있는 기능을 제공합니다. `ClearRunIdIncrementer`는 Spring Batch에서 제공하는 `RunIdIncrementer`를 대신하는 class로 `RunIdIncrementer`가 이전의 JobExecution에 있는 JobParameter를 재사용하는 문제를 해결한 class 입니다. `DeleteMetadataJob`은 오래된 metadata를 삭제하는 기능을 제공해주는 `Job` 입니다. 그리고 `ItemStreamFluxReaderProcessorWriter`, `ItemStreamIterableReaderProcessorWriter`, `ItemStreamIteratorReaderProcessorWriter`, `ItemStreamSimpleReaderProcessorWriter` 는 `ItemStreamReader`, `ItemProcessor`, `ItemStreamWriter`를 단일 class로 구현할 수 있게 합니다. - [Kotlin DSL](./configuration/kotlin-dsl/README.md) - [ClearRunIdIncrementer](./job/clear-run-id-incrementer.md) - [DeleteMetadataJob](./job/delete-metadata-job.md) - [ItemStreamFluxReaderProcessorWriter](./step/item-stream-flux-reader-processor-writer.md) +- [ItemStreamIterableReaderProcessorWriter](./step/item-stream-iterable-reader-processor-writer.md) +- [ItemStreamIteratorReaderProcessorWriter](./step/item-stream-iterator-reader-processor-writer.md) +- [ItemStreamSimpleReaderProcessorWriter](./step/item-stream-simple-reader-processor-writer.md) ## 예제 코드 diff --git a/doc/ko/step/item-stream-flux-reader-processor-writer.md b/doc/ko/step/item-stream-flux-reader-processor-writer.md index 692be83..8f72653 100644 --- a/doc/ko/step/item-stream-flux-reader-processor-writer.md +++ b/doc/ko/step/item-stream-flux-reader-processor-writer.md @@ -12,9 +12,9 @@ Spring 진영에서는 Reactive library로 [Reactor](https://projectreactor.io/)를 사용하고 있습니다. Reactor에서는 여러 데이터에 대한 stream을 `Flux`로 제공합니다. `Flux`로 읽은 데이터를 Spring Batch의 `ItemReader`에서 사용하기 위해서는 `Flux`에서 단일 Item씩 뽑아내서 리턴하는 작업이 필요합니다. -Spring Batch의 Chunk-oriented Step은 `ItemReader`, `ItemProcessor`, `ItmeWriter`로 구성됩니다. Spring Batch에서는 일반적으로 `ItemReader`, `ItemProcessor`, `ItmeWriter`를 각각 정의하고 이를 Step을 정의할 때 조립해서 사용합니다. 그런데 이 경우 `ItemReader`, `ItemProcessor`, `ItmeWriter`간에 데이터 공유가 힘들고 배치의 흐름을 알기 위해서는 `ItemReader`, `ItemProcessor`, `ItmeWriter`파일 각각을 살펴봐야 한다는 문제점이 있습니다. 또한 해당 클래스들이 재활용 되지 않는 케이스라면 Job의 응집도를 해치는 요소가 될 수 있습니다. +Spring Batch의 Chunk-oriented Step은 `ItemReader`, `ItemProcessor`, `ItemWriter`로 구성됩니다. Spring Batch에서는 일반적으로 `ItemReader`, `ItemProcessor`, `ItemWriter`를 각각 정의하고 이를 Step을 정의할 때 조립해서 사용합니다. 그런데 이 경우 `ItemReader`, `ItemProcessor`, `ItemWriter`간에 데이터 공유가 힘들고 배치의 흐름을 알기 위해서는 `ItemReader`, `ItemProcessor`, `ItemWriter`파일 각각을 살펴봐야 한다는 문제점이 있습니다. 또한 해당 클래스들이 재활용 되지 않는 케이스라면 Job의 응집도를 해치는 요소가 될 수 있습니다. -이 두가지 이슈를 해결하기 위해 `ItemStreamFluxReaderProcessorWriter`는 `Flux`를 사용할 수 있는 Adaptor와 단일 파일에서 `ItemReader`, `ItemProcessor`, `ItmeWriter` 정의할 수 있는 기능을 제공핣니다. +이 두가지 이슈를 해결하기 위해 `ItemStreamFluxReaderProcessorWriter`는 `Flux`기반으로 단일 파일에서 `ItemReader`, `ItemProcessor`, `ItemWriter` 정의할 수 있는 기능을 제공핣니다. Spring Batch Plus에서는 reactor를 compileOnly로 의존하기 때문에 `ItemStreamFluxReaderProcessorWriter`를 사용하기 위해서는 직접 Reactor 의존성을 추가해야 합니다. @@ -30,7 +30,7 @@ dependencies { ### Java -Java의 경우 `AdapterFactery`를 이용해서 정의한 Tasklet을 `ItemStreamReader`, `ItemProcessor`, `ItemStreamWriter`로 변환하여 사용할 수 있습니다. +Java의 경우 `AdapterFactory`를 이용해서 정의한 Tasklet을 `ItemStreamReader`, `ItemProcessor`, `ItemStreamWriter`로 변환하여 사용할 수 있습니다. ```java @Component diff --git a/doc/ko/step/item-stream-iterable-reader-processor-writer.md b/doc/ko/step/item-stream-iterable-reader-processor-writer.md new file mode 100644 index 0000000..b694ecc --- /dev/null +++ b/doc/ko/step/item-stream-iterable-reader-processor-writer.md @@ -0,0 +1,513 @@ +# ItemStreamIterableReaderProcessorWriter + +- [processor를 포함하여 Tasklet 작성하기](#processor를-포함하여-tasklet-작성하기) + - [Java](#java) + - [Kotlin](#kotlin) +- [Processor 없이 Tasklet 작성하기](#processor-없이-tasklet-작성하기) + - [Java](#java-1) + - [Kotlin](#kotlin-1) +- [Callback 사용하기](#callback-사용하기) + - [Java](#java-2) + - [Kotlin](#kotlin-2) + +Spring Batch의 Chunk-oriented Step은 `ItemReader`, `ItemProcessor`, `ItemWriter`로 구성됩니다. Spring Batch에서는 일반적으로 `ItemReader`, `ItemProcessor`, `ItemWriter`를 각각 정의하고 이를 Step을 정의할 때 조립해서 사용합니다. 그런데 이 경우 `ItemReader`, `ItemProcessor`, `ItemWriter`간에 데이터 공유가 힘들고 배치의 흐름을 알기 위해서는 `ItemReader`, `ItemProcessor`, `ItemWriter`파일 각각을 살펴봐야 한다는 문제점이 있습니다. 또한 해당 클래스들이 재활용 되지 않는 케이스라면 Job의 응집도를 해치는 요소가 될 수 있습니다. + +`ItemStreamIterableReaderProcessorWriter`는 `Iterable` 기반으로 단일 파일에서 `ItemReader`, `ItemProcessor`, `ItemWriter` 정의할 수 있는 기능을 제공핣니다. + +## processor를 포함하여 Tasklet 작성하기 + +`ItemStreamIterableReaderProcessorWriter`를 사용해서 단일 class에서 `ItemStreamReader`, `ItemProcessor`, `ItemStreamWriter`를 정의할 수 있습니다. + +### Java + +Java의 경우 `AdapterFactory`를 이용해서 정의한 Tasklet을 `ItemStreamReader`, `ItemProcessor`, `ItemStreamWriter`로 변환하여 사용할 수 있습니다. + +```java +@Component +@StepScope +class SampleTasklet implements ItemStreamIterableReaderProcessorWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @NonNull + @Override + public Iterable readIterable(@NonNull ExecutionContext executionContext) { + System.out.println("totalCount: " + totalCount); + return () -> new Iterator<>() { + @Override + public boolean hasNext() { + return count < totalCount; + } + + @Override + public Integer next() { + return count++; + } + }; + } + + @Override + public String process(@NonNull Integer item) { + return "'" + item.toString() + "'"; + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(AdapterFactory.itemStreamReader(sampleTasklet)) + .processor(AdapterFactory.itemProcessor(sampleTasklet)) + .writer(AdapterFactory.itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +이 경우 `AdapterFactory`의 method를 static import를 해서 사용하는게 미관상 보기 더 좋습니다. + +```java +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemProcessor; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter; + +... + +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .processor(itemProcessor(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +Kotlin에서는 extension function을 이용하여 정의한 Tasklet을 `ItemStreamReader`, `ItemProcessor`, `ItemStreamWriter`로 변환하여 사용할 수 있습니다. + +```kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamIterableReaderProcessorWriter { + private var count = 0 + + override fun readIterable(executionContext: ExecutionContext): Iterable { + println("totalCount: $totalCount") + return Iterable { + object : Iterator { + override fun hasNext(): Boolean { + return count < totalCount + } + + override fun next(): Int { + return count++ + } + } + } + } + + override fun process(item: Int): String? { + return "'$item'" + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } +} +``` + +```kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + processor(sampleTasklet.asItemProcessor()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +``` + +## Processor 없이 Tasklet 작성하기 + +Process 과정이 불필요하고 `ItemStreamReader` 와 `ItemStreamWriter` 만 필요하다면 `ItemStreamIterableReaderWriter`를 상속하여 단일 class에서 `ItemStreamReader`, `ItemStreamWriter`를 정의할 수 있습니다. + +### Java + +Java의 경우 `AdapterFactory`를 이용해서 정의한 Tasklet을 `ItemStreamReader`, `ItemStreamWriter`로 변환하여 사용할 수 있습니다. + +```java +@Component +@StepScope +class SampleTasklet implements ItemStreamIterableReaderWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @NonNull + @Override + public Iterable readIterable(@NonNull ExecutionContext executionContext) { + System.out.println("totalCount: " + totalCount); + return () -> new Iterator<>() { + @Override + public boolean hasNext() { + return count < totalCount; + } + + @Override + public Integer next() { + return count++; + } + }; + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(AdapterFactory.itemStreamReader(sampleTasklet)) + .writer(AdapterFactory.itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +이 경우 `AdapterFactory`의 method를 static import를 해서 사용하는게 미관상 보기 더 좋습니다. + +```java +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter; + +... + +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +Kotlin 사용시에는 Spring Batch Plus가 제공하는 extension function 을 사용하여 정의한 Tasklet을 `ItemStreamReader`, `ItemStreamWriter`로 편리하게 변환할 수 있습니다. + +```Kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamIterableReaderWriter { + private var count = 0 + + override fun readIterable(executionContext: ExecutionContext): Iterable { + println("totalCount: $totalCount") + return Iterable { + object : Iterator { + override fun hasNext(): Boolean { + return count < totalCount + } + + override fun next(): Int { + return count++ + } + } + } + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } +} +``` + +```Kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +``` + +## Callback 사용하기 + +`ItemStreamIterableReaderProcessorWriter`, `ItemStreamIterableReaderWriter` 에는 `ItemStreamReader`, `ItemStreamWriter`의 `ItemStream`에 대한 callback method도 같이 정의할 수 있습니다. Callback method는 선택적으로 정의할 수 있습니다. + +### Java + +```java +@Component +@StepScope +public class SampleTasklet implements ItemStreamIterableReaderProcessorWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @Override + public void onOpenRead(@NonNull ExecutionContext executionContext) { + System.out.println("onOpenRead"); + } + + @NonNull + @Override + public Iterable readIterable(@NonNull ExecutionContext executionContext) { + System.out.println("totalCount: " + totalCount); + return () -> new Iterator<>() { + @Override + public boolean hasNext() { + return count < totalCount; + } + + @Override + public Integer next() { + return count++; + } + }; + } + + @Override + public void onUpdateRead(@NonNull ExecutionContext executionContext) { + System.out.println("onUpdateRead"); + } + + @Override + public void onCloseRead() { + System.out.println("onCloseRead"); + } + + @Override + public String process(@NonNull Integer item) { + return "'" + item.toString() + "'"; + } + + @Override + public void onOpenWrite(@NonNull ExecutionContext executionContext) { + System.out.println("onOpenWrite"); + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } + + @Override + public void onUpdateWrite(@NonNull ExecutionContext executionContext) { + System.out.println("onUpdateWrite"); + executionContext.putString("samplekey", "samplevlaue"); + } + + @Override + public void onCloseWrite() { + System.out.println("onCloseWrite"); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .processor(itemProcessor(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +```kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamIterableReaderProcessorWriter { + private var count = 0 + + override fun onOpenRead(executionContext: ExecutionContext) { + println("onOpenRead") + } + + override fun readIterable(executionContext: ExecutionContext): Iterable { + println("totalCount: $totalCount") + return Iterable { + object : Iterator { + override fun hasNext(): Boolean { + return count < totalCount + } + + override fun next(): Int { + return count++ + } + } + } + } + + override fun onUpdateRead(executionContext: ExecutionContext) { + println("onUpdateRead") + } + + override fun onCloseRead() { + println("onCloseRead") + } + + override fun process(item: Int): String? { + return "'$item'" + } + + override fun onOpenWrite(executionContext: ExecutionContext) { + println("onOpenWrite") + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } + + override fun onUpdateWrite(executionContext: ExecutionContext) { + println("onUpdateWrite") + executionContext.putString("samplekey", "samplevalue") + } + + override fun onCloseWrite() { + println("onCloseWrite") + } +} +``` + +```kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + processor(sampleTasklet.asItemProcessor()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +``` diff --git a/doc/ko/step/item-stream-iterator-reader-processor-writer.md b/doc/ko/step/item-stream-iterator-reader-processor-writer.md new file mode 100644 index 0000000..404a3b9 --- /dev/null +++ b/doc/ko/step/item-stream-iterator-reader-processor-writer.md @@ -0,0 +1,507 @@ +# ItemStreamIteratorReaderProcessorWriter + +- [processor를 포함하여 Tasklet 작성하기](#processor를-포함하여-tasklet-작성하기) + - [Java](#java) + - [Kotlin](#kotlin) +- [Processor 없이 Tasklet 작성하기](#processor-없이-tasklet-작성하기) + - [Java](#java-1) + - [Kotlin](#kotlin-1) +- [Callback 사용하기](#callback-사용하기) + - [Java](#java-2) + - [Kotlin](#kotlin-2) + +Spring Batch의 Chunk-oriented Step은 `ItemReader`, `ItemProcessor`, `ItemWriter`로 구성됩니다. Spring Batch에서는 일반적으로 `ItemReader`, `ItemProcessor`, `ItemWriter`를 각각 정의하고 이를 Step을 정의할 때 조립해서 사용합니다. 그런데 이 경우 `ItemReader`, `ItemProcessor`, `ItemWriter`간에 데이터 공유가 힘들고 배치의 흐름을 알기 위해서는 `ItemReader`, `ItemProcessor`, `ItemWriter`파일 각각을 살펴봐야 한다는 문제점이 있습니다. 또한 해당 클래스들이 재활용 되지 않는 케이스라면 Job의 응집도를 해치는 요소가 될 수 있습니다. + +`ItemStreamIteratorReaderProcessorWriter`는 `Iterator` 기반으로 단일 파일에서 `ItemReader`, `ItemProcessor`, `ItemWriter` 정의할 수 있는 기능을 제공핣니다. + +## processor를 포함하여 Tasklet 작성하기 + +`ItemStreamIteratorReaderProcessorWriter`를 사용해서 단일 class에서 `ItemStreamReader`, `ItemProcessor`, `ItemStreamWriter`를 정의할 수 있습니다. + +### Java + +Java의 경우 `AdapterFactory`를 이용해서 정의한 Tasklet을 `ItemStreamReader`, `ItemProcessor`, `ItemStreamWriter`로 변환하여 사용할 수 있습니다. + +```java +@Component +@StepScope +class SampleTasklet implements ItemStreamIteratorReaderProcessorWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @NonNull + @Override + public Iterator readIterator(@NonNull ExecutionContext executionContext) { + System.out.println("totalCount: " + totalCount); + return new Iterator<>() { + @Override + public boolean hasNext() { + return count < totalCount; + } + + @Override + public Integer next() { + return count++; + } + }; + } + + @Override + public String process(@NonNull Integer item) { + return "'" + item.toString() + "'"; + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(AdapterFactory.itemStreamReader(sampleTasklet)) + .processor(AdapterFactory.itemProcessor(sampleTasklet)) + .writer(AdapterFactory.itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +이 경우 `AdapterFactory`의 method를 static import를 해서 사용하는게 미관상 보기 더 좋습니다. + +```java +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemProcessor; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter; + +... + +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .processor(itemProcessor(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +Kotlin에서는 extension function을 이용하여 정의한 Tasklet을 `ItemStreamReader`, `ItemProcessor`, `ItemStreamWriter`로 변환하여 사용할 수 있습니다. + +```kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamIteratorReaderProcessorWriter { + private var count = 0 + + override fun readIterator(executionContext: ExecutionContext): Iterator { + println("totalCount: $totalCount") + return object : Iterator { + override fun hasNext(): Boolean { + return count < totalCount + } + + override fun next(): Int { + return count++ + } + } + } + + override fun process(item: Int): String? { + return "'$item'" + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } +} +``` + +```kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + processor(sampleTasklet.asItemProcessor()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +``` + +## Processor 없이 Tasklet 작성하기 + +Process 과정이 불필요하고 `ItemStreamReader` 와 `ItemStreamWriter` 만 필요하다면 `ItemStreamIteratorReaderWriter`를 상속하여 단일 class에서 `ItemStreamReader`, `ItemStreamWriter`를 정의할 수 있습니다. + +### Java + +Java의 경우 `AdapterFactory`를 이용해서 정의한 Tasklet을 `ItemStreamReader`, `ItemStreamWriter`로 변환하여 사용할 수 있습니다. + +```java +@Component +@StepScope +class SampleTasklet implements ItemStreamIteratorReaderWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @NonNull + @Override + public Iterator readIterator(@NonNull ExecutionContext executionContext) { + System.out.println("totalCount: " + totalCount); + return new Iterator<>() { + @Override + public boolean hasNext() { + return count < totalCount; + } + + @Override + public Integer next() { + return count++; + } + }; + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(AdapterFactory.itemStreamReader(sampleTasklet)) + .writer(AdapterFactory.itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +이 경우 `AdapterFactory`의 method를 static import를 해서 사용하는게 미관상 보기 더 좋습니다. + +```java +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter; + +... + +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +Kotlin 사용시에는 Spring Batch Plus가 제공하는 extension function 을 사용하여 정의한 Tasklet을 `ItemStreamReader`, `ItemStreamWriter`로 편리하게 변환할 수 있습니다. + +```Kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamIteratorReaderWriter { + private var count = 0 + + override fun readIterator(executionContext: ExecutionContext): Iterator { + println("totalCount: $totalCount") + return object : Iterator { + override fun hasNext(): Boolean { + return count < totalCount + } + + override fun next(): Int { + return count++ + } + } + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } +} +``` + +```Kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +``` + +## Callback 사용하기 + +`ItemStreamIteratorReaderProcessorWriter`, `ItemStreamIteratorReaderWriter` 에는 `ItemStreamReader`, `ItemStreamWriter`의 `ItemStream`에 대한 callback method도 같이 정의할 수 있습니다. Callback method는 선택적으로 정의할 수 있습니다. + +### Java + +```java +@Component +@StepScope +public class SampleTasklet implements ItemStreamIteratorReaderProcessorWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @Override + public void onOpenRead(@NonNull ExecutionContext executionContext) { + System.out.println("onOpenRead"); + } + + @NonNull + @Override + public Iterator readIterator(@NonNull ExecutionContext executionContext) { + System.out.println("totalCount: " + totalCount); + return new Iterator<>() { + @Override + public boolean hasNext() { + return count < totalCount; + } + + @Override + public Integer next() { + return count++; + } + }; + } + + @Override + public void onUpdateRead(@NonNull ExecutionContext executionContext) { + System.out.println("onUpdateRead"); + } + + @Override + public void onCloseRead() { + System.out.println("onCloseRead"); + } + + @Override + public String process(@NonNull Integer item) { + return "'" + item.toString() + "'"; + } + + @Override + public void onOpenWrite(@NonNull ExecutionContext executionContext) { + System.out.println("onOpenWrite"); + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } + + @Override + public void onUpdateWrite(@NonNull ExecutionContext executionContext) { + System.out.println("onUpdateWrite"); + executionContext.putString("samplekey", "samplevlaue"); + } + + @Override + public void onCloseWrite() { + System.out.println("onCloseWrite"); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .processor(itemProcessor(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +```kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamIteratorReaderProcessorWriter { + private var count = 0 + + override fun onOpenRead(executionContext: ExecutionContext) { + println("onOpenRead") + } + + override fun readIterator(executionContext: ExecutionContext): Iterator { + println("totalCount: $totalCount") + return object : Iterator { + override fun hasNext(): Boolean { + return count < totalCount + } + + override fun next(): Int { + return count++ + } + } + } + + override fun onUpdateRead(executionContext: ExecutionContext) { + println("onUpdateRead") + } + + override fun onCloseRead() { + println("onCloseRead") + } + + override fun process(item: Int): String? { + return "'$item'" + } + + override fun onOpenWrite(executionContext: ExecutionContext) { + println("onOpenWrite") + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } + + override fun onUpdateWrite(executionContext: ExecutionContext) { + println("onUpdateWrite") + executionContext.putString("samplekey", "samplevalue") + } + + override fun onCloseWrite() { + println("onCloseWrite") + } +} +``` + +```kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + processor(sampleTasklet.asItemProcessor()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +``` diff --git a/doc/ko/step/item-stream-simple-reader-processor-writer.md b/doc/ko/step/item-stream-simple-reader-processor-writer.md new file mode 100644 index 0000000..6620026 --- /dev/null +++ b/doc/ko/step/item-stream-simple-reader-processor-writer.md @@ -0,0 +1,468 @@ +# ItemStreamSimpleReaderProcessorWriter + +- [processor를 포함하여 Tasklet 작성하기](#processor를-포함하여-tasklet-작성하기) + - [Java](#java) + - [Kotlin](#kotlin) +- [Processor 없이 Tasklet 작성하기](#processor-없이-tasklet-작성하기) + - [Java](#java-1) + - [Kotlin](#kotlin-1) +- [Callback 사용하기](#callback-사용하기) + - [Java](#java-2) + - [Kotlin](#kotlin-2) + +Spring Batch의 Chunk-oriented Step은 `ItemReader`, `ItemProcessor`, `ItemWriter`로 구성됩니다. Spring Batch에서는 일반적으로 `ItemReader`, `ItemProcessor`, `ItemWriter`를 각각 정의하고 이를 Step을 정의할 때 조립해서 사용합니다. 그런데 이 경우 `ItemReader`, `ItemProcessor`, `ItemWriter`간에 데이터 공유가 힘들고 배치의 흐름을 알기 위해서는 `ItemReader`, `ItemProcessor`, `ItemWriter`파일 각각을 살펴봐야 한다는 문제점이 있습니다. 또한 해당 클래스들이 재활용 되지 않는 케이스라면 Job의 응집도를 해치는 요소가 될 수 있습니다. + +`ItemStreamSimpleReaderProcessorWriter`는 단일 파일에서 `ItemReader`, `ItemProcessor`, `ItemWriter` 정의할 수 있는 기능을 제공핣니다. + +## processor를 포함하여 Tasklet 작성하기 + +`ItemStreamSimpleReaderProcessorWriter`를 사용해서 단일 class에서 `ItemStreamReader`, `ItemProcessor`, `ItemStreamWriter`를 정의할 수 있습니다. + +### Java + +Java의 경우 `AdapterFactory`를 이용해서 정의한 Tasklet을 `ItemStreamReader`, `ItemProcessor`, `ItemStreamWriter`로 변환하여 사용할 수 있습니다. + +```java +@Component +@StepScope +class SampleTasklet implements ItemStreamSimpleReaderProcessorWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @Override + public Integer read() { + if (count < totalCount) { + return count++; + } else { + return null; + } + } + + @Override + public String process(@NonNull Integer item) { + return "'" + item.toString() + "'"; + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(AdapterFactory.itemStreamReader(sampleTasklet)) + .processor(AdapterFactory.itemProcessor(sampleTasklet)) + .writer(AdapterFactory.itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +이 경우 `AdapterFactory`의 method를 static import를 해서 사용하는게 미관상 보기 더 좋습니다. + +```java +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemProcessor; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter; + +... + +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .processor(itemProcessor(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +Kotlin에서는 extension function을 이용하여 정의한 Tasklet을 `ItemStreamReader`, `ItemProcessor`, `ItemStreamWriter`로 변환하여 사용할 수 있습니다. + +```kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamSimpleReaderProcessorWriter { + private var count = 0 + + override fun read(): Int? { + return if (count < totalCount) { + count++ + } else { + null + } + } + + override fun process(item: Int): String? { + return "'$item'" + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } +} +``` + +```kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + processor(sampleTasklet.asItemProcessor()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +``` + +## Processor 없이 Tasklet 작성하기 + +Process 과정이 불필요하고 `ItemStreamReader` 와 `ItemStreamWriter` 만 필요하다면 `ItemStreamSimpleReaderWriter`를 상속하여 단일 class에서 `ItemStreamReader`, `ItemStreamWriter`를 정의할 수 있습니다. + +### Java + +Java의 경우 `AdapterFactory`를 이용해서 정의한 Tasklet을 `ItemStreamReader`, `ItemStreamWriter`로 변환하여 사용할 수 있습니다. + +```java +@Component +@StepScope +class SampleTasklet implements ItemStreamSimpleReaderWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @Override + public Integer read() { + if (count < totalCount) { + return count++; + } else { + return null; + } + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(AdapterFactory.itemStreamReader(sampleTasklet)) + .writer(AdapterFactory.itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +이 경우 `AdapterFactory`의 method를 static import를 해서 사용하는게 미관상 보기 더 좋습니다. + +```java +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader; +import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter; + +... + +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +Kotlin 사용시에는 Spring Batch Plus가 제공하는 extension function 을 사용하여 정의한 Tasklet을 `ItemStreamReader`, `ItemStreamWriter`로 편리하게 변환할 수 있습니다. + +```Kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamSimpleReaderWriter { + private var count = 0 + + override fun read(): Int? { + return if (count < totalCount) { + count++ + } else { + null + } + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } +} +``` + +```Kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +``` + +## Callback 사용하기 + +`ItemStreamSimpleReaderProcessorWriter`, `ItemStreamSimpleReaderWriter` 에는 `ItemStreamReader`, `ItemStreamWriter`의 `ItemStream`에 대한 callback method도 같이 정의할 수 있습니다. Callback method는 선택적으로 정의할 수 있습니다. + +### Java + +```java +@Component +@StepScope +public class SampleTasklet implements ItemStreamSimpleReaderProcessorWriter { + + @Value("#{jobParameters['totalCount']}") + private long totalCount; + + private int count = 0; + + @Override + public void onOpenRead(@NonNull ExecutionContext executionContext) { + System.out.println("onOpenRead"); + } + + @Override + public Integer read() { + if (count < totalCount) { + return count++; + } else { + return null; + } + } + + @Override + public void onUpdateRead(@NonNull ExecutionContext executionContext) { + System.out.println("onUpdateRead"); + } + + @Override + public void onCloseRead() { + System.out.println("onCloseRead"); + } + + @Override + public String process(@NonNull Integer item) { + return "'" + item.toString() + "'"; + } + + @Override + public void onOpenWrite(@NonNull ExecutionContext executionContext) { + System.out.println("onOpenWrite"); + } + + @Override + public void write(@NonNull Chunk chunk) { + System.out.println(chunk.getItems()); + } + + @Override + public void onUpdateWrite(@NonNull ExecutionContext executionContext) { + System.out.println("onUpdateWrite"); + executionContext.putString("samplekey", "samplevlaue"); + } + + @Override + public void onCloseWrite() { + System.out.println("onCloseWrite"); + } +} +``` + +```java +@Configuration +public class TestJobConfig { + + @Bean + public Job testJob( + SampleTasklet sampleTasklet, + JobRepository jobRepository, + PlatformTransactionManager transactionManager + ) { + return new JobBuilder("testJob", jobRepository) + .start( + new StepBuilder("testStep", jobRepository) + .chunk(3, transactionManager) + .reader(itemStreamReader(sampleTasklet)) + .processor(itemProcessor(sampleTasklet)) + .writer(itemStreamWriter(sampleTasklet)) + .build() + ) + .build(); + } +} +``` + +### Kotlin + +```kotlin +@Component +@StepScope +open class SampleTasklet( + @Value("#{jobParameters['totalCount']}") private var totalCount: Long, +) : ItemStreamSimpleReaderProcessorWriter { + private var count = 0 + + override fun onOpenRead(executionContext: ExecutionContext) { + println("onOpenRead") + } + + override fun read(): Int? { + return if (count < totalCount) { + count++ + } else { + null + } + } + + override fun onUpdateRead(executionContext: ExecutionContext) { + println("onUpdateRead") + } + + override fun onCloseRead() { + println("onCloseRead") + } + + override fun process(item: Int): String? { + return "'$item'" + } + + override fun onOpenWrite(executionContext: ExecutionContext) { + println("onOpenWrite") + } + + override fun write(chunk: Chunk) { + println(chunk.items) + } + + override fun onUpdateWrite(executionContext: ExecutionContext) { + println("onUpdateWrite") + executionContext.putString("samplekey", "samplevalue") + } + + override fun onCloseWrite() { + println("onCloseWrite") + } +} +``` + +```kotlin +@Configuration +open class TestJobConfig( + private val batch: BatchDsl, + private val transactionManager: PlatformTransactionManager, +) { + + @Bean + open fun testJob( + sampleTasklet: SampleTasklet, + ): Job = batch { + job("testJob") { + step("testStep") { + chunk(3, transactionManager) { + reader(sampleTasklet.asItemStreamReader()) + processor(sampleTasklet.asItemProcessor()) + writer(sampleTasklet.asItemStreamWriter()) + } + } + } + } +} +```