Skip to content

Commit

Permalink
Refactor code to use one source instead of repository and queue
Browse files Browse the repository at this point in the history
  • Loading branch information
aplotnikov committed Oct 28, 2018
1 parent 54a8b57 commit 8451d55
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.github.aplotnikov.batch.processing.reactor;

import io.github.aplotnikov.batch.processing.reactor.source.XmlFileSource;
import io.github.aplotnikov.batch.processing.reactor.source.XmlFilesSource;
import lombok.RequiredArgsConstructor;
import lombok.experimental.FieldDefaults;

Expand All @@ -10,7 +10,7 @@
@RequiredArgsConstructor
class ReactorFileProcessor implements Runnable {

XmlFileSource fileSource;
XmlFilesSource fileSource;

XmlFileReader reader;

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@

@FieldDefaults(level = PRIVATE, makeFinal = true)
@RequiredArgsConstructor
class Queue {
class Source {

int processedFileNumber;

int pause;

Flux<String> poll() {
Flux<String> readAll() {
AtomicInteger processedFiles = new AtomicInteger(0);
return Flux.generate(
sink -> {
parkNanos(SECONDS.toNanos(pause));
if (processedFiles.getAndIncrement() < processedFileNumber) {
sink.next("client.xml");
} else {
sink.complete();
}
parkNanos(SECONDS.toNanos(pause));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

@FieldDefaults(level = PRIVATE, makeFinal = true)
@RequiredArgsConstructor
public class XmlFileSource {
public class XmlFilesSource {

Repository repository;
Source repository;

Queue queue;
Source queue;

public Flux<String> readAll() {
return Flux.merge(repository.readAll(), queue.poll());
return Flux.merge(repository.readAll(), queue.readAll());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import reactor.test.StepVerifier
import spock.lang.Specification
import spock.lang.Subject

class QueueSpec extends Specification {
class SourceSpec extends Specification {

@Subject
Queue queue = new Queue(2, 0)
Source source = new Source(2, 0)

void 'should generate 2 files into Flux and Flux should be completed'() {
expect:
StepVerifier.create(queue.poll())
StepVerifier.create(source.readAll())
.expectNext('client.xml')
.expectNext('client.xml')
.expectComplete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import reactor.test.StepVerifier
import spock.lang.Specification
import spock.lang.Subject

class XmlFileSourceSpec extends Specification {
class XmlFilesSourceSpec extends Specification {

Repository repository = Mock()
Source repository = Mock()

Queue queue = Mock()
Source queue = Mock()

@Subject
XmlFileSource source = new XmlFileSource(repository, queue)
XmlFilesSource source = new XmlFilesSource(repository, queue)

void 'should merge results from DB and from queue'() {
given:
Expand All @@ -28,7 +28,7 @@ class XmlFileSourceSpec extends Specification {
and:
1 * repository.readAll() >> Flux.just(fileFromDB)
and:
1 * queue.poll() >> Flux.just(fileFromQueue)
1 * queue.readAll() >> Flux.just(fileFromQueue)
and:
0 * _
}
Expand Down

0 comments on commit 8451d55

Please sign in to comment.