-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathuserTypeChange.py
142 lines (124 loc) · 3.83 KB
/
userTypeChange.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from sqlalchemy.orm.session import Session
from sqlalchemy import or_
from fastapi import Depends
from fastapi.responses import JSONResponse
from main import app, get_db, manager
import schemas
import exceptions
import models
@app.post(
    "/api/v1/ban_user",
    response_model=schemas.RequestResult,
    tags=["User data manipulation"],
)
async def ban_user(
    deleting_user: schemas.ChangingTypeUser,
    user: models.User = Depends(manager),
    db_session: Session = Depends(get_db),
):
    # Mark the account named by `deleting_user.identifier` as banned.
    #
    # The identifier is compared against both the email and the nickname
    # columns. `user` is whatever the `manager` dependency resolves to
    # (presumably the authenticated caller -- confirm against main.py).
    #
    # Raises exceptions.UserDoesNotExists when no account matches. Answers
    # with a 403 JSON body unless the target's type value is strictly below
    # the caller's type value.
    identifier = deleting_user.identifier
    identifier_matches = or_(
        models.User.email == identifier,
        models.User.nickname == identifier,
    )
    target = (
        db_session.query(models.User).filter(identifier_matches).one_or_none()
    )

    if target is None:
        raise exceptions.UserDoesNotExists

    # Equal rank is rejected as well, so nobody can ban a peer (or themselves).
    if target.type.value >= user.type.value:
        forbidden_body = {
            "result": "error",
            "error_description": "Unauthorized for this action",
        }
        return JSONResponse(status_code=403, content=forbidden_body)

    target.type = models.UserType.banned
    db_session.commit()
    db_session.flush()

    return JSONResponse(status_code=200, content={"result": "success"})
@app.post(
    "/api/v1/add_moderator",
    response_model=schemas.RequestResult,
    tags=["User data manipulation"],
)
async def add_moderator(
    adding_user: schemas.ChangingTypeUser,
    user: models.User = Depends(manager),
    db_session: Session = Depends(get_db),
):
    # Set the type of the account named by `adding_user.identifier` to
    # moderator.
    #
    # Only a caller whose type is exactly UserType.administrator gets past
    # the first check; anyone else triggers exceptions.PermissionDenied.
    # The identifier may be either an email or a nickname. If no account
    # matches, exceptions.UserDoesNotExists is raised.
    caller_is_admin = user.type == models.UserType.administrator
    if not caller_is_admin:
        raise exceptions.PermissionDenied

    identifier = adding_user.identifier
    by_email = models.User.email == identifier
    by_nickname = models.User.nickname == identifier
    promoted = (
        db_session.query(models.User)
        .filter(or_(by_email, by_nickname))
        .one_or_none()
    )

    if promoted is None:
        raise exceptions.UserDoesNotExists

    promoted.type = models.UserType.moderator
    db_session.commit()
    db_session.flush()

    success_body = {"result": "success"}
    return JSONResponse(status_code=200, content=success_body)
@app.post(
    "/api/v1/unban_user",
    response_model=schemas.RequestResult,
    tags=["User data manipulation"],
)
async def unban_user(
    unbaning_user: schemas.ChangingTypeUser,
    user: models.User = Depends(manager),
    db_session: Session = Depends(get_db),
):
    # Lift the ban on the account named by `unbaning_user.identifier`,
    # turning it back into a plain user.
    #
    # The caller's type value must be at least UserType.moderator's value,
    # otherwise exceptions.PermissionDenied is raised. The identifier is
    # matched against both email and nickname; exceptions.UserDoesNotExists
    # is raised when nothing matches.
    #
    # Returns a 200 JSON body {"result": "success"}. The call is idempotent:
    # a target that is not currently banned is left untouched.
    if user.type.value < models.UserType.moderator.value:
        raise exceptions.PermissionDenied

    db_user = (
        db_session.query(models.User)
        .filter(
            or_(
                models.User.email == unbaning_user.identifier,
                models.User.nickname == unbaning_user.identifier,
            )
        )
        .one_or_none()
    )
    if db_user is None:
        raise exceptions.UserDoesNotExists

    # Only a banned account may be reset to a plain user. Without this guard
    # the endpoint overwrote the type of *any* matched account, which let a
    # moderator demote another moderator or an administrator by "unbanning"
    # them.
    if db_user.type == models.UserType.banned:
        db_user.type = models.UserType.user
        # commit() flushes pending changes itself; no separate flush() needed.
        db_session.commit()

    return JSONResponse(status_code=200, content={"result": "success"})
@app.post(
    "/api/v1/remove_moderator",
    response_model=schemas.RequestResult,
    tags=["User data manipulation"],
)
async def remove_moderator(
    removing_user: schemas.ChangingTypeUser,
    user: models.User = Depends(manager),
    db_session: Session = Depends(get_db),
):
    # Demote the moderator named by `removing_user.identifier` back to a
    # plain user.
    #
    # The caller's type value must be at least UserType.administrator's
    # value, otherwise exceptions.PermissionDenied is raised. The identifier
    # is matched against both email and nickname.
    #
    # Raises exceptions.UserDoesNotExists when nothing matches, the same
    # error the other type-changing endpoints in this module use for that
    # condition.
    #
    # Returns {"result": "success"}. The call is idempotent: a target that
    # is not currently a moderator is left untouched.
    if user.type.value < models.UserType.administrator.value:
        raise exceptions.PermissionDenied

    user_from_db = (
        db_session.query(models.User)
        .filter(
            or_(
                models.User.email == removing_user.identifier,
                models.User.nickname == removing_user.identifier,
            )
        )
        .one_or_none()
    )
    if user_from_db is None:
        raise exceptions.UserDoesNotExists

    # Only touch accounts that really are moderators. Resetting every match
    # to `user` would silently unban banned accounts and demote
    # administrators through an endpoint meant only to revoke moderator
    # rights.
    if user_from_db.type == models.UserType.moderator:
        user_from_db.type = models.UserType.user
        # commit() flushes pending changes itself; no separate flush() needed.
        db_session.commit()

    return {"result": "success"}