diff --git a/CHANGELOG.md b/CHANGELOG.md index 22b6899e3f..97d7a92c77 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ and this project adheres to - Extend provisioner to support collections [#2830](https://github.com/OpenFn/lightning/issues/2830) +- Add collection limiter in the provisioner + [PR#2910](https://github.com/OpenFn/lightning/pull/2910) - Adds project name to failure alert email [#2884](https://github.com/OpenFn/lightning/pull/2884) - Allow project users to manage collections diff --git a/lib/lightning/collections.ex b/lib/lightning/collections.ex index 7e7aefd7e9..329a6cec10 100644 --- a/lib/lightning/collections.ex +++ b/lib/lightning/collections.ex @@ -142,7 +142,12 @@ defmodule Lightning.Collections do collection -> Repo.transact(fn -> - :ok = CollectionHook.handle_delete(collection) + :ok = + CollectionHook.handle_delete( + collection.project_id, + collection.byte_size_sum + ) + Repo.delete(collection) end) end diff --git a/lib/lightning/extensions/collection_hook.ex b/lib/lightning/extensions/collection_hook.ex index ce71da4aea..634e21cd49 100644 --- a/lib/lightning/extensions/collection_hook.ex +++ b/lib/lightning/extensions/collection_hook.ex @@ -8,7 +8,7 @@ defmodule Lightning.Extensions.CollectionHook do def handle_create(_attrs), do: :ok @impl true - def handle_delete(_col), do: :ok + def handle_delete(_project_id, _size), do: :ok @impl true def handle_put_items(_col, _size), do: :ok diff --git a/lib/lightning/extensions/collection_hooking.ex b/lib/lightning/extensions/collection_hooking.ex index 86fcbdee7f..30a0ec3943 100644 --- a/lib/lightning/extensions/collection_hooking.ex +++ b/lib/lightning/extensions/collection_hooking.ex @@ -8,7 +8,10 @@ defmodule Lightning.Extensions.CollectionHooking do @callback handle_create(attrs :: map()) :: :ok | limit_error() - @callback handle_delete(Collection.t()) :: :ok + @callback handle_delete( + project_id :: Ecto.UUID.t(), + delta_size :: neg_integer() + ) :: :ok @callback handle_put_items(Collection.t(), delta_size :: integer()) :: :ok | limit_error() diff --git a/lib/lightning/extensions/usage_limiting.ex b/lib/lightning/extensions/usage_limiting.ex index 0300804ba8..128783d293 100644 --- a/lib/lightning/extensions/usage_limiting.ex +++ b/lib/lightning/extensions/usage_limiting.ex @@ -18,6 +18,7 @@ defmodule Lightning.Extensions.UsageLimiting do | :ai_usage | :alert_failure | :github_sync + | :new_collection | :new_run | :new_user | :require_mfa, diff --git a/lib/lightning/projects/provisioner.ex b/lib/lightning/projects/provisioner.ex index d4f1d57754..6b404fe828 100644 --- a/lib/lightning/projects/provisioner.ex +++ b/lib/lightning/projects/provisioner.ex @@ -14,10 +14,14 @@ defmodule Lightning.Projects.Provisioner do alias Ecto.Multi alias Lightning.Accounts.User alias Lightning.Collections.Collection + alias Lightning.Extensions.UsageLimiting.Action + alias Lightning.Extensions.UsageLimiting.Context alias Lightning.Projects.Project alias Lightning.Projects.ProjectCredential alias Lightning.Projects.ProjectUser alias Lightning.Repo + alias Lightning.Services.CollectionHook + alias Lightning.Services.UsageLimiter alias Lightning.VersionControl.ProjectRepoConnection alias Lightning.VersionControl.VersionControlUsageLimiter alias Lightning.Workflows @@ -53,6 +57,7 @@ defmodule Lightning.Projects.Provisioner do build_import_changeset(project, user_or_repo_connection, data), {:ok, %{workflows: workflows} = project} <- Repo.insert_or_update(project_changeset), + :ok <- handle_collection_deletion(project_changeset), updated_project <- preload_dependencies(project), {:ok, _changes} <- audit_workflows(project_changeset, user_or_repo_connection), @@ -195,6 +200,56 @@ defmodule Lightning.Projects.Provisioner do add_error(changeset, :id, message) end end) + |> then(fn changeset -> + case limit_collection_creation(changeset) do + :ok -> + changeset + + {:error, _reason, %{text: message}} -> + add_error(changeset, :id, message) + end + end) + end + + defp limit_collection_creation(changeset) do + new_collections_count = + changeset + |> get_assoc(:collections) + |> Enum.count(fn collection_changeset -> + # We only want to count collections that are being inserted + collection_changeset.data.__meta__.state == :built + end) + + if new_collections_count > 0 do + UsageLimiter.limit_action( + %Action{type: :new_collection, amount: new_collections_count}, + %Context{project_id: changeset.data.id} + ) + else + :ok + end + end + + defp handle_collection_deletion(project_changeset) do + deleted_size = + project_changeset + |> get_assoc(:collections) + |> Enum.reduce(0, fn collection_changeset, sum -> + if get_change(collection_changeset, :delete) do + sum + get_field(collection_changeset, :byte_size_sum) + else + sum + end + end) + + if deleted_size > 0 do + CollectionHook.handle_delete( + project_changeset.data.id, + deleted_size + ) + else + :ok + end end defp maybe_add_project_user(changeset, user_or_repo_connection) do diff --git a/lib/lightning/services/collection_hook.ex b/lib/lightning/services/collection_hook.ex index a658b5e841..7e9daa32cc 100644 --- a/lib/lightning/services/collection_hook.ex +++ b/lib/lightning/services/collection_hook.ex @@ -12,8 +12,8 @@ defmodule Lightning.Services.CollectionHook do end @impl true - def handle_delete(collection) do - adapter().handle_delete(collection) + def handle_delete(project_id, collection_byte_size) do + adapter().handle_delete(project_id, collection_byte_size) end @impl true diff --git a/test/lightning/projects/provisioner_test.exs b/test/lightning/projects/provisioner_test.exs index 704f6bec35..0dc4f819f1 100644 --- a/test/lightning/projects/provisioner_test.exs +++ b/test/lightning/projects/provisioner_test.exs @@ -773,7 +773,7 @@ defmodule Lightning.Projects.ProvisionerTest do } end - test "adds error incase limit action is returns error", %{ + test "adds error incase workflow limit action returns error", %{ project: %{id: project_id} = project, user: user } do @@ -787,7 +787,7 @@ defmodule Lightning.Projects.ProvisionerTest do ) |> Mox.expect( :limit_action, - fn _action, %{project_id: ^project_id} -> + fn %{type: :activate_workflow}, %{project_id: ^project_id} -> {:error, :too_many_workflows, %{text: error_msg}} end ) @@ -1010,6 +1010,129 @@ defmodule Lightning.Projects.ProvisionerTest do assert Repo.reload(collection_to_delete) |> is_nil() assert remaining_collection.id == collection.id end + + test "usage limiter is called when creating collection", %{ + project: %{id: project_id} = project, + user: user + } do + collections_count = 3 + + Lightning.Extensions.MockUsageLimiter + |> Mox.expect( + :limit_action, + fn %{type: :github_sync}, %{project_id: ^project_id} -> :ok end + ) + |> Mox.expect( + :limit_action, + fn %{type: :activate_workflow}, %{project_id: ^project_id} -> :ok end + ) + |> Mox.expect( + :limit_action, + fn %{type: :new_collection, amount: ^collections_count}, + %{project_id: ^project_id} -> + :ok + end + ) + + %{body: body} = valid_document(project.id) + + body_with_collections = + Map.put( + body, + "collections", + Enum.map(1..collections_count, fn n -> + %{id: Ecto.UUID.generate(), name: "test-collection-#{n}"} + end) + ) + + {:ok, _project} = + Provisioner.import_document(project, user, body_with_collections) + end + + test "adds error incase create collection limiter returns error", %{ + project: %{id: project_id} = project, + user: user + } do + error_msg = "Oopsie Doopsie" + + Lightning.Extensions.MockUsageLimiter + |> Mox.expect( + :limit_action, + fn %{type: :github_sync}, %{project_id: ^project_id} -> :ok end + ) + |> Mox.expect( + :limit_action, + fn %{type: :activate_workflow}, %{project_id: ^project_id} -> :ok end + ) + |> Mox.expect( + :limit_action, + fn %{type: :new_collection}, %{project_id: ^project_id} -> + {:error, :exceeds_limit, %{text: error_msg}} + end + ) + + %{body: body} = valid_document(project.id) + + body_with_collections = + Map.put( + body, + "collections", + [%{id: Ecto.UUID.generate(), name: "test-collection"}] + ) + + assert {:error, changeset} = + Provisioner.import_document(project, user, body_with_collections) + + assert flatten_errors(changeset) == %{id: [error_msg]} + end + + test "collection hook is called when deleting a collection", %{ + project: %{id: project_id} = project, + user: user + } do + Mox.stub( + Lightning.Extensions.MockUsageLimiter, + :limit_action, + fn _action, %{project_id: ^project_id} -> + :ok + end + ) + + collection = insert(:collection, byte_size_sum: 1000, project: project) + + collection_to_delete_1 = + insert(:collection, byte_size_sum: 2500, project: project) + + collection_to_delete_2 = + insert(:collection, byte_size_sum: 3333, project: project) + + body = %{ + "id" => project_id, + "name" => "test-project", + "collections" => [ + %{"id" => collection.id}, + %{"id" => collection_to_delete_1.id, "delete" => true}, + %{"id" => collection_to_delete_2.id, "delete" => true} + ] + } + + expected_byte_size_sum = + collection_to_delete_1.byte_size_sum + + collection_to_delete_2.byte_size_sum + + Mox.expect( + Lightning.Extensions.MockCollectionHook, + :handle_delete, + fn ^project_id, ^expected_byte_size_sum -> + :ok + end + ) + + assert {:ok, %{id: ^project_id, collections: [remaining_collection]}} = + Provisioner.import_document(project, user, body) + + assert remaining_collection.id == collection.id + end end defp valid_document(project_id \\ nil, number_of_workflows \\ 1) do