Feature/improved event logging #2214

Merged, 7 commits, Jan 9, 2025
@@ -168,6 +168,7 @@ def config_from_env(variable_name: str, *, default: str | bool | int | None = No
config_from_env("SPIFFWORKFLOW_BACKEND_LOG_TO_FILE", default=False)
config_from_env("SPIFFWORKFLOW_BACKEND_EVENT_STREAM_HOST", default=None)
config_from_env("SPIFFWORKFLOW_BACKEND_EVENT_STREAM_PORT", default=None)
config_from_env("SPIFFWORKFLOW_BACKEND_EVENT_STREAM_SOURCE", default="spiffworkflow.org")

### permissions
config_from_env("SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_ABSOLUTE_PATH")
@@ -70,11 +70,12 @@ def process_instance_create(

process_instance = _process_instance_create(process_model_identifier)

LoggingService.log_event(
ProcessInstanceEventType.process_instance_created.value,
process_model_identifier=process_model_identifier,
process_instance_id=process_instance.id,
)
log_extras = {
"milestone": "Started",
"process_model_identifier": process_model_identifier,
"process_instance_id": process_instance.id,
}
LoggingService.log_event(ProcessInstanceEventType.process_instance_created.value, log_extras)

return Response(
json.dumps(ProcessInstanceModelSchema().dump(process_instance)),
@@ -39,15 +39,11 @@ def format(self, record: Any) -> str:
return json.dumps(
{
"version": "1.0",
"action": "add_event",
"event": {
"specversion": "1.0",
"type": record.name,
"id": str(uuid4()),
"source": "spiffworkflow.org",
"timestamp": datetime.utcnow().timestamp(),
"data": record._spiff_data,
},
"type": record.name,
"id": str(uuid4()),
"source": self.app.config["SPIFFWORKFLOW_BACKEND_EVENT_STREAM_SOURCE"],
"timestamp": datetime.utcnow().timestamp(),
"data": record._spiff_data,
}
)
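For illustration (not part of the diff): dropping the nested "event" envelope leaves one flat JSON object per record. A plausible payload, with hypothetical id and timestamp values (data abbreviated; the filter below also attaches userid, username, and process identifiers):

{
    "version": "1.0",
    "type": "spiff.event",
    "id": "1b9d6bcd-bbfd-4b2d-9b5d-ab8dfbbd4bed",
    "source": "spiffworkflow.org",
    "timestamp": 1736424000.0,
    "data": {"message": "process_instance_created", "milestone": "Started"}
}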

@@ -65,40 +61,49 @@ def get_default_process_info(self) -> tuple[int | None, str | None]:
return None, None

def filter(self, record: Any) -> bool:
if record.name.startswith("spiff") and getattr(record, "event_type", "") not in ["task_completed", "task_cancelled"]:
if record.name.startswith("spiff"):
user_id, user_name = self.get_user_info()

data = {
"message": record.msg,
"userid": user_id,
"username": user_name,
"process_instance_id": getattr(record, "process_instance_id", None),
"process_model_identifier": getattr(record, "process_model_identifier", None),
}

process_instance_id, process_model_identifier = self.get_default_process_info()

if not hasattr(record, "process_instance_id"):
if data["process_instance_id"] is None:
data["process_instance_id"] = process_instance_id
if not hasattr(record, "process_model_identifier"):
if data["process_model_identifier"] is None:
data["process_model_identifier"] = process_model_identifier

task_properties_from_spiff = [
"worflow_spec",
"task_spec",
"task_id",
"task_type",
"state",
"last_state_change",
"elapsed",
"parent",
]
workflow_properties_from_spiff = ["completed", "success"]
properties_from_spiff = task_properties_from_spiff + workflow_properties_from_spiff
for attr in properties_from_spiff:
if record.name in "spiff.task":
properties = [
"workflow_spec",
"task_spec",
"task_id",
"task_type",
"state",
"last_state_change",
"elapsed",
"parent",
]
elif record.name == "spiff.workflow":
properties = ["workflow_spec", "completed", "success"]
elif record.name == "spiff.data":
properties = ["workflow_spec", "task_spec", "task_id", "task_type"]
elif record.name == "spiff.event":
properties = ["bpmn_name", "milestone", "task_id", "task_spec", "metadata", "error_info"]
else:
properties = []

for attr in properties:
if hasattr(record, attr):
data[attr] = str(getattr(record, attr))
else:
data[attr] = None
record._spiff_data = data
data[attr] = getattr(record, attr)
if not (data[attr] is None or isinstance(data[attr], dict)):
data[attr] = str(data[attr])
record._spiff_data = data

return True
else:
return False
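For illustration (not part of the diff): the filter now picks its property list from the logger name, so each child logger of "spiff" carries only its channel's fields. A minimal sketch, assuming the filter is attached to the relevant handler and using hypothetical task names:

import logging

logging.getLogger("spiff.task").info(
    "state change",
    extra={"task_spec": "Activity_do_work", "state": "COMPLETED"},
)
# For this record, filter() copies task_spec and state (stringified) into
# record._spiff_data; a record on "spiff.workflow" would instead carry
# workflow_spec, completed, and success.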
@@ -296,25 +301,8 @@ def get_log_formatter(app: Flask) -> logging.Formatter:


class LoggingService:
_spiff_logger = logging.getLogger("spiff")
_spiff_logger = logging.getLogger("spiff.event")

@classmethod
def log_event(
cls,
event_type: str,
task_guid: str | None = None,
process_model_identifier: str | None = None,
process_instance_id: int | None = None,
) -> None:
extra: dict[str, Any] = {"event_type": event_type}

if task_guid is not None:
extra["task_guid"] = task_guid

if process_model_identifier is not None:
extra["process_model_Identifier"] = process_model_identifier

if process_instance_id is not None:
extra["process_instance_id"] = process_instance_id

cls._spiff_logger.info(event_type, extra=extra)
def log_event(cls, message: str, log_extras: dict | None = None) -> None:
cls._spiff_logger.info(message, extra=log_extras)
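For illustration (not part of the diff): callers now pass the event name plus a plain dict of extras rather than fixed keyword arguments, matching the call sites above. A minimal usage sketch, with hypothetical values:

LoggingService.log_event(
    ProcessInstanceEventType.process_instance_created.value,
    {
        "milestone": "Started",
        "process_model_identifier": "my-group/my-model",
        "process_instance_id": 42,
    },
)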
@@ -993,43 +993,14 @@ def get_potential_owners_from_task(self, task: SpiffTask) -> PotentialOwnerIdLis
"lane_assignment_id": lane_assignment_id,
}

def extract_metadata(self) -> None:
# we are currently not getting the metadata extraction paths based on the version in git from the process instance.
# it would make sense to do that if the shell-out-to-git performance cost was not too high.
# we also discussed caching this information in new database tables. something like:
# process_model_version
# id
# process_model_identifier
# git_hash
# display_name
# notification_type
# metadata_extraction
# id
# extraction_key
# extraction_path
# metadata_extraction_process_model_version
# process_model_version_id
# metadata_extraction_id
process_model_info = ProcessModelService.get_process_model(self.process_instance_model.process_model_identifier)
metadata_extraction_paths = process_model_info.metadata_extraction_paths
if metadata_extraction_paths is None:
return
if len(metadata_extraction_paths) <= 0:
return

current_data = self.get_current_data()
for metadata_extraction_path in metadata_extraction_paths:
key = metadata_extraction_path["key"]
path = metadata_extraction_path["path"]
path_segments = path.split(".")
data_for_key = current_data
for path_segment in path_segments:
if path_segment in data_for_key:
data_for_key = data_for_key[path_segment]
else:
data_for_key = None # type: ignore
break
def extract_metadata(self) -> dict:
return ProcessModelService.extract_metadata(
self.process_instance_model.process_model_identifier,
self.get_current_data(),
)

def store_metadata(self, metadata: dict) -> None:
for key, data_for_key in metadata.items():
if data_for_key is not None:
pim = ProcessInstanceMetadataModel.query.filter_by(
process_instance_id=self.process_instance_model.id,
@@ -1190,21 +1161,26 @@ def save(self) -> None:
if self.process_instance_model.start_in_seconds is None:
self.process_instance_model.start_in_seconds = round(time.time())

metadata = self.extract_metadata()
if self.process_instance_model.end_in_seconds is None:
if self.bpmn_process_instance.is_completed():
self.process_instance_model.end_in_seconds = round(time.time())
if self._workflow_completed_handler is not None:
self._workflow_completed_handler(self.process_instance_model)
LoggingService.log_event(
ProcessInstanceEventType.process_instance_completed.value,
)
log_extras = {
"milestone": "Completed",
"process_model_identifier": self.process_instance_model.process_model_identifier,
"process_instance_id": self.process_instance_model.id,
"metadata": metadata,
}
LoggingService.log_event(ProcessInstanceEventType.process_instance_completed.value, log_extras)

db.session.add(self.process_instance_model)

human_tasks = HumanTaskModel.query.filter_by(process_instance_id=self.process_instance_model.id, completed=False).all()
ready_or_waiting_tasks = self.get_all_ready_or_waiting_tasks()

self.extract_metadata()
self.store_metadata(metadata)
self.update_summary()

for ready_or_waiting_task in ready_or_waiting_tasks:
@@ -1839,8 +1815,18 @@ def complete_task(self, spiff_task: SpiffTask, human_task: HumanTaskModel, user:
task_guid=task_model.guid,
user_id=user.id,
exception=task_exception,
log_event=False,
)

log_extras = {
"task_id": str(spiff_task.id),
"task_spec": spiff_task.task_spec.name,
"bpmn_name": spiff_task.task_spec.bpmn_name,
"process_model_identifier": self.process_instance_model.process_model_identifier,
"process_instance_id": self.process_instance_model.id,
"metadata": self.extract_metadata(),
}
LoggingService.log_event(task_event, log_extras)
# children of a multi-instance task have the attribute "triggered" set to True
# so use that to determine if a spiff_task is part of a multi-instance task
# and therefore we need to process its parent since the current task will not
@@ -1,5 +1,6 @@
import time
import traceback
from typing import Any

from flask import g
from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException # type: ignore
@@ -32,6 +33,7 @@ def add_event_to_process_instance(
timestamp: float | None = None,
add_to_db_session: bool | None = True,
migration_details: ProcessInstanceMigrationDetailDict | None = None,
log_event: bool = True,
) -> tuple[ProcessInstanceEventModel, ProcessInstanceErrorDetailModel | None]:
if user_id is None and hasattr(g, "user") and g.user:
user_id = g.user.id
@@ -47,6 +49,8 @@
if add_to_db_session:
db.session.add(process_instance_event)

log_extras: dict[str, Any] = {"task_id": task_guid}

process_instance_error_detail = None
if exception is not None:
# NOTE: I tried to move this to its own method but
@@ -82,10 +86,19 @@
task_offset=task_offset,
)

log_extras["error_info"] = {
"trace": stacktrace,
"line_number": task_line_number,
"line_offset": task_offset,
"line_content": task_line_contents,
}

if add_to_db_session:
db.session.add(process_instance_error_detail)

LoggingService.log_event(event_type, task_guid)
if log_event:
# Some events need to be logged elsewhere so that all required info can be included
LoggingService.log_event(event_type, log_extras)

if migration_details is not None:
pi_detail = cls.add_process_instance_migration_detail(process_instance_event, migration_details)
@@ -126,6 +126,46 @@ def add_json_data_to_json_file(cls, process_model: ProcessModelInfo, file_name:
full_json_data = {**existing_json, **json_data}
cls.write_json_file(json_path, full_json_data)

@classmethod
def extract_metadata(cls, process_model_identifier: str, current_data: dict) -> dict[str, Any]:
# we are currently not getting the metadata extraction paths based on the version in git from the process instance.
# it would make sense to do that if the shell-out-to-git performance cost was not too high.
# we also discussed caching this information in new database tables. something like:
# process_model_version
# id
# process_model_identifier
# git_hash
# display_name
# notification_type
# metadata_extraction
# id
# extraction_key
# extraction_path
# metadata_extraction_process_model_version
# process_model_version_id
# metadata_extraction_id
process_model_info = cls.get_process_model(process_model_identifier)
metadata_extraction_paths = process_model_info.metadata_extraction_paths
if metadata_extraction_paths is None:
return {}
if len(metadata_extraction_paths) <= 0:
return {}

current_metadata = {}
for metadata_extraction_path in metadata_extraction_paths:
key = metadata_extraction_path["key"]
path = metadata_extraction_path["path"]
path_segments = path.split(".")
data_for_key = current_data
for path_segment in path_segments:
if path_segment in data_for_key:
data_for_key = data_for_key[path_segment]
else:
data_for_key = None # type: ignore
break
current_metadata[key] = data_for_key
return current_metadata

@classmethod
def save_process_model(cls, process_model: ProcessModelInfo) -> None:
process_model_path = os.path.abspath(os.path.join(FileSystemService.root_path(), process_model.id_for_file_path()))
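For illustration (not part of the diff): extract_metadata above splits each configured path on "." and walks it through the current task data, resolving missing segments to None instead of raising. A worked example with hypothetical paths and data:

metadata_extraction_paths = [
    {"key": "invoice_id", "path": "invoice.id"},
    {"key": "customer_name", "path": "customer.name"},
]
current_data = {"invoice": {"id": "INV-7"}}
# Walking "invoice.id" yields "INV-7"; "customer.name" stops at the missing
# "customer" segment, so the result is:
#   {"invoice_id": "INV-7", "customer_name": None}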
@@ -246,6 +246,7 @@ def update_task_model_with_spiff_task(
task_guid=task_model.guid,
timestamp=timestamp,
add_to_db_session=False,
log_event=False, # Log this in the execution service instead
)
self.process_instance_events[task_model.guid] = process_instance_event

@@ -41,8 +41,10 @@
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.assertion_service import safe_assertion
from spiffworkflow_backend.services.jinja_service import JinjaService
from spiffworkflow_backend.services.logging_service import LoggingService
from spiffworkflow_backend.services.process_instance_lock_service import ProcessInstanceLockService
from spiffworkflow_backend.services.process_instance_tmp_service import ProcessInstanceTmpService
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.task_service import StartAndEndTimes
from spiffworkflow_backend.services.task_service import TaskService

@@ -323,21 +325,36 @@ def did_complete_task(self, spiff_task: SpiffTask) -> None:
# NOTE: used with process-all-tasks and process-children-of-last-task
task_model = self.task_service.update_task_model_with_spiff_task(spiff_task)
if self.current_task_start_in_seconds is None:
raise Exception("Could not find cached current_task_start_in_seconds. This should never have happend")
raise Exception("Could not find cached current_task_start_in_seconds. This should never have happened")
task_model.start_in_seconds = self.current_task_start_in_seconds
task_model.end_in_seconds = time.time()

metadata = ProcessModelService.extract_metadata(
self.process_instance.process_model_identifier,
spiff_task.data,
)
log_extras = {
"task_id": str(spiff_task.id),
"task_spec": spiff_task.task_spec.name,
"bpmn_name": spiff_task.task_spec.bpmn_name,
"process_model_identifier": self.process_instance.process_model_identifier,
"process_instance_id": self.process_instance.id,
"metadata": metadata,
}
if (
spiff_task.task_spec.__class__.__name__ in ["StartEvent", "EndEvent", "IntermediateThrowEvent"]
and spiff_task.task_spec.bpmn_name is not None
):
self.process_instance.last_milestone_bpmn_name = spiff_task.task_spec.bpmn_name
log_extras["milestone"] = spiff_task.task_spec.bpmn_name
elif spiff_task.workflow.parent_task_id is None:
# if parent_task_id is None then this should be the top level process
if spiff_task.task_spec.__class__.__name__ == "EndEvent":
self.process_instance.last_milestone_bpmn_name = "Completed"
elif spiff_task.task_spec.__class__.__name__ == "StartEvent":
self.process_instance.last_milestone_bpmn_name = "Started"

LoggingService.log_event(ProcessInstanceEventType.task_completed.value, log_extras)
self.process_instance.task_updated_at_in_seconds = round(time.time())
self._last_completed_spiff_task = spiff_task
if self.secondary_engine_step_delegate: