diff --git a/src/Nexus.Sources.Remote/DataSourceTypes.cs b/src/Nexus.Sources.Remote/DataSourceTypes.cs index 57eb2d4..af56736 100644 --- a/src/Nexus.Sources.Remote/DataSourceTypes.cs +++ b/src/Nexus.Sources.Remote/DataSourceTypes.cs @@ -1,8 +1,9 @@ -using Microsoft.Extensions.Logging; -using Newtonsoft.Json.Linq; +using System.Globalization; +using System.Text.Json; +using System.Text.Json.Serialization; +using Microsoft.Extensions.Logging; using Nexus.DataModel; using Nexus.Extensibility; -using System.Text.Json; namespace Nexus.Sources; @@ -41,40 +42,32 @@ internal class RemoteException(string message, Exception? innerException = defau { } -internal class JsonElementConverter : Newtonsoft.Json.JsonConverter +internal class RoundtripDateTimeConverter : JsonConverter { - internal static JsonSerializerOptions _serializerOptions = new() + public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) { - PropertyNamingPolicy = JsonNamingPolicy.CamelCase - }; - - public override bool CanConvert(Type objectType) - { - var canConvert = objectType == typeof(JsonElement); - return canConvert; + if (!DateTime.TryParseExact + ( + reader.GetString(), + "o", + CultureInfo.InvariantCulture, + DateTimeStyles.AdjustToUniversal, + out var dateTime + ) + ) + { + throw new JsonException(); + } + + return dateTime; } - public override object? ReadJson( - Newtonsoft.Json.JsonReader reader, - Type objectType, - object? existingValue, - Newtonsoft.Json.JsonSerializer serializer + public override void Write( + Utf8JsonWriter writer, + DateTime value, + JsonSerializerOptions options ) { - if (reader.TokenType == Newtonsoft.Json.JsonToken.Null) - return default; - - if (reader.TokenType == Newtonsoft.Json.JsonToken.String) - return JsonSerializer.SerializeToElement(JToken.Load(reader).ToString()); - - var serialized_tmp = JToken.Load(reader).ToString(); - var deserialized = JsonSerializer.Deserialize(serialized_tmp); - return deserialized; - } - - public override void WriteJson(Newtonsoft.Json.JsonWriter writer, object? value, Newtonsoft.Json.JsonSerializer serializer) - { - var jsonString = JsonSerializer.Serialize(value, _serializerOptions); - writer.WriteRawValue(jsonString); + writer.WriteStringValue(value.ToString("o", CultureInfo.InvariantCulture)); } -} +} \ No newline at end of file diff --git a/src/Nexus.Sources.Remote/Remote.cs b/src/Nexus.Sources.Remote/Remote.cs index 4c56880..2bf0742 100644 --- a/src/Nexus.Sources.Remote/Remote.cs +++ b/src/Nexus.Sources.Remote/Remote.cs @@ -2,7 +2,6 @@ using Nexus.DataModel; using Nexus.Extensibility; using System.Buffers; -using System.Net; using System.Reflection; using System.Text.Json; using System.Text.RegularExpressions; diff --git a/src/Nexus.Sources.Remote/RemoteCommunicator.cs b/src/Nexus.Sources.Remote/RemoteCommunicator.cs index 3848551..773260c 100644 --- a/src/Nexus.Sources.Remote/RemoteCommunicator.cs +++ b/src/Nexus.Sources.Remote/RemoteCommunicator.cs @@ -1,9 +1,8 @@ -using System.Net; -using System.Net.Sockets; +using System.Net.Sockets; using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; using Microsoft.Extensions.Logging; -using Newtonsoft.Json.Converters; -using Newtonsoft.Json.Serialization; using StreamJsonRpc; namespace Nexus.Sources; @@ -61,18 +60,18 @@ public async Task ConnectAsync(CancellationToken cancellationTok await _dataStream.WriteAsync(Encoding.UTF8.GetBytes("data"), cancellationToken); await _dataStream.FlushAsync(cancellationToken); - var formatter = new JsonMessageFormatter() + var options = new JsonSerializerOptions() { - JsonSerializer = { - ContractResolver = new DefaultContractResolver - { - NamingStrategy = new CamelCaseNamingStrategy() - } - } + PropertyNamingPolicy = JsonNamingPolicy.CamelCase }; - formatter.JsonSerializer.Converters.Add(new JsonElementConverter()); - formatter.JsonSerializer.Converters.Add(new StringEnumConverter()); + options.Converters.Add(new JsonStringEnumConverter()); + options.Converters.Add(new RoundtripDateTimeConverter()); + + var formatter = new SystemTextJsonFormatter() + { + JsonSerializerOptions = options + }; var messageHandler = new LengthHeaderMessageHandler(_commStream, _commStream, formatter); var jsonRpc = new JsonRpc(messageHandler); diff --git a/src/remoting/dotnet/Remoting.cs b/src/remoting/dotnet/Remoting.cs index b9e0cea..1b97b77 100644 --- a/src/remoting/dotnet/Remoting.cs +++ b/src/remoting/dotnet/Remoting.cs @@ -172,7 +172,7 @@ static JsonElement Read(Span jsonRequest) response.Add("jsonrpc", "2.0"); var id = request.TryGetProperty("id", out var element2) - ? element2.ToString() + ? element2.GetInt32() : throw new Exception("Unable to read the request message id."); response.Add("id", id); @@ -289,8 +289,8 @@ static JsonElement Read(Span jsonRequest) result = new JsonObject() { - ["begin"] = begin, - ["end"] = end + ["begin"] = begin.ToString("o", CultureInfo.InvariantCulture), + ["end"] = end.ToString("o", CultureInfo.InvariantCulture) }; } @@ -302,10 +302,10 @@ static JsonElement Read(Span jsonRequest) var catalogId = @params[0].GetString()!; var beginString = @params[1].GetString()!; - var begin = DateTime.ParseExact(beginString, "yyyy-MM-ddTHH:mm:ssZ", CultureInfo.InvariantCulture); + var begin = DateTime.ParseExact(beginString, "o", CultureInfo.InvariantCulture); var endString = @params[2].GetString()!; - var end = DateTime.ParseExact(endString, "yyyy-MM-ddTHH:mm:ssZ", CultureInfo.InvariantCulture); + var end = DateTime.ParseExact(endString, "o", CultureInfo.InvariantCulture); var availability = await _dataSource.GetAvailabilityAsync(catalogId, begin, end, cancellationToken); @@ -321,10 +321,10 @@ static JsonElement Read(Span jsonRequest) throw new Exception("The data source context must be set before invoking other methods."); var beginString = @params[0].GetString()!; - var begin = DateTime.ParseExact(beginString, "yyyy-MM-ddTHH:mm:ssZ", CultureInfo.InvariantCulture).ToUniversalTime(); + var begin = DateTime.ParseExact(beginString, "o", CultureInfo.InvariantCulture).ToUniversalTime(); var endString = @params[1].GetString()!; - var end = DateTime.ParseExact(endString, "yyyy-MM-ddTHH:mm:ssZ", CultureInfo.InvariantCulture).ToUniversalTime(); + var end = DateTime.ParseExact(endString, "o", CultureInfo.InvariantCulture).ToUniversalTime(); var originalResourceName = @params[2].GetString()!; @@ -377,7 +377,13 @@ private async Task HandleReadDataAsync( { ["jsonrpc"] = "2.0", ["method"] = "readData", - ["params"] = new JsonArray(resourcePath, begin, end) + ["params"] = new JsonArray + ( + resourcePath, + begin.ToString("o", CultureInfo.InvariantCulture), + end.ToString("o", CultureInfo.InvariantCulture + ) + ) }; _logger.LogDebug("Read resource path {ResourcePath} from Nexus", resourcePath); @@ -419,6 +425,7 @@ static Utilities() }; Options.Converters.Add(new JsonStringEnumConverter()); + Options.Converters.Add(new RoundtripDateTimeConverter()); } public static JsonSerializerOptions Options { get; } @@ -476,4 +483,34 @@ protected override void Dispose(bool disposing) public override MemoryHandle Pin(int elementIndex = 0) => throw new NotSupportedException("CastMemoryManager does not support pinning."); public override void Unpin() => throw new NotSupportedException("CastMemoryManager does not support unpinning."); +} + +internal class RoundtripDateTimeConverter : JsonConverter +{ + public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (!DateTime.TryParseExact + ( + reader.GetString(), + "o", + CultureInfo.InvariantCulture, + DateTimeStyles.AdjustToUniversal, + out var dateTime + ) + ) + { + throw new JsonException(); + } + + return dateTime; + } + + public override void Write( + Utf8JsonWriter writer, + DateTime value, + JsonSerializerOptions options + ) + { + writer.WriteStringValue(value.ToString("o", CultureInfo.InvariantCulture)); + } } \ No newline at end of file diff --git a/src/remoting/python/nexus_remoting/_remoting.py b/src/remoting/python/nexus_remoting/_remoting.py index c6c896b..8abc4bb 100644 --- a/src/remoting/python/nexus_remoting/_remoting.py +++ b/src/remoting/python/nexus_remoting/_remoting.py @@ -18,8 +18,12 @@ property_name_decoder=to_snake_case ) +_json_encoder_options.encoders[datetime] = lambda value: value.strftime("%Y-%m-%dT%H:%M:%S.%f") + "0+00:00" + class _Logger(ILogger): + _background_tasks = set[asyncio.Task]() + def __init__(self, tcp_comm_socket: asyncio.StreamWriter): self._comm_writer = tcp_comm_socket @@ -31,7 +35,9 @@ def log(self, log_level: LogLevel, message: str): "params": [log_level.name, message] } - asyncio.create_task(_send_to_server(notification, self._comm_writer)) + task = asyncio.create_task(_send_to_server(notification, self._comm_writer)) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) class RemoteCommunicator: """A remote communicator.""" @@ -222,8 +228,8 @@ async def _process_invocation(self, request: dict[str, Any]) \ raise Exception("The data source context must be set before invoking other methods.") catalog_id = params[0] - begin = datetime.strptime(params[1], "%Y-%m-%dT%H:%M:%SZ") - end = datetime.strptime(params[2], "%Y-%m-%dT%H:%M:%SZ") + begin = _json_encoder_options.decoders[datetime](datetime, params[1]) + end = _json_encoder_options.decoders[datetime](datetime, params[2]) availability = await self._data_source.get_availability(catalog_id, begin, end) result = { @@ -235,8 +241,8 @@ async def _process_invocation(self, request: dict[str, Any]) \ if self._data_source is None: raise Exception("The data source context must be set before invoking other methods.") - begin = datetime.strptime(params[0], "%Y-%m-%dT%H:%M:%SZ") - end = datetime.strptime(params[1], "%Y-%m-%dT%H:%M:%SZ") + begin = _json_encoder_options.decoders[datetime](datetime, params[0]) + end = _json_encoder_options.decoders[datetime](datetime, params[1]) original_resource_name = params[2] catalog_item = JsonEncoder.decode(CatalogItem, params[3], _json_encoder_options) (data, status) = ExtensibilityUtilities.create_buffers(catalog_item.representation, begin, end) @@ -275,7 +281,11 @@ async def _handle_read_data(self, resource_path: str, begin: datetime, end: date read_data_request = { "jsonrpc": "2.0", "method": "readData", - "params": [resource_path, begin, end] + "params": [ + resource_path, + begin, + end + ] } await _send_to_server(read_data_request, self._comm_writer) diff --git a/tests/Nexus.Sources.Remote.Tests/RemoteTests.cs b/tests/Nexus.Sources.Remote.Tests/RemoteTests.cs index 40cf44e..edd3c05 100644 --- a/tests/Nexus.Sources.Remote.Tests/RemoteTests.cs +++ b/tests/Nexus.Sources.Remote.Tests/RemoteTests.cs @@ -3,7 +3,6 @@ using Moq; using Nexus.DataModel; using Nexus.Extensibility; -using System.Runtime.InteropServices; using System.Text.Json; using System.Text.Json.Nodes; using Xunit; @@ -111,9 +110,6 @@ public async Task CanProvideAvailability(string language) [InlineData(PYTHON)] public async Task CanReadFullDay(string language) { - // TODO fix this - var complexData = true; - await _fixture.Initialize; var dataSource = new Remote() as IDataSource; @@ -139,31 +135,23 @@ public async Task CanReadFullDay(string language) var expectedData = new long[length]; var expectedStatus = new byte[length]; - if (complexData) + void GenerateData(DateTimeOffset dateTime) { - void GenerateData(DateTimeOffset dateTime) - { - var data = Enumerable.Range(0, 600) - .Select(value => dateTime.Add(TimeSpan.FromSeconds(value)).ToUnixTimeSeconds()) - .ToArray(); - - var offset = (int)(dateTime - begin).TotalSeconds; - data.CopyTo(expectedData.AsSpan()[offset..]); - expectedStatus.AsSpan().Slice(offset, 600).Fill(1); - } - - GenerateData(new DateTimeOffset(2019, 12, 31, 12, 00, 0, 0, TimeSpan.Zero)); - GenerateData(new DateTimeOffset(2019, 12, 31, 12, 20, 0, 0, TimeSpan.Zero)); - GenerateData(new DateTimeOffset(2020, 01, 01, 00, 00, 0, 0, TimeSpan.Zero)); - GenerateData(new DateTimeOffset(2020, 01, 02, 09, 40, 0, 0, TimeSpan.Zero)); - GenerateData(new DateTimeOffset(2020, 01, 02, 09, 50, 0, 0, TimeSpan.Zero)); - } - else - { - MemoryMarshal.AsBytes(expectedData.AsSpan()).Fill((byte)'d'); - expectedStatus.AsSpan().Fill((byte)'s'); + var data = Enumerable.Range(0, 600) + .Select(value => dateTime.Add(TimeSpan.FromSeconds(value)).ToUnixTimeSeconds()) + .ToArray(); + + var offset = (int)(dateTime - begin).TotalSeconds; + data.CopyTo(expectedData.AsSpan()[offset..]); + expectedStatus.AsSpan().Slice(offset, 600).Fill(1); } + GenerateData(new DateTimeOffset(2019, 12, 31, 12, 00, 0, 0, TimeSpan.Zero)); + GenerateData(new DateTimeOffset(2019, 12, 31, 12, 20, 0, 0, TimeSpan.Zero)); + GenerateData(new DateTimeOffset(2020, 01, 01, 00, 00, 0, 0, TimeSpan.Zero)); + GenerateData(new DateTimeOffset(2020, 01, 02, 09, 40, 0, 0, TimeSpan.Zero)); + GenerateData(new DateTimeOffset(2020, 01, 02, 09, 50, 0, 0, TimeSpan.Zero)); + var request = new ReadRequest(resource.Id, catalogItem, data, status); await dataSource.ReadAsync(begin, end, [request], default!, new Progress(), CancellationToken.None); var longData = new CastMemoryManager(data).Memory; @@ -172,6 +160,45 @@ void GenerateData(DateTimeOffset dateTime) Assert.True(expectedStatus.SequenceEqual(status.ToArray())); } + [Theory] + [InlineData(DOTNET)] + [InlineData(PYTHON)] + public async Task CanRoundtripDateTime(string language) + { + // Arrange + await _fixture.Initialize; + + var dataSource = new Remote() as IDataSource; + var context = CreateContext(language); + + await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None); + + var begin = new DateTime(2020, 01, 01, 0, 0, 0, 20, DateTimeKind.Utc); + var end = new DateTime(2020, 01, 01, 0, 0, 0, 40, DateTimeKind.Utc); + var catalog = await dataSource.EnrichCatalogAsync(new ResourceCatalog("/A/B/C"), CancellationToken.None); + var resource = catalog.Resources![0]; + var representation = resource.Representations![0]; + + var catalogItem = new CatalogItem( + catalog with { Resources = default! }, + resource with { Representations = default! }, + representation, + default); + + var (data, status) = ExtensibilityUtilities.CreateBuffers(representation, begin, end); + var request = new ReadRequest(resource.Id, catalogItem, data, status); + + // Act + await dataSource.ReadAsync( + begin, + end, + [request], + default!, + new Progress(), + CancellationToken.None + ); + } + [Theory] [InlineData(DOTNET)] [InlineData(PYTHON)] diff --git a/tests/Nexus.Sources.Remote.Tests/python/v1/src/foo/test.py b/tests/Nexus.Sources.Remote.Tests/python/v1/src/foo/test.py index e33a10a..be55f7a 100644 --- a/tests/Nexus.Sources.Remote.Tests/python/v1/src/foo/test.py +++ b/tests/Nexus.Sources.Remote.Tests/python/v1/src/foo/test.py @@ -109,7 +109,7 @@ async def get_availability(self, catalog_id: str, begin: datetime, end: datetime max_file_count = (end - begin).total_seconds() / period_per_file.total_seconds() file_paths = glob.glob(url2pathname(self._root) + "/**/*.dat", recursive=True) file_names = [os.path.basename(file_path) for file_path in file_paths] - date_times = [datetime.strptime(fileName, '%Y-%m-%d_%H-%M-%S.dat') for fileName in file_names] + date_times = [datetime.strptime(fileName, "%Y-%m-%d_%H-%M-%S.dat").replace(tzinfo=timezone.utc) for fileName in file_names] filtered_date_times = [current for current in date_times if current >= begin and current < end] actual_file_count = len(filtered_date_times) @@ -168,7 +168,10 @@ async def _read_local_files( for file_path in file_paths: fileName = os.path.basename(file_path) - file_begin = datetime.strptime(fileName, '%Y-%m-%d_%H-%M-%S.dat') + + file_begin = datetime \ + .strptime(fileName, '%Y-%m-%d_%H-%M-%S.dat') \ + .replace(tzinfo=timezone.utc) # if file date/time is within the limits if file_begin >= current_begin and file_begin < end: