Skip to content

Commit

Permalink
Merge pull request #34 from open-craft/cliff/task-arguments
Browse files Browse the repository at this point in the history
Add options to the management command.
  • Loading branch information
jcdyer authored Oct 12, 2018
2 parents ec1130f + 6d9c20a commit fa3f02e
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 31 deletions.
2 changes: 1 addition & 1 deletion completion_aggregator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

from __future__ import absolute_import, unicode_literals

__version__ = '1.5.3'
__version__ = '1.5.4'

default_app_config = 'completion_aggregator.apps.CompletionAggregatorAppConfig' # pylint: disable=invalid-name
93 changes: 66 additions & 27 deletions completion_aggregator/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,67 @@

import collections
import logging
import time

import six

from django.db import transaction

from . import models, utils
from .tasks import aggregation_tasks

log = logging.getLogger(__name__)

EnrollmentTuple = collections.namedtuple('EnrollmentTuple', ['username', 'course_key'])

MAX_KEYS_PER_TASK = 128
MAX_KEYS_PER_TASK = 16


def perform_aggregation():
def perform_aggregation(batch_size=10000, delay=0.0, limit=None, routing_key=None):
"""
Enqueues tasks to reaggregate modified completions.
When blocks are completed, they mark themselves as stale. This function
collects all stale blocks for each enrollment, and enqueues a single
recalculation of all aggregators containing those stale blocks.
batch_size (int|None) [default: 10000]:
Maximum number of stale completions to fetch in a single query to the
database.
delay (float) [default: 0.0]:
The amount of time to wait between sending batches of 1000 tasks to
celery.
limit (int|None) [default: None]:
Maximum number of stale completions to process in a single run of this
function. None means process all outstanding StaleCompletions.
routing_key (str|None) [default None]:
A routing key to pass to celery for the update_aggregators tasks. None
means use the default routing key.
"""
with transaction.atomic():
stale_queryset = models.StaleCompletion.objects.filter(resolved=False)
stale_blocks = collections.defaultdict(set)
forced_updates = set()
for stale in stale_queryset:
stale_queryset = models.StaleCompletion.objects.filter(resolved=False)
task_options = {}
if limit is None:
limit = float('inf')

try:
min_id = stale_queryset.order_by('id')[0].id
max_id = stale_queryset.order_by('-id')[0].id
except IndexError:
log.warning("No StaleCompletions to process. Exiting.")
return
if routing_key:
task_options['routing_key'] = routing_key

stale_blocks = collections.defaultdict(set)
forced_updates = set()
enqueued = 0
for idx in six.moves.range(min_id, max_id + 1, batch_size):
if enqueued > limit:
break
evaluated = stale_queryset.filter(id__gte=idx, id__lt=idx + batch_size)
enqueued += len(evaluated)
for stale in evaluated:
enrollment = EnrollmentTuple(
username=stale.username,
course_key=stale.course_key,
Expand All @@ -46,28 +79,34 @@ def perform_aggregation():
stale_blocks[enrollment] = utils.BagOfHolding()
blocks = stale_blocks[enrollment]
if isinstance(blocks, utils.BagOfHolding) or len(blocks) <= MAX_KEYS_PER_TASK:
# We can stop adding once we have exceeded the maximum number of keys per task.
# We can stop adding once we have exceeded the maximum number
# of keys per task. This keeps the memory usage of this
# function down, and limits the size of the task signature sent
# to celery.
stale_blocks[enrollment].add(stale.block_key)
if stale.force:
forced_updates.add(enrollment)

log.info("Performing aggregation update for %s user enrollments", len(stale_blocks))
for enrollment in stale_blocks:
if isinstance(stale_blocks[enrollment], utils.BagOfHolding):
blocks = []
elif len(stale_blocks[enrollment]) > MAX_KEYS_PER_TASK:
# Limit the number of block_keys we will add to a task,
# because celery has a task size limit. Instead, just
# reprocess the whole course.
blocks = []
else:
blocks = [six.text_type(block_key) for block_key in stale_blocks[enrollment]]
aggregation_tasks.update_aggregators.delay(
username=enrollment.username,
course_key=six.text_type(enrollment.course_key),
block_keys=blocks,
force=enrollment in forced_updates,
)
log.info("Performing aggregation update for %s user enrollments", len(stale_blocks))
for idx, enrollment in enumerate(stale_blocks):
if isinstance(stale_blocks[enrollment], utils.BagOfHolding):
blocks = []
elif len(stale_blocks[enrollment]) > MAX_KEYS_PER_TASK:
blocks = []
else:
blocks = [six.text_type(block_key) for block_key in stale_blocks[enrollment]]
aggregation_tasks.update_aggregators.apply_async(
kwargs={
'username': enrollment.username,
'course_key': six.text_type(enrollment.course_key),
'block_keys': blocks,
'force': enrollment in forced_updates,
},
**task_options
)
if idx % 1000 == 999:
if delay:
time.sleep(delay)


def perform_cleanup():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,45 @@ class Command(BaseCommand):
run_aggregator_service management command.
"""

def add_arguments(self, parser):
    """
    Add command-line arguments for the aggregator service command.

    The options mirror the keyword arguments of
    ``perform_aggregation(batch_size, delay, limit, routing_key)``.
    """
    parser.add_argument(
        '--batch-size',
        # Matches perform_aggregation(): this bounds a single DB query over
        # StaleCompletion rows, not a per-celery-task count.
        help='Maximum number of stale completions to fetch in a single database query. (default: 10000)',
        default=10000,
        type=int,
    )
    parser.add_argument(
        '--delay-between-batches',
        help='Amount of time to wait between processing batches in seconds. (default: 0.0)',
        default=0.0,
        type=float,
    )
    parser.add_argument(
        '--limit',
        # Was previously undocumented (help=''); describe it for --help output.
        help='Maximum number of stale completions to process in a single run. (default: 500000)',
        default=500000,
        type=int,
    )
    parser.add_argument(
        '--routing-key',
        dest='routing_key',
        # No default: handle() passes None through, and perform_aggregation()
        # treats None as "use the default celery routing key".
        help='Celery routing key to use for the update_aggregators tasks.',
    )

def handle(self, *args, **options):
    """
    Run the aggregator service.

    Configures logging from the standard ``--verbosity`` option, then
    enqueues reaggregation work via ``perform_aggregation``, forwarding
    the batching/throttling options added in ``add_arguments``.
    """
    self.set_logging(options['verbosity'])
    # Only the keyword-argument call survives here: the zero-argument
    # perform_aggregation() call was the pre-change (diff-residue) form.
    perform_aggregation(
        batch_size=options['batch_size'],
        delay=options['delay_between_batches'],
        limit=options['limit'],
        # .get(): routing_key has no default, so it may be absent/None.
        routing_key=options.get('routing_key'),
    )

def set_logging(self, verbosity):
"""
Expand Down
3 changes: 1 addition & 2 deletions tests/test_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ def test_plethora_of_stale_completions(users):
with patch('completion_aggregator.tasks.aggregation_tasks.update_aggregators.apply_async') as mock_task:
perform_aggregation()
mock_task.assert_called_once_with(
(),
{
kwargs={
'username': users[0].username,
'course_key': six.text_type(course_key),
'block_keys': [],
Expand Down

0 comments on commit fa3f02e

Please sign in to comment.