diff --git a/src/main/java/io/github/aplotnikov/batch/processing/reactor/ReactorFileProcessor.java b/src/main/java/io/github/aplotnikov/batch/processing/reactor/ReactorFileProcessor.java index 6f58355..e273e3a 100644 --- a/src/main/java/io/github/aplotnikov/batch/processing/reactor/ReactorFileProcessor.java +++ b/src/main/java/io/github/aplotnikov/batch/processing/reactor/ReactorFileProcessor.java @@ -6,6 +6,7 @@ import lombok.RequiredArgsConstructor; import lombok.experimental.FieldDefaults; import net.jcip.annotations.ThreadSafe; +import reactor.core.scheduler.Schedulers; import static lombok.AccessLevel.PRIVATE; @@ -25,9 +26,14 @@ class ReactorFileProcessor implements Runnable { @Override public void run() { fileSource.readAll() + .log() .flatMap(reader::read) + .log() .map(processor::process) + .log() .groupBy(AbstractEvent::getFileName) + .log() + .subscribeOn(Schedulers.parallel()) .subscribe(writer::write); } } diff --git a/src/test/groovy/io/github/aplotnikov/batch/processing/reactor/ReactorFileProcessorSpec.groovy b/src/test/groovy/io/github/aplotnikov/batch/processing/reactor/ReactorFileProcessorSpec.groovy index c31b3cc..d4d5789 100644 --- a/src/test/groovy/io/github/aplotnikov/batch/processing/reactor/ReactorFileProcessorSpec.groovy +++ b/src/test/groovy/io/github/aplotnikov/batch/processing/reactor/ReactorFileProcessorSpec.groovy @@ -1,8 +1,5 @@ package io.github.aplotnikov.batch.processing.reactor -import static java.util.concurrent.TimeUnit.SECONDS -import static java.util.concurrent.locks.LockSupport.parkNanos - import io.github.aplotnikov.batch.processing.reactor.events.FileReceived import io.github.aplotnikov.batch.processing.reactor.readers.XmlFileReaderDecorator import io.github.aplotnikov.batch.processing.reactor.source.EventSource @@ -12,8 +9,11 @@ import org.junit.rules.TemporaryFolder import reactor.core.publisher.Flux import spock.lang.Specification import spock.lang.Subject +import spock.util.concurrent.PollingConditions import java.nio.file.Path +import java.time.Duration +import java.time.Instant class ReactorFileProcessorSpec extends Specification { @@ -37,42 +37,46 @@ class ReactorFileProcessorSpec extends Specification { processor = new ReactorFileProcessor( new EventSource(db, queue), new XmlFileReaderDecorator(rootSourceFolder()), - new EventProcessor(new ClientProcessor(0)), + new EventProcessor(new ClientProcessor(1)), new EventWriter(folder.root.toPath()) ) } void 'should process files'() { given: + PollingConditions conditions = new PollingConditions(timeout: 12) + and: db.readAll() >> Flux.just(new FileReceived(FILE_FROM_DB)) and: queue.readAll() >> Flux.just(new FileReceived(FILE_FROM_QUEUE)) when: processor.run() then: - parkNanos(SECONDS.toNanos(1)) - and: - List generatedFiles = [] - folder.root.eachFileMatch(~/.*response_.*.xml/) { generatedFiles << it } + Duration.between(old(Instant.now()), Instant.now()).toSeconds() < 1 and: - generatedFiles.size() == 2 - and: - File dbFileResponse = generatedFiles.find { it.absolutePath.contains(FILE_FROM_DB) } - with(dbFileResponse.text) { - contains '1' - contains '2' - contains '3' - contains '4' - contains '5' - } - and: - File queueFileResponse = generatedFiles.find { it.absolutePath.contains(FILE_FROM_QUEUE) } - with(queueFileResponse.text) { - contains '11' - contains '12' - contains '13' - contains '14' - contains '15' + conditions.eventually { + List generatedFiles = [] + folder.root.eachFileMatch(~/.*response_.*.xml/) { generatedFiles << it } + + generatedFiles.size() == 2 + + File dbFileResponse = generatedFiles.find { it.absolutePath.contains(FILE_FROM_DB) } + with(dbFileResponse.text) { + contains '1' + contains '2' + contains '3' + contains '4' + contains '5' + } + + File queueFileResponse = generatedFiles.find { it.absolutePath.contains(FILE_FROM_QUEUE) } + with(queueFileResponse.text) { + contains '11' + contains '12' + contains '13' + contains '14' + contains '15' + } } }