diff --git a/.env.dist b/.env.dist
index 251ecee..39ad1a2 100644
--- a/.env.dist
+++ b/.env.dist
@@ -48,3 +48,13 @@ MYSQL_PASSWORD=password
POSTGRES_DB=mork-db
POSTGRES_USER=fun
POSTGRES_PASSWORD=pass
+
+# Emails
+EMAIL_HOST=mailcatcher
+EMAIL_HOST_USER=
+EMAIL_HOST_PASSWORD=
+EMAIL_PORT=1025
+EMAIL_USE_TLS=False
+EMAIL_FROM=from@fun-mooc.fr
+EMAIL_RATE_LIMIT=100/m
+EMAIL_MAX_RETRIES=3
diff --git a/Makefile b/Makefile
index 91dcf19..88f2d0a 100644
--- a/Makefile
+++ b/Makefile
@@ -2,8 +2,9 @@
SHELL := /bin/bash
# -- Docker
-COMPOSE = bin/compose
-COMPOSE_RUN = $(COMPOSE) run --rm --no-deps
+COMPOSE = bin/compose
+COMPOSE_EXEC = $(COMPOSE) exec
+COMPOSE_RUN = $(COMPOSE) run --rm --no-deps
COMPOSE_RUN_API = $(COMPOSE_RUN) api
# -- MySQL
@@ -76,11 +77,11 @@ logs-celery: ## display celery logs (follow mode)
.PHONY: logs-celery
purge-celery: ## purge celery tasks
- @$(COMPOSE_EXEC) celery celery -A mork.celery_app purge
+ @$(COMPOSE_EXEC) celery celery -A mork.celery.celery_app purge
.PHONY: purge-celery
flower: ## run flower
- @$(COMPOSE_EXEC) celery celery -A mork.celery_app flower
+ @$(COMPOSE_EXEC) celery celery -A mork.celery.celery_app flower
.PHONY: flower
run: ## run the whole stack
diff --git a/docker-compose.yml b/docker-compose.yml
index d2be6f9..8b44764 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -37,7 +37,7 @@ services:
celery:
image: mork:development
- command: ["celery", "-A", "mork.worker.celery_app", "worker", "-l", "DEBUG", "-n", "mork@%h"]
+ command: ["celery", "-A", "mork.celery.celery_app", "worker", "-l", "DEBUG", "-n", "mork@%h"]
env_file:
- .env
ports:
@@ -48,12 +48,18 @@ services:
depends_on:
- api
- redis
+ - mailcatcher
- mysql
- postgresql
dockerize:
image: jwilder/dockerize
+ mailcatcher:
+ image: sj26/mailcatcher:latest
+ ports:
+ - "1081:1080"
+
mysql:
image: mysql:5.7
ports:
diff --git a/src/mork/api/models.py b/src/mork/api/models.py
index 19ee8ee..4109d91 100644
--- a/src/mork/api/models.py
+++ b/src/mork/api/models.py
@@ -4,6 +4,8 @@
from pydantic import BaseModel
+from mork.celery.tasks import delete_inactive_users, warn_inactive_users
+
@unique
class TaskStatus(str, Enum):
@@ -37,3 +39,9 @@ class TaskResponse(BaseModel):
id: str
status: TaskStatus
+
+
+TASK_TYPE_TO_FUNC = {
+ TaskType.EMAILING: warn_inactive_users,
+ TaskType.DELETION: delete_inactive_users,
+}
diff --git a/src/mork/api/routes/tasks.py b/src/mork/api/routes/tasks.py
index 951f500..de51fb0 100644
--- a/src/mork/api/routes/tasks.py
+++ b/src/mork/api/routes/tasks.py
@@ -6,8 +6,13 @@
from fastapi import APIRouter, Depends, Response, status
from mork.api.auth import authenticate_api_key
-from mork.api.models import TaskCreate, TaskResponse, TaskStatus, TaskType
-from mork.worker.tasks import TASK_TYPE_TO_FUNC
+from mork.api.models import (
+ TASK_TYPE_TO_FUNC,
+ TaskCreate,
+ TaskResponse,
+ TaskStatus,
+ TaskType,
+)
logger = logging.getLogger(__name__)
diff --git a/src/mork/worker/__init__.py b/src/mork/celery/__init__.py
similarity index 100%
rename from src/mork/worker/__init__.py
rename to src/mork/celery/__init__.py
diff --git a/src/mork/worker/celery_app.py b/src/mork/celery/celery_app.py
similarity index 83%
rename from src/mork/worker/celery_app.py
rename to src/mork/celery/celery_app.py
index 1f02b78..b222c04 100644
--- a/src/mork/worker/celery_app.py
+++ b/src/mork/celery/celery_app.py
@@ -2,13 +2,10 @@
from celery import Celery
-app = Celery("mork")
+app = Celery("mork", include=["mork.celery.tasks"])
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object("mork.conf:settings", namespace="CELERY")
-
-# Load task modules.
-app.autodiscover_tasks()
diff --git a/src/mork/celery/tasks.py b/src/mork/celery/tasks.py
new file mode 100644
index 0000000..34e85b0
--- /dev/null
+++ b/src/mork/celery/tasks.py
@@ -0,0 +1,110 @@
+"""Mork Celery tasks."""
+
+import smtplib
+from datetime import datetime
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+from logging import getLogger
+from smtplib import SMTPException
+
+from sqlalchemy import select
+
+from mork.celery.celery_app import app
+from mork.conf import settings
+from mork.database import MorkDB
+from mork.exceptions import EmailAlreadySent, EmailSendError
+from mork.models import EmailStatus
+
+logger = getLogger(__name__)
+
+
+@app.task
+def warn_inactive_users():
+ """Celery task to warn inactive users by email."""
+ pass
+
+
+@app.task
+def delete_inactive_users():
+ """Celery task to delete inactive users accounts."""
+ pass
+
+
+@app.task(
+ bind=True,
+ rate_limit=settings.EMAIL_RATE_LIMIT,
+ retry_kwargs={"max_retries": settings.EMAIL_MAX_RETRIES},
+)
+def send_email_task(self, email_address: str, username: str):
+ """Celery task that sends an email to the specified user."""
+ # Check that user has not already received a warning email
+ if check_email_already_sent(email_address):
+ raise EmailAlreadySent("An email has already been sent to this user")
+
+ try:
+ send_email(email_address, username)
+ except EmailSendError as exc:
+ logger.exception(exc)
+ raise self.retry(exc=exc) from exc
+
+ # Write flag that email was correctly sent to this user
+ mark_email_status(email_address)
+
+
+def check_email_already_sent(email_adress: str):
+ """Check if an email has already been sent to the user."""
+ db = MorkDB()
+ query = select(EmailStatus.email).where(EmailStatus.email == email_adress)
+ result = db.session.execute(query).scalars().first()
+ db.session.close()
+ return result
+
+
+def send_email(email_address: str, username: str):
+ """Initialize connection to SMTP and send a warning email."""
+ html = f"""\
+
+
+ Hello {username},
+ Your account will be closed soon! If you want to keep it, please log in!
+
+
+ """
+
+ # Create a multipart message and set headers
+ message = MIMEMultipart()
+ message["From"] = settings.EMAIL_FROM
+ message["To"] = email_address
+ message["Subject"] = "Your account will be closed soon"
+
+ # Attach the HTML part
+ message.attach(MIMEText(html, "html"))
+
+ # Send the email
+ with smtplib.SMTP(
+ host=settings.EMAIL_HOST, port=settings.EMAIL_PORT
+ ) as smtp_server:
+ if settings.EMAIL_USE_TLS:
+ smtp_server.starttls()
+ if settings.EMAIL_HOST_USER and settings.EMAIL_HOST_PASSWORD:
+ smtp_server.login(
+ user=settings.EMAIL_HOST_USER,
+ password=settings.EMAIL_HOST_PASSWORD,
+ )
+ try:
+ smtp_server.sendmail(
+ from_addr=settings.EMAIL_FROM,
+ to_addrs=email_address,
+ msg=message.as_string(),
+ )
+ except SMTPException as exc:
+ logger.error(f"Sending email failed: {exc} ")
+ raise EmailSendError("Failed sending an email") from exc
+
+
+def mark_email_status(email_address: str):
+ """Mark the email status in database."""
+ db = MorkDB()
+ db.session.add(EmailStatus(email=email_address, sent_date=datetime.now()))
+ db.session.commit()
+ db.session.close()
diff --git a/src/mork/conf.py b/src/mork/conf.py
index a737823..fc83417 100644
--- a/src/mork/conf.py
+++ b/src/mork/conf.py
@@ -52,6 +52,16 @@ class Settings(BaseSettings):
EDX_DB_PORT: int = 3306
EDX_DB_DEBUG: bool = False
+ # Emails
+ EMAIL_HOST: str = "mailcatcher"
+ EMAIL_HOST_USER: str = ""
+ EMAIL_HOST_PASSWORD: str = ""
+ EMAIL_PORT: int = 1025
+ EMAIL_USE_TLS: bool = False
+ EMAIL_FROM: str = "from@fun-mooc.fr"
+ EMAIL_RATE_LIMIT: str = "100/m"
+ EMAIL_MAX_RETRIES: int = 3
+
# Celery
broker_url: str = Field("redis://redis:6379/0", alias="MORK_CELERY_BROKER_URL")
result_backend: str = Field(
diff --git a/src/mork/exceptions.py b/src/mork/exceptions.py
new file mode 100644
index 0000000..d83b3ce
--- /dev/null
+++ b/src/mork/exceptions.py
@@ -0,0 +1,9 @@
+"""Exceptions for Mork."""
+
+
+class EmailAlreadySent(Exception):
+ """Raised when an email has already been sent to this user."""
+
+
+class EmailSendError(Exception):
+ """Raised when an error occurs when sending an email."""
diff --git a/src/mork/worker/tasks.py b/src/mork/worker/tasks.py
deleted file mode 100644
index 8d83600..0000000
--- a/src/mork/worker/tasks.py
+++ /dev/null
@@ -1,26 +0,0 @@
-"""Mork Celery tasks."""
-
-from logging import getLogger
-
-from mork.api.models import TaskType
-from mork.worker.celery_app import app
-
-logger = getLogger(__name__)
-
-
-@app.task
-def warn_inactive_users():
- """Celery task to warn inactive users by email."""
- pass
-
-
-@app.task
-def delete_inactive_users():
- """Celery task to delete inactive users accounts."""
- pass
-
-
-TASK_TYPE_TO_FUNC = {
- TaskType.EMAILING: warn_inactive_users,
- TaskType.DELETION: delete_inactive_users,
-}
diff --git a/tests/celery/test_tasks.py b/tests/celery/test_tasks.py
new file mode 100644
index 0000000..456ddc5
--- /dev/null
+++ b/tests/celery/test_tasks.py
@@ -0,0 +1,72 @@
+"""Tests for Mork Celery tasks."""
+
+import smtplib
+from unittest.mock import MagicMock
+
+import pytest
+
+from mork.celery.tasks import check_email_already_sent, mark_email_status, send_email
+from mork.exceptions import EmailSendError
+from mork.factories import EmailStatusFactory
+
+
+def test_check_email_already_sent(monkeypatch, db_session):
+ """Test the `check_email_already_sent` function."""
+ email_address = "test_email@example.com"
+
+ class MockMorkDB:
+ session = db_session
+
+ EmailStatusFactory._meta.sqlalchemy_session = db_session
+ monkeypatch.setattr("mork.celery.tasks.MorkDB", MockMorkDB)
+ EmailStatusFactory.create_batch(3)
+
+ assert not check_email_already_sent(email_address)
+
+ EmailStatusFactory.create(email=email_address)
+ assert check_email_already_sent(email_address)
+
+
+def test_send_email(monkeypatch):
+ """Test the `send_email` function."""
+
+ mock_SMTP = MagicMock()
+ monkeypatch.setattr("mork.celery.tasks.smtplib.SMTP", mock_SMTP)
+
+ test_address = "john.doe@example.com"
+ test_username = "JohnDoe"
+ send_email(email_address=test_address, username=test_username)
+
+ assert mock_SMTP.return_value.__enter__.return_value.sendmail.call_count == 1
+
+
+def test_send_email_with_smtp_exception(monkeypatch):
+ """Test the `send_email` function with an SMTP exception."""
+
+ mock_SMTP = MagicMock()
+ mock_SMTP.return_value.__enter__.return_value.sendmail.side_effect = (
+ smtplib.SMTPException
+ )
+
+ monkeypatch.setattr("mork.celery.tasks.smtplib.SMTP", mock_SMTP)
+
+ test_address = "john.doe@example.com"
+ test_username = "JohnDoe"
+
+ with pytest.raises(EmailSendError, match="Failed sending an email"):
+ send_email(email_address=test_address, username=test_username)
+
+
+def test_mark_email_status(monkeypatch, db_session):
+ """Test the `mark_email_status` function."""
+
+ class MockMorkDB:
+ session = db_session
+
+ EmailStatusFactory._meta.sqlalchemy_session = db_session
+ monkeypatch.setattr("mork.celery.tasks.MorkDB", MockMorkDB)
+
+ # Write new email status entry
+ new_email = "test_email@example.com"
+ mark_email_status(new_email)
+ assert check_email_already_sent(new_email)
diff --git a/tests/fixtures/db.py b/tests/fixtures/db.py
index 1392f01..38b133d 100644
--- a/tests/fixtures/db.py
+++ b/tests/fixtures/db.py
@@ -1,9 +1,6 @@
"""Edx database test fixtures."""
import pytest
-from alembic import command
-from alembic.config import Config
-from factory.alchemy import SQLAlchemyModelFactory
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
@@ -37,20 +34,6 @@ def db_engine():
engine.dispose()
-@pytest.fixture(scope="session")
-def db_table(db_engine):
- """Test database tables fixtures"""
- # Pretend to have all migrations applied
- alembic_cfg = Config(settings.ALEMBIC_CFG_PATH)
- alembic_cfg.set_main_option("sqlalchemy.url", settings.TEST_DB_URL)
- command.stamp(alembic_cfg, "head")
-
- # Create database and tables
- Base.metadata.create_all(engine)
- yield
- Base.metadata.drop_all(engine)
-
-
@pytest.fixture(scope="function")
def db_session(db_engine):
"""Test session fixture."""
@@ -62,10 +45,6 @@ def db_session(db_engine):
transaction = connection.begin()
session = Session(bind=connection)
- # Ensure that all factories use the same session
- for factory in SQLAlchemyModelFactory.__subclasses__():
- factory._meta.sqlalchemy_session = session
-
yield session
# Teardown