diff --git a/go.mod b/go.mod index 05e8b38c8..90372aaa2 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de github.com/mhale/smtpd v0.8.3 github.com/obot-platform/kinm v0.0.0-20250116162656-270198b40c6d - github.com/obot-platform/nah v0.0.0-20250207012945-3b7c581712f6 + github.com/obot-platform/nah v0.0.0-20250210200356-4c7b8ce27778 github.com/obot-platform/namegenerator v0.0.0-20241217121223-fc58bdb7dca2 github.com/obot-platform/obot/apiclient v0.0.0-00010101000000-000000000000 github.com/obot-platform/obot/logger v0.0.0-20241217130503-4004a5c69f32 diff --git a/go.sum b/go.sum index aab455d19..6e37aa1f3 100644 --- a/go.sum +++ b/go.sum @@ -458,8 +458,8 @@ github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY= github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc= github.com/obot-platform/kinm v0.0.0-20250116162656-270198b40c6d h1:GzMvRkssr4jAa2YvQiv9eXhjuNpaZVab3GajE7+cQ3s= github.com/obot-platform/kinm v0.0.0-20250116162656-270198b40c6d/go.mod h1:RzrH0geIlbiTHDGZ8bpCk5k1hwdU9uu3l4zJn9n0pZU= -github.com/obot-platform/nah v0.0.0-20250207012945-3b7c581712f6 h1:rHIf46CC4pvG4yad9fMaFBTlewXqu3RWOj+epso56xw= -github.com/obot-platform/nah v0.0.0-20250207012945-3b7c581712f6/go.mod h1:KG1jLO9FeYvCPGI0iDqe5oqDqOFLd3/dt/iwuMianmI= +github.com/obot-platform/nah v0.0.0-20250210200356-4c7b8ce27778 h1:7YA4E2AwqdhKII979r85sdCIDdVg2AJXpE5Qu2kfWdU= +github.com/obot-platform/nah v0.0.0-20250210200356-4c7b8ce27778/go.mod h1:KG1jLO9FeYvCPGI0iDqe5oqDqOFLd3/dt/iwuMianmI= github.com/obot-platform/namegenerator v0.0.0-20241217121223-fc58bdb7dca2 h1:jiyBM/TYxU6UNVS9ff8Y8n55DOKDYohKkIZjfHpjfTY= github.com/obot-platform/namegenerator v0.0.0-20241217121223-fc58bdb7dca2/go.mod h1:isbKX6EfvvG/ojjFB2ZLyz27+2xoG3yRmpTSE+ytWEs= github.com/olekukonko/tablewriter v0.0.6-0.20230925090304-df64c4bbad77 h1:3bMMZ1f+GPXFQ1uNaYbO/uECWvSfqEA+ZEXn1rFAT88= diff --git a/pkg/api/handlers/invoke.go b/pkg/api/handlers/invoke.go index c64e49acb..d89f91f0c 100644 --- a/pkg/api/handlers/invoke.go +++ b/pkg/api/handlers/invoke.go @@ -82,6 +82,7 @@ func (i *InvokeHandler) Invoke(req api.Context) error { if agent.Name != "" { resp, err = i.invoker.Agent(req.Context(), req.Storage, &agent, string(input), invoke.Options{ + GenerateName: system.ChatRunPrefix, ThreadName: threadID, Synchronous: synchronous, CreateThread: true, diff --git a/pkg/invoke/invoker.go b/pkg/invoke/invoker.go index 0f8bd42e4..65838c5a3 100644 --- a/pkg/invoke/invoker.go +++ b/pkg/invoke/invoker.go @@ -165,6 +165,7 @@ type Options struct { CreateThread bool ThreadCredentialScope *bool UserUID string + GenerateName string } func (i *Invoker) getChatState(ctx context.Context, c kclient.Client, run *v1.Run) (result, lastThreadName string, _ error) { @@ -362,6 +363,7 @@ func (i *Invoker) Agent(ctx context.Context, c kclient.WithWatch, agent *v1.Agen WorkflowExecutionName: opt.WorkflowExecutionName, PreviousRunName: opt.PreviousRunName, ForceNoResume: opt.ForceNoResume, + GenerateName: opt.GenerateName, }) } @@ -374,6 +376,7 @@ func unAbortThread(ctx context.Context, c kclient.Client, thread *v1.Thread) err } type runOptions struct { + GenerateName string AgentName string Synchronous bool WorkflowName string @@ -407,9 +410,14 @@ func (i *Invoker) createRun(ctx context.Context, c kclient.WithWatch, thread *v1 return nil, err } + generateName := opts.GenerateName + if generateName == "" { + generateName = system.RunPrefix + } + run := v1.Run{ ObjectMeta: metav1.ObjectMeta{ - GenerateName: system.RunPrefix, + GenerateName: generateName, Namespace: thread.Namespace, Finalizers: []string{v1.RunFinalizer}, }, diff --git a/pkg/services/config.go b/pkg/services/config.go index 52baeea4b..883ab6eda 100644 --- a/pkg/services/config.go +++ b/pkg/services/config.go @@ -20,6 +20,7 @@ import ( "github.com/obot-platform/nah/pkg/apply" "github.com/obot-platform/nah/pkg/leader" "github.com/obot-platform/nah/pkg/router" + "github.com/obot-platform/nah/pkg/runtime" "github.com/obot-platform/obot/pkg/api/authn" "github.com/obot-platform/obot/pkg/api/authz" "github.com/obot-platform/obot/pkg/api/server" @@ -287,6 +288,9 @@ func New(ctx context.Context, config Config) (*Services, error) { GVKThreadiness: map[schema.GroupVersionKind]int{ v1.SchemeGroupVersion.WithKind("KnowledgeFile"): config.KnowledgeFileWorkers, }, + GVKQueueSplitters: map[schema.GroupVersionKind]runtime.WorkerQueueSplitter{ + v1.SchemeGroupVersion.WithKind("Run"): (*runQueueSplitter)(nil), + }, }) if err != nil { return nil, err diff --git a/pkg/services/queuesplitters.go b/pkg/services/queuesplitters.go new file mode 100644 index 000000000..a0c85ce7e --- /dev/null +++ b/pkg/services/queuesplitters.go @@ -0,0 +1,21 @@ +package services + +import ( + "github.com/obot-platform/nah/pkg/runtime" + "github.com/obot-platform/obot/pkg/system" +) + +type runQueueSplitter struct{} + +func (*runQueueSplitter) Split(key string) int { + _, name := runtime.KeyParse(key) + if system.IsChatRunID(name) { + return 1 + } + + return 0 +} + +func (*runQueueSplitter) Queues() int { + return 2 +} diff --git a/pkg/system/ids.go b/pkg/system/ids.go index 7f77eacac..981677cd6 100644 --- a/pkg/system/ids.go +++ b/pkg/system/ids.go @@ -6,6 +6,7 @@ const ( ThreadPrefix = "t1" AgentPrefix = "a1" RunPrefix = "r1" + ChatRunPrefix = "r1chat" WorkflowPrefix = "w1" WorkflowExecutionPrefix = "we1" WorkflowStepPrefix = "ws1" @@ -51,3 +52,7 @@ func IsWorkflowID(id string) bool { func IsEmailReceiverID(id string) bool { return strings.HasPrefix(id, EmailReceiverPrefix) } + +func IsChatRunID(id string) bool { + return strings.HasPrefix(id, ChatRunPrefix) +}