Skip to content

Commit

Permalink
Add emulation of client processing
Browse files Browse the repository at this point in the history
  • Loading branch information
aplotnikov committed Oct 28, 2018
1 parent 7937f07 commit 0207099
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

import java.util.Objects;

public class Client {
class Client {

private final long id;

public Client(long id) {
Client(long id) {
this.id = id;
}

public long getId() {
long getId() {
return id;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.github.aplotnikov.batch.processing.reactor;

import java.util.concurrent.atomic.AtomicBoolean;

import static io.github.aplotnikov.batch.processing.reactor.Response.Status.FAILED;
import static io.github.aplotnikov.batch.processing.reactor.Response.Status.SUCCESS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.concurrent.locks.LockSupport.parkNanos;

class ClientProcessor {

private final AtomicBoolean isLastResponseSuccessful = new AtomicBoolean();

private final int pause;

ClientProcessor(int pause) {
this.pause = pause;
}

Response process(Client client) {
parkNanos(SECONDS.toNanos(pause));
return new Response(
client.getId(),
isCurrentResponseSuccessful() ? SUCCESS : FAILED
);
}

private boolean isCurrentResponseSuccessful() {
boolean isSuccessful;

do {
isSuccessful = isLastResponseSuccessful.get();
} while (isLastResponseSuccessful.compareAndExchange(isSuccessful, !isSuccessful));

return isSuccessful;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@ class ReactorFileProcessor implements Runnable {

private final XmlFileReader reader;

ReactorFileProcessor(Repository repository, Queue queue, XmlFileReader reader) {
private final ClientProcessor processor;

ReactorFileProcessor(Repository repository, Queue queue, XmlFileReader reader, ClientProcessor processor) {
this.repository = repository;
this.queue = queue;
this.reader = reader;
this.processor = processor;
}

@Override
public void run() {
Flux.merge(repository.readAll(), queue.poll())
.flatMap(reader::read);
// generate responses with pause
.flatMap(reader::read)
.map(processor::process);
// collect responses
// generate result file
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.github.aplotnikov.batch.processing.reactor;

import java.util.Objects;

class Response {

private final long clientId;

private final Status status;

Response(long clientId, Status status) {
this.clientId = clientId;
this.status = status;
}

long getClientId() {
return clientId;
}

Status getStatus() {
return status;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

Response that = (Response) other;
return clientId == that.clientId &&
status == that.status;
}

@Override
public int hashCode() {
return Objects.hash(clientId, status);
}

@Override
public String toString() {
return "ClientResponse{" +
"clientId=" + clientId +
", status=" + status +
'}';
}

enum Status {
SUCCESS,
FAILED
}
}

0 comments on commit 0207099

Please sign in to comment.