Skip to content

Commit

Permalink
[uss_qualifier] NetRIDServiceProvider function for querying user noti…
Browse files Browse the repository at this point in the history
…fications
  • Loading branch information
Shastick committed Jan 31, 2025
1 parent 6680d14 commit 8f0f768
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
4 changes: 4 additions & 0 deletions monitoring/monitorlib/fetch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,10 @@ class QueryType(str, Enum):
"interuss.automated_testing.rid.v1.injection.deleteTest"
)

InterussRIDAutomatedTestingV1UserNotifications = (
"interuss.automated_testing.rid.v1.injection.UserNotifications"
)

# InterUSS mock_uss
InterUSSMockUSSGetLogs = "interuss.mock_uss.logging.interaction_logs"
InterUSSMockUSSGetLocality = "interuss.mock_uss.locality.locality_get"
Expand Down
26 changes: 25 additions & 1 deletion monitoring/uss_qualifier/resources/netrid/service_providers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import datetime
from typing import List, Optional
from typing import List, Optional, Tuple
from urllib.parse import urlparse

from implicitdict import ImplicitDict
from uas_standards.interuss.automated_testing.rid.v1.injection import QueryUserNotificationsResponse

from monitoring.monitorlib import fetch, infrastructure
from monitoring.monitorlib.rid import RIDVersion
Expand Down Expand Up @@ -73,6 +74,29 @@ def delete_test(self, test_id: str, version: str) -> fetch.Query:
query_type=fetch.QueryType.InterussRIDAutomatedTestingV1DeleteTest,
)

def get_user_notifications(
self,
after: datetime.datetime,
before: datetime.datetime,
) -> Tuple[QueryUserNotificationsResponse, fetch.Query]:
q = fetch.query_and_describe(
self.injection_client,
"GET",
url=f"/user_notifications",
scope=SCOPE_RID_QUALIFIER_INJECT,
participant_id=self.participant_id,
query_type=fetch.QueryType.InterussRIDAutomatedTestingV1UserNotifications,
params={
"after": after,
"before": before,
},
)

if q.error_message:
return QueryUserNotificationsResponse(user_notifications=[]), q

return ImplicitDict.parse(q.response.json, QueryUserNotificationsResponse), q


class NetRIDServiceProviders(Resource[NetRIDServiceProvidersSpecification]):
service_providers: List[NetRIDServiceProvider]
Expand Down

0 comments on commit 8f0f768

Please sign in to comment.