diff --git a/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/internal/PersistenceManager.java b/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/internal/PersistenceManager.java index 15a2597695f..b614b74bea8 100644 --- a/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/internal/PersistenceManager.java +++ b/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/internal/PersistenceManager.java @@ -328,6 +328,7 @@ public void timeSeriesUpdated(Item item, TimeSeries timeSeries) { FilterCriteria removeFilter = new FilterCriteria().setItemName(item.getName()) .setBeginDate(begin).setEndDate(end); service.remove(removeFilter); + container.removeScheduledTimeSeries(item.getName(), timeSeries); } // update states timeSeries.getStates().forEach( @@ -489,6 +490,18 @@ public void scheduleNewTimeSeries(String itemName, TimeSeries timeSeries) { logger.debug("TimeSeries: Total scheduled forecasted values for {} is {}", itemName, jobs.size()); } + public void removeScheduledTimeSeries(String itemName, TimeSeries timeSeries) { + List> jobs = forecastJobs.computeIfAbsent(itemName, + i -> new CopyOnWriteArrayList<>()); + ZonedDateTime begin = timeSeries.getBegin().atZone(ZoneId.systemDefault()).minusSeconds(1); + ZonedDateTime end = timeSeries.getEnd().atZone(ZoneId.systemDefault()).plusSeconds(1); + jobs.forEach(job -> { + if (job.getScheduledTime().isAfter(begin) && job.getScheduledTime().isBefore(end)) { + job.cancel(true); + } + }); + } + public void addItem(Item item) { if (persistenceService instanceof QueryablePersistenceService) { if (UnDefType.NULL.equals(item.getState()) diff --git a/bundles/org.openhab.core.persistence/src/test/java/org/openhab/core/persistence/internal/PersistenceManagerTest.java b/bundles/org.openhab.core.persistence/src/test/java/org/openhab/core/persistence/internal/PersistenceManagerTest.java index cbe371eb8d7..5d6a1c23bc5 100644 --- a/bundles/org.openhab.core.persistence/src/test/java/org/openhab/core/persistence/internal/PersistenceManagerTest.java +++ b/bundles/org.openhab.core.persistence/src/test/java/org/openhab/core/persistence/internal/PersistenceManagerTest.java @@ -15,8 +15,11 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -27,6 +30,7 @@ import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; +import java.util.ArrayList; import java.util.List; import org.eclipse.jdt.annotation.NonNullByDefault; @@ -317,57 +321,70 @@ public void noRestoreOnStartupWhenItemNotNull() { @Test public void storeTimeSeriesAndForecastsScheduled() { + List> futures = new ArrayList<>(); + + when(schedulerMock.at(any(SchedulerRunnable.class), any(Instant.class))).thenAnswer(i -> { + ScheduledCompletableFuture future = mock(ScheduledCompletableFuture.class); + when(future.getScheduledTime()).thenReturn(((Instant) i.getArgument(1)).atZone(ZoneId.systemDefault())); + futures.add(future); + return future; + }); + addConfiguration(TEST_MODIFIABLE_PERSISTENCE_SERVICE_ID, new PersistenceAllConfig(), PersistenceStrategy.Globals.FORECAST, null); Instant time1 = Instant.now().minusSeconds(1000); Instant time2 = Instant.now().plusSeconds(1000); Instant time3 = Instant.now().plusSeconds(2000); + Instant time4 = Instant.now().plusSeconds(3000); + + // add elements TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.ADD); timeSeries.add(time1, new StringType("one")); timeSeries.add(time2, new StringType("two")); - timeSeries.add(time2, new StringType("three")); + timeSeries.add(time3, new StringType("three")); + timeSeries.add(time4, new StringType("four")); manager.timeSeriesUpdated(TEST_ITEM, timeSeries); - verify(modifiablePersistenceServiceMock, times(3)).store(any(Item.class), any(ZonedDateTime.class), + InOrder inOrder = inOrder(modifiablePersistenceServiceMock, schedulerMock); + + inOrder.verify(modifiablePersistenceServiceMock, times(4)).store(any(Item.class), any(ZonedDateTime.class), any(State.class)); // first element not scheduled, because it is in the past - verify(schedulerMock).at(any(SchedulerRunnable.class), ArgumentMatchers.eq(time2)); - verify(schedulerMock).at(any(SchedulerRunnable.class), ArgumentMatchers.eq(time3)); - - verifyNoMoreInteractions(safeCallerBuilderMock, modifiablePersistenceServiceMock); - } + inOrder.verify(schedulerMock).at(any(SchedulerRunnable.class), ArgumentMatchers.eq(time2)); + inOrder.verify(schedulerMock).at(any(SchedulerRunnable.class), ArgumentMatchers.eq(time3)); + inOrder.verify(schedulerMock).at(any(SchedulerRunnable.class), ArgumentMatchers.eq(time4)); - @Test - public void storeTimeSeriesAndReplaceForecasts() { - addConfiguration(TEST_MODIFIABLE_PERSISTENCE_SERVICE_ID, new PersistenceAllConfig(), - PersistenceStrategy.Globals.FORECAST, null); - - Instant time1 = Instant.now().minusSeconds(1000); - Instant time2 = Instant.now().plusSeconds(1000); - Instant time3 = Instant.now().plusSeconds(2000); + inOrder.verifyNoMoreInteractions(); - TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.REPLACE); - timeSeries.add(time1, new StringType("one")); - timeSeries.add(time2, new StringType("two")); - timeSeries.add(time2, new StringType("three")); + // replace elements + TimeSeries timeSeries2 = new TimeSeries(TimeSeries.Policy.REPLACE); + timeSeries2.add(time3, new StringType("three2")); + timeSeries2.add(time4, new StringType("four2")); - manager.timeSeriesUpdated(TEST_ITEM, timeSeries); + manager.timeSeriesUpdated(TEST_ITEM, timeSeries2); + // verify removal of old elements from service ArgumentCaptor filterCaptor = ArgumentCaptor.forClass(FilterCriteria.class); - - InOrder inOrder = inOrder(modifiablePersistenceServiceMock); - inOrder.verify(modifiablePersistenceServiceMock).remove(filterCaptor.capture()); - inOrder.verify(modifiablePersistenceServiceMock, times(3)).store(any(Item.class), any(ZonedDateTime.class), - any(State.class)); - FilterCriteria filterCriteria = filterCaptor.getValue(); assertThat(filterCriteria.getItemName(), is(TEST_ITEM_NAME)); - assertThat(filterCriteria.getBeginDate(), is(time1.atZone(ZoneId.systemDefault()))); - assertThat(filterCriteria.getBeginDate(), is(time1.atZone(ZoneId.systemDefault()))); + assertThat(filterCriteria.getBeginDate(), is(time3.atZone(ZoneId.systemDefault()))); + assertThat(filterCriteria.getEndDate(), is(time4.atZone(ZoneId.systemDefault()))); + + // verify second and third restore-future are cancelled + verify(futures.get(0), never()).cancel(anyBoolean()); + verify(futures.get(1)).cancel(true); + verify(futures.get(2)).cancel(true); + + // verify new values are stored + inOrder.verify(modifiablePersistenceServiceMock, times(2)).store(any(Item.class), any(ZonedDateTime.class), + any(State.class)); + // verify new restore futures are scheduled + inOrder.verify(schedulerMock).at(any(SchedulerRunnable.class), ArgumentMatchers.eq(time3)); + inOrder.verify(schedulerMock).at(any(SchedulerRunnable.class), ArgumentMatchers.eq(time4)); inOrder.verifyNoMoreInteractions(); } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/types/TimeSeries.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/types/TimeSeries.java index b1a053cdbb0..7dfdc346a4d 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/types/TimeSeries.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/types/TimeSeries.java @@ -14,7 +14,10 @@ import java.time.Instant; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; +import java.util.Set; +import java.util.TreeSet; import java.util.stream.Stream; import org.eclipse.jdt.annotation.NonNullByDefault; @@ -27,41 +30,70 @@ */ @NonNullByDefault public class TimeSeries { - private final List states = new ArrayList<>(); + private final TreeSet states = new TreeSet<>(Comparator.comparing(e -> e.timestamp)); private final Policy policy; - private Instant begin = Instant.MAX; - private Instant end = Instant.MIN; public TimeSeries(Policy policy) { this.policy = policy; } + /** + * Get the persistence policy of this series. + *

+ * {@link Policy#ADD} add the content to the persistence, {@link Policy#REPLACE} first removes all persisted elements in the timespan given by {@link #getBegin()} and {@link #getEnd()}. + * + * @return + */ public Policy getPolicy() { return policy; } + /** + * Get the timestamp of the first element in this series. + * + * @return the {@link Instant} of the first element + */ public Instant getBegin() { - return begin; + return states.isEmpty() ? Instant.MAX : states.first().timestamp(); } + /** + * Get the timestamp of the last element in this series. + * + * @return the {@link Instant} of the last element + */ public Instant getEnd() { - return end; + return states.isEmpty() ? Instant.MIN : states.last().timestamp(); } + /** + * Get the number of elements in this series. + * + * @return the number of elements + */ public int size() { return states.size(); } + /** + * Add a new element to this series. + *

+ * Elements can be added in an arbitrary order and are sorted chronologically. + * + * @param timestamp an {@link Instant} for the given state + * @param state the {@link State} at the given timestamp + */ public void add(Instant timestamp, State state) { states.add(new Entry(timestamp, state)); - if (begin == null || timestamp.isBefore(begin)) { - begin = timestamp; - } - if (end == null || timestamp.isAfter(end)) { - end = timestamp; - } } + /** + * Get the content of this series. + *

+ * The entries are returned in chronological order, earlier entries before later entries. + * + * @return a {@link } with the content of this series. + */ public Stream getStates() { return List.copyOf(states).stream(); }