Skip to content

Commit

Permalink
Add first possible implementation of parsing of XML file
Browse files Browse the repository at this point in the history
  • Loading branch information
aplotnikov committed Oct 28, 2018
1 parent 0866e98 commit 9d7008a
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 4 deletions.
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ jar {
}

dependencies {
compile 'io.projectreactor:reactor-core:3.2.1.RELEASE'
compile(
'io.projectreactor:reactor-core:3.2.1.RELEASE',
'io.vavr:vavr:0.9.2'
)

testCompile(
'org.codehaus.groovy:groovy-all:2.5.3',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.github.aplotnikov.batch.processing.reactor;

import java.util.Objects;

public class Client {

private final long id;

public Client(long id) {
this.id = id;
}

public long getId() {
return id;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

Client client = (Client) other;
return id == client.id;
}

@Override
public int hashCode() {
return Objects.hash(id);
}

@Override
public String toString() {
return "Client{id=" + id + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@ class ReactorFileProcessor implements Runnable {

private final Queue queue;

ReactorFileProcessor(Repository repository, Queue queue) {
private final XmlFileReader reader;

ReactorFileProcessor(Repository repository, Queue queue, XmlFileReader reader) {
this.repository = repository;
this.queue = queue;
this.reader = reader;
}

@Override
public void run() {
Flux.merge(repository.readAll(), queue.poll());
// process XML file
Flux.merge(repository.readAll(), queue.poll())
.map(reader::read);
// generate responses with pause
// collect responses
// generate result file
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.github.aplotnikov.batch.processing.reactor;

import io.vavr.control.Try;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;
import java.io.BufferedInputStream;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;

class XmlFileReader {

Flux<Client> read(String path) {
return Flux.generate(
reader(path),
this::findClients,
close()
);
}

private Callable<XMLStreamReader> reader(String path) {
return () -> {
BufferedInputStream inputStream = new BufferedInputStream(getClass().getResourceAsStream(path));
return XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
};
}

private XMLStreamReader findClients(XMLStreamReader reader, SynchronousSink<Client> sink) {
Try.run(() -> {
// Need to think about refactoring it to Stream
while (reader.hasNext()) {
if (reader.next() == START_ELEMENT && reader.getLocalName().equals("client")) {
sink.next(parseClient(reader));
}
}
})
.onFailure(sink::error)
.onSuccess(aVoid -> sink.complete());
return reader;
}

private Client parseClient(XMLStreamReader reader) {
return new Client(
Long.parseLong(reader.getAttributeValue("", "id"))
);
}

private Consumer<XMLStreamReader> close() {
return reader -> Try.run(reader::close).get();
}
}

0 comments on commit 9d7008a

Please sign in to comment.