Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validateur NeTEx : polling des résultats #4326

Merged
merged 11 commits into from
Dec 11, 2024
46 changes: 46 additions & 0 deletions apps/transport/lib/jobs/netex_poller_job.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
defmodule Transport.Jobs.NeTExPollerJob do
@moduledoc """
Companion module to the validator for NeTEx files, used to handle long
standing validations.
"""

# Max attempts doesn't really matter here, as it is only useful when workers fail.
# Here we mostly poll and, except for network errors, the worker won't fail.
@max_attempts 3

use Oban.Worker,
tags: ["validation"],
max_attempts: @max_attempts,
queue: :resource_validation,
unique: [fields: [:args, :worker]]

alias Transport.Validators.NeTEx

# Override the backoff to play nice and avoid falling into very slow retries
# after a long streak of snoozing (which increments the `attempt`
# counter).
#
# See https://hexdocs.pm/oban/Oban.Worker.html#module-snoozing-jobs.
@impl Worker
def backoff(%Oban.Job{attempt: attempt, max_attempts: current_max_attempts} = job) do
  # The gap between the job's current `max_attempts` and the configured
  # `@max_attempts` is removed from `attempt` before delegating to the
  # default backoff.
  extra_attempts = current_max_attempts - @max_attempts

  Worker.backoff(%{job | attempt: attempt - extra_attempts})
end

@impl Worker
def perform(%Oban.Job{
      args: %{
        "validation_id" => validation_id,
        "resource_history_id" => resource_history_id
      },
      attempt: attempt
    }) do
  # Callback handed to `NeTEx.handle_validation_results/3`; it only accepts
  # the validation id this job is polling and answers with a snooze.
  snooze = fn ^validation_id -> snooze_poller(attempt) end

  validation_id
  |> NeTEx.poll_validation_results(attempt)
  |> NeTEx.handle_validation_results(resource_history_id, snooze)
end

def snooze_poller(attempt) do
{:snooze, NeTEx.poll_interval(attempt)}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

En lisant https://hexdocs.pm/oban/Oban.Worker.html#module-snoozing-jobs j'ai l'impression que max_attempts va être incrémenté et ne sera pas respecté.

C'est ce qu'on souhaite ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Damned, bien vu.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pour l'instant ma conclusion c'est que je peux pas proprement tester ce comportement...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

J'ai testé manuellement le comportement. Les helpers d'Oban n'émulent pas le snoozing.

end
end
91 changes: 91 additions & 0 deletions apps/transport/lib/jobs/on_demand_netex_poller_job.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
defmodule Transport.Jobs.OnDemandNeTExPollerJob do
  @moduledoc """
  Job in charge of polling validation results from enRoute Chouette Valid.

  Upon success it stores the result in the database.
  """

  # Max attempts doesn't really matter here, as it is only useful when workers
  # fail. Here we mostly poll and, except for network errors, the worker won't
  # fail.
  @max_attempts 3

  use Oban.Worker,
    tags: ["validation"],
    max_attempts: @max_attempts,
    queue: :on_demand_validation,
    unique: [fields: [:args, :worker]]

  alias Transport.Jobs.OnDemandValidationHelpers, as: Helpers
  alias Transport.Validators.NeTEx

  # Override the backoff to play nice and avoid falling into very slow retries
  # after a long streak of snoozing (which increments the `attempt` counter).
  #
  # See https://hexdocs.pm/oban/Oban.Worker.html#module-snoozing-jobs.
  @impl Oban.Worker
  def backoff(%Oban.Job{} = job) do
    corrected_attempt = @max_attempts - (job.max_attempts - job.attempt)

    Worker.backoff(%{job | attempt: corrected_attempt})
  end

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"id" => multivalidation_id} = args, attempt: attempt}) do
    args
    |> check_result(attempt)
    |> Helpers.handle_validation_result(multivalidation_id)
  end

  @doc """
  Schedules a polling job, 20 seconds from now, for the given validation.

  Returns the delegated state when the job has been inserted. When the
  insertion fails, a terminal error state is returned instead, so that the
  failure is reported rather than silently leaving the validation without any
  poller.
  """
  def later(validation_id, multivalidation_id, url) do
    insertion =
      %{validation_id: validation_id, id: multivalidation_id, permanent_url: url}
      |> new(schedule_in: {20, :seconds})
      |> Oban.insert()

    case insertion do
      {:ok, _job} ->
        Helpers.delegated_state()

      {:error, reason} ->
        # Without a poller job nobody would ever fetch the results: surface the
        # problem through the same error path as a failed validation.
        handle_error(%{message: "could not schedule NeTEx polling: #{inspect(reason)}"})
    end
  end

  @doc """
  Polls the validator once and converts its answer into a validation state:
  terminal (error or success) or snoozed while the validation is pending.
  """
  def check_result(%{"permanent_url" => url, "validation_id" => validation_id}, attempt) do
    case NeTEx.poll_validation(validation_id, attempt) do
      {:error, error_result} -> handle_error(error_result)
      {:ok, ok_result} -> handle_success(ok_result, url)
      {:pending, _validation_id} -> handle_pending(attempt)
    end
  end

  @doc """
  Builds the terminal state for a failed validation from its `:message`.
  """
  def handle_error(error_result) do
    error_result
    |> build_error_validation_result()
    |> Helpers.terminal_state()
  end

  @doc """
  Builds the terminal state for a successful validation of the file at `url`.
  """
  def handle_success(ok_result, url) do
    ok_result
    |> build_successful_validation_result(url)
    |> Helpers.terminal_state()
  end

  @doc """
  Builds the snoozed state, using the poll interval matching `attempt`.
  """
  def handle_pending(attempt) do
    attempt
    |> NeTEx.poll_interval()
    |> Helpers.snoozed_state()
  end

  # Attributes stored on the multi validation once results are available.
  defp build_successful_validation_result(%{"validations" => validation, "metadata" => metadata}, url) do
    %{
      result: validation,
      metadata: metadata,
      data_vis: nil,
      validator: NeTEx.validator_name(),
      validated_data_name: url,
      max_error: NeTEx.get_max_severity_error(validation),
      oban_args: Helpers.completed()
    }
  end

  # Attributes stored on the multi validation when the validation failed.
  defp build_error_validation_result(%{message: msg}) do
    %{
      oban_args: Helpers.error(msg),
      validator: NeTEx.validator_name()
    }
  end
end
41 changes: 41 additions & 0 deletions apps/transport/lib/jobs/on_demand_validation_helpers.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
defmodule Transport.Jobs.OnDemandValidationHelpers do
  @moduledoc """
  Shared code for jobs implementing the On Demand validation.

  A validation step reports its outcome with one of three states:

    * `{:terminal, changes}` - `changes` are persisted on the multi validation;
    * `:delegated` - nothing is persisted by `handle_validation_result/2`;
    * `{:snooze, seconds}` - handed back untouched to the caller.
  """
  import Ecto.Changeset
  import Ecto.Query
  alias DB.{MultiValidation, Repo}

  @doc "Tags `result` as a final outcome to be persisted."
  def terminal_state(result) do
    {:terminal, result}
  end

  @doc "State for which `handle_validation_result/2` persists nothing."
  def delegated_state do
    :delegated
  end

  @doc "State asking for another try in `duration_in_seconds` seconds."
  def snoozed_state(duration_in_seconds) do
    {:snooze, duration_in_seconds}
  end

  @doc "`oban_args` payload of a completed validation."
  def completed do
    %{"state" => "completed"}
  end

  @doc "`oban_args` payload of a failed validation, carrying `error_message`."
  def error(error_message) do
    %{"state" => "error", "error_reason" => error_message}
  end

  @doc """
  Applies a validation state to the multi validation `multivalidation_id`.

  Terminal changes are written to the database and `:ok` is returned; a
  delegated state returns `:ok` without writing anything; a snooze tuple is
  returned as is.
  """
  def handle_validation_result(result, multivalidation_id) do
    case result do
      {:snooze, _duration_in_seconds} ->
        result

      :delegated ->
        :ok

      {:terminal, changes} ->
        update_multivalidation(multivalidation_id, changes)
    end
  end

  # Loads the multi validation, merges the validator output into its stored
  # `oban_args` and saves the remaining changes together with the metadata.
  defp update_multivalidation(multivalidation_id, changes) do
    %{oban_args: stored_oban_args} =
      multivalidation =
      MultiValidation
      |> preload(:metadata)
      |> Repo.get!(multivalidation_id)

    # update oban_args with validator output
    merged_oban_args = Map.merge(stored_oban_args, Map.get(changes, :oban_args, %{}))

    {metadata, attributes} =
      changes
      |> Map.put(:oban_args, merged_oban_args)
      |> Map.pop(:metadata)

    multivalidation
    |> change(attributes)
    |> put_assoc(:metadata, %{metadata: metadata})
    |> Repo.update!()

    :ok
  end
end
89 changes: 32 additions & 57 deletions apps/transport/lib/jobs/on_demand_validation_job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,54 +7,40 @@ defmodule Transport.Jobs.OnDemandValidationJob do
"""
use Oban.Worker, tags: ["validation"], max_attempts: 5, queue: :on_demand_validation
require Logger
import Ecto.Changeset
import Ecto.Query
alias DB.{MultiValidation, Repo}
alias Shared.Validation.JSONSchemaValidator.Wrapper, as: JSONSchemaValidator
alias Shared.Validation.TableSchemaValidator.Wrapper, as: TableSchemaValidator
alias Transport.DataVisualization
alias Transport.Jobs.OnDemandNeTExPollerJob
alias Transport.Jobs.OnDemandValidationHelpers, as: Helpers
alias Transport.Validators.GTFSRT
alias Transport.Validators.GTFSTransport
alias Transport.Validators.NeTEx

@download_timeout_ms 10_000

@impl Oban.Worker
def perform(%Oban.Job{args: %{"id" => multivalidation_id, "state" => "waiting"} = payload}) do
changes =
result =
try do
perform_validation(payload)
rescue
e -> %{oban_args: %{"state" => "error", "error_reason" => inspect(e)}}
e -> %{oban_args: Helpers.error(inspect(e))} |> Helpers.terminal_state()
end

validation = %{oban_args: oban_args} = MultiValidation |> preload(:metadata) |> Repo.get!(multivalidation_id)

# update oban_args with validator output
oban_args = Map.merge(oban_args, changes.oban_args)
changes = changes |> Map.put(:oban_args, oban_args)

{metadata, changes} = Map.pop(changes, :metadata)

validation
|> change(changes)
|> put_assoc(:metadata, %{
metadata: metadata
})
|> Repo.update!()

Helpers.handle_validation_result(result, multivalidation_id)
after
if Map.has_key?(payload, "filename") do
Transport.S3.delete_object!(:on_demand_validation, payload["filename"])
end

:ok
end

defp perform_validation(%{"type" => "gtfs", "permanent_url" => url}) do
validator = GTFSTransport.validator_name()

case GTFSTransport.validate(url) do
{:error, msg} ->
%{oban_args: %{"state" => "error", "error_reason" => msg}, validator: validator}
%{oban_args: Helpers.error(msg), validator: validator}
|> Helpers.terminal_state()

{:ok, %{"validations" => validation, "metadata" => metadata}} ->
%{
Expand All @@ -65,32 +51,22 @@ defmodule Transport.Jobs.OnDemandValidationJob do
command: GTFSTransport.command(url),
validated_data_name: url,
max_error: GTFSTransport.get_max_severity_error(validation),
oban_args: %{
"state" => "completed"
}
oban_args: Helpers.completed()
}
|> Helpers.terminal_state()
end
end

defp perform_validation(%{"type" => "netex", "permanent_url" => url}) do
validator = NeTEx.validator_name()
defp perform_validation(%{"type" => "netex", "id" => multivalidation_id, "permanent_url" => url}) do
case NeTEx.validate(url) do
{:error, error_result} ->
OnDemandNeTExPollerJob.handle_error(error_result)

case NeTEx.validate(url, []) do
{:error, %{message: msg}} ->
%{oban_args: %{"state" => "error", "error_reason" => msg}, validator: validator}
{:ok, ok_result} ->
OnDemandNeTExPollerJob.handle_success(ok_result, url)

{:ok, %{"validations" => validation, "metadata" => metadata}} ->
%{
result: validation,
metadata: metadata,
data_vis: nil,
validator: validator,
validated_data_name: url,
max_error: NeTEx.get_max_severity_error(validation),
oban_args: %{
"state" => "completed"
}
}
{:pending, validation_id} ->
OnDemandNeTExPollerJob.later(validation_id, multivalidation_id, url)
end
end

Expand All @@ -103,12 +79,14 @@ defmodule Transport.Jobs.OnDemandValidationJob do

case TableSchemaValidator.validate(schema_name, url) do
nil ->
%{oban_args: %{"state" => "error", "error_reason" => "could not perform validation"}, validator: validator}
%{oban_args: Helpers.error("could not perform validation"), validator: validator}
|> Helpers.terminal_state()

# https://github.com/etalab/transport-site/issues/2390
# validator name should come from validator module, when it is properly extracted
validation ->
%{oban_args: %{"state" => "completed"}, result: validation, validator: validator}
%{oban_args: Helpers.completed(), result: validation, validator: validator}
|> Helpers.terminal_state()
end
end

Expand All @@ -127,15 +105,14 @@ defmodule Transport.Jobs.OnDemandValidationJob do
) do
nil ->
%{
oban_args: %{
"state" => "error",
"error_reason" => "could not perform validation"
},
oban_args: Helpers.error("could not perform validation"),
validator: validator
}
|> Helpers.terminal_state()

validation ->
%{oban_args: %{"state" => "completed"}, result: validation, validator: validator}
%{oban_args: Helpers.completed(), result: validation, validator: validator}
|> Helpers.terminal_state()
end
end

Expand All @@ -155,11 +132,12 @@ defmodule Transport.Jobs.OnDemandValidationJob do

result
|> Map.merge(%{validated_data_name: gtfs_rt_url, secondary_validated_data_name: gtfs_url})
|> Helpers.terminal_state()
end

defp normalize_download(result) do
case result do
{:error, reason} -> {:error, %{"state" => "error", "error_reason" => reason}}
{:error, reason} -> {:error, Helpers.error(reason)}
{:ok, path, _} -> {:ok, path}
end
end
Expand Down Expand Up @@ -191,26 +169,23 @@ defmodule Transport.Jobs.OnDemandValidationJob do
# https://github.com/etalab/transport-site/issues/2390
# to do: transport-tools version when available
%{
oban_args: %{"state" => "completed"},
oban_args: Helpers.completed(),
result: validation,
validator: GTFSRT.validator_name(),
command: inspect(validator_args)
}

:error ->
%{
oban_args: %{
"state" => "error",
"error_reason" => "Could not run validator. Please provide a GTFS and a GTFS-RT."
}
oban_args: Helpers.error("Could not run validator. Please provide a GTFS and a GTFS-RT.")
}
end

{:error, reason} ->
if not ignore_shapes and String.contains?(reason, "java.lang.OutOfMemoryError") do
run_save_gtfs_rt_validation(gtfs_path, gtfs_rt_path, ignore_shapes: true)
else
%{oban_args: %{"state" => "error", "error_reason" => inspect(reason)}}
%{oban_args: Helpers.error(inspect(reason))}
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ defmodule TransportWeb.ResourceController do
Transport.Validators.GTFSRT,
Transport.Validators.GBFSValidator,
Transport.Validators.TableSchema,
Transport.Validators.EXJSONSchema
Transport.Validators.EXJSONSchema,
Transport.Validators.NeTEx
])

def details(conn, %{"id" => id} = params) do
Expand Down
Loading
Loading