Skip to content

Commit

Permalink
Make async processing of the file and update test
Browse files Browse the repository at this point in the history
  • Loading branch information
aplotnikov committed Nov 5, 2018
1 parent 1b69f6c commit 89f3d81
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import lombok.RequiredArgsConstructor;
import lombok.experimental.FieldDefaults;
import net.jcip.annotations.ThreadSafe;
import reactor.core.scheduler.Schedulers;

import static lombok.AccessLevel.PRIVATE;

Expand All @@ -25,9 +26,14 @@ class ReactorFileProcessor implements Runnable {
@Override
public void run() {
fileSource.readAll()
.log()
.flatMap(reader::read)
.log()
.map(processor::process)
.log()
.groupBy(AbstractEvent::getFileName)
.log()
.subscribeOn(Schedulers.parallel())
.subscribe(writer::write);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package io.github.aplotnikov.batch.processing.reactor

import static java.util.concurrent.TimeUnit.SECONDS
import static java.util.concurrent.locks.LockSupport.parkNanos

import io.github.aplotnikov.batch.processing.reactor.events.FileReceived
import io.github.aplotnikov.batch.processing.reactor.readers.XmlFileReaderDecorator
import io.github.aplotnikov.batch.processing.reactor.source.EventSource
Expand All @@ -12,8 +9,11 @@ import org.junit.rules.TemporaryFolder
import reactor.core.publisher.Flux
import spock.lang.Specification
import spock.lang.Subject
import spock.util.concurrent.PollingConditions

import java.nio.file.Path
import java.time.Duration
import java.time.Instant

class ReactorFileProcessorSpec extends Specification {

Expand All @@ -37,42 +37,46 @@ class ReactorFileProcessorSpec extends Specification {
processor = new ReactorFileProcessor(
new EventSource(db, queue),
new XmlFileReaderDecorator(rootSourceFolder()),
new EventProcessor(new ClientProcessor(0)),
new EventProcessor(new ClientProcessor(1)),
new EventWriter(folder.root.toPath())
)
}

void 'should process files'() {
given:
PollingConditions conditions = new PollingConditions(timeout: 12)
and:
db.readAll() >> Flux.just(new FileReceived(FILE_FROM_DB))
and:
queue.readAll() >> Flux.just(new FileReceived(FILE_FROM_QUEUE))
when:
processor.run()
then:
parkNanos(SECONDS.toNanos(1))
and:
List<File> generatedFiles = []
folder.root.eachFileMatch(~/.*response_.*.xml/) { generatedFiles << it }
Duration.between(old(Instant.now()), Instant.now()).toSeconds() < 1
and:
generatedFiles.size() == 2
and:
File dbFileResponse = generatedFiles.find { it.absolutePath.contains(FILE_FROM_DB) }
with(dbFileResponse.text) {
contains '<clientId>1</clientId>'
contains '<clientId>2</clientId>'
contains '<clientId>3</clientId>'
contains '<clientId>4</clientId>'
contains '<clientId>5</clientId>'
}
and:
File queueFileResponse = generatedFiles.find { it.absolutePath.contains(FILE_FROM_QUEUE) }
with(queueFileResponse.text) {
contains '<clientId>11</clientId>'
contains '<clientId>12</clientId>'
contains '<clientId>13</clientId>'
contains '<clientId>14</clientId>'
contains '<clientId>15</clientId>'
conditions.eventually {
List<File> generatedFiles = []
folder.root.eachFileMatch(~/.*response_.*.xml/) { generatedFiles << it }

generatedFiles.size() == 2

File dbFileResponse = generatedFiles.find { it.absolutePath.contains(FILE_FROM_DB) }
with(dbFileResponse.text) {
contains '<clientId>1</clientId>'
contains '<clientId>2</clientId>'
contains '<clientId>3</clientId>'
contains '<clientId>4</clientId>'
contains '<clientId>5</clientId>'
}

File queueFileResponse = generatedFiles.find { it.absolutePath.contains(FILE_FROM_QUEUE) }
with(queueFileResponse.text) {
contains '<clientId>11</clientId>'
contains '<clientId>12</clientId>'
contains '<clientId>13</clientId>'
contains '<clientId>14</clientId>'
contains '<clientId>15</clientId>'
}
}
}

Expand Down

0 comments on commit 89f3d81

Please sign in to comment.