From 42c552eeadf95d48783d3e8b80e0c8979612313f Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Tue, 7 Jan 2025 17:21:36 -0500 Subject: [PATCH] Simplify getting flow from an event --- core/handlers/contact_urns_changed.go | 6 ++---- core/handlers/msg_created.go | 7 +------ core/handlers/optin_requested.go | 7 +------ core/handlers/ticket_opened.go | 7 ++----- core/handlers/warning.go | 11 ++++------- core/handlers/webhook_called.go | 5 ++--- core/hooks/commit_urn_changes.go | 2 +- core/models/contacts.go | 2 +- core/models/sessions.go | 8 +++++--- 9 files changed, 19 insertions(+), 36 deletions(-) diff --git a/core/handlers/contact_urns_changed.go b/core/handlers/contact_urns_changed.go index adefe3322..bd50e23b5 100644 --- a/core/handlers/contact_urns_changed.go +++ b/core/handlers/contact_urns_changed.go @@ -5,7 +5,6 @@ import ( "log/slog" "github.com/jmoiron/sqlx" - "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/flows" "github.com/nyaruka/goflow/flows/events" "github.com/nyaruka/mailroom/core/hooks" @@ -23,10 +22,9 @@ func handleContactURNsChanged(ctx context.Context, rt *runtime.Runtime, tx *sqlx slog.Debug("contact urns changed", "contact", scene.ContactUUID(), "session", scene.SessionID(), "urns", event.URNs) - var flow *assets.FlowReference + var flow *models.Flow if scene.Session() != nil { - run, _ := scene.Session().FindStep(e.StepUUID()) - flow = run.FlowReference() + flow, _ = scene.Session().LocateEvent(e) } // create our URN changed event diff --git a/core/handlers/msg_created.go b/core/handlers/msg_created.go index e72199f45..fd3917d89 100644 --- a/core/handlers/msg_created.go +++ b/core/handlers/msg_created.go @@ -88,12 +88,7 @@ func handleMsgCreated(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, oa } // and the flow - var flow *models.Flow - run, _ := scene.Session().FindStep(e.StepUUID()) - flowAsset, _ := oa.FlowByUUID(run.FlowReference().UUID) - if flowAsset != nil { - flow = flowAsset.(*models.Flow) - } + flow, _ := scene.Session().LocateEvent(e) msg, err := models.NewOutgoingFlowMsg(rt, oa.Org(), channel, scene.Session(), flow, event.Msg, event.CreatedOn()) if err != nil { diff --git a/core/handlers/optin_requested.go b/core/handlers/optin_requested.go index 6522a5d4e..a372c60da 100644 --- a/core/handlers/optin_requested.go +++ b/core/handlers/optin_requested.go @@ -49,12 +49,7 @@ func handleOptInRequested(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, } // and the flow - var flow *models.Flow - run, _ := scene.Session().FindStep(e.StepUUID()) - flowAsset, _ := oa.FlowByUUID(run.FlowReference().UUID) - if flowAsset != nil { - flow = flowAsset.(*models.Flow) - } + flow, _ := scene.Session().LocateEvent(e) msg := models.NewOutgoingOptInMsg(rt, scene.Session(), flow, optIn, channel, urn, event.CreatedOn()) diff --git a/core/handlers/ticket_opened.go b/core/handlers/ticket_opened.go index dd646e616..996d21e95 100644 --- a/core/handlers/ticket_opened.go +++ b/core/handlers/ticket_opened.go @@ -43,11 +43,8 @@ func handleTicketOpened(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, o var openedInID models.FlowID if scene.Session() != nil { - run, _ := scene.Session().FindStep(e.StepUUID()) - flowAsset, _ := oa.FlowByUUID(run.FlowReference().UUID) - if flowAsset != nil { - openedInID = flowAsset.(*models.Flow).ID() - } + flow, _ := scene.Session().LocateEvent(e) + openedInID = flow.ID() } ticket := models.NewTicket( diff --git a/core/handlers/warning.go b/core/handlers/warning.go index 5f19c2b5e..4dc1cbd96 100644 --- a/core/handlers/warning.go +++ b/core/handlers/warning.go @@ -26,13 +26,10 @@ func init() { func handleWarning(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, oa *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.WarningEvent) - run, _ := scene.Session().FindStep(e.StepUUID()) - flow, _ := oa.FlowByUUID(run.FlowReference().UUID) - if flow != nil { - logMsg := warningsLogs[event.Text] - if logMsg != "" { - slog.Error(logMsg, "session", scene.SessionID(), "flow", flow.UUID(), "text", event.Text) - } + flow, _ := scene.Session().LocateEvent(e) + logMsg := warningsLogs[event.Text] + if logMsg != "" { + slog.Error(logMsg, "session", scene.SessionID(), "flow", flow.UUID(), "text", event.Text) } return nil diff --git a/core/handlers/webhook_called.go b/core/handlers/webhook_called.go index 9200221b9..124e6fde4 100644 --- a/core/handlers/webhook_called.go +++ b/core/handlers/webhook_called.go @@ -34,8 +34,7 @@ func handleWebhookCalled(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, scene.AppendToEventPreCommitHook(hooks.UnsubscribeResthookHook, unsub) } - run, step := scene.Session().FindStep(e.StepUUID()) - flow := run.Flow().Asset().(*models.Flow) + flow, nodeUUID := scene.Session().LocateEvent(e) // create an HTTP log if flow != nil { @@ -52,7 +51,7 @@ func handleWebhookCalled(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, } // pass node and response time to the hook that monitors webhook health - scene.AppendToEventPreCommitHook(hooks.MonitorWebhooks, &hooks.WebhookCall{NodeUUID: step.NodeUUID(), Event: event}) + scene.AppendToEventPreCommitHook(hooks.MonitorWebhooks, &hooks.WebhookCall{NodeUUID: nodeUUID, Event: event}) return nil } diff --git a/core/hooks/commit_urn_changes.go b/core/hooks/commit_urn_changes.go index 8ea25e72b..23a519222 100644 --- a/core/hooks/commit_urn_changes.go +++ b/core/hooks/commit_urn_changes.go @@ -28,7 +28,7 @@ func (h *commitURNChangesHook) Apply(ctx context.Context, rt *runtime.Runtime, t changes = append(changes, urnChange) if urnChange.Flow != nil { - flowUUID = urnChange.Flow.UUID + flowUUID = urnChange.Flow.UUID() } } diff --git a/core/models/contacts.go b/core/models/contacts.go index 3159d9b30..534a37fbe 100644 --- a/core/models/contacts.go +++ b/core/models/contacts.go @@ -1347,7 +1347,7 @@ type ContactURNsChanged struct { ContactID ContactID OrgID OrgID URNs []urns.URN - Flow *assets.FlowReference // for logging + Flow *Flow // for logging } func (i *URNID) Scan(value any) error { return null.ScanInt(value, i) } diff --git a/core/models/sessions.go b/core/models/sessions.go index f82130f6a..c605cfbda 100644 --- a/core/models/sessions.go +++ b/core/models/sessions.go @@ -150,9 +150,11 @@ func (s *Session) Sprint() flows.Sprint { return s.sprint } -// FindStep finds the run and step with the given UUID -func (s *Session) FindStep(uuid flows.StepUUID) (flows.Run, flows.Step) { - return s.findStep(uuid) +// LocateEvent finds the flow and node UUID for an event belonging to this session +func (s *Session) LocateEvent(e flows.Event) (*Flow, flows.NodeUUID) { + run, step := s.findStep(e.StepUUID()) + flow := run.Flow().Asset().(*Flow) + return flow, step.NodeUUID() } // Timeout returns the amount of time after our last message sends that we should timeout