diff --git a/src/test/java/sample/endless/repositories/RecordRepositoryTest.java b/src/test/java/sample/endless/repositories/RecordRepositoryTest.java index 5c9751a..d4b2d34 100644 --- a/src/test/java/sample/endless/repositories/RecordRepositoryTest.java +++ b/src/test/java/sample/endless/repositories/RecordRepositoryTest.java @@ -6,14 +6,6 @@ import java.util.List; import java.util.Set; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -36,10 +28,33 @@ public class RecordRepositoryTest { RecordRepository recordRepository; @Test - public void readAll_the_object_is_already_closed() throws ExecutionException, InterruptedException { + public void readAll_the_object_is_already_closed() { // setup System.out.println("Prepare all data."); + prepareAllData(); + // when + System.out.println("Do the test."); + final AtomicLong counter = new AtomicLong(); + + // -- SHOULD NOT FAIL + try (final Stream stream = recordRepository.readAllByIdNotNull()) { + stream.forEach(record -> { + if (record.getId() % 250 == 0) { + System.out.println("Handling Record No: " + record.getId()); + } + final long sum = record.getWrappedValues().stream() + .mapToLong(wrapped -> Long.valueOf(new String(wrapped.getValue().getValue()))) + .sum(); + counter.addAndGet(sum); + }); + } + + // then + Assert.assertEquals(15L * NUMBER_OF_RECORDS, counter); + } + + private void prepareAllData() { final List records = IntStream.rangeClosed(1, NUMBER_OF_RECORDS).mapToObj(ignored -> { final Record record = new Record(); final Set wrappedValues = IntStream.rangeClosed(1, 5).mapToObj(it -> { @@ -54,68 +69,5 @@ public void readAll_the_object_is_already_closed() throws ExecutionException, In return record; }).collect(Collectors.toList()); recordRepository.save(records); - - // when - System.out.println("Do the test."); - - final AtomicBoolean breaker = new AtomicBoolean(false); - final BlockingDeque blockingDeque = new LinkedBlockingDeque<>(100); - final ExecutorService creator = Executors.newSingleThreadExecutor(); - final Future creatorFuture = creator.submit(() -> { - try (final Stream stream = recordRepository.readAllByIdNotNull()) { - stream.forEach(record -> { - try { - if (record.getId() % 250 == 0) { - System.out.println("Handling Record No: " + record.getId()); - } - final long sum = record.getWrappedValues().stream() - .mapToLong(wrapped -> Long.valueOf(new String(wrapped.getValue().getValue()))) - .sum(); - blockingDeque.putLast(sum); - } catch (final InterruptedException e) { - throw new RuntimeException("Creator: Test thread interrupted."); - } catch (final Exception e) { - throw new RuntimeException("Creator: Unexpected error: " + e.getMessage(), e); - } - }); - // all records have been processed, set the breaking value which will finish the other executor - breaker.set(true); - } - }); - - final AtomicLong counter = new AtomicLong(); - final ExecutorService calculator = Executors.newSingleThreadExecutor(); - final Future calculatorFuture = calculator.submit(() -> { - try { - while (true) { - final Long sum = blockingDeque.poll(10L, TimeUnit.MILLISECONDS); - if (sum == null) { - if (breaker.get() == true) { - break; // there will be no more sums to add to the counter - } - continue; // wait for next result to add - } - - // simulate long running calculation - Thread.sleep(25L); - counter.addAndGet(sum); - } - } catch (final InterruptedException e) { - throw new RuntimeException("Calculator: Test thread interrupted."); - } catch (final Exception e) { - throw new RuntimeException("Calculator: Unexpected error: " + e.getMessage(), e); - } - }); - - // wait for the threads to finish - creatorFuture.get(); - calculatorFuture.get(); - - // then - Assert.assertEquals(15L * NUMBER_OF_RECORDS, counter.get()); - - // cleanup - creator.shutdown(); - calculator.shutdown(); } }