diff --git a/.env.dist b/.env.dist index 5c63057..42628bf 100644 --- a/.env.dist +++ b/.env.dist @@ -48,3 +48,13 @@ MYSQL_PASSWORD=password POSTGRES_DB=mork-db POSTGRES_USER=fun POSTGRES_PASSWORD=pass + +# Emails +EMAIL_HOST=mailcatcher +EMAIL_HOST_USER= +EMAIL_HOST_PASSWORD= +EMAIL_PORT=1025 +EMAIL_USE_TLS=False +EMAIL_FROM=from@fun-mooc.fr +EMAIL_RATE_LIMIT=100/m +EMAIL_MAX_RETRIES=3 diff --git a/Makefile b/Makefile index 3cce4f9..cf13756 100644 --- a/Makefile +++ b/Makefile @@ -2,8 +2,9 @@ SHELL := /bin/bash # -- Docker -COMPOSE = bin/compose -COMPOSE_RUN = $(COMPOSE) run --rm --no-deps +COMPOSE = bin/compose +COMPOSE_EXEC = $(COMPOSE) exec +COMPOSE_RUN = $(COMPOSE) run --rm --no-deps COMPOSE_RUN_API = $(COMPOSE_RUN) api # -- MySQL @@ -77,11 +78,11 @@ logs-celery: ## display celery logs (follow mode) .PHONY: logs-celery purge-celery: ## purge celery tasks - @$(COMPOSE_EXEC) celery celery -A mork.celery_app purge + @$(COMPOSE_EXEC) celery celery -A mork.celery.celery_app purge .PHONY: purge-celery flower: ## run flower - @$(COMPOSE_EXEC) celery celery -A mork.celery_app flower + @$(COMPOSE_EXEC) celery celery -A mork.celery.celery_app flower .PHONY: flower run: ## run the whole stack diff --git a/docker-compose.yml b/docker-compose.yml index 08d91b0..8b44764 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -36,28 +36,30 @@ services: - .:/app celery: - build: - context: . - target: "${MORK_IMAGE_BUILD_TARGET:-development}" - args: - DOCKER_USER: ${DOCKER_USER:-1000} - command: ["celery", "-A", "mork.celery_app", "worker", "-l", "DEBUG", "-n", "mork@%h"] + image: mork:development + command: ["celery", "-A", "mork.celery.celery_app", "worker", "-l", "DEBUG", "-n", "mork@%h"] env_file: - .env ports: - - "${MORK_CELERY_SERVER_PORT:-5555}" + - "5555:5555" volumes: - ./src:/app - ./bin/seed_edx_database.py:/opt/src/seed_edx_database.py depends_on: - api - redis + - mailcatcher - mysql - postgresql dockerize: image: jwilder/dockerize + mailcatcher: + image: sj26/mailcatcher:latest + ports: + - "1081:1080" + mysql: image: mysql:5.7 ports: diff --git a/src/mork/api/models.py b/src/mork/api/models.py index 19ee8ee..4109d91 100644 --- a/src/mork/api/models.py +++ b/src/mork/api/models.py @@ -4,6 +4,8 @@ from pydantic import BaseModel +from mork.celery.tasks import delete_inactive_users, warn_inactive_users + @unique class TaskStatus(str, Enum): @@ -37,3 +39,9 @@ class TaskResponse(BaseModel): id: str status: TaskStatus + + +TASK_TYPE_TO_FUNC = { + TaskType.EMAILING: warn_inactive_users, + TaskType.DELETION: delete_inactive_users, +} diff --git a/src/mork/api/routes/tasks.py b/src/mork/api/routes/tasks.py index 951f500..de51fb0 100644 --- a/src/mork/api/routes/tasks.py +++ b/src/mork/api/routes/tasks.py @@ -6,8 +6,13 @@ from fastapi import APIRouter, Depends, Response, status from mork.api.auth import authenticate_api_key -from mork.api.models import TaskCreate, TaskResponse, TaskStatus, TaskType -from mork.worker.tasks import TASK_TYPE_TO_FUNC +from mork.api.models import ( + TASK_TYPE_TO_FUNC, + TaskCreate, + TaskResponse, + TaskStatus, + TaskType, +) logger = logging.getLogger(__name__) diff --git a/src/mork/worker/__init__.py b/src/mork/celery/__init__.py similarity index 100% rename from src/mork/worker/__init__.py rename to src/mork/celery/__init__.py diff --git a/src/mork/worker/celery_app.py b/src/mork/celery/celery_app.py similarity index 83% rename from src/mork/worker/celery_app.py rename to src/mork/celery/celery_app.py index 1f02b78..b222c04 100644 --- a/src/mork/worker/celery_app.py +++ b/src/mork/celery/celery_app.py @@ -2,13 +2,10 @@ from celery import Celery -app = Celery("mork") +app = Celery("mork", include=["mork.celery.tasks"]) # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object("mork.conf:settings", namespace="CELERY") - -# Load task modules. -app.autodiscover_tasks() diff --git a/src/mork/celery/tasks.py b/src/mork/celery/tasks.py new file mode 100644 index 0000000..34e85b0 --- /dev/null +++ b/src/mork/celery/tasks.py @@ -0,0 +1,110 @@ +"""Mork Celery tasks.""" + +import smtplib +from datetime import datetime +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from logging import getLogger +from smtplib import SMTPException + +from sqlalchemy import select + +from mork.celery.celery_app import app +from mork.conf import settings +from mork.database import MorkDB +from mork.exceptions import EmailAlreadySent, EmailSendError +from mork.models import EmailStatus + +logger = getLogger(__name__) + + +@app.task +def warn_inactive_users(): + """Celery task to warn inactive users by email.""" + pass + + +@app.task +def delete_inactive_users(): + """Celery task to delete inactive users accounts.""" + pass + + +@app.task( + bind=True, + rate_limit=settings.EMAIL_RATE_LIMIT, + retry_kwargs={"max_retries": settings.EMAIL_MAX_RETRIES}, +) +def send_email_task(self, email_address: str, username: str): + """Celery task that sends an email to the specified user.""" + # Check that user has not already received a warning email + if check_email_already_sent(email_address): + raise EmailAlreadySent("An email has already been sent to this user") + + try: + send_email(email_address, username) + except EmailSendError as exc: + logger.exception(exc) + raise self.retry(exc=exc) from exc + + # Write flag that email was correctly sent to this user + mark_email_status(email_address) + + +def check_email_already_sent(email_adress: str): + """Check if an email has already been sent to the user.""" + db = MorkDB() + query = select(EmailStatus.email).where(EmailStatus.email == email_adress) + result = db.session.execute(query).scalars().first() + db.session.close() + return result + + +def send_email(email_address: str, username: str): + """Initialize connection to SMTP and send a warning email.""" + html = f"""\ + + +

Hello {username},

+ Your account will be closed soon! If you want to keep it, please log in! + + + """ + + # Create a multipart message and set headers + message = MIMEMultipart() + message["From"] = settings.EMAIL_FROM + message["To"] = email_address + message["Subject"] = "Your account will be closed soon" + + # Attach the HTML part + message.attach(MIMEText(html, "html")) + + # Send the email + with smtplib.SMTP( + host=settings.EMAIL_HOST, port=settings.EMAIL_PORT + ) as smtp_server: + if settings.EMAIL_USE_TLS: + smtp_server.starttls() + if settings.EMAIL_HOST_USER and settings.EMAIL_HOST_PASSWORD: + smtp_server.login( + user=settings.EMAIL_HOST_USER, + password=settings.EMAIL_HOST_PASSWORD, + ) + try: + smtp_server.sendmail( + from_addr=settings.EMAIL_FROM, + to_addrs=email_address, + msg=message.as_string(), + ) + except SMTPException as exc: + logger.error(f"Sending email failed: {exc} ") + raise EmailSendError("Failed sending an email") from exc + + +def mark_email_status(email_address: str): + """Mark the email status in database.""" + db = MorkDB() + db.session.add(EmailStatus(email=email_address, sent_date=datetime.now())) + db.session.commit() + db.session.close() diff --git a/src/mork/conf.py b/src/mork/conf.py index 8c350aa..c6bb1b6 100644 --- a/src/mork/conf.py +++ b/src/mork/conf.py @@ -52,6 +52,16 @@ class Settings(BaseSettings): EDX_DB_PORT: int = 3306 EDX_DB_DEBUG: bool = False + # Emails + EMAIL_HOST: str = "mailcatcher" + EMAIL_HOST_USER: str = "" + EMAIL_HOST_PASSWORD: str = "" + EMAIL_PORT: int = 1025 + EMAIL_USE_TLS: bool = False + EMAIL_FROM: str = "from@fun-mooc.fr" + EMAIL_RATE_LIMIT: str = "100/m" + EMAIL_MAX_RETRIES: int = 3 + # Celery broker_url: str = Field("redis://redis:6379/0", alias="MORK_CELERY_BROKER_URL") result_backend: str = Field( diff --git a/src/mork/exceptions.py b/src/mork/exceptions.py new file mode 100644 index 0000000..d83b3ce --- /dev/null +++ b/src/mork/exceptions.py @@ -0,0 +1,9 @@ +"""Exceptions for Mork.""" + + +class EmailAlreadySent(Exception): + """Raised when an email has already been sent to this user.""" + + +class EmailSendError(Exception): + """Raised when an error occurs when sending an email.""" diff --git a/src/mork/worker/tasks.py b/src/mork/worker/tasks.py deleted file mode 100644 index 8d83600..0000000 --- a/src/mork/worker/tasks.py +++ /dev/null @@ -1,26 +0,0 @@ -"""Mork Celery tasks.""" - -from logging import getLogger - -from mork.api.models import TaskType -from mork.worker.celery_app import app - -logger = getLogger(__name__) - - -@app.task -def warn_inactive_users(): - """Celery task to warn inactive users by email.""" - pass - - -@app.task -def delete_inactive_users(): - """Celery task to delete inactive users accounts.""" - pass - - -TASK_TYPE_TO_FUNC = { - TaskType.EMAILING: warn_inactive_users, - TaskType.DELETION: delete_inactive_users, -} diff --git a/tests/worker/test_tasks.py b/tests/worker/test_tasks.py new file mode 100644 index 0000000..ce75dab --- /dev/null +++ b/tests/worker/test_tasks.py @@ -0,0 +1,70 @@ +"""Tests for Mork Celery tasks.""" + +import smtplib +from unittest.mock import MagicMock + +import pytest + +from mork.celery.tasks import check_email_already_sent, mark_email_status, send_email +from mork.exceptions import EmailSendError +from mork.factories import EmailStatusFactory + + +def test_check_email_already_sent(monkeypatch, db_session): + """Test the `check_email_already_sent` function.""" + email_address = "test_email@example.com" + + class MockMorkDB: + session = db_session + + monkeypatch.setattr("mork.celery.tasks.MorkDB", MockMorkDB) + EmailStatusFactory.create_batch(3) + + assert not check_email_already_sent(email_address) + + EmailStatusFactory.create(email=email_address) + assert check_email_already_sent(email_address) + + +def test_send_email(monkeypatch): + """Test the `send_email` function.""" + + mock_SMTP = MagicMock() + monkeypatch.setattr("mork.celery.tasks.smtplib.SMTP", mock_SMTP) + + test_address = "john.doe@example.com" + test_username = "JohnDoe" + send_email(email_address=test_address, username=test_username) + + assert mock_SMTP.return_value.__enter__.return_value.sendmail.call_count == 1 + + +def test_send_email_with_smtp_exception(monkeypatch): + """Test the `send_email` function with an SMTP exception.""" + + mock_SMTP = MagicMock() + mock_SMTP.return_value.__enter__.return_value.sendmail.side_effect = ( + smtplib.SMTPException + ) + + monkeypatch.setattr("mork.celery.tasks.smtplib.SMTP", mock_SMTP) + + test_address = "john.doe@example.com" + test_username = "JohnDoe" + + with pytest.raises(EmailSendError, match="Failed sending an email"): + send_email(email_address=test_address, username=test_username) + + +def test_mark_email_status(monkeypatch, db_session): + """Test the `mark_email_status` function.""" + + class MockMorkDB: + session = db_session + + monkeypatch.setattr("mork.celery.tasks.MorkDB", MockMorkDB) + + # Write new email status entry + new_email = "test_email@example.com" + mark_email_status(new_email) + assert check_email_already_sent(new_email)