Skip to content

Commit

Permalink
Implement ItemStreamFluxReaderAdapter
Browse files Browse the repository at this point in the history
Signed-off-by: Taeik Lim <[email protected]>
  • Loading branch information
acktsap committed Feb 19, 2024
1 parent a363c1e commit c9cce27
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Spring Batch Plus
*
* Copyright 2022-present NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.spring.batch.plus.item.adapter;

import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.lang.NonNull;

import reactor.core.publisher.Flux;

/**
* An adaptor which adapt {@link ItemStreamFluxReaderDelegate} to {@link ItemStreamReader}.
*
* @since 1.1.0
*/
public class ItemStreamFluxReaderAdapter<T> implements ItemStreamReader<T> {

/**
* Create an adaptor which adapt {@link ItemStreamFluxReaderDelegate} to {@link ItemStreamReader}.
*
* @param delegate a delegate
* @return an adapted ItemStreamReader
* @param <T> a read item type
*/
public static <T> ItemStreamReader<T> of(@NonNull ItemStreamFluxReaderDelegate<T> delegate) {
return new ItemStreamFluxReaderAdapter<>(delegate);
}

protected static final int DEFAULT_BATCH_SIZE = 1;

protected final ItemStreamFluxReaderDelegate<T> delegate;

protected Flux<? extends T> flux = null;

protected Iterator<? extends T> iterator = null;

protected ItemStreamFluxReaderAdapter(ItemStreamFluxReaderDelegate<T> delegate) {
this.delegate = Objects.requireNonNull(delegate, "Delegate reader must not be null");
}

@SuppressWarnings("NullableProblems")
@Override
public void open(ExecutionContext executionContext) {
this.delegate.onOpenRead(executionContext);
this.flux = this.delegate.readFlux(executionContext);
}

@Override
public T read() {
Iterator<? extends T> iterator = getIterator();
if (iterator.hasNext()) {
return iterator.next();
} else {
return null;
}
}

@SuppressWarnings("NullableProblems")
@Override
public void update(ExecutionContext executionContext) {
this.delegate.onUpdateRead(executionContext);
}

@Override
public void close() {
this.delegate.onCloseRead();
}

protected Iterator<? extends T> getIterator() {
if (this.iterator == null) {
this.iterator = Optional.ofNullable(this.flux)
.map(f -> f.toIterable(DEFAULT_BATCH_SIZE))
.map(Iterable::iterator)
.orElseThrow(() -> new IllegalStateException("No flux is set. Call 'open' first."));
}
return this.iterator;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Spring Batch Plus
*
* Copyright 2022-present NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.spring.batch.plus.item.adapter;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;

import org.junit.jupiter.api.Test;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamReader;

import reactor.core.publisher.Flux;

@SuppressWarnings("unchecked")
class ItemStreamFluxReaderAdapterTest {


@Test
void testOpen() {
// given
ItemStreamFluxReaderDelegate<Integer> delegate = mock(ItemStreamFluxReaderDelegate.class);
ItemStreamReader<Integer> itemStreamReader = ItemStreamFluxReaderAdapter.of(delegate);

// when
itemStreamReader.open(new ExecutionContext());

// then
verify(delegate, times(1)).onOpenRead(any());
verify(delegate, times(1)).readFlux(any());
}

@Test
void testRead() throws Exception {
// given
List<Integer> expected = List.of(1, 2, 3);
ItemStreamFluxReaderDelegate<Integer> delegate = mock(ItemStreamFluxReaderDelegate.class);
when(delegate.readFlux(any())).thenAnswer($ -> Flux.fromIterable(expected));
ItemStreamReader<Integer> itemStreamReader = ItemStreamFluxReaderAdapter.of(delegate);

// when
itemStreamReader.open(new ExecutionContext());
List<Integer> items = new ArrayList<>();
Integer item;
while ((item = itemStreamReader.read()) != null) {
items.add(item);
}

// then
assertThat(items).isEqualTo(expected);
}

@Test
void testReadWithOpenShouldThrowsException() {
// given
ItemStreamFluxReaderDelegate<Integer> delegate = mock(ItemStreamFluxReaderDelegate.class);
ItemStreamReader<Integer> itemStreamReader = ItemStreamFluxReaderAdapter.of(delegate);

// when, then
assertThatThrownBy(itemStreamReader::read).isInstanceOf(IllegalStateException.class);
}

@Test
void testUpdate() {
// given
ItemStreamFluxReaderDelegate<Integer> delegate = mock(ItemStreamFluxReaderDelegate.class);
ItemStreamReader<Integer> itemStreamReader = ItemStreamFluxReaderAdapter.of(delegate);

// when
itemStreamReader.update(new ExecutionContext());

// then
verify(delegate, times(1)).onUpdateRead(any());
}

@Test
void testClose() {
// given
ItemStreamFluxReaderDelegate<Integer> delegate = mock(ItemStreamFluxReaderDelegate.class);
ItemStreamReader<Integer> itemStreamReader = ItemStreamFluxReaderAdapter.of(delegate);

// when
itemStreamReader.close();

// then
verify(delegate, times(1)).onCloseRead();
}

@SuppressWarnings({"ResultOfMethodCallIgnored", "ConstantConditions"})
@Test
void testPassingNull() {
// when, then
assertThatThrownBy(() -> ItemStreamFluxReaderAdapter.of(null));
}
}

0 comments on commit c9cce27

Please sign in to comment.