From 14b3072ec9741136aab758705fe201d4cdf3a092 Mon Sep 17 00:00:00 2001 From: hamzawaleed01 Date: Thu, 20 Jun 2024 18:24:49 +0500 Subject: [PATCH 1/2] feat: move permissions from class level to action level via decorators --- .../api/v1/views/enterprise_catalog_crud.py | 69 ++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/enterprise_catalog/apps/api/v1/views/enterprise_catalog_crud.py b/enterprise_catalog/apps/api/v1/views/enterprise_catalog_crud.py index 38b31dfc5..b733bf3f8 100644 --- a/enterprise_catalog/apps/api/v1/views/enterprise_catalog_crud.py +++ b/enterprise_catalog/apps/api/v1/views/enterprise_catalog_crud.py @@ -17,12 +17,43 @@ enterprises_with_admin_access, has_access_to_all_enterprises, ) +from edx_rbac.decorators import permission_required as permission_required_rbac + +from functools import wraps +from rest_framework.exceptions import PermissionDenied +from django.utils.decorators import method_decorator + +# temporarily added this decorator here +def has_permission_or_group(permission, group_name, fn=None): + """ + Ensure that user has permission to access the endpoint OR is part of a group that has access. + """ + def decorator(view_func): + @wraps(view_func) + def _wrapped_view(request, *args, **kwargs): + user = request.user + view = request.parser_context['view'] + action = view.action + # Check for list action specific permissions + pk = fn(request, **kwargs) if fn else kwargs.get('uuid') + if pk: + has_permission = user.has_perm(permission, pk) + else: + has_permission = user.has_perm(permission) + + if has_permission or user.groups.filter(name=group_name).exists(): + return view_func(request, *args, **kwargs) + else: + raise PermissionDenied( + "Access denied: Only admins and provisioning admins are allowed to access this endpoint.") + return _wrapped_view + return decorator class EnterpriseCatalogCRUDViewSet(BaseViewSet, viewsets.ModelViewSet): """ Viewset for CRUD operations on Enterprise Catalogs """ renderer_classes = [JSONRenderer, XMLRenderer] - permission_required = 'catalog.has_admin_access' + permission_required = [] lookup_field = 'uuid' pagination_class = PageNumberWithSizePagination @@ -30,6 +61,12 @@ class EnterpriseCatalogCRUDViewSet(BaseViewSet, viewsets.ModelViewSet): def request_action(self): return getattr(self, 'action', None) + def get_permission_required(self): + """ + Return specific permission name based on the view being requested + """ + return self.permission_required + @cached_property def admin_accessible_enterprises(self): """ @@ -91,3 +128,33 @@ def get_queryset(self): return all_catalogs return all_catalogs.filter(enterprise_uuid__in=self.admin_accessible_enterprises) return all_catalogs + + # @method_decorator(has_permission_or_group(permission='catalog.has_admin_access', group_name='test')) + @permission_required_rbac('catalog.has_admin_access') + def list(self, request, *args, **kwargs): + return super().list(request, *args, **kwargs) + + # @method_decorator(has_permission_or_group(permission='catalog.has_admin_access', group_name='test')) + @permission_required_rbac('catalog.has_admin_access') + def create(self, request, *args, **kwargs): + return super().create(request, *args, **kwargs) + + # @method_decorator(has_permission_or_group(permission='catalog.has_admin_access', group_name='test',fn=lambda request, uuid: uuid)) + @permission_required_rbac('catalog.has_admin_access') + def retrieve(self, request, *args, **kwargs): + return super().retrieve(request, *args, **kwargs) + + # @method_decorator(has_permission_or_group(permission='catalog.has_admin_access', group_name='test)) + @permission_required_rbac('catalog.has_admin_access') + def update(self, request, *args, **kwargs): + return super().update(request, *args, **kwargs) + + # @method_decorator(has_permission_or_group(permission='catalog.has_admin_access', group_name='test')) + @permission_required_rbac('catalog.has_admin_access') + def partial_update(self, request, *args, **kwargs): + return super().partial_update(request, *args, **kwargs) + + # @method_decorator(has_permission_or_group(permission='catalog.has_admin_access', group_name='test')) + @permission_required_rbac('catalog.has_admin_access') + def destroy(self, request, *args, **kwargs): + return super().destroy(request, *args, **kwargs) From 10f5bcbf671f8d1f6674c14e7d381f922c0a665c Mon Sep 17 00:00:00 2001 From: hamzawaleed01 Date: Fri, 21 Jun 2024 17:05:02 +0500 Subject: [PATCH 2/2] feat: grants provisioning admins access to few api views --- .../apps/api/v1/tests/test_views.py | 88 +++++++++++++++++++ .../api/v1/views/enterprise_catalog_crud.py | 82 +++-------------- enterprise_catalog/apps/catalog/constants.py | 1 + 3 files changed, 101 insertions(+), 70 deletions(-) diff --git a/enterprise_catalog/apps/api/v1/tests/test_views.py b/enterprise_catalog/apps/api/v1/tests/test_views.py index ab7b9e5b7..6f598bce9 100644 --- a/enterprise_catalog/apps/api/v1/tests/test_views.py +++ b/enterprise_catalog/apps/api/v1/tests/test_views.py @@ -9,6 +9,7 @@ import ddt import pytz from django.conf import settings +from django.contrib.auth.models import Group from django.db import IntegrityError from django.utils.http import urlencode from django.utils.text import slugify @@ -31,6 +32,7 @@ EXEC_ED_2U_ENTITLEMENT_MODE, LEARNER_PATHWAY, PROGRAM, + PROVISIONING_ADMINS_GROUP, ) from enterprise_catalog.apps.catalog.models import ( CatalogQuery, @@ -177,6 +179,7 @@ def setUp(self): 'publish_audit_enrollment_urls': True, 'content_filter': {'content_type': 'course'}, } + self.allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) def _assert_correct_new_catalog_data(self, catalog_uuid): """ @@ -250,6 +253,22 @@ def test_detail(self, is_implicit_check): self.assertEqual(data['title'], self.enterprise_catalog.title) self.assertEqual(uuid.UUID(data['enterprise_customer']), self.enterprise_catalog.enterprise_uuid) + def test_detail_provisioning_admin(self): + """ + Verify the viewset returns the details if requesting user is a PA + """ + self.set_up_staff_user() + self.remove_role_assignments() + self.set_up_invalid_jwt_role() + self.user.groups.add(self.allowed_group) + url = reverse('api:v1:enterprise-catalog-detail', kwargs={'uuid': self.enterprise_catalog.uuid}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + data = response.data + self.assertEqual(uuid.UUID(data['uuid']), self.enterprise_catalog.uuid) + self.assertEqual(data['title'], self.enterprise_catalog.title) + self.assertEqual(uuid.UUID(data['enterprise_customer']), self.enterprise_catalog.enterprise_uuid) + def test_detail_unauthorized_non_catalog_admin(self): """ Verify the viewset rejects users that are not catalog admins for the detail route @@ -297,6 +316,29 @@ def test_patch(self, is_implicit_check): self.enterprise_catalog.publish_audit_enrollment_urls, ) + def test_patch_provisioning_admins(self): + """ + Verify the viewset handles patching an enterprise catalog + """ + self.set_up_staff_user() + self.remove_role_assignments() + self.set_up_invalid_jwt_role() + self.user.groups.add(self.allowed_group) + url = reverse('api:v1:enterprise-catalog-detail', kwargs={'uuid': self.enterprise_catalog.uuid}) + patch_data = {'title': 'Patch title'} + response = self.client.patch(url, patch_data) + self.assertEqual(response.status_code, status.HTTP_200_OK) + # Verify that only the data we specifically patched changed + self.assertEqual(response.data['title'], patch_data['title']) + patched_catalog = EnterpriseCatalog.objects.get(uuid=self.enterprise_catalog.uuid) + self.assertEqual(patched_catalog.catalog_query, self.enterprise_catalog.catalog_query) + self.assertEqual(patched_catalog.enterprise_uuid, self.enterprise_catalog.enterprise_uuid) + self.assertEqual(patched_catalog.enabled_course_modes, self.enterprise_catalog.enabled_course_modes) + self.assertEqual( + patched_catalog.publish_audit_enrollment_urls, + self.enterprise_catalog.publish_audit_enrollment_urls, + ) + def test_patch_unauthorized_non_catalog_admin(self): """ Verify the viewset rejects patch for users that are not catalog admins @@ -336,6 +378,19 @@ def test_put(self, is_implicit_check): self.assertEqual(response.status_code, status.HTTP_200_OK) self._assert_correct_new_catalog_data(self.enterprise_catalog.uuid) # The UUID should not have changed + def test_put_provisioning_admins(self): + """ + Verify the viewset allows access to PAs + """ + self.set_up_staff_user() + self.remove_role_assignments() + self.set_up_invalid_jwt_role() + self.user.groups.add(self.allowed_group) + url = reverse('api:v1:enterprise-catalog-detail', kwargs={'uuid': self.enterprise_catalog.uuid}) + response = self.client.put(url, self.new_catalog_data) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self._assert_correct_new_catalog_data(self.enterprise_catalog.uuid) # The UUID should not have changed + def test_put_unauthorized_non_catalog_admin(self): """ Verify the viewset rejects put for users that are not catalog admins @@ -413,6 +468,19 @@ def test_post(self, is_implicit_check): self.assertEqual(response.status_code, status.HTTP_201_CREATED) self._assert_correct_new_catalog_data(self.new_catalog_uuid) + def test_post_provisioning_admins(self): + """ + Verify the viewset handles creating an enterprise catalog + """ + self.set_up_staff_user() + self.remove_role_assignments() + self.set_up_invalid_jwt_role() + self.user.groups.add(self.allowed_group) + url = reverse('api:v1:enterprise-catalog-list') + response = self.client.post(url, self.new_catalog_data) + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + self._assert_correct_new_catalog_data(self.new_catalog_uuid) + def test_post_integrity_error(self): """ Verify the viewset raises error when creating a duplicate enterprise catalog @@ -464,6 +532,7 @@ def setUp(self): super().setUp() self.set_up_staff_user() self.enterprise_catalog = EnterpriseCatalogFactory(enterprise_uuid=self.enterprise_uuid) + self.allowed_group = Group.objects.create(name=PROVISIONING_ADMINS_GROUP) def test_list_for_superusers(self): """ @@ -479,6 +548,25 @@ def test_list_for_superusers(self): self.assertEqual(uuid.UUID(results[0]['uuid']), self.enterprise_catalog.uuid) self.assertEqual(uuid.UUID(results[1]['uuid']), second_enterprise_catalog.uuid) + def test_list_for_provisioning_admins(self): + """ + Verify the viewset returns a list of all enterprise catalogs for provisioning admins + """ + self.set_up_staff_user() + self.remove_role_assignments() + self.set_up_invalid_jwt_role() + self.user.groups.add(self.allowed_group) + url = reverse('api:v1:enterprise-catalog-list') + second_enterprise_catalog = EnterpriseCatalogFactory() + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data['count'], 2) + results = response.data['results'] + self.assertEqual( + uuid.UUID(results[0]['uuid']), self.enterprise_catalog.uuid) + self.assertEqual( + uuid.UUID(results[1]['uuid']), second_enterprise_catalog.uuid) + def test_empty_list_for_non_catalog_admin(self): """ Verify the viewset returns an empty list for users that are staff but not catalog admins. diff --git a/enterprise_catalog/apps/api/v1/views/enterprise_catalog_crud.py b/enterprise_catalog/apps/api/v1/views/enterprise_catalog_crud.py index b733bf3f8..862af881b 100644 --- a/enterprise_catalog/apps/api/v1/views/enterprise_catalog_crud.py +++ b/enterprise_catalog/apps/api/v1/views/enterprise_catalog_crud.py @@ -12,48 +12,18 @@ EnterpriseCatalogSerializer, ) from enterprise_catalog.apps.api.v1.views.base import BaseViewSet +from enterprise_catalog.apps.catalog.constants import PROVISIONING_ADMINS_GROUP from enterprise_catalog.apps.catalog.models import EnterpriseCatalog from enterprise_catalog.apps.catalog.rules import ( enterprises_with_admin_access, has_access_to_all_enterprises, ) -from edx_rbac.decorators import permission_required as permission_required_rbac - -from functools import wraps -from rest_framework.exceptions import PermissionDenied -from django.utils.decorators import method_decorator - -# temporarily added this decorator here -def has_permission_or_group(permission, group_name, fn=None): - """ - Ensure that user has permission to access the endpoint OR is part of a group that has access. - """ - def decorator(view_func): - @wraps(view_func) - def _wrapped_view(request, *args, **kwargs): - user = request.user - view = request.parser_context['view'] - action = view.action - # Check for list action specific permissions - pk = fn(request, **kwargs) if fn else kwargs.get('uuid') - if pk: - has_permission = user.has_perm(permission, pk) - else: - has_permission = user.has_perm(permission) - - if has_permission or user.groups.filter(name=group_name).exists(): - return view_func(request, *args, **kwargs) - else: - raise PermissionDenied( - "Access denied: Only admins and provisioning admins are allowed to access this endpoint.") - return _wrapped_view - return decorator class EnterpriseCatalogCRUDViewSet(BaseViewSet, viewsets.ModelViewSet): """ Viewset for CRUD operations on Enterprise Catalogs """ renderer_classes = [JSONRenderer, XMLRenderer] - permission_required = [] + permission_required = 'catalog.has_admin_access' lookup_field = 'uuid' pagination_class = PageNumberWithSizePagination @@ -61,12 +31,6 @@ class EnterpriseCatalogCRUDViewSet(BaseViewSet, viewsets.ModelViewSet): def request_action(self): return getattr(self, 'action', None) - def get_permission_required(self): - """ - Return specific permission name based on the view being requested - """ - return self.permission_required - @cached_property def admin_accessible_enterprises(self): """ @@ -101,6 +65,11 @@ def check_permissions(self, request): If `get_permission_object` is implemented, it will be called and should return the object for which the `rules` predicate checks against. """ + # Grant provisioning-admins access to few actions + if self.request_action in ('create', 'partial_update', 'update', 'retrieve', 'list') and \ + request.user.groups.filter(name=PROVISIONING_ADMINS_GROUP).exists(): + return + if self.request_action == 'list': # Super-users and staff won't get Forbidden responses, # but depending on their assigned roles, staff may @@ -115,46 +84,19 @@ def check_permissions(self, request): def get_queryset(self): """ Returns the queryset corresponding to all catalogs the requesting user has access to. + Also allows provisioning admins to access all catalogs. """ all_catalogs = EnterpriseCatalog.objects.all().order_by('created') enterprise_customer = self.request.GET.get('enterprise_customer', False) + is_provisioning_admin = self.request.user.groups.filter( + name=PROVISIONING_ADMINS_GROUP).exists() if enterprise_customer: all_catalogs = all_catalogs.filter(enterprise_uuid=enterprise_customer) if self.request_action == 'list': - if not self.admin_accessible_enterprises: + if not self.admin_accessible_enterprises and not is_provisioning_admin: return EnterpriseCatalog.objects.none() - if has_access_to_all_enterprises(self.admin_accessible_enterprises): + if has_access_to_all_enterprises(self.admin_accessible_enterprises) or is_provisioning_admin: return all_catalogs return all_catalogs.filter(enterprise_uuid__in=self.admin_accessible_enterprises) return all_catalogs - - # @method_decorator(has_permission_or_group(permission='catalog.has_admin_access', group_name='test')) - @permission_required_rbac('catalog.has_admin_access') - def list(self, request, *args, **kwargs): - return super().list(request, *args, **kwargs) - - # @method_decorator(has_permission_or_group(permission='catalog.has_admin_access', group_name='test')) - @permission_required_rbac('catalog.has_admin_access') - def create(self, request, *args, **kwargs): - return super().create(request, *args, **kwargs) - - # @method_decorator(has_permission_or_group(permission='catalog.has_admin_access', group_name='test',fn=lambda request, uuid: uuid)) - @permission_required_rbac('catalog.has_admin_access') - def retrieve(self, request, *args, **kwargs): - return super().retrieve(request, *args, **kwargs) - - # @method_decorator(has_permission_or_group(permission='catalog.has_admin_access', group_name='test)) - @permission_required_rbac('catalog.has_admin_access') - def update(self, request, *args, **kwargs): - return super().update(request, *args, **kwargs) - - # @method_decorator(has_permission_or_group(permission='catalog.has_admin_access', group_name='test')) - @permission_required_rbac('catalog.has_admin_access') - def partial_update(self, request, *args, **kwargs): - return super().partial_update(request, *args, **kwargs) - - # @method_decorator(has_permission_or_group(permission='catalog.has_admin_access', group_name='test')) - @permission_required_rbac('catalog.has_admin_access') - def destroy(self, request, *args, **kwargs): - return super().destroy(request, *args, **kwargs) diff --git a/enterprise_catalog/apps/catalog/constants.py b/enterprise_catalog/apps/catalog/constants.py index 3cd114a10..e923728a5 100644 --- a/enterprise_catalog/apps/catalog/constants.py +++ b/enterprise_catalog/apps/catalog/constants.py @@ -103,6 +103,7 @@ } FORCE_INCLUSION_METADATA_TAG_KEY = 'enterprise_force_included' +PROVISIONING_ADMINS_GROUP = "provisioning-admins-group" def json_serialized_course_modes():