Skip to content

Commit

Permalink
✨(celery) implement subtask for sending a single warning email
Browse files Browse the repository at this point in the history
Introducing a subtask designed to send a warning email to an inactive user. This
subtask will be invoked many times by the main warning task, which is yet to be
implemented.
  • Loading branch information
wilbrdt committed Aug 12, 2024
1 parent 6cf6408 commit 54bd4ab
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 43 deletions.
10 changes: 10 additions & 0 deletions .env.dist
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,13 @@ MYSQL_PASSWORD=password
POSTGRES_DB=mork-db
POSTGRES_USER=fun
POSTGRES_PASSWORD=pass

# Emails
EMAIL_HOST=mailcatcher
EMAIL_HOST_USER=
EMAIL_HOST_PASSWORD=
EMAIL_PORT=1025
EMAIL_USE_TLS=False
[email protected]
EMAIL_RATE_LIMIT=100/m
EMAIL_MAX_RETRIES=3
9 changes: 5 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
SHELL := /bin/bash

# -- Docker
COMPOSE = bin/compose
COMPOSE_RUN = $(COMPOSE) run --rm --no-deps
COMPOSE = bin/compose
COMPOSE_EXEC = $(COMPOSE) exec
COMPOSE_RUN = $(COMPOSE) run --rm --no-deps
COMPOSE_RUN_API = $(COMPOSE_RUN) api

# -- MySQL
Expand Down Expand Up @@ -77,11 +78,11 @@ logs-celery: ## display celery logs (follow mode)
.PHONY: logs-celery

purge-celery: ## purge celery tasks
@$(COMPOSE_EXEC) celery celery -A mork.celery_app purge
@$(COMPOSE_EXEC) celery celery -A mork.celery.celery_app purge
.PHONY: purge-celery

flower: ## run flower
@$(COMPOSE_EXEC) celery celery -A mork.celery_app flower
@$(COMPOSE_EXEC) celery celery -A mork.celery.celery_app flower
.PHONY: flower

run: ## run the whole stack
Expand Down
16 changes: 9 additions & 7 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,30 @@ services:
- .:/app

celery:
build:
context: .
target: "${MORK_IMAGE_BUILD_TARGET:-development}"
args:
DOCKER_USER: ${DOCKER_USER:-1000}
command: ["celery", "-A", "mork.celery_app", "worker", "-l", "DEBUG", "-n", "mork@%h"]
image: mork:development
command: ["celery", "-A", "mork.celery.celery_app", "worker", "-l", "DEBUG", "-n", "mork@%h"]
env_file:
- .env
ports:
- "${MORK_CELERY_SERVER_PORT:-5555}"
- "5555:5555"
volumes:
- ./src:/app
- ./bin/seed_edx_database.py:/opt/src/seed_edx_database.py
depends_on:
- api
- redis
- mailcatcher
- mysql
- postgresql

dockerize:
image: jwilder/dockerize

mailcatcher:
image: sj26/mailcatcher:latest
ports:
- "1081:1080"

mysql:
image: mysql:5.7
ports:
Expand Down
8 changes: 8 additions & 0 deletions src/mork/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from pydantic import BaseModel

from mork.celery.tasks import delete_inactive_users, warn_inactive_users


@unique
class TaskStatus(str, Enum):
Expand Down Expand Up @@ -37,3 +39,9 @@ class TaskResponse(BaseModel):

id: str
status: TaskStatus


TASK_TYPE_TO_FUNC = {
TaskType.EMAILING: warn_inactive_users,
TaskType.DELETION: delete_inactive_users,
}
9 changes: 7 additions & 2 deletions src/mork/api/routes/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@
from fastapi import APIRouter, Depends, Response, status

from mork.api.auth import authenticate_api_key
from mork.api.models import TaskCreate, TaskResponse, TaskStatus, TaskType
from mork.worker.tasks import TASK_TYPE_TO_FUNC
from mork.api.models import (
TASK_TYPE_TO_FUNC,
TaskCreate,
TaskResponse,
TaskStatus,
TaskType,
)

logger = logging.getLogger(__name__)

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@

from celery import Celery

app = Celery("mork")
app = Celery("mork", include=["mork.celery.tasks"])

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object("mork.conf:settings", namespace="CELERY")

# Load task modules.
app.autodiscover_tasks()
110 changes: 110 additions & 0 deletions src/mork/celery/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""Mork Celery tasks."""

import smtplib
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from logging import getLogger
from smtplib import SMTPException

from sqlalchemy import select

from mork.celery.celery_app import app
from mork.conf import settings
from mork.database import MorkDB
from mork.exceptions import EmailAlreadySent, EmailSendError
from mork.models import EmailStatus

logger = getLogger(__name__)


@app.task
def warn_inactive_users():
"""Celery task to warn inactive users by email."""
pass


@app.task
def delete_inactive_users():
"""Celery task to delete inactive users accounts."""
pass


@app.task(
bind=True,
rate_limit=settings.EMAIL_RATE_LIMIT,
retry_kwargs={"max_retries": settings.EMAIL_MAX_RETRIES},
)
def send_email_task(self, email_address: str, username: str):
"""Celery task that sends an email to the specified user."""
# Check that user has not already received a warning email
if check_email_already_sent(email_address):
raise EmailAlreadySent("An email has already been sent to this user")

try:
send_email(email_address, username)
except EmailSendError as exc:
logger.exception(exc)
raise self.retry(exc=exc) from exc

# Write flag that email was correctly sent to this user
mark_email_status(email_address)


def check_email_already_sent(email_adress: str):
"""Check if an email has already been sent to the user."""
db = MorkDB()
query = select(EmailStatus.email).where(EmailStatus.email == email_adress)
result = db.session.execute(query).scalars().first()
db.session.close()
return result


def send_email(email_address: str, username: str):
"""Initialize connection to SMTP and send a warning email."""
html = f"""\
<html>
<body>
<h1>Hello {username},</h1>
Your account will be closed soon! If you want to keep it, please log in!
</body>
</html>
"""

# Create a multipart message and set headers
message = MIMEMultipart()
message["From"] = settings.EMAIL_FROM
message["To"] = email_address
message["Subject"] = "Your account will be closed soon"

# Attach the HTML part
message.attach(MIMEText(html, "html"))

# Send the email
with smtplib.SMTP(
host=settings.EMAIL_HOST, port=settings.EMAIL_PORT
) as smtp_server:
if settings.EMAIL_USE_TLS:
smtp_server.starttls()
if settings.EMAIL_HOST_USER and settings.EMAIL_HOST_PASSWORD:
smtp_server.login(
user=settings.EMAIL_HOST_USER,
password=settings.EMAIL_HOST_PASSWORD,
)
try:
smtp_server.sendmail(
from_addr=settings.EMAIL_FROM,
to_addrs=email_address,
msg=message.as_string(),
)
except SMTPException as exc:
logger.error(f"Sending email failed: {exc} ")
raise EmailSendError("Failed sending an email") from exc


def mark_email_status(email_address: str):
"""Mark the email status in database."""
db = MorkDB()
db.session.add(EmailStatus(email=email_address, sent_date=datetime.now()))
db.session.commit()
db.session.close()
10 changes: 10 additions & 0 deletions src/mork/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ class Settings(BaseSettings):
EDX_DB_PORT: int = 3306
EDX_DB_DEBUG: bool = False

# Emails
EMAIL_HOST: str = "mailcatcher"
EMAIL_HOST_USER: str = ""
EMAIL_HOST_PASSWORD: str = ""
EMAIL_PORT: int = 1025
EMAIL_USE_TLS: bool = False
EMAIL_FROM: str = "[email protected]"
EMAIL_RATE_LIMIT: str = "100/m"
EMAIL_MAX_RETRIES: int = 3

# Celery
broker_url: str = Field("redis://redis:6379/0", alias="MORK_CELERY_BROKER_URL")
result_backend: str = Field(
Expand Down
9 changes: 9 additions & 0 deletions src/mork/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Exceptions for Mork."""


class EmailAlreadySent(Exception):
"""Raised when an email has already been sent to this user."""


class EmailSendError(Exception):
"""Raised when an error occurs when sending an email."""
26 changes: 0 additions & 26 deletions src/mork/worker/tasks.py

This file was deleted.

70 changes: 70 additions & 0 deletions tests/celery/test_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Tests for Mork Celery tasks."""

import smtplib
from unittest.mock import MagicMock

import pytest

from mork.celery.tasks import check_email_already_sent, mark_email_status, send_email
from mork.exceptions import EmailSendError
from mork.factories import EmailStatusFactory


def test_check_email_already_sent(monkeypatch, db_session):
"""Test the `check_email_already_sent` function."""
email_address = "[email protected]"

class MockMorkDB:
session = db_session

monkeypatch.setattr("mork.celery.tasks.MorkDB", MockMorkDB)
EmailStatusFactory.create_batch(3)

assert not check_email_already_sent(email_address)

EmailStatusFactory.create(email=email_address)
assert check_email_already_sent(email_address)


def test_send_email(monkeypatch):
"""Test the `send_email` function."""

mock_SMTP = MagicMock()
monkeypatch.setattr("mork.celery.tasks.smtplib.SMTP", mock_SMTP)

test_address = "[email protected]"
test_username = "JohnDoe"
send_email(email_address=test_address, username=test_username)

assert mock_SMTP.return_value.__enter__.return_value.sendmail.call_count == 1


def test_send_email_with_smtp_exception(monkeypatch):
"""Test the `send_email` function with an SMTP exception."""

mock_SMTP = MagicMock()
mock_SMTP.return_value.__enter__.return_value.sendmail.side_effect = (
smtplib.SMTPException
)

monkeypatch.setattr("mork.celery.tasks.smtplib.SMTP", mock_SMTP)

test_address = "[email protected]"
test_username = "JohnDoe"

with pytest.raises(EmailSendError, match="Failed sending an email"):
send_email(email_address=test_address, username=test_username)


def test_mark_email_status(monkeypatch, db_session):
"""Test the `mark_email_status` function."""

class MockMorkDB:
session = db_session

monkeypatch.setattr("mork.celery.tasks.MorkDB", MockMorkDB)

# Write new email status entry
new_email = "[email protected]"
mark_email_status(new_email)
assert check_email_already_sent(new_email)

0 comments on commit 54bd4ab

Please sign in to comment.