Skip to content

Commit

Permalink
Fix all date time serialization (except logging :-/)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vincent Wilms committed Jan 29, 2025
1 parent dc42f06 commit df4fc39
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 89 deletions.
59 changes: 26 additions & 33 deletions src/Nexus.Sources.Remote/DataSourceTypes.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using System.Globalization;
using System.Text.Json;
using System.Text.Json.Serialization;
using Microsoft.Extensions.Logging;
using Nexus.DataModel;
using Nexus.Extensibility;
using System.Text.Json;

namespace Nexus.Sources;

Expand Down Expand Up @@ -41,40 +42,32 @@ internal class RemoteException(string message, Exception? innerException = defau
{
}

internal class JsonElementConverter : Newtonsoft.Json.JsonConverter
internal class RoundtripDateTimeConverter : JsonConverter<DateTime>
{
internal static JsonSerializerOptions _serializerOptions = new()
public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};

public override bool CanConvert(Type objectType)
{
var canConvert = objectType == typeof(JsonElement);
return canConvert;
if (!DateTime.TryParseExact
(
reader.GetString(),
"o",
CultureInfo.InvariantCulture,
DateTimeStyles.AdjustToUniversal,
out var dateTime
)
)
{
throw new JsonException();
}

return dateTime;
}

public override object? ReadJson(
Newtonsoft.Json.JsonReader reader,
Type objectType,
object? existingValue,
Newtonsoft.Json.JsonSerializer serializer
public override void Write(
Utf8JsonWriter writer,
DateTime value,
JsonSerializerOptions options
)
{
if (reader.TokenType == Newtonsoft.Json.JsonToken.Null)
return default;

if (reader.TokenType == Newtonsoft.Json.JsonToken.String)
return JsonSerializer.SerializeToElement(JToken.Load(reader).ToString());

var serialized_tmp = JToken.Load(reader).ToString();
var deserialized = JsonSerializer.Deserialize<JsonElement>(serialized_tmp);
return deserialized;
}

public override void WriteJson(Newtonsoft.Json.JsonWriter writer, object? value, Newtonsoft.Json.JsonSerializer serializer)
{
var jsonString = JsonSerializer.Serialize(value, _serializerOptions);
writer.WriteRawValue(jsonString);
writer.WriteStringValue(value.ToString("o", CultureInfo.InvariantCulture));
}
}
}
1 change: 0 additions & 1 deletion src/Nexus.Sources.Remote/Remote.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using Nexus.DataModel;
using Nexus.Extensibility;
using System.Buffers;
using System.Net;
using System.Reflection;
using System.Text.Json;
using System.Text.RegularExpressions;
Expand Down
25 changes: 12 additions & 13 deletions src/Nexus.Sources.Remote/RemoteCommunicator.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
using System.Net;
using System.Net.Sockets;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Serialization;
using StreamJsonRpc;

namespace Nexus.Sources;
Expand Down Expand Up @@ -61,18 +60,18 @@ public async Task<IJsonRpcServer> ConnectAsync(CancellationToken cancellationTok
await _dataStream.WriteAsync(Encoding.UTF8.GetBytes("data"), cancellationToken);
await _dataStream.FlushAsync(cancellationToken);

var formatter = new JsonMessageFormatter()
var options = new JsonSerializerOptions()
{
JsonSerializer = {
ContractResolver = new DefaultContractResolver
{
NamingStrategy = new CamelCaseNamingStrategy()
}
}
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};

formatter.JsonSerializer.Converters.Add(new JsonElementConverter());
formatter.JsonSerializer.Converters.Add(new StringEnumConverter());
options.Converters.Add(new JsonStringEnumConverter());
options.Converters.Add(new RoundtripDateTimeConverter());

var formatter = new SystemTextJsonFormatter()
{
JsonSerializerOptions = options
};

var messageHandler = new LengthHeaderMessageHandler(_commStream, _commStream, formatter);
var jsonRpc = new JsonRpc(messageHandler);
Expand Down
53 changes: 45 additions & 8 deletions src/remoting/dotnet/Remoting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ static JsonElement Read(Span<byte> jsonRequest)
response.Add("jsonrpc", "2.0");

var id = request.TryGetProperty("id", out var element2)
? element2.ToString()
? element2.GetInt32()
: throw new Exception("Unable to read the request message id.");

response.Add("id", id);
Expand Down Expand Up @@ -289,8 +289,8 @@ static JsonElement Read(Span<byte> jsonRequest)

result = new JsonObject()
{
["begin"] = begin,
["end"] = end
["begin"] = begin.ToString("o", CultureInfo.InvariantCulture),
["end"] = end.ToString("o", CultureInfo.InvariantCulture)
};
}

Expand All @@ -302,10 +302,10 @@ static JsonElement Read(Span<byte> jsonRequest)
var catalogId = @params[0].GetString()!;

var beginString = @params[1].GetString()!;
var begin = DateTime.ParseExact(beginString, "yyyy-MM-ddTHH:mm:ssZ", CultureInfo.InvariantCulture);
var begin = DateTime.ParseExact(beginString, "o", CultureInfo.InvariantCulture);

var endString = @params[2].GetString()!;
var end = DateTime.ParseExact(endString, "yyyy-MM-ddTHH:mm:ssZ", CultureInfo.InvariantCulture);
var end = DateTime.ParseExact(endString, "o", CultureInfo.InvariantCulture);

var availability = await _dataSource.GetAvailabilityAsync(catalogId, begin, end, cancellationToken);

Expand All @@ -321,10 +321,10 @@ static JsonElement Read(Span<byte> jsonRequest)
throw new Exception("The data source context must be set before invoking other methods.");

var beginString = @params[0].GetString()!;
var begin = DateTime.ParseExact(beginString, "yyyy-MM-ddTHH:mm:ssZ", CultureInfo.InvariantCulture).ToUniversalTime();
var begin = DateTime.ParseExact(beginString, "o", CultureInfo.InvariantCulture).ToUniversalTime();

var endString = @params[1].GetString()!;
var end = DateTime.ParseExact(endString, "yyyy-MM-ddTHH:mm:ssZ", CultureInfo.InvariantCulture).ToUniversalTime();
var end = DateTime.ParseExact(endString, "o", CultureInfo.InvariantCulture).ToUniversalTime();

var originalResourceName = @params[2].GetString()!;

Expand Down Expand Up @@ -377,7 +377,13 @@ private async Task HandleReadDataAsync(
{
["jsonrpc"] = "2.0",
["method"] = "readData",
["params"] = new JsonArray(resourcePath, begin, end)
["params"] = new JsonArray
(
resourcePath,
begin.ToString("o", CultureInfo.InvariantCulture),
end.ToString("o", CultureInfo.InvariantCulture
)
)
};

_logger.LogDebug("Read resource path {ResourcePath} from Nexus", resourcePath);
Expand Down Expand Up @@ -419,6 +425,7 @@ static Utilities()
};

Options.Converters.Add(new JsonStringEnumConverter());
Options.Converters.Add(new RoundtripDateTimeConverter());
}

public static JsonSerializerOptions Options { get; }
Expand Down Expand Up @@ -476,4 +483,34 @@ protected override void Dispose(bool disposing)
public override MemoryHandle Pin(int elementIndex = 0) => throw new NotSupportedException("CastMemoryManager does not support pinning.");

public override void Unpin() => throw new NotSupportedException("CastMemoryManager does not support unpinning.");
}

internal class RoundtripDateTimeConverter : JsonConverter<DateTime>
{
public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (!DateTime.TryParseExact
(
reader.GetString(),
"o",
CultureInfo.InvariantCulture,
DateTimeStyles.AdjustToUniversal,
out var dateTime
)
)
{
throw new JsonException();
}

return dateTime;
}

public override void Write(
Utf8JsonWriter writer,
DateTime value,
JsonSerializerOptions options
)
{
writer.WriteStringValue(value.ToString("o", CultureInfo.InvariantCulture));
}
}
22 changes: 16 additions & 6 deletions src/remoting/python/nexus_remoting/_remoting.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
property_name_decoder=to_snake_case
)

_json_encoder_options.encoders[datetime] = lambda value: value.strftime("%Y-%m-%dT%H:%M:%S.%f") + "0+00:00"

class _Logger(ILogger):

_background_tasks = set[asyncio.Task]()

def __init__(self, tcp_comm_socket: asyncio.StreamWriter):
self._comm_writer = tcp_comm_socket

Expand All @@ -31,7 +35,9 @@ def log(self, log_level: LogLevel, message: str):
"params": [log_level.name, message]
}

asyncio.create_task(_send_to_server(notification, self._comm_writer))
task = asyncio.create_task(_send_to_server(notification, self._comm_writer))
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)

class RemoteCommunicator:
"""A remote communicator."""
Expand Down Expand Up @@ -222,8 +228,8 @@ async def _process_invocation(self, request: dict[str, Any]) \
raise Exception("The data source context must be set before invoking other methods.")

catalog_id = params[0]
begin = datetime.strptime(params[1], "%Y-%m-%dT%H:%M:%SZ")
end = datetime.strptime(params[2], "%Y-%m-%dT%H:%M:%SZ")
begin = _json_encoder_options.decoders[datetime](datetime, params[1])
end = _json_encoder_options.decoders[datetime](datetime, params[2])
availability = await self._data_source.get_availability(catalog_id, begin, end)

result = {
Expand All @@ -235,8 +241,8 @@ async def _process_invocation(self, request: dict[str, Any]) \
if self._data_source is None:
raise Exception("The data source context must be set before invoking other methods.")

begin = datetime.strptime(params[0], "%Y-%m-%dT%H:%M:%SZ")
end = datetime.strptime(params[1], "%Y-%m-%dT%H:%M:%SZ")
begin = _json_encoder_options.decoders[datetime](datetime, params[0])
end = _json_encoder_options.decoders[datetime](datetime, params[1])
original_resource_name = params[2]
catalog_item = JsonEncoder.decode(CatalogItem, params[3], _json_encoder_options)
(data, status) = ExtensibilityUtilities.create_buffers(catalog_item.representation, begin, end)
Expand Down Expand Up @@ -275,7 +281,11 @@ async def _handle_read_data(self, resource_path: str, begin: datetime, end: date
read_data_request = {
"jsonrpc": "2.0",
"method": "readData",
"params": [resource_path, begin, end]
"params": [
resource_path,
begin,
end
]
}

await _send_to_server(read_data_request, self._comm_writer)
Expand Down
79 changes: 53 additions & 26 deletions tests/Nexus.Sources.Remote.Tests/RemoteTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using Moq;
using Nexus.DataModel;
using Nexus.Extensibility;
using System.Runtime.InteropServices;
using System.Text.Json;
using System.Text.Json.Nodes;
using Xunit;
Expand Down Expand Up @@ -111,9 +110,6 @@ public async Task CanProvideAvailability(string language)
[InlineData(PYTHON)]
public async Task CanReadFullDay(string language)
{
// TODO fix this
var complexData = true;

await _fixture.Initialize;

Check warning on line 113 in tests/Nexus.Sources.Remote.Tests/RemoteTests.cs

View workflow job for this annotation

GitHub Actions / Build

Avoid awaiting or returning a Task representing work that was not started within your context as that can lead to deadlocks.

var dataSource = new Remote() as IDataSource;
Expand All @@ -139,31 +135,23 @@ public async Task CanReadFullDay(string language)
var expectedData = new long[length];
var expectedStatus = new byte[length];

if (complexData)
void GenerateData(DateTimeOffset dateTime)
{
void GenerateData(DateTimeOffset dateTime)
{
var data = Enumerable.Range(0, 600)
.Select(value => dateTime.Add(TimeSpan.FromSeconds(value)).ToUnixTimeSeconds())
.ToArray();

var offset = (int)(dateTime - begin).TotalSeconds;
data.CopyTo(expectedData.AsSpan()[offset..]);
expectedStatus.AsSpan().Slice(offset, 600).Fill(1);
}

GenerateData(new DateTimeOffset(2019, 12, 31, 12, 00, 0, 0, TimeSpan.Zero));
GenerateData(new DateTimeOffset(2019, 12, 31, 12, 20, 0, 0, TimeSpan.Zero));
GenerateData(new DateTimeOffset(2020, 01, 01, 00, 00, 0, 0, TimeSpan.Zero));
GenerateData(new DateTimeOffset(2020, 01, 02, 09, 40, 0, 0, TimeSpan.Zero));
GenerateData(new DateTimeOffset(2020, 01, 02, 09, 50, 0, 0, TimeSpan.Zero));
}
else
{
MemoryMarshal.AsBytes(expectedData.AsSpan()).Fill((byte)'d');
expectedStatus.AsSpan().Fill((byte)'s');
var data = Enumerable.Range(0, 600)
.Select(value => dateTime.Add(TimeSpan.FromSeconds(value)).ToUnixTimeSeconds())
.ToArray();

var offset = (int)(dateTime - begin).TotalSeconds;
data.CopyTo(expectedData.AsSpan()[offset..]);
expectedStatus.AsSpan().Slice(offset, 600).Fill(1);
}

GenerateData(new DateTimeOffset(2019, 12, 31, 12, 00, 0, 0, TimeSpan.Zero));
GenerateData(new DateTimeOffset(2019, 12, 31, 12, 20, 0, 0, TimeSpan.Zero));
GenerateData(new DateTimeOffset(2020, 01, 01, 00, 00, 0, 0, TimeSpan.Zero));
GenerateData(new DateTimeOffset(2020, 01, 02, 09, 40, 0, 0, TimeSpan.Zero));
GenerateData(new DateTimeOffset(2020, 01, 02, 09, 50, 0, 0, TimeSpan.Zero));

var request = new ReadRequest(resource.Id, catalogItem, data, status);
await dataSource.ReadAsync(begin, end, [request], default!, new Progress<double>(), CancellationToken.None);
var longData = new CastMemoryManager<byte, long>(data).Memory;
Expand All @@ -172,6 +160,45 @@ void GenerateData(DateTimeOffset dateTime)
Assert.True(expectedStatus.SequenceEqual(status.ToArray()));
}

[Theory]
[InlineData(DOTNET)]
[InlineData(PYTHON)]
public async Task CanRoundtripDateTime(string language)
{
// Arrange
await _fixture.Initialize;

Check warning on line 169 in tests/Nexus.Sources.Remote.Tests/RemoteTests.cs

View workflow job for this annotation

GitHub Actions / Build

Avoid awaiting or returning a Task representing work that was not started within your context as that can lead to deadlocks.

var dataSource = new Remote() as IDataSource;
var context = CreateContext(language);

await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None);

var begin = new DateTime(2020, 01, 01, 0, 0, 0, 20, DateTimeKind.Utc);
var end = new DateTime(2020, 01, 01, 0, 0, 0, 40, DateTimeKind.Utc);
var catalog = await dataSource.EnrichCatalogAsync(new ResourceCatalog("/A/B/C"), CancellationToken.None);
var resource = catalog.Resources![0];
var representation = resource.Representations![0];

var catalogItem = new CatalogItem(
catalog with { Resources = default! },
resource with { Representations = default! },
representation,
default);

var (data, status) = ExtensibilityUtilities.CreateBuffers(representation, begin, end);
var request = new ReadRequest(resource.Id, catalogItem, data, status);

// Act
await dataSource.ReadAsync(
begin,
end,
[request],
default!,
new Progress<double>(),
CancellationToken.None
);
}

[Theory]
[InlineData(DOTNET)]
[InlineData(PYTHON)]
Expand Down
Loading

0 comments on commit df4fc39

Please sign in to comment.