From 8490ee2ae846a0c082e66bece121c82bad25df0c Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 9 Dec 2022 01:50:34 +0700 Subject: [PATCH 1/2] Attempt to fix JSON and JSONB payload DbType (failed) --- ...PostgreSqlJournalJsonBSerializationSpec.cs | 57 ++++++++ .../PostgreSqlJournalJsonSerializationSpec.cs | 57 ++++++++ .../PostgreSqlJournalSerializationSpec.cs | 32 +++-- ...eSqlSnapshotStoreJsonBSerializationSpec.cs | 55 +++++++ ...reSqlSnapshotStoreJsonSerializationSpec.cs | 52 +++++++ ...ostgreSqlSnapshotStoreSerializationSpec.cs | 32 +++-- .../Akka.Persistence.PostgreSql.csproj | 1 + .../Journal/PostgreSqlQueryExecutor.cs | 134 +++++++++--------- .../Properties/FriendsOf.cs | 10 ++ .../Snapshot/PostgreSqlQueryExecutor.cs | 124 ++++++++-------- 10 files changed, 400 insertions(+), 154 deletions(-) create mode 100644 src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalJsonBSerializationSpec.cs create mode 100644 src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalJsonSerializationSpec.cs create mode 100644 src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreJsonBSerializationSpec.cs create mode 100644 src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreJsonSerializationSpec.cs create mode 100644 src/Akka.Persistence.PostgreSql/Properties/FriendsOf.cs diff --git a/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalJsonBSerializationSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalJsonBSerializationSpec.cs new file mode 100644 index 0000000..caad88d --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalJsonBSerializationSpec.cs @@ -0,0 +1,57 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using Akka.Configuration; +using Akka.Persistence.TCK.Serialization; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.PostgreSql.Tests.Serialization +{ + [Collection("PostgreSqlSpec")] + public class PostgreSqlJournalJsonBSerializationSpec : JournalSerializationSpec + { + public PostgreSqlJournalJsonBSerializationSpec(ITestOutputHelper output, PostgresFixture fixture) + : base(CreateSpecConfig(fixture), "PostgreSqlJournalSerializationSpec", output) + { + } + + private static Config CreateSpecConfig(PostgresFixture fixture) + { + //need to make sure db is created before the tests start + DbUtils.Initialize(fixture); + + return ConfigurationFactory.ParseString($@" + akka.persistence {{ + publish-plugin-commands = on + journal {{ + plugin = ""akka.persistence.journal.postgresql"" + postgresql {{ + stored-as = jsonb + connection-string = ""{DbUtils.ConnectionString}"" + auto-initialize = on + }} + }} + snapshot-store {{ + plugin = ""akka.persistence.snapshot-store.postgresql"" + postgresql {{ + stored-as = jsonb + connection-string = ""{DbUtils.ConnectionString}"" + auto-initialize = on + }} + }} + }} + akka.test.single-expect-default = 10s") + .WithFallback(PostgreSqlPersistence.DefaultConfiguration()); + } + + [Fact(Skip = "Sql plugin does not support EventAdapter.Manifest")] + public override void Journal_should_serialize_Persistent_with_EventAdapter_manifest() + { + } + } +} diff --git a/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalJsonSerializationSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalJsonSerializationSpec.cs new file mode 100644 index 0000000..7137769 --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalJsonSerializationSpec.cs @@ -0,0 +1,57 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using Akka.Configuration; +using Akka.Persistence.TCK.Serialization; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.PostgreSql.Tests.Serialization +{ + [Collection("PostgreSqlSpec")] + public class PostgreSqlJournalJsonSerializationSpec : JournalSerializationSpec + { + public PostgreSqlJournalJsonSerializationSpec(ITestOutputHelper output, PostgresFixture fixture) + : base(CreateSpecConfig(fixture), "PostgreSqlJournalSerializationSpec", output) + { + } + + private static Config CreateSpecConfig(PostgresFixture fixture) + { + //need to make sure db is created before the tests start + DbUtils.Initialize(fixture); + + return ConfigurationFactory.ParseString($@" + akka.persistence {{ + publish-plugin-commands = on + journal {{ + plugin = ""akka.persistence.journal.postgresql"" + postgresql {{ + stored-as = json + connection-string = ""{DbUtils.ConnectionString}"" + auto-initialize = on + }} + }} + snapshot-store {{ + plugin = ""akka.persistence.snapshot-store.postgresql"" + postgresql {{ + stored-as = json + connection-string = ""{DbUtils.ConnectionString}"" + auto-initialize = on + }} + }} + }} + akka.test.single-expect-default = 10s") + .WithFallback(PostgreSqlPersistence.DefaultConfiguration()); + } + + [Fact(Skip = "Sql plugin does not support EventAdapter.Manifest")] + public override void Journal_should_serialize_Persistent_with_EventAdapter_manifest() + { + } + } +} diff --git a/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalSerializationSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalSerializationSpec.cs index 478e4e8..c300c50 100644 --- a/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalSerializationSpec.cs +++ b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalSerializationSpec.cs @@ -25,22 +25,28 @@ private static Config CreateSpecConfig(PostgresFixture fixture) //need to make sure db is created before the tests start DbUtils.Initialize(fixture); - return ConfigurationFactory.ParseString(@" - akka.persistence { + return ConfigurationFactory.ParseString($@" + akka.persistence {{ publish-plugin-commands = on - journal { + journal {{ plugin = ""akka.persistence.journal.postgresql"" - postgresql { - class = ""Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql"" - plugin-dispatcher = ""akka.actor.default-dispatcher"" - table-name = event_journal - schema-name = public + postgresql {{ + stored-as = bytea + connection-string = ""{DbUtils.ConnectionString}"" auto-initialize = on - connection-string = """ + DbUtils.ConnectionString + @""" - } - } - } - akka.test.single-expect-default = 10s"); + }} + }} + snapshot-store {{ + plugin = ""akka.persistence.snapshot-store.postgresql"" + postgresql {{ + stored-as = bytea + connection-string = ""{DbUtils.ConnectionString}"" + auto-initialize = on + }} + }} + }} + akka.test.single-expect-default = 10s") + .WithFallback(PostgreSqlPersistence.DefaultConfiguration()); } [Fact(Skip = "Sql plugin does not support EventAdapter.Manifest")] diff --git a/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreJsonBSerializationSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreJsonBSerializationSpec.cs new file mode 100644 index 0000000..99d75ef --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreJsonBSerializationSpec.cs @@ -0,0 +1,55 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using Akka.Actor; +using Akka.Configuration; +using Akka.Persistence.TCK.Serialization; +using FluentAssertions; +using Newtonsoft.Json; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.PostgreSql.Tests.Serialization +{ + [Collection("PostgreSqlSpec")] + public class PostgreSqlSnapshotStoreJsonBSerializationSpec : SnapshotStoreSerializationSpec + { + public PostgreSqlSnapshotStoreJsonBSerializationSpec(ITestOutputHelper output, PostgresFixture fixture) + : base(CreateSpecConfig(fixture), "PostgreSqlSnapshotStoreSerializationSpec", output) + { + } + + private static Config CreateSpecConfig(PostgresFixture fixture) + { + //need to make sure db is created before the tests start + DbUtils.Initialize(fixture); + + return ConfigurationFactory.ParseString($@" + akka.persistence {{ + publish-plugin-commands = on + journal {{ + plugin = ""akka.persistence.journal.postgresql"" + postgresql {{ + stored-as = jsonb + connection-string = ""{DbUtils.ConnectionString}"" + auto-initialize = on + }} + }} + snapshot-store {{ + plugin = ""akka.persistence.snapshot-store.postgresql"" + postgresql {{ + stored-as = jsonb + connection-string = ""{DbUtils.ConnectionString}"" + auto-initialize = on + }} + }} + }} + akka.test.single-expect-default = 10s") + .WithFallback(PostgreSqlPersistence.DefaultConfiguration()); + } + } +} diff --git a/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreJsonSerializationSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreJsonSerializationSpec.cs new file mode 100644 index 0000000..9354c3d --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreJsonSerializationSpec.cs @@ -0,0 +1,52 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using Akka.Configuration; +using Akka.Persistence.TCK.Serialization; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.PostgreSql.Tests.Serialization +{ + [Collection("PostgreSqlSpec")] + public class PostgreSqlSnapshotStoreJsonSerializationSpec : SnapshotStoreSerializationSpec + { + public PostgreSqlSnapshotStoreJsonSerializationSpec(ITestOutputHelper output, PostgresFixture fixture) + : base(CreateSpecConfig(fixture), "PostgreSqlSnapshotStoreSerializationSpec", output) + { + } + + private static Config CreateSpecConfig(PostgresFixture fixture) + { + //need to make sure db is created before the tests start + DbUtils.Initialize(fixture); + + return ConfigurationFactory.ParseString($@" + akka.persistence {{ + publish-plugin-commands = on + journal {{ + plugin = ""akka.persistence.journal.postgresql"" + postgresql {{ + stored-as = json + connection-string = ""{DbUtils.ConnectionString}"" + auto-initialize = on + }} + }} + snapshot-store {{ + plugin = ""akka.persistence.snapshot-store.postgresql"" + postgresql {{ + stored-as = json + connection-string = ""{DbUtils.ConnectionString}"" + auto-initialize = on + }} + }} + }} + akka.test.single-expect-default = 10s") + .WithFallback(PostgreSqlPersistence.DefaultConfiguration()); + } + } +} diff --git a/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreSerializationSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreSerializationSpec.cs index b7d03b5..40e5944 100644 --- a/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreSerializationSpec.cs +++ b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreSerializationSpec.cs @@ -25,22 +25,28 @@ private static Config CreateSpecConfig(PostgresFixture fixture) //need to make sure db is created before the tests start DbUtils.Initialize(fixture); - return ConfigurationFactory.ParseString(@" - akka.persistence { + return ConfigurationFactory.ParseString($@" + akka.persistence {{ publish-plugin-commands = on - journal { + journal {{ plugin = ""akka.persistence.journal.postgresql"" - postgresql { - class = ""Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql"" - plugin-dispatcher = ""akka.actor.default-dispatcher"" - table-name = event_journal - schema-name = public + postgresql {{ + stored-as = bytea + connection-string = ""{DbUtils.ConnectionString}"" auto-initialize = on - connection-string = """ + DbUtils.ConnectionString + @""" - } - } - } - akka.test.single-expect-default = 10s"); + }} + }} + snapshot-store {{ + plugin = ""akka.persistence.snapshot-store.postgresql"" + postgresql {{ + stored-as = bytea + connection-string = ""{DbUtils.ConnectionString}"" + auto-initialize = on + }} + }} + }} + akka.test.single-expect-default = 10s") + .WithFallback(PostgreSqlPersistence.DefaultConfiguration()); } } } diff --git a/src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj b/src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj index ad093d7..09dd55f 100644 --- a/src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj +++ b/src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj @@ -6,6 +6,7 @@ Akka Persistence journal and snapshot store backed by PostgreSql database. $(NetStandardLibVersion) true + 8 diff --git a/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs b/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs index 23a78a6..8709fc4 100644 --- a/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs +++ b/src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs @@ -26,8 +26,9 @@ namespace Akka.Persistence.PostgreSql.Journal { public class PostgreSqlQueryExecutor : AbstractQueryExecutor { - private readonly Func _serialize; - private readonly Func _deserialize; + private readonly NpgsqlDbType _payloadDbType; + private readonly NewtonSoftJsonSerializer _jsonSerializer; + private readonly PostgreSqlQueryConfiguration _config; public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.Serialization.Serialization serialization, ITimestampProvider timestampProvider) : base(configuration, serialization, timestampProvider) @@ -73,47 +74,15 @@ DELETE FROM {Configuration.FullJournalTableName} DeleteBatchSqlMetadata = $@"DELETE FROM {Configuration.FullMetaTableName} WHERE {Configuration.PersistenceIdColumnName} = @PersistenceId AND {Configuration.SequenceNrColumnName} <= @ToSequenceNr;"; - switch (configuration.StoredAs) + _jsonSerializer = new NewtonSoftJsonSerializer(Serialization.System); + _config = configuration; + _payloadDbType = _config.StoredAs switch { - case StoredAsType.ByteA: - _serialize = e => - { - var payloadType = e.Payload.GetType(); - var serializer = Serialization.FindSerializerForType(payloadType, Configuration.DefaultSerializer); - - // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811 - var binary = Akka.Serialization.Serialization.WithTransport(Serialization.System, () => serializer.ToBinary(e.Payload)); - - return new SerializationResult(NpgsqlDbType.Bytea, binary, serializer); - }; - _deserialize = (type, payload, manifest, serializerId) => - { - if (serializerId.HasValue) - { - // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811 - return Serialization.Deserialize((byte[])payload, serializerId.Value, manifest); - } - else - { - // Support old writes that did not set the serializer id - var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer); - - // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811 - return Akka.Serialization.Serialization.WithTransport(Serialization.System, () => deserializer.FromBinary((byte[])payload, type)); - } - }; - break; - case StoredAsType.JsonB: - _serialize = e => new SerializationResult(NpgsqlDbType.Jsonb, JsonConvert.SerializeObject(e.Payload, configuration.JsonSerializerSettings), null); - _deserialize = (type, serialized, manifest, serializerId) => JsonConvert.DeserializeObject((string)serialized, type, configuration.JsonSerializerSettings); - break; - case StoredAsType.Json: - _serialize = e => new SerializationResult(NpgsqlDbType.Json, JsonConvert.SerializeObject(e.Payload, configuration.JsonSerializerSettings), null); - _deserialize = (type, serialized, manifest, serializerId) => JsonConvert.DeserializeObject((string)serialized, type, configuration.JsonSerializerSettings); - break; - default: - throw new NotSupportedException($"{configuration.StoredAs} is not supported Db type for a payload"); - } + StoredAsType.ByteA => NpgsqlDbType.Bytea, + StoredAsType.JsonB => NpgsqlDbType.Jsonb, + StoredAsType.Json => NpgsqlDbType.Json, + _ => throw new NotSupportedException($"{_config.StoredAs} is not supported Db type for a payload") + }; } protected override DbCommand CreateCommand(DbConnection connection) => ((NpgsqlConnection)connection).CreateCommand(); @@ -123,36 +92,47 @@ DELETE FROM {Configuration.FullJournalTableName} protected override string DeleteBatchSql { get; } protected virtual string DeleteBatchSqlMetadata { get; } + private Serializer GetSerializerFor(object payload) + { + return _payloadDbType switch + { + NpgsqlDbType.Bytea => Serialization.FindSerializerFor(payload, _config.DefaultSerializer), + NpgsqlDbType.Jsonb => _jsonSerializer, + NpgsqlDbType.Json => _jsonSerializer, + _ => throw new NotSupportedException($"{_payloadDbType} is not supported Db type for a payload") + }; + } + protected override void WriteEvent(DbCommand command, IPersistentRepresentation e, IImmutableSet tags) { - var serializationResult = _serialize(e); - var serializer = serializationResult.Serializer; - var hasSerializer = serializer != null; - - string manifest = ""; - if (hasSerializer && serializer is SerializerWithStringManifest) - manifest = ((SerializerWithStringManifest)serializer).Manifest(e.Payload); - else if (hasSerializer && serializer.IncludeManifest) - manifest = QualifiedName(e); - else - manifest = string.IsNullOrEmpty(e.Manifest) ? QualifiedName(e) : e.Manifest; - + var payload = e.Payload; + var serializer = GetSerializerFor(payload); + // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811 + object serialized = _payloadDbType switch + { + NpgsqlDbType.Bytea => Akka.Serialization.Serialization.WithTransport(Serialization.System, () => serializer.ToBinary(payload)), + _ => Akka.Serialization.Serialization.WithTransport(Serialization.System, () => + { + var bytes = _jsonSerializer.ToBinary(payload); + return Encoding.UTF8.GetString(bytes); + }) + }; + + var manifest = serializer switch + { + SerializerWithStringManifest stringManifest => stringManifest.Manifest(payload), + _ when serializer.IncludeManifest => payload.GetType().TypeQualifiedName(), + _ => string.IsNullOrEmpty(e.Manifest) ? payload.GetType().TypeQualifiedName() : e.Manifest + }; + AddParameter(command, "@PersistenceId", DbType.String, e.PersistenceId); AddParameter(command, "@SequenceNr", DbType.Int64, e.SequenceNr); AddParameter(command, "@Timestamp", DbType.Int64, e.Timestamp); AddParameter(command, "@IsDeleted", DbType.Boolean, false); AddParameter(command, "@Manifest", DbType.String, manifest); + AddParameter(command, "@SerializerId", DbType.Int32, serializer.Identifier); - if (hasSerializer) - { - AddParameter(command, "@SerializerId", DbType.Int32, serializer.Identifier); - } - else - { - AddParameter(command, "@SerializerId", DbType.Int32, DBNull.Value); - } - - command.Parameters.Add(new NpgsqlParameter("@Payload", serializationResult.DbType) { Value = serializationResult.Payload }); + command.Parameters.Add(new NpgsqlParameter("@Payload", _payloadDbType) { Value = serialized }); if (tags.Count != 0) { @@ -180,18 +160,38 @@ protected override IPersistentRepresentation ReadEvent(DbDataReader reader) var manifest = reader.GetString(ManifestIndex); var raw = reader[PayloadIndex]; - int? serializerId = null; - Type type = null; + int? serializerId; + Type type; if (reader.IsDBNull(SerializerIdIndex)) { type = Type.GetType(manifest, true); + serializerId = null; } else { + type = Type.GetType(manifest, false); serializerId = reader.GetInt32(SerializerIdIndex); } - var deserialized = _deserialize(type, raw, manifest, serializerId); + object deserialized; + if (serializerId is { }) + { + if (_payloadDbType == NpgsqlDbType.Bytea) + { + deserialized = Serialization.Deserialize((byte[])raw, serializerId.Value, manifest); + } + else + { + var bytes = Encoding.UTF8.GetBytes((string)raw); + deserialized = _jsonSerializer.FromBinary(bytes, type); + } + } + else + { + // Support old writes that did not set the serializer id + var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer); + deserialized = deserializer.FromBinary((byte[])raw, type); + } return new Persistent(deserialized, sequenceNr, persistenceId, manifest, isDeleted, ActorRefs.NoSender, null, timestamp); } diff --git a/src/Akka.Persistence.PostgreSql/Properties/FriendsOf.cs b/src/Akka.Persistence.PostgreSql/Properties/FriendsOf.cs new file mode 100644 index 0000000..2231308 --- /dev/null +++ b/src/Akka.Persistence.PostgreSql/Properties/FriendsOf.cs @@ -0,0 +1,10 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("Akka.Persistence.PostgreSql.Tests")] diff --git a/src/Akka.Persistence.PostgreSql/Snapshot/PostgreSqlQueryExecutor.cs b/src/Akka.Persistence.PostgreSql/Snapshot/PostgreSqlQueryExecutor.cs index 4c66e00..63a6809 100644 --- a/src/Akka.Persistence.PostgreSql/Snapshot/PostgreSqlQueryExecutor.cs +++ b/src/Akka.Persistence.PostgreSql/Snapshot/PostgreSqlQueryExecutor.cs @@ -8,19 +8,21 @@ using Akka.Persistence.Sql.Common.Snapshot; using Akka.Serialization; using Akka.Util; -using Newtonsoft.Json; using Npgsql; using NpgsqlTypes; using System; using System.Data; using System.Data.Common; +using System.Text; namespace Akka.Persistence.PostgreSql.Snapshot { public class PostgreSqlQueryExecutor : AbstractQueryExecutor { - private readonly Func _serialize; - private readonly Func _deserialize; + private readonly NpgsqlDbType _payloadDbType; + private readonly NewtonSoftJsonSerializer _jsonSerializer; + private readonly PostgreSqlQueryConfiguration _config; + public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.Serialization.Serialization serialization) : base(configuration, serialization) { CreateSnapshotTableSql = $@" @@ -62,42 +64,15 @@ WITH upsert AS ( SELECT @PersistenceId, @SequenceNr, @Timestamp, @Manifest, @Payload, @SerializerId WHERE NOT EXISTS (SELECT * FROM upsert)"; - switch (configuration.StoredAs) + _jsonSerializer = new NewtonSoftJsonSerializer(Serialization.System); + _config = configuration; + _payloadDbType = _config.StoredAs switch { - case StoredAsType.ByteA: - _serialize = ss => - { - var serializer = Serialization.FindSerializerFor(ss); - // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811 - var binary = Akka.Serialization.Serialization - .WithTransport(Serialization.System, () => serializer.ToBinary(ss)); - return new SerializationResult(NpgsqlDbType.Bytea, binary, serializer); - }; - _deserialize = (type, serialized, manifest, serializerId) => - { - if (serializerId.HasValue) - { - return Serialization.Deserialize((byte[])serialized, serializerId.Value, manifest); - } - else - { - // Support old writes that did not set the serializer id - var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer); - return deserializer.FromBinary((byte[])serialized, type); - } - }; - break; - case StoredAsType.JsonB: - _serialize = ss => new SerializationResult(NpgsqlDbType.Jsonb, JsonConvert.SerializeObject(ss, configuration.JsonSerializerSettings), null); - _deserialize = (type, serialized, manifest, serializerId) => JsonConvert.DeserializeObject((string)serialized, type, configuration.JsonSerializerSettings); - break; - case StoredAsType.Json: - _serialize = ss => new SerializationResult(NpgsqlDbType.Json, JsonConvert.SerializeObject(ss, configuration.JsonSerializerSettings), null); - _deserialize = (type, serialized, manifest, serializerId) => JsonConvert.DeserializeObject((string)serialized, type, configuration.JsonSerializerSettings); - break; - default: - throw new NotSupportedException($"{configuration.StoredAs} is not supported Db type for a payload"); - } + StoredAsType.ByteA => NpgsqlDbType.Bytea, + StoredAsType.JsonB => NpgsqlDbType.Jsonb, + StoredAsType.Json => NpgsqlDbType.Json, + _ => throw new NotSupportedException($"{_config.StoredAs} is not supported Db type for a payload") + }; } protected override string InsertSnapshotSql { get; } @@ -110,29 +85,43 @@ protected override DbCommand CreateCommand(DbConnection connection) protected override void SetTimestampParameter(DateTime timestamp, DbCommand command) => AddParameter(command, "@Timestamp", DbType.Int64, timestamp.Ticks); protected override void SetPayloadParameter(object snapshot, DbCommand command) { - var serializationResult = _serialize(snapshot); - command.Parameters.Add(new NpgsqlParameter("@Payload", serializationResult.DbType) { Value = serializationResult.Payload }); + var serializer = GetSerializerFor(snapshot); + // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811 + object payload = _payloadDbType switch + { + NpgsqlDbType.Bytea => Akka.Serialization.Serialization.WithTransport(Serialization.System, () => serializer.ToBinary(snapshot)), + _ => Akka.Serialization.Serialization.WithTransport(Serialization.System, () => + { + var bytes = _jsonSerializer.ToBinary(snapshot); + return Encoding.UTF8.GetString(bytes); + }) + }; + command.Parameters.Add(new NpgsqlParameter("@Payload", _payloadDbType) { Value = payload }); } protected override void SetManifestParameters(object snapshot, DbCommand command) { - var snapshotType = snapshot.GetType(); - var serializer = Serialization.FindSerializerForType(snapshotType, Configuration.DefaultSerializer); - - var manifest = ""; - if (serializer is SerializerWithStringManifest stringManifest) + var serializer = GetSerializerFor(snapshot); + var manifest = serializer switch { - manifest = stringManifest.Manifest(snapshot); - } - else if (serializer.IncludeManifest) - { - manifest = snapshotType.TypeQualifiedName(); - } - + SerializerWithStringManifest stringManifest => stringManifest.Manifest(snapshot), + _ => snapshot.GetType().TypeQualifiedName(), + }; AddParameter(command, "@Manifest", DbType.String, manifest); AddParameter(command, "@SerializerId", DbType.Int32, serializer.Identifier); } + private Serializer GetSerializerFor(object snapshot) + { + return _payloadDbType switch + { + NpgsqlDbType.Bytea => Serialization.FindSerializerFor(snapshot, _config.DefaultSerializer), + NpgsqlDbType.Jsonb => _jsonSerializer, + NpgsqlDbType.Json => _jsonSerializer, + _ => throw new NotSupportedException($"{_payloadDbType} is not supported Db type for a payload") + }; + } + protected override SelectedSnapshot ReadSnapshot(DbDataReader reader) { var persistenceId = reader.GetString(0); @@ -141,11 +130,12 @@ protected override SelectedSnapshot ReadSnapshot(DbDataReader reader) var manifest = reader.GetString(3); var payloadObject = reader[4]; - int? serializerId = null; - Type type = null; + int? serializerId; + Type type; if (reader.IsDBNull(5)) { type = Type.GetType(manifest, true); + serializerId = null; } else { @@ -153,7 +143,25 @@ protected override SelectedSnapshot ReadSnapshot(DbDataReader reader) serializerId = reader.GetInt32(5); } - var snapshot = _deserialize(type, payloadObject, manifest, serializerId); + object snapshot; + if (serializerId is { }) + { + if (_payloadDbType == NpgsqlDbType.Bytea) + { + snapshot = Serialization.Deserialize((byte[])payloadObject, serializerId.Value, manifest); + } + else + { + var bytes = Encoding.UTF8.GetBytes((string)payloadObject); + snapshot = _jsonSerializer.FromBinary(bytes, type); + } + } + else + { + // Support old writes that did not set the serializer id + var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer); + snapshot = deserializer.FromBinary((byte[])payloadObject, type); + } var metadata = new SnapshotMetadata(persistenceId, sequenceNr, timestamp); return new SelectedSnapshot(metadata, snapshot); @@ -166,7 +174,6 @@ protected override SelectedSnapshot ReadSnapshot(DbDataReader reader) public class PostgreSqlQueryConfiguration : QueryConfiguration { public readonly StoredAsType StoredAs; - public readonly JsonSerializerSettings JsonSerializerSettings; public PostgreSqlQueryConfiguration( string schemaName, @@ -180,16 +187,11 @@ public PostgreSqlQueryConfiguration( TimeSpan timeout, StoredAsType storedAs, string defaultSerializer, - JsonSerializerSettings jsonSerializerSettings = null, bool useSequentialAccess = true) : base(schemaName, snapshotTableName, persistenceIdColumnName, sequenceNrColumnName, payloadColumnName, manifestColumnName, timestampColumnName, serializerIdColumnName, timeout, defaultSerializer, useSequentialAccess) { StoredAs = storedAs; - JsonSerializerSettings = jsonSerializerSettings ?? new JsonSerializerSettings - { - ContractResolver = new AkkaContractResolver() - }; } } } \ No newline at end of file From ac776b7ab4371a389f15a410f18e4de7476f62f8 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 9 Dec 2022 02:20:56 +0700 Subject: [PATCH 2/2] Fix unit tests by overriding the expected result --- ...PostgreSqlJournalJsonBSerializationSpec.cs | 47 +++++++ .../PostgreSqlJournalJsonSerializationSpec.cs | 49 +++++++ ...eSqlSnapshotStoreJsonBSerializationSpec.cs | 35 +++++ ...reSqlSnapshotStoreJsonSerializationSpec.cs | 35 +++++ .../Serialization/Test.cs | 82 ++++++++++++ .../Serialization/TestJournal.cs | 122 ++++++++++++++++++ 6 files changed, 370 insertions(+) create mode 100644 src/Akka.Persistence.PostgreSql.Tests/Serialization/Test.cs create mode 100644 src/Akka.Persistence.PostgreSql.Tests/Serialization/TestJournal.cs diff --git a/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalJsonBSerializationSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalJsonBSerializationSpec.cs index caad88d..e58d7fc 100644 --- a/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalJsonBSerializationSpec.cs +++ b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalJsonBSerializationSpec.cs @@ -5,8 +5,11 @@ // //----------------------------------------------------------------------- +using System.Collections.Generic; +using Akka.Actor; using Akka.Configuration; using Akka.Persistence.TCK.Serialization; +using Akka.Util.Internal; using Xunit; using Xunit.Abstractions; @@ -53,5 +56,49 @@ private static Config CreateSpecConfig(PostgresFixture fixture) public override void Journal_should_serialize_Persistent_with_EventAdapter_manifest() { } + + [Fact] + public override void Journal_should_serialize_Persistent_with_string_manifest() + { + var probe = CreateTestProbe(); + var persistentEvent = new Persistent(new TestJournal.MyPayload2("b", 5), 1L, Pid, null, false, null, WriterGuid); + + var messages = new List + { + new AtomicWrite(persistentEvent) + }; + + Journal.Tell(new WriteMessages(messages, probe.Ref, ActorInstanceId)); + probe.ExpectMsg(); + probe.ExpectMsg(m => m.ActorInstanceId == ActorInstanceId && m.Persistent.PersistenceId == Pid); + + Journal.Tell(new ReplayMessages(0, long.MaxValue, long.MaxValue, Pid, probe.Ref)); + probe.ExpectMsg(s => s.Persistent.PersistenceId == persistentEvent.PersistenceId + && s.Persistent.SequenceNr == persistentEvent.SequenceNr + && s.Persistent.Payload.AsInstanceOf().Data.Equals("b")); + probe.ExpectMsg(); + } + + [Fact] + public override void Journal_should_serialize_Persistent() + { + var probe = CreateTestProbe(); + var persistentEvent = new Persistent(new TestJournal.MyPayload("a"), 1L, Pid, null, false, null, WriterGuid); + + var messages = new List + { + new AtomicWrite(persistentEvent) + }; + + Journal.Tell(new WriteMessages(messages, probe.Ref, ActorInstanceId)); + probe.ExpectMsg(); + probe.ExpectMsg(m => m.ActorInstanceId == ActorInstanceId && m.Persistent.PersistenceId == Pid); + + Journal.Tell(new ReplayMessages(0, long.MaxValue, long.MaxValue, Pid, probe.Ref)); + probe.ExpectMsg(s => s.Persistent.PersistenceId == Pid + && s.Persistent.SequenceNr == persistentEvent.SequenceNr + && s.Persistent.Payload.AsInstanceOf().Data.Equals("a")); + probe.ExpectMsg(); + } } } diff --git a/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalJsonSerializationSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalJsonSerializationSpec.cs index 7137769..ee01ae9 100644 --- a/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalJsonSerializationSpec.cs +++ b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlJournalJsonSerializationSpec.cs @@ -5,8 +5,11 @@ // //----------------------------------------------------------------------- +using System.Collections.Generic; +using Akka.Actor; using Akka.Configuration; using Akka.Persistence.TCK.Serialization; +using Akka.Util.Internal; using Xunit; using Xunit.Abstractions; @@ -53,5 +56,51 @@ private static Config CreateSpecConfig(PostgresFixture fixture) public override void Journal_should_serialize_Persistent_with_EventAdapter_manifest() { } + + + [Fact] + public override void Journal_should_serialize_Persistent_with_string_manifest() + { + var probe = CreateTestProbe(); + var persistentEvent = new Persistent(new TestJournal.MyPayload2("b", 5), 1L, Pid, null, false, null, WriterGuid); + + var messages = new List + { + new AtomicWrite(persistentEvent) + }; + + Journal.Tell(new WriteMessages(messages, probe.Ref, ActorInstanceId)); + probe.ExpectMsg(); + probe.ExpectMsg(m => m.ActorInstanceId == ActorInstanceId && m.Persistent.PersistenceId == Pid); + + Journal.Tell(new ReplayMessages(0, long.MaxValue, long.MaxValue, Pid, probe.Ref)); + probe.ExpectMsg(s => s.Persistent.PersistenceId == persistentEvent.PersistenceId + && s.Persistent.SequenceNr == persistentEvent.SequenceNr + && s.Persistent.Payload.AsInstanceOf().Data.Equals("b")); + probe.ExpectMsg(); + } + + [Fact] + public override void Journal_should_serialize_Persistent() + { + var probe = CreateTestProbe(); + var persistentEvent = new Persistent(new TestJournal.MyPayload("a"), 1L, Pid, null, false, null, WriterGuid); + + var messages = new List + { + new AtomicWrite(persistentEvent) + }; + + Journal.Tell(new WriteMessages(messages, probe.Ref, ActorInstanceId)); + probe.ExpectMsg(); + probe.ExpectMsg(m => m.ActorInstanceId == ActorInstanceId && m.Persistent.PersistenceId == Pid); + + Journal.Tell(new ReplayMessages(0, long.MaxValue, long.MaxValue, Pid, probe.Ref)); + probe.ExpectMsg(s => s.Persistent.PersistenceId == Pid + && s.Persistent.SequenceNr == persistentEvent.SequenceNr + && s.Persistent.Payload.AsInstanceOf().Data.Equals("a")); + probe.ExpectMsg(); + } + } } diff --git a/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreJsonBSerializationSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreJsonBSerializationSpec.cs index 99d75ef..4e50c49 100644 --- a/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreJsonBSerializationSpec.cs +++ b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreJsonBSerializationSpec.cs @@ -8,6 +8,7 @@ using Akka.Actor; using Akka.Configuration; using Akka.Persistence.TCK.Serialization; +using Akka.Util.Internal; using FluentAssertions; using Newtonsoft.Json; using Xunit; @@ -51,5 +52,39 @@ private static Config CreateSpecConfig(PostgresFixture fixture) akka.test.single-expect-default = 10s") .WithFallback(PostgreSqlPersistence.DefaultConfiguration()); } + + [Fact] + public override void SnapshotStore_should_serialize_Payload() + { + var probe = CreateTestProbe(); + + var snapshot = new Test.MySnapshot("a"); + + var metadata = new SnapshotMetadata(Pid, 1); + SnapshotStore.Tell(new SaveSnapshot(metadata, snapshot), probe.Ref); + probe.ExpectMsg(); + + SnapshotStore.Tell(new LoadSnapshot(Pid, SnapshotSelectionCriteria.Latest, long.MaxValue), probe.Ref); + probe.ExpectMsg(s => + s.Snapshot.Snapshot is Test.MySnapshot + && s.Snapshot.Snapshot.AsInstanceOf().Data.Equals("a")); + } + + [Fact] + public override void SnapshotStore_should_serialize_Payload_with_string_manifest() + { + var probe = CreateTestProbe(); + + var snapshot = new Test.MySnapshot2("a"); + + var metadata = new SnapshotMetadata(Pid, 1); + SnapshotStore.Tell(new SaveSnapshot(metadata, snapshot), probe.Ref); + probe.ExpectMsg(); + + SnapshotStore.Tell(new LoadSnapshot(Pid, SnapshotSelectionCriteria.Latest, long.MaxValue), probe.Ref); + probe.ExpectMsg(s => + s.Snapshot.Snapshot is Test.MySnapshot2 + && s.Snapshot.Snapshot.AsInstanceOf().Data.Equals("a")); + } } } diff --git a/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreJsonSerializationSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreJsonSerializationSpec.cs index 9354c3d..3cd914e 100644 --- a/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreJsonSerializationSpec.cs +++ b/src/Akka.Persistence.PostgreSql.Tests/Serialization/PostgreSqlSnapshotStoreJsonSerializationSpec.cs @@ -7,6 +7,7 @@ using Akka.Configuration; using Akka.Persistence.TCK.Serialization; +using Akka.Util.Internal; using Xunit; using Xunit.Abstractions; @@ -48,5 +49,39 @@ private static Config CreateSpecConfig(PostgresFixture fixture) akka.test.single-expect-default = 10s") .WithFallback(PostgreSqlPersistence.DefaultConfiguration()); } + + [Fact] + public override void SnapshotStore_should_serialize_Payload() + { + var probe = CreateTestProbe(); + + var snapshot = new Test.MySnapshot("a"); + + var metadata = new SnapshotMetadata(Pid, 1); + SnapshotStore.Tell(new SaveSnapshot(metadata, snapshot), probe.Ref); + probe.ExpectMsg(); + + SnapshotStore.Tell(new LoadSnapshot(Pid, SnapshotSelectionCriteria.Latest, long.MaxValue), probe.Ref); + probe.ExpectMsg(s => + s.Snapshot.Snapshot is Test.MySnapshot + && s.Snapshot.Snapshot.AsInstanceOf().Data.Equals("a")); + } + + [Fact] + public override void SnapshotStore_should_serialize_Payload_with_string_manifest() + { + var probe = CreateTestProbe(); + + var snapshot = new Test.MySnapshot2("a"); + + var metadata = new SnapshotMetadata(Pid, 1); + SnapshotStore.Tell(new SaveSnapshot(metadata, snapshot), probe.Ref); + probe.ExpectMsg(); + + SnapshotStore.Tell(new LoadSnapshot(Pid, SnapshotSelectionCriteria.Latest, long.MaxValue), probe.Ref); + probe.ExpectMsg(s => + s.Snapshot.Snapshot is Test.MySnapshot2 + && s.Snapshot.Snapshot.AsInstanceOf().Data.Equals("a")); + } } } diff --git a/src/Akka.Persistence.PostgreSql.Tests/Serialization/Test.cs b/src/Akka.Persistence.PostgreSql.Tests/Serialization/Test.cs new file mode 100644 index 0000000..40897ad --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Tests/Serialization/Test.cs @@ -0,0 +1,82 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Text; +using Akka.Actor; +using Akka.Serialization; + +namespace Akka.Persistence.PostgreSql.Tests.Serialization +{ + internal static class Test + { + public class MySnapshot + { + public MySnapshot(string data) + { + Data = data; + } + + public string Data { get; } + } + + public class MySnapshot2 + { + public MySnapshot2(string data) + { + Data = data; + } + + public string Data { get; } + } + + public class MySnapshotSerializer : Serializer + { + public MySnapshotSerializer(ExtendedActorSystem system) : base(system) { } + public override int Identifier => 77124; + public override bool IncludeManifest => true; + + public override byte[] ToBinary(object obj) + { + if (obj is MySnapshot snapshot) return Encoding.UTF8.GetBytes($".{snapshot.Data}"); + throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}] in [{nameof(MySnapshotSerializer2)}]"); + } + + public override object FromBinary(byte[] bytes, Type type) + { + if (type == typeof(MySnapshot)) return new MySnapshot($"{Encoding.UTF8.GetString(bytes)}."); + throw new ArgumentException($"Unimplemented deserialization of message with manifest [{type}] in serializer {nameof(MySnapshotSerializer)}"); + } + } + + public class MySnapshotSerializer2 : SerializerWithStringManifest + { + private const string ContactsManifest = "A"; + + public MySnapshotSerializer2(ExtendedActorSystem system) : base(system) { } + public override int Identifier => 77126; + + public override byte[] ToBinary(object obj) + { + if (obj is MySnapshot2 snapshot) return Encoding.UTF8.GetBytes($".{snapshot.Data}"); + throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}] in [{nameof(MySnapshotSerializer2)}]"); + } + + public override string Manifest(object obj) + { + if (obj is MySnapshot2) return ContactsManifest; + throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}] in [{nameof(MySnapshotSerializer2)}]"); + } + + public override object FromBinary(byte[] bytes, string manifest) + { + if (manifest == ContactsManifest) return new MySnapshot2(Encoding.UTF8.GetString(bytes) + "."); + throw new ArgumentException($"Unimplemented deserialization of message with manifest [{manifest}] in serializer {nameof(MySnapshotSerializer2)}"); + } + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.PostgreSql.Tests/Serialization/TestJournal.cs b/src/Akka.Persistence.PostgreSql.Tests/Serialization/TestJournal.cs new file mode 100644 index 0000000..9c54416 --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Tests/Serialization/TestJournal.cs @@ -0,0 +1,122 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Text; +using Akka.Actor; +using Akka.Persistence.Journal; +using Akka.Serialization; +using Akka.Util; + +namespace Akka.Persistence.PostgreSql.Tests.Serialization +{ + internal static class TestJournal + { + public class MyPayload + { + public MyPayload(string data) => Data = data; + + public string Data { get; } + } + + public class MyPayload2 + { + public MyPayload2(string data, int n) + { + Data = data; + N = n; + } + + public string Data { get; } + public int N { get; } + } + + public class MyPayload3 + { + public MyPayload3(string data) => Data = data; + + public string Data { get; } + } + + public class MyPayloadSerializer : Serializer + { + public MyPayloadSerializer(ExtendedActorSystem system) : base(system) { } + + public override int Identifier => 77123; + public override bool IncludeManifest => true; + + public override byte[] ToBinary(object obj) + { + if (obj is MyPayload myPayload) return Encoding.UTF8.GetBytes("." + myPayload.Data); + if (obj is MyPayload3 myPayload3) return Encoding.UTF8.GetBytes("." + myPayload3.Data); + throw new ArgumentException($"Can't serialize object of type [{obj.GetType()}] in [{nameof(MyPayloadSerializer)}]"); + } + + public override object FromBinary(byte[] bytes, Type type) + { + if (type == typeof(MyPayload)) return new MyPayload($"{Encoding.UTF8.GetString(bytes)}."); + if (type == typeof(MyPayload3)) return new MyPayload3($"{Encoding.UTF8.GetString(bytes)}."); + throw new ArgumentException($"Unimplemented deserialization of message with manifest [{type}] in serializer {nameof(MyPayloadSerializer)}"); + } + } + + public class MyPayload2Serializer : SerializerWithStringManifest + { + private readonly string _manifestV1 = typeof(MyPayload).TypeQualifiedName(); + private readonly string _manifestV2 = "MyPayload-V2"; + + public MyPayload2Serializer(ExtendedActorSystem system) : base(system) + { + } + + public override int Identifier => 77125; + + public override byte[] ToBinary(object obj) + { + if (obj is MyPayload2) + return Encoding.UTF8.GetBytes(string.Format(".{0}:{1}", ((MyPayload2)obj).Data, ((MyPayload2)obj).N)); + return null; + } + + public override string Manifest(object o) + { + return _manifestV2; + } + + public override object FromBinary(byte[] bytes, string manifest) + { + if (manifest.Equals(_manifestV2)) + { + var parts = Encoding.UTF8.GetString(bytes).Split(':'); + return new MyPayload2(parts[0] + ".", int.Parse(parts[1])); + } + if (manifest.Equals(_manifestV1)) + return new MyPayload2(Encoding.UTF8.GetString(bytes) + ".", 0); + throw new ArgumentException("unexpected manifest " + manifest); + } + } + + public class MyWriteAdapter : IWriteEventAdapter + { + public string Manifest(object evt) + { + switch (evt) + { + case MyPayload3 p when p.Data.Equals("item1"): + return "First-Manifest"; + default: + return string.Empty; + } + } + + public object ToJournal(object evt) + { + return evt; + } + } + } +} \ No newline at end of file