diff --git a/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/internal/PersistenceManager.java b/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/internal/PersistenceManager.java index 63676bcf8a6..f0b1a165d09 100644 --- a/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/internal/PersistenceManager.java +++ b/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/internal/PersistenceManager.java @@ -261,56 +261,9 @@ private Iterable getAllItems(PersistenceItemConfiguration config) { return items; } - /** - * Handles the "restoreOnStartup" strategy for the item. - * If the item state is still undefined when entering this method, all persistence configurations are checked, - * if they have the "restoreOnStartup" strategy configured for the item. If so, the item state will be set - * to its last persisted value. - * - * @param item the item to restore the state for - */ - private void restoreItemStateIfNeeded(Item item) { - // get the last persisted state from the persistence service if no state is yet set - if (UnDefType.NULL.equals(item.getState()) && item instanceof GenericItem) { - List matchingContainers = persistenceServiceContainers.values().stream() // - .filter(container -> container.getPersistenceService() instanceof QueryablePersistenceService) // - .filter(container -> container.getMatchingConfigurations(RESTORE) - .anyMatch(itemConfig -> appliesToItem(itemConfig, item))) - .toList(); - - for (PersistenceServiceContainer container : matchingContainers) { - QueryablePersistenceService queryService = (QueryablePersistenceService) container - .getPersistenceService(); - FilterCriteria filter = new FilterCriteria().setItemName(item.getName()).setEndDate(ZonedDateTime.now()) - .setPageSize(1); - Iterator result = safeCaller.create(queryService, QueryablePersistenceService.class) - .onTimeout(() -> logger.warn("Querying persistence service '{}' takes more than {}ms.", - queryService.getId(), SafeCaller.DEFAULT_TIMEOUT)) - .onException(e -> logger.error("Exception occurred while querying persistence service '{}': {}", - queryService.getId(), e.getMessage(), e)) - .build().query(filter).iterator(); - if (result.hasNext()) { - HistoricItem historicItem = result.next(); - GenericItem genericItem = (GenericItem) item; - genericItem.removeStateChangeListener(this); - genericItem.setState(historicItem.getState()); - genericItem.addStateChangeListener(this); - if (logger.isDebugEnabled()) { - logger.debug("Restored item state from '{}' for item '{}' -> '{}'", - DateTimeFormatter.ISO_ZONED_DATE_TIME.format(historicItem.getTimestamp()), - item.getName(), historicItem.getState()); - } - return; - } - } - } - } - private void startEventHandling(PersistenceServiceContainer serviceContainer) { - serviceContainer.getMatchingConfigurations(RESTORE) - .forEach(itemConfig -> getAllItems(itemConfig).forEach(this::restoreItemStateIfNeeded)); + serviceContainer.restoreStatesAndScheduleForecastJobs(); serviceContainer.schedulePersistJobs(); - serviceContainer.scheduleForecastJobs(); } // ItemStateChangeListener methods @@ -322,7 +275,6 @@ public void allItemsChanged(Collection oldItemNames) { @Override public void added(Item item) { - restoreItemStateIfNeeded(item); persistenceServiceContainers.values().forEach(container -> container.addItem(item)); if (item instanceof GenericItem genericItem) { genericItem.addStateChangeListener(this); @@ -369,7 +321,7 @@ public void timeSeriesUpdated(Item item, TimeSeries timeSeries) { timeSeries.getStates() .forEach(e -> ((ModifiablePersistenceService) container.getPersistenceService()) .store(item, e.timestamp().atZone(ZoneId.systemDefault()), e.state())); - container.scheduleTimeSeries(item.getName(), timeSeries); + container.scheduleNewTimeSeries(item.getName(), timeSeries); })); } @@ -511,11 +463,11 @@ public void schedulePersistJobs() { }); } - public void scheduleForecastJobs() { + public void restoreStatesAndScheduleForecastJobs() { itemRegistry.getItems().forEach(this::addItem); } - public void scheduleTimeSeries(String itemName, TimeSeries timeSeries) { + public void scheduleNewTimeSeries(String itemName, TimeSeries timeSeries) { List> jobs = forecastJobs.computeIfAbsent(itemName, i -> new CopyOnWriteArrayList<>()); timeSeries.getStates().forEach(e -> { @@ -527,9 +479,15 @@ public void scheduleTimeSeries(String itemName, TimeSeries timeSeries) { } public void addItem(Item item) { - if (persistenceService instanceof QueryablePersistenceService && getMatchingConfigurations(FORECAST) - .anyMatch(configuration -> appliesToItem(configuration, item))) { - schedulePersistedForecasts(item.getName()); + if (persistenceService instanceof QueryablePersistenceService) { + if (UnDefType.NULL.equals(item.getState()) && getMatchingConfigurations(RESTORE) + .anyMatch(configuration -> appliesToItem(configuration, item))) { + restoreItemStateIfPossible(item); + } + if (getMatchingConfigurations(FORECAST).anyMatch(configuration -> appliesToItem(configuration, item))) { + scheduleAllPersistedForecastsForItem(item.getName()); + + } } } @@ -540,9 +498,38 @@ public void removeItem(String itemName) { } } - private void schedulePersistedForecasts(String itemName) { + private void restoreItemStateIfPossible(Item item) { + QueryablePersistenceService queryService = (QueryablePersistenceService) persistenceService; + + FilterCriteria filter = new FilterCriteria().setItemName(item.getName()).setEndDate(ZonedDateTime.now()) + .setPageSize(1); + Iterator result = safeCaller.create(queryService, QueryablePersistenceService.class) + .onTimeout(() -> logger.warn("Querying persistence service '{}' takes more than {}ms.", + queryService.getId(), SafeCaller.DEFAULT_TIMEOUT)) + .onException(e -> logger.error("Exception occurred while querying persistence service '{}': {}", + queryService.getId(), e.getMessage(), e)) + .build().query(filter).iterator(); + if (result.hasNext()) { + HistoricItem historicItem = result.next(); + GenericItem genericItem = (GenericItem) item; + if (!UnDefType.NULL.equals(item.getState())) { + // someone else already restored the state or a new state was set + return; + } + genericItem.removeStateChangeListener(PersistenceManager.this); + genericItem.setState(historicItem.getState()); + genericItem.addStateChangeListener(PersistenceManager.this); + if (logger.isDebugEnabled()) { + logger.debug("Restored item state from '{}' for item '{}' -> '{}'", + DateTimeFormatter.ISO_ZONED_DATE_TIME.format(historicItem.getTimestamp()), item.getName(), + historicItem.getState()); + } + } + } + + private void scheduleAllPersistedForecastsForItem(String itemName) { Item item = itemRegistry.get(itemName); - if (item instanceof GenericItem genericItem) { + if (item instanceof GenericItem) { QueryablePersistenceService queryService = (QueryablePersistenceService) persistenceService; FilterCriteria filter = new FilterCriteria().setItemName(itemName).setBeginDate(ZonedDateTime.now()) .setOrdering(ASCENDING); @@ -552,23 +539,16 @@ private void schedulePersistedForecasts(String itemName) { .onException(e -> logger.error("Exception occurred while querying persistence service '{}': {}", queryService.getId(), e.getMessage(), e)) .build().query(filter).iterator(); - if (result.hasNext()) { - HistoricItem historicItem = result.next(); - genericItem.setState(historicItem.getState()); - if (logger.isDebugEnabled()) { - logger.debug("Set item state from '{}' for item '{}' -> '{}'", - DateTimeFormatter.ISO_ZONED_DATE_TIME.format(historicItem.getTimestamp()), - item.getName(), historicItem.getState()); - } - List> jobs = forecastJobs.computeIfAbsent(itemName, - i -> new CopyOnWriteArrayList<>()); - while (result.hasNext()) { - HistoricItem next = result.next(); + List> jobs = forecastJobs.computeIfAbsent(itemName, + i -> new CopyOnWriteArrayList<>()); + while (result.hasNext()) { + HistoricItem next = result.next(); + if (next.getTimestamp().isAfter(ZonedDateTime.now())) { jobs.add(scheduler.at(() -> restoreItemState(itemName, next.getState()), next.getTimestamp().toInstant())); } - logger.debug("Initial: Total scheduled forecasted values for {} is {}", itemName, jobs.size()); } + logger.debug("Initial: Total scheduled forecasted values for {} is {}", itemName, jobs.size()); } }