-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcomment.py
114 lines (98 loc) · 3.17 KB
/
comment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from sqlalchemy.orm.session import Session
from sqlalchemy import or_, and_, desc
from fastapi import Depends
from fastapi.responses import JSONResponse
from datetime import datetime
from typing import List
from main import app, get_db, manager
import schemas
import exceptions
import models
@app.post(
    "/api/v1/add_comment",
    response_model=schemas.Comment,
    tags=["Comments manipulation"],
)
async def add_comment(
    comment: schemas.AddComment,
    user: models.User = Depends(manager),
    db_session: Session = Depends(get_db),
):
    # Store a new comment authored by the authenticated user and return it.
    #
    # comment:    request payload; its content, article_id and reply_id are
    #             copied onto the new row.
    # user:       authenticated user resolved by `manager`; its id becomes
    #             the comment's author_id.
    # db_session: SQLAlchemy session provided by `get_db`.
    #
    # Returns the stored row re-read through models.CommentWithAuthor.
    # `.one()` raises if that lookup does not match exactly one row.
    #
    # NOTE: documented with comments rather than a docstring because FastAPI
    # publishes route docstrings as the OpenAPI description.
    comment_model = models.Comment()
    comment_model.content = comment.content
    comment_model.author_id = user.id
    comment_model.article_id = comment.article_id
    comment_model.reply_id = comment.reply_id

    db_session.add(comment_model)
    # commit() flushes pending changes itself; a flush() after it has
    # nothing left to write, so none is issued here.
    db_session.commit()

    return (
        db_session.query(models.CommentWithAuthor)
        .filter(models.CommentWithAuthor.id == comment_model.id)
        .one()
    )
@app.post(
    "/api/v1/get_article_comments/{article_id}",
    response_model=List[schemas.Comment],
    tags=["Comments manipulation"],
)
async def get_article_comments(article_id: int, db_session: Session = Depends(get_db)):
    # List the published comments of one article whose reply_id is NULL,
    # ordered by creation date, newest first.
    #
    # article_id: id of the article, taken from the URL path.
    # db_session: SQLAlchemy session provided by `get_db`.
    #
    # Raises exceptions.ArticleDoesNotExists when no article matches.
    #
    # NOTE(review): the lookup selects models.Article but filters and joins
    # through models.ArticleWithComments, and the ordering column comes from
    # models.Comment while the rows are models.CommentWithAuthor. Presumably
    # these are alternate mappings of the same tables - confirm in models.
    article_lookup = db_session.query(models.Article).filter(
        models.ArticleWithComments.id == article_id
    )
    parent_article = article_lookup.one_or_none()
    if not parent_article:
        raise exceptions.ArticleDoesNotExists

    # `== None` is intentional: SQLAlchemy turns it into an IS NULL test.
    without_reply_target = models.CommentWithAuthor.reply_id == None  # noqa: E711
    is_published = (
        models.CommentWithAuthor.status == models.ModerationStatus.published
    )
    newest_first = desc(models.Comment.creation_date)

    comment_query = db_session.query(models.CommentWithAuthor).with_parent(
        parent_article, models.ArticleWithComments.comments
    )
    comment_query = comment_query.filter(and_(without_reply_target, is_published))
    return comment_query.order_by(newest_first).all()
@app.post(
    "/api/v1/get_comments_for_moderation",
    response_model=List[schemas.Comment],
    tags=["Comments manipulation"],
    deprecated=True,
)
async def get_comments_for_moderation(
    user: models.User = Depends(manager), db_session: Session = Depends(get_db)
):
    # List every comment whose status is ModerationStatus.waiting, newest
    # first. The route is flagged deprecated in its decorator.
    #
    # user:       authenticated user resolved by `manager`.
    # db_session: SQLAlchemy session provided by `get_db`.
    #
    # Raises exceptions.PermissionDenied when the user's type value is below
    # that of UserType.moderator.
    #
    # NOTE: documented with comments rather than a docstring because FastAPI
    # publishes route docstrings as the OpenAPI description.
    if user.type.value < models.UserType.moderator.value:
        raise exceptions.PermissionDenied

    return (
        db_session.query(models.Comment)
        .filter(models.Comment.status == models.ModerationStatus.waiting)
        # Order on the queried entity; the model is `Comment`, not `Comments`.
        .order_by(desc(models.Comment.creation_date))
        .all()
    )
@app.post(
    "/api/v1/change_comment_status/{comment_id}/{status}",
    response_model=schemas.RequestResult,
    tags=["Comments manipulation"],
)
async def change_comment_status(
    comment_id: int,
    status: int,
    user: models.User = Depends(manager),
    db_session: Session = Depends(get_db),
):
    # Set the moderation status of one comment.
    #
    # comment_id: id of the comment, taken from the URL path.
    # status:     integer code converted with models.ModerationStatus(status).
    # user:       authenticated user resolved by `manager`.
    # db_session: SQLAlchemy session provided by `get_db`.
    #
    # Returns {"result": "success"} once the change is committed.
    #
    # Raises exceptions.PermissionDenied when the user's type value is below
    # that of UserType.moderator, exceptions.CommentDoesNotExists when no
    # comment matches, and an HTTP 422 when `status` is not a known code.
    #
    # NOTE: documented with comments rather than a docstring because FastAPI
    # publishes route docstrings as the OpenAPI description.

    # Imported here so this handler does not depend on the module's import
    # list, which only pulls `Depends` from fastapi.
    from fastapi import HTTPException

    if user.type.value < models.UserType.moderator.value:
        raise exceptions.PermissionDenied

    comment = (
        db_session.query(models.Comment)
        .filter(models.Comment.id == comment_id)
        .one_or_none()
    )
    if comment is None:
        raise exceptions.CommentDoesNotExists

    # ModerationStatus is presumably an Enum, whose constructor raises
    # ValueError for an unknown value; report that as a client error rather
    # than letting it surface as a 500.
    try:
        new_status = models.ModerationStatus(status)
    except ValueError:
        raise HTTPException(
            status_code=422,
            detail=f"Unknown moderation status: {status}",
        ) from None

    comment.status = new_status
    # commit() flushes pending changes itself; no separate flush() is needed.
    db_session.commit()
    return {"result": "success"}