Skip to content

Commit

Permalink
[Fix #2113] Data index group processing (#2114)
Browse files Browse the repository at this point in the history
* [Fix #2113] Data index group processing

* [Fix #2113] Support group containing different process instance ids

* [Fix #2113] Flush call is probably not needed

* [Fix #2113] Optimization for user tasks

* Update data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java

Co-authored-by: Gonzalo Muñoz <[email protected]>

---------

Co-authored-by: Gonzalo Muñoz <[email protected]>
  • Loading branch information
fjtirado and gmunozfe authored Oct 16, 2024
1 parent 06e7a99 commit b733219
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,8 @@ public class IndexingService {
public void indexProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
ProcessInstanceStorage storage = manager.getProcessInstanceStorage();
if (event instanceof MultipleProcessInstanceDataEvent) {
for (ProcessInstanceDataEvent<?> item : ((MultipleProcessInstanceDataEvent) event).getData())
indexProccessInstanceEvent(storage, item);
} else {
indexProccessInstanceEvent(storage, event);
}
}

private void indexProccessInstanceEvent(ProcessInstanceStorage storage, ProcessInstanceDataEvent<?> event) {
if (event instanceof ProcessInstanceErrorDataEvent) {
storage.indexGroup(((MultipleProcessInstanceDataEvent) event));
} else if (event instanceof ProcessInstanceErrorDataEvent) {
storage.indexError((ProcessInstanceErrorDataEvent) event);
} else if (event instanceof ProcessInstanceNodeDataEvent) {
storage.indexNode((ProcessInstanceNodeDataEvent) event);
Expand All @@ -112,16 +105,8 @@ public void indexProcessDefinition(ProcessDefinitionDataEvent definitionDataEven
public <T> void indexUserTaskInstanceEvent(UserTaskInstanceDataEvent<T> event) {
UserTaskInstanceStorage storage = manager.getUserTaskInstanceStorage();
if (event instanceof MultipleUserTaskInstanceDataEvent) {
for (UserTaskInstanceDataEvent<?> item : ((MultipleUserTaskInstanceDataEvent) event).getData()) {
indexUserTaskInstanceEvent(storage, item);
}
} else {
indexUserTaskInstanceEvent(storage, event);
}
}

private void indexUserTaskInstanceEvent(UserTaskInstanceStorage storage, UserTaskInstanceDataEvent<?> event) {
if (event instanceof UserTaskInstanceAssignmentDataEvent) {
storage.indexGroup((MultipleUserTaskInstanceDataEvent) event);
} else if (event instanceof UserTaskInstanceAssignmentDataEvent) {
storage.indexAssignment((UserTaskInstanceAssignmentDataEvent) event);
} else if (event instanceof UserTaskInstanceAttachmentDataEvent) {
storage.indexAttachment((UserTaskInstanceAttachmentDataEvent) event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.kie.kogito.index.storage;

import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
import org.kie.kogito.event.process.ProcessInstanceSLADataEvent;
Expand All @@ -28,6 +29,8 @@

public interface ProcessInstanceStorage extends StorageFetcher<String, ProcessInstance> {

void indexGroup(MultipleProcessInstanceDataEvent event);

void indexError(ProcessInstanceErrorDataEvent event);

void indexNode(ProcessInstanceNodeDataEvent event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.kie.kogito.index.storage;

import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent;
Expand All @@ -40,4 +41,6 @@ public interface UserTaskInstanceStorage extends StorageFetcher<String, UserTask
void indexComment(UserTaskInstanceCommentDataEvent event);

void indexVariable(UserTaskInstanceVariableDataEvent event);

void indexGroup(MultipleUserTaskInstanceDataEvent event);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.kie.kogito.index.storage;

import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
Expand Down Expand Up @@ -69,6 +70,23 @@ public void indexVariable(ProcessInstanceVariableDataEvent event) {
index(event, variableMerger);
}

@Override
public void indexGroup(MultipleProcessInstanceDataEvent events) {
for (ProcessInstanceDataEvent<?> event : events.getData()) {
if (event instanceof ProcessInstanceErrorDataEvent) {
index(event, errorMerger);
} else if (event instanceof ProcessInstanceNodeDataEvent) {
index(event, nodeMerger);
} else if (event instanceof ProcessInstanceSLADataEvent) {
index(event, slaMerger);
} else if (event instanceof ProcessInstanceStateDataEvent) {
index(event, stateMerger);
} else if (event instanceof ProcessInstanceVariableDataEvent) {
index(event, variableMerger);
}
}
}

private <T extends ProcessInstanceDataEvent<?>> void index(T event, ProcessInstanceEventMerger merger) {
ProcessInstance processInstance = storage.get(event.getKogitoProcessInstanceId());
if (processInstance == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.ArrayList;

import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent;
Expand Down Expand Up @@ -53,7 +54,6 @@ public ModelUserTaskInstanceStorage(Storage<String, UserTaskInstance> storage) {
@Override
public void indexAssignment(UserTaskInstanceAssignmentDataEvent event) {
index(event, assignmentMerger);

}

@Override
Expand All @@ -76,13 +76,30 @@ public void indexState(UserTaskInstanceStateDataEvent event) {
@Override
public void indexVariable(UserTaskInstanceVariableDataEvent event) {
index(event, variableMerger);

}

@Override
public void indexComment(UserTaskInstanceCommentDataEvent event) {
index(event, commentMerger);
}

@Override
public void indexGroup(MultipleUserTaskInstanceDataEvent events) {
for (UserTaskInstanceDataEvent<?> event : events.getData()) {
if (event instanceof UserTaskInstanceAssignmentDataEvent) {
index((UserTaskInstanceAssignmentDataEvent) event, assignmentMerger);
} else if (event instanceof UserTaskInstanceAttachmentDataEvent) {
index((UserTaskInstanceAttachmentDataEvent) event, attachmentMerger);
} else if (event instanceof UserTaskInstanceDeadlineDataEvent) {
index((UserTaskInstanceDeadlineDataEvent) event, deadlineMerger);
} else if (event instanceof UserTaskInstanceStateDataEvent) {
index((UserTaskInstanceStateDataEvent) event, stateMerger);
} else if (event instanceof UserTaskInstanceCommentDataEvent) {
index((UserTaskInstanceCommentDataEvent) event, commentMerger);
} else if (event instanceof UserTaskInstanceVariableDataEvent) {
index((UserTaskInstanceVariableDataEvent) event, variableMerger);
}
}
}

private <T extends UserTaskInstanceDataEvent<?>> void index(T event, UserTaskInstanceEventMerger merger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@

import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorEventBody;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
Expand Down Expand Up @@ -64,52 +67,73 @@ public ProcessInstanceEntityStorage(ProcessInstanceEntityRepository repository,
super(repository, ProcessInstanceEntity.class, mapper::mapToModel);
}

@Override
@Transactional
public void indexGroup(MultipleProcessInstanceDataEvent events) {
Map<String, ProcessInstanceEntity> piMap = new HashMap<>();
for (ProcessInstanceDataEvent<?> event : events.getData()) {
indexEvent(piMap.computeIfAbsent(event.getKogitoProcessInstanceId(), id -> findOrInit(event)), event);
}
}

@Override
@Transactional
public void indexError(ProcessInstanceErrorDataEvent event) {
indexError(event.getData());
indexError(findOrInit(event), event.getData());
}

@Override
@Transactional
public void indexNode(ProcessInstanceNodeDataEvent event) {
indexNode(event.getData());
indexNode(findOrInit(event), event.getData());
}

@Override
@Transactional
public void indexSLA(ProcessInstanceSLADataEvent event) {
indexSLA(event.getData());

indexSla(findOrInit(event), event.getData());
}

@Override
@Transactional
public void indexState(ProcessInstanceStateDataEvent event) {
indexState(event.getData(), event.getKogitoAddons() == null ? Set.of() : Set.of(event.getKogitoAddons().split(",")), event.getSource() == null ? null : event.getSource().toString());
indexState(findOrInit(event), event);
}

@Override
@Transactional
public void indexVariable(ProcessInstanceVariableDataEvent event) {
indexVariable(event.getData());
indexVariable(findOrInit(event), event.getData());
}

private ProcessInstanceEntity findOrInit(String processId, String processInstanceId, Date date) {
return repository.findByIdOptional(processInstanceId).orElseGet(() -> {
private ProcessInstanceEntity findOrInit(ProcessInstanceDataEvent<?> event) {
return repository.findByIdOptional(event.getKogitoProcessInstanceId()).orElseGet(() -> {
ProcessInstanceEntity pi = new ProcessInstanceEntity();
pi.setProcessId(processId);
pi.setId(processInstanceId);
pi.setLastUpdate(toZonedDateTime(date));
pi.setProcessId(event.getKogitoProcessId());
pi.setId(event.getKogitoProcessInstanceId());
pi.setLastUpdate(toZonedDateTime(event.getTime()));
pi.setNodes(new ArrayList<>());
pi.setMilestones(new ArrayList<>());
repository.persist(pi);
return pi;
});
}

private void indexError(ProcessInstanceErrorEventBody error) {
ProcessInstanceEntity pi = findOrInit(error.getProcessId(), error.getProcessInstanceId(), error.getEventDate());
private void indexEvent(ProcessInstanceEntity pi, ProcessInstanceDataEvent<?> event) {
if (event instanceof ProcessInstanceErrorDataEvent) {
indexError(pi, ((ProcessInstanceErrorDataEvent) event).getData());
} else if (event instanceof ProcessInstanceNodeDataEvent) {
indexNode(pi, ((ProcessInstanceNodeDataEvent) event).getData());
} else if (event instanceof ProcessInstanceSLADataEvent) {
indexSla(pi, ((ProcessInstanceSLADataEvent) event).getData());
} else if (event instanceof ProcessInstanceStateDataEvent) {
indexState(pi, (ProcessInstanceStateDataEvent) event);
} else if (event instanceof ProcessInstanceVariableDataEvent) {
indexVariable(pi, ((ProcessInstanceVariableDataEvent) event).getData());
}
}

private void indexError(ProcessInstanceEntity pi, ProcessInstanceErrorEventBody error) {
ProcessInstanceErrorEntity errorEntity = pi.getError();
if (errorEntity == null) {
errorEntity = new ProcessInstanceErrorEntity();
Expand All @@ -118,16 +142,13 @@ private void indexError(ProcessInstanceErrorEventBody error) {
errorEntity.setMessage(error.getErrorMessage());
errorEntity.setNodeDefinitionId(error.getNodeDefinitionId());
pi.setState(CommonUtils.ERROR_STATE);
repository.flush();
}

private void indexNode(ProcessInstanceNodeEventBody data) {
ProcessInstanceEntity pi = findOrInit(data.getProcessId(), data.getProcessInstanceId(), data.getEventDate());
private void indexNode(ProcessInstanceEntity pi, ProcessInstanceNodeEventBody data) {
pi.getNodes().stream().filter(n -> n.getId().equals(data.getNodeInstanceId())).findAny().ifPresentOrElse(n -> updateNode(n, data), () -> createNode(pi, data));
if ("MilestoneNode".equals(data.getNodeType())) {
pi.getMilestones().stream().filter(n -> n.getId().equals(data.getNodeInstanceId())).findAny().ifPresentOrElse(n -> updateMilestone(n, data), () -> createMilestone(pi, data));
}
repository.flush();
}

private MilestoneEntity createMilestone(ProcessInstanceEntity pi, ProcessInstanceNodeEventBody data) {
Expand Down Expand Up @@ -159,7 +180,6 @@ private NodeInstanceEntity updateNode(NodeInstanceEntity nodeInstance, ProcessIn
nodeInstance.setType(body.getNodeType());
ZonedDateTime eventDate = toZonedDateTime(body.getEventDate());
switch (body.getEventType()) {

case EVENT_TYPE_ENTER:
nodeInstance.setEnter(eventDate);
break;
Expand All @@ -174,13 +194,12 @@ private NodeInstanceEntity updateNode(NodeInstanceEntity nodeInstance, ProcessIn
return nodeInstance;
}

private void indexSLA(ProcessInstanceSLAEventBody data) {
findOrInit(data.getProcessId(), data.getProcessInstanceId(), data.getEventDate());
repository.flush();
private void indexState(ProcessInstanceEntity pi, ProcessInstanceStateDataEvent event) {
indexState(pi, event.getData(), (event.getKogitoAddons() == null || event.getKogitoAddons().isEmpty()) ? Set.of() : Set.of(event.getKogitoAddons().split(",")),
event.getSource() == null ? null : event.getSource().toString());
}

private void indexState(ProcessInstanceStateEventBody data, Set<String> addons, String endpoint) {
ProcessInstanceEntity pi = findOrInit(data.getProcessId(), data.getProcessInstanceId(), data.getEventDate());
private void indexState(ProcessInstanceEntity pi, ProcessInstanceStateEventBody data, Set<String> addons, String endpoint) {
pi.setVersion(data.getProcessVersion());
pi.setProcessName(data.getProcessName());
pi.setRootProcessInstanceId(data.getRootProcessInstanceId());
Expand All @@ -199,13 +218,13 @@ private void indexState(ProcessInstanceStateEventBody data, Set<String> addons,
pi.setLastUpdate(toZonedDateTime(data.getEventDate()));
pi.setAddons(addons);
pi.setEndpoint(endpoint);
repository.flush();
}

private void indexVariable(ProcessInstanceVariableEventBody data) {
ProcessInstanceEntity pi = findOrInit(data.getProcessId(), data.getProcessInstanceId(), data.getEventDate());
private void indexVariable(ProcessInstanceEntity pi, ProcessInstanceVariableEventBody data) {
pi.setVariables(JsonUtils.mergeVariable(data.getVariableName(), data.getVariableValue(), pi.getVariables()));
repository.flush();
}

private void indexSla(ProcessInstanceEntity orInit, ProcessInstanceSLAEventBody data) {
// SLA does nothing for now
}
}
Loading

0 comments on commit b733219

Please sign in to comment.