From 1b2dd7f0248ca50667e555e7011a7a727f7d4a21 Mon Sep 17 00:00:00 2001 From: Alex Yeung Date: Fri, 1 Nov 2024 18:11:40 +0000 Subject: [PATCH] CTP-3867 Digest SORA SQS messages --- classes/extension/aws_queue_processor.php | 239 +++++++++++++++++++++ classes/extension/ec.php | 6 +- classes/extension/extension.php | 22 +- classes/extension/sora.php | 28 ++- classes/extension/sora_queue_processor.php | 52 +++++ classes/manager.php | 4 +- classes/task/process_aws_sora_updates.php | 54 +++++ db/install.xml | 23 +- db/tasks.php | 9 + db/upgrade.php | 33 +++ lang/en/local_sitsgradepush.php | 3 + version.php | 2 +- 12 files changed, 455 insertions(+), 20 deletions(-) create mode 100644 classes/extension/aws_queue_processor.php create mode 100644 classes/extension/sora_queue_processor.php create mode 100644 classes/task/process_aws_sora_updates.php diff --git a/classes/extension/aws_queue_processor.php b/classes/extension/aws_queue_processor.php new file mode 100644 index 0000000..cfc02db --- /dev/null +++ b/classes/extension/aws_queue_processor.php @@ -0,0 +1,239 @@ +. + +namespace local_sitsgradepush\extension; + +use local_sitsgradepush\aws\sqs; +use local_sitsgradepush\logger; + +/** + * Parent class for queue processors. + * + * @package local_sitsgradepush + * @copyright 2024 onwards University College London {@link https://www.ucl.ac.uk/} + * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later + * @author Alex Yeung + */ +abstract class aws_queue_processor { + + /** @var int Maximum number of messages to fetch per call. 10 is the highest number, limited by AWS */ + const MAX_MESSAGES = 10; + + /** @var int Visibility timeout in seconds */ + const VISIBILITY_TIMEOUT = 60; + + /** @var int Wait time in seconds */ + const WAIT_TIME_SECONDS = 5; + + /** @var string Message status - processed */ + const STATUS_PROCESSED = 'processed'; + + /** @var string Message status - failed */ + const STATUS_FAILED = 'failed'; + + /** @var int Maximum number of batches */ + const MAX_BATCHES = 30; + + /** @var int Maximum number of messages to fetch */ + const MAX_MESSAGES_TO_PROCESS = 300; + + /** @var int Maximum execution time in seconds */ + const MAX_EXECUTION_TIME = 1800; // 30 minutes + + /** + * Get the queue URL. + * + * @return string + */ + abstract protected function get_queue_url(): string; + + /** + * Process the message. + * + * @param array $messagebody AWS SQS message body + * @return void + */ + abstract protected function process_message(array $messagebody): void; + + /** + * Fetch messages from the queue. + * + * @param int $maxmessages Maximum number of messages to fetch + * @param int $visibilitytimeout Visibility timeout in seconds + * @param int $waittimeseconds Wait time in seconds + * @return array + */ + protected function fetch_messages( + int $maxmessages = self::MAX_MESSAGES, + int $visibilitytimeout = self::VISIBILITY_TIMEOUT, + int $waittimeseconds = self::WAIT_TIME_SECONDS + ): array { + $sqs = new sqs(); + $result = $sqs->get_client()->receiveMessage([ + 'QueueUrl' => $this->get_queue_url(), + 'MaxNumberOfMessages' => $maxmessages, + 'VisibilityTimeout' => $visibilitytimeout, + 'WaitTimeSeconds' => $waittimeseconds, + ]); + + return $result->get('Messages') ?? []; + } + + /** + * Check if message is already processed. + * + * @param string $messageid AWS SQS Message ID + * @return bool True if message is processed already, false otherwise + * @throws \dml_exception + */ + protected function is_processed_message(string $messageid): bool { + global $DB; + + try { + // Allow processing if message has not been processed successfully. + return $DB->record_exists( + 'local_sitsgradepush_aws_log', + ['messageid' => $messageid, 'status' => self::STATUS_PROCESSED] + ); + } catch (\Exception $e) { + logger::log($e->getMessage(), null, 'Failed to check message status'); + return false; + } + } + + /** + * Execute the queue processor with batch processing support + * + * @return void + * @throws \Exception + */ + public function execute(): void { + try { + $processedcount = 0; + $batchnumber = 0; + $starttime = time(); + + do { + // Check safety limits. + if ($batchnumber >= self::MAX_BATCHES) { + mtrace("Maximum batch limit (" . self::MAX_BATCHES . ") reached"); + break; + } + + if ($processedcount >= self::MAX_MESSAGES_TO_PROCESS) { + mtrace("Maximum message limit (" . self::MAX_MESSAGES_TO_PROCESS . ") reached"); + break; + } + + $elapsedtime = time() - $starttime; + if ($elapsedtime >= self::MAX_EXECUTION_TIME) { + mtrace("Maximum execution time (" . self::MAX_EXECUTION_TIME . " seconds) reached"); + break; + } + + // Fetch messages from the queue. + $messages = $this->fetch_messages(); + if (empty($messages)) { + if ($batchnumber === 0) { + mtrace('No messages found.'); + } + break; + } + + $batchnumber++; + mtrace(sprintf('Processing batch %d with %d messages...', $batchnumber, count($messages))); + + foreach ($messages as $message) { + try { + if ($this->is_processed_message($message['MessageId'])) { + mtrace("Skipping processed message: {$message['MessageId']}"); + continue; + } + $data = json_decode($message['Body'], true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new \Exception('Invalid JSON data: ' . json_last_error_msg()); + } + $this->process_message($data); + $this->save_message_record($message); + $this->delete_message($message['ReceiptHandle']); + $processedcount++; + } catch (\Exception $e) { + logger::log($e->getMessage(), null, static::class . ' Processing Error'); + $this->save_message_record($message, self::STATUS_FAILED, $e->getMessage()); + } + } + + } while (!empty($messages)); + + mtrace(sprintf('Completed processing %d messages in %d batches (%.2f seconds)', + $processedcount, + $batchnumber, + time() - $starttime + )); + } catch (\Exception $e) { + logger::log($e->getMessage(), null, static::class . ' Queue Error'); + throw $e; + } + } + + /** + * Delete the message from the queue. + * + * @param string $receipthandle + * @return void + */ + protected function delete_message(string $receipthandle): void { + $sqs = new sqs(); + $sqs->get_client()->deleteMessage([ + 'QueueUrl' => $this->get_queue_url(), + 'ReceiptHandle' => $receipthandle, + ]); + } + + /** + * Save message processing details to database + * + * @param array $message SQS message data + * @param string $status Processing status + * @param string|null $error Error message if any + * @return bool|int Returns record ID on success, false on failure + * @throws \dml_exception + */ + protected function save_message_record( + array $message, + string $status = self::STATUS_PROCESSED, + ?string $error = null + ): bool|int { + global $DB, $USER; + + try { + $record = new \stdClass(); + $record->messageid = $message['MessageId']; + $record->receipthandle = $message['ReceiptHandle']; + $record->queueurl = $this->get_queue_url(); + $record->status = $status; + $record->payload = $message['Body']; + $record->error_message = $error; + $record->timecreated = time(); + $record->usermodified = $USER->id; + + return $DB->insert_record('local_sitsgradepush_aws_log', $record); + } catch (\Exception $e) { + logger::log($e->getMessage(), null, 'Failed to save message record'); + return false; + } + } +} diff --git a/classes/extension/ec.php b/classes/extension/ec.php index bdc7316..edffff7 100644 --- a/classes/extension/ec.php +++ b/classes/extension/ec.php @@ -72,13 +72,13 @@ public function process_extension(): void { * Set the EC properties from the AWS EC update message. * Note: The AWS EC update message is not yet developed, will implement this when the message is available. * - * @param string $message + * @param string $messagebody * @return void * @throws \dml_exception|\moodle_exception */ - public function set_properties_from_aws_message(string $message): void { + public function set_properties_from_aws_message(string $messagebody): void { // Decode the JSON message. - $messagedata = $this->parse_event_json($message); + $messagedata = $this->parse_event_json($messagebody); // Set the user ID of the student. $this->set_userid($messagedata->student_code); diff --git a/classes/extension/extension.php b/classes/extension/extension.php index f09f96a..56179ff 100644 --- a/classes/extension/extension.php +++ b/classes/extension/extension.php @@ -40,10 +40,10 @@ abstract class extension implements iextension { /** * Set properties from JSON message like SORA / EC update message from AWS. * - * @param string $message + * @param string $messagebody * @return void */ - abstract public function set_properties_from_aws_message(string $message): void; + abstract public function set_properties_from_aws_message(string $messagebody): void; /** * Set properties from get students API. @@ -71,6 +71,19 @@ public function get_mab_identifier(): string { return $this->mabidentifier; } + /** + * Check if the module type is supported. + * + * @param string|null $module + * @return bool + */ + public static function is_module_supported(?string $module): bool { + if (empty($module)) { + return false; + } + return in_array($module, self::SUPPORTED_MODULE_TYPES); + } + /** * Get all the assessment mappings by MAB identifier. * @@ -157,8 +170,11 @@ protected function get_mappings_by_userid(int $userid): array { */ protected function parse_event_json(string $message): \stdClass { $messageobject = json_decode($message); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new \Exception(get_string('error:invalid_json_data', 'local_sitsgradepush', json_last_error_msg())); + } if (empty($messageobject)) { - throw new \Exception('Invalid message data'); + throw new \Exception(get_string('error:empty_json_data', 'local_sitsgradepush')); } return $messageobject; } diff --git a/classes/extension/sora.php b/classes/extension/sora.php index b8b2e93..c44d89a 100644 --- a/classes/extension/sora.php +++ b/classes/extension/sora.php @@ -125,32 +125,38 @@ public function get_extension_group_name(): string { /** * Set properties from AWS SORA update message. * - * @param string $message + * @param string $messagebody * @return void * @throws \dml_exception * @throws \moodle_exception */ - public function set_properties_from_aws_message(string $message): void { + public function set_properties_from_aws_message(string $messagebody): void { - // Decode the JSON message. - $messagedata = $this->parse_event_json($message); + // Decode the JSON message body. + $messagedata = $this->parse_event_json($messagebody); // Check the message is valid. if (empty($messagedata->entity->person_sora->sora[0])) { - throw new \moodle_exception('error:invalid_message', 'local_sitsgradepush', '', null, $message); + throw new \moodle_exception('error:invalid_message', 'local_sitsgradepush', '', null, $messagebody); } $soradata = $messagedata->entity->person_sora->sora[0]; - // Set the user ID of the student. - $this->set_userid($soradata->person->student_code); - // Set properties. - $this->extraduration = (int) $soradata->extra_duration; - $this->restduration = (int) $soradata->rest_duration; + $this->extraduration = (int) $soradata->extra_duration ?? 0; + $this->restduration = (int) $soradata->rest_duration ?? 0; + + // A SORA update message must have at least one of the durations. + if ($this->extraduration == 0 && $this->restduration == 0) { + throw new \moodle_exception('error:invalid_duration', 'local_sitsgradepush'); + } // Calculate and set the time extension in seconds. - $this->timeextension = $this->calculate_time_extension($this->get_extra_duration(), $this->get_rest_duration()); + $this->timeextension = $this->calculate_time_extension($this->extraduration, $this->restduration); + + // Set the user ID of the student. + $this->set_userid($soradata->person->student_code); + $this->dataisset = true; } diff --git a/classes/extension/sora_queue_processor.php b/classes/extension/sora_queue_processor.php new file mode 100644 index 0000000..3b84c6b --- /dev/null +++ b/classes/extension/sora_queue_processor.php @@ -0,0 +1,52 @@ +. + +namespace local_sitsgradepush\extension; + +/** + * SORA queue processor. + * + * @package local_sitsgradepush + * @copyright 2024 onwards University College London {@link https://www.ucl.ac.uk/} + * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later + * @author Alex Yeung + */ +class sora_queue_processor extends aws_queue_processor { + /** + * Get the queue URL. + * + * @return string + * @throws \dml_exception + */ + protected function get_queue_url(): string { + return get_config('local_sitsgradepush', 'aws_sora_sqs_queue_url'); + } + + /** + * Process the aws SORA message. + * + * @param array $messagebody SORA message data body + * + * @throws \coding_exception + * @throws \moodle_exception + * @throws \dml_exception + */ + protected function process_message(array $messagebody): void { + $sora = new sora(); + $sora->set_properties_from_aws_message($messagebody['Message']); + $sora->process_extension(); + } +} diff --git a/classes/manager.php b/classes/manager.php index de5a7dd..5bc1662 100644 --- a/classes/manager.php +++ b/classes/manager.php @@ -25,6 +25,7 @@ use local_sitsgradepush\api\irequest; use local_sitsgradepush\assessment\assessment; use local_sitsgradepush\assessment\assessmentfactory; +use local_sitsgradepush\extension\extension; use local_sitsgradepush\output\pushrecord; use local_sitsgradepush\submission\submissionfactory; @@ -487,7 +488,8 @@ public function save_assessment_mapping(\stdClass $data): int|bool { } $record->componentgradeid = $data->componentgradeid; $record->reassessment = $data->reassessment; - $record->enableextension = get_config('local_sitsgradepush', 'extension_enabled') ? 1 : 0; + $record->enableextension = (get_config('local_sitsgradepush', 'extension_enabled') && + (isset($record->moduletype) && extension::is_module_supported($record->moduletype))) ? 1 : 0; $record->timecreated = time(); $record->timemodified = time(); diff --git a/classes/task/process_aws_sora_updates.php b/classes/task/process_aws_sora_updates.php new file mode 100644 index 0000000..5865d7b --- /dev/null +++ b/classes/task/process_aws_sora_updates.php @@ -0,0 +1,54 @@ +. + +namespace local_sitsgradepush\task; + +use local_sitsgradepush\extension\sora_queue_processor; + +/** + * Scheduled task to process AWS SORA updates. + * + * @package local_sitsgradepush + * @copyright 2024 onwards University College London {@link https://www.ucl.ac.uk/} + * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later + * @author Alex Yeung + */ +class process_aws_sora_updates extends \core\task\scheduled_task { + /** + * Return name of the task. + * + * @return string + * @throws \coding_exception + */ + public function get_name() { + return get_string('task:process_aws_sora_updates', 'local_sitsgradepush'); + } + + /** + * Execute the task. + * @throws \Exception + */ + public function execute(): void { + // Skip if extension is not enabled. + if (!get_config('local_sitsgradepush', 'extension_enabled')) { + mtrace('Extension processing is not enabled. Exiting...'); + return; + } + + $processor = new sora_queue_processor(); + $processor->execute(); + } +} diff --git a/db/install.xml b/db/install.xml index 24579bd..8108462 100644 --- a/db/install.xml +++ b/db/install.xml @@ -1,5 +1,5 @@ - @@ -111,5 +111,26 @@ + + + + + + + + + + + + + + + + + + + + +
diff --git a/db/tasks.php b/db/tasks.php index 83aaa7b..6f85f1f 100644 --- a/db/tasks.php +++ b/db/tasks.php @@ -44,4 +44,13 @@ 'month' => '*', 'dayofweek' => '*', ], + [ + 'classname' => 'local_sitsgradepush\task\process_aws_sora_updates', + 'blocking' => 0, + 'minute' => '0', + 'hour' => '0', + 'day' => '*', + 'month' => '*', + 'dayofweek' => '*', + ], ]; diff --git a/db/upgrade.php b/db/upgrade.php index 52b456f..77e85ea 100644 --- a/db/upgrade.php +++ b/db/upgrade.php @@ -549,5 +549,38 @@ function xmldb_local_sitsgradepush_upgrade($oldversion) { upgrade_plugin_savepoint(true, 2024101100, 'local', 'sitsgradepush'); } + if ($oldversion < 2024110100) { + + // Define table local_sitsgradepush_aws_log to be created. + $table = new xmldb_table('local_sitsgradepush_aws_log'); + + // Adding fields to table local_sitsgradepush_aws_log. + $table->add_field('id', XMLDB_TYPE_INTEGER, '10', null, XMLDB_NOTNULL, XMLDB_SEQUENCE, null); + $table->add_field('messageid', XMLDB_TYPE_CHAR, '255', null, XMLDB_NOTNULL, null, null); + $table->add_field('receipthandle', XMLDB_TYPE_TEXT, null, null, XMLDB_NOTNULL, null, null); + $table->add_field('queueurl', XMLDB_TYPE_CHAR, '255', null, XMLDB_NOTNULL, null, null); + $table->add_field('status', XMLDB_TYPE_CHAR, '20', null, XMLDB_NOTNULL, null, null); + $table->add_field('payload', XMLDB_TYPE_TEXT, null, null, XMLDB_NOTNULL, null, null); + $table->add_field('error_message', XMLDB_TYPE_TEXT, null, null, null, null, null); + $table->add_field('timecreated', XMLDB_TYPE_INTEGER, '10', null, XMLDB_NOTNULL, null, null); + $table->add_field('usermodified', XMLDB_TYPE_INTEGER, '10', null, XMLDB_NOTNULL, null, null); + + // Adding keys to table local_sitsgradepush_aws_log. + $table->add_key('primary', XMLDB_KEY_PRIMARY, ['id']); + + // Adding indexes to table local_sitsgradepush_aws_log. + $table->add_index('messageid', XMLDB_INDEX_NOTUNIQUE, ['messageid']); + $table->add_index('status', XMLDB_INDEX_NOTUNIQUE, ['status']); + $table->add_index('timecreated', XMLDB_INDEX_NOTUNIQUE, ['timecreated']); + + // Conditionally launch create table for local_sitsgradepush_aws_log. + if (!$dbman->table_exists($table)) { + $dbman->create_table($table); + } + + // Sitsgradepush savepoint reached. + upgrade_plugin_savepoint(true, 2024110100, 'local', 'sitsgradepush'); + } + return true; } diff --git a/lang/en/local_sitsgradepush.php b/lang/en/local_sitsgradepush.php index 23e5a2e..8f64567 100644 --- a/lang/en/local_sitsgradepush.php +++ b/lang/en/local_sitsgradepush.php @@ -95,6 +95,7 @@ $string['error:duplicatedtask'] = 'There is already a transfer task in queue / processing for this assessment mapping.'; $string['error:duplicatemapping'] = 'Cannot map multiple assessment components with same module delivery to an activity. Mapcode: {$a}'; $string['error:ecextensionnotsupported'] = 'EC extension is not supported for this assessment.'; +$string['error:empty_json_data'] = 'Empty JSON data'; $string['error:emptyresponse'] = 'Empty response received when calling {$a}.'; $string['error:extensiondataisnotset'] = 'Extension data is not set.'; $string['error:failtomapassessment'] = 'Failed to map assessment component to source.'; @@ -104,6 +105,7 @@ $string['error:gradesneedregrading'] = 'Marks transfer is not available while grades are being recalculated.'; $string['error:gradetype_not_supported'] = 'The grade type of this assessment is not supported for marks transfer.'; $string['error:inserttask'] = 'Failed to insert task.'; +$string['error:invalid_json_data'] = 'Invalid JSON data: {$a}'; $string['error:invalid_message'] = 'Invalid message received.'; $string['error:invalid_source_type'] = 'Invalid source type. {$a}'; $string['error:lesson_practice'] = 'Practice lessons have no grades'; @@ -238,6 +240,7 @@ $string['subplugintype_sitsapiclient_plural'] = 'API clients used for data integration.'; $string['task:adhoctask'] = 'Adhoc Task'; $string['task:assesstype:name'] = 'Insert Assessment Type for Pre-mapped Assessments'; +$string['task:process_aws_sora_updates'] = 'Process AWS SORA updates'; $string['task:processextensions'] = 'Process SORA and EC extensions'; $string['task:pushtask:name'] = 'Schedule Transfer Task'; $string['task:requested:success'] = 'Transfer task requested successfully'; diff --git a/version.php b/version.php index 20e1660..8068fdd 100644 --- a/version.php +++ b/version.php @@ -27,7 +27,7 @@ $plugin->component = 'local_sitsgradepush'; $plugin->release = '0.1.0'; -$plugin->version = 2024103000; +$plugin->version = 2024110100; $plugin->requires = 2024042200; $plugin->maturity = MATURITY_ALPHA; $plugin->dependencies = [