Skip to content

Commit

Permalink
Fix mistake in unit test for ClientProcessor, add concurrent unit tes…
Browse files Browse the repository at this point in the history
…t for ClientProcessor
  • Loading branch information
aplotnikov committed Oct 28, 2018
1 parent 2c31b35 commit 52d9cb8
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private boolean isCurrentResponseSuccessful() {

do {
isSuccessful = isLastResponseSuccessful.get();
} while (isLastResponseSuccessful.compareAndExchange(isSuccessful, !isSuccessful));
} while (!isLastResponseSuccessful.compareAndSet(isSuccessful, !isSuccessful));

return isSuccessful;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package io.github.aplotnikov.batch.processing.reactor

import static io.github.aplotnikov.batch.processing.reactor.Response.Status.FAILED
import static io.github.aplotnikov.batch.processing.reactor.Response.Status.SUCCESS
import static java.util.concurrent.TimeUnit.SECONDS

import spock.lang.Specification
import spock.lang.Subject

import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

class ClientProcessorSpec extends Specification {

@Subject
Expand All @@ -29,7 +36,7 @@ class ClientProcessorSpec extends Specification {
then:
with(response) {
clientId == secondClient.id
status == FAILED
status == SUCCESS
}
when:
response = processor.process(thirdClient)
Expand All @@ -39,4 +46,41 @@ class ClientProcessorSpec extends Specification {
status == FAILED
}
}

void 'should work correctly under concurrent usage'() {
given:
int threadNumber = 10
and:
ExecutorService threadPool = Executors.newFixedThreadPool(threadNumber)
and:
CountDownLatch afterInitBlocker = new CountDownLatch(threadNumber)
CountDownLatch allDone = new CountDownLatch(threadNumber)
and:
AtomicInteger clientNumber = new AtomicInteger(1)
and:
List<Response> actualResponses = [].asSynchronized()
and:
Runnable action = {
int id = clientNumber.getAndIncrement()

afterInitBlocker.countDown()
afterInitBlocker.await()

actualResponses << processor.process(new Client(id))

allDone.countDown()
}
when:
try {
(1..threadNumber).each {
threadPool.submit(action)
}
allDone.await(10, SECONDS)
} finally {
threadPool.shutdown()
}
then:
actualResponses.size() == threadNumber
actualResponses.findAll { it.status == SUCCESS }.size() == threadNumber / 2
}
}

0 comments on commit 52d9cb8

Please sign in to comment.