generated from MinBZK/python-project-template
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
8015d9a
commit 5779573
Showing
23 changed files
with
600 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
from collections.abc import Callable | ||
from functools import wraps | ||
from typing import Any | ||
|
||
from fastapi import HTTPException, Request | ||
|
||
from amt.core.exceptions import AMTPermissionDenied | ||
|
||
|
||
# note: propably needs to change to understand all the api context to be usefull | ||
def add_permissions(permissions: dict[str, list[str]]) -> Callable[[Callable[..., Any]], Callable[..., Any]]: | ||
def decorator(func: Callable[..., Any]) -> Callable[..., Any]: | ||
@wraps(func) | ||
async def wrapper(*args: Any, **kwargs: Any) -> Any: # noqa: ANN401 | ||
request = kwargs.get("request") | ||
if not isinstance(request, Request): # todo: change exception to custom exception | ||
raise HTTPException(status_code=400, detail="Request object is missing") | ||
|
||
for permission, verbs in permissions.items(): | ||
request_permissions: dict[str, list[str]] = ( | ||
request.state.permissions if hasattr(request.state, "permissions") else {} | ||
) | ||
if permission not in request_permissions: | ||
raise AMTPermissionDenied() | ||
for verb in verbs: | ||
if verb not in request.state.permissions[permission]: | ||
raise AMTPermissionDenied() | ||
|
||
return await func(*args, **kwargs) | ||
|
||
return wrapper | ||
|
||
return decorator |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
113 changes: 113 additions & 0 deletions
113
amt/migrations/versions/e16bb3d53cd6_authorization_system.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
"""authorization system | ||
Revision ID: e16bb3d53cd6 | ||
Revises: 5de977ad946f | ||
Create Date: 2024-12-23 08:32:15.194858 | ||
""" | ||
|
||
from collections.abc import Sequence | ||
|
||
import sqlalchemy as sa | ||
from alembic import op | ||
from amt.core.authorization import AuthorizationResource, AuthorizationVerb, AuthorizationType | ||
from sqlalchemy.orm.session import Session | ||
from amt.models import User, Organization | ||
|
||
# revision identifiers, used by Alembic. | ||
revision: str = "e16bb3d53cd6" | ||
down_revision: str | None = "5de977ad946f" | ||
branch_labels: str | Sequence[str] | None = None | ||
depends_on: str | Sequence[str] | None = None | ||
|
||
|
||
|
||
def upgrade() -> None: | ||
# ### commands auto generated by Alembic - please adjust! ### | ||
role_table = op.create_table( | ||
"role", | ||
sa.Column("id", sa.Integer(), nullable=False), | ||
sa.Column("name", sa.String(), nullable=False), | ||
sa.PrimaryKeyConstraint("id", name=op.f("pk_role")), | ||
) | ||
rule_table = op.create_table( | ||
"rule", | ||
sa.Column("id", sa.Integer(), nullable=False), | ||
sa.Column("resource", sa.String(), nullable=False), | ||
sa.Column("verbs", sa.JSON(), nullable=False), | ||
sa.Column("role_id", sa.Integer(), nullable=False), | ||
sa.ForeignKeyConstraint(["role_id"], ["role.id"], name=op.f("fk_rule_role_id_role")), | ||
sa.PrimaryKeyConstraint("id", name=op.f("pk_rule")), | ||
) | ||
|
||
authorization_table = op.create_table( | ||
"authorization", | ||
sa.Column("id", sa.Integer(), nullable=False), | ||
sa.Column("user_id", sa.UUID(), nullable=False), | ||
sa.Column("role_id", sa.Integer(), nullable=False), | ||
sa.Column("type", sa.String(), nullable=False), | ||
sa.Column("type_id", sa.Integer(), nullable=False), | ||
sa.ForeignKeyConstraint(["role_id"], ["role.id"], name=op.f("fk_authorization_role_id_role")), | ||
sa.ForeignKeyConstraint(["user_id"], ["user.id"], name=op.f("fk_authorization_user_id_user")), | ||
sa.PrimaryKeyConstraint("id", name=op.f("pk_authorization")), | ||
) | ||
|
||
op.bulk_insert( | ||
role_table, | ||
[ | ||
{'id': 1, 'name': 'Organization Maintainer'}, | ||
{'id': 2, 'name': 'Organization Member'}, | ||
{'id': 3, 'name': 'Organization Viewer'}, | ||
{'id': 4, 'name': 'Algorithm Maintainer'}, | ||
{'id': 5, 'name': 'Algorithm Member'}, | ||
{'id': 6, 'name': 'Algorithm Viewer'}, | ||
] | ||
) | ||
|
||
op.bulk_insert( | ||
rule_table, | ||
[ | ||
{'id': 1, 'resource': AuthorizationResource.ORGANIZATION_INFO, 'verbs': [AuthorizationVerb.CREATE, AuthorizationVerb.READ, AuthorizationVerb.UPDATE], 'role_id': 1}, | ||
{'id': 2, 'resource': AuthorizationResource.ORGANIZATION_ALGORITHM, 'verbs': [AuthorizationVerb.LIST, AuthorizationVerb.CREATE, AuthorizationVerb.UPDATE, AuthorizationVerb.DELETE], 'role_id': 1}, | ||
{'id': 3, 'resource': AuthorizationResource.ORGANIZATION_MEMBER, 'verbs': [AuthorizationVerb.LIST, AuthorizationVerb.CREATE, AuthorizationVerb.UPDATE, AuthorizationVerb.DELETE], 'role_id': 1}, | ||
{'id': 4, 'resource': AuthorizationResource.ORGANIZATION_INFO, 'verbs': [AuthorizationVerb.READ], 'role_id': 2}, | ||
{'id': 5, 'resource': AuthorizationResource.ORGANIZATION_ALGORITHM, 'verbs': [AuthorizationVerb.LIST, AuthorizationVerb.CREATE], 'role_id': 2}, | ||
{'id': 6, 'resource': AuthorizationResource.ORGANIZATION_MEMBER, 'verbs': [AuthorizationVerb.LIST], 'role_id': 2}, | ||
{'id': 7, 'resource': AuthorizationResource.ORGANIZATION_INFO, 'verbs': [AuthorizationVerb.READ], 'role_id': 3}, | ||
{'id': 8, 'resource': AuthorizationResource.ORGANIZATION_ALGORITHM, 'verbs': [AuthorizationVerb.LIST], 'role_id': 3}, | ||
{'id': 9, 'resource': AuthorizationResource.ORGANIZATION_MEMBER, 'verbs': [AuthorizationVerb.LIST], 'role_id': 3}, | ||
{'id': 10, 'resource': AuthorizationResource.ALGORITHM, 'verbs': [AuthorizationVerb.CREATE, AuthorizationVerb.READ, AuthorizationVerb.DELETE], 'role_id': 4}, | ||
{'id': 11, 'resource': AuthorizationResource.ALGORITHM_SYSTEMCARD, 'verbs': [AuthorizationVerb.READ, AuthorizationVerb.CREATE, AuthorizationVerb.UPDATE], 'role_id': 4}, | ||
{'id': 12, 'resource': AuthorizationResource.ALGORITHM_MEMBER, 'verbs': [AuthorizationVerb.CREATE, AuthorizationVerb.READ, AuthorizationVerb.UPDATE, AuthorizationVerb.DELETE], 'role_id': 4}, | ||
{'id': 13, 'resource': AuthorizationResource.ALGORITHM, 'verbs': [AuthorizationVerb.READ, AuthorizationVerb.CREATE], 'role_id': 5}, | ||
{'id': 14, 'resource': AuthorizationResource.ALGORITHM_SYSTEMCARD, 'verbs': [AuthorizationVerb.READ, AuthorizationVerb.CREATE, AuthorizationVerb.UPDATE], 'role_id': 5}, | ||
{'id': 15, 'resource': AuthorizationResource.ALGORITHM_MEMBER, 'verbs': [AuthorizationVerb.READ], 'role_id': 5}, | ||
{'id': 16, 'resource': AuthorizationResource.ALGORITHM, 'verbs': [AuthorizationVerb.READ], 'role_id': 6}, | ||
{'id': 17, 'resource': AuthorizationResource.ALGORITHM_SYSTEMCARD, 'verbs': [AuthorizationVerb.READ], 'role_id': 6}, | ||
{'id': 18, 'resource': AuthorizationResource.ALGORITHM_MEMBER, 'verbs': [AuthorizationVerb.READ], 'role_id': 6}, | ||
] | ||
) | ||
|
||
session = Session(bind=op.get_bind()) | ||
|
||
first_user = session.query(User).first() # first user is always present due to other migration | ||
organizations = session.query(Organization).all() | ||
|
||
authorizations = [] | ||
# lets add user 1 to all organizations bij default | ||
for organization in organizations: | ||
authorizations.append( | ||
{'id': 1, 'user_id': first_user.id, 'role_id': 1, 'type': AuthorizationType.ORGANIZATION, 'type_id': organization.id}, | ||
) | ||
|
||
op.bulk_insert( | ||
authorization_table, | ||
authorizations | ||
) | ||
|
||
|
||
|
||
def downgrade() -> None: | ||
op.drop_table("rule") | ||
op.drop_table("authorization") | ||
op.drop_table("role") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,9 @@ | ||
from .algorithm import Algorithm | ||
from .authorization import Authorization | ||
from .organization import Organization | ||
from .role import Role | ||
from .rule import Rule | ||
from .task import Task | ||
from .user import User | ||
|
||
__all__ = ["Algorithm", "Organization", "Task", "User"] | ||
__all__ = ["Algorithm", "Authorization", "Organization", "Role", "Rule", "Task", "User"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
from sqlalchemy import ForeignKey | ||
from sqlalchemy.orm import Mapped, mapped_column, relationship | ||
|
||
from amt.models.base import Base | ||
|
||
|
||
class Authorization(Base): | ||
__tablename__ = "authorization" | ||
|
||
id: Mapped[int] = mapped_column(primary_key=True) | ||
user_id: Mapped[int] = mapped_column(ForeignKey("user.id")) | ||
user: Mapped["User"] = relationship(back_populates="authorizations") # pyright: ignore [reportUndefinedVariable, reportUnknownVariableType] #noqa | ||
role_id: Mapped[int] = mapped_column(ForeignKey("role.id")) | ||
type: Mapped[str] # type [Organization or Algorithm] | ||
type_id: Mapped[int] # ID of the organization or algorithm |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
from sqlalchemy import String | ||
from sqlalchemy.orm import Mapped, mapped_column, relationship | ||
|
||
from amt.models import Authorization | ||
from amt.models.base import Base | ||
from amt.models.rule import Rule | ||
|
||
|
||
class Role(Base): | ||
__tablename__ = "role" | ||
|
||
id: Mapped[int] = mapped_column(primary_key=True) | ||
name: Mapped[str] = mapped_column(String, nullable=False) | ||
rules: Mapped[list["Rule"]] = relationship() | ||
authorizations: Mapped[list["Authorization"]] = relationship() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
from sqlalchemy import ForeignKey, String | ||
from sqlalchemy.orm import Mapped, mapped_column | ||
from sqlalchemy.types import JSON | ||
|
||
from amt.models.base import Base | ||
|
||
|
||
class Rule(Base): | ||
__tablename__ = "rule" | ||
|
||
id: Mapped[int] = mapped_column(primary_key=True) | ||
resource: Mapped[str] = mapped_column(String, nullable=False) | ||
verbs: Mapped[list[str]] = mapped_column(JSON, default=list) | ||
role_id: Mapped[int] = mapped_column(ForeignKey("role.id")) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
import logging | ||
from uuid import UUID | ||
|
||
from sqlalchemy import select | ||
from sqlalchemy.ext.asyncio import AsyncSession | ||
|
||
from amt.core.authorization import AuthorizationVerb | ||
from amt.models import Authorization, Role, Rule | ||
from amt.repositories.deps import get_session_non_generator | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
PermissionTuple = tuple[str, list[AuthorizationVerb], str, int] | ||
PermissionsList = list[PermissionTuple] | ||
|
||
|
||
class AuthorizationRepository: | ||
""" | ||
The AuthorizationRepository provides access to the repository layer. | ||
""" | ||
|
||
def __init__(self, session: AsyncSession | None = None) -> None: | ||
self.session = session | ||
|
||
async def init_session(self) -> None: | ||
if self.session is None: | ||
self.session = await get_session_non_generator() | ||
|
||
async def find_by_user(self, user: UUID) -> PermissionsList | None: | ||
""" | ||
Returns all authorization for a user. | ||
:return: all authorization for the user | ||
""" | ||
await self.init_session() | ||
|
||
statement = ( | ||
select( | ||
Rule.resource, | ||
Rule.verbs, | ||
Authorization.type, | ||
Authorization.type_id, | ||
) | ||
.join(Role, Rule.role_id == Role.id) | ||
.join(Authorization, Rule.role_id == Authorization.role_id) | ||
.filter(Authorization.user_id == user) | ||
) | ||
|
||
result = await self.session.execute(statement) # type: ignore | ||
authorizations = result.all() | ||
return authorizations # type: ignore |
Oops, something went wrong.