Skip to content

Commit

Permalink
improvements
Browse files Browse the repository at this point in the history
Signed-off-by: Jan N. Klug <[email protected]>
  • Loading branch information
J-N-K committed May 16, 2023
1 parent e6fdd54 commit 68fe0cd
Showing 1 changed file with 50 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -261,56 +261,9 @@ private Iterable<Item> getAllItems(PersistenceItemConfiguration config) {
return items;
}

/**
* Handles the "restoreOnStartup" strategy for the item.
* If the item state is still undefined when entering this method, all persistence configurations are checked,
* if they have the "restoreOnStartup" strategy configured for the item. If so, the item state will be set
* to its last persisted value.
*
* @param item the item to restore the state for
*/
private void restoreItemStateIfNeeded(Item item) {
// get the last persisted state from the persistence service if no state is yet set
if (UnDefType.NULL.equals(item.getState()) && item instanceof GenericItem) {
List<PersistenceServiceContainer> matchingContainers = persistenceServiceContainers.values().stream() //
.filter(container -> container.getPersistenceService() instanceof QueryablePersistenceService) //
.filter(container -> container.getMatchingConfigurations(RESTORE)
.anyMatch(itemConfig -> appliesToItem(itemConfig, item)))
.toList();

for (PersistenceServiceContainer container : matchingContainers) {
QueryablePersistenceService queryService = (QueryablePersistenceService) container
.getPersistenceService();
FilterCriteria filter = new FilterCriteria().setItemName(item.getName()).setEndDate(ZonedDateTime.now())
.setPageSize(1);
Iterator<HistoricItem> result = safeCaller.create(queryService, QueryablePersistenceService.class)
.onTimeout(() -> logger.warn("Querying persistence service '{}' takes more than {}ms.",
queryService.getId(), SafeCaller.DEFAULT_TIMEOUT))
.onException(e -> logger.error("Exception occurred while querying persistence service '{}': {}",
queryService.getId(), e.getMessage(), e))
.build().query(filter).iterator();
if (result.hasNext()) {
HistoricItem historicItem = result.next();
GenericItem genericItem = (GenericItem) item;
genericItem.removeStateChangeListener(this);
genericItem.setState(historicItem.getState());
genericItem.addStateChangeListener(this);
if (logger.isDebugEnabled()) {
logger.debug("Restored item state from '{}' for item '{}' -> '{}'",
DateTimeFormatter.ISO_ZONED_DATE_TIME.format(historicItem.getTimestamp()),
item.getName(), historicItem.getState());
}
return;
}
}
}
}

private void startEventHandling(PersistenceServiceContainer serviceContainer) {
serviceContainer.getMatchingConfigurations(RESTORE)
.forEach(itemConfig -> getAllItems(itemConfig).forEach(this::restoreItemStateIfNeeded));
serviceContainer.restoreStatesAndScheduleForecastJobs();
serviceContainer.schedulePersistJobs();
serviceContainer.scheduleForecastJobs();
}

// ItemStateChangeListener methods
Expand All @@ -322,7 +275,6 @@ public void allItemsChanged(Collection<String> oldItemNames) {

@Override
public void added(Item item) {
restoreItemStateIfNeeded(item);
persistenceServiceContainers.values().forEach(container -> container.addItem(item));
if (item instanceof GenericItem genericItem) {
genericItem.addStateChangeListener(this);
Expand Down Expand Up @@ -369,7 +321,7 @@ public void timeSeriesUpdated(Item item, TimeSeries timeSeries) {
timeSeries.getStates()
.forEach(e -> ((ModifiablePersistenceService) container.getPersistenceService())
.store(item, e.timestamp().atZone(ZoneId.systemDefault()), e.state()));
container.scheduleTimeSeries(item.getName(), timeSeries);
container.scheduleNewTimeSeries(item.getName(), timeSeries);
}));
}

Expand Down Expand Up @@ -511,11 +463,11 @@ public void schedulePersistJobs() {
});
}

public void scheduleForecastJobs() {
public void restoreStatesAndScheduleForecastJobs() {
itemRegistry.getItems().forEach(this::addItem);
}

public void scheduleTimeSeries(String itemName, TimeSeries timeSeries) {
public void scheduleNewTimeSeries(String itemName, TimeSeries timeSeries) {
List<ScheduledCompletableFuture<?>> jobs = forecastJobs.computeIfAbsent(itemName,
i -> new CopyOnWriteArrayList<>());
timeSeries.getStates().forEach(e -> {
Expand All @@ -527,9 +479,15 @@ public void scheduleTimeSeries(String itemName, TimeSeries timeSeries) {
}

public void addItem(Item item) {
if (persistenceService instanceof QueryablePersistenceService && getMatchingConfigurations(FORECAST)
.anyMatch(configuration -> appliesToItem(configuration, item))) {
schedulePersistedForecasts(item.getName());
if (persistenceService instanceof QueryablePersistenceService) {
if (UnDefType.NULL.equals(item.getState()) && getMatchingConfigurations(RESTORE)
.anyMatch(configuration -> appliesToItem(configuration, item))) {
restoreItemStateIfPossible(item);
}
if (getMatchingConfigurations(FORECAST).anyMatch(configuration -> appliesToItem(configuration, item))) {
scheduleAllPersistedForecastsForItem(item.getName());

}
}
}

Expand All @@ -540,9 +498,38 @@ public void removeItem(String itemName) {
}
}

private void schedulePersistedForecasts(String itemName) {
private void restoreItemStateIfPossible(Item item) {
QueryablePersistenceService queryService = (QueryablePersistenceService) persistenceService;

FilterCriteria filter = new FilterCriteria().setItemName(item.getName()).setEndDate(ZonedDateTime.now())
.setPageSize(1);
Iterator<HistoricItem> result = safeCaller.create(queryService, QueryablePersistenceService.class)
.onTimeout(() -> logger.warn("Querying persistence service '{}' takes more than {}ms.",
queryService.getId(), SafeCaller.DEFAULT_TIMEOUT))
.onException(e -> logger.error("Exception occurred while querying persistence service '{}': {}",
queryService.getId(), e.getMessage(), e))
.build().query(filter).iterator();
if (result.hasNext()) {
HistoricItem historicItem = result.next();
GenericItem genericItem = (GenericItem) item;
if (!UnDefType.NULL.equals(item.getState())) {
// someone else already restored the state or a new state was set
return;
}
genericItem.removeStateChangeListener(PersistenceManager.this);
genericItem.setState(historicItem.getState());
genericItem.addStateChangeListener(PersistenceManager.this);
if (logger.isDebugEnabled()) {
logger.debug("Restored item state from '{}' for item '{}' -> '{}'",
DateTimeFormatter.ISO_ZONED_DATE_TIME.format(historicItem.getTimestamp()), item.getName(),
historicItem.getState());
}
}
}

private void scheduleAllPersistedForecastsForItem(String itemName) {
Item item = itemRegistry.get(itemName);
if (item instanceof GenericItem genericItem) {
if (item instanceof GenericItem) {
QueryablePersistenceService queryService = (QueryablePersistenceService) persistenceService;
FilterCriteria filter = new FilterCriteria().setItemName(itemName).setBeginDate(ZonedDateTime.now())
.setOrdering(ASCENDING);
Expand All @@ -552,23 +539,16 @@ private void schedulePersistedForecasts(String itemName) {
.onException(e -> logger.error("Exception occurred while querying persistence service '{}': {}",
queryService.getId(), e.getMessage(), e))
.build().query(filter).iterator();
if (result.hasNext()) {
HistoricItem historicItem = result.next();
genericItem.setState(historicItem.getState());
if (logger.isDebugEnabled()) {
logger.debug("Set item state from '{}' for item '{}' -> '{}'",
DateTimeFormatter.ISO_ZONED_DATE_TIME.format(historicItem.getTimestamp()),
item.getName(), historicItem.getState());
}
List<ScheduledCompletableFuture<?>> jobs = forecastJobs.computeIfAbsent(itemName,
i -> new CopyOnWriteArrayList<>());
while (result.hasNext()) {
HistoricItem next = result.next();
List<ScheduledCompletableFuture<?>> jobs = forecastJobs.computeIfAbsent(itemName,
i -> new CopyOnWriteArrayList<>());
while (result.hasNext()) {
HistoricItem next = result.next();
if (next.getTimestamp().isAfter(ZonedDateTime.now())) {
jobs.add(scheduler.at(() -> restoreItemState(itemName, next.getState()),
next.getTimestamp().toInstant()));
}
logger.debug("Initial: Total scheduled forecasted values for {} is {}", itemName, jobs.size());
}
logger.debug("Initial: Total scheduled forecasted values for {} is {}", itemName, jobs.size());
}
}

Expand Down

0 comments on commit 68fe0cd

Please sign in to comment.