Skip to content

Commit

Permalink
Merge pull request #943 from Project-MONAI/AI-376
Browse files Browse the repository at this point in the history
Ai 376
  • Loading branch information
neildsouth authored Jan 10, 2024
2 parents acd4284 + f2b19c8 commit 18a99ed
Show file tree
Hide file tree
Showing 24 changed files with 435 additions and 10 deletions.
3 changes: 3 additions & 0 deletions src/Common/Configuration/WorkflowManagerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public class WorkflowManagerOptions : PagedOptions
[ConfigurationKeyName("migExternalAppPlugins")]
public string[] MigExternalAppPlugins { get; set; }

[ConfigurationKeyName("dataRetentionDays")]
public int DataRetentionDays { get; set; }

public WorkflowManagerOptions()
{
Messaging = new MessageBrokerConfiguration();
Expand Down
8 changes: 8 additions & 0 deletions src/WorkflowManager/Common/Interfaces/IPayloadService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,13 @@ Task<IList<PayloadDto>> GetAllAsync(int? skip = null,
/// <param name="payload"></param>
/// <returns></returns>
Task<bool> UpdateWorkflowInstanceIdsAsync(string payloadId, IEnumerable<string> workflowInstances);

/// <summary>
/// Gets the expiry date for a payload.
/// </summary>
/// <param name="now"></param>
/// <param name="workflowInstanceId"></param>
/// <returns>date of expiry or null</returns>
Task<DateTime?> GetExpiry(DateTime now, string? workflowInstanceId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<AdditionalFiles Include="..\..\.sonarlint\project-monai_monai-deploy-workflow-manager\CSharp\SonarLint.xml" Link="SonarLint.xml" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Common\Configuration\Monai.Deploy.WorkflowManager.Common.Configuration.csproj" />
<ProjectReference Include="..\Contracts\Monai.Deploy.WorkflowManager.Contracts.csproj" />
<ProjectReference Include="..\Database\Monai.Deploy.WorkflowManager.Database.csproj" />
<ProjectReference Include="..\Storage\Monai.Deploy.WorkflowManager.Storage.csproj" />
Expand Down
34 changes: 33 additions & 1 deletion src/WorkflowManager/Common/Services/PayloadService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
using Monai.Deploy.WorkflowManager.Common.Database.Interfaces;
using Monai.Deploy.WorkflowManager.Common.Logging;
using Monai.Deploy.WorkflowManager.Common.Storage.Services;
using Microsoft.Extensions.Options;
using Monai.Deploy.WorkflowManager.Common.Configuration;

namespace Monai.Deploy.WorkflowManager.Common.Miscellaneous.Services
{
Expand All @@ -34,23 +36,31 @@ public class PayloadService : IPayloadService

private readonly IWorkflowInstanceRepository _workflowInstanceRepository;

private readonly IWorkflowRepository _workflowRepository;

private readonly IDicomService _dicomService;

private readonly IStorageService _storageService;

private readonly WorkflowManagerOptions _options;

private readonly ILogger<PayloadService> _logger;

public PayloadService(
IPayloadRepository payloadRepository,
IDicomService dicomService,
IWorkflowInstanceRepository workflowInstanceRepository,
IWorkflowRepository workflowRepository,
IServiceScopeFactory serviceScopeFactory,
IOptions<WorkflowManagerOptions> options,
ILogger<PayloadService> logger)
{
_payloadRepository = payloadRepository ?? throw new ArgumentNullException(nameof(payloadRepository));
_workflowInstanceRepository = workflowInstanceRepository ?? throw new ArgumentNullException(nameof(workflowInstanceRepository));
_dicomService = dicomService ?? throw new ArgumentNullException(nameof(dicomService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_workflowRepository = workflowRepository ?? throw new ArgumentNullException(nameof(workflowRepository));
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));

var scopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
var scope = scopeFactory.CreateScope();
Expand Down Expand Up @@ -85,7 +95,8 @@ public PayloadService(
DataTrigger = eventPayload.DataTrigger,
Timestamp = eventPayload.Timestamp,
PatientDetails = patientDetails,
PayloadDeleted = PayloadDeleted.No
PayloadDeleted = PayloadDeleted.No,
Expires = await GetExpiry(DateTime.UtcNow, eventPayload.WorkflowInstanceId)
};

if (await _payloadRepository.CreateAsync(payload))
Expand All @@ -106,6 +117,27 @@ public PayloadService(
return null;
}

public async Task<DateTime?> GetExpiry(DateTime now, string? workflowInstanceId)
{
var daysToKeep = await GetWorkflowDataExpiry(workflowInstanceId);
daysToKeep ??= _options.DataRetentionDays;

if (daysToKeep == -1) { return null; }

return now.AddDays(daysToKeep.Value);
}

private async Task<int?> GetWorkflowDataExpiry(string? workflowInstanceId)
{
if (string.IsNullOrWhiteSpace(workflowInstanceId)) { return null; }

var workflowInstance = await _workflowInstanceRepository.GetByWorkflowInstanceIdAsync(workflowInstanceId);

if (workflowInstance is null) { return null; }

return (await _workflowRepository.GetByWorkflowIdAsync(workflowInstance.WorkflowId))?.Workflow?.DataRetentionDays ?? null;
}

public async Task<Payload> GetByIdAsync(string payloadId)
{
ArgumentNullException.ThrowIfNullOrWhiteSpace(payloadId, nameof(payloadId));
Expand Down
42 changes: 42 additions & 0 deletions src/WorkflowManager/Contracts/Migrations/M004_Payload_expires.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//
// Copyright 2023 Guy’s and St Thomas’ NHS Foundation Trust
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
using Mongo.Migration.Migrations.Document;
using MongoDB.Bson;

namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations
{
public class M004_Payload_Expires : DocumentMigration<Payload>
{
public M004_Payload_Expires() : base("1.0.4") { }

public override void Up(BsonDocument document)
{
document.Add("Expires", BsonNull.Create(null).ToJson(), true); //null = never expires
}

public override void Down(BsonDocument document)
{
try
{
document.Remove("DataTrigger");
}
catch
{ // can ignore we dont want failures stopping startup !
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//
// Copyright 2023 Guy’s and St Thomas’ NHS Foundation Trust
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
using Mongo.Migration.Migrations.Document;
using MongoDB.Bson;


namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations
{
public class M004_WorkflowRevision_AddDataRetension : DocumentMigration<WorkflowRevision>
{
public M004_WorkflowRevision_AddDataRetension() : base("1.0.1") { }

public override void Up(BsonDocument document)
{
// document.Add("Workflow.DataRetentionDays", BsonNull.Create(null).ToJson(), true);
var workflow = document["Workflow"].AsBsonDocument;
workflow.Add("DataRetentionDays", BsonNull.Create(null).ToJson(), true);
}

public override void Down(BsonDocument document)
{
try
{
var workflow = document["Workflow"].AsBsonDocument;
workflow.Remove("DataRetentionDays");
}
catch
{ // can ignore we dont want failures stopping startup !
}
}
}
}
8 changes: 6 additions & 2 deletions src/WorkflowManager/Contracts/Models/Payload.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@

namespace Monai.Deploy.WorkflowManager.Common.Contracts.Models
{
[CollectionLocation("Payloads"), RuntimeVersion("1.0.3")]
[CollectionLocation("Payloads"), RuntimeVersion("1.0.4")]
public class Payload : IDocument
{
[JsonConverter(typeof(DocumentVersionConvert)), BsonSerializer(typeof(DocumentVersionConverBson))]
public DocumentVersion Version { get; set; } = new DocumentVersion(1, 0, 3);
public DocumentVersion Version { get; set; } = new DocumentVersion(1, 0, 4);

[JsonProperty(PropertyName = "id")]
public string Id { get; set; } = string.Empty;
Expand Down Expand Up @@ -67,6 +67,10 @@ public class Payload : IDocument
public PatientDetails PatientDetails { get; set; } = new PatientDetails();

public DataOrigin DataTrigger { get; set; } = new DataOrigin { DataService = DataService.DIMSE };

[JsonProperty(PropertyName = "expires")]
public DateTime? Expires { get; set; }

}

public enum PayloadDeleted
Expand Down
3 changes: 3 additions & 0 deletions src/WorkflowManager/Contracts/Models/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,8 @@ public class Workflow
[JsonProperty(PropertyName = "tasks")]
public TaskObject[] Tasks { get; set; } = System.Array.Empty<TaskObject>();

[JsonProperty(PropertyName = "dataRetentionDays")]
public int? DataRetentionDays { get; set; } = 3;// note. -1 = never delete

}
}
4 changes: 2 additions & 2 deletions src/WorkflowManager/Contracts/Models/WorkflowRevision.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@

namespace Monai.Deploy.WorkflowManager.Common.Contracts.Models
{
[CollectionLocation("Workflows"), RuntimeVersion("1.0.0")]
[CollectionLocation("Workflows"), RuntimeVersion("1.0.1")]
public class WorkflowRevision : ISoftDeleteable, IDocument
{
[BsonId]
[JsonProperty(PropertyName = "id")]
public string? Id { get; set; }

[JsonConverter(typeof(DocumentVersionConvert)), BsonSerializer(typeof(DocumentVersionConverBson))]
public DocumentVersion Version { get; set; } = new DocumentVersion(1, 0, 0);
public DocumentVersion Version { get; set; } = new DocumentVersion(1, 0, 1);

[JsonProperty(PropertyName = "workflow_id")]
public string WorkflowId { get; set; } = string.Empty;
Expand Down
17 changes: 17 additions & 0 deletions src/WorkflowManager/Database/Interfaces/IPayloadRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
Expand Down Expand Up @@ -52,11 +53,27 @@ public interface IPayloadRepository
/// <returns>The updated payload.</returns>
Task<bool> UpdateAsync(Payload payload);

/// <summary>
/// Updates a payload in the database.
/// </summary>
/// <param name="payloadId"></param>
/// <param name="workflowInstances"></param>
/// <returns></returns>
Task<bool> UpdateAssociatedWorkflowInstancesAsync(string payloadId, IEnumerable<string> workflowInstances);

/// <summary>
/// Gets all the payloads that might need deleted
/// </summary>
/// <param name="now">the current datetime</param>
/// <returns></returns>
Task<IList<Payload>> GetPayloadsToDelete(DateTime now);

/// <summary>
/// Marks a bunch of payloads as a new deleted state
/// </summary>
/// <param name="Ids">a list of payloadIds to mark in new status</param>
/// <param name="status">the status to mark as</param>
/// <returns></returns>
Task MarkDeletedState(IList<string> Ids, PayloadDeleted status);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Monai.Deploy.Messaging.Events;
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;

namespace Monai.Deploy.WorkflowManager.Common.Database.Interfaces
Expand Down
55 changes: 55 additions & 0 deletions src/WorkflowManager/Database/Repositories/PayloadRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Ardalis.GuardClauses;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -47,6 +48,28 @@ public PayloadRepository(
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
var mongoDatabase = client.GetDatabase(databaseSettings.Value.DatabaseName);
_payloadCollection = mongoDatabase.GetCollection<Payload>("Payloads");
EnsureIndex().GetAwaiter().GetResult();
}

private async Task EnsureIndex()
{
var indexName = "PayloadDeletedIndex";

var model = new CreateIndexModel<Payload>(
Builders<Payload>.IndexKeys.Ascending(s => s.PayloadDeleted),
new CreateIndexOptions { Name = indexName }
);


var asyncCursor = (await _payloadCollection.Indexes.ListAsync());
var bsonDocuments = (await asyncCursor.ToListAsync());
var indexes = bsonDocuments.Select(_ => _.GetElement("name").Value.ToString()).ToList();

// If index not present create it else skip.
if (!indexes.Exists(i => i is not null && i.Equals(indexName)))
{
await _payloadCollection.Indexes.CreateOneAsync(model);
}
}

public Task<long> CountAsync() => CountAsync(_payloadCollection, null);
Expand Down Expand Up @@ -137,5 +160,37 @@ await _payloadCollection.FindOneAndUpdateAsync(
return false;
}
}

public async Task<IList<Payload>> GetPayloadsToDelete(DateTime now)
{
try
{
var filter = (Builders<Payload>.Filter.Eq(p => p.PayloadDeleted, PayloadDeleted.No) |
Builders<Payload>.Filter.Eq(p => p.PayloadDeleted, PayloadDeleted.Failed)) &
Builders<Payload>.Filter.Lt(p => p.Expires, now);

return await (await _payloadCollection.FindAsync(filter)).ToListAsync();

}
catch (Exception ex)
{
_logger.DbGetPayloadsToDeleteError(ex);
return new List<Payload>();
}
}

public async Task MarkDeletedState(IList<string> Ids, PayloadDeleted status)
{
try
{
var filter = Builders<Payload>.Filter.In(p => p.PayloadId, Ids);
var update = Builders<Payload>.Update.Set(p => p.PayloadDeleted, status);
await _payloadCollection.UpdateManyAsync(filter, update);
}
catch (Exception ex)
{
_logger.DbGetPayloadsToDeleteError(ex);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public async Task<IList<WorkflowRevision>> GetWorkflowsForWorkflowRequestAsync(s
x.Workflow.InformaticsGateway.DataOrigins.Any(d => d == callingAeTitle)) &&
x.Deleted == null)
.ToListAsync();

return wfs;
}

Expand Down
2 changes: 1 addition & 1 deletion src/WorkflowManager/Logging/Log.200000.Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public static partial class Log
[LoggerMessage(EventId = 210007, Level = LogLevel.Information, Message = "Exporting to MIG task Id {taskid}, export destination {destination} number of files {fileCount} Mig data plugins {plugins}.")]
public static partial void LogMigExport(this ILogger logger, string taskid, string destination, int fileCount, string plugins);

[LoggerMessage(EventId = 200018, Level = LogLevel.Error, Message = "ExportList or Artifacts are empty! workflowInstanceId {workflowInstanceId} TaskId {taskId}")]
[LoggerMessage(EventId = 210018, Level = LogLevel.Error, Message = "ExportList or Artifacts are empty! workflowInstanceId {workflowInstanceId} TaskId {taskId}")]
public static partial void ExportListOrArtifactsAreEmpty(this ILogger logger, string taskId, string workflowInstanceId);
}
}
3 changes: 3 additions & 0 deletions src/WorkflowManager/Logging/Log.800000.Database.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,8 @@ public static partial class Log

[LoggerMessage(EventId = 800014, Level = LogLevel.Error, Message = "Failed to update payload: '{payloadId}'.")]
public static partial void DbUpdatePayloadError(this ILogger logger, string payloadId, Exception ex);

[LoggerMessage(EventId = 800015, Level = LogLevel.Error, Message = "Failed to get payloads to delete.")]
public static partial void DbGetPayloadsToDeleteError(this ILogger logger, Exception ex);
}
}
Loading

0 comments on commit 18a99ed

Please sign in to comment.