diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..fe1152b
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,30 @@
+**/.classpath
+**/.dockerignore
+**/.env
+**/.git
+**/.gitignore
+**/.project
+**/.settings
+**/.toolstarget
+**/.vs
+**/.vscode
+**/*.*proj.user
+**/*.dbmdl
+**/*.jfm
+**/azds.yaml
+**/bin
+**/charts
+**/docker-compose*
+**/Dockerfile*
+**/node_modules
+**/npm-debug.log
+**/obj
+**/secrets.dev.yaml
+**/values.dev.yaml
+LICENSE
+README.md
+!**/.gitignore
+!.git/HEAD
+!.git/config
+!.git/packed-refs
+!.git/refs/heads/**
\ No newline at end of file
diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml
new file mode 100644
index 0000000..dcddade
--- /dev/null
+++ b/.github/workflows/dotnet.yml
@@ -0,0 +1,31 @@
+# This workflow will build a .NET project
+# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-net
+
+name: .NET
+
+on:
+ push:
+ branches: [ "main" ]
+ pull_request:
+ branches: [ "main" ]
+
+jobs:
+ build:
+
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v4
+ - name: Setup .NET
+ uses: actions/setup-dotnet@v4
+ with:
+ dotnet-version: 8.0.x
+ - name: Restore dependencies
+ run: dotnet restore
+ working-directory: ./src
+ - name: Build
+ run: dotnet build --no-restore
+ working-directory: ./src
+ - name: Test
+ run: dotnet test --no-build --verbosity normal
+ working-directory: ./src
diff --git a/.gitignore b/.gitignore
index a4fe18b..44ce8a3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,402 @@
-## Ignore Visual Studio temporary files, build results, and
-## files generated by popular Visual Studio add-ons.
+dist/
+.vscode/
+##
+## Get latest from https://github.com/github/gitignore/blob/main/VisualStudio.gitignore
+
+# User-specific files
+*.rsuser
+*.suo
+*.user
+*.userosscache
+*.sln.docstates
+
+# User-specific files (MonoDevelop/Xamarin Studio)
+*.userprefs
+
+# Mono auto generated files
+mono_crash.*
+
+# Build results
+[Dd]ebug/
+[Dd]ebugPublic/
+[Rr]elease/
+[Rr]eleases/
+x64/
+x86/
+[Ww][Ii][Nn]32/
+[Aa][Rr][Mm]/
+[Aa][Rr][Mm]64/
+bld/
+[Bb]in/
+[Oo]bj/
+[Ll]og/
+[Ll]ogs/
+
+# Visual Studio 2015/2017 cache/options directory
+.vs/
+# Uncomment if you have tasks that create the project's static files in wwwroot
+#wwwroot/
+
+# Visual Studio 2017 auto generated files
+Generated\ Files/
+
+# MSTest test Results
+[Tt]est[Rr]esult*/
+[Bb]uild[Ll]og.*
+
+# NUnit
+*.VisualState.xml
+TestResult.xml
+nunit-*.xml
+
+# Build Results of an ATL Project
+[Dd]ebugPS/
+[Rr]eleasePS/
+dlldata.c
+
+# Benchmark Results
+BenchmarkDotNet.Artifacts/
+
+# .NET Core
+project.lock.json
+project.fragment.lock.json
+artifacts/
+
+# ASP.NET Scaffolding
+ScaffoldingReadMe.txt
+
+# StyleCop
+StyleCopReport.xml
+
+# Files built by Visual Studio
+*_i.c
+*_p.c
+*_h.h
+*.ilk
+*.meta
+*.obj
+*.iobj
+*.pch
+*.pdb
+*.ipdb
+*.pgc
+*.pgd
+*.rsp
+# but not Directory.Build.rsp, as it configures directory-level build defaults
+!Directory.Build.rsp
+*.sbr
+*.tlb
+*.tli
+*.tlh
+*.tmp
+*.tmp_proj
+*_wpftmp.csproj
+*.log
+*.tlog
+*.vspscc
+*.vssscc
+.builds
+*.pidb
+*.svclog
+*.scc
+
+# Chutzpah Test files
+_Chutzpah*
+
+# Visual C++ cache files
+ipch/
+*.aps
+*.ncb
+*.opendb
+*.opensdf
+*.sdf
+*.cachefile
+*.VC.db
+*.VC.VC.opendb
+
+# Visual Studio profiler
+*.psess
+*.vsp
+*.vspx
+*.sap
+
+# Visual Studio Trace Files
+*.e2e
+
+# TFS 2012 Local Workspace
+$tf/
+
+# Guidance Automation Toolkit
+*.gpState
+
+# ReSharper is a .NET coding add-in
+_ReSharper*/
+*.[Rr]e[Ss]harper
+*.DotSettings.user
+
+# TeamCity is a build add-in
+_TeamCity*
+
+# DotCover is a Code Coverage Tool
+*.dotCover
+
+# AxoCover is a Code Coverage Tool
+.axoCover/*
+!.axoCover/settings.json
+
+# Coverlet is a free, cross platform Code Coverage Tool
+coverage*.json
+coverage*.xml
+coverage*.info
+
+# Visual Studio code coverage results
+*.coverage
+*.coveragexml
+
+# NCrunch
+_NCrunch_*
+.*crunch*.local.xml
+nCrunchTemp_*
+
+# MightyMoose
+*.mm.*
+AutoTest.Net/
+
+# Web workbench (sass)
+.sass-cache/
+
+# Installshield output folder
+[Ee]xpress/
+
+# DocProject is a documentation generator add-in
+DocProject/buildhelp/
+DocProject/Help/*.HxT
+DocProject/Help/*.HxC
+DocProject/Help/*.hhc
+DocProject/Help/*.hhk
+DocProject/Help/*.hhp
+DocProject/Help/Html2
+DocProject/Help/html
+
+# Click-Once directory
+publish/
+
+# Publish Web Output
+*.[Pp]ublish.xml
+*.azurePubxml
+# Note: Comment the next line if you want to checkin your web deploy settings,
+# but database connection strings (with potential passwords) will be unencrypted
+*.pubxml
+*.publishproj
+
+# Microsoft Azure Web App publish settings. Comment the next line if you want to
+# checkin your Azure Web App publish settings, but sensitive information contained
+# in these scripts will be unencrypted
+PublishScripts/
+
+# NuGet Packages
+*.nupkg
+# NuGet Symbol Packages
+*.snupkg
+# The packages folder can be ignored because of Package Restore
+**/[Pp]ackages/*
+# except build/, which is used as an MSBuild target.
+!**/[Pp]ackages/build/
+# Uncomment if necessary however generally it will be regenerated when needed
+#!**/[Pp]ackages/repositories.config
+# NuGet v3's project.json files produces more ignorable files
+*.nuget.props
+*.nuget.targets
+
+# Microsoft Azure Build Output
+csx/
+*.build.csdef
+
+# Microsoft Azure Emulator
+ecf/
+rcf/
+
+# Windows Store app package directories and files
+AppPackages/
+BundleArtifacts/
+Package.StoreAssociation.xml
+_pkginfo.txt
+*.appx
+*.appxbundle
+*.appxupload
+
+# Visual Studio cache files
+# files ending in .cache can be ignored
+*.[Cc]ache
+# but keep track of directories ending in .cache
+!?*.[Cc]ache/
+
+# Others
+ClientBin/
+~$*
+*~
+*.dbmdl
+*.dbproj.schemaview
+*.jfm
+*.pfx
+*.publishsettings
+orleans.codegen.cs
+
+# Including strong name files can present a security risk
+# (https://github.com/github/gitignore/pull/2483#issue-259490424)
+#*.snk
+
+# Since there are multiple workflows, uncomment next line to ignore bower_components
+# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622)
+#bower_components/
+
+# RIA/Silverlight projects
+Generated_Code/
+
+# Backup & report files from converting an old project file
+# to a newer Visual Studio version. Backup files are not needed,
+# because we have git ;-)
+_UpgradeReport_Files/
+Backup*/
+UpgradeLog*.XML
+UpgradeLog*.htm
+ServiceFabricBackup/
+*.rptproj.bak
+
+# SQL Server files
+*.mdf
+*.ldf
+*.ndf
+
+# Business Intelligence projects
+*.rdl.data
+*.bim.layout
+*.bim_*.settings
+*.rptproj.rsuser
+*- [Bb]ackup.rdl
+*- [Bb]ackup ([0-9]).rdl
+*- [Bb]ackup ([0-9][0-9]).rdl
+
+# Microsoft Fakes
+FakesAssemblies/
+
+# GhostDoc plugin setting file
+*.GhostDoc.xml
+
+# Node.js Tools for Visual Studio
+.ntvs_analysis.dat
+node_modules/
+
+# Visual Studio 6 build log
+*.plg
+
+# Visual Studio 6 workspace options file
+*.opt
+
+# Visual Studio 6 auto-generated workspace file (contains which files were open etc.)
+*.vbw
+
+# Visual Studio 6 auto-generated project file (contains which files were open etc.)
+*.vbp
+
+# Visual Studio 6 workspace and project file (working project files containing files to include in project)
+*.dsw
+*.dsp
+
+# Visual Studio 6 technical files
+*.ncb
+*.aps
+
+# Visual Studio LightSwitch build output
+**/*.HTMLClient/GeneratedArtifacts
+**/*.DesktopClient/GeneratedArtifacts
+**/*.DesktopClient/ModelManifest.xml
+**/*.Server/GeneratedArtifacts
+**/*.Server/ModelManifest.xml
+_Pvt_Extensions
+
+# Paket dependency manager
+.paket/paket.exe
+paket-files/
+
+# FAKE - F# Make
+.fake/
+
+# CodeRush personal settings
+.cr/personal
+
+# Python Tools for Visual Studio (PTVS)
+__pycache__/
+*.pyc
+
+# Cake - Uncomment if you are using it
+# tools/**
+# !tools/packages.config
+
+# Tabs Studio
+*.tss
+
+# Telerik's JustMock configuration file
+*.jmconfig
+
+# BizTalk build output
+*.btp.cs
+*.btm.cs
+*.odx.cs
+*.xsd.cs
+
+# OpenCover UI analysis results
+OpenCover/
+
+# Azure Stream Analytics local run output
+ASALocalRun/
+
+# MSBuild Binary and Structured Log
+*.binlog
+
+# NVidia Nsight GPU debugger configuration file
+*.nvuser
+
+# MFractors (Xamarin productivity tool) working folder
+.mfractor/
+
+# Local History for Visual Studio
+.localhistory/
+
+# Visual Studio History (VSHistory) files
+.vshistory/
+
+# BeatPulse healthcheck temp database
+healthchecksdb
+
+# Backup folder for Package Reference Convert tool in Visual Studio 2017
+MigrationBackup/
+
+# Ionide (cross platform F# VS Code tools) working folder
+.ionide/
+
+# Fody - auto-generated XML schema
+FodyWeavers.xsd
+
+# VS Code files for those working on multiple tools
+.vscode/*
+!.vscode/settings.json
+!.vscode/tasks.json
+!.vscode/launch.json
+!.vscode/extensions.json
+*.code-workspace
+
+# Local History for Visual Studio Code
+.history/
+
+# Windows Installer files from build outputs
+*.cab
+*.msi
+*.msix
+*.msm
+*.msp
+
+# JetBrains Rider
+*.sln.iml
##
## Get latest from https://github.com/github/gitignore/blob/main/VisualStudio.gitignore
diff --git a/src/.gitignore b/src/.gitignore
new file mode 100644
index 0000000..a4fe18b
--- /dev/null
+++ b/src/.gitignore
@@ -0,0 +1,400 @@
+## Ignore Visual Studio temporary files, build results, and
+## files generated by popular Visual Studio add-ons.
+##
+## Get latest from https://github.com/github/gitignore/blob/main/VisualStudio.gitignore
+
+# User-specific files
+*.rsuser
+*.suo
+*.user
+*.userosscache
+*.sln.docstates
+
+# User-specific files (MonoDevelop/Xamarin Studio)
+*.userprefs
+
+# Mono auto generated files
+mono_crash.*
+
+# Build results
+[Dd]ebug/
+[Dd]ebugPublic/
+[Rr]elease/
+[Rr]eleases/
+x64/
+x86/
+[Ww][Ii][Nn]32/
+[Aa][Rr][Mm]/
+[Aa][Rr][Mm]64/
+bld/
+[Bb]in/
+[Oo]bj/
+[Ll]og/
+[Ll]ogs/
+
+# Visual Studio 2015/2017 cache/options directory
+.vs/
+# Uncomment if you have tasks that create the project's static files in wwwroot
+#wwwroot/
+
+# Visual Studio 2017 auto generated files
+Generated\ Files/
+
+# MSTest test Results
+[Tt]est[Rr]esult*/
+[Bb]uild[Ll]og.*
+
+# NUnit
+*.VisualState.xml
+TestResult.xml
+nunit-*.xml
+
+# Build Results of an ATL Project
+[Dd]ebugPS/
+[Rr]eleasePS/
+dlldata.c
+
+# Benchmark Results
+BenchmarkDotNet.Artifacts/
+
+# .NET Core
+project.lock.json
+project.fragment.lock.json
+artifacts/
+
+# ASP.NET Scaffolding
+ScaffoldingReadMe.txt
+
+# StyleCop
+StyleCopReport.xml
+
+# Files built by Visual Studio
+*_i.c
+*_p.c
+*_h.h
+*.ilk
+*.meta
+*.obj
+*.iobj
+*.pch
+*.pdb
+*.ipdb
+*.pgc
+*.pgd
+*.rsp
+# but not Directory.Build.rsp, as it configures directory-level build defaults
+!Directory.Build.rsp
+*.sbr
+*.tlb
+*.tli
+*.tlh
+*.tmp
+*.tmp_proj
+*_wpftmp.csproj
+*.log
+*.tlog
+*.vspscc
+*.vssscc
+.builds
+*.pidb
+*.svclog
+*.scc
+
+# Chutzpah Test files
+_Chutzpah*
+
+# Visual C++ cache files
+ipch/
+*.aps
+*.ncb
+*.opendb
+*.opensdf
+*.sdf
+*.cachefile
+*.VC.db
+*.VC.VC.opendb
+
+# Visual Studio profiler
+*.psess
+*.vsp
+*.vspx
+*.sap
+
+# Visual Studio Trace Files
+*.e2e
+
+# TFS 2012 Local Workspace
+$tf/
+
+# Guidance Automation Toolkit
+*.gpState
+
+# ReSharper is a .NET coding add-in
+_ReSharper*/
+*.[Rr]e[Ss]harper
+*.DotSettings.user
+
+# TeamCity is a build add-in
+_TeamCity*
+
+# DotCover is a Code Coverage Tool
+*.dotCover
+
+# AxoCover is a Code Coverage Tool
+.axoCover/*
+!.axoCover/settings.json
+
+# Coverlet is a free, cross platform Code Coverage Tool
+coverage*.json
+coverage*.xml
+coverage*.info
+
+# Visual Studio code coverage results
+*.coverage
+*.coveragexml
+
+# NCrunch
+_NCrunch_*
+.*crunch*.local.xml
+nCrunchTemp_*
+
+# MightyMoose
+*.mm.*
+AutoTest.Net/
+
+# Web workbench (sass)
+.sass-cache/
+
+# Installshield output folder
+[Ee]xpress/
+
+# DocProject is a documentation generator add-in
+DocProject/buildhelp/
+DocProject/Help/*.HxT
+DocProject/Help/*.HxC
+DocProject/Help/*.hhc
+DocProject/Help/*.hhk
+DocProject/Help/*.hhp
+DocProject/Help/Html2
+DocProject/Help/html
+
+# Click-Once directory
+publish/
+
+# Publish Web Output
+*.[Pp]ublish.xml
+*.azurePubxml
+# Note: Comment the next line if you want to checkin your web deploy settings,
+# but database connection strings (with potential passwords) will be unencrypted
+*.pubxml
+*.publishproj
+
+# Microsoft Azure Web App publish settings. Comment the next line if you want to
+# checkin your Azure Web App publish settings, but sensitive information contained
+# in these scripts will be unencrypted
+PublishScripts/
+
+# NuGet Packages
+*.nupkg
+# NuGet Symbol Packages
+*.snupkg
+# The packages folder can be ignored because of Package Restore
+**/[Pp]ackages/*
+# except build/, which is used as an MSBuild target.
+!**/[Pp]ackages/build/
+# Uncomment if necessary however generally it will be regenerated when needed
+#!**/[Pp]ackages/repositories.config
+# NuGet v3's project.json files produces more ignorable files
+*.nuget.props
+*.nuget.targets
+
+# Microsoft Azure Build Output
+csx/
+*.build.csdef
+
+# Microsoft Azure Emulator
+ecf/
+rcf/
+
+# Windows Store app package directories and files
+AppPackages/
+BundleArtifacts/
+Package.StoreAssociation.xml
+_pkginfo.txt
+*.appx
+*.appxbundle
+*.appxupload
+
+# Visual Studio cache files
+# files ending in .cache can be ignored
+*.[Cc]ache
+# but keep track of directories ending in .cache
+!?*.[Cc]ache/
+
+# Others
+ClientBin/
+~$*
+*~
+*.dbmdl
+*.dbproj.schemaview
+*.jfm
+*.pfx
+*.publishsettings
+orleans.codegen.cs
+
+# Including strong name files can present a security risk
+# (https://github.com/github/gitignore/pull/2483#issue-259490424)
+#*.snk
+
+# Since there are multiple workflows, uncomment next line to ignore bower_components
+# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622)
+#bower_components/
+
+# RIA/Silverlight projects
+Generated_Code/
+
+# Backup & report files from converting an old project file
+# to a newer Visual Studio version. Backup files are not needed,
+# because we have git ;-)
+_UpgradeReport_Files/
+Backup*/
+UpgradeLog*.XML
+UpgradeLog*.htm
+ServiceFabricBackup/
+*.rptproj.bak
+
+# SQL Server files
+*.mdf
+*.ldf
+*.ndf
+
+# Business Intelligence projects
+*.rdl.data
+*.bim.layout
+*.bim_*.settings
+*.rptproj.rsuser
+*- [Bb]ackup.rdl
+*- [Bb]ackup ([0-9]).rdl
+*- [Bb]ackup ([0-9][0-9]).rdl
+
+# Microsoft Fakes
+FakesAssemblies/
+
+# GhostDoc plugin setting file
+*.GhostDoc.xml
+
+# Node.js Tools for Visual Studio
+.ntvs_analysis.dat
+node_modules/
+
+# Visual Studio 6 build log
+*.plg
+
+# Visual Studio 6 workspace options file
+*.opt
+
+# Visual Studio 6 auto-generated workspace file (contains which files were open etc.)
+*.vbw
+
+# Visual Studio 6 auto-generated project file (contains which files were open etc.)
+*.vbp
+
+# Visual Studio 6 workspace and project file (working project files containing files to include in project)
+*.dsw
+*.dsp
+
+# Visual Studio 6 technical files
+*.ncb
+*.aps
+
+# Visual Studio LightSwitch build output
+**/*.HTMLClient/GeneratedArtifacts
+**/*.DesktopClient/GeneratedArtifacts
+**/*.DesktopClient/ModelManifest.xml
+**/*.Server/GeneratedArtifacts
+**/*.Server/ModelManifest.xml
+_Pvt_Extensions
+
+# Paket dependency manager
+.paket/paket.exe
+paket-files/
+
+# FAKE - F# Make
+.fake/
+
+# CodeRush personal settings
+.cr/personal
+
+# Python Tools for Visual Studio (PTVS)
+__pycache__/
+*.pyc
+
+# Cake - Uncomment if you are using it
+# tools/**
+# !tools/packages.config
+
+# Tabs Studio
+*.tss
+
+# Telerik's JustMock configuration file
+*.jmconfig
+
+# BizTalk build output
+*.btp.cs
+*.btm.cs
+*.odx.cs
+*.xsd.cs
+
+# OpenCover UI analysis results
+OpenCover/
+
+# Azure Stream Analytics local run output
+ASALocalRun/
+
+# MSBuild Binary and Structured Log
+*.binlog
+
+# NVidia Nsight GPU debugger configuration file
+*.nvuser
+
+# MFractors (Xamarin productivity tool) working folder
+.mfractor/
+
+# Local History for Visual Studio
+.localhistory/
+
+# Visual Studio History (VSHistory) files
+.vshistory/
+
+# BeatPulse healthcheck temp database
+healthchecksdb
+
+# Backup folder for Package Reference Convert tool in Visual Studio 2017
+MigrationBackup/
+
+# Ionide (cross platform F# VS Code tools) working folder
+.ionide/
+
+# Fody - auto-generated XML schema
+FodyWeavers.xsd
+
+# VS Code files for those working on multiple tools
+.vscode/*
+!.vscode/settings.json
+!.vscode/tasks.json
+!.vscode/launch.json
+!.vscode/extensions.json
+*.code-workspace
+
+# Local History for Visual Studio Code
+.history/
+
+# Windows Installer files from build outputs
+*.cab
+*.msi
+*.msix
+*.msm
+*.msp
+
+# JetBrains Rider
+*.sln.iml
diff --git a/src/Common.Properties.xml b/src/Common.Properties.xml
new file mode 100644
index 0000000..e382396
--- /dev/null
+++ b/src/Common.Properties.xml
@@ -0,0 +1,22 @@
+
+
+
+ net8.0
+ enable
+ Sa
+ abt
+ dundich
+ Copyright © 2024
+ See https://github.com/dundich/Sa/releases
+ https://github.com/dundich/Sa
+ https://github.com/dundich/Sa
+ Apache-2.0
+ true
+ true
+ enable
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Sa.Data.Cache/Sa.Data.Cache.csproj b/src/Sa.Data.Cache/Sa.Data.Cache.csproj
new file mode 100644
index 0000000..90fa666
--- /dev/null
+++ b/src/Sa.Data.Cache/Sa.Data.Cache.csproj
@@ -0,0 +1,13 @@
+
+
+
+ net8.0
+ enable
+ enable
+
+
+
+
+
+
+
diff --git a/src/Sa.Data.Cache/Setup.cs b/src/Sa.Data.Cache/Setup.cs
new file mode 100644
index 0000000..f740dd6
--- /dev/null
+++ b/src/Sa.Data.Cache/Setup.cs
@@ -0,0 +1,32 @@
+using Microsoft.Extensions.Caching.Memory;
+using Microsoft.Extensions.DependencyInjection;
+using ZiggyCreatures.Caching.Fusion;
+
+namespace Sa.Data.Cache;
+
+
+public static class Setup
+{
+ public static IServiceCollection AddFusionCacheEx(this IServiceCollection services, string cacheName, Action? configure = null)
+ {
+ services.AddFusionCacheSystemTextJsonSerializer();
+
+ // https://github.com/ZiggyCreatures/FusionCache
+ services
+ .AddFusionCache(cacheName)
+ .WithPostSetup((sp, c) =>
+ {
+ FusionCacheEntryOptions ops = c.DefaultEntryOptions;
+
+ ops.Duration = TimeSpan.FromMinutes(2);
+ ops.FactorySoftTimeout = TimeSpan.FromMilliseconds(100);
+ ops.FailSafeMaxDuration = TimeSpan.FromHours(2);
+ ops.FailSafeThrottleDuration = TimeSpan.FromSeconds(30);
+ ops.Priority = CacheItemPriority.Low;
+ configure?.Invoke(sp, ops);
+ })
+ .WithoutLogger();
+
+ return services;
+ }
+}
\ No newline at end of file
diff --git a/src/Sa.Data.PostgreSql.Migration/ITodo.cs b/src/Sa.Data.PostgreSql.Migration/ITodo.cs
new file mode 100644
index 0000000..2064e3c
--- /dev/null
+++ b/src/Sa.Data.PostgreSql.Migration/ITodo.cs
@@ -0,0 +1,7 @@
+namespace Sa.Data.PostgreSql.Migration
+{
+ public interface ITodo
+ {
+ // todos
+ }
+}
diff --git a/src/Sa.Data.PostgreSql.Migration/Sa.Data.PostgreSql.Migration.csproj b/src/Sa.Data.PostgreSql.Migration/Sa.Data.PostgreSql.Migration.csproj
new file mode 100644
index 0000000..660d21e
--- /dev/null
+++ b/src/Sa.Data.PostgreSql.Migration/Sa.Data.PostgreSql.Migration.csproj
@@ -0,0 +1,13 @@
+
+
+
+ net8.0
+ enable
+ enable
+
+
+
+
+
+
+
diff --git a/src/Sa.Data.PostgreSql/Configuration/IPgDataSourceSettingsBuilder.cs b/src/Sa.Data.PostgreSql/Configuration/IPgDataSourceSettingsBuilder.cs
new file mode 100644
index 0000000..1c8c3ff
--- /dev/null
+++ b/src/Sa.Data.PostgreSql/Configuration/IPgDataSourceSettingsBuilder.cs
@@ -0,0 +1,10 @@
+
+namespace Sa.Data.PostgreSql;
+
+public interface IPgDataSourceSettingsBuilder
+{
+ void WithConnectionString(string connectionString);
+ void WithConnectionString(Func implementationFactory);
+ void WithSettings(PgDataSourceSettings settings);
+ void WithSettings(Func implementationFactory);
+}
\ No newline at end of file
diff --git a/src/Sa.Data.PostgreSql/Configuration/PgDataSourceSettings.cs b/src/Sa.Data.PostgreSql/Configuration/PgDataSourceSettings.cs
new file mode 100644
index 0000000..29c4800
--- /dev/null
+++ b/src/Sa.Data.PostgreSql/Configuration/PgDataSourceSettings.cs
@@ -0,0 +1,6 @@
+namespace Sa.Data.PostgreSql;
+
+public class PgDataSourceSettings(string connectionString)
+{
+ public string ConnectionString { get; } = connectionString;
+}
diff --git a/src/Sa.Data.PostgreSql/Configuration/PgDataSourceSettingsBuilder.cs b/src/Sa.Data.PostgreSql/Configuration/PgDataSourceSettingsBuilder.cs
new file mode 100644
index 0000000..1e40a96
--- /dev/null
+++ b/src/Sa.Data.PostgreSql/Configuration/PgDataSourceSettingsBuilder.cs
@@ -0,0 +1,28 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+
+namespace Sa.Data.PostgreSql.Configuration;
+
+internal class PgDataSourceSettingsBuilder(IServiceCollection services) : IPgDataSourceSettingsBuilder
+{
+
+ public void WithConnectionString(string connectionString)
+ {
+ services.TryAddSingleton(new PgDataSourceSettings(connectionString));
+ }
+
+ public void WithConnectionString(Func implementationFactory)
+ {
+ services.TryAddSingleton(sp => new PgDataSourceSettings(implementationFactory(sp)));
+ }
+
+ public void WithSettings(Func implementationFactory)
+ {
+ services.TryAddSingleton(implementationFactory);
+ }
+
+ public void WithSettings(PgDataSourceSettings settings)
+ {
+ services.TryAddSingleton(settings);
+ }
+}
diff --git a/src/Sa.Data.PostgreSql/GlobalSuppressions.cs b/src/Sa.Data.PostgreSql/GlobalSuppressions.cs
new file mode 100644
index 0000000..9f39147
--- /dev/null
+++ b/src/Sa.Data.PostgreSql/GlobalSuppressions.cs
@@ -0,0 +1,8 @@
+// This file is used by Code Analysis to maintain SuppressMessage
+// attributes that are applied to this project.
+// Project-level suppressions either have no target or are given
+// a specific target and scoped to a namespace, type, member, etc.
+
+using System.Diagnostics.CodeAnalysis;
+
+[assembly: SuppressMessage("Style", "IDE0130:Namespace does not match folder structure", Justification = "", Scope = "namespace", Target = "~N:Sa.Data.PostgreSql")]
diff --git a/src/Sa.Data.PostgreSql/IPgDataSource.cs b/src/Sa.Data.PostgreSql/IPgDataSource.cs
new file mode 100644
index 0000000..0f504f7
--- /dev/null
+++ b/src/Sa.Data.PostgreSql/IPgDataSource.cs
@@ -0,0 +1,83 @@
+using Npgsql;
+
+namespace Sa.Data.PostgreSql;
+
+public interface IPgDataSource
+{
+ public static IPgDataSource Create(string connectionString) => new PgDataSource(new PgDataSourceSettings(connectionString));
+
+
+ // ExecuteNonQuery
+
+ Task ExecuteNonQuery(string sql, NpgsqlParameter[] parameters, CancellationToken cancellationToken = default);
+
+ async Task ExecuteNonQuery(string sql, CancellationToken cancellationToken = default)
+ => await ExecuteNonQuery(sql, [], cancellationToken);
+
+
+ // ExecuteReader
+
+ Task ExecuteReader(string sql, Action read, NpgsqlParameter[] parameters, CancellationToken cancellationToken = default);
+
+ async Task ExecuteReader(string sql, Action read, CancellationToken cancellationToken = default)
+ => await ExecuteReader(sql, read, [], cancellationToken);
+
+
+ // ExecuteReaderList
+
+
+ async Task> ExecuteReaderList(string sql, Func read, CancellationToken cancellationToken = default)
+ {
+ List list = [];
+ await ExecuteReader(sql, (reader, _) => list.Add(read(reader)), cancellationToken);
+ return list;
+ }
+
+ async Task> ExecuteReaderList(string sql, Func read, NpgsqlParameter[] parameters, CancellationToken cancellationToken = default)
+ {
+ List list = [];
+ await ExecuteReader(sql, (reader, _) => list.Add(read(reader)), parameters, cancellationToken);
+ return list;
+ }
+
+
+ // ExecuteReaderFirst
+
+
+ Task ExecuteReaderFirst(string sql, CancellationToken cancellationToken = default)
+ {
+ return ExecuteReaderFirst(sql, [], cancellationToken);
+ }
+
+ async Task ExecuteReaderFirst(string sql, NpgsqlParameter[] parameters, CancellationToken cancellationToken = default)
+ {
+ T value = default!;
+
+ await ExecuteReader(sql, (reader, _) =>
+ {
+ value = Type.GetTypeCode(typeof(T)) switch
+ {
+ TypeCode.Char => (T)(object)reader.GetChar(0),
+ TypeCode.Int64 => (T)(object)reader.GetInt64(0),
+ TypeCode.Int32 => (T)(object)reader.GetInt32(0),
+ TypeCode.String => (T)(object)reader.GetString(0),
+ TypeCode.Boolean => (T)(object)reader.GetBoolean(0),
+ TypeCode.Double => (T)(object)reader.GetDouble(0),
+ TypeCode.DateTime => (T)(object)reader.GetDateTime(0),
+ TypeCode.Decimal => (T)(object)reader.GetDecimal(0),
+ TypeCode.DBNull => value,
+ _ => throw new InvalidOperationException($"Unsupported type: {typeof(T)}"),
+ };
+ }
+ , parameters
+ , cancellationToken);
+
+ return value;
+ }
+
+
+ // BeginBinaryImport
+
+
+ ValueTask BeginBinaryImport(string sql, Func> write, CancellationToken cancellationToken = default);
+}
diff --git a/src/Sa.Data.PostgreSql/IPgDistributedLock.cs b/src/Sa.Data.PostgreSql/IPgDistributedLock.cs
new file mode 100644
index 0000000..5ebf898
--- /dev/null
+++ b/src/Sa.Data.PostgreSql/IPgDistributedLock.cs
@@ -0,0 +1,6 @@
+namespace Sa.Data.PostgreSql;
+
+public interface IPgDistributedLock
+{
+ Task TryExecuteInDistributedLock(long lockId, Func exclusiveLockTask, CancellationToken cancellationToken);
+}
diff --git a/src/Sa.Data.PostgreSql/PgDataSource.cs b/src/Sa.Data.PostgreSql/PgDataSource.cs
new file mode 100644
index 0000000..d175bf5
--- /dev/null
+++ b/src/Sa.Data.PostgreSql/PgDataSource.cs
@@ -0,0 +1,71 @@
+using Npgsql;
+
+namespace Sa.Data.PostgreSql;
+
+///
+/// NpgsqlDataSource lite
+///
+/// connection string
+internal sealed class PgDataSource(PgDataSourceSettings settings) : IPgDataSource, IDisposable, IAsyncDisposable
+{
+ private readonly Lazy _dataSource = new(() => NpgsqlDataSource.Create(settings.ConnectionString));
+
+ public ValueTask OpenDbConnection(CancellationToken cancellationToken) => _dataSource.Value.OpenConnectionAsync(cancellationToken);
+
+ public void Dispose()
+ {
+ if (_dataSource.IsValueCreated)
+ {
+ _dataSource.Value.Dispose();
+ }
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ if (_dataSource.IsValueCreated)
+ {
+ await _dataSource.Value.DisposeAsync();
+ }
+ }
+
+ public async ValueTask BeginBinaryImport(string sql, Func> write, CancellationToken cancellationToken = default)
+ {
+ using NpgsqlConnection db = await OpenDbConnection(cancellationToken);
+ using NpgsqlBinaryImporter writer = await db.BeginBinaryImportAsync(sql, cancellationToken);
+ ulong result = await write(writer, cancellationToken);
+ return result;
+ }
+
+ public async Task ExecuteNonQuery(string sql, NpgsqlParameter[] parameters, CancellationToken cancellationToken = default)
+ {
+ using NpgsqlConnection connection = await OpenDbConnection(cancellationToken);
+ using NpgsqlCommand cmd = new(sql, connection);
+ AddParameters(cmd, parameters);
+ return await cmd.ExecuteNonQueryAsync(cancellationToken);
+ }
+
+ public async Task ExecuteReader(string sql, Action read, NpgsqlParameter[] parameters, CancellationToken cancellationToken = default)
+ {
+ int rowCount = 0;
+
+ using NpgsqlConnection connection = await OpenDbConnection(cancellationToken);
+ using NpgsqlCommand cmd = new(sql, connection);
+ AddParameters(cmd, parameters);
+ using NpgsqlDataReader reader = await cmd.ExecuteReaderAsync(cancellationToken);
+ while (await reader.ReadAsync(cancellationToken) && !cancellationToken.IsCancellationRequested)
+ {
+ read(reader, rowCount);
+ rowCount++;
+ }
+ return rowCount;
+ }
+
+
+ static void AddParameters(NpgsqlCommand cmd, NpgsqlParameter[] parameters)
+ {
+ if (parameters != null && parameters.Length > 0)
+ {
+ cmd.Parameters.AddRange(parameters);
+ }
+ }
+}
diff --git a/src/Sa.Data.PostgreSql/PgDistributedLock.cs b/src/Sa.Data.PostgreSql/PgDistributedLock.cs
new file mode 100644
index 0000000..f22b1b5
--- /dev/null
+++ b/src/Sa.Data.PostgreSql/PgDistributedLock.cs
@@ -0,0 +1,63 @@
+using Microsoft.Extensions.Logging;
+using Npgsql;
+
+namespace Sa.Data.PostgreSql;
+
+///
+///
+///
+///
+internal sealed class PgDistributedLock(PgDataSourceSettings settings, ILogger? logger = null) : IPgDistributedLock
+{
+ private readonly NpgsqlConnectionStringBuilder builder = new(settings.ConnectionString);
+
+ public async Task TryExecuteInDistributedLock(long lockId, Func exclusiveLockTask, CancellationToken cancellationToken)
+ {
+ logger?.LogInformation("Trying to acquire session lock for Lock Id {@LockId}", lockId);
+
+ using var connection = new NpgsqlConnection(builder.ToString());
+ await connection.OpenAsync(cancellationToken);
+
+ bool hasLockedAcquired = await TryAcquireLockAsync(lockId, connection, cancellationToken);
+
+ if (!hasLockedAcquired)
+ {
+ logger?.LogInformation("Lock {@LockId} rejected", lockId);
+ return false;
+ }
+
+ logger?.LogInformation("Lock {@LockId} acquired", lockId);
+ try
+ {
+        // The session lock is already held (acquired and checked above).
+        // Re-acquiring here would increment PostgreSQL's per-session advisory
+        // lock count and leak the lock, since ReleaseLock unlocks only once.
+        await exclusiveLockTask(cancellationToken);
+ }
+ finally
+ {
+ logger?.LogInformation("Releasing session lock for {@LockId}", lockId);
+ await ReleaseLock(lockId, connection, cancellationToken);
+ }
+ return true;
+ }
+
+ private static async Task TryAcquireLockAsync(long lockId, NpgsqlConnection connection, CancellationToken cancellationToken)
+ {
+ string sessionLockCommand = $"SELECT pg_try_advisory_lock({lockId})";
+ using var commandQuery = new NpgsqlCommand(sessionLockCommand, connection);
+ object? result = await commandQuery.ExecuteScalarAsync(cancellationToken);
+ if (result != null && bool.TryParse(result.ToString(), out var lockAcquired) && lockAcquired)
+ {
+ return true;
+ }
+ return false;
+ }
+
+    private static async Task ReleaseLock(long lockId, NpgsqlConnection connection, CancellationToken cancellationToken)
+    {
+        string transactionLockCommand = $"SELECT pg_advisory_unlock({lockId})";
+        using var commandQuery = new NpgsqlCommand(transactionLockCommand, connection);
+        await commandQuery.ExecuteScalarAsync(cancellationToken);
+    }
+}
\ No newline at end of file
diff --git a/src/Sa.Data.PostgreSql/Readme.md b/src/Sa.Data.PostgreSql/Readme.md
new file mode 100644
index 0000000..5846932
--- /dev/null
+++ b/src/Sa.Data.PostgreSql/Readme.md
@@ -0,0 +1,87 @@
+# IPgDataSource
+
+Предоставляет облегченный (минимальный) вариант абстракции для работы с базой данных PostgreSQL в .NET-приложениях.
+
+## ExecuteNonQuery
+
+Выполняет SQL-запрос, который не возвращает данные (например, INSERT, UPDATE, DELETE), и возвращает количество затронутых строк.
+
+```csharp
+var dataSource = new PgDataSource(new PgDataSourceSettings("YourConnectionString"));
+int affectedRows = await dataSource.ExecuteNonQuery("SELECT 2");
+Console.WriteLine($"Affected Rows: {affectedRows}");
+
+var parameters = new[]
+{
+ new NpgsqlParameter("p1", "Tom"),
+ new NpgsqlParameter("p2", 18)
+};
+
+int affectedRows = await dataSource.ExecuteNonQuery("""
+ CREATE TABLE IF NOT EXISTS users (
+ name text,
+ age int
+ );
+
+ INSERT INTO users (name, age) VALUES (@p1, @p2);
+ """, parameters);
+
+Console.WriteLine($"Affected Rows: {affectedRows}");
+```
+
+## ExecuteReader
+
+Чтение данных
+
+```csharp
+int actual = 0;
+await dataSource.ExecuteReader("SELECT 1", (reader, i) => actual = reader.GetInt32(0));
+Console.WriteLine($"Value from Database: {actual}");
+
+// get first value
+int errCount = await fixture.DataSource.ExecuteReaderFirst("select count(error_id) from outbox__$error");
+
+```
+
+## BeginBinaryImport
+
+Бинарный импорт
+
+```csharp
+public async ValueTask BulkWrite(ReadOnlyMemory> messages, CancellationToken cancellationToken){
+ // Начинаем бинарный импорт
+ ulong result = await dataSource.BeginBinaryImport(sqlTemplate, async (writer, t) =>
+ {
+ // Записываем строки в импорт
+ WriteRows(writer, typeCode, messages);
+ return await writer.CompleteAsync(t);
+ }, cancellationToken);
+
+ return result;
+}
+
+private void WriteRows(NpgsqlBinaryImporter writer, ReadOnlyMemory> messages)
+{
+ foreach (OutboxMessage message in messages.Span)
+ {
+ // Генерируем уникальный идентификатор для сообщения
+ string id = idGenerator.GenId(message.PartInfo.CreatedAt);
+
+ // Начинаем новую строку для записи
+ writer.StartRow();
+
+ // Записываем данные в строку
+ writer.Write(id, NpgsqlDbType.Char); // id
+ writer.Write(message.PartInfo.TenantId, NpgsqlDbType.Integer); // tenant
+ writer.Write(message.PartInfo.Part, NpgsqlDbType.Text); // part
+
+ // Сериализуем и записываем полезную нагрузку
+ using RecyclableMemoryStream stream = streamManager.GetStream();
+ serializer.Serialize(stream, message.Payload);
+ stream.Position = 0;
+ writer.Write(stream, NpgsqlDbType.Bytea); // payload
+ writer.Write(stream.Length, NpgsqlDbType.Integer); // payload_size
+ writer.Write(message.PartInfo.CreatedAt.ToUnixTimeSeconds(), NpgsqlDbType.Bigint); // created_at
+ }
+}
+```
\ No newline at end of file
diff --git a/src/Sa.Data.PostgreSql/Sa.Data.PostgreSql.csproj b/src/Sa.Data.PostgreSql/Sa.Data.PostgreSql.csproj
new file mode 100644
index 0000000..f43529b
--- /dev/null
+++ b/src/Sa.Data.PostgreSql/Sa.Data.PostgreSql.csproj
@@ -0,0 +1,13 @@
+
+
+
+ net8.0
+ enable
+ enable
+
+
+
+
+
+
+
diff --git a/src/Sa.Data.PostgreSql/Setup.cs b/src/Sa.Data.PostgreSql/Setup.cs
new file mode 100644
index 0000000..9a122a0
--- /dev/null
+++ b/src/Sa.Data.PostgreSql/Setup.cs
@@ -0,0 +1,17 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using Sa.Data.PostgreSql.Configuration;
+
+namespace Sa.Data.PostgreSql;
+
/// <summary>
/// DI registration entry point for the PostgreSQL data-source services.
/// </summary>
public static class Setup
{
    /// <summary>
    /// Registers the PostgreSQL data source in the container and applies optional
    /// user configuration. TryAdd* keeps repeated calls and overrides safe.
    /// NOTE(review): the generic type arguments of TryAddSingleton appear to have
    /// been lost in this listing — verify the registered service types in the repository.
    /// </summary>
    public static IServiceCollection AddPgDataSource(this IServiceCollection services, Action? configure = null)
    {
        // The builder collects user configuration before the container is built.
        PgDataSourceSettingsBuilder builder = new(services);
        configure?.Invoke(builder);
        services.TryAddSingleton();
        services.TryAddSingleton();
        return services;
    }
}
diff --git a/src/Sa.Media/Sa.Media.csproj b/src/Sa.Media/Sa.Media.csproj
new file mode 100644
index 0000000..fa71b7a
--- /dev/null
+++ b/src/Sa.Media/Sa.Media.csproj
@@ -0,0 +1,9 @@
+
+
+
+ net8.0
+ enable
+ enable
+
+
+
diff --git a/src/Sa.Media/Wav/WavFile.cs b/src/Sa.Media/Wav/WavFile.cs
new file mode 100644
index 0000000..a333d13
--- /dev/null
+++ b/src/Sa.Media/Wav/WavFile.cs
@@ -0,0 +1,404 @@
+using System.ComponentModel.DataAnnotations;
+using System.Diagnostics.CodeAnalysis;
+
+namespace Sa.Media.Wav;
+
/// <summary>
/// Reader/splitter for WAV (RIFF) files: parses the header, streams PCM sample
/// frames, and can extract a single channel into a new mono file.
/// References:
///   https://en.wikipedia.org/wiki/WAV
///   https://www.daubnet.com/en/file-format-riff
/// </summary>
public sealed class WavFile : IDisposable
{
    static class Env
    {
        public const UInt32 ChunkId = 0x46464952;         // "RIFF" in little-endian ASCII
        public const UInt32 WaveFormat = 0x45564157;      // "WAVE"
        public const UInt16 WaveFormatPcm = 0x0001;       // PCM (linear quantization)
        public const UInt32 Subchunk1IdJunk = 0x4B4E554A; // "JUNK"
    }

    private BinaryReader? _reader;

    /// <summary>Contains the ASCII characters "RIFF".</summary>
    public UInt32 ChunkId { get; private set; }

    /// <summary>
    /// Remaining size of the chain starting at this position, i.e. file size
    /// minus 8 (the chunkId and chunkSize fields are excluded).
    /// </summary>
    public UInt32 ChunkSize { get; private set; }

    /// <summary>Contains the ASCII characters "WAVE".</summary>
    public UInt32 Format { get; private set; }

    /// <summary>Contains the ASCII characters "fmt ".</summary>
    public UInt32 Subchunk1Id { get; private set; }

    /// <summary>
    /// 16 for the PCM format (or 18). Remaining size of the subchunk
    /// starting at this position.
    /// </summary>
    public UInt32 Subchunk1Size { get; private set; }

    /// <summary>
    /// Audio format. PCM = 1 (linear quantization); values other than 1
    /// indicate some form of compression.
    /// </summary>
    public UInt16 AudioFormat { get; private set; }

    /// <summary>Number of channels: mono = 1, stereo = 2, etc.</summary>
    public UInt16 NumChannels { get; private set; }

    /// <summary>Sample rate: 8000 Hz, 44100 Hz, etc.</summary>
    public UInt32 SampleRate { get; private set; }

    /// <summary>sampleRate * numChannels * bitsPerSample / 8</summary>
    public UInt32 ByteRate { get; private set; }

    /// <summary>
    /// numChannels * bitsPerSample / 8 — the number of bytes of one sample
    /// frame, all channels included.
    /// </summary>
    public UInt16 BlockAlign { get; private set; }

    /// <summary>Bit depth ("precision") of the sound: 8 bits, 16 bits, etc.</summary>
    public UInt16 BitsPerSample { get; private set; }

    // The "data" subchunk holds the audio payload and its size.

    /// <summary>Contains the ASCII characters "data".</summary>
    public UInt32 Subchunk2Id { get; private set; }

    /// <summary>
    /// numSamples * numChannels * bitsPerSample / 8 — number of bytes in the data area.
    /// </summary>
    public int Subchunk2Size { get; private set; }

    /// <summary>Byte offset of the start of the data area.</summary>
    public long DataOffset { get; private set; }

    /// <summary>The number of samples per channel.</summary>
    public int SamplesPerChannel { get; private set; }

    /// <summary>Name of the source file.</summary>
    public string? FileName { get; private set; }


    public bool IsWave => IsLoaded() && ChunkId == Env.ChunkId
        && Format == Env.WaveFormat;

    // FIX: the second comparison used Subchunk2Size (the data size) instead of
    // Subchunk1Size — the fmt chunk is the one expected to be 16 or 18 bytes.
    public bool IsPcmWave => IsWave
        && (Subchunk1Size == 16 || Subchunk1Size == 18)
        && AudioFormat == Env.WaveFormatPcm;

    public bool IsLoaded() => DataOffset > 0;


    /// <summary>
    /// Parses the RIFF/fmt/data headers, skipping JUNK/LIST/FLLR subchunks.
    /// </summary>
    /// <param name="suppressErrors">
    /// When true, parsing problems return the partially filled instance
    /// (with <see cref="IsLoaded"/> == false) instead of throwing.
    /// </param>
    public WavFile ReadHeader(bool suppressErrors = true)
    {
        if (IsLoaded()) return this;

        BinaryReader reader = OpenReader();

        // chunk 0
        ChunkId = reader.ReadUInt32();
        ChunkSize = reader.ReadUInt32();
        Format = reader.ReadUInt32();

        // chunk 1: expect "fmt " (0x666d7420 in big-endian representation)
        Subchunk1Id = reader.ReadUInt32();

        // Skip JUNK chunks: https://www.daubnet.com/en/file-format-riff
        while (Subchunk1Id == Env.Subchunk1IdJunk)
        {
            UInt32 junkSubchunk1Size = reader.ReadUInt32(); // bytes in this chunk
            if (junkSubchunk1Size % 2 == 1)
            {
                // RIFF chunks are word-aligned; odd-sized JUNK is padded by one byte.
                ++junkSubchunk1Size;
            }
            reader.ReadBytes((int)junkSubchunk1Size);
            Subchunk1Id = reader.ReadUInt32(); // read next subchunk id
        }

        Subchunk1Size = reader.ReadUInt32(); // bytes in this chunk (expect 16 or 18)

        // 16 bytes coming...
        AudioFormat = reader.ReadUInt16();
        NumChannels = reader.ReadUInt16();
        SampleRate = reader.ReadUInt32();
        ByteRate = reader.ReadUInt32();
        BlockAlign = reader.ReadUInt16();
        BitsPerSample = reader.ReadUInt16();

        if (Subchunk1Size == 18)
        {
            // Skip any extra fmt values.
            int fmtExtraSize = reader.ReadInt16();
            reader.ReadBytes(fmtExtraSize);
        }

        // chunk 2: scan for "data", skipping metadata subchunks
        while (true)
        {
            Subchunk2Id = reader.ReadUInt32();
            Subchunk2Size = reader.ReadInt32();

            if (Subchunk2Id == 0x5453494c) // "LIST"
            {
                reader.ReadBytes(Subchunk2Size);
                continue;
            }
            if (Subchunk2Id == 0x524c4c46) // "FLLR": https://stackoverflow.com/questions/6284651
            {
                reader.ReadBytes(Subchunk2Size);
                continue;
            }

            if (Subchunk2Id != 0x61746164) // "data"
            {
                if (suppressErrors) return this;
                throw new NotImplementedException($"Bad Subchunk2Id: 0x{Subchunk2Id:x8}");
            }
            break;
        }

        if (Subchunk2Size == 0x7FFFFFFF)
        {
            // Size was not set: fall back to "rest of the file".
            // This does not check whether other subchunks follow "data" in the file.
            long sizeInBytesLong = reader.BaseStream.Length - reader.BaseStream.Position;
            if (sizeInBytesLong > Int32.MaxValue)
            {
                if (suppressErrors) return this;
                // FIX: was ArgumentNullException, which misrepresents the failure.
                throw new InvalidDataException("Too long wave! " + sizeInBytesLong);
            }

            Subchunk2Size = (int)sizeInBytesLong;
        }

        // FIX: BlockAlign already equals numChannels * bitsPerSample / 8 (all
        // channels included, see its doc above), so dividing by NumChannels again
        // undercounted multi-channel files. Guard against a zero BlockAlign from
        // a malformed header instead of throwing DivideByZeroException.
        SamplesPerChannel = BlockAlign > 0 ? Subchunk2Size / BlockAlign : 0;

        // Remember where the audio data starts.
        DataOffset = reader.BaseStream.Position;

        return this;
    }


    /// <summary>Binds this instance to a file, closing any previously opened reader.</summary>
    public WavFile WithFileName(string filename)
    {
        if (filename != FileName)
        {
            Close();
            FileName = filename;
        }

        return this;
    }


    /// <summary>
    /// Streams raw sample frames as (channel, bytes) pairs, optionally restricted
    /// to the [cutFromSeconds, cutToSeconds) window.
    /// </summary>
    public IEnumerable<(int channelId, byte[] sample)> ReadWave(float? cutFromSeconds = null, float? cutToSeconds = null)
    {
        ReadHeader();

        BinaryReader reader = OpenReader();

        // Byte offsets delimiting the data area.
        long dataOffset = DataOffset;
        long dataEndOffset = dataOffset + Subchunk2Size;

        // Byte offset of the start of the cut.
        long cutFromOffset = dataOffset;
        if (cutFromSeconds != null)
        {
            cutFromOffset += (long)(cutFromSeconds.Value * SampleRate * BlockAlign);
        }

        // Byte offset of the end of the cut.
        long cutToOffset = dataEndOffset;
        if (cutToSeconds != null)
        {
            cutToOffset = dataOffset + (long)(cutToSeconds.Value * SampleRate * BlockAlign);
        }

        // FIX: clamp the window so over-long cut arguments never read past the
        // data area (or before it).
        if (cutToOffset > dataEndOffset) cutToOffset = dataEndOffset;
        if (cutFromOffset < dataOffset) cutFromOffset = dataOffset;

        if (reader.BaseStream.CanSeek)
        {
            reader.BaseStream.Position = cutFromOffset;
        }

        // Read frame by frame, yielding each channel's slice of the frame.
        for (long i = cutFromOffset; i < cutToOffset; i += BlockAlign)
        {
            for (int channelId = 0; channelId < NumChannels; channelId++)
            {
                byte[] sample = reader.ReadBytes(BlockAlign / NumChannels);
                yield return (channelId, sample);
            }
        }
    }

    /// <summary>
    /// Converts and returns audio data in double format (normalized to [-1, 1]
    /// for 16-bit input).
    /// </summary>
    public IEnumerable<(int channelId, double[] sample)> ReadDoubleWave(float? cutFromSeconds = null, float? cutToSeconds = null)
        => ReadWave(cutFromSeconds, cutToSeconds)
            .Select(c => (c.channelId, ConvertToDouble(BitsPerSample, c.sample)));


    /// <summary>
    /// Normalized samples re-encoded as 16-bit PCM bytes (for speech recognizers).
    /// </summary>
    public IEnumerable<(int channelId, byte[] sample)> ReadDoubleWaveAsByte(float? cutFromSeconds = null, float? cutToSeconds = null)
        => ReadDoubleWave(cutFromSeconds, cutToSeconds)
            .Select(c => (c.channelId, ConvertToByte(c.sample)));

    // FIX: integer division truncated the duration to whole seconds.
    public double GetLengthSeconds()
        => IsLoaded() && SampleRate != 0
            ? (double)SamplesPerChannel / SampleRate
            : 0;

    public TimeSpan GetLength() => TimeSpan.FromSeconds(GetLengthSeconds());

    /// <summary>
    /// Extracts one channel into a new mono WAV file; returns the bytes written.
    /// </summary>
    public long WriteChannel([NotNull] string fileName, [Range(0, 10)] int indexChannel)
    {
        using FileStream fs = File.Open(fileName ?? throw new ArgumentNullException(nameof(fileName)), FileMode.OpenOrCreate);
        return WriteChannel(fs, indexChannel);
    }

    public long WriteChannel(FileStream fs, [Range(0, 10)] int indexChannel)
    {
        ReadHeader();

        if (!IsLoaded()) throw new NotSupportedException();

        if (indexChannel >= NumChannels || indexChannel < 0) throw new ArgumentOutOfRangeException(nameof(indexChannel));

        // NOTE(review): when Subchunk1Size == 18 the extra fmt bytes are not
        // written back, which would corrupt the output header — confirm inputs
        // are plain 16-byte-fmt PCM before relying on this.
        using var writer = new BinaryWriter(fs);
        writer.Write(ChunkId);
        // FIX: shrink the declared RIFF size by the data bytes of the dropped channels.
        writer.Write(ChunkSize - (UInt32)(Subchunk2Size - Subchunk2Size / NumChannels));
        writer.Write(Format);
        writer.Write(Subchunk1Id);
        writer.Write(Subchunk1Size);
        writer.Write(AudioFormat);
        writer.Write((UInt16)1); // NumChannels: the output is mono
        writer.Write(SampleRate);
        // FIX: byte rate scales with channel count; mono keeps 1/NumChannels of it.
        writer.Write(ByteRate / NumChannels);
        writer.Write((UInt16)(BlockAlign / NumChannels));
        writer.Write(BitsPerSample);
        writer.Write(Subchunk2Id);
        writer.Write(Subchunk2Size / NumChannels);


        foreach (var (_, sample) in ReadWave().Where(c => c.channelId == indexChannel))
        {
            writer.Write(sample);
        }

        writer.Flush();
        return fs.Length;
    }

    /// <summary>Releases the underlying reader and resets the parsed state.</summary>
    public void Close()
    {
        _reader?.Dispose();
        _reader = null;
        DataOffset = 0;
    }

    public void Dispose() => Close();

    // Lazily opens (or rewinds) the reader over FileName.
    private BinaryReader OpenReader()
    {
        if (_reader == null)
        {
            // FIX: the original passed nameof(FileName) as the exception *message*.
            FileStream fs = File.Open(FileName ?? throw new ArgumentException("FileName is not set.", nameof(FileName)), FileMode.Open);
            _reader = new BinaryReader(fs);
        }
        else
        {
            _reader.BaseStream.Position = 0;
        }
        return _reader;
    }

    // Re-encodes [-1, 1] doubles as little-endian 16-bit PCM bytes.
    private static byte[] ConvertToByte(double[] data)
    {
        short[] array = Array.ConvertAll(data, (double e) => (short)(e * 32767.0));
        byte[] array2 = new byte[array.Length * 2];
        Buffer.BlockCopy(array, 0, array2, 0, array2.Length);
        return array2;
    }

    // Converts raw sample bytes into doubles; 16-bit samples are normalized by
    // Int16.MinValue so the result lies in [-1, 1].
    private static double[] ConvertToDouble(ushort bitsPerSample, byte[] data)
    {
        int len = data.Length;
        double[] sample;
        switch (bitsPerSample)
        {
            case 64:
                sample = new double[len / sizeof(double)];
                Buffer.BlockCopy(data, 0, sample, 0, len);
                break;
            case 32:
                float[] asFloat = new float[len / sizeof(float)];
                Buffer.BlockCopy(data, 0, asFloat, 0, len);
                sample = Array.ConvertAll(asFloat, e => (double)e);
                break;
            case 16:
                Int16[] asInt16 = new Int16[len / sizeof(Int16)];
                Buffer.BlockCopy(data, 0, asInt16, 0, len);
                sample = Array.ConvertAll(asInt16, e => e / -(double)Int16.MinValue);
                break;
            default: throw new ArgumentException("Bad BitsPerSample: " + bitsPerSample);
        }

        return sample;
    }
}
diff --git a/src/Sa.Outbox.Attributes/IOutboxPayloadMessage.cs b/src/Sa.Outbox.Attributes/IOutboxPayloadMessage.cs
new file mode 100644
index 0000000..7305f3f
--- /dev/null
+++ b/src/Sa.Outbox.Attributes/IOutboxPayloadMessage.cs
@@ -0,0 +1,18 @@
+namespace Sa.Outbox.Support;
+
/// <summary>
/// Represents a message payload in the Outbox system.
/// This interface defines the properties that any Outbox payload message must implement.
/// </summary>
public interface IOutboxPayloadMessage
{
    /// <summary>
    /// Gets the unique identifier for the payload.
    /// </summary>
    string PayloadId { get; }

    /// <summary>
    /// Gets the identifier for the tenant associated with the payload.
    /// </summary>
    public int TenantId { get; }
}
diff --git a/src/Sa.Outbox.Attributes/OutboxMessageAttribute.cs b/src/Sa.Outbox.Attributes/OutboxMessageAttribute.cs
new file mode 100644
index 0000000..dcc6af8
--- /dev/null
+++ b/src/Sa.Outbox.Attributes/OutboxMessageAttribute.cs
@@ -0,0 +1,21 @@
+namespace Sa.Outbox.Support;
+
+
/// <summary>
/// An attribute used to mark classes or structs as Outbox messages.
/// This attribute can be used to specify the part associated with the Outbox message.
/// </summary>
/// <param name="part">The part identifier for the Outbox message. Default is "root".</param>
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct)]
public class OutboxMessageAttribute(string part = "root") : Attribute
{
    /// <summary>
    /// Gets the part identifier associated with the Outbox message.
    /// </summary>
    public string Part => part;

    /// <summary>
    /// A default instance of the <see cref="OutboxMessageAttribute"/> with the default part value.
    /// </summary>
    public readonly static OutboxMessageAttribute Default = new();
}
\ No newline at end of file
diff --git a/src/Sa.Outbox.Attributes/PingMessage.cs b/src/Sa.Outbox.Attributes/PingMessage.cs
new file mode 100644
index 0000000..1f60aa2
--- /dev/null
+++ b/src/Sa.Outbox.Attributes/PingMessage.cs
@@ -0,0 +1,10 @@
+namespace Sa.Outbox.Support;
+
+
/// <summary>
/// A built-in payload used to verify that the Outbox pipeline is operational.
/// Carries only an opaque numeric payload.
/// </summary>
[OutboxMessage]
public record PingMessage(long Payload) : IOutboxPayloadMessage
{
    /// <summary>Ping messages carry no payload identifier.</summary>
    public string PayloadId => string.Empty;

    /// <summary>Ping messages are not bound to any tenant.</summary>
    public int TenantId => 0;
}
diff --git a/src/Sa.Outbox.Attributes/Sa.Outbox.Support.csproj b/src/Sa.Outbox.Attributes/Sa.Outbox.Support.csproj
new file mode 100644
index 0000000..fa71b7a
--- /dev/null
+++ b/src/Sa.Outbox.Attributes/Sa.Outbox.Support.csproj
@@ -0,0 +1,9 @@
+
+
+
+ net8.0
+ enable
+ enable
+
+
+
diff --git a/src/Sa.Outbox.PostgreSql/Commands/ErrorDeliveryCommand.cs b/src/Sa.Outbox.PostgreSql/Commands/ErrorDeliveryCommand.cs
new file mode 100644
index 0000000..aeb5067
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Commands/ErrorDeliveryCommand.cs
@@ -0,0 +1,58 @@
+using Npgsql;
+using Sa.Data.PostgreSql;
+using Sa.Extensions;
+
+namespace Sa.Outbox.PostgreSql.Commands;
+
/// <summary>
/// Writes the distinct delivery errors of a processed batch into the outbox
/// error table and returns a map from each exception to its stored info.
/// NOTE(review): generic type arguments appear to have been lost in this listing
/// (e.g. "Task>", "Dictionary", "IOutboxContext[]") — verify signatures against the repository.
/// </summary>
internal class ErrorDeliveryCommand(
    IPgDataSource dataSource
    , SqlOutboxTemplate sqlTemplate
) : IErrorDeliveryCommand
{

    // Caches the generated INSERT statement per distinct batch length.
    private readonly SqlCacheSplitter sqlCache = new(len => sqlTemplate.SqlError(len));

    public async Task> Execute(IOutboxContext[] outboxMessages, CancellationToken cancellationToken)
    {
        // Deduplicate errors by exception instance: id = murmur3 hash of the
        // exception text, createdAt = start-of-day of the first occurrence.
        Dictionary errors = outboxMessages
            .Where(m => m.Exception != null)
            .GroupBy(m => m.Exception!)
            .Select(m => (err: m.Key, createdAt: m.First().DeliveryResult.CreatedAt.StartOfDay()))
            .ToDictionary(e => e.err, e => new ErrorInfo(e.err.ToString().GetMurmurHash3(), e.err.GetType().Name, e.createdAt));

        int len = errors.Count;

        if (len == 0) return errors;

        KeyValuePair[] arrErrors = [.. errors];

        int startIndex = 0;

        // Insert in chunks sized by the SQL cache splitter.
        foreach ((string sql, int cnt) in sqlCache.GetSql(len))
        {

            var sliceErrors = new ArraySegment>(arrErrors, startIndex, cnt);

            startIndex += cnt;

            List parameters = [];

            int i = 0;
            // Parameter layout per row: (@id_{i},@type_{i},@message_{i},@created_at_{i})
            foreach ((Exception Key, ErrorInfo Value) in sliceErrors)
            {
                (long ErrorId, string TypeName, DateTimeOffset CreatedAt) = Value;

                parameters.Add(new($"@id_{i}", ErrorId));
                parameters.Add(new($"@type_{i}", TypeName));
                parameters.Add(new($"@message_{i}", Key.ToString()));
                // Timestamps are persisted as unix seconds.
                parameters.Add(new($"@created_at_{i}", CreatedAt.ToUnixTimeSeconds()));
                i++;
            }

            await dataSource.ExecuteNonQuery(sql, [.. parameters], cancellationToken);
        }

        return errors;
    }
}
diff --git a/src/Sa.Outbox.PostgreSql/Commands/ExtendDeliveryCommand.cs b/src/Sa.Outbox.PostgreSql/Commands/ExtendDeliveryCommand.cs
new file mode 100644
index 0000000..63b3395
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Commands/ExtendDeliveryCommand.cs
@@ -0,0 +1,32 @@
+using Sa.Data.PostgreSql;
+using Sa.Outbox.PostgreSql.TypeHashResolve;
+
+namespace Sa.Outbox.PostgreSql.Commands;
+
/// <summary>
/// Prolongs the delivery lock of the messages matching the filter so a busy
/// processor keeps ownership past the original lock expiration.
/// </summary>
internal class ExtendDeliveryCommand(
    IPgDataSource dataSource
    , IMsgTypeHashResolver hashResolver
    , SqlOutboxTemplate sqlTemplate
) : IExtendDeliveryCommand
{
    /// <summary>
    /// Executes the lock-extension SQL; returns the affected row count.
    /// </summary>
    public async Task Execute(TimeSpan lockExpiration, OutboxMessageFilter filter, CancellationToken cancellationToken)
    {

        // Payload type names are stored as 64-bit hash codes.
        long typeCode = await hashResolver.GetCode(filter.PayloadType, cancellationToken);
        // Timestamps are persisted as unix seconds.
        long now = filter.NowDate.ToUnixTimeSeconds();
        long lockExpiresOn = (filter.NowDate + lockExpiration).ToUnixTimeSeconds();
        long fromDate = filter.FromDate.ToUnixTimeSeconds();

        return await dataSource.ExecuteNonQuery(sqlTemplate.SqlExtendDelivery,
        [
            new("tenant", filter.TenantId)
            , new("part", filter.Part)
            , new("from_date", fromDate)
            , new("transact_id", filter.TransactId)
            , new("payload_type", typeCode)
            , new("lock_expires_on", lockExpiresOn)
            , new("now", now)
        ]
        , cancellationToken);
    }
}
diff --git a/src/Sa.Outbox.PostgreSql/Commands/FinishDeliveryCommand.cs b/src/Sa.Outbox.PostgreSql/Commands/FinishDeliveryCommand.cs
new file mode 100644
index 0000000..c8b09ee
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Commands/FinishDeliveryCommand.cs
@@ -0,0 +1,86 @@
+using Npgsql;
+using Sa.Data.PostgreSql;
+using Sa.Outbox.PostgreSql.IdGen;
+
+namespace Sa.Outbox.PostgreSql.Commands;
+
/// <summary>
/// Records the final delivery result (status code, message, error linkage,
/// postpone time) for each processed outbox message, in batched statements.
/// NOTE(review): generic type arguments appear to have been lost in this listing
/// (e.g. "IOutboxContext[]", "ArraySegment>") — verify signatures against the repository.
/// </summary>
internal class FinishDeliveryCommand(
    IPgDataSource dataSource
    , SqlOutboxTemplate sqlTemplate
    , IIdGenerator idGenerator
) : IFinishDeliveryCommand
{
    // Positional parameters emitted per message row.
    const int IndexParamsCount = 7;
    // Trailing filter parameters shared by the whole statement.
    const int ConstParamsCount = 4;


    // Caches the generated statement per distinct batch length.
    private readonly SqlCacheSplitter sqlCache = new(len => sqlTemplate.SqlFinishDelivery(len));

    /// <summary>
    /// Persists delivery results for the batch; returns the total affected row count.
    /// </summary>
    public async Task Execute(
        IOutboxContext[] outboxMessages,
        IReadOnlyDictionary errors,
        OutboxMessageFilter filter,
        CancellationToken cancellationToken)
    {
        if (outboxMessages.Length == 0) return 0;

        int total = 0;

        int startIndex = 0;
        foreach ((string sql, int len) in sqlCache.GetSql(outboxMessages.Length))
        {
            var segment = new ArraySegment>(outboxMessages, startIndex, len);
            startIndex += len;

            NpgsqlParameter[] parameters = GetSqlParams(segment, errors, filter);
            total += await dataSource.ExecuteNonQuery(sql, parameters, cancellationToken);
        }

        return total;
    }

    // Builds the flat positional parameter array for one chunk.
    private NpgsqlParameter[] GetSqlParams(
        ArraySegment> sliceContext,
        IReadOnlyDictionary errors,
        OutboxMessageFilter filter)
    {
        NpgsqlParameter[] parameters = new NpgsqlParameter[sliceContext.Count * IndexParamsCount + ConstParamsCount];

        int j = 0;
        foreach (IOutboxContext context in sliceContext)
        {
            DateTimeOffset createdAt = context.DeliveryResult.CreatedAt;
            string id = idGenerator.GenId(createdAt);
            string msg = context.DeliveryResult.Message;
            // A postponed delivery keeps its lock until createdAt + PostponeAt.
            long lockExpiresOn = (createdAt + context.PostponeAt).ToUnixTimeSeconds();

            string errorId = String.Empty;
            Exception? error = context.Exception;
            if (error != null)
            {
                // Link to the error row persisted by IErrorDeliveryCommand;
                // fall back to the exception message when no status message was set.
                errorId = errors[error].ErrorId.ToString();
                if (string.IsNullOrEmpty(msg))
                {
                    msg = error.Message;
                }
            }
            // (@id_{i},@outbox_id_{i},@error_id_{i},@status_code_{i},@status_message_{i},@lock_expires_on_{i},@created_at_{i}
            parameters[j] = new($"@p{j}", id); j++;
            parameters[j] = new($"@p{j}", context.OutboxId); j++;
            parameters[j] = new($"@p{j}", errorId); j++;
            parameters[j] = new($"@p{j}", context.DeliveryResult.Code); j++;
            parameters[j] = new($"@p{j}", msg); j++;
            parameters[j] = new($"@p{j}", lockExpiresOn); j++;
            parameters[j] = new($"@p{j}", createdAt.ToUnixTimeSeconds()); j++;
        }

        //@tenant AND @part AND @from_date AND @transact_id AND @created_at

        parameters[j++] = new("tnt", filter.TenantId);
        parameters[j++] = new("prt", filter.Part);
        parameters[j++] = new("from_date", filter.FromDate.ToUnixTimeSeconds());
        parameters[j] = new("tid", filter.TransactId);

        return parameters;
    }
}
diff --git a/src/Sa.Outbox.PostgreSql/Commands/IErrorDeliveryCommand.cs b/src/Sa.Outbox.PostgreSql/Commands/IErrorDeliveryCommand.cs
new file mode 100644
index 0000000..bf862be
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Commands/IErrorDeliveryCommand.cs
@@ -0,0 +1,11 @@
+
+namespace Sa.Outbox.PostgreSql.Commands;
+
+
/// <summary>Stored identity of a delivery error: hash id, CLR type name, day bucket.</summary>
public record struct ErrorInfo(long ErrorId, string TypeName, DateTimeOffset CreatedAt);


/// <summary>
/// Persists the distinct errors of a processed batch and maps each exception to its stored info.
/// NOTE(review): generic type arguments appear lost in this listing ("Task>") — verify against the repository.
/// </summary>
internal interface IErrorDeliveryCommand
{
    Task> Execute(IOutboxContext[] outboxMessages, CancellationToken cancellationToken);
}
diff --git a/src/Sa.Outbox.PostgreSql/Commands/IExtendDeliveryCommand.cs b/src/Sa.Outbox.PostgreSql/Commands/IExtendDeliveryCommand.cs
new file mode 100644
index 0000000..ccbcc67
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Commands/IExtendDeliveryCommand.cs
@@ -0,0 +1,7 @@
+
+namespace Sa.Outbox.PostgreSql.Commands;
+
/// <summary>Extends the delivery lock for the messages matching the filter.</summary>
internal interface IExtendDeliveryCommand
{
    Task Execute(TimeSpan lockExpiration, OutboxMessageFilter filter, CancellationToken cancellationToken);
}
diff --git a/src/Sa.Outbox.PostgreSql/Commands/IFinishDeliveryCommand.cs b/src/Sa.Outbox.PostgreSql/Commands/IFinishDeliveryCommand.cs
new file mode 100644
index 0000000..19571ac
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Commands/IFinishDeliveryCommand.cs
@@ -0,0 +1,6 @@
+namespace Sa.Outbox.PostgreSql.Commands;
+
/// <summary>Records final delivery results (status and error linkage) for a batch.</summary>
internal interface IFinishDeliveryCommand
{
    Task Execute(IOutboxContext[] outboxMessages, IReadOnlyDictionary errors, OutboxMessageFilter filter, CancellationToken cancellationToken);
}
diff --git a/src/Sa.Outbox.PostgreSql/Commands/IOutboxBulkCommand.cs b/src/Sa.Outbox.PostgreSql/Commands/IOutboxBulkCommand.cs
new file mode 100644
index 0000000..55adfb2
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Commands/IOutboxBulkCommand.cs
@@ -0,0 +1,6 @@
+namespace Sa.Outbox.PostgreSql.Commands;
+
/// <summary>Bulk-inserts outbox messages via PostgreSQL binary COPY; returns the rows written.</summary>
internal interface IOutboxBulkCommand
{
    ValueTask BulkWrite(string payloadType, ReadOnlyMemory> messages, CancellationToken cancellationToken);
}
diff --git a/src/Sa.Outbox.PostgreSql/Commands/IStartDeliveryCommand.cs b/src/Sa.Outbox.PostgreSql/Commands/IStartDeliveryCommand.cs
new file mode 100644
index 0000000..09ffefd
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Commands/IStartDeliveryCommand.cs
@@ -0,0 +1,6 @@
+namespace Sa.Outbox.PostgreSql.Commands;
+
/// <summary>Locks and loads the next batch of messages ready for delivery into the supplied buffer.</summary>
internal interface IStartDeliveryCommand
{
    Task Execute(Memory> writeBuffer, int batchSize, TimeSpan lockDuration, OutboxMessageFilter filter, CancellationToken cancellationToken);
}
\ No newline at end of file
diff --git a/src/Sa.Outbox.PostgreSql/Commands/OutboxBulkCommand.cs b/src/Sa.Outbox.PostgreSql/Commands/OutboxBulkCommand.cs
new file mode 100644
index 0000000..1681d70
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Commands/OutboxBulkCommand.cs
@@ -0,0 +1,70 @@
+using Microsoft.IO;
+using Npgsql;
+using NpgsqlTypes;
+using Sa.Data.PostgreSql;
+using Sa.Outbox.PostgreSql.IdGen;
+using Sa.Outbox.PostgreSql.Serialization;
+using Sa.Outbox.PostgreSql.TypeHashResolve;
+
+namespace Sa.Outbox.PostgreSql.Commands;
+
+
/// <summary>
/// Bulk-writes outbox messages using PostgreSQL binary COPY
/// (Npgsql BeginBinaryImport / NpgsqlBinaryImporter).
/// NOTE(review): generic type arguments appear lost in this listing
/// ("ReadOnlyMemory>", "OutboxMessage") — verify signatures against the repository.
/// </summary>
internal class OutboxBulkCommand(
    IPgDataSource dataSource
    , SqlOutboxTemplate sqlTemplate
    , RecyclableMemoryStreamManager streamManager
    , IOutboxMessageSerializer serializer
    , IIdGenerator idGenerator
    , IMsgTypeHashResolver hashResolver
) : IOutboxBulkCommand
{
    /// <summary>Starts a binary import and returns the number of rows copied.</summary>
    public async ValueTask BulkWrite(string payloadType, ReadOnlyMemory> messages, CancellationToken cancellationToken)
    {
        // Payload type names are stored as 64-bit hash codes.
        long typeCode = await hashResolver.GetCode(payloadType, cancellationToken);

        ulong result = await dataSource.BeginBinaryImport(sqlTemplate.SqlBulkOutboxCopy, async (writer, t) =>
        {
            WriteRows(writer, typeCode, messages);

            return await writer.CompleteAsync(t);

        }, cancellationToken);

        return result;
    }

    // Writes one COPY row per message, matching the column order of SqlBulkOutboxCopy.
    private void WriteRows(NpgsqlBinaryImporter writer, long payloadTypeCode, ReadOnlyMemory> messages)
    {
        foreach (OutboxMessage row in messages.Span)
        {
            // Id derived from the message creation timestamp.
            string id = idGenerator.GenId(row.PartInfo.CreatedAt);

            writer.StartRow();


            // id
            writer.Write(id, NpgsqlDbType.Char);
            // tenant
            writer.Write(row.PartInfo.TenantId, NpgsqlDbType.Integer);
            // part
            writer.Write(row.PartInfo.Part, NpgsqlDbType.Text);


            // payload_id
            writer.Write(row.PayloadId, NpgsqlDbType.Text);
            // payload_type
            writer.Write(payloadTypeCode, NpgsqlDbType.Bigint);
            // payload: serialized into a pooled stream, rewound before the COPY write
            using RecyclableMemoryStream stream = streamManager.GetStream();
            serializer.Serialize(stream, row.Payload);
            stream.Position = 0;
            writer.Write(stream, NpgsqlDbType.Bytea);
            // payload_size — NOTE(review): stream.Length is long but the column
            // type is Integer; confirm payloads cannot exceed int.MaxValue bytes.
            writer.Write(stream.Length, NpgsqlDbType.Integer);


            // created_at (unix seconds)
            writer.Write(row.PartInfo.CreatedAt.ToUnixTimeSeconds(), NpgsqlDbType.Bigint);
        }
    }
}
diff --git a/src/Sa.Outbox.PostgreSql/Commands/Setup.cs b/src/Sa.Outbox.PostgreSql/Commands/Setup.cs
new file mode 100644
index 0000000..f43fa4c
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Commands/Setup.cs
@@ -0,0 +1,17 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+
+namespace Sa.Outbox.PostgreSql.Commands;
+
/// <summary>
/// Registers the outbox command implementations in the DI container.
/// NOTE(review): the generic type arguments of TryAddSingleton appear to have
/// been lost in this listing — verify the registered interface/implementation
/// pairs in the repository.
/// </summary>
internal static class Setup
{
    public static IServiceCollection AddOutboxCommands(this IServiceCollection services)
    {
        // TryAdd* keeps repeated registration calls and user overrides safe.
        services.TryAddSingleton();
        services.TryAddSingleton();
        services.TryAddSingleton();
        services.TryAddSingleton();
        services.TryAddSingleton();
        return services;
    }
}
diff --git a/src/Sa.Outbox.PostgreSql/Commands/SqlCacheSplitter.cs b/src/Sa.Outbox.PostgreSql/Commands/SqlCacheSplitter.cs
new file mode 100644
index 0000000..b82ff5b
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Commands/SqlCacheSplitter.cs
@@ -0,0 +1,49 @@
+namespace Sa.Outbox.PostgreSql.Commands;
+
/// <summary>
/// Splits a row count into a bounded set of chunk sizes and caches the SQL
/// statement generated for each distinct size, so large batches reuse a small
/// number of prepared statements. Chunk sizes are multiples of 16 capped at
/// <c>maxLen</c>, plus one final remainder chunk (&lt; 16), which keeps the
/// cache small.
/// NOTE(review): the generic arguments were stripped in the reviewed listing;
/// <c>Func&lt;int, string&gt;</c> / <c>Dictionary&lt;int, string&gt;</c> are
/// reconstructed from usage — confirm against the repository.
/// </summary>
internal class SqlCacheSplitter(Func<int, string> genSql)
{
    // Generated SQL keyed by chunk length.
    private readonly Dictionary<int, string> _sqlCache = [];

    /// <summary>
    /// Yields (sql, chunkLength) pairs whose lengths sum exactly to <paramref name="len"/>.
    /// </summary>
    /// <param name="len">Total number of rows to cover.</param>
    /// <param name="maxLen">Upper bound for a single chunk (default 4096, a multiple of 16).</param>
    public IEnumerable<(string sql, int len)> GetSql(int len, int maxLen = 4096)
    {
        if (len <= 0)
        {
            yield break;
        }

        // Largest multiple of 16 not exceeding len.
        int multipleOf16 = len / 16 * 16;

        if (multipleOf16 > maxLen)
        {
            // Emit full-sized chunks...
            for (int i = 0; i < multipleOf16 / maxLen; i++)
            {
                yield return GetOrAdd(maxLen);
            }

            // FIX: the remainder below maxLen (still a multiple of 16 when maxLen
            // is) was previously dropped, so e.g. len = 10000 only covered 8192 rows.
            int rem = multipleOf16 % maxLen;
            if (rem > 0)
            {
                yield return GetOrAdd(rem);
            }
        }
        else if (multipleOf16 > 0)
        {
            yield return GetOrAdd(multipleOf16);
        }

        // Final chunk for the leftover rows below the multiple-of-16 boundary.
        int diff = len - multipleOf16;

        if (diff > 0)
        {
            yield return GetOrAdd(diff);
        }
    }

    // Returns the cached SQL for a chunk length, generating it on first use.
    private (string, int) GetOrAdd(int len)
    {
        if (!_sqlCache.TryGetValue(len, out string? sql))
        {
            sql = genSql(len);
            _sqlCache[len] = sql;
        }

        return (sql, len);
    }
}
+
diff --git a/src/Sa.Outbox.PostgreSql/Commands/StartDeliveryCommand.cs b/src/Sa.Outbox.PostgreSql/Commands/StartDeliveryCommand.cs
new file mode 100644
index 0000000..4c64871
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Commands/StartDeliveryCommand.cs
@@ -0,0 +1,94 @@
+using Npgsql;
+using Sa.Data.PostgreSql;
+using Sa.Extensions;
+using Sa.Outbox.PostgreSql.Serialization;
+using Sa.Outbox.PostgreSql.TypeHashResolve;
+using System.Data;
+
+namespace Sa.Outbox.PostgreSql.Commands;
+
/// <summary>
/// Locks a batch of pending outbox messages and materializes them into the
/// caller-supplied write buffer; returns the number of rows read.
/// NOTE(review): generic type arguments appear lost in this listing
/// ("Memory>", "OutboxDeliveryMessage") — verify against the repository.
/// </summary>
internal class StartDeliveryCommand(
    IPgDataSource dataSource
    , SqlOutboxTemplate sqlTemplate
    , IOutboxMessageSerializer serializer
    , IMsgTypeHashResolver hashResolver
) : IStartDeliveryCommand
{
    public async Task Execute(Memory> writeBuffer, int batchSize, TimeSpan lockDuration, OutboxMessageFilter filter, CancellationToken cancellationToken)
    {

        // Payload type names are stored as 64-bit hash codes.
        long typeCode = await hashResolver.GetCode(filter.PayloadType, cancellationToken);


        return await dataSource.ExecuteReader(sqlTemplate.SqlLockAndSelect, (reader, i) =>
        {
            OutboxDeliveryMessage deliveryMessage = DeliveryReader.Read(reader, serializer);

            // i is the zero-based row index supplied by ExecuteReader.
            writeBuffer.Span[i] = deliveryMessage;
        },
        [
            new("tenant", filter.TenantId)
            , new("part", filter.Part)
            , new("from_date", filter.FromDate.ToUnixTimeSeconds())
            , new("payload_type", typeCode)
            , new("transact_id", filter.TransactId)
            , new("limit", batchSize)
            , new("lock_expires_on", (filter.NowDate + lockDuration).ToUnixTimeSeconds())
            , new("now", filter.NowDate.ToUnixTimeSeconds())
        ]
        , cancellationToken);
    }


    // Maps one result row onto an outbox delivery message.
    internal static class DeliveryReader
    {
        public static OutboxDeliveryMessage Read(NpgsqlDataReader reader, IOutboxMessageSerializer serializer)
        {
            string outboxId = reader.GetString("outbox_id");
            string payloadId = reader.GetString("outbox_payload_id");

            TMessage payload = ReadPayload(reader, serializer);
            OutboxPartInfo outboxPart = ReadOutboxPart(reader);
            OutboxDeliveryInfo deliveryInfo = ReadDeliveryInfo(reader);

            return new OutboxDeliveryMessage(outboxId, payloadId, payload, outboxPart, deliveryInfo);
        }

        // Partition columns: tenant, part, creation time (stored as unix seconds).
        private static OutboxPartInfo ReadOutboxPart(NpgsqlDataReader reader)
        {
            return new OutboxPartInfo(
                reader.GetInt32("outbox_tenant")
                , reader.GetString("outbox_part")
                , reader.GetInt64("outbox_created_at").ToDateTimeOffsetFromUnixTimestamp()
            );
        }

        // Delivery bookkeeping columns for the current attempt.
        private static OutboxDeliveryInfo ReadDeliveryInfo(NpgsqlDataReader reader)
        {
            return new OutboxDeliveryInfo(
                reader.GetString("outbox_delivery_id")
                , reader.GetInt32("outbox_delivery_attempt")
                , reader.GetString("outbox_delivery_error_id")
                , ReadStatus(reader)
                , reader.GetInt64("outbox_delivery_created_at").ToDateTimeOffsetFromUnixTimestamp()
            );
        }

        // Deserializes the payload column directly from the column stream.
        private static TMessage ReadPayload(NpgsqlDataReader reader, IOutboxMessageSerializer serializer)
        {
            using Stream stream = reader.GetStream("outbox_payload");
            TMessage payload = serializer.Deserialize(stream)!;
            return payload;
        }

        // Status code/message plus the delivery creation time.
        private static DeliveryStatus ReadStatus(NpgsqlDataReader reader)
        {
            int code = reader.GetInt32("outbox_delivery_status_code");
            string message = reader.GetString("outbox_delivery_status_message");
            DateTimeOffset createAt = reader.GetInt64("outbox_delivery_created_at").ToDateTimeOffsetFromUnixTimestamp();
            return new DeliveryStatus(code, message, createAt);
        }
    }
}
diff --git a/src/Sa.Outbox.PostgreSql/Configuration/IPgOutboxConfiguration.cs b/src/Sa.Outbox.PostgreSql/Configuration/IPgOutboxConfiguration.cs
new file mode 100644
index 0000000..359e99e
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Configuration/IPgOutboxConfiguration.cs
@@ -0,0 +1,9 @@
+using Sa.Data.PostgreSql;
+
+namespace Sa.Outbox.PostgreSql;
+
/// <summary>
/// Fluent configuration surface for the PostgreSQL outbox.
/// </summary>
public interface IPgOutboxConfiguration
{
    /// <summary>Accumulates settings customizations applied when the settings singleton is built.</summary>
    IPgOutboxConfiguration WithPgOutboxSettings(Action? configure = null);
    /// <summary>Registers the underlying PostgreSQL data source.</summary>
    IPgOutboxConfiguration AddDataSource(Action? configure = null);
}
diff --git a/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxConfiguration.cs b/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxConfiguration.cs
new file mode 100644
index 0000000..8e51402
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxConfiguration.cs
@@ -0,0 +1,62 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using Sa.Data.PostgreSql;
+using System.Collections.Concurrent;
+
+namespace Sa.Outbox.PostgreSql.Configuration;
+
/// <summary>
/// Default implementation of the outbox configuration: collects configure
/// callbacks per service collection and replays them once, when the settings
/// singleton is first resolved.
/// NOTE(review): generic type arguments appear lost in this listing
/// ("ConcurrentDictionary>", "List", "Action") — verify against the repository.
/// </summary>
internal class PgOutboxConfiguration(IServiceCollection services) : IPgOutboxConfiguration
{
    // Callbacks are keyed by IServiceCollection (static) so multiple
    // WithPgOutboxSettings calls on the same collection compose.
    private static readonly ConcurrentDictionary>> s_invokers = [];

    public IPgOutboxConfiguration WithPgOutboxSettings(Action? configure = null)
    {
        if (configure != null)
        {
            if (s_invokers.TryGetValue(services, out var invokers))
            {
                invokers.Add(configure);
            }
            else
            {
                s_invokers[services] = [configure];
            }
        }


        services.TryAddSingleton(sp =>
        {
            PgOutboxSettings settings = new();

            if (s_invokers.TryGetValue(services, out var invokers))
            {
                // Apply all accumulated customizations in registration order.
                foreach (Action build in invokers)
                    build.Invoke(sp, settings);

                // Drop the entry so the static map does not leak the collection.
                s_invokers.Remove(services, out _);
            }

            return settings;
        });

        AddSettings();
        return this;
    }

    public IPgOutboxConfiguration AddDataSource(Action? configure = null)
    {
        services.AddPgDataSource(configure);
        return this;
    }


    // Exposes each settings sub-object as its own singleton for direct injection.
    private void AddSettings()
    {
        services.TryAddSingleton(sp => sp.GetRequiredService().SerializationSettings);

        services.TryAddSingleton(sp => sp.GetRequiredService().TableSettings);
        services.TryAddSingleton(sp => sp.GetRequiredService().CacheSettings);
        services.TryAddSingleton(sp => sp.GetRequiredService().MigrationSettings);
        services.TryAddSingleton(sp => sp.GetRequiredService().CleanupSettings);
    }
}
diff --git a/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxSettings.cs b/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxSettings.cs
new file mode 100644
index 0000000..b9a840c
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Configuration/PgOutboxSettings.cs
@@ -0,0 +1,171 @@
+using System.Text.Json;
+
+namespace Sa.Outbox.PostgreSql;
+
+///
+/// Represents the settings for the PostgreSQL Outbox configuration.
+/// This class contains various settings related to table configuration, serialization, caching, migration, and cleanup.
+///
+public class PgOutboxSettings
+{
+ ///
+ /// Gets the settings related to the Outbox table configuration.
+ ///
+ public PgOutboxTableSettings TableSettings { get; } = new();
+
+ ///
+ /// Gets the settings related to serialization of messages.
+ ///
+ public PgOutboxSerializeSettings SerializationSettings { get; } = new();
+
+ ///
+ /// Gets the settings related to caching of message types.
+ ///
+ public PgOutboxCacheSettings CacheSettings { get; } = new();
+
+ ///
+ /// Gets the settings related to migration of the Outbox schema.
+ ///
+ public PgOutboxMigrationSettings MigrationSettings { get; } = new();
+
+ ///
+ /// Gets the settings related to cleanup of old Outbox messages and parts.
+ ///
+ public PgOutboxCleanupSettings CleanupSettings { get; } = new();
+}
+
+
+///
+/// Represents the settings for configuring the Outbox tables in PostgreSQL.
+///
+public class PgOutboxTableSettings
+{
+ ///
+ /// Gets or sets the name of the database schema.
+ /// Default is set to "public".
+ ///
+ public string DatabaseSchemaName { get; set; } = "public";
+
+ ///
+ /// Gets or sets the name of the Outbox table.
+ /// Default is set to "outbox".
+ ///
+ public string DatabaseOutboxTableName { get; set; } = "outbox";
+
+ ///
+ /// Gets or sets the name of the delivery table.
+ /// Default is set to "outbox__$delivery".
+ ///
+ public string DatabaseDeliveryTableName { get; set; } = "outbox__$delivery";
+
+ ///
+ /// Gets or sets the name of the type table.
+ /// Default is set to "outbox__$type".
+ ///
+ public string DatabaseTypeTableName { get; set; } = "outbox__$type";
+
+ ///
+ /// Gets or sets the name of the error table.
+ /// Default is set to "outbox__$error".
+ ///
+ public string DatabaseErrorTableName { get; set; } = "outbox__$error";
+
+ ///
+ /// Gets the fully qualified name of the Outbox table, including the schema.
+ ///
+ /// The qualified name of the Outbox table.
+ public string GetQualifiedOutboxTableName() => $@"{DatabaseSchemaName}.""{DatabaseOutboxTableName}""";
+
+ ///
+ /// Gets the fully qualified name of the delivery table, including the schema.
+ ///
+ /// The qualified name of the delivery table.
+ public string GetQualifiedDeliveryTableName() => $@"{DatabaseSchemaName}.""{DatabaseDeliveryTableName}""";
+
+ ///
+ /// Gets the fully qualified name of the type table, including the schema.
+ ///
+ /// The qualified name of the type table.
+ public string GetQualifiedTypeTableName() => $@"{DatabaseSchemaName}.""{DatabaseTypeTableName}""";
+
+ ///
+ /// Gets the fully qualified name of the error table, including the schema.
+ ///
+ /// The qualified name of the error table.
+ public string GetQualifiedErrorTableName() => $@"{DatabaseSchemaName}.""{DatabaseErrorTableName}""";
+}
+
+///
+/// Represents the settings for serialization of messages in the Outbox.
+///
+public class PgOutboxSerializeSettings
+{
+ public JsonSerializerOptions? JsonSerializerOptions { get; set; }
+}
+
+
+///
+/// Represents the settings for caching message types in the Outbox.
+///
+public class PgOutboxCacheSettings
+{
+ ///
+ /// Gets or sets the duration for which message types are cached.
+ /// Default is set to 1 day.
+ ///
+ public TimeSpan CacheTypeDuration { get; set; } = TimeSpan.FromDays(1);
+}
+
+///
+/// Represents the settings for migrating the Outbox schema in PostgreSQL.
+///
+public class PgOutboxMigrationSettings
+{
+ ///
+ /// Gets or sets a value indicating whether the migration should be executed as a background job.
+ /// Default is set to true, meaning the migration will run as a job.
+ ///
+ public bool AsJob { get; set; } = true;
+
+ ///
+ /// Gets or sets the number of days to move forward during the migration process.
+ /// Default is set to 2 days.
+ ///
+ public int ForwardDays { get; set; } = 2;
+
+ ///
+ /// Gets or sets the interval at which the migration job will be executed.
+ /// Default is set to every 4 hours, with a random additional delay of up to 59 minutes.
+ ///
+ public TimeSpan ExecutionInterval { get; set; } = TimeSpan
+ .FromHours(4)
+ .Add(TimeSpan.FromMinutes(Random.Shared.Next(1, 59)));
+}
+
+
+///
+/// Represents the settings for cleaning up old Outbox messages and parts in PostgreSQL.
+/// This class contains configuration options for how and when the cleanup should occur.
+///
+public class PgOutboxCleanupSettings
+{
+ ///
+ /// Gets or sets a value indicating whether the cleanup should be executed as a background job.
+ /// Default is set to false, meaning the cleanup will not run as a job.
+ ///
+ public bool AsJob { get; set; } = false;
+
+ ///
+ /// Gets or sets the duration after which old parts will be dropped.
+ /// Default is set to 30 days.
+ ///
+ public TimeSpan DropPartsAfterRetention { get; set; } = TimeSpan.FromDays(30);
+
+ ///
+ /// Gets or sets the interval at which the cleanup job will be executed.
+ /// Default is set to every 4 hours, with a random additional delay of up to 59 minutes.
+ ///
+ public TimeSpan ExecutionInterval { get; set; } = TimeSpan
+ .FromHours(4)
+ .Add(TimeSpan.FromMinutes(Random.Shared.Next(1, 59)));
+}
diff --git a/src/Sa.Outbox.PostgreSql/Configuration/Setup.cs b/src/Sa.Outbox.PostgreSql/Configuration/Setup.cs
new file mode 100644
index 0000000..a6f28f3
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Configuration/Setup.cs
@@ -0,0 +1,19 @@
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Sa.Outbox.PostgreSql.Configuration;
+
+internal static class Setup
+{
+ public static IServiceCollection AddPgOutboxSettings(this IServiceCollection services, Action<IPgOutboxConfiguration>? configure = null)
+ {
+ var cfg = new PgOutboxConfiguration(services);
+ configure?.Invoke(cfg);
+
+ cfg
+ .WithPgOutboxSettings()
+ .AddDataSource()
+ ;
+
+ return services;
+ }
+}
diff --git a/src/Sa.Outbox.PostgreSql/GlobalSuppressions.cs b/src/Sa.Outbox.PostgreSql/GlobalSuppressions.cs
new file mode 100644
index 0000000..0157fed
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/GlobalSuppressions.cs
@@ -0,0 +1,8 @@
+// This file is used by Code Analysis to maintain SuppressMessage
+// attributes that are applied to this project.
+// Project-level suppressions either have no target or are given
+// a specific target and scoped to a namespace, type, member, etc.
+
+using System.Diagnostics.CodeAnalysis;
+
+[assembly: SuppressMessage("Style", "IDE0130:Namespace does not match folder structure")]
diff --git a/src/Sa.Outbox.PostgreSql/IdGen/IIdGenerator.cs b/src/Sa.Outbox.PostgreSql/IdGen/IIdGenerator.cs
new file mode 100644
index 0000000..a89cc5b
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/IdGen/IIdGenerator.cs
@@ -0,0 +1,6 @@
+namespace Sa.Outbox.PostgreSql.IdGen;
+
+public interface IIdGenerator
+{
+ string GenId(DateTimeOffset date);
+}
diff --git a/src/Sa.Outbox.PostgreSql/IdGen/IdGenerator.cs b/src/Sa.Outbox.PostgreSql/IdGen/IdGenerator.cs
new file mode 100644
index 0000000..095fd9f
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/IdGen/IdGenerator.cs
@@ -0,0 +1,6 @@
+namespace Sa.Outbox.PostgreSql.IdGen;
+
+internal class IdGenerator : IIdGenerator
+{
+ public string GenId(DateTimeOffset date) => Ulid.NewUlid(date).ToString();
+}
diff --git a/src/Sa.Outbox.PostgreSql/IdGen/Setup.cs b/src/Sa.Outbox.PostgreSql/IdGen/Setup.cs
new file mode 100644
index 0000000..d78d13a
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/IdGen/Setup.cs
@@ -0,0 +1,14 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+
+namespace Sa.Outbox.PostgreSql.IdGen;
+
+internal static class Setup
+{
+ public static IServiceCollection AddIdGen(this IServiceCollection services)
+ {
+ services.TryAddSingleton<IIdGenerator, IdGenerator>();
+
+ return services;
+ }
+}
diff --git a/src/Sa.Outbox.PostgreSql/Interceptors/DeliveryJobInterceptor.cs b/src/Sa.Outbox.PostgreSql/Interceptors/DeliveryJobInterceptor.cs
new file mode 100644
index 0000000..48344b6
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Interceptors/DeliveryJobInterceptor.cs
@@ -0,0 +1,18 @@
+using Sa.Outbox.Job;
+using Sa.Partitional.PostgreSql;
+using Sa.Schedule;
+
+namespace Sa.Outbox.PostgreSql.Interceptors;
+
+internal class DeliveryJobInterceptor(IPartMigrationService migrationService) : IOutboxJobInterceptor
+{
+ public async Task OnHandle(IJobContext context, Func<Task> next, object? key, CancellationToken cancellationToken)
+ {
+ if (!migrationService.OutboxMigrated.IsCancellationRequested && context.Settings.JobType.Name.StartsWith("DeliveryJob"))
+ {
+ return;
+ }
+
+ await next();
+ }
+}
diff --git a/src/Sa.Outbox.PostgreSql/Interceptors/Setup.cs b/src/Sa.Outbox.PostgreSql/Interceptors/Setup.cs
new file mode 100644
index 0000000..f51c5f3
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Interceptors/Setup.cs
@@ -0,0 +1,14 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using Sa.Outbox.Job;
+
+namespace Sa.Outbox.PostgreSql.Interceptors;
+
+internal static class Setup
+{
+ public static IServiceCollection AddOutboxJobInterceptors(this IServiceCollection services)
+ {
+ services.TryAddSingleton<IOutboxJobInterceptor, DeliveryJobInterceptor>();
+ return services;
+ }
+}
diff --git a/src/Sa.Outbox.PostgreSql/Partitional/OutboxMigrationSupport.cs b/src/Sa.Outbox.PostgreSql/Partitional/OutboxMigrationSupport.cs
new file mode 100644
index 0000000..ef2c514
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Partitional/OutboxMigrationSupport.cs
@@ -0,0 +1,19 @@
+using Sa.Classes;
+using Sa.Outbox.Partitional;
+using Sa.Partitional.PostgreSql;
+
+namespace Sa.Outbox.PostgreSql.Partitional;
+
+internal class OutboxMigrationSupport(IOutboxPartitionalSupport? partitionalSupport = null) : IPartTableMigrationSupport
+{
+ public async Task<StrOrNum[][]> GetPartValues(CancellationToken cancellationToken)
+ {
+ if (partitionalSupport == null) return [];
+
+ IReadOnlyCollection parts = await partitionalSupport.GetPartValues(cancellationToken);
+
+ return parts
+ .Select(c => new StrOrNum[] { c.TenantId, c.Part })
+ .ToArray();
+ }
+}
diff --git a/src/Sa.Outbox.PostgreSql/Partitional/Setup.cs b/src/Sa.Outbox.PostgreSql/Partitional/Setup.cs
new file mode 100644
index 0000000..58935c8
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Partitional/Setup.cs
@@ -0,0 +1,66 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using Sa.Partitional.PostgreSql;
+
+namespace Sa.Outbox.PostgreSql.Partitional;
+
+internal static class Setup
+{
+ public static IServiceCollection AddOutboxPartitional(this IServiceCollection services)
+ {
+ services.TryAddSingleton<IPartTableMigrationSupport, OutboxMigrationSupport>();
+
+ services.AddPartitional((sp, builder) =>
+ {
+ SqlOutboxTemplate sql = sp.GetRequiredService<SqlOutboxTemplate>();
+ IPartTableMigrationSupport? migrationSupport = sp.GetService<IPartTableMigrationSupport>();
+
+ builder.AddSchema(sql.DatabaseSchemaName, schema =>
+ {
+ ITableBuilder outboxTableBuilder = schema
+ .AddTable(sql.DatabaseOutboxTableName, SqlOutboxTemplate.OutboxFields)
+ .PartByList("outbox_tenant", "outbox_part")
+ .TimestampAs("outbox_created_at")
+ .AddPostSql(() => sql.SqlCreateTypeTable)
+ ;
+
+ ITableBuilder deliveryTableBuilder = schema
+ .AddTable(sql.DatabaseDeliveryTableName, SqlOutboxTemplate.DeliveryFields)
+ .PartByList("delivery_tenant", "delivery_part")
+ .TimestampAs("delivery_created_at")
+ ;
+
+ ITableBuilder errorTableBuilder = schema
+ .AddTable(sql.DatabaseErrorTableName, SqlOutboxTemplate.ErrorFields)
+ .TimestampAs("error_created_at")
+ ;
+
+ if (migrationSupport != null)
+ {
+ outboxTableBuilder.AddMigration(migrationSupport);
+ deliveryTableBuilder.AddMigration(migrationSupport);
+ }
+
+ errorTableBuilder.AddMigration();
+ })
+ ;
+ })
+ .AddPartMigrationSchedule((sp, opts) =>
+ {
+ PgOutboxMigrationSettings settings = sp.GetRequiredService<PgOutboxMigrationSettings>();
+ opts.AsJob = settings.AsJob;
+ opts.ExecutionInterval = settings.ExecutionInterval;
+ opts.ForwardDays = settings.ForwardDays;
+ })
+ .AddPartCleanupSchedule((sp, opts) =>
+ {
+ PgOutboxCleanupSettings settings = sp.GetRequiredService<PgOutboxCleanupSettings>();
+ opts.AsJob = settings.AsJob;
+ opts.ExecutionInterval = settings.ExecutionInterval;
+ opts.DropPartsAfterRetention = settings.DropPartsAfterRetention;
+ })
+ ;
+
+ return services;
+ }
+}
diff --git a/src/Sa.Outbox.PostgreSql/Readme.md b/src/Sa.Outbox.PostgreSql/Readme.md
new file mode 100644
index 0000000..8ec6622
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Readme.md
@@ -0,0 +1,118 @@
+# Sa.Outbox.PostgreSql
+
+Предназначен для реализации паттерна Outbox с использованием PostgreSQL, который используется для обеспечения надежной доставки сообщений в распределенных системах. Он помогает избежать потери сообщений и гарантирует, что сообщения будут обработаны даже в случае сбоев.
+
+## Основные функции
+- **Надежная доставка сообщений**: Обеспечивает сохранение сообщений в базе данных до их успешной обработки.
+- **Поддержка транзакций**: Позволяет отправлять сообщения в рамках одной транзакции с изменениями в базе данных.
+- **Гибкость**: Поддерживает различные типы сообщений и их обработчиков.
+- **Параллельная обработка**: Позволяет обрабатывать сообщения параллельно, что увеличивает производительность системы.
+
+
+
+## Примеры
+
+### Пример конфигурирования
+
+```csharp
+using Microsoft.Extensions.DependencyInjection;
+using Sa.Outbox;
+using Sa.Outbox.PostgreSql;
+
+public class Startup
+{
+ public void ConfigureServices(IServiceCollection services)
+ {
+ // Конфигурация Outbox
+ services.AddOutbox(builder =>
+ {
+ builder.WithDeliveries(deliveryBuilder =>
+ {
+ deliveryBuilder.AddDelivery<MyMessageConsumer, MyMessage>();
+ });
+
+ builder.WithPartitioningSupport((serviceProvider, partSettings) =>
+ {
+ // Пример настройки для обработки сообщений для каждого арендатора
+ partSettings.ForEachTenant = true;
+ partSettings.GetTenantIds = async cancellationToken =>
+ {
+ // Логика получения идентификаторов арендаторов
+ return await Task.FromResult(new int[] { 1, 2 });
+ };
+ });
+ });
+
+ // Подключение Outbox с использованием PostgreSQL
+ services.AddOutboxUsingPostgreSql(cfg =>
+ {
+ cfg.AddDataSource(c => c.WithConnectionString("Host=my_host;Database=my_db;Username=my_user;Password=my_password"));
+ cfg.WithPgOutboxSettings((_, settings) =>
+ {
+ // Установка схемы базы данных
+ settings.TableSettings.DatabaseSchemaName = "public";
+ // Настройка очистки
+ settings.CleanupSettings.DropPartsAfterRetention = TimeSpan.FromDays(30);
+ });
+ });
+ }
+}
+```
+
+### Пример потребителя сообщений
+
+```csharp
+using Sa.Outbox;
+
+namespace MyNamespace
+{
+ // Пример сообщения, которое будет отправляться через Outbox
+ [OutboxMessage]
+ public record MyMessage(string PayloadId, string Content) : IOutboxPayloadMessage
+ {
+ public int TenantId { get; init; } // Идентификатор арендатора
+ }
+
+ // Пример потребителя, который будет обрабатывать сообщения MyMessage
+ public class MyMessageConsumer : IConsumer<MyMessage>
+ {
+ public async ValueTask Consume(IReadOnlyCollection<IOutboxContext<MyMessage>> outboxMessages, CancellationToken cancellationToken)
+ {
+ foreach (var messageContext in outboxMessages)
+ {
+ // Логика обработки сообщения
+ Console.WriteLine($"Processing message with ID: {messageContext.Payload.PayloadId} and Content: {messageContext.Payload.Content}");
+
+ // Успешная обработка сообщения
+ messageContext.Ok("Message processed successfully.");
+ }
+ }
+ }
+}
+```
+
+### Пример отправки сообщения
+
+```csharp
+
+public class MessageSender(IOutboxMessagePublisher publisher)
+{
+ public async Task SendMessagesAsync(CancellationToken cancellationToken)
+ {
+ // Создание списка сообщений для отправки
+ var messages = new List<MyMessage>
+ {
+ new MyMessage { PayloadId = Guid.NewGuid().ToString(), Content = "Hello, World!", TenantId = 1 },
+ new MyMessage { PayloadId = Guid.NewGuid().ToString(), Content = "Another message", TenantId = 2 }
+ };
+
+ // Отправка сообщений через Outbox
+ ulong result = await publisher.Publish(messages, cancellationToken);
+
+ Console.WriteLine($"Sent {result} messages.");
+ }
+}
+
+```
+
+
diff --git a/src/Sa.Outbox.PostgreSql/Repository/DeliveryRepository.cs b/src/Sa.Outbox.PostgreSql/Repository/DeliveryRepository.cs
new file mode 100644
index 0000000..9101978
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Repository/DeliveryRepository.cs
@@ -0,0 +1,47 @@
+using Sa.Outbox.PostgreSql.Commands;
+
+namespace Sa.Outbox.PostgreSql.Repository;
+
+internal class DeliveryRepository(
+ IStartDeliveryCommand startCmd
+ , IErrorDeliveryCommand errorCmd
+ , IFinishDeliveryCommand finishCmd
+ , IExtendDeliveryCommand extendCmd
+ , IOutboxPartRepository partRepository
+) : IDeliveryRepository
+{
+ public Task StartDelivery(Memory> writeBuffer, int batchSize, TimeSpan lockDuration, OutboxMessageFilter filter, CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested) return Task.FromResult(0);
+ return startCmd.Execute(writeBuffer, batchSize, lockDuration, filter, cancellationToken);
+ }
+
+ public async Task FinishDelivery(IOutboxContext[] outboxMessages, OutboxMessageFilter filter, CancellationToken cancellationToken)
+ {
+ IReadOnlyDictionary errors = await GetErrors(outboxMessages, cancellationToken);
+
+ IEnumerable parts = outboxMessages
+ .Select(c => new OutboxPartInfo(c.PartInfo.TenantId, c.PartInfo.Part, c.DeliveryResult.CreatedAt));
+
+ await partRepository.EnsureDeliveryParts(parts, cancellationToken);
+
+ return await finishCmd.Execute(outboxMessages, errors, filter, cancellationToken);
+ }
+
+ private async Task> GetErrors(IOutboxContext[] outboxMessages, CancellationToken cancellationToken)
+ {
+ IEnumerable errOnDates = outboxMessages
+ .Where(m => m.Exception != null)
+ .Select(m => m.DeliveryResult.CreatedAt);
+
+ await partRepository.EnsureErrorParts(errOnDates, cancellationToken);
+
+ IReadOnlyDictionary errors = await errorCmd.Execute(outboxMessages, cancellationToken);
+ return errors;
+ }
+
+ public async Task ExtendDelivery(TimeSpan lockExpiration, OutboxMessageFilter filter, CancellationToken cancellationToken)
+ {
+ return await extendCmd.Execute(lockExpiration, filter, cancellationToken);
+ }
+}
diff --git a/src/Sa.Outbox.PostgreSql/Repository/IMsgTypeRepository.cs b/src/Sa.Outbox.PostgreSql/Repository/IMsgTypeRepository.cs
new file mode 100644
index 0000000..d972142
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Repository/IMsgTypeRepository.cs
@@ -0,0 +1,7 @@
+namespace Sa.Outbox.PostgreSql.Repository;
+
+internal interface IMsgTypeRepository
+{
+ Task Insert(long id, string typeName, CancellationToken cancellationToken);
+ Task> SelectAll(CancellationToken cancellationToken);
+}
diff --git a/src/Sa.Outbox.PostgreSql/Repository/IOutboxPartRepository.cs b/src/Sa.Outbox.PostgreSql/Repository/IOutboxPartRepository.cs
new file mode 100644
index 0000000..914a0f7
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Repository/IOutboxPartRepository.cs
@@ -0,0 +1,11 @@
+
+namespace Sa.Outbox.PostgreSql.Repository;
+
+internal interface IOutboxPartRepository
+{
+ Task EnsureDeliveryParts(IEnumerable outboxParts, CancellationToken cancellationToken);
+ Task EnsureOutboxParts(IEnumerable outboxParts, CancellationToken cancellationToken);
+ Task EnsureErrorParts(IEnumerable dates, CancellationToken cancellationToken);
+
+ Task Migrate();
+}
diff --git a/src/Sa.Outbox.PostgreSql/Repository/MsgTypeRepository.cs b/src/Sa.Outbox.PostgreSql/Repository/MsgTypeRepository.cs
new file mode 100644
index 0000000..f82e558
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Repository/MsgTypeRepository.cs
@@ -0,0 +1,24 @@
+using Sa.Data.PostgreSql;
+using System.Data;
+
+namespace Sa.Outbox.PostgreSql.Repository;
+
+
+internal class MsgTypeRepository(IPgDataSource dataSource, SqlOutboxTemplate template) : IMsgTypeRepository
+{
+ public Task Insert(long id, string typeName, CancellationToken cancellationToken)
+ {
+ return dataSource.ExecuteNonQuery(template.SqlInsertType, [
+ new ("type_id", id)
+ , new ("type_name", typeName)
+ ], cancellationToken);
+ }
+
+ public Task> SelectAll(CancellationToken cancellationToken)
+ {
+ return dataSource.ExecuteReaderList(template.SqlSelectType,
+ reader =>
+ (id: reader.GetInt64("type_id"), typeName: reader.GetString("type_name"))
+ , cancellationToken);
+ }
+}
diff --git a/src/Sa.Outbox.PostgreSql/Repository/OutboxPartRepository.cs b/src/Sa.Outbox.PostgreSql/Repository/OutboxPartRepository.cs
new file mode 100644
index 0000000..4181a49
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Repository/OutboxPartRepository.cs
@@ -0,0 +1,42 @@
+using Sa.Extensions;
+using Sa.Partitional.PostgreSql;
+
+namespace Sa.Outbox.PostgreSql.Repository;
+
+internal class OutboxPartRepository(IPartitionManager partManager, PgOutboxTableSettings tableSettings)
+ : IOutboxPartRepository
+{
+
+ public Task EnsureDeliveryParts(IEnumerable outboxParts, CancellationToken cancellationToken)
+ => EnsureParts(tableSettings.DatabaseDeliveryTableName, outboxParts, cancellationToken);
+
+ public Task EnsureOutboxParts(IEnumerable outboxParts, CancellationToken cancellationToken)
+ => EnsureParts(tableSettings.DatabaseOutboxTableName, outboxParts, cancellationToken);
+
+ public async Task EnsureErrorParts(IEnumerable dates, CancellationToken cancellationToken)
+ {
+ int i = 0;
+ foreach (DateTimeOffset date in dates.Select(c => c.StartOfDay()).Distinct())
+ {
+ i++;
+ await partManager.EnsureParts(tableSettings.DatabaseErrorTableName, date, [], cancellationToken);
+ }
+
+ return i;
+ }
+
+ public Task Migrate() => partManager.Migrate(CancellationToken.None);
+
+
+ private async Task EnsureParts(string databaseTableName, IEnumerable outboxParts, CancellationToken cancellationToken)
+ {
+ int i = 0;
+ foreach (OutboxPartInfo part in outboxParts.Distinct())
+ {
+ i++;
+ await partManager.EnsureParts(databaseTableName, part.CreatedAt, [part.TenantId, part.Part], cancellationToken);
+ }
+
+ return i;
+ }
+}
diff --git a/src/Sa.Outbox.PostgreSql/Repository/OutboxRepository.cs b/src/Sa.Outbox.PostgreSql/Repository/OutboxRepository.cs
new file mode 100644
index 0000000..b96bd55
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Repository/OutboxRepository.cs
@@ -0,0 +1,18 @@
+using Sa.Extensions;
+using Sa.Outbox.PostgreSql.Commands;
+
+namespace Sa.Outbox.PostgreSql.Repository;
+
+internal class OutboxRepository(IOutboxBulkCommand bulkCmd, IOutboxPartRepository partRepository)
+ : IOutboxRepository
+{
+ public async ValueTask Save(string payloadType, ReadOnlyMemory> messages, CancellationToken cancellationToken = default)
+ {
+ if (messages.Length == 0) return 0;
+
+ OutboxPartInfo[] parts = messages.Span.SelectWhere(c => c.PartInfo);
+ await partRepository.EnsureOutboxParts(parts, cancellationToken);
+
+ return await bulkCmd.BulkWrite(payloadType, messages, cancellationToken);
+ }
+}
diff --git a/src/Sa.Outbox.PostgreSql/Repository/Setup.cs b/src/Sa.Outbox.PostgreSql/Repository/Setup.cs
new file mode 100644
index 0000000..388d634
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Repository/Setup.cs
@@ -0,0 +1,16 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+
+namespace Sa.Outbox.PostgreSql.Repository;
+
+internal static class Setup
+{
+ public static IServiceCollection AddOutboxMessageRepository(this IServiceCollection services)
+ {
+ services.TryAddSingleton<IDeliveryRepository, DeliveryRepository>();
+ services.TryAddSingleton<IOutboxRepository, OutboxRepository>();
+ services.TryAddSingleton<IOutboxPartRepository, OutboxPartRepository>();
+ services.TryAddSingleton<IMsgTypeRepository, MsgTypeRepository>();
+ return services;
+ }
+}
diff --git a/src/Sa.Outbox.PostgreSql/Sa.Outbox.PostgreSql.csproj b/src/Sa.Outbox.PostgreSql/Sa.Outbox.PostgreSql.csproj
new file mode 100644
index 0000000..5763403
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Sa.Outbox.PostgreSql.csproj
@@ -0,0 +1,18 @@
+
+
+
+    <TargetFramework>net8.0</TargetFramework>
+    <ImplicitUsings>enable</ImplicitUsings>
+    <Nullable>enable</Nullable>
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Sa.Outbox.PostgreSql/Serialization/IOutboxMessageSerializer.cs b/src/Sa.Outbox.PostgreSql/Serialization/IOutboxMessageSerializer.cs
new file mode 100644
index 0000000..beb9428
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Serialization/IOutboxMessageSerializer.cs
@@ -0,0 +1,12 @@
+using System.Diagnostics.CodeAnalysis;
+
+namespace Sa.Outbox.PostgreSql.Serialization;
+
+public interface IOutboxMessageSerializer
+{
+ T? Deserialize<T>(Stream stream);
+ Task<T?> DeserializeAsync<T>(Stream stream, CancellationToken cancellationToken = default);
+
+ void Serialize<T>(Stream stream, [NotNull] T value);
+ Task SerializeAsync<T>(Stream stream, [NotNull] T value, CancellationToken cancellationToken = default);
+}
diff --git a/src/Sa.Outbox.PostgreSql/Serialization/OutboxMessageSerializer.cs b/src/Sa.Outbox.PostgreSql/Serialization/OutboxMessageSerializer.cs
new file mode 100644
index 0000000..f8e9896
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Serialization/OutboxMessageSerializer.cs
@@ -0,0 +1,56 @@
+using Sa.Serialization.Converter;
+using System.Diagnostics.CodeAnalysis;
+using System.Text.Encodings.Web;
+using System.Text.Json;
+using System.Text.Json.Serialization;
+using System.Text.Unicode;
+
+namespace Sa.Outbox.PostgreSql.Serialization;
+
+///
+/// Implementation of <see cref="IOutboxMessageSerializer"/> using <see cref="JsonSerializer"/>.
+///
+internal class OutboxMessageSerializer : IOutboxMessageSerializer
+{
+
+ private readonly static JavaScriptEncoder encoder = JavaScriptEncoder.Create(UnicodeRanges.BasicLatin, UnicodeRanges.Cyrillic);
+
+
+ ///
+/// Options for the JSON serializer. By default adds the <see cref="ObjectToInferredTypesConverter"/> converter.
+ ///
+ public JsonSerializerOptions Options { get; private set; } = CreateDefaultOptions();
+
+ public OutboxMessageSerializer WithOptions(JsonSerializerOptions? options)
+ {
+ if (options != null)
+ {
+ Options = options;
+ }
+ return this;
+ }
+
+ public static JsonSerializerOptions CreateDefaultOptions()
+ {
+ JsonSerializerOptions options = new(JsonSerializerDefaults.Web)
+ {
+ WriteIndented = false,
+ DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
+ AllowTrailingCommas = true,
+ Encoder = encoder,
+ };
+ options.Converters.Add(new ObjectToInferredTypesConverter());
+ return options;
+ }
+
+ public Task<T?> DeserializeAsync<T>(Stream stream, CancellationToken cancellationToken = default)
+ => JsonSerializer.DeserializeAsync<T>(stream, Options, cancellationToken).AsTask();
+
+ public T? Deserialize<T>(Stream stream) => JsonSerializer.Deserialize<T>(stream, Options);
+
+
+ public Task SerializeAsync<T>(Stream stream, [NotNull] T value, CancellationToken cancellationToken = default)
+ => JsonSerializer.SerializeAsync(stream, value ?? throw new ArgumentNullException(nameof(value)), Options, cancellationToken);
+
+ public void Serialize<T>(Stream stream, [NotNull] T value) => JsonSerializer.Serialize(stream, value ?? throw new ArgumentNullException(nameof(value)), Options);
+}
diff --git a/src/Sa.Outbox.PostgreSql/Serialization/Setup.cs b/src/Sa.Outbox.PostgreSql/Serialization/Setup.cs
new file mode 100644
index 0000000..2cabf7a
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Serialization/Setup.cs
@@ -0,0 +1,14 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+
+namespace Sa.Outbox.PostgreSql.Serialization;
+
+public static class Setup
+{
+ public static IServiceCollection AddOutboxMessageSerializer(this IServiceCollection services, PgOutboxSerializeSettings? settings = null)
+ {
+ services.TryAddSingleton<IOutboxMessageSerializer>(sp
+ => new OutboxMessageSerializer().WithOptions((settings ?? sp.GetRequiredService<PgOutboxSerializeSettings>()).JsonSerializerOptions));
+ return services;
+ }
+}
diff --git a/src/Sa.Outbox.PostgreSql/Setup.cs b/src/Sa.Outbox.PostgreSql/Setup.cs
new file mode 100644
index 0000000..a2f0e0c
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/Setup.cs
@@ -0,0 +1,40 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using Sa.Outbox.PostgreSql.Commands;
+using Sa.Outbox.PostgreSql.Configuration;
+using Sa.Outbox.PostgreSql.IdGen;
+using Sa.Outbox.PostgreSql.Interceptors;
+using Sa.Outbox.PostgreSql.Partitional;
+using Sa.Outbox.PostgreSql.Repository;
+using Sa.Outbox.PostgreSql.Serialization;
+using Sa.Outbox.PostgreSql.TypeHashResolve;
+
+namespace Sa.Outbox.PostgreSql;
+
+public static class Setup
+{
+ public static IServiceCollection AddOutboxUsingPostgreSql(this IServiceCollection services, Action<IPgOutboxConfiguration>? configure = null)
+ {
+ services
+ .AddSaInfrastructure();
+
+ services.TryAddSingleton<SqlOutboxTemplate>();
+
+ services
+ .AddPgOutboxSettings(configure);
+
+ services
+ .AddOutboxMessageRepository()
+ .AddOutboxMessageSerializer()
+ .AddOutboxPartitional()
+ .AddIdGen()
+ .AddOutboxCommands()
+ .AddMsgTypeHashResolver()
+ .AddOutboxJobInterceptors()
+ ;
+
+ services.AddOutbox();
+
+ return services;
+ }
+}
\ No newline at end of file
diff --git a/src/Sa.Outbox.PostgreSql/SqlOutboxTemplate.cs b/src/Sa.Outbox.PostgreSql/SqlOutboxTemplate.cs
new file mode 100644
index 0000000..4e63e65
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/SqlOutboxTemplate.cs
@@ -0,0 +1,260 @@
+namespace Sa.Outbox.PostgreSql;
+
+internal class SqlOutboxTemplate(PgOutboxTableSettings settings)
+{
+ public string DatabaseSchemaName => settings.DatabaseSchemaName;
+ public string DatabaseOutboxTableName => settings.DatabaseOutboxTableName;
+ public string DatabaseDeliveryTableName => settings.DatabaseDeliveryTableName;
+ public string DatabaseErrorTableName => settings.DatabaseErrorTableName;
+
+
+ public readonly static string[] OutboxFields =
+ [
+ // ulid
+ "outbox_id CHAR(26) NOT NULL",
+
+ // -- parts + outbox_created_at
+ "outbox_tenant INT NOT NULL DEFAULT 0",
+ "outbox_part TEXT NOT NULL",
+
+ "outbox_payload_id TEXT NOT NULL",
+ "outbox_payload_type BIGINT NOT NULL",
+ "outbox_payload BYTEA NOT NULL",
+ "outbox_payload_size INT NOT NULL",
+
+ // -- rw
+ "outbox_transact_id TEXT NOT NULL DEFAULT ''",
+ "outbox_lock_expires_on BIGINT NOT NULL DEFAULT 0",
+
+ // -- delivery
+ "outbox_delivery_attempt int NOT NULL DEFAULT 0",
+ // --- copy last
+ "outbox_delivery_id CHAR(26) NOT NULL DEFAULT ''",
+ "outbox_delivery_error_id TEXT NOT NULL DEFAULT ''",
+ "outbox_delivery_status_code INT NOT NULL DEFAULT 0",
+ "outbox_delivery_status_message TEXT NOT NULL DEFAULT ''",
+ "outbox_delivery_created_at BIGINT NOT NULL DEFAULT 0"
+ ];
+
+
+ public readonly static string[] DeliveryFields =
+ [
+ "delivery_id CHAR(26) NOT NULL",
+ "delivery_outbox_id CHAR(26) NOT NULL",
+ "delivery_error_id TEXT NOT NULL DEFAULT ''",
+ "delivery_status_code INT NOT NULL DEFAULT 0",
+ "delivery_status_message TEXT NOT NULL DEFAULT ''",
+ "delivery_transact_id TEXT NOT NULL DEFAULT ''",
+ "delivery_lock_expires_on BIGINT NOT NULL DEFAULT 0",
+ // - parts
+ "delivery_tenant INT NOT NULL DEFAULT 0",
+ "delivery_part TEXT NOT NULL",
+ ];
+
+ // Delivery errors
+ public readonly static string[] ErrorFields =
+ [
+ "error_id BIGINT NOT NULL",
+ "error_type TEXT NOT NULL",
+ "error_message TEXT NOT NULL",
+ ];
+
+
+ //CREATE INDEX IF NOT EXISTS ix_{DatabaseTableName}__payload_id
+ // ON {GetQualifiedTableName()} (payload_id)
+ // WHERE payload_id <> '' AND (outbox_delivery_status_code BETWEEN {DeliveryStatusCode.Ok} AND 299 OR outbox_delivery_status_code >= 500)
+
+ //CREATE INDEX IF NOT EXISTS ix_{DatabaseTableName}__payload_type
+ // ON {GetQualifiedTableName()} (payload_type);
+ // WHERE (outbox_delivery_status_code < {DeliveryStatusCode.Ok} OR outbox_delivery_status_code BETWEEN 300 AND 499)
+ //""";
+
+
+ public string SqlBulkOutboxCopy =
+$"""
+COPY {settings.GetQualifiedOutboxTableName()} (
+ outbox_id
+ ,outbox_tenant
+ ,outbox_part
+ ,outbox_payload_id
+ ,outbox_payload_type
+ ,outbox_payload
+ ,outbox_payload_size
+ ,outbox_created_at
+)
+FROM STDIN (FORMAT BINARY)
+;
+""";
+
+
+ static readonly string s_InProcessing = $"(outbox_delivery_status_code < {DeliveryStatusCode.Ok} OR outbox_delivery_status_code BETWEEN {DeliveryStatusCode.Status300} AND {DeliveryStatusCode.Status499})";
+
+
+ public string SqlLockAndSelect =
+$"""
+WITH next_task AS (
+ SELECT outbox_id FROM {settings.GetQualifiedOutboxTableName()}
+ WHERE
+ outbox_tenant = @tenant AND outbox_part = @part AND outbox_created_at >= @from_date
+ AND outbox_payload_type = @payload_type
+ AND {s_InProcessing}
+ AND outbox_lock_expires_on < @now
+ LIMIT @limit
+ FOR UPDATE SKIP LOCKED
+)
+UPDATE {settings.GetQualifiedOutboxTableName()}
+SET
+ outbox_delivery_status_code = CASE
+ WHEN outbox_delivery_status_code = 0 THEN {DeliveryStatusCode.Processing}
+ ELSE outbox_delivery_status_code
+ END
+ ,outbox_transact_id = @transact_id
+ ,outbox_lock_expires_on = @lock_expires_on
+FROM
+ next_task
+WHERE
+ {settings.GetQualifiedOutboxTableName()}.outbox_id = next_task.outbox_id
+RETURNING
+ {settings.GetQualifiedOutboxTableName()}.outbox_id
+ ,outbox_tenant
+ ,outbox_part
+ ,outbox_payload
+ ,outbox_payload_id
+ ,outbox_delivery_id
+ ,outbox_delivery_attempt
+ ,outbox_delivery_error_id
+ ,outbox_delivery_status_code
+ ,outbox_delivery_status_message
+ ,outbox_delivery_created_at
+ ,outbox_created_at
+;
+""";
+
+
+ private static string SqlFinishDelivery(PgOutboxTableSettings settings, int count)
+ {
+ return
+$"""
+WITH inserted_delivery AS (
+ INSERT INTO {settings.GetQualifiedDeliveryTableName()} (
+ delivery_id
+ , delivery_outbox_id
+ , delivery_error_id
+ , delivery_status_code
+ , delivery_status_message
+ , delivery_lock_expires_on
+ , delivery_transact_id
+ , delivery_tenant
+ , delivery_part
+ , delivery_created_at
+ )
+ VALUES
+{BuildDeliveryInsertValues(count)}
+ ON CONFLICT DO NOTHING
+ RETURNING *
+)
+UPDATE {settings.GetQualifiedOutboxTableName()}
+SET
+ outbox_delivery_id = inserted_delivery.delivery_id
+ , outbox_delivery_attempt = outbox_delivery_attempt + CASE
+ WHEN inserted_delivery.delivery_status_code <> {DeliveryStatusCode.Postpone} THEN 1
+ ELSE 0
+ END
+ , outbox_delivery_error_id = inserted_delivery.delivery_error_id
+ , outbox_delivery_status_code = inserted_delivery.delivery_status_code
+ , outbox_delivery_status_message = inserted_delivery.delivery_status_message
+ , outbox_lock_expires_on = inserted_delivery.delivery_lock_expires_on
+ , outbox_delivery_created_at = inserted_delivery.delivery_created_at
+FROM
+ inserted_delivery
+WHERE
+ outbox_tenant = @tnt
+ AND outbox_part = @prt
+ AND outbox_created_at >= @from_date
+ AND outbox_transact_id = @tid
+ AND outbox_id = inserted_delivery.delivery_outbox_id
+;
+
+""";
+ }
+
+ private static string BuildDeliveryInsertValues(int count)
+ {
+        List<string> values = [];
+ int j = 0;
+ for (int i = 0; i < count; i++)
+ {
+ // @id_{i},@outbox_id_{i},@error_id_{i},@status_code_{i},@status_message_{i},@lock_expires_on_{i},@created_at_{i}
+ values.Add($" (@p{j++},@p{j++},@p{j++},@p{j++},@p{j++},@p{j++},@tid,@tnt,@prt,@p{j++})");
+ }
+ return string.Join(",\r\n", values);
+ }
+
+
+ public string SqlExtendDelivery =
+$"""
+UPDATE {settings.GetQualifiedOutboxTableName()}
+SET
+ outbox_lock_expires_on = @lock_expires_on
+WHERE
+ outbox_tenant = @tenant AND outbox_part = @part AND outbox_created_at >= @from_date
+ AND outbox_payload_type = @payload_type
+ AND {s_InProcessing}
+ AND outbox_transact_id = @transact_id
+ AND outbox_lock_expires_on > @now
+FOR UPDATE SKIP LOCKED
+;
+""";
+
+
+ public string SqlFinishDelivery(int count) => SqlFinishDelivery(settings, count);
+
+
+ public string SqlCreateTypeTable
+ =
+$"""
+CREATE TABLE IF NOT EXISTS {settings.GetQualifiedTypeTableName()}
+(
+ type_id BIGINT NOT NULL,
+ type_name TEXT NOT NULL,
+ CONSTRAINT "pk_{settings.DatabaseTypeTableName}" PRIMARY KEY (type_id)
+)
+;
+""";
+
+
+ public string SqlSelectType = $"SELECT * FROM {settings.GetQualifiedTypeTableName()}";
+
+ public string SqlInsertType =
+$"""
+INSERT INTO {settings.GetQualifiedTypeTableName()}
+ (type_id, type_name)
+VALUES
+ (@type_id,@type_name)
+ON CONFLICT DO NOTHING
+;
+""";
+
+
+ private static string BuildErrorInsertValues(int count)
+ {
+        List<string> values = [];
+ for (int i = 0; i < count; i++)
+ {
+ values.Add($" (@id_{i},@type_{i},@message_{i},@created_at_{i})");
+ }
+ return string.Join(",\r\n", values);
+ }
+
+ private static string SqlError(PgOutboxTableSettings settings, int count) =>
+$"""
+INSERT INTO {settings.GetQualifiedErrorTableName()}
+ (error_id,error_type,error_message,error_created_at)
+VALUES
+{BuildErrorInsertValues(count)}
+ON CONFLICT DO NOTHING
+;
+""";
+
+ public string SqlError(int count) => SqlError(settings, count);
+}
diff --git a/src/Sa.Outbox.PostgreSql/TypeHashResolve/IMsgTypeCache.cs b/src/Sa.Outbox.PostgreSql/TypeHashResolve/IMsgTypeCache.cs
new file mode 100644
index 0000000..4c231c1
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/TypeHashResolve/IMsgTypeCache.cs
@@ -0,0 +1,9 @@
+
+namespace Sa.Outbox.PostgreSql.TypeHashResolve;
+
+internal interface IMsgTypeCache
+{
+    ValueTask<long> GetCode(string typeName, CancellationToken cancellationToken);
+    ValueTask<string?> GetTypeName(long code, CancellationToken cancellationToken);
+    ValueTask Reset(CancellationToken cancellationToken);
+}
\ No newline at end of file
diff --git a/src/Sa.Outbox.PostgreSql/TypeHashResolve/IMsgTypeHashResolver.cs b/src/Sa.Outbox.PostgreSql/TypeHashResolve/IMsgTypeHashResolver.cs
new file mode 100644
index 0000000..a2d4954
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/TypeHashResolve/IMsgTypeHashResolver.cs
@@ -0,0 +1,7 @@
+namespace Sa.Outbox.PostgreSql.TypeHashResolve;
+
+internal interface IMsgTypeHashResolver
+{
+    ValueTask<long> GetCode(string typeName, CancellationToken cancellationToken);
+    ValueTask<string> GetTypeName(long typeCode, CancellationToken cancellationToken);
+}
diff --git a/src/Sa.Outbox.PostgreSql/TypeHashResolve/MsgTypeCache.cs b/src/Sa.Outbox.PostgreSql/TypeHashResolve/MsgTypeCache.cs
new file mode 100644
index 0000000..32da900
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/TypeHashResolve/MsgTypeCache.cs
@@ -0,0 +1,77 @@
+using Sa.Outbox.PostgreSql.Repository;
+using ZiggyCreatures.Caching.Fusion;
+
+namespace Sa.Outbox.PostgreSql.TypeHashResolve;
+
+internal sealed class MsgTypeCache(
+ IFusionCacheProvider cacheProvider
+ , IMsgTypeRepository repository
+ , PgOutboxCacheSettings cacheSettings)
+ : IDisposable, IMsgTypeCache
+{
+ internal static class Env
+ {
+ public const string CacheName = "sa-msgtype";
+ }
+
+ private readonly IFusionCache _cache = cacheProvider.GetCache(Env.CacheName);
+
+ internal class Storage
+ {
+        private readonly Dictionary<long, string> _hashType = [];
+        private readonly Dictionary<string, long> _typeHash = [];
+
+ internal Storage(List<(long id, string typeName)> hashCodes)
+ {
+ foreach (var (id, typeName) in hashCodes)
+ {
+ _hashType[id] = typeName;
+ _typeHash[typeName] = id;
+ }
+ }
+
+ public long GetCode(string typeName)
+ {
+ if (_typeHash.TryGetValue(typeName, out var code)) return code;
+ return 0;
+ }
+
+ public string? GetType(long code)
+ {
+ if (_hashType.TryGetValue(code, out var name)) return name;
+ return default;
+ }
+ }
+
+    public async ValueTask<long> GetCode(string typeName, CancellationToken cancellationToken)
+ {
+ var storage = await GetStorage(cancellationToken);
+ return storage.GetCode(typeName);
+ }
+
+    public async ValueTask<string?> GetTypeName(long code, CancellationToken cancellationToken)
+ {
+ var storage = await GetStorage(cancellationToken);
+ return storage.GetType(code);
+ }
+
+ public ValueTask Reset(CancellationToken cancellationToken) => _cache.RemoveAsync(Env.CacheName, token: cancellationToken);
+
+    private ValueTask<Storage> GetStorage(CancellationToken cancellationToken)
+ {
+ return _cache.GetOrSetAsync(
+ Env.CacheName
+ , async (context, t) => await Load(context, t)
+ , options: null
+ , token: cancellationToken);
+ }
+
+    private async Task<Storage> Load(FusionCacheFactoryExecutionContext<Storage> context, CancellationToken cancellationToken)
+ {
+ List<(long id, string typeName)> hashCodes = await repository.SelectAll(cancellationToken);
+ context.Options.Duration = hashCodes.Count > 0 ? cacheSettings.CacheTypeDuration : TimeSpan.Zero;
+ return new Storage(hashCodes);
+ }
+
+ public void Dispose() => _cache.Dispose();
+}
diff --git a/src/Sa.Outbox.PostgreSql/TypeHashResolve/MsgTypeHashResolver.cs b/src/Sa.Outbox.PostgreSql/TypeHashResolve/MsgTypeHashResolver.cs
new file mode 100644
index 0000000..2524f0a
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/TypeHashResolve/MsgTypeHashResolver.cs
@@ -0,0 +1,44 @@
+using Sa.Extensions;
+using Sa.Outbox.PostgreSql.Repository;
+
+namespace Sa.Outbox.PostgreSql.TypeHashResolve;
+
+
+internal class MsgTypeHashResolver(IMsgTypeCache cache, IMsgTypeRepository repository) : IMsgTypeHashResolver
+{
+ private int _triggered = 0;
+
+    public async ValueTask<long> GetCode(string typeName, CancellationToken cancellationToken)
+ {
+
+ long code = await cache.GetCode(typeName, cancellationToken);
+
+ if (code != 0) return code;
+
+ code = typeName.GetMurmurHash3();
+
+ if (Interlocked.CompareExchange(ref _triggered, 1, 0) == 1) return code;
+
+ try
+ {
+ await repository.Insert(code, typeName, cancellationToken);
+ await cache.Reset(cancellationToken);
+ }
+ finally
+ {
+ Interlocked.CompareExchange(ref _triggered, 0, 1);
+ }
+
+ return code;
+ }
+
+    public async ValueTask<string> GetTypeName(long typeCode, CancellationToken cancellationToken)
+ {
+ string? typeName = await cache.GetTypeName(typeCode, cancellationToken);
+ if (typeName != null) return typeName;
+
+ await cache.Reset(cancellationToken);
+
+ return await cache.GetTypeName(typeCode, cancellationToken) ?? typeCode.ToString();
+ }
+}
diff --git a/src/Sa.Outbox.PostgreSql/TypeHashResolve/Setup.cs b/src/Sa.Outbox.PostgreSql/TypeHashResolve/Setup.cs
new file mode 100644
index 0000000..9609aec
--- /dev/null
+++ b/src/Sa.Outbox.PostgreSql/TypeHashResolve/Setup.cs
@@ -0,0 +1,23 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using Sa.Data.Cache;
+
+namespace Sa.Outbox.PostgreSql.TypeHashResolve;
+
+internal static class Setup
+{
+ public static IServiceCollection AddMsgTypeHashResolver(this IServiceCollection services)
+ {
+ services.AddFusionCacheEx(MsgTypeCache.Env.CacheName, (sp, opts) =>
+ {
+            PgOutboxCacheSettings cacheSettings = sp.GetRequiredService<PgOutboxCacheSettings>();
+ opts.Duration = cacheSettings.CacheTypeDuration;
+ });
+
+
+        services.TryAddSingleton<IMsgTypeCache, MsgTypeCache>();
+        services.TryAddSingleton<IMsgTypeHashResolver, MsgTypeHashResolver>();
+
+ return services;
+ }
+}
\ No newline at end of file
diff --git a/src/Sa.Outbox/Configuration/DeliveryBuilder.cs b/src/Sa.Outbox/Configuration/DeliveryBuilder.cs
new file mode 100644
index 0000000..ca47401
--- /dev/null
+++ b/src/Sa.Outbox/Configuration/DeliveryBuilder.cs
@@ -0,0 +1,14 @@
+using Microsoft.Extensions.DependencyInjection;
+using Sa.Outbox.Job;
+
+namespace Sa.Outbox.Configuration;
+
+internal class DeliveryBuilder(IServiceCollection services) : IDeliveryBuilder
+{
+    public IDeliveryBuilder AddDelivery<TConsumer, TMessage>(Action<OutboxDeliverySettings>? configure = null, int instanceCount = 1)
+        where TConsumer : class, IConsumer<TMessage>
+    {
+        services.AddDeliveryJob<TConsumer, TMessage>(configure, instanceCount);
+ return this;
+ }
+}
diff --git a/src/Sa.Outbox/Configuration/IDeliveryBuilder.cs b/src/Sa.Outbox/Configuration/IDeliveryBuilder.cs
new file mode 100644
index 0000000..a8a19a0
--- /dev/null
+++ b/src/Sa.Outbox/Configuration/IDeliveryBuilder.cs
@@ -0,0 +1,19 @@
+namespace Sa.Outbox;
+
+
+///
+/// Represents a builder for creating outbox deliveries.
+///
+public interface IDeliveryBuilder
+{
+    /// <summary>
+    /// Adds a delivery for the specified consumer and message type.
+    /// </summary>
+    /// <typeparam name="TConsumer">The type of consumer.</typeparam>
+    /// <typeparam name="TMessage">The type of message.</typeparam>
+    /// <param name="configure">An optional action to configure the delivery settings.</param>
+    /// <param name="instanceCount">The number of instances to create for the delivery.</param>
+    /// <returns>The delivery builder instance.</returns>
+    IDeliveryBuilder AddDelivery<TConsumer, TMessage>(Action<OutboxDeliverySettings>? configure = null, int instanceCount = 1)
+        where TConsumer : class, IConsumer<TMessage>;
+}
\ No newline at end of file
diff --git a/src/Sa.Outbox/Configuration/IOutboxBuilder.cs b/src/Sa.Outbox/Configuration/IOutboxBuilder.cs
new file mode 100644
index 0000000..7db49d8
--- /dev/null
+++ b/src/Sa.Outbox/Configuration/IOutboxBuilder.cs
@@ -0,0 +1,19 @@
+namespace Sa.Outbox;
+
+public interface IOutboxBuilder
+{
+ OutboxPublishSettings PublishSettings { get; }
+ ///
+ /// Configures the delivery settings for the outbox.
+ ///
+ /// An action to configure the delivery settings.
+ /// The current instance of the IOutboxSettingsBuilder.
+    IOutboxBuilder WithDeliveries(Action<IDeliveryBuilder> build);
+
+ ///
+ /// Enables partitioning support for the outbox.
+ ///
+ /// An action to configure the partitioning settings.
+ /// The current instance of the IOutboxSettingsBuilder.
+    IOutboxBuilder WithPartitioningSupport(Action<PartitionalSettings> configure);
+}
\ No newline at end of file
diff --git a/src/Sa.Outbox/Configuration/OutboxBuilder.cs b/src/Sa.Outbox/Configuration/OutboxBuilder.cs
new file mode 100644
index 0000000..dfc1bda
--- /dev/null
+++ b/src/Sa.Outbox/Configuration/OutboxBuilder.cs
@@ -0,0 +1,34 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using Sa.Outbox.Delivery;
+using Sa.Outbox.Partitional;
+using Sa.Outbox.Publication;
+
+namespace Sa.Outbox.Configuration;
+
+internal class OutboxBuilder : IOutboxBuilder
+{
+ private readonly IServiceCollection services;
+
+ public OutboxBuilder(IServiceCollection services)
+ {
+ this.services = services;
+ services.AddSaInfrastructure();
+ services.AddMessagePublisher();
+ services.TryAddSingleton(this.PublishSettings);
+ }
+
+ public OutboxPublishSettings PublishSettings { get; } = new();
+
+    public IOutboxBuilder WithDeliveries(Action<IDeliveryBuilder> build)
+ {
+ services.AddOutboxDelivery(build);
+ return this;
+ }
+
+    public IOutboxBuilder WithPartitioningSupport(Action<PartitionalSettings> configure)
+ {
+ services.AddPartitioningSupport(configure);
+ return this;
+ }
+}
diff --git a/src/Sa.Outbox/Configuration/OutboxSettings.cs b/src/Sa.Outbox/Configuration/OutboxSettings.cs
new file mode 100644
index 0000000..e83632f
--- /dev/null
+++ b/src/Sa.Outbox/Configuration/OutboxSettings.cs
@@ -0,0 +1,109 @@
+namespace Sa.Outbox;
+
+
+///
+/// Settings for publishing messages from the Outbox.
+///
+public class OutboxPublishSettings
+{
+ ///
+ /// The maximum batch size of messages to be sent at once.
+    /// Default value: 64.
+ /// for array pool size: 16, 32, 64, 128, 256, 512, 1024, 2048, 4096
+ ///
+ public int MaxBatchSize { get; set; } = 64;
+}
+
+
+///
+/// Indicates that this is a configuration for message delivery in the Outbox.
+///
+public class OutboxDeliverySettings(Guid jobId, int instanceIndex = 0)
+{
+ ///
+ /// Gets the unique identifier for the delivery job
+ ///
+ public Guid JobId => jobId;
+ ///
+ /// Indicates the index of the worker instance.
+ ///
+ public int WorkerInstanceIndex => instanceIndex;
+ ///
+ /// Gets the scheduling settings for the delivery job.
+ ///
+ public ScheduleSettings ScheduleSettings { get; } = new();
+ ///
+ /// Gets the extraction settings for retrieving messages from the Outbox.
+ ///
+ public ExtractSettings ExtractSettings { get; } = new();
+ ///
+ /// Gets the consumption settings for processing messages.
+ ///
+ public ConsumeSettings ConsumeSettings { get; } = new();
+}
+
+///
+/// Represents the scheduling settings for the delivery job.
+///
+public class ScheduleSettings
+{
+ public string? Name { get; set; }
+ public TimeSpan ExecutionInterval { get; set; } = TimeSpan.FromMinutes(1);
+
+ ///
+ /// job schedule delay before start
+ ///
+ public TimeSpan InitialDelay { get; set; } = TimeSpan.FromSeconds(10);
+
+ public int RetryCountOnError { get; set; } = 2;
+}
+
+///
+/// Represents the extraction settings for retrieving messages from the Outbox.
+///
+public class ExtractSettings
+{
+ ///
+ /// Gets or sets the maximum size of the Outbox message batch for each database poll.
+ /// for array pool size: 16, 32, 64, 128, 256, 512, 1024 ...
+ ///
+ public int MaxBatchSize { get; set; } = 16;
+ ///
+ /// Message lock expiration time.
+ /// When a batch of messages for a bus instance is acquired, the messages will be locked (reserved) for that amount of time.
+ ///
+ public TimeSpan LockDuration { get; set; } = TimeSpan.FromSeconds(10);
+
+ ///
+ /// How long before to request a lock renewal.
+ /// This should be much shorter than .
+ ///
+ public TimeSpan LockRenewal { get; set; } = TimeSpan.FromSeconds(3);
+
+ ///
+ /// Repeat extract for each tenant
+ ///
+ ///
+ public bool ForEachTenant { get; set; }
+
+ ///
+ /// select outbox messages for processing for the period
+ ///
+ public TimeSpan LookbackInterval { get; set; } = TimeSpan.FromDays(7);
+}
+
+///
+/// Represents the consumption settings for processing messages from the Outbox.
+///
+public class ConsumeSettings
+{
+ ///
+ /// The maximum number of delivery attempts before delivery will not be attempted again.
+ ///
+ public int MaxDeliveryAttempts { get; set; } = 3;
+ ///
+ /// The maximum number of messages that can take in part
+ /// default value
+ ///
+ public int? ConsumeBatchSize { get; set; }
+}
diff --git a/src/Sa.Outbox/Delivery/DeliveryCourier.cs b/src/Sa.Outbox/Delivery/DeliveryCourier.cs
new file mode 100644
index 0000000..815e720
--- /dev/null
+++ b/src/Sa.Outbox/Delivery/DeliveryCourier.cs
@@ -0,0 +1,70 @@
+using Microsoft.Extensions.DependencyInjection;
+using Sa.Extensions;
+using Sa.Outbox.Exceptions;
+
+namespace Sa.Outbox.Delivery;
+
+internal class DeliveryCourier(IServiceProvider serviceProvider) : IDeliveryCourier
+{
+
+    public async ValueTask<int> Deliver<TMessage>(IReadOnlyCollection<IOutboxContext<TMessage>> outboxMessages, int maxDeliveryAttempts, CancellationToken cancellationToken)
+ {
+ try
+ {
+ using IServiceScope scope = serviceProvider.CreateScope();
+            IConsumer<TMessage> consumer = scope.ServiceProvider.GetRequiredService<IConsumer<TMessage>>();
+ await consumer.Consume(outboxMessages, cancellationToken);
+ }
+ catch (Exception ex) when (!ex.IsCritical())
+ {
+ HandleError(ex, outboxMessages);
+ }
+
+ return PostHandle(outboxMessages, maxDeliveryAttempts);
+ }
+
+    private static void HandleError<TMessage>(Exception error, IReadOnlyCollection<IOutboxContext<TMessage>> outboxMessages)
+    {
+        foreach (IOutboxContext<TMessage> item in outboxMessages)
+ {
+ // если не было обработанных ошибок до - то...
+ if (item.DeliveryResult.Code == 0)
+ {
+ // "Unknown delivery error."
+ // раскидываем ошибки в отложенную обработку от 10 до 45 мин
+ item.Error(error ?? new DeliveryException("Unknown delivery error."), postpone: GenTimeSpan.Next());
+ }
+ }
+ }
+
+    private static int PostHandle<TMessage>(IReadOnlyCollection<IOutboxContext<TMessage>> messages, int maxDeliveryAttempts)
+    {
+        int iOk = 0;
+        foreach (IOutboxContext<TMessage> message in messages)
+ {
+ if (message.DeliveryResult.Code >= 400 && message.DeliveryResult.Code < 500 && message.DeliveryInfo.Attempt + 1 > maxDeliveryAttempts)
+ {
+ Exception exception = message.Exception ?? new DeliveryPermanentException("Maximum delivery attempts exceeded", statusCode: 501);
+
+ // Устанавливаем постоянную ошибку
+ message.PermanentError(exception, statusCode: 501);
+ }
+ else if (message.DeliveryResult.Code == 0)
+ {
+ message.Ok();
+ iOk++;
+ }
+ }
+
+ return iOk;
+ }
+
+
+ static class GenTimeSpan
+ {
+ public static TimeSpan Next()
+ {
+ return TimeSpan.FromSeconds(Random.Shared.Next(60 * 10, 60 * 45));
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Sa.Outbox/Delivery/DeliveryProcessor.cs b/src/Sa.Outbox/Delivery/DeliveryProcessor.cs
new file mode 100644
index 0000000..5f107af
--- /dev/null
+++ b/src/Sa.Outbox/Delivery/DeliveryProcessor.cs
@@ -0,0 +1,19 @@
+namespace Sa.Outbox.Delivery;
+
+internal class DeliveryProcessor(IDeliveryRelay relayService) : IDeliveryProcessor
+{
+    public async Task<long> ProcessMessages<TMessage>(OutboxDeliverySettings settings, CancellationToken cancellationToken)
+    {
+        long count = 0;
+        bool runAgain;
+        do
+        {
+            int sentCount = await relayService.StartDelivery<TMessage>(settings, cancellationToken);
+ runAgain = sentCount > 0;
+ count += sentCount;
+ }
+ while (runAgain && !cancellationToken.IsCancellationRequested);
+
+ return count;
+ }
+}
diff --git a/src/Sa.Outbox/Delivery/DeliveryRelay.cs b/src/Sa.Outbox/Delivery/DeliveryRelay.cs
new file mode 100644
index 0000000..c3d4590
--- /dev/null
+++ b/src/Sa.Outbox/Delivery/DeliveryRelay.cs
@@ -0,0 +1,135 @@
+using Sa.Classes;
+using Sa.Extensions;
+using Sa.Host.MessageTypeResolver;
+using Sa.Outbox.Partitional;
+using Sa.Outbox.Publication;
+using Sa.Outbox.Repository;
+using Sa.Timing.Providers;
+
+namespace Sa.Outbox.Delivery;
+
+internal sealed class DeliveryRelay(
+ IDeliveryRepository repository
+ , IMessageTypeResolver typeResolver
+ , IArrayPoolFactory arrayPoolFactory
+ , IPartitionalSupportCache partCache
+ , ICurrentTimeProvider timeProvider
+ , IDeliveryCourier deliveryCourier
+ , PartitionalSettings? partitionalSettings = null
+ ) : IDeliveryRelay
+{
+
+ private readonly bool _globalForEachTenant = partitionalSettings?.ForEachTenant ?? false;
+
+    public async Task<int> StartDelivery<TMessage>(OutboxDeliverySettings settings, CancellationToken cancellationToken)
+    {
+        IArrayPooler<OutboxDeliveryMessage<TMessage>> arrayPooler = arrayPoolFactory.Create<OutboxDeliveryMessage<TMessage>>();
+
+        int batchSize = settings.ExtractSettings.MaxBatchSize;
+
+        if (batchSize == 0) return 0;
+
+        OutboxDeliveryMessage<TMessage>[] buffer = arrayPooler.Rent(batchSize);
+        Memory<OutboxDeliveryMessage<TMessage>> slice = buffer.AsMemory(0, batchSize);
+ try
+ {
+ if (_globalForEachTenant || settings.ExtractSettings.ForEachTenant)
+ {
+ int count = 0;
+ int[] tenantIds = await partCache.GetTenantIds(cancellationToken);
+ foreach (int tenantId in tenantIds)
+ {
+ count += await FillBuffer(slice, settings, tenantId, cancellationToken);
+ }
+ return count;
+ }
+ else
+ {
+ return await FillBuffer(slice, settings, 0, cancellationToken);
+ }
+ }
+ finally
+ {
+ arrayPooler.Return(buffer);
+ }
+ }
+
+    private async Task<int> FillBuffer<TMessage>(Memory<OutboxDeliveryMessage<TMessage>> buffer, OutboxDeliverySettings settings, int tenantId, CancellationToken cancellationToken)
+    {
+        OutboxMessageFilter filter = CreateFilter<TMessage>(settings, tenantId);
+
+        int locked = await repository.StartDelivery(buffer, settings.ExtractSettings.MaxBatchSize, settings.ExtractSettings.LockDuration, filter, cancellationToken);
+ if (locked == 0) return locked;
+
+ buffer = buffer[..locked];
+
+ using IDisposable locker = KeepLocker.KeepLocked(
+ settings.ExtractSettings.LockRenewal
+ , async t =>
+ {
+ filter = ExtendFilter(filter);
+ await repository.ExtendDelivery(settings.ExtractSettings.LockDuration, filter, t);
+ }
+ , cancellationToken: cancellationToken
+ );
+
+ // send msgs to consumer
+ return await DeliverBatches(buffer, settings, filter, cancellationToken);
+ }
+
+    private OutboxMessageFilter CreateFilter<TMessage>(OutboxDeliverySettings settings, int tenantId)
+    {
+        OutboxMessageTypeInfo ti = OutboxMessageTypeHelper.GetOutboxMessageTypeInfo<TMessage>();
+        DateTimeOffset fromDate = timeProvider.GetUtcNow().StartOfDay() - settings.ExtractSettings.LookbackInterval;
+
+        return new OutboxMessageFilter(
+            GenTransactId()
+            , typeResolver.ToName<TMessage>()
+ , tenantId
+ , ti.PartName
+ , fromDate
+ , timeProvider.GetUtcNow()
+ );
+ }
+
+ private OutboxMessageFilter ExtendFilter(OutboxMessageFilter filter)
+ {
+ return new OutboxMessageFilter(
+ filter.TransactId
+ , filter.PayloadType
+ , filter.TenantId
+ , filter.Part
+ , filter.FromDate
+ , timeProvider.GetUtcNow()
+ );
+ }
+
+ private static string GenTransactId() => $"{Environment.MachineName}-{Guid.NewGuid():N}";
+
+ private async Task DeliverBatches(Memory> deliveryMessages, OutboxDeliverySettings settings, OutboxMessageFilter filter, CancellationToken cancellationToken)
+ {
+ int iOk = 0;
+
+ foreach (IOutboxContext[] outboxMessages in deliveryMessages
+ .GetChunks(settings.ConsumeSettings.ConsumeBatchSize ?? settings.ExtractSettings.MaxBatchSize)
+ .Select(chunk
+ => chunk.Span.SelectWhere(dm
+ => new OutboxContext