Skip to content

Commit

Permalink
Limit collections in the provisioner (#2910)
Browse files Browse the repository at this point in the history
* implement collection limiter in the provisioner

* update changelog

* update changelog

* rename CollectionHook.handle_delete/1 to handle_delete/2
  • Loading branch information
midigofrank authored Feb 6, 2025
1 parent 700c621 commit ab071b9
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 7 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ and this project adheres to

- Extend provisioner to support collections
[#2830](https://github.com/OpenFn/lightning/issues/2830)
- Add collection limiter in the provisioner
[PR#2910](https://github.com/OpenFn/lightning/pull/2910)
- Adds project name to failure alert email
[#2884](https://github.com/OpenFn/lightning/pull/2884)
- Allow project users to manage collections
Expand Down
7 changes: 6 additions & 1 deletion lib/lightning/collections.ex
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,12 @@ defmodule Lightning.Collections do

collection ->
Repo.transact(fn ->
:ok = CollectionHook.handle_delete(collection)
:ok =
CollectionHook.handle_delete(
collection.project_id,
collection.byte_size_sum
)

Repo.delete(collection)
end)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/lightning/extensions/collection_hook.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Lightning.Extensions.CollectionHook do
def handle_create(_attrs), do: :ok

@impl true
def handle_delete(_col), do: :ok
def handle_delete(_project_id, _size), do: :ok

@impl true
def handle_put_items(_col, _size), do: :ok
Expand Down
5 changes: 4 additions & 1 deletion lib/lightning/extensions/collection_hooking.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ defmodule Lightning.Extensions.CollectionHooking do

@callback handle_create(attrs :: map()) :: :ok | limit_error()

@callback handle_delete(Collection.t()) :: :ok
@callback handle_delete(
project_id :: Ecto.UUID.t(),
delta_size :: neg_integer()
) :: :ok

@callback handle_put_items(Collection.t(), delta_size :: integer()) ::
:ok | limit_error()
Expand Down
1 change: 1 addition & 0 deletions lib/lightning/extensions/usage_limiting.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ defmodule Lightning.Extensions.UsageLimiting do
| :ai_usage
| :alert_failure
| :github_sync
| :new_collection
| :new_run
| :new_user
| :require_mfa,
Expand Down
55 changes: 55 additions & 0 deletions lib/lightning/projects/provisioner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ defmodule Lightning.Projects.Provisioner do
alias Ecto.Multi
alias Lightning.Accounts.User
alias Lightning.Collections.Collection
alias Lightning.Extensions.UsageLimiting.Action
alias Lightning.Extensions.UsageLimiting.Context
alias Lightning.Projects.Project
alias Lightning.Projects.ProjectCredential
alias Lightning.Projects.ProjectUser
alias Lightning.Repo
alias Lightning.Services.CollectionHook
alias Lightning.Services.UsageLimiter
alias Lightning.VersionControl.ProjectRepoConnection
alias Lightning.VersionControl.VersionControlUsageLimiter
alias Lightning.Workflows
Expand Down Expand Up @@ -53,6 +57,7 @@ defmodule Lightning.Projects.Provisioner do
build_import_changeset(project, user_or_repo_connection, data),
{:ok, %{workflows: workflows} = project} <-
Repo.insert_or_update(project_changeset),
:ok <- handle_collection_deletion(project_changeset),
updated_project <- preload_dependencies(project),
{:ok, _changes} <-
audit_workflows(project_changeset, user_or_repo_connection),
Expand Down Expand Up @@ -195,6 +200,56 @@ defmodule Lightning.Projects.Provisioner do
add_error(changeset, :id, message)
end
end)
|> then(fn changeset ->
case limit_collection_creation(changeset) do
:ok ->
changeset

{:error, _reason, %{text: message}} ->
add_error(changeset, :id, message)
end
end)
end

defp limit_collection_creation(changeset) do
new_collections_count =
changeset
|> get_assoc(:collections)
|> Enum.count(fn collection_changeset ->
# We only want to count collections that are being inserted
collection_changeset.data.__meta__.state == :built
end)

if new_collections_count > 0 do
UsageLimiter.limit_action(
%Action{type: :new_collection, amount: new_collections_count},
%Context{project_id: changeset.data.id}
)
else
:ok
end
end

defp handle_collection_deletion(project_changeset) do
deleted_size =
project_changeset
|> get_assoc(:collections)
|> Enum.reduce(0, fn collection_changeset, sum ->
if get_change(collection_changeset, :delete) do
sum + get_field(collection_changeset, :byte_size_sum)
else
sum
end
end)

if deleted_size > 0 do
CollectionHook.handle_delete(
project_changeset.data.id,
deleted_size
)
else
:ok
end
end

defp maybe_add_project_user(changeset, user_or_repo_connection) do
Expand Down
4 changes: 2 additions & 2 deletions lib/lightning/services/collection_hook.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ defmodule Lightning.Services.CollectionHook do
end

@impl true
def handle_delete(collection) do
adapter().handle_delete(collection)
def handle_delete(project_id, collection_byte_size) do
adapter().handle_delete(project_id, collection_byte_size)
end

@impl true
Expand Down
127 changes: 125 additions & 2 deletions test/lightning/projects/provisioner_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ defmodule Lightning.Projects.ProvisionerTest do
}
end

test "adds error incase limit action is returns error", %{
test "adds error incase workflow limit action returns error", %{
project: %{id: project_id} = project,
user: user
} do
Expand All @@ -787,7 +787,7 @@ defmodule Lightning.Projects.ProvisionerTest do
)
|> Mox.expect(
:limit_action,
fn _action, %{project_id: ^project_id} ->
fn %{type: :activate_workflow}, %{project_id: ^project_id} ->
{:error, :too_many_workflows, %{text: error_msg}}
end
)
Expand Down Expand Up @@ -1010,6 +1010,129 @@ defmodule Lightning.Projects.ProvisionerTest do
assert Repo.reload(collection_to_delete) |> is_nil()
assert remaining_collection.id == collection.id
end

test "usage limiter is called when creating collection", %{
project: %{id: project_id} = project,
user: user
} do
collections_count = 3

Lightning.Extensions.MockUsageLimiter
|> Mox.expect(
:limit_action,
fn %{type: :github_sync}, %{project_id: ^project_id} -> :ok end
)
|> Mox.expect(
:limit_action,
fn %{type: :activate_workflow}, %{project_id: ^project_id} -> :ok end
)
|> Mox.expect(
:limit_action,
fn %{type: :new_collection, amount: ^collections_count},
%{project_id: ^project_id} ->
:ok
end
)

%{body: body} = valid_document(project.id)

body_with_collections =
Map.put(
body,
"collections",
Enum.map(1..collections_count, fn n ->
%{id: Ecto.UUID.generate(), name: "test-collection-#{n}"}
end)
)

{:ok, _project} =
Provisioner.import_document(project, user, body_with_collections)
end

test "adds error incase create collection limiter returns error", %{
project: %{id: project_id} = project,
user: user
} do
error_msg = "Oopsie Doopsie"

Lightning.Extensions.MockUsageLimiter
|> Mox.expect(
:limit_action,
fn %{type: :github_sync}, %{project_id: ^project_id} -> :ok end
)
|> Mox.expect(
:limit_action,
fn %{type: :activate_workflow}, %{project_id: ^project_id} -> :ok end
)
|> Mox.expect(
:limit_action,
fn %{type: :new_collection}, %{project_id: ^project_id} ->
{:error, :exceeds_limit, %{text: error_msg}}
end
)

%{body: body} = valid_document(project.id)

body_with_collections =
Map.put(
body,
"collections",
[%{id: Ecto.UUID.generate(), name: "test-collection"}]
)

assert {:error, changeset} =
Provisioner.import_document(project, user, body_with_collections)

assert flatten_errors(changeset) == %{id: [error_msg]}
end

test "collection hook is called when deleting a collection", %{
project: %{id: project_id} = project,
user: user
} do
Mox.stub(
Lightning.Extensions.MockUsageLimiter,
:limit_action,
fn _action, %{project_id: ^project_id} ->
:ok
end
)

collection = insert(:collection, byte_size_sum: 1000, project: project)

collection_to_delete_1 =
insert(:collection, byte_size_sum: 2500, project: project)

collection_to_delete_2 =
insert(:collection, byte_size_sum: 3333, project: project)

body = %{
"id" => project_id,
"name" => "test-project",
"collections" => [
%{"id" => collection.id},
%{"id" => collection_to_delete_1.id, "delete" => true},
%{"id" => collection_to_delete_2.id, "delete" => true}
]
}

expected_byte_size_sum =
collection_to_delete_1.byte_size_sum +
collection_to_delete_2.byte_size_sum

Mox.expect(
Lightning.Extensions.MockCollectionHook,
:handle_delete,
fn ^project_id, ^expected_byte_size_sum ->
:ok
end
)

assert {:ok, %{id: ^project_id, collections: [remaining_collection]}} =
Provisioner.import_document(project, user, body)

assert remaining_collection.id == collection.id
end
end

defp valid_document(project_id \\ nil, number_of_workflows \\ 1) do
Expand Down

0 comments on commit ab071b9

Please sign in to comment.