From 43039589fc0bf00355c93ed9e4c76caaa5a1df5f Mon Sep 17 00:00:00 2001 From: Fernando Cerqueira Date: Tue, 28 Nov 2023 12:26:02 -0300 Subject: [PATCH] preview v3.0.0 --- NugetREADME.md | 179 +---- README.md | 110 ++- README.txt | 119 ++-- docs/apis/apis.md | 4 + docs/apis/ringbufferplus.iringbuffer-1.md | 40 +- ...ngbufferplus.iringbufferscalecapacity-1.md | 4 +- ...fferplus.iringbufferscalefromcapacity-1.md | 113 +++ .../ringbufferplus.iringbufferscalemax-1.md | 2 +- .../ringbufferplus.iringbufferscalemin-1.md | 4 +- .../ringbufferplus.iringbufferservice-1.md | 16 +- docs/apis/ringbufferplus.iringbufferswith.md | 40 ++ docs/apis/ringbufferplus.ringbuffermetric.md | 67 +- docs/apis/ringbufferplus.ringbuffervalue-1.md | 12 - docs/apis/ringbufferplus.scalemode.md | 7 +- docs/apis/ringbufferplus.sourcetrigger.md | 1 + docs/index.md | 110 ++- docs/whatsnewprev.md | 4 + samples/RingBufferPlusApiSample/Program.cs | 13 +- .../RingBufferPlusApiSample.http | 6 - .../BenchmarkProgram.cs | 51 +- .../ConsumerRoleProgram.cs | 25 +- .../RingBufferPlusBenchmarkSample/Program.cs | 8 +- .../Properties/launchSettings.json | 9 +- .../PublisherRoleProgram.cs | 78 +- .../RingBufferPlusConsoleSample/Program.cs | 20 +- src/Commands/IRingBuffer.cs | 21 +- ...pacity.cs => IRingBufferMasterCapacity.cs} | 25 +- src/Commands/IRingBufferScaleMax.cs | 2 +- src/Commands/IRingBufferScaleMin.cs | 4 +- src/Commands/IRingBufferService.cs | 7 +- src/Commands/IRingBufferSlaveCapacity.cs | 57 ++ src/Commands/IRingBufferSwith.cs | 23 + src/RingBufferMetric.cs | 33 +- src/RingBufferPlus.csproj | 4 - src/RingBufferValue.cs | 4 +- src/ScaleMode.cs | 4 + src/SourceTrigger.cs | 6 +- src/internal/IRingBufferCallback.cs | 18 + src/internal/IRingBufferOptions.cs | 8 +- src/internal/RingBufferBuilder.cs | 196 +++-- src/internal/RingBufferManager.cs | 672 +++++++++++------- 41 files changed, 1278 insertions(+), 848 deletions(-) create mode 100644 docs/apis/ringbufferplus.iringbufferscalefromcapacity-1.md create mode 100644 docs/apis/ringbufferplus.iringbufferswith.md delete mode 100644 samples/RingBufferPlusApiSample/RingBufferPlusApiSample.http rename src/Commands/{IRingBufferScaleCapacity.cs => IRingBufferMasterCapacity.cs} (72%) create mode 100644 src/Commands/IRingBufferSlaveCapacity.cs create mode 100644 src/Commands/IRingBufferSwith.cs create mode 100644 src/internal/IRingBufferCallback.cs diff --git a/NugetREADME.md b/NugetREADME.md index 98d5408..b9262b3 100644 --- a/NugetREADME.md +++ b/NugetREADME.md @@ -7,9 +7,21 @@ **[Visit the official page for more documentation of RingBufferPlus](https://fracerqueira.github.io/RingBufferPlus)** ## What's new in the latest version -### V2.0.0 - -- Release G.A with .NET8 +### V3.0.0 +[**Top**](#table-of-contents) + +- Added command 'FactoryHealth' + - Check health item before accquire buffer. +- Renamed Method 'SwithToScaleDefinitions' to 'MasterScale' +- Added master-slave feature(2 Ring Buffer with synchronization) + - Added command set 'SlaveScale' to set report handler, Minimum and maximum capacity +- Added 'MasterSlave' enum item in SourceTrigger +- Added 'None' enum item in ScaleMode +- Revised to have greater performance without 'lock' +- Removed Method 'Counters' + - data was not relevant and inaccurate +- Revised 'RingBufferMetric' + - Now only propreties 'Trigger', 'FromCapacity', 'ToCapacity' and 'MetricDate' ## Features @@ -19,22 +31,27 @@ The implementation follows the basic principle. The principle was expanded to ha ### Key Features +- Conscious use of resources - Set unique name for same buffer type - Set the buffer capacity +- Set buffer integrity (validate if the buffer is valid) + - Verified with each acquiring - Set the minimum and maximum capacity (optional) - Set the conditions for scaling to maximum and minimum (required) - Automatic condition values ​​based on capacity (value not required) - - Define a user role to receive capacity change events to log/save (optional) - - Executed in a separate thread asynchronously +- Set master-slave (2 Ring Buffer with synchronization) + - Master controls slave scale +- Event with scale change information + - Executed in a separate thread asynchronously - Associate the logger interface (optional) - Define a user role for generated errors (optional) - Executed in a separate thread asynchronously -- Invalidate the buffer when it is in an invalid state +- Command to Invalidate the buffer when it is in an invalid state - Warm up to full capacity before starting application - Receive item from buffer with success/failure information and elapsed time for acquisition - Sets a time limit for acquiring the item in the buffer - Detailed information about operations when the minimum log is Debug -- Simple and clear fluent syntaxx +- Simple and clear fluent syntax ## Installing @@ -56,154 +73,6 @@ See folder [**Samples**](https://github.com/FRACerqueira/RingBufferPlus/tree/mai dotnet run --project [name of sample] ``` -## Usage - -The **RingBufferPlus** use **fluent interface**; an object-oriented API whose design relies extensively on method chaining. Its goal is to increase code legibility. The term was coined in 2005 by Eric Evans and Martin Fowler. - -### Sample-Console Usage (Full features) - -```csharp -using var loggerFactory = LoggerFactory.Create(builder => -{ - builder - .SetMinimumLevel(LogLevel.Information) - .AddFilter("Microsoft", LogLevel.Warning) - .AddFilter("System", LogLevel.Warning) - .AddConsole(); -}); -logger = loggerFactory.CreateLogger(); -``` - -```csharp -var rb = RingBuffer.New("MyBuffer", cts.Token) - .Capacity(8) - .Logger(logger!) - .OnError((log, error) => - { - log?.LogError("{error}",error); - }) - .Factory((cts) => { return rnd.Next(1, 10); }) - .SwithToScaleDefinitions() - .SampleUnit(TimeSpan.FromSeconds(10), 10) - .ReportScale((mode, log, metric, _) => - { - log.LogInformation($"{connectionRingBuffer!.Name} Report: [{metric.MetricDate}] Trigger {metric.Trigger} : {mode} from {metric.FromCapacity} to {metric.ToCapacity} ({metric.Capacity}/{metric.MinCapacity}/{metric.MaxCapacity}) : {metric.FreeResource}"); - }) - .MinCapacity(4) - // Defaut = Max (Min = 1, Max = Capacity) - .ScaleWhenFreeGreaterEq() - // Defaut = Min (Min = 1, Max = MinCapacity) - .RollbackWhenFreeLessEq() - // Defaut = Max-1 (Min = 1, Max = MinCapacity) - //.TriggerByAccqWhenFreeLessEq() - .MaxCapacity(20) - // Default = Min (Min = 1, Max = Capacity) - .ScaleWhenFreeLessEq() - // Default = Min (Min = MaxCapacity-Capacity, Max = MaxCapacity) - .RollbackWhenFreeGreaterEq() - // Default = Min (Min = MaxCapacity-Capacity, Max = MaxCapacity) - //.TriggerByAccqWhenFreeGreaterEq() - .BuildWarmup(out var completed); -``` - -```csharp -using (var buffer = rb.Accquire()) -{ - if (bufferedItem.Successful) - { - try - { - //do something - } - catch - { - buffer.Invalidate(); - } - } -} -``` - - -### Sample-api/webUsage - -```csharp -builder.Services.AddRingBuffer("Mybuffer",(ringbuf, _) => -{ - return ringbuf - .Capacity(8) - .Factory((cts) => { return 10; }) - .AccquireTimeout(TimeSpan.FromMilliseconds(1500)) - .OnError((log, error) => - { - log?.LogError("{error}",error); - }) - .Build(); -}); - -... -//If you do not use the 'Warmup Ring Buffer' command, the first access to acquire the buffer will be Warmup (not recommended) -app.WarmupRingBuffer("Mybuffer"); -``` - -```csharp -[ApiController] -[Route("[controller]")] -public class MyController(IRingBufferService ringBufferService) : ControllerBase -{ - private readonly IRingBufferService _ringBufferService = ringBufferService; - - [HttpGet] - public ActionResult Get() - { - using (var buffer = _ringBufferService.Accquire()) - { - if (bufferedItem.Successful) - { - try - { - //do something - } - catch - { - buffer.Invalidate(); - } - } - } - } -} -``` - -## Performance - -The BenchmarkDotNet test was done on the local machine, with **'RabbitMQ' (over wsl)**. The measures are **about publisher** action (Scenario where Ringbuffer makes sense and brings significant performance gains). - -**The gain can be much greater for real machines in production!** - -See folder [**Samples/RingBufferPlusBenchmarkSample**](https://github.com/FRACerqueira/RingBufferPlus/tree/main/Samples/RingBufferPlusBenchmarkSample). - -``` -BenchmarkDotNet v0.13.10, Windows 10 (10.0.19044.3693/21H2/November2021Update) -Intel Core i7-8565U CPU 1.80GHz (Whiskey Lake), 1 CPU, 8 logical and 4 physical cores -.NET SDK 8.0.100 - [Host] : .NET 8.0.0 (8.0.23.53103), X64 RyuJIT AVX2 - Job-IMTEVT : .NET 8.0.0 (8.0.23.53103), X64 RyuJIT AVX2 - Dry : .NET 8.0.0 (8.0.23.53103), X64 RyuJIT AVX2 - -| Method | Mean | StdErr | StdDev | Min | Q1 | Median | Q3 | Max | Op/s | Rank | -|------------------ |------------:|----------:|------------:|------------:|------------:|------------:|------------:|------------:|-------:|-----:| -| WithRingBuffer | 589.8 ms | 16.90 ms | 169.01 ms | 191.2 ms | 487.2 ms | 555.4 ms | 659.0 ms | 1,324.8 ms | 1.6955 | 1 | -| WithoutRingBuffer | 15,441.5 ms | 154.39 ms | 1,543.90 ms | 13,785.4 ms | 14,562.8 ms | 15,071.1 ms | 15,981.9 ms | 24,595.2 ms | 0.0648 | 2 | -``` - -## Credits - -This work was inspired by the project by [**Luis Carlos Farias**](https://github.com/luizcarlosfaria/Oragon.Common.RingBuffer). -My thanks for your great work of bringing knowledge to the community! - -**API documentation generated by** - -- [xmldoc2md](https://github.com/FRACerqueira/xmldoc2md), Copyright (c) 2022 Charles de Vandière. - ## License Copyright 2022 @ Fernando Cerqueira diff --git a/README.md b/README.md index 25d2d8d..1237e7a 100644 --- a/README.md +++ b/README.md @@ -25,10 +25,22 @@ - [API Reference](https://fracerqueira.github.io/RingBufferPlus/apis/apis.html) ## What's new in the latest version -### V2.0.0 + +### V3.0.0 [**Top**](#table-of-contents) -- Release G.A with .NET8 +- Added command 'FactoryHealth' + - Check health item before accquire buffer. +- Renamed Method 'SwithToScaleDefinitions' to 'MasterScale' +- Added master-slave feature(2 Ring Buffer with synchronization) + - Added command set 'SlaveScale' to set report handler, Minimum and maximum capacity +- Added 'MasterSlave' enum item in SourceTrigger +- Added 'None' enum item in ScaleMode +- Revised to have greater performance without 'lock' +- Removed Method 'Counters' + - data was not relevant and inaccurate +- Revised 'RingBufferMetric' + - Now only propreties 'Trigger', 'FromCapacity', 'ToCapacity' and 'MetricDate' ## Features @@ -49,17 +61,22 @@ The implementation follows the basic principle. The principle was expanded to ha ### Key Features [**Top**](#table-of-contents) +- Conscious use of resources - Set unique name for same buffer type - Set the buffer capacity +- Set buffer integrity (validate if the buffer is valid) + - Verified with each acquiring - Set the minimum and maximum capacity (optional) - Set the conditions for scaling to maximum and minimum (required) - Automatic condition values ​​based on capacity (value not required) - - Define a user role to receive capacity change events to log/save (optional) - - Executed in a separate thread asynchronously +- Set master-slave (2 Ring Buffer with synchronization) + - Master controls slave scale +- Event with scale change information + - Executed in a separate thread asynchronously - Associate the logger interface (optional) - Define a user role for generated errors (optional) - Executed in a separate thread asynchronously -- Invalidate the buffer when it is in an invalid state +- Command to Invalidate the buffer when it is in an invalid state - Warm up to full capacity before starting application - Receive item from buffer with success/failure information and elapsed time for acquisition - Sets a time limit for acquiring the item in the buffer @@ -93,7 +110,7 @@ dotnet run --project [name of sample] The **RingBufferPlus** use **fluent interface**; an object-oriented API whose design relies extensively on method chaining. Its goal is to increase code legibility. The term was coined in 2005 by Eric Evans and Martin Fowler. -### Sample-Console Usage (Full features) +### Sample-Console Usage (Minimal features with auto-scale) ```csharp using var loggerFactory = LoggerFactory.Create(builder => @@ -108,34 +125,18 @@ logger = loggerFactory.CreateLogger(); ``` ```csharp +Random rnd = new(); var rb = RingBuffer.New("MyBuffer", cts.Token) .Capacity(8) - .Logger(logger!) - .OnError((log, error) => - { - log?.LogError("{error}",error); - }) .Factory((cts) => { return rnd.Next(1, 10); }) - .SwithToScaleDefinitions() + .MasterScale() .SampleUnit(TimeSpan.FromSeconds(10), 10) - .ReportScale((mode, log, metric, _) => - { - log.LogInformation($"{connectionRingBuffer!.Name} Report: [{metric.MetricDate}] Trigger {metric.Trigger} : {mode} from {metric.FromCapacity} to {metric.ToCapacity} ({metric.Capacity}/{metric.MinCapacity}/{metric.MaxCapacity}) : {metric.FreeResource}"); - }) .MinCapacity(4) - // Defaut = Max (Min = 1, Max = Capacity) .ScaleWhenFreeGreaterEq() - // Defaut = Min (Min = 1, Max = MinCapacity) .RollbackWhenFreeLessEq() - // Defaut = Max-1 (Min = 1, Max = MinCapacity) - //.TriggerByAccqWhenFreeLessEq() .MaxCapacity(20) - // Default = Min (Min = 1, Max = Capacity) .ScaleWhenFreeLessEq() - // Default = Min (Min = MaxCapacity-Capacity, Max = MaxCapacity) .RollbackWhenFreeGreaterEq() - // Default = Min (Min = MaxCapacity-Capacity, Max = MaxCapacity) - //.TriggerByAccqWhenFreeGreaterEq() .BuildWarmup(out var completed); ``` @@ -157,7 +158,7 @@ using (var buffer = rb.Accquire()) ``` -### Sample-api/webUsage +### Sample-api/web Usage (Minimal features without auto-scale) [**Top**](#table-of-contents) ```csharp @@ -166,11 +167,6 @@ builder.Services.AddRingBuffer("Mybuffer",(ringbuf, _) => return ringbuf .Capacity(8) .Factory((cts) => { return 10; }) - .AccquireTimeout(TimeSpan.FromMilliseconds(1500)) - .OnError((log, error) => - { - log?.LogError("{error}",error); - }) .Build(); }); @@ -207,6 +203,47 @@ public class MyController(IRingBufferService ringBufferService) : Controlle } ``` +### Sample-Console Master-Slave feature using RabbitMq (basic usage) +[**Top**](#table-of-contents) + +For more details see [**Complete-Samples**](https://github.com/FRACerqueira/RingBufferPlus/tree/main/samples/RingBufferPlusBenchmarkSample). + +```csharp +connectionRingBuffer = RingBuffer.New("RabbitCnn") + .Capacity(2) + .Logger(applogger!) + .AccquireTimeout(TimeSpan.FromMilliseconds(500)) + .OnError((log, error) => + { + log?.LogError("{error}", error); + }) + .Factory((cts) => ConnectionFactory.CreateConnection()) + .FactoryHealth((item) => item.IsOpen) + .SlaveScale() + .MaxCapacity(10) + .MinCapacity(1) + .BuildWarmup(out completedCnn); + +modelRingBuffer = RingBuffer.New("RabbitChanels") + .Capacity(10) + .Logger(applogger!) + .OnError((log, error) => + { + log?.LogError("{error}", error); + }) + .Factory((cts) => ModelFactory(cts)) + .FactoryHealth((item) => item.IsOpen) + .MasterScale(connectionRingBuffer) + .SampleUnit(TimeSpan.FromSeconds(10), 10) + .MaxCapacity(50) + .ScaleWhenFreeLessEq() + .RollbackWhenFreeGreaterEq() + .MinCapacity(2) + .ScaleWhenFreeGreaterEq() + .RollbackWhenFreeLessEq() + .BuildWarmup(out completedChanels); +``` + ## Performance [**Top**](#table-of-contents) @@ -223,11 +260,14 @@ Intel Core i7-8565U CPU 1.80GHz (Whiskey Lake), 1 CPU, 8 logical and 4 physical [Host] : .NET 8.0.0 (8.0.23.53103), X64 RyuJIT AVX2 Job-IMTEVT : .NET 8.0.0 (8.0.23.53103), X64 RyuJIT AVX2 Dry : .NET 8.0.0 (8.0.23.53103), X64 RyuJIT AVX2 - -| Method | Mean | StdErr | StdDev | Min | Q1 | Median | Q3 | Max | Op/s | Rank | -|------------------ |------------:|----------:|------------:|------------:|------------:|------------:|------------:|------------:|-------:|-----:| -| WithRingBuffer | 589.8 ms | 16.90 ms | 169.01 ms | 191.2 ms | 487.2 ms | 555.4 ms | 659.0 ms | 1,324.8 ms | 1.6955 | 1 | -| WithoutRingBuffer | 15,441.5 ms | 154.39 ms | 1,543.90 ms | 13,785.4 ms | 14,562.8 ms | 15,071.1 ms | 15,981.9 ms | 24,595.2 ms | 0.0648 | 2 | ++------------------ +-------:+-----:+------------:+----------:+------------:+-------------:+------------:+------------:+------------:+------------+| +| Method | Op/s | Rank | Mean | StdErr | StdDev | Min | Q1 | Median | Q3 | Max | +|------------------ |-------:|-----:|------------:|----------:|------------:|-------------:|------------:|------------:|------------:|------------:| +| WithRingBuffer | 6.8218 | 1 | 146.6 ms | 0.00 ms | 0.00 ms | 146.59 ms | 146.6 ms | 146.6 ms | 146.6 ms | 146.6 ms | +| WithRingBuffer | 1.9411 | 2 | 515.2 ms | 101.72 ms | 1,017.24 ms | 72.76 ms | 300.1 ms | 439.2 ms | 508.2 ms | 10,426.5 ms | +| WithoutRingBuffer | 0.0676 | 3 | 14,797.6 ms | 133.34 ms | 1,333.36 ms | 13,306.95 ms | 14,061.5 ms | 14,497.3 ms | 15,098.1 ms | 21,286.4 ms | +| WithoutRingBuffer | 0.0662 | 4 | 15,109.9 ms | 0.00 ms | 0.00 ms | 15,109.93 ms | 15,109.9 ms | 15,109.9 ms | 15,109.9 ms |115,109.9 ms | ++------------------ +-------:+-----:+------------:+----------:+------------:+-------------:+------------:+------------:+------------:+------------+| ``` ## Code of Conduct diff --git a/README.txt b/README.txt index 07239f9..f08c390 100644 --- a/README.txt +++ b/README.txt @@ -11,23 +11,29 @@ ======================================================================================== Welcome to RingBufferPlus -------------------------- +========================= -RingBufferPlus A generic circular buffer (ring buffer) in C# with Auto-Scaler, and Report-Metrics. +RingBufferPlus A generic circular buffer (ring buffer) in C# with Auto-Scaler. Features --------- +======== + +- Conscious use of resources - Set unique name for same buffer type - Set the buffer capacity +- Set buffer integrity (validate if the buffer is valid) + - Verified with each acquiring - Set the minimum and maximum capacity (optional) - Set the conditions for scaling to maximum and minimum (required) - Automatic condition values ​​based on capacity (value not required) - - Define a user role to receive capacity change events to log/save (optional) - - Executed in a separate thread asynchronously +- Set master-slave (2 Ring Buffer with synchronization) + - Master controls slave scale +- Event with scale change information + - Executed in a separate thread asynchronously - Associate the logger interface (optional) - Define a user role for generated errors (optional) - Executed in a separate thread asynchronously -- Invalidate the buffer when it is in an invalid state +- Command to Invalidate the buffer when it is in an invalid state - Warm up to full capacity before starting application - Receive item from buffer with success/failure information and elapsed time for acquisition - Sets a time limit for acquiring the item in the buffer @@ -44,22 +50,33 @@ PipeAndFilter was developed in C# with target frameworks: - .NET 7 - .NET 8 -*** What's new in V2.0.0 *** ----------------------------- - -- Release G.A with .NET8 +*** What's new in V3.0.0 *** +============================ + +- Added command 'FactoryHealth' + - Check health item before accquire buffer. +- Renamed Method 'SwithToScaleDefinitions' to 'MasterScale' +- Added master-slave feature(2 Ring Buffer with synchronization) + - Added command set 'SlaveScale' to set report handler, Minimum and maximum capacity +- Added 'MasterSlave' enum item in SourceTrigger +- Added 'None' enum item in ScaleMode +- Revised to have greater performance without 'lock' +- Removed Method 'Counters' + - data was not relevant and inaccurate +- Revised 'RingBufferMetric' + - Now only propreties 'Trigger', 'FromCapacity', 'ToCapacity' and 'MetricDate' **Examples** ------------- +============ + See folder: https://github.com/FRACerqueira/RingBufferPlus/tree/main/samples **Usage** ---------- - -Sample-Console Usage (Full features) -==================================== +========= +Sample-Console Usage (Minimal features with auto-scale) +------------------------------------------------------- using var loggerFactory = LoggerFactory.Create(builder => { builder @@ -70,36 +87,20 @@ using var loggerFactory = LoggerFactory.Create(builder => }); logger = loggerFactory.CreateLogger(); -... +... +Random rnd = new(); var rb = RingBuffer.New("MyBuffer", cts.Token) .Capacity(8) - .Logger(logger!) .Factory((cts) => { return rnd.Next(1, 10); }) - .OnError((log, error) => - { - log?.LogError("{error}",error); - }) - .SwithToScaleDefinitions() + .MasterScale() .SampleUnit(TimeSpan.FromSeconds(10), 10) - .ReportScale((mode, log, metric, _) => - { - log.LogInformation($"{connectionRingBuffer!.Name} Report: [{metric.MetricDate}] Trigger {metric.Trigger} : {mode} from {metric.FromCapacity} to {metric.ToCapacity} ({metric.Capacity}/{metric.MinCapacity}/{metric.MaxCapacity}) : {metric.FreeResource}"); - }) .MinCapacity(4) - // Defaut = Max (Min = 1, Max = Capacity) .ScaleWhenFreeGreaterEq() - // Defaut = Min (Min = 1, Max = MinCapacity) .RollbackWhenFreeLessEq() - // Defaut = Max-1 (Min = 1, Max = MinCapacity) - //.TriggerByAccqWhenFreeLessEq() .MaxCapacity(20) - // Default = Min (Min = 1, Max = Capacity) .ScaleWhenFreeLessEq() - // Default = Min (Min = MaxCapacity-Capacity, Max = MaxCapacity) .RollbackWhenFreeGreaterEq() - // Default = Min (Min = MaxCapacity-Capacity, Max = MaxCapacity) - //.TriggerByAccqWhenFreeGreaterEq() .BuildWarmup(out var completed); ... @@ -119,19 +120,14 @@ using (var buffer = rb.Accquire()) } } -Sample-Sample-api/webUsage -========================== +Sample-api/web Usage (Minimal features without auto-scale) +---------------------------------------------------------- builder.Services.AddRingBuffer("Mybuffer",(ringbuf, _) => { return ringbuf .Capacity(8) .Factory((cts) => { return 10; }) - .AccquireTimeout(TimeSpan.FromMilliseconds(1500)) - .OnError((log, error) => - { - log?.LogError("{error}",error); - }) .Build(); }); @@ -168,8 +164,47 @@ public class MyController(IRingBufferService ringBufferService) : Controlle } } +Sample-Console Master-Slave feature using RabbitMq (basic usage) +---------------------------------------------------------------- + +For more details see https://github.com/FRACerqueira/RingBufferPlus/tree/main/samples/RingBufferPlusBenchmarkSample. + +connectionRingBuffer = RingBuffer.New("RabbitCnn") + .Capacity(2) + .Logger(applogger!) + .AccquireTimeout(TimeSpan.FromMilliseconds(500)) + .OnError((log, error) => + { + log?.LogError("{error}", error); + }) + .Factory((cts) => ConnectionFactory.CreateConnection()) + .FactoryHealth((item) => item.IsOpen) + .SlaveScale() + .MaxCapacity(10) + .MinCapacity(1) + .BuildWarmup(out completedCnn); + +modelRingBuffer = RingBuffer.New("RabbitChanels") + .Capacity(10) + .Logger(applogger!) + .OnError((log, error) => + { + log?.LogError("{error}", error); + }) + .Factory((cts) => ModelFactory(cts)) + .FactoryHealth((item) => item.IsOpen) + .MasterScale(connectionRingBuffer) + .SampleUnit(TimeSpan.FromSeconds(10), 10) + .MaxCapacity(50) + .ScaleWhenFreeLessEq() + .RollbackWhenFreeGreaterEq() + .MinCapacity(2) + .ScaleWhenFreeGreaterEq() + .RollbackWhenFreeLessEq() + .BuildWarmup(out completedChanels); + **License** ------------ +=========== Copyright 2022 @ Fernando Cerqueira RingBufferPlus project is licensed under the the MIT license. diff --git a/docs/apis/apis.md b/docs/apis/apis.md index a875a79..7ee7e18 100644 --- a/docs/apis/apis.md +++ b/docs/apis/apis.md @@ -19,12 +19,16 @@ - [IRingBufferScaleCapacity<T>](./ringbufferplus.iringbufferscalecapacity-1.md) +- [IRingBufferScaleFromCapacity<T>](./ringbufferplus.iringbufferscalefromcapacity-1.md) + - [IRingBufferScaleMax<T>](./ringbufferplus.iringbufferscalemax-1.md) - [IRingBufferScaleMin<T>](./ringbufferplus.iringbufferscalemin-1.md) - [IRingBufferService<T>](./ringbufferplus.iringbufferservice-1.md) +- [IRingBufferSwith](./ringbufferplus.iringbufferswith.md) + - [RingBuffer<T>](./ringbufferplus.ringbuffer-1.md) - [RingBufferException](./ringbufferplus.ringbufferexception.md) diff --git a/docs/apis/ringbufferplus.iringbuffer-1.md b/docs/apis/ringbufferplus.iringbuffer-1.md index 7bfa70e..2adb0d8 100644 --- a/docs/apis/ringbufferplus.iringbuffer-1.md +++ b/docs/apis/ringbufferplus.iringbuffer-1.md @@ -114,6 +114,23 @@ The delay time for retrying when a build fails. Default value is 5 seconds. [IRingBuffer<T>](./ringbufferplus.iringbuffer-1.md). +### **FactoryHealth(Func<T, Boolean>)** + +Health before accquire buffer. + +```csharp +IRingBuffer FactoryHealth(Func value) +``` + +#### Parameters + +`value` Func<T, Boolean>
+The handler to factory Health. + +#### Returns + +[IRingBuffer<T>](./ringbufferplus.iringbuffer-1.md). + ###
**Logger(ILogger)** The Logger @@ -132,6 +149,23 @@ IRingBuffer Logger(ILogger value) [IRingBuffer<T>](./ringbufferplus.iringbuffer-1.md). +### **MasterScale(IRingBufferSwith)** + +Swith to scale definitions commands (self) or other ring buffer. + +```csharp +IRingBufferScaleCapacity MasterScale(IRingBufferSwith ringBuffer) +``` + +#### Parameters + +`ringBuffer` [IRingBufferSwith](./ringbufferplus.iringbufferswith.md)
+The slave Ring buffer. + +#### Returns + +[IRingBufferScaleCapacity<T>](./ringbufferplus.iringbufferscalecapacity-1.md). + ###
**OnError(Action<ILogger, RingBufferException>)** Extension point to log a error. @@ -150,12 +184,12 @@ he handler to log error. [IRingBuffer<T>](./ringbufferplus.iringbuffer-1.md). -### **SwithToScaleDefinitions()** +### **SlaveScale()** -Swith to scale definitions commands. +Swith to scale definitions from other ring buffer. ```csharp -IRingBufferScaleCapacity SwithToScaleDefinitions() +IRingBufferScaleFromCapacity SlaveScale() ``` #### Returns diff --git a/docs/apis/ringbufferplus.iringbufferscalecapacity-1.md b/docs/apis/ringbufferplus.iringbufferscalecapacity-1.md index afd6a34..257c0a7 100644 --- a/docs/apis/ringbufferplus.iringbufferscalecapacity-1.md +++ b/docs/apis/ringbufferplus.iringbufferscalecapacity-1.md @@ -67,7 +67,7 @@ IRingBufferScaleMax MaxCapacity(int value) #### Parameters `value` [Int32](https://docs.microsoft.com/en-us/dotnet/api/system.int32)
-The maximum buffer. +The maximum buffer.Value mus be greater or equal [IRingBuffer<T>.Capacity(Int32)](./ringbufferplus.iringbuffer-1.md#capacityint32) #### Returns @@ -84,7 +84,7 @@ IRingBufferScaleMin MinCapacity(int value) #### Parameters `value` [Int32](https://docs.microsoft.com/en-us/dotnet/api/system.int32)
-The minimal buffer. +The minimal buffer. Value mus be greater or equal 1 #### Returns diff --git a/docs/apis/ringbufferplus.iringbufferscalefromcapacity-1.md b/docs/apis/ringbufferplus.iringbufferscalefromcapacity-1.md new file mode 100644 index 0000000..9dced66 --- /dev/null +++ b/docs/apis/ringbufferplus.iringbufferscalefromcapacity-1.md @@ -0,0 +1,113 @@ +# RingBufferPlus API:IRingBufferScaleFromCapacity + +[![Build](https://github.com/FRACerqueira/RingBufferPlus/workflows/Build/badge.svg)](https://github.com/FRACerqueira/RingBufferPlus/actions/workflows/build.yml) +[![License](https://img.shields.io/badge/License-MIT-brightgreen.svg)](https://github.com/FRACerqueira/RingBufferPlus/blob/master/LICENSE) +[![NuGet](https://img.shields.io/nuget/v/RingBufferPlus)](https://www.nuget.org/packages/RingBufferPlus/) +[![Downloads](https://img.shields.io/nuget/dt/RingBufferPlus)](https://www.nuget.org/packages/RingBufferPlus/) + +[**Back to List Api**](./apis.md) + +# IRingBufferScaleFromCapacity<T> + +Namespace: RingBufferPlus + +Represents the scale capacity commands to RingBufferPlus. + +```csharp +public interface IRingBufferScaleFromCapacity +``` + +#### Type Parameters + +`T`
+Type of buffer. + +## Methods + +###
**Build()** + +Validate and generate RingBufferPlus to service mode. + +```csharp +IRingBufferService Build() +``` + +#### Returns + +[IRingBufferService<T>](./ringbufferplus.iringbufferservice-1.md). + +### **BuildWarmup(ref Boolean, Nullable<TimeSpan>)** + +Validate and generate RingBufferPlus and warmup with full capacity ready or reaching timeout (default 30 seconds). + +```csharp +IRingBufferService BuildWarmup(ref Boolean fullcapacity, Nullable timeout) +``` + +#### Parameters + +`fullcapacity` [Boolean&](https://docs.microsoft.com/en-us/dotnet/api/system.boolean&)
+True if Warmup has full capacity, otherwise false. + +`timeout` [Nullable<TimeSpan>](https://docs.microsoft.com/en-us/dotnet/api/system.nullable-1)
+The Timeout to Warmup has full capacity. Default value is 30 seconds. + +#### Returns + +[IRingBufferService<T>](./ringbufferplus.iringbufferservice-1.md). + +###
**MaxCapacity(Int32)** + +Maximum capacity. + +```csharp +IRingBufferScaleFromCapacity MaxCapacity(int value) +``` + +#### Parameters + +`value` [Int32](https://docs.microsoft.com/en-us/dotnet/api/system.int32)
+The maximum buffer.Value mus be greater or equal [IRingBuffer<T>.Capacity(Int32)](./ringbufferplus.iringbuffer-1.md#capacityint32) + +#### Returns + +[IRingBufferScaleCapacity<T>](./ringbufferplus.iringbufferscalecapacity-1.md). + +###
**MinCapacity(Int32)** + +Minimum capacity. + +```csharp +IRingBufferScaleFromCapacity MinCapacity(int value) +``` + +#### Parameters + +`value` [Int32](https://docs.microsoft.com/en-us/dotnet/api/system.int32)
+The minimal buffer. Value mus be greater or equal 1 + +#### Returns + +[IRingBufferScaleCapacity<T>](./ringbufferplus.iringbufferscalecapacity-1.md). + +###
**ReportScale(Action<ScaleMode, ILogger, RingBufferMetric, CancellationToken>)** + +Extension point when capacity was changed. +
Executes asynchronously. + +```csharp +IRingBufferScaleFromCapacity ReportScale(Action report) +``` + +#### Parameters + +`report` [Action<ScaleMode, ILogger, RingBufferMetric, CancellationToken>](https://docs.microsoft.com/en-us/dotnet/api/system.action-4)
+The handler to action. + +#### Returns + +[IRingBufferScaleCapacity<T>](./ringbufferplus.iringbufferscalecapacity-1.md). + + +- - - +[**Back to List Api**](./apis.md) \ No newline at end of file diff --git a/docs/apis/ringbufferplus.iringbufferscalemax-1.md b/docs/apis/ringbufferplus.iringbufferscalemax-1.md index 71779d3..8078990 100644 --- a/docs/apis/ringbufferplus.iringbufferscalemax-1.md +++ b/docs/apis/ringbufferplus.iringbufferscalemax-1.md @@ -105,7 +105,7 @@ IRingBufferScaleMax ScaleWhenFreeLessEq(Nullable value) `value` [Nullable<Int32>](https://docs.microsoft.com/en-us/dotnet/api/system.nullable-1)
Number to trigger. -
The free resource collected must be less than or equal to value.
Default = Min (Min = 1, Max = Capacity). +
The free resource collected must be less than or equal to value.
Default = Min (Min = 2, Max = Capacity). #### Returns diff --git a/docs/apis/ringbufferplus.iringbufferscalemin-1.md b/docs/apis/ringbufferplus.iringbufferscalemin-1.md index c66461e..c3c88b8 100644 --- a/docs/apis/ringbufferplus.iringbufferscalemin-1.md +++ b/docs/apis/ringbufferplus.iringbufferscalemin-1.md @@ -105,7 +105,7 @@ IRingBufferScaleMin ScaleWhenFreeGreaterEq(Nullable value) `value` [Nullable<Int32>](https://docs.microsoft.com/en-us/dotnet/api/system.nullable-1)
Number to free resource. -
Defaut = Max. (Min = 1, Max = Capacity). +
Defaut = Max. (Min = 2, Max = Capacity). #### Returns @@ -124,7 +124,7 @@ IRingBufferScaleMin TriggerByAccqWhenFreeLessEq(Nullable value) `value` [Nullable<Int32>](https://docs.microsoft.com/en-us/dotnet/api/system.nullable-1)
Number to trigger. -
Defaut = Max-1 (Min = 1, Max = MinCapacity). +
Defaut = Max-1 (Min = 2, Max = MinCapacity). #### Returns diff --git a/docs/apis/ringbufferplus.iringbufferservice-1.md b/docs/apis/ringbufferplus.iringbufferservice-1.md index ec6ca9d..7df3672 100644 --- a/docs/apis/ringbufferplus.iringbufferservice-1.md +++ b/docs/apis/ringbufferplus.iringbufferservice-1.md @@ -14,14 +14,14 @@ Namespace: RingBufferPlus Represents the commands to RingBufferPlus service. ```csharp -public interface IRingBufferService : System.IDisposable +public interface IRingBufferService : IRingBufferSwith, System.IDisposable ``` #### Type Parameters `T`
-Implements [IDisposable](https://docs.microsoft.com/en-us/dotnet/api/system.idisposable) +Implements [IRingBufferSwith](./ringbufferplus.iringbufferswith.md), [IDisposable](https://docs.microsoft.com/en-us/dotnet/api/system.idisposable) ## Properties @@ -243,18 +243,6 @@ The [CancellationToken](https://docs.microsoft.com/en-us/dotnet/api/system.threa [RingBufferValue<T>](./ringbufferplus.ringbuffervalue-1.md). -###
**Counters(Action<Int32, Int32, Int32>)** - -Action to read read counters (available/unavailable/for creation) - -```csharp -void Counters(Action counters) -``` - -#### Parameters - -`counters` [Action<Int32, Int32, Int32>](https://docs.microsoft.com/en-us/dotnet/api/system.action-3)
- - - - [**Back to List Api**](./apis.md) \ No newline at end of file diff --git a/docs/apis/ringbufferplus.iringbufferswith.md b/docs/apis/ringbufferplus.iringbufferswith.md new file mode 100644 index 0000000..b954709 --- /dev/null +++ b/docs/apis/ringbufferplus.iringbufferswith.md @@ -0,0 +1,40 @@ +# RingBufferPlus API:IRingBufferSwith + +[![Build](https://github.com/FRACerqueira/RingBufferPlus/workflows/Build/badge.svg)](https://github.com/FRACerqueira/RingBufferPlus/actions/workflows/build.yml) +[![License](https://img.shields.io/badge/License-MIT-brightgreen.svg)](https://github.com/FRACerqueira/RingBufferPlus/blob/master/LICENSE) +[![NuGet](https://img.shields.io/nuget/v/RingBufferPlus)](https://www.nuget.org/packages/RingBufferPlus/) +[![Downloads](https://img.shields.io/nuget/dt/RingBufferPlus)](https://www.nuget.org/packages/RingBufferPlus/) + +[**Back to List Api**](./apis.md) + +# IRingBufferSwith + +Namespace: RingBufferPlus + +Represents the commands to RingBufferPlus service. + +```csharp +public interface IRingBufferSwith +``` + +## Methods + +###
**SwithTo(ScaleMode)** + +Swith to new capacity + +```csharp +bool SwithTo(ScaleMode scaleMode) +``` + +#### Parameters + +`scaleMode` [ScaleMode](./ringbufferplus.scalemode.md)
+ +#### Returns + +True if scale changed, otherwise false + + +- - - +[**Back to List Api**](./apis.md) \ No newline at end of file diff --git a/docs/apis/ringbufferplus.ringbuffermetric.md b/docs/apis/ringbufferplus.ringbuffermetric.md index 5c55356..b8724b2 100644 --- a/docs/apis/ringbufferplus.ringbuffermetric.md +++ b/docs/apis/ringbufferplus.ringbuffermetric.md @@ -21,30 +21,6 @@ Inheritance [Object](https://docs.microsoft.com/en-us/dotnet/api/system.object) ## Properties -###
**Capacity** - -Default capacity of ring buffer. - -```csharp -public int Capacity { get; } -``` - -#### Property Value - -[Int32](https://docs.microsoft.com/en-us/dotnet/api/system.int32)
- -###
**FreeResource** - -Free resource capacity of ring buffer. - -```csharp -public int FreeResource { get; } -``` - -#### Property Value - -[Int32](https://docs.microsoft.com/en-us/dotnet/api/system.int32)
- ###
**FromCapacity** Current capacity. @@ -57,18 +33,6 @@ public int FromCapacity { get; } [Int32](https://docs.microsoft.com/en-us/dotnet/api/system.int32)
-###
**MaxCapacity** - -Maximum capacity of ring buffer. - -```csharp -public int MaxCapacity { get; } -``` - -#### Property Value - -[Int32](https://docs.microsoft.com/en-us/dotnet/api/system.int32)
- ###
**MetricDate** Date of metric . @@ -81,18 +45,6 @@ public DateTime MetricDate { get; } [DateTime](https://docs.microsoft.com/en-us/dotnet/api/system.datetime)
-###
**MinCapacity** - -Minimum capacity of ring buffer. - -```csharp -public int MinCapacity { get; } -``` - -#### Property Value - -[Int32](https://docs.microsoft.com/en-us/dotnet/api/system.int32)
- ###
**ToCapacity** New capacity trigger. @@ -127,12 +79,12 @@ Create empty Metric of RingBufferPlus. RingBufferMetric() ``` -### **RingBufferMetric(SourceTrigger, Int32, Int32, Int32, Int32, Int32, Int32, DateTime)** +### **RingBufferMetric(SourceTrigger, Int32, Int32)** Create Metric of RingBufferPlus. ```csharp -RingBufferMetric(SourceTrigger source, int fromcapacity, int tocapacity, int capacity, int mincapacity, int maxcapacity, int freeresource, DateTime dateref) +RingBufferMetric(SourceTrigger source, int fromcapacity, int tocapacity) ``` #### Parameters @@ -146,21 +98,6 @@ Current capacity. `tocapacity` [Int32](https://docs.microsoft.com/en-us/dotnet/api/system.int32)
New capacity trigger. -`capacity` [Int32](https://docs.microsoft.com/en-us/dotnet/api/system.int32)
-Default capacity. - -`mincapacity` [Int32](https://docs.microsoft.com/en-us/dotnet/api/system.int32)
-Minimum capacity. - -`maxcapacity` [Int32](https://docs.microsoft.com/en-us/dotnet/api/system.int32)
-Maximum capacity. - -`freeresource` [Int32](https://docs.microsoft.com/en-us/dotnet/api/system.int32)
-Free resource value trigger. - -`dateref` [DateTime](https://docs.microsoft.com/en-us/dotnet/api/system.datetime)
-Date of metric. - - - - [**Back to List Api**](./apis.md) \ No newline at end of file diff --git a/docs/apis/ringbufferplus.ringbuffervalue-1.md b/docs/apis/ringbufferplus.ringbuffervalue-1.md index f7e5069..75b8743 100644 --- a/docs/apis/ringbufferplus.ringbuffervalue-1.md +++ b/docs/apis/ringbufferplus.ringbuffervalue-1.md @@ -77,18 +77,6 @@ public bool Successful { get; } ## Constructors -###
**RingBufferValue(Int32)** - -Create empty RingBufferValue. - -```csharp -public RingBufferValue(int diffCapacity) -``` - -#### Parameters - -`diffCapacity` [Int32](https://docs.microsoft.com/en-us/dotnet/api/system.int32)
- ###
**RingBufferValue(String, TimeSpan, Boolean, T, Action<RingBufferValue<T>>)** Create RingBufferValue. diff --git a/docs/apis/ringbufferplus.scalemode.md b/docs/apis/ringbufferplus.scalemode.md index 348ccdc..5eabfaf 100644 --- a/docs/apis/ringbufferplus.scalemode.md +++ b/docs/apis/ringbufferplus.scalemode.md @@ -24,9 +24,10 @@ Implements [IComparable](https://docs.microsoft.com/en-us/dotnet/api/system.icom | Name | Value | Description | | --- | --: | --- | -| ToMinCapacity | 0 | Scale to minimal capacity. | -| ToMaxCapacity | 1 | Scale to maximum capacity. | -| ToDefaultCapacity | 2 | Scale to default capacity. | +| None | 0 | Current Scale | +| ToMinCapacity | 1 | Scale to minimal capacity. | +| ToMaxCapacity | 2 | Scale to maximum capacity. | +| ToDefaultCapacity | 3 | Scale to default capacity. | - - - diff --git a/docs/apis/ringbufferplus.sourcetrigger.md b/docs/apis/ringbufferplus.sourcetrigger.md index 7977570..f4d3e96 100644 --- a/docs/apis/ringbufferplus.sourcetrigger.md +++ b/docs/apis/ringbufferplus.sourcetrigger.md @@ -26,6 +26,7 @@ Implements [IComparable](https://docs.microsoft.com/en-us/dotnet/api/system.icom | --- | --: | --- | | AutoScale | 0 | Source is scale capacity thread. | | Accquire | 1 | Source is accquire command. | +| MasterSlave | 2 | Source is Master-Slave | - - - diff --git a/docs/index.md b/docs/index.md index c9e8b62..c18c26b 100644 --- a/docs/index.md +++ b/docs/index.md @@ -21,10 +21,22 @@ - [API Reference](https://fracerqueira.github.io/RingBufferPlus/apis/apis.html) ## What's new in the latest version -### V2.0.0 + +### V3.0.0 [**Top**](#table-of-contents) -- Release G.A with .NET8 +- Added command 'FactoryHealth' + - Check health item before accquire buffer. +- Renamed Method 'SwithToScaleDefinitions' to 'MasterScale' +- Added master-slave feature(2 Ring Buffer with synchronization) + - Added command set 'SlaveScale' to set report handler, Minimum and maximum capacity +- Added 'MasterSlave' enum item in SourceTrigger +- Added 'None' enum item in ScaleMode +- Revised to have greater performance without 'lock' +- Removed Method 'Counters' + - data was not relevant and inaccurate +- Revised 'RingBufferMetric' + - Now only propreties 'Trigger', 'FromCapacity', 'ToCapacity' and 'MetricDate' ## Features @@ -45,17 +57,22 @@ The implementation follows the basic principle. The principle was expanded to ha ### Key Features [**Top**](#table-of-contents) +- Conscious use of resources - Set unique name for same buffer type - Set the buffer capacity +- Set buffer integrity (validate if the buffer is valid) + - Verified with each acquiring - Set the minimum and maximum capacity (optional) - Set the conditions for scaling to maximum and minimum (required) - Automatic condition values ​​based on capacity (value not required) - - Define a user role to receive capacity change events to log/save (optional) - - Executed in a separate thread asynchronously +- Set master-slave (2 Ring Buffer with synchronization) + - Master controls slave scale +- Event with scale change information + - Executed in a separate thread asynchronously - Associate the logger interface (optional) - Define a user role for generated errors (optional) - Executed in a separate thread asynchronously -- Invalidate the buffer when it is in an invalid state +- Command to Invalidate the buffer when it is in an invalid state - Warm up to full capacity before starting application - Receive item from buffer with success/failure information and elapsed time for acquisition - Sets a time limit for acquiring the item in the buffer @@ -89,7 +106,7 @@ dotnet run --project [name of sample] The **RingBufferPlus** use **fluent interface**; an object-oriented API whose design relies extensively on method chaining. Its goal is to increase code legibility. The term was coined in 2005 by Eric Evans and Martin Fowler. -### Sample-Console Usage (Full features) +### Sample-Console Usage (Minimal features with auto-scale) ```csharp using var loggerFactory = LoggerFactory.Create(builder => @@ -104,34 +121,18 @@ logger = loggerFactory.CreateLogger(); ``` ```csharp +Random rnd = new(); var rb = RingBuffer.New("MyBuffer", cts.Token) .Capacity(8) - .Logger(logger!) - .OnError((log, error) => - { - log?.LogError("{error}",error); - }) .Factory((cts) => { return rnd.Next(1, 10); }) - .SwithToScaleDefinitions() + .MasterScale() .SampleUnit(TimeSpan.FromSeconds(10), 10) - .ReportScale((mode, log, metric, _) => - { - log.LogInformation($"{connectionRingBuffer!.Name} Report: [{metric.MetricDate}] Trigger {metric.Trigger} : {mode} from {metric.FromCapacity} to {metric.ToCapacity} ({metric.Capacity}/{metric.MinCapacity}/{metric.MaxCapacity}) : {metric.FreeResource}"); - }) .MinCapacity(4) - // Defaut = Max (Min = 1, Max = Capacity) .ScaleWhenFreeGreaterEq() - // Defaut = Min (Min = 1, Max = MinCapacity) .RollbackWhenFreeLessEq() - // Defaut = Max-1 (Min = 1, Max = MinCapacity) - //.TriggerByAccqWhenFreeLessEq() .MaxCapacity(20) - // Default = Min (Min = 1, Max = Capacity) .ScaleWhenFreeLessEq() - // Default = Min (Min = MaxCapacity-Capacity, Max = MaxCapacity) .RollbackWhenFreeGreaterEq() - // Default = Min (Min = MaxCapacity-Capacity, Max = MaxCapacity) - //.TriggerByAccqWhenFreeGreaterEq() .BuildWarmup(out var completed); ``` @@ -153,7 +154,7 @@ using (var buffer = rb.Accquire()) ``` -### Sample-api/webUsage +### Sample-api/web Usage (Minimal features without auto-scale) [**Top**](#table-of-contents) ```csharp @@ -162,11 +163,6 @@ builder.Services.AddRingBuffer("Mybuffer",(ringbuf, _) => return ringbuf .Capacity(8) .Factory((cts) => { return 10; }) - .AccquireTimeout(TimeSpan.FromMilliseconds(1500)) - .OnError((log, error) => - { - log?.LogError("{error}",error); - }) .Build(); }); @@ -203,6 +199,47 @@ public class MyController(IRingBufferService ringBufferService) : Controlle } ``` +### Sample-Console Master-Slave feature using RabbitMq (basic usage) +[**Top**](#table-of-contents) + +For more details see [**Complete-Samples**](https://github.com/FRACerqueira/RingBufferPlus/tree/main/samples/RingBufferPlusBenchmarkSample). + +```csharp +connectionRingBuffer = RingBuffer.New("RabbitCnn") + .Capacity(2) + .Logger(applogger!) + .AccquireTimeout(TimeSpan.FromMilliseconds(500)) + .OnError((log, error) => + { + log?.LogError("{error}", error); + }) + .Factory((cts) => ConnectionFactory.CreateConnection()) + .FactoryHealth((item) => item.IsOpen) + .SlaveScale() + .MaxCapacity(10) + .MinCapacity(1) + .BuildWarmup(out completedCnn); + +modelRingBuffer = RingBuffer.New("RabbitChanels") + .Capacity(10) + .Logger(applogger!) + .OnError((log, error) => + { + log?.LogError("{error}", error); + }) + .Factory((cts) => ModelFactory(cts)) + .FactoryHealth((item) => item.IsOpen) + .MasterScale(connectionRingBuffer) + .SampleUnit(TimeSpan.FromSeconds(10), 10) + .MaxCapacity(50) + .ScaleWhenFreeLessEq() + .RollbackWhenFreeGreaterEq() + .MinCapacity(2) + .ScaleWhenFreeGreaterEq() + .RollbackWhenFreeLessEq() + .BuildWarmup(out completedChanels); +``` + ## Performance [**Top**](#table-of-contents) @@ -219,11 +256,14 @@ Intel Core i7-8565U CPU 1.80GHz (Whiskey Lake), 1 CPU, 8 logical and 4 physical [Host] : .NET 8.0.0 (8.0.23.53103), X64 RyuJIT AVX2 Job-IMTEVT : .NET 8.0.0 (8.0.23.53103), X64 RyuJIT AVX2 Dry : .NET 8.0.0 (8.0.23.53103), X64 RyuJIT AVX2 - -| Method | Mean | StdErr | StdDev | Min | Q1 | Median | Q3 | Max | Op/s | Rank | -|------------------ |------------:|----------:|------------:|------------:|------------:|------------:|------------:|------------:|-------:|-----:| -| WithRingBuffer | 589.8 ms | 16.90 ms | 169.01 ms | 191.2 ms | 487.2 ms | 555.4 ms | 659.0 ms | 1,324.8 ms | 1.6955 | 1 | -| WithoutRingBuffer | 15,441.5 ms | 154.39 ms | 1,543.90 ms | 13,785.4 ms | 14,562.8 ms | 15,071.1 ms | 15,981.9 ms | 24,595.2 ms | 0.0648 | 2 | ++------------------ +-------:+-----:+------------:+----------:+------------:+-------------:+------------:+------------:+------------:+------------+| +| Method | Op/s | Rank | Mean | StdErr | StdDev | Min | Q1 | Median | Q3 | Max | +|------------------ |-------:|-----:|------------:|----------:|------------:|-------------:|------------:|------------:|------------:|------------:| +| WithRingBuffer | 6.8218 | 1 | 146.6 ms | 0.00 ms | 0.00 ms | 146.59 ms | 146.6 ms | 146.6 ms | 146.6 ms | 146.6 ms | +| WithRingBuffer | 1.9411 | 2 | 515.2 ms | 101.72 ms | 1,017.24 ms | 72.76 ms | 300.1 ms | 439.2 ms | 508.2 ms | 10,426.5 ms | +| WithoutRingBuffer | 0.0676 | 3 | 14,797.6 ms | 133.34 ms | 1,333.36 ms | 13,306.95 ms | 14,061.5 ms | 14,497.3 ms | 15,098.1 ms | 21,286.4 ms | +| WithoutRingBuffer | 0.0662 | 4 | 15,109.9 ms | 0.00 ms | 0.00 ms | 15,109.93 ms | 15,109.9 ms | 15,109.9 ms | 15,109.9 ms |115,109.9 ms | ++------------------ +-------:+-----:+------------:+----------:+------------:+-------------:+------------:+------------:+------------:+------------+| ``` ## Credits diff --git a/docs/whatsnewprev.md b/docs/whatsnewprev.md index 8758843..1bc0d6b 100644 --- a/docs/whatsnewprev.md +++ b/docs/whatsnewprev.md @@ -4,6 +4,10 @@ [![NuGet](https://img.shields.io/nuget/v/RingBufferPlus)](https://www.nuget.org/packages/RingBufferPlus/) [![Downloads](https://img.shields.io/nuget/dt/RingBufferPlus)](https://www.nuget.org/packages/RingBufferPlus/) +### V2.0.0 (Deprecate!) +[**Main**](index.md) | [**Top**](#ringbufferplus-whats-new) + +- Release G.A with .NET8 ### V1.0.1 (Deprecate!) [**Main**](index.md) | [**Top**](#ringbufferplus-whats-new) diff --git a/samples/RingBufferPlusApiSample/Program.cs b/samples/RingBufferPlusApiSample/Program.cs index 9e73a1b..b4f6a82 100644 --- a/samples/RingBufferPlusApiSample/Program.cs +++ b/samples/RingBufferPlusApiSample/Program.cs @@ -19,19 +19,8 @@ public static void Main(string[] args) return ringbuf .Capacity(8) .Factory((cts) => { return 10; }) - .AccquireTimeout(TimeSpan.FromSeconds(10)) - .OnError((log, error) => - { - log?.LogError("{error}",error); - }) - .SwithToScaleDefinitions() + .MasterScale() .SampleUnit(TimeSpan.FromSeconds(60),60) - .ReportScale((mode, loger, metric, cts) => - { - #pragma warning disable CA2254 // Template should be a static expression - loger.LogInformation($"Report [{metric.MetricDate}] Trigger {metric.Trigger} : {mode} from {metric.FromCapacity} to {metric.ToCapacity} ({metric.Capacity}/{metric.MinCapacity}/{metric.MaxCapacity}) : {metric.FreeResource}"); - #pragma warning restore CA2254 // Template should be a static expression - }) .MinCapacity(4) // Defaut = Max (Min = 1, Max = Capacity) .ScaleWhenFreeGreaterEq() diff --git a/samples/RingBufferPlusApiSample/RingBufferPlusApiSample.http b/samples/RingBufferPlusApiSample/RingBufferPlusApiSample.http deleted file mode 100644 index 3f4b201..0000000 --- a/samples/RingBufferPlusApiSample/RingBufferPlusApiSample.http +++ /dev/null @@ -1,6 +0,0 @@ -@RingBufferPlusApiSample_HostAddress = http://localhost:5253 - -GET {{RingBufferPlusApiSample_HostAddress}}/weatherforecast/ -Accept: application/json - -### diff --git a/samples/RingBufferPlusBenchmarkSample/BenchmarkProgram.cs b/samples/RingBufferPlusBenchmarkSample/BenchmarkProgram.cs index 4162b6f..8b90062 100644 --- a/samples/RingBufferPlusBenchmarkSample/BenchmarkProgram.cs +++ b/samples/RingBufferPlusBenchmarkSample/BenchmarkProgram.cs @@ -1,4 +1,10 @@ -using BenchmarkDotNet.Analysers; +// *************************************************************************************** +// Original source code : Copyright 2020 Luis Carlos Farias. +// https://github.com/luizcarlosfaria/Oragon.Common.RingBuffer +// Current source code : The maintenance and evolution is maintained by the RingBufferPlus project +// *************************************************************************************** + +using BenchmarkDotNet.Analysers; using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Columns; using BenchmarkDotNet.Configs; @@ -29,13 +35,28 @@ public class BenchmarkProgram while (!cancellation.IsCancellationRequested) { using var connectionWrapper = connectionRingBuffer!.Accquire(); - if (connectionWrapper.Successful && connectionWrapper.Current.IsOpen) + try + { + if (connectionWrapper.Successful) + { + if (connectionWrapper.Current.IsOpen) + { + model = connectionWrapper.Current.CreateModel(); + if (model.IsOpen) + { + break; + } + } + else + { + connectionWrapper.Invalidate(); + } + } + } + catch { - model = connectionWrapper.Current.CreateModel(); - model.QueueDeclare("log", false, false, false); - break; } - cancellation.WaitHandle.WaitOne(TimeSpan.FromMilliseconds(100)); + cancellation.WaitHandle.WaitOne(TimeSpan.FromMilliseconds(5)); } return model; } @@ -84,7 +105,11 @@ public void GlobalSetup() log?.LogError("{error}", error); }) .Factory((cts) => ConnectionFactory.CreateConnection()) - .AccquireTimeout(TimeSpan.FromMilliseconds(1500)) + .FactoryHealth((item) => item.IsOpen) + .AccquireTimeout(TimeSpan.FromMilliseconds(500)) + .SlaveScale() + .MaxCapacity(10) + .MinCapacity(1) .BuildWarmup(out _); modelRingBuffer = RingBuffer.New("RabbitChanels") @@ -95,7 +120,15 @@ public void GlobalSetup() log?.LogError("{error}", error); }) .Factory((cts) => ModelFactory(cts)!) - .AccquireTimeout(TimeSpan.FromMilliseconds(1500)) + .FactoryHealth((item) => item.IsOpen) + .MasterScale(connectionRingBuffer) + .SampleUnit(TimeSpan.FromSeconds(10), 10) + .MaxCapacity(50) + .ScaleWhenFreeLessEq() + .RollbackWhenFreeGreaterEq() + .MinCapacity(2) + .ScaleWhenFreeGreaterEq() + .RollbackWhenFreeLessEq() .BuildWarmup(out _); } @@ -144,7 +177,7 @@ public int WithRingBuffer() return 0; } - [Benchmark(Baseline = true)] + [Benchmark] public int WithoutRingBuffer() { string queueName = $"WithoutRingBuffer-{Guid.NewGuid():D}"; diff --git a/samples/RingBufferPlusBenchmarkSample/ConsumerRoleProgram.cs b/samples/RingBufferPlusBenchmarkSample/ConsumerRoleProgram.cs index 3691a42..a5abe8e 100644 --- a/samples/RingBufferPlusBenchmarkSample/ConsumerRoleProgram.cs +++ b/samples/RingBufferPlusBenchmarkSample/ConsumerRoleProgram.cs @@ -1,5 +1,10 @@ -using System.Text; -using BenchmarkDotNet.Engines; +// *************************************************************************************** +// Original source code : Copyright 2020 Luis Carlos Farias. +// https://github.com/luizcarlosfaria/Oragon.Common.RingBuffer +// Current source code : The maintenance and evolution is maintained by the RingBufferPlus project +// *************************************************************************************** + +using System.Text; using Microsoft.Extensions.Logging; using RabbitMQ.Client; using RabbitMQ.Client.Events; @@ -103,12 +108,7 @@ public static void Start(ILogger logger, int delaysec) Console.WriteLine($"Ring Buffer ScaleToMax({connectionRingBuffer!.ScaleToMax})"); Console.WriteLine($"Ring Buffer RollbackFromMax({connectionRingBuffer!.RollbackFromMax})"); Console.WriteLine($"Ring Buffer TriggerFromMax({connectionRingBuffer!.TriggerFromMax})"); - connectionRingBuffer.Counters((available, unavailable, forcreation) => - { - Console.WriteLine($"Ring Buffer Available({available})"); - Console.WriteLine($"Ring Buffer Unavailable({unavailable})"); - Console.WriteLine($"Ring Buffer ToCreating({forcreation})"); - }); + Console.WriteLine(); #endregion @@ -117,7 +117,7 @@ public static void Start(ILogger logger, int delaysec) Console.WriteLine($"Ring Buffer {modelRingBuffer!.Name}"); - Console.WriteLine($"Ring Buffer Warmup({completedCnn})"); + Console.WriteLine($"Ring Buffer Warmup({completedChanels})"); Console.WriteLine($"Ring Buffer Capacity({modelRingBuffer.Capacity})"); Console.WriteLine($"Ring Buffer MinCapacity({modelRingBuffer.MinCapacity})"); Console.WriteLine($"Ring Buffer MaxCapacity({modelRingBuffer.MaxCapacity})"); @@ -133,12 +133,7 @@ public static void Start(ILogger logger, int delaysec) Console.WriteLine($"Ring Buffer ScaleToMax({modelRingBuffer.ScaleToMax})"); Console.WriteLine($"Ring Buffer RollbackFromMax({modelRingBuffer.RollbackFromMax})"); Console.WriteLine($"Ring Buffer TriggerFromMax({modelRingBuffer.TriggerFromMax})"); - modelRingBuffer.Counters((available, unavailable, forcreation) => - { - Console.WriteLine($"Ring Buffer Available({available})"); - Console.WriteLine($"Ring Buffer Unavailable({unavailable})"); - Console.WriteLine($"Ring Buffer ToCreating({forcreation})"); - }); + Console.WriteLine(); #endregion diff --git a/samples/RingBufferPlusBenchmarkSample/Program.cs b/samples/RingBufferPlusBenchmarkSample/Program.cs index 0b22e47..2ebeab6 100644 --- a/samples/RingBufferPlusBenchmarkSample/Program.cs +++ b/samples/RingBufferPlusBenchmarkSample/Program.cs @@ -1,4 +1,10 @@ -using System.CommandLine; +// *************************************************************************************** +// Original source code : Copyright 2020 Luis Carlos Farias. +// https://github.com/luizcarlosfaria/Oragon.Common.RingBuffer +// Current source code : The maintenance and evolution is maintained by the RingBufferPlus project +// *************************************************************************************** + +using System.CommandLine; using System.CommandLine.Builder; using System.CommandLine.Parsing; using BenchmarkDotNet.Running; diff --git a/samples/RingBufferPlusBenchmarkSample/Properties/launchSettings.json b/samples/RingBufferPlusBenchmarkSample/Properties/launchSettings.json index dbf71b0..466566a 100644 --- a/samples/RingBufferPlusBenchmarkSample/Properties/launchSettings.json +++ b/samples/RingBufferPlusBenchmarkSample/Properties/launchSettings.json @@ -2,7 +2,14 @@ "profiles": { "RingBufferPlusBenchmarkSample": { "commandName": "Project", - "commandLineArgs": "--role consumer --delay 5" + "commandLineArgs": "--role publisher --delay 10" + }, + "Docker": { + "commandName": "Docker" + }, + "WSL": { + "commandName": "WSL2", + "distributionName": "" } } } \ No newline at end of file diff --git a/samples/RingBufferPlusBenchmarkSample/PublisherRoleProgram.cs b/samples/RingBufferPlusBenchmarkSample/PublisherRoleProgram.cs index eb06960..0a026ea 100644 --- a/samples/RingBufferPlusBenchmarkSample/PublisherRoleProgram.cs +++ b/samples/RingBufferPlusBenchmarkSample/PublisherRoleProgram.cs @@ -1,4 +1,10 @@ -using System.Text; +// *************************************************************************************** +// Original source code : Copyright 2020 Luis Carlos Farias. +// https://github.com/luizcarlosfaria/Oragon.Common.RingBuffer +// Current source code : The maintenance and evolution is maintained by the RingBufferPlus project +// *************************************************************************************** + +using System.Text; using Microsoft.Extensions.Logging; using RabbitMQ.Client; using RingBufferPlus; @@ -15,18 +21,33 @@ internal class PublisherRoleProgram private static ILogger? applogger; static IModel? ModelFactory(CancellationToken cancellation) - { + { IModel? model = null; while (!cancellation.IsCancellationRequested) { using var connectionWrapper = connectionRingBuffer!.Accquire(); - if (connectionWrapper.Successful && connectionWrapper.Current.IsOpen) + try + { + if (connectionWrapper.Successful) + { + if (connectionWrapper.Current.IsOpen) + { + model = connectionWrapper.Current.CreateModel(); + if (model.IsOpen) + { + break; + } + } + else + { + connectionWrapper.Invalidate(); + } + } + } + catch { - model = connectionWrapper.Current.CreateModel(); - model.QueueDeclare("log", false, false, false); - break; } - cancellation.WaitHandle.WaitOne(TimeSpan.FromMilliseconds(100)); + cancellation.WaitHandle.WaitOne(TimeSpan.FromMilliseconds(5)); } return model; } @@ -47,25 +68,24 @@ private static void Init() }; connectionRingBuffer = RingBuffer.New("RabbitCnn") - .Capacity(5) + .Capacity(2) .Logger(applogger!) - .AccquireTimeout(TimeSpan.FromMilliseconds(1500)) + .AccquireTimeout(TimeSpan.FromMilliseconds(500)) .OnError((log, error) => { log?.LogError("{error}", error); }) .Factory((cts) => ConnectionFactory.CreateConnection()) - .SwithToScaleDefinitions() - .SampleUnit(TimeSpan.FromSeconds(10), 10) + .FactoryHealth((item) => item.IsOpen) + .SlaveScale() .ReportScale((mode, log, metric, _) => { #pragma warning disable CA2254 // Template should be a static expression - log.LogInformation($"{connectionRingBuffer!.Name} Report: [{metric.MetricDate}] Trigger {metric.Trigger} : {mode} from {metric.FromCapacity} to {metric.ToCapacity} ({metric.Capacity}/{metric.MinCapacity}/{metric.MaxCapacity}) : {metric.FreeResource}"); + log.LogInformation($"RabbitCnn Report: [{metric.MetricDate}] Trigger {metric.Trigger} : {mode} from {metric.FromCapacity} to {metric.ToCapacity}"); #pragma warning restore CA2254 // Template should be a static expression }) - .MinCapacity(2) - .ScaleWhenFreeGreaterEq() - .RollbackWhenFreeLessEq() + .MaxCapacity(10) + .MinCapacity(1) .BuildWarmup(out completedCnn); modelRingBuffer = RingBuffer.New("RabbitChanels") @@ -76,16 +96,16 @@ private static void Init() log?.LogError("{error}", error); }) .Factory((cts) => ModelFactory(cts)!) - .AccquireTimeout(TimeSpan.FromMilliseconds(1500)) - .SwithToScaleDefinitions() + .FactoryHealth((item) => item.IsOpen) + .MasterScale(connectionRingBuffer) .SampleUnit(TimeSpan.FromSeconds(10), 10) .ReportScale((mode,log,metric,_) => { #pragma warning disable CA2254 // Template should be a static expression - log.LogInformation($"{modelRingBuffer!.Name} Report: [{metric.MetricDate}] Trigger {metric.Trigger} : {mode} from {metric.FromCapacity} to {metric.ToCapacity} ({metric.Capacity}/{metric.MinCapacity}/{metric.MaxCapacity}) : {metric.FreeResource}"); + log.LogInformation($"RabbitChanels Report: [{metric.MetricDate}] Trigger {metric.Trigger} : {mode} from {metric.FromCapacity} to {metric.ToCapacity}"); #pragma warning restore CA2254 // Template should be a static expression }) - .MaxCapacity(20) + .MaxCapacity(50) .ScaleWhenFreeLessEq() .RollbackWhenFreeGreaterEq() .MinCapacity(2) @@ -131,12 +151,6 @@ public static void Start(ILogger logger,int delaysec) Console.WriteLine($"Ring Buffer ScaleToMax({connectionRingBuffer!.ScaleToMax})"); Console.WriteLine($"Ring Buffer RollbackFromMax({connectionRingBuffer!.RollbackFromMax})"); Console.WriteLine($"Ring Buffer TriggerFromMax({connectionRingBuffer!.TriggerFromMax})"); - connectionRingBuffer.Counters((available, unavailable, forcreation) => - { - Console.WriteLine($"Ring Buffer Available({available})"); - Console.WriteLine($"Ring Buffer Unavailable({unavailable})"); - Console.WriteLine($"Ring Buffer ToCreating({forcreation})"); - }); Console.WriteLine(); #endregion @@ -161,12 +175,6 @@ public static void Start(ILogger logger,int delaysec) Console.WriteLine($"Ring Buffer ScaleToMax({modelRingBuffer.ScaleToMax})"); Console.WriteLine($"Ring Buffer RollbackFromMax({modelRingBuffer.RollbackFromMax})"); Console.WriteLine($"Ring Buffer TriggerFromMax({modelRingBuffer.TriggerFromMax})"); - modelRingBuffer.Counters((available, unavailable, forcreation) => - { - Console.WriteLine($"Ring Buffer Available({available})"); - Console.WriteLine($"Ring Buffer Unavailable({unavailable})"); - Console.WriteLine($"Ring Buffer ToCreating({forcreation})"); - }); Console.WriteLine(); #endregion @@ -175,7 +183,7 @@ public static void Start(ILogger logger,int delaysec) Thread.Sleep(TimeSpan.FromSeconds(delaysec)); Console.WriteLine($"Running"); - var dtref = DateTime.Now.AddSeconds(60); + var dtref = DateTime.Now.AddSeconds(120); for (int i = 0; i < threadCount; i++) { Thread thread = new(() => @@ -184,9 +192,9 @@ public static void Start(ILogger logger,int delaysec) { if (DateTime.Now > dtref) { - Console.WriteLine($"wait 30 seconds idle"); - Thread.Sleep(TimeSpan.FromSeconds(30)); - dtref = DateTime.Now.AddSeconds(60); + Console.WriteLine($"wait 120 seconds idle"); + Thread.Sleep(TimeSpan.FromSeconds(120)); + dtref = DateTime.Now.AddSeconds(120); } using var bufferedItem = modelRingBuffer!.Accquire(); if (bufferedItem.Successful) diff --git a/samples/RingBufferPlusConsoleSample/Program.cs b/samples/RingBufferPlusConsoleSample/Program.cs index 0dea6e9..acdd114 100644 --- a/samples/RingBufferPlusConsoleSample/Program.cs +++ b/samples/RingBufferPlusConsoleSample/Program.cs @@ -25,11 +25,7 @@ static void Main(string[] args) .Capacity(8) .Logger(logger!) .Factory((cts) => { return rnd.Next(1, 10); }) - .OnError((log, error) => - { - log?.LogError("{error}", error); - }) - .SwithToScaleDefinitions() + .MasterScale() .MinCapacity(4) // Defaut = Max (Min = 1, Max = Capacity) .ScaleWhenFreeGreaterEq() @@ -79,25 +75,11 @@ static void Main(string[] args) buffer3.Invalidate(); } - rb.Counters((available, unavailable, forcreation) => - { - Console.WriteLine($"Ring Buffer Available({available})"); - Console.WriteLine($"Ring Buffer Unavailable({unavailable})"); - Console.WriteLine($"Ring Buffer ToCreating({forcreation})"); - }); - Console.WriteLine("Press anykey to stop/close ring buffer"); Console.ReadKey(); cts.Cancel(); - rb.Counters((available, unavailable, forcreation) => - { - Console.WriteLine($"Ring Buffer Available({available})"); - Console.WriteLine($"Ring Buffer Unavailable({unavailable})"); - Console.WriteLine($"Ring Buffer ToCreating({forcreation})"); - }); - Console.WriteLine("Press anykey to end"); Console.ReadKey(); diff --git a/src/Commands/IRingBuffer.cs b/src/Commands/IRingBuffer.cs index d3eb9a6..c88dacd 100644 --- a/src/Commands/IRingBuffer.cs +++ b/src/Commands/IRingBuffer.cs @@ -32,6 +32,13 @@ public interface IRingBuffer /// . IRingBuffer Factory(Func value, TimeSpan? timeout = null, TimeSpan? idleRetryError = null); + /// + /// Health before accquire buffer. + /// + /// The handler to factory Health. + /// . + IRingBuffer FactoryHealth(Func value); + /// /// The Logger ///
Default value is ILoggerFactory.Create (if any) with category euqal name of ring buffer
@@ -47,7 +54,6 @@ public interface IRingBuffer /// . IRingBuffer AccquireTimeout(TimeSpan value); - /// /// Extension point to log a error. ///
Executes asynchronously.
@@ -71,10 +77,17 @@ public interface IRingBuffer IRingBufferService BuildWarmup(out bool fullcapacity, TimeSpan? timeout = null); /// - /// Swith to scale definitions commands. + /// Swith to scale definitions commands (self) or other ring buffer. + /// + /// The slave Ring buffer. + /// . + IRingBufferMasterCapacity MasterScale(IRingBufferSwith? ringBuffer = null); + + /// + /// Swith to scale definitions from other ring buffer. /// - /// . - IRingBufferScaleCapacity SwithToScaleDefinitions(); + /// . + IRingBufferSlaveCapacity SlaveScale(); } } diff --git a/src/Commands/IRingBufferScaleCapacity.cs b/src/Commands/IRingBufferMasterCapacity.cs similarity index 72% rename from src/Commands/IRingBufferScaleCapacity.cs rename to src/Commands/IRingBufferMasterCapacity.cs index 63e18df..c86b334 100644 --- a/src/Commands/IRingBufferScaleCapacity.cs +++ b/src/Commands/IRingBufferMasterCapacity.cs @@ -10,10 +10,11 @@ namespace RingBufferPlus { /// - /// Represents the scale capacity commands to RingBufferPlus. + /// Represents the Master capacity commands to RingBufferPlus. /// /// Type of buffer. - public interface IRingBufferScaleCapacity + public interface IRingBufferMasterCapacity + { /// /// Sampling unit for return buffer-free resource (Average colledted samples). @@ -21,8 +22,8 @@ public interface IRingBufferScaleCapacity /// /// The interval to colleted samples.Default baseunit is 60 seconds. /// Number of samples collected.Default value is baseunit/10. Default value is 60. - /// . - IRingBufferScaleCapacity SampleUnit(TimeSpan? baseunit = null,int? value = null); + /// . + IRingBufferMasterCapacity SampleUnit(TimeSpan? baseunit = null,int? value = null); /// /// Sampling unit for return buffer-free resource (Average colledted samples). @@ -33,30 +34,30 @@ public interface IRingBufferScaleCapacity /// Number of samples collected.Default value is baseunit/10. Default value is 60. ///
Base unit = The interval to colledted samples. Default is 60 seconds.
/// - /// . - IRingBufferScaleCapacity SampleUnit(int? value = null); + /// . + IRingBufferMasterCapacity SampleUnit(int? value = null); /// /// Extension point when capacity was changed. ///
Executes asynchronously.
///
/// The handler to action. - /// . - IRingBufferScaleCapacity ReportScale(Action report = null); + /// . + IRingBufferMasterCapacity ReportScale(Action report = null); /// /// Minimum capacity. /// - /// The minimal buffer. - /// . + /// The minimal buffer. Value mus be greater or equal 1 + /// . IRingBufferScaleMin MinCapacity(int value); /// /// Maximum capacity. /// - /// The maximum buffer. - /// . + /// The maximum buffer.Value mus be greater or equal + /// . IRingBufferScaleMax MaxCapacity(int value); /// diff --git a/src/Commands/IRingBufferScaleMax.cs b/src/Commands/IRingBufferScaleMax.cs index 14a680a..4908c96 100644 --- a/src/Commands/IRingBufferScaleMax.cs +++ b/src/Commands/IRingBufferScaleMax.cs @@ -21,7 +21,7 @@ public interface IRingBufferScaleMax /// /// Number to trigger. ///
The free resource collected must be less than or equal to value.
- ///
Default = Min (Min = 1, Max = Capacity).
+ ///
Default = Min (Min = 2, Max = Capacity).
/// /// IRingBufferScaleMax ScaleWhenFreeLessEq(int? value = null); diff --git a/src/Commands/IRingBufferScaleMin.cs b/src/Commands/IRingBufferScaleMin.cs index 5e01fbb..ee287d9 100644 --- a/src/Commands/IRingBufferScaleMin.cs +++ b/src/Commands/IRingBufferScaleMin.cs @@ -19,7 +19,7 @@ public interface IRingBufferScaleMin ///
/// /// Number to free resource. - ///
Defaut = Max. (Min = 1, Max = Capacity).
+ ///
Defaut = Max. (Min = 2, Max = Capacity).
/// /// . IRingBufferScaleMin ScaleWhenFreeGreaterEq(int? value = null); @@ -31,7 +31,7 @@ public interface IRingBufferScaleMin ///
/// /// Number to trigger. - ///
Defaut = Max-1 (Min = 1, Max = MinCapacity).
+ ///
Defaut = Max-1 (Min = 2, Max = MinCapacity).
/// /// . IRingBufferScaleMin TriggerByAccqWhenFreeLessEq(int? value = null); diff --git a/src/Commands/IRingBufferService.cs b/src/Commands/IRingBufferService.cs index 66ef81e..bd7eb53 100644 --- a/src/Commands/IRingBufferService.cs +++ b/src/Commands/IRingBufferService.cs @@ -12,18 +12,13 @@ namespace RingBufferPlus /// Represents the commands to RingBufferPlus service. ///
/// - public interface IRingBufferService : IDisposable + public interface IRingBufferService : IRingBufferSwith, IDisposable { /// /// Unique name to RingBuffer. /// string Name { get; } - /// - /// Action to read read counters (available/unavailable/for creation) - /// - void Counters(Action counters); - /// /// Default capacity of ring buffer. /// diff --git a/src/Commands/IRingBufferSlaveCapacity.cs b/src/Commands/IRingBufferSlaveCapacity.cs new file mode 100644 index 0000000..d9a9946 --- /dev/null +++ b/src/Commands/IRingBufferSlaveCapacity.cs @@ -0,0 +1,57 @@ +// *************************************************************************************** +// MIT LICENCE +// The maintenance and evolution is maintained by the RingBufferPlus project under MIT license +// *************************************************************************************** + +using System; +using System.Threading; +using Microsoft.Extensions.Logging; + +namespace RingBufferPlus +{ + /// + /// Represents the Slave capacity commands to RingBufferPlus. + /// + /// Type of buffer. + public interface IRingBufferSlaveCapacity + + { + /// + /// Extension point when capacity was changed. + ///
Executes asynchronously.
+ ///
+ /// The handler to action. + /// . + IRingBufferSlaveCapacity ReportScale(Action report = null); + + + /// + /// Minimum capacity. + /// + /// The minimal buffer. Value mus be greater or equal 1 + /// . + IRingBufferSlaveCapacity MinCapacity(int value); + + /// + /// Maximum capacity. + /// + /// The maximum buffer.Value mus be greater or equal + /// . + IRingBufferSlaveCapacity MaxCapacity(int value); + + /// + /// Validate and generate RingBufferPlus to service mode. + /// + /// . + IRingBufferService Build(); + + /// + /// Validate and generate RingBufferPlus and warmup with full capacity ready or reaching timeout (default 30 seconds). + /// + /// True if Warmup has full capacity, otherwise false. + /// The Timeout to Warmup has full capacity. Default value is 30 seconds. + /// . + IRingBufferService BuildWarmup(out bool fullcapacity, TimeSpan? timeout = null); + + } +} diff --git a/src/Commands/IRingBufferSwith.cs b/src/Commands/IRingBufferSwith.cs new file mode 100644 index 0000000..2865679 --- /dev/null +++ b/src/Commands/IRingBufferSwith.cs @@ -0,0 +1,23 @@ +// *************************************************************************************** +// MIT LICENCE +// The maintenance and evolution is maintained by the RingBufferPlus project under MIT license +// *************************************************************************************** + +using System.Threading; + +namespace RingBufferPlus +{ + /// + /// Represents the salve commands to RingBufferPlus service. + /// + public interface IRingBufferSwith + { + /// + /// Swith to new capacity in slave RingBuffer + /// + /// + /// True if scale changed, otherwise false + bool SwithTo(ScaleMode scaleMode); + + } +} diff --git a/src/RingBufferMetric.cs b/src/RingBufferMetric.cs index 769f0da..ee15a25 100644 --- a/src/RingBufferMetric.cs +++ b/src/RingBufferMetric.cs @@ -24,23 +24,14 @@ public RingBufferMetric() /// Create Metric of RingBufferPlus. ///
/// Source tigger. - /// Default capacity. /// Current capacity. /// New capacity trigger. - /// Minimum capacity. - /// Maximum capacity. - /// Free resource value trigger. - /// Date of metric. - public RingBufferMetric(SourceTrigger source, int fromcapacity, int tocapacity, int capacity, int mincapacity, int maxcapacity, int freeresource, DateTime dateref) + public RingBufferMetric(SourceTrigger source, int fromcapacity, int tocapacity) { Trigger = source; - Capacity = capacity; - MinCapacity = mincapacity; - MaxCapacity = maxcapacity; FromCapacity = fromcapacity; ToCapacity = tocapacity; - FreeResource = freeresource; - MetricDate = dateref; + MetricDate = DateTime.Now; } /// @@ -59,26 +50,6 @@ public RingBufferMetric(SourceTrigger source, int fromcapacity, int tocapacity, public int ToCapacity { get; } - /// - /// Default capacity of ring buffer. - /// - public int Capacity { get; } - - /// - /// Maximum capacity of ring buffer. - /// - public int MaxCapacity { get; } - - /// - /// Minimum capacity of ring buffer. - /// - public int MinCapacity { get; } - - /// - /// Free resource capacity of ring buffer. - /// - public int FreeResource { get; } - /// /// Date of metric . /// diff --git a/src/RingBufferPlus.csproj b/src/RingBufferPlus.csproj index b66ab4d..630b418 100644 --- a/src/RingBufferPlus.csproj +++ b/src/RingBufferPlus.csproj @@ -52,10 +52,6 @@ - - - - diff --git a/src/RingBufferValue.cs b/src/RingBufferValue.cs index 62c13ac..9c8fabd 100644 --- a/src/RingBufferValue.cs +++ b/src/RingBufferValue.cs @@ -25,11 +25,12 @@ private RingBufferValue() /// /// Create empty RingBufferValue. /// - public RingBufferValue(int diffCapacity) + internal RingBufferValue(int diffCapacity, ScaleMode scaleMode) { IsScaleCapacity = true; Current = default; DiffCapacity = diffCapacity; + ScaleMode = scaleMode; } /// @@ -110,6 +111,7 @@ public void Dispose() internal bool SkipTurnback { get; set; } internal bool IsScaleCapacity { get; set; } internal int DiffCapacity { get; } + internal ScaleMode ScaleMode { get; } } } diff --git a/src/ScaleMode.cs b/src/ScaleMode.cs index 9b670f2..e104ffa 100644 --- a/src/ScaleMode.cs +++ b/src/ScaleMode.cs @@ -10,6 +10,10 @@ namespace RingBufferPlus /// public enum ScaleMode { + /// + /// Current Scale + /// + None, /// /// Scale to minimal capacity. /// diff --git a/src/SourceTrigger.cs b/src/SourceTrigger.cs index 2915cd5..d5d3e7f 100644 --- a/src/SourceTrigger.cs +++ b/src/SourceTrigger.cs @@ -17,6 +17,10 @@ public enum SourceTrigger /// /// Source is accquire command. /// - Accquire + Accquire, + /// + /// Source is Master-Slave + /// + MasterSlave } } diff --git a/src/internal/IRingBufferCallback.cs b/src/internal/IRingBufferCallback.cs new file mode 100644 index 0000000..fdf61c0 --- /dev/null +++ b/src/internal/IRingBufferCallback.cs @@ -0,0 +1,18 @@ +// *************************************************************************************** +// MIT LICENCE +// The maintenance and evolution is maintained by the RingBufferPlus project under MIT license +// *************************************************************************************** + +using System.Threading; + +namespace RingBufferPlus +{ + internal interface IRingBufferCallback + { + void CallBackMaster(IRingBufferSwith value); + SemaphoreSlim SemaphoremasterSlave { get; } + string Name { get; } + bool IsSlave { get; } + + } +} diff --git a/src/internal/IRingBufferOptions.cs b/src/internal/IRingBufferOptions.cs index e4f05a9..f58ad27 100644 --- a/src/internal/IRingBufferOptions.cs +++ b/src/internal/IRingBufferOptions.cs @@ -4,6 +4,7 @@ // *************************************************************************************** using System; +using System.Runtime.CompilerServices; using System.Threading; using Microsoft.Extensions.Logging; @@ -15,12 +16,13 @@ internal interface IRingBufferOptions int Capacity { get; } int MinCapacity { get; } int MaxCapacity { get; } + Func FactoryHealth { get; } Func FactoryHandler { get; } TimeSpan FactoryTimeout { get; } TimeSpan FactoryIdleRetryError { get; } ILogger Logger { get; } bool HasScaleCapacity { get; } - Action ErrorHandler { get; } + Action ErrorHandler { get; } TimeSpan SampleDelay { get; } int SampleUnit { get; } TimeSpan ScaleCapacityDelay { get; } @@ -32,5 +34,9 @@ internal interface IRingBufferOptions int? MaxTriggerByAccqWhenFreeGreaterEq { get; } Action ReportHandler { get; } TimeSpan AccquireTimeout { get; } + IRingBufferSwith SwithFrom { get; } + IRingBufferSwith SwithTo { get; } + bool IsSlave { get; } + } } diff --git a/src/internal/RingBufferBuilder.cs b/src/internal/RingBufferBuilder.cs index ad3acbe..03ec838 100644 --- a/src/internal/RingBufferBuilder.cs +++ b/src/internal/RingBufferBuilder.cs @@ -10,7 +10,7 @@ namespace RingBufferPlus { - internal class RingBufferBuilder(string uniquename, ILoggerFactory? loggerFactory, CancellationToken? cancellation) : IRingBuffer,IRingBufferScaleCapacity, IRingBufferScaleMax, IRingBufferScaleMin, IRingBufferOptions + internal class RingBufferBuilder(string uniquename, ILoggerFactory? loggerFactory, CancellationToken? cancellation) : IRingBuffer,IRingBufferMasterCapacity, IRingBufferScaleMax, IRingBufferScaleMin, IRingBufferSlaveCapacity, IRingBufferOptions, IRingBufferCallback { private readonly ILoggerFactory? _loggerFactory = loggerFactory; private readonly CancellationToken _apptoken = cancellation??CancellationToken.None; @@ -27,6 +27,8 @@ internal class RingBufferBuilder(string uniquename, ILoggerFactory? loggerFac public int MaxCapacity { get; private set; } = 2; + public Func FactoryHealth { get; private set; } + public Func FactoryHandler { get; private set; } public TimeSpan FactoryTimeout { get; private set; } = TimeSpan.FromSeconds(10); @@ -61,10 +63,30 @@ internal class RingBufferBuilder(string uniquename, ILoggerFactory? loggerFac public TimeSpan AccquireTimeout { get; private set; } = TimeSpan.FromSeconds(30); + public IRingBufferSwith SwithFrom { get; private set; } + public IRingBufferSwith SwithTo { get; private set; } + + #endregion + + #region IRingBufferCallback + + public SemaphoreSlim SemaphoremasterSlave => null; + + public void CallBackMaster(IRingBufferSwith value) + { + //none + } + + string IRingBufferCallback.Name => Name; + + public bool IsSlave { get; private set; } + + #endregion #region IRingBuffer + IRingBuffer IRingBuffer.Logger(ILogger value) { Logger = value; @@ -113,6 +135,12 @@ IRingBuffer IRingBuffer.Capacity(int value) return this; } + IRingBuffer IRingBuffer.FactoryHealth(Func value) + { + FactoryHealth = value; + return this; + } + IRingBuffer IRingBuffer.Factory(Func value, TimeSpan? timeout, TimeSpan? idleRetryError) { #if NETSTANDARD2_1 @@ -144,11 +172,33 @@ IRingBuffer IRingBuffer.Factory(Func value, TimeSpan } - IRingBufferScaleCapacity IRingBuffer.SwithToScaleDefinitions() + IRingBufferMasterCapacity IRingBuffer.MasterScale(IRingBufferSwith ringBuffer) { +#if NETSTANDARD2_1 + if (ringBuffer is null) + { + throw new ArgumentNullException(nameof(ringBuffer)); + } +#else + ArgumentNullException.ThrowIfNull(ringBuffer); +#endif + if(!((IRingBufferCallback)ringBuffer).IsSlave) + { + throw new InvalidOperationException("ringBuffer parameter not slave"); + } + SwithTo = ringBuffer; + IsSlave = false; + SwithFrom = null; return this; } + IRingBufferSlaveCapacity IRingBuffer.SlaveScale() + { + SwithTo = null; + SwithFrom = null; + IsSlave = true; + return this; + } IRingBufferService IRingBuffer.Build() { @@ -159,50 +209,42 @@ IRingBufferService IRingBuffer.Build() #region IRingBufferScaleCapacity - IRingBufferService IRingBufferScaleCapacity.BuildWarmup(out bool fullcapacity, TimeSpan? timeout) + IRingBufferService IRingBufferMasterCapacity.BuildWarmup(out bool fullcapacity, TimeSpan? timeout) { return SharedBuildWarmup(out fullcapacity, timeout); } - IRingBufferService IRingBufferScaleCapacity.Build() + IRingBufferService IRingBufferMasterCapacity.Build() { return SharedBuild(); } - IRingBufferScaleMax IRingBufferScaleCapacity.MaxCapacity(int value) + IRingBufferScaleMax IRingBufferMasterCapacity.MaxCapacity(int value) { ShareMaxCapacity(value); return this; } - IRingBufferScaleMin IRingBufferScaleCapacity.MinCapacity(int value) + IRingBufferScaleMin IRingBufferMasterCapacity.MinCapacity(int value) { ShareMinCapacity(value); return this; } - IRingBufferScaleCapacity IRingBufferScaleCapacity.ReportScale(Action report) + IRingBufferMasterCapacity IRingBufferMasterCapacity.ReportScale(Action report) { -#if NETSTANDARD2_1 - if (report is null) - { - throw new ArgumentNullException(nameof(report)); - } -#else - ArgumentNullException.ThrowIfNull(report); -#endif - ReportHandler = report; + SharedReport(report); return this; } - IRingBufferScaleCapacity IRingBufferScaleCapacity.SampleUnit(int? value) + IRingBufferMasterCapacity IRingBufferMasterCapacity.SampleUnit(int? value) { SharedSampleUnit(ScaleCapacityDelay, value); return this; } - IRingBufferScaleCapacity IRingBufferScaleCapacity.SampleUnit(TimeSpan? baseunit, int? value) + IRingBufferMasterCapacity IRingBufferMasterCapacity.SampleUnit(TimeSpan? baseunit, int? value) { SharedSampleUnit(baseunit, value); return this; @@ -233,10 +275,10 @@ IRingBufferScaleMax IRingBufferScaleMax.TriggerByAccqWhenFreeGreaterEq(int IRingBufferScaleMax IRingBufferScaleMax.ScaleWhenFreeLessEq(int? value) { - var localvalue = value ?? 1; - if (localvalue < 1) + var localvalue = value ?? 2; + if (localvalue < 2) { - throw new ArgumentException($"The value({localvalue}) must be greater or equal 1"); + throw new ArgumentException($"The value({localvalue}) must be greater or equal 2"); } if (localvalue > Capacity) { @@ -312,9 +354,9 @@ IRingBufferScaleMin IRingBufferScaleMin.RollbackWhenFreeLessEq(int? value) IRingBufferScaleMin IRingBufferScaleMin.ScaleWhenFreeGreaterEq(int? value) { var localvalue = value ?? Capacity; - if (localvalue < 1) + if (localvalue < 2) { - throw new ArgumentException($"The value({localvalue}) must be greater than or equal to 1", nameof(value)); + throw new ArgumentException($"The value({localvalue}) must be greater than or equal to 2", nameof(value)); } if (localvalue > Capacity) { @@ -328,10 +370,10 @@ IRingBufferScaleMin IRingBufferScaleMin.ScaleWhenFreeGreaterEq(int? value) IRingBufferScaleMin IRingBufferScaleMin.TriggerByAccqWhenFreeLessEq(int? value) { - var localvalue = value ?? MinCapacity-1; - if (localvalue < 1) + var localvalue = value ?? MinCapacity; + if (localvalue < 2) { - throw new ArgumentException($"The value must be greater than or equal to 1", nameof(value)); + throw new ArgumentException($"The value must be greater than or equal to 2", nameof(value)); } if (localvalue > MinCapacity) { @@ -363,6 +405,39 @@ IRingBufferService IRingBufferScaleMin.BuildWarmup(out bool fullcapacity, #endregion + #region IRingBufferScaleFromCapacity + + IRingBufferSlaveCapacity IRingBufferSlaveCapacity.ReportScale(Action report) + { + SharedReport(report); + return this; + } + + IRingBufferSlaveCapacity IRingBufferSlaveCapacity.MinCapacity(int value) + { + ShareMinCapacity(value); + return this; + } + + IRingBufferSlaveCapacity IRingBufferSlaveCapacity.MaxCapacity(int value) + { + ShareMaxCapacity(value); + return this; + } + + IRingBufferService IRingBufferSlaveCapacity.Build() + { + return SharedBuild(); + } + + IRingBufferService IRingBufferSlaveCapacity.BuildWarmup(out bool fullcapacity, TimeSpan? timeout) + { + return SharedBuildWarmup(out fullcapacity, timeout); + } + + #endregion + + private void SharedSampleUnit(TimeSpan? baseunit, int? value) { var localvalue = value ?? 60; @@ -381,10 +456,6 @@ private void SharedSampleUnit(TimeSpan? baseunit, int? value) private void ShareMaxCapacity(int value) { - if (value < 2) - { - throw new ArgumentException("MaxCapacity must be greater than or equal to 2", nameof(value)); - } if (value < Capacity) { throw new ArgumentException($"MaxCapacity must be greater than or equal to Capacity({Capacity})", nameof(value)); @@ -397,9 +468,9 @@ private void ShareMaxCapacity(int value) private void ShareMinCapacity(int value) { - if (value < 2) + if (value < 1) { - throw new ArgumentException("MinCapacity must be greater than or equal to 2", nameof(value)); + throw new ArgumentException("MinCapacity must be greater than or equal to 1", nameof(value)); } if (value > Capacity) { @@ -411,6 +482,19 @@ private void ShareMinCapacity(int value) MinTriggerByAccqWhenFreeGreaterEq = null; } + private void SharedReport(Action report) + { +#if NETSTANDARD2_1 + if (report is null) + { + throw new ArgumentNullException(nameof(report)); + } +#else + ArgumentNullException.ThrowIfNull(report); +#endif + ReportHandler = report; + } + private RingBufferManager SharedBuild() { if (_loggerFactory is not null && Logger is null) @@ -423,32 +507,38 @@ private RingBufferManager SharedBuild() } if (MaxCapacity != Capacity) { - if (!ScaleToMaxLessEq.HasValue) - { - throw new RingBufferException(Name, "MaxCapacity : ScaleWhenFreeLessEq is null"); - } - if (!MaxTriggerByAccqWhenFreeGreaterEq.HasValue && !MaxRollbackWhenFreeGreaterEq.HasValue) - { - throw new RingBufferException(Name, "MaxCapacity : TriggerByAccqWhenFreeGreaterEq or RollbackWhenFreeGreaterEq is null"); - } - if (!HasScaleCapacity) + if (SwithTo != null) { - HasScaleCapacity = MaxRollbackWhenFreeGreaterEq.HasValue; + if (!ScaleToMaxLessEq.HasValue) + { + throw new RingBufferException(Name, "MaxCapacity : ScaleWhenFreeLessEq is null"); + } + if (!MaxTriggerByAccqWhenFreeGreaterEq.HasValue && !MaxRollbackWhenFreeGreaterEq.HasValue) + { + throw new RingBufferException(Name, "MaxCapacity : TriggerByAccqWhenFreeGreaterEq or RollbackWhenFreeGreaterEq is null"); + } + if (!HasScaleCapacity) + { + HasScaleCapacity = MaxRollbackWhenFreeGreaterEq.HasValue; + } } } if (MinCapacity != Capacity) - { - if (!ScaleToMinGreaterEq.HasValue) - { - throw new RingBufferException(Name, "MaxCapacity : ScaleWhenFreeGreaterEq is null"); - } - if (!MinTriggerByAccqWhenFreeGreaterEq .HasValue && !MinRollbackWhenFreeLessEq.HasValue) - { - throw new RingBufferException(Name, "MinCapacity: TriggerByAccqWhenFreeGreaterEq or RollbackWhenFreeGreaterEq is null"); - } - if (!HasScaleCapacity) + { + if (SwithTo != null) { - HasScaleCapacity = MinRollbackWhenFreeLessEq.HasValue; + if (!ScaleToMinGreaterEq.HasValue) + { + throw new RingBufferException(Name, "MaxCapacity : ScaleWhenFreeGreaterEq is null"); + } + if (!MinTriggerByAccqWhenFreeGreaterEq.HasValue && !MinRollbackWhenFreeLessEq.HasValue) + { + throw new RingBufferException(Name, "MinCapacity: TriggerByAccqWhenFreeGreaterEq or RollbackWhenFreeGreaterEq is null"); + } + if (!HasScaleCapacity) + { + HasScaleCapacity = MinRollbackWhenFreeLessEq.HasValue; + } } } return new RingBufferManager(this, _apptoken); diff --git a/src/internal/RingBufferManager.cs b/src/internal/RingBufferManager.cs index a23953c..b07550a 100644 --- a/src/internal/RingBufferManager.cs +++ b/src/internal/RingBufferManager.cs @@ -14,15 +14,15 @@ namespace RingBufferPlus { - internal class RingBufferManager : IRingBufferService, IRingBufferWarmup, IDisposable + internal class RingBufferManager : IRingBufferService, IRingBufferWarmup,IRingBufferSwith, IRingBufferCallback, IDisposable { private readonly ConcurrentQueue _availableBuffer = new(); private readonly BlockingCollection> _blockrenewBuffer = []; private readonly BlockingCollection _blockexceptionsBuffer = []; private readonly BlockingCollection _blockRetryFactoryBuffer = []; - private readonly List _MetricBuffer = []; private readonly BlockingCollection<(ScaleMode, RingBufferMetric)> _blockreportBuffer = []; private readonly BlockingCollection _blockScaleBuffer = []; + private readonly List _MetricBuffer = []; private Task _renewBufferThread; private Task _retryFactoryThread; @@ -31,84 +31,65 @@ internal class RingBufferManager : IRingBufferService, IRingBufferWarmup _factoryHealth; + private readonly Func _factory; + private readonly Action _errorHandler; + private readonly Action _reportHandler; private readonly object _lockcount = new(); private readonly object _lockMetric = new(); private readonly object _lockWarmup = new(); - private readonly IRingBufferOptions _ringBufferOptions; + private readonly SemaphoreSlim SemaphoreAquire = new(1, 1); + private readonly CancellationToken _apptoken; - private volatile int _currentCapacityBuffer; private readonly CancellationTokenSource _managertoken; - + private readonly ILogger? _logger; private bool _disposed; private bool _WarmupComplete; private bool _WarmupRunning; - private int _available; - private int _unavailable; - private int _toCreating; + private int _currentCapacityBuffer; public RingBufferManager(IRingBufferOptions ringBufferOptions, CancellationToken cancellationToken) { _apptoken = cancellationToken; _apptoken.Register(() => Dispose(true)); - _ringBufferOptions = ringBufferOptions; + + Name = ringBufferOptions.Name; + Capacity = ringBufferOptions.Capacity; + MinCapacity = ringBufferOptions.MinCapacity; + MaxCapacity = ringBufferOptions.MaxCapacity; + FactoryTimeout = ringBufferOptions.FactoryTimeout; + FactoryIdleRetry = ringBufferOptions.FactoryIdleRetryError; + ScaleCapacity = + (ringBufferOptions.ScaleToMaxLessEq.HasValue && ringBufferOptions.MaxRollbackWhenFreeGreaterEq.HasValue) || + (ringBufferOptions.ScaleToMinGreaterEq.HasValue && ringBufferOptions.MinRollbackWhenFreeLessEq.HasValue); + SampleUnit = ringBufferOptions.ScaleCapacityDelay/ ringBufferOptions.SampleUnit; + SamplesCount = ringBufferOptions.SampleUnit; + ScaleToMin = ringBufferOptions.ScaleToMinGreaterEq; + RollbackFromMin = ringBufferOptions.MinRollbackWhenFreeLessEq; + TriggerFromMin = ringBufferOptions.MinTriggerByAccqWhenFreeGreaterEq; + ScaleToMax = ringBufferOptions.ScaleToMaxLessEq; + RollbackFromMax = ringBufferOptions.MaxRollbackWhenFreeGreaterEq; + TriggerFromMax = ringBufferOptions.MaxTriggerByAccqWhenFreeGreaterEq; + AccquireTimeout = ringBufferOptions.AccquireTimeout; + + _logger = ringBufferOptions.Logger; + _factory = ringBufferOptions.FactoryHandler; + _factoryHealth = ringBufferOptions.FactoryHealth; + _errorHandler = ringBufferOptions.ErrorHandler; + _reportHandler = ringBufferOptions.ReportHandler; + _swithTo = ringBufferOptions.SwithTo; + _swithFrom = null; + IsSlave = ringBufferOptions.IsSlave; + if (!IsSlave && _swithTo is not null) + { + ((IRingBufferCallback)_swithTo).CallBackMaster(this); + } + _currentCapacityBuffer = ringBufferOptions.Capacity; _managertoken = CancellationTokenSource.CreateLinkedTokenSource(_apptoken); - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Capacity : {_ringBufferOptions.Capacity}")); - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} MinCapacity : {_ringBufferOptions.MinCapacity}")); - if (_ringBufferOptions.ScaleToMinGreaterEq.HasValue) - { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Scale down (Capacity->Min) Greater-Eq: {_ringBufferOptions.ScaleToMinGreaterEq.Value}")); - } - else - { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Scale down (Capacity->Min) Greater-Eq: Null")); - } - if (_ringBufferOptions.MinRollbackWhenFreeLessEq.HasValue) - { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Rollback (Min->Capacity) Less-Eq : {_ringBufferOptions.MinRollbackWhenFreeLessEq}")); - } - else - { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Rollback (Min->Capacity) Less-Eq : Null")); - } - if (_ringBufferOptions.MinTriggerByAccqWhenFreeGreaterEq.HasValue) - { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Trigger (Min->Capacity) Accq. Less-Eq : {_ringBufferOptions.MinTriggerByAccqWhenFreeGreaterEq}")); - } - else - { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Trigger (Min->Capacity) Accq. Less-Eq : Null")); - } - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} MaxCapacity : {_ringBufferOptions.MaxCapacity}")); - if (_ringBufferOptions.ScaleToMaxLessEq.HasValue) - { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Scale up (Capacity->Max) Less-Eq: {_ringBufferOptions.ScaleToMaxLessEq}")); - } - else - { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Scale up (Capacity->Max) Less-Eq: Null")); - } - if (_ringBufferOptions.MaxRollbackWhenFreeGreaterEq.HasValue) - { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Rollback (Max->Capacity) Greater-Eq : {_ringBufferOptions.MaxRollbackWhenFreeGreaterEq}")); - } - else - { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Rollback (Max->Capacity) Greater-Eq : Null")); - } - if (_ringBufferOptions.MinTriggerByAccqWhenFreeGreaterEq.HasValue) - { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Trigger (Max->Capacity) Accq. Greater-Eq : {_ringBufferOptions.MinTriggerByAccqWhenFreeGreaterEq}")); - } - else - { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Trigger (Max->Capacity) Accq. Greater-Eq : Null")); - } - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Unit Time: {_ringBufferOptions.ScaleCapacityDelay}")); - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Unit Samples: {_ringBufferOptions.SampleUnit}")); - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Sample Delay: {_ringBufferOptions.SampleDelay}")); } #region IDisposable @@ -144,6 +125,7 @@ protected virtual void Dispose(bool disposing) } } } + SemaphoremasterSlave.Dispose(); SemaphoreAquire.Dispose(); _managertoken?.Dispose(); } @@ -160,52 +142,37 @@ public void Dispose() #region IRingBufferService - public string Name => _ringBufferOptions.Name; - - public int Capacity => _ringBufferOptions.Capacity; - - public int MinCapacity => _ringBufferOptions.MinCapacity; + public string Name { get; } - public int MaxCapacity => _ringBufferOptions.MaxCapacity; + public int Capacity { get; } - public TimeSpan FactoryTimeout => _ringBufferOptions.FactoryTimeout; + public int MinCapacity { get; } - public TimeSpan FactoryIdleRetry => _ringBufferOptions.FactoryIdleRetryError; + public int MaxCapacity { get; } - public bool ScaleCapacity => _ringBufferOptions.ScaleToMaxLessEq.HasValue || _ringBufferOptions.ScaleToMinGreaterEq.HasValue; + public TimeSpan FactoryTimeout { get; } - public TimeSpan SampleUnit => _ringBufferOptions.ScaleCapacityDelay; + public TimeSpan FactoryIdleRetry { get; } - public int SamplesCount => _ringBufferOptions.SampleUnit; + public bool ScaleCapacity { get; } + + public TimeSpan SampleUnit { get; } - public int? ScaleToMin => _ringBufferOptions.ScaleToMinGreaterEq; + public int SamplesCount { get; } - public int? RollbackFromMin => _ringBufferOptions.MinRollbackWhenFreeLessEq; + public int? ScaleToMin { get; } - public int? TriggerFromMin => _ringBufferOptions.MinTriggerByAccqWhenFreeGreaterEq; + public int? RollbackFromMin { get; } - public int? ScaleToMax => _ringBufferOptions.ScaleToMaxLessEq; + public int? TriggerFromMin { get; } - public int? RollbackFromMax => _ringBufferOptions.MaxRollbackWhenFreeGreaterEq; + public int? ScaleToMax { get; } - public int? TriggerFromMax => _ringBufferOptions.MaxTriggerByAccqWhenFreeGreaterEq; + public int? RollbackFromMax { get; } - public TimeSpan AccquireTimeout => _ringBufferOptions.AccquireTimeout; - - - public void Counters(Action counters) - { - lock (_lockcount) - { - var cur = _currentCapacityBuffer; - var ava = _availableBuffer.Count; - _available = ava; - _unavailable = cur - ava; - _toCreating = cur - (_available + _unavailable); - counters(_available, _unavailable, _toCreating); - } - } + public int? TriggerFromMax { get; } + public TimeSpan AccquireTimeout { get; } public RingBufferValue Accquire(CancellationToken? cancellation = null) { @@ -213,40 +180,38 @@ public RingBufferValue Accquire(CancellationToken? cancellation = null) T result = default; var ok = false; - Warmup(false, TimeSpan.Zero); - + //if not Warmup execute if (!_WarmupComplete) { - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Wait Warmup Completed")); + Warmup(false, TimeSpan.Zero); + WriteLogDebug(DateTime.Now, $"{Name} Wait Warmup Completed"); while (!_WarmupComplete && !localcancellation.IsCancellationRequested) { - localcancellation.WaitHandle.WaitOne(100); + localcancellation.WaitHandle.WaitOne(2); } - if (_availableBuffer.Count != _ringBufferOptions.Capacity) + if (_availableBuffer.Count != Capacity) { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Accquire Send Warmup Timeout to OnError Handler")); + WriteLogDebug(DateTime.Now, $"{Name} Accquire Send Warmup Timeout to OnError Handler"); //Send error - _blockexceptionsBuffer.Add(new RingBufferException(_ringBufferOptions.Name, "Accquire Warmup Timeout"), _managertoken.Token); + _blockexceptionsBuffer.Add(new RingBufferException(Name, "Accquire Warmup Timeout"), _managertoken.Token); } else { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Accquire Warmup Completed")); + WriteLogInfo(DateTime.Now, $"{Name} Accquire Warmup Completed"); + } + if (localcancellation.IsCancellationRequested) + { + return new RingBufferValue(Name, TimeSpan.Zero, false, default, null); } } - - if (localcancellation.IsCancellationRequested) - { - return new RingBufferValue(_ringBufferOptions.Name, TimeSpan.Zero, ok, result,RenewBuffer); - } - + //try Accquire buffer var sw = Stopwatch.StartNew(); - var max = _ringBufferOptions.AccquireTimeout; while (!localcancellation.IsCancellationRequested && !ok) { if (!_availableBuffer.TryDequeue(out result)) { - if (sw.Elapsed > max) + if (sw.Elapsed > AccquireTimeout) { break; } @@ -254,73 +219,109 @@ public RingBufferValue Accquire(CancellationToken? cancellation = null) } else { - ok = true; + if (_factoryHealth is not null && !_factoryHealth(result)) + { + if (result is IDisposable disposablevalue) + { + disposablevalue.Dispose(); + WriteLogDebug(DateTime.Now, $"{Name} Accquire Disposed Item"); + } + _blockrenewBuffer.Add(new RingBufferValue(1, ScaleMode.None)); + } + else + { + ok = true; + } } } sw.Stop(); - if (localcancellation.IsCancellationRequested && ok) + + //Accquire timeout + if (!ok) { - _availableBuffer.Enqueue(result); + WriteLogDebug(DateTime.Now, $"{Name} Accquire timeout {sw.Elapsed}"); + //Send error + _blockexceptionsBuffer.Add(new RingBufferException(Name, $"Accquire timeout {sw.Elapsed}, Current Capacity : {_currentCapacityBuffer}"), _managertoken.Token); + return new RingBufferValue(Name, TimeSpan.Zero, false, default, null); } - if (!localcancellation.IsCancellationRequested && _ringBufferOptions.HasScaleCapacity && ok && _currentCapacityBuffer == _ringBufferOptions.MinCapacity && _availableBuffer.Count <= _ringBufferOptions.MinTriggerByAccqWhenFreeGreaterEq) + + //send trigger mincapacity to default capacity + if (!localcancellation.IsCancellationRequested && + _WarmupComplete && + ScaleCapacity && + ok && + _currentCapacityBuffer == MinCapacity && + _availableBuffer.Count <= TriggerFromMin) { + SemaphoreAquire.Wait(_managertoken.Token); try { - SemaphoreAquire.Wait(_managertoken.Token); + //update _currentCapacityBuffer and send to consumer metric report Task.Run(() => { - if (_currentCapacityBuffer == _ringBufferOptions.MinCapacity && _availableBuffer.Count >= _ringBufferOptions.MinTriggerByAccqWhenFreeGreaterEq) + if (_currentCapacityBuffer == MinCapacity && _availableBuffer.Count >= TriggerFromMin) { - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Accquire Invoked {ScaleMode.ToDefaultCapacity} : {_ringBufferOptions.Capacity}")); + WriteLogDebug(DateTime.Now, $"{Name} Accquire Invoked {ScaleMode.ToDefaultCapacity} : {Capacity}"); + var diff = Capacity - _availableBuffer.Count; + Interlocked.Exchange(ref _currentCapacityBuffer, Capacity); lock (_lockMetric) { _MetricBuffer.Clear(); - var diff = _ringBufferOptions.Capacity - _availableBuffer.Count; - _currentCapacityBuffer = _ringBufferOptions.Capacity; - _blockrenewBuffer.Add(new RingBufferValue(diff)); } + + _blockrenewBuffer.Add(new RingBufferValue(diff, ScaleMode.ToDefaultCapacity)); SemaphoreAquire.Release(); } }, _managertoken.Token); } catch (OperationCanceledException) + { + //none + } + finally { SemaphoreAquire.Release(); } } - if (!localcancellation.IsCancellationRequested && _ringBufferOptions.HasScaleCapacity && ok && _currentCapacityBuffer == _ringBufferOptions.MaxCapacity && _availableBuffer.Count >= _ringBufferOptions.MaxTriggerByAccqWhenFreeGreaterEq) - { + //update _currentCapacityBuffer and send trigger maxcapacity to default capacity + else if (!localcancellation.IsCancellationRequested && + ScaleCapacity && + _WarmupComplete && + ok && + _currentCapacityBuffer == MaxCapacity && + _availableBuffer.Count >= TriggerFromMax) + { + SemaphoreAquire.Wait(_managertoken.Token); try { - SemaphoreAquire.Wait(_managertoken.Token); + //send to consumer metric report Task.Run(() => { - if (_currentCapacityBuffer == _ringBufferOptions.MaxCapacity && _availableBuffer.Count >= _ringBufferOptions.MaxTriggerByAccqWhenFreeGreaterEq) + if (_currentCapacityBuffer == MaxCapacity && _availableBuffer.Count >= TriggerFromMax) { - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Accquire Invoked {ScaleMode.ToDefaultCapacity} : {_ringBufferOptions.Capacity}")); + WriteLogDebug(DateTime.Now, $"{Name} Accquire Invoked {ScaleMode.ToDefaultCapacity} : {Capacity}"); + var diff = _availableBuffer.Count - Capacity; + Interlocked.Exchange(ref _currentCapacityBuffer, Capacity); lock (_lockMetric) { _MetricBuffer.Clear(); - var diff = _availableBuffer.Count - _ringBufferOptions.Capacity; - _currentCapacityBuffer = _ringBufferOptions.Capacity; - _blockrenewBuffer.Add(new RingBufferValue(diff)); } + _blockrenewBuffer.Add(new RingBufferValue(diff, ScaleMode.ToDefaultCapacity)); SemaphoreAquire.Release(); } }, _managertoken.Token); } catch (OperationCanceledException) + { + //none + } + finally { SemaphoreAquire.Release(); } } - if (!ok) - { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Accquire fail after {sw.Elapsed}")); - //Send error - _blockexceptionsBuffer.Add(new RingBufferException(_ringBufferOptions.Name, $"Accquire fail after { sw.Elapsed }, Current Capacity : {_currentCapacityBuffer}"), _managertoken.Token); - } - return new RingBufferValue(_ringBufferOptions.Name, sw.Elapsed, ok, result,RenewBuffer); + //ok + return new RingBufferValue(Name, sw.Elapsed, true, result,DisposeBuffer); } #endregion @@ -337,6 +338,74 @@ public bool Warmup(TimeSpan? timeout = null) #endregion + #region IRingBufferSwith + + public bool SwithTo(ScaleMode scaleMode) + { + if (!_WarmupComplete) + { + return false; + } + if (_swithFrom is null) + { + throw new InvalidOperationException($"{Name}: Not found Ring Buffer from Swith"); + } + int diff; + int newcap; + switch (scaleMode) + { + case ScaleMode.None: + throw new InvalidOperationException($"{Name}: {scaleMode} Not valid to Swith"); + case ScaleMode.ToMinCapacity: + diff = _currentCapacityBuffer - MinCapacity; + newcap = MinCapacity; + break; + case ScaleMode.ToMaxCapacity: + diff = MaxCapacity - _currentCapacityBuffer; + newcap = MaxCapacity; + break; + case ScaleMode.ToDefaultCapacity: + diff = _currentCapacityBuffer - Capacity; + newcap = Capacity; + break; + default: + throw new ArgumentException($"scaleMode Not found {scaleMode}"); + } + if (diff != 0) + { + if (_reportHandler != null) + { + WriteLogDebug(DateTime.Now, $"{Name} SwithTo Invoked {SourceTrigger.MasterSlave} : {scaleMode} and Send Metric To Report Thread"); + _blockreportBuffer.Add((scaleMode, new RingBufferMetric(SourceTrigger.MasterSlave, _currentCapacityBuffer, newcap))); + } + _currentCapacityBuffer = newcap; + WriteLogDebug(DateTime.Now, $"{Name} SwithTo Send Message Create to Renew Buffer Thread"); + _blockrenewBuffer.Add(new RingBufferValue(diff, scaleMode)); + lock (_lockMetric) + { + _MetricBuffer.Clear(); + } + return true; + } + return false; + } + + #endregion + + #region IRingBufferCallback + + + public void CallBackMaster(IRingBufferSwith value) + { + _swithFrom = value; + } + + public bool IsSlave { get; private set; } + + public SemaphoreSlim SemaphoremasterSlave { get; } = new(1, 1); + + #endregion + private void Warmup(bool waitfullcapacity, TimeSpan timeoutfullcapacity) { if (_WarmupComplete) @@ -361,90 +430,75 @@ private void Warmup(bool waitfullcapacity, TimeSpan timeoutfullcapacity) { foreach (var item in _blockrenewBuffer.GetConsumingEnumerable(_managertoken.Token)) { - if (item.Successful || item.IsScaleCapacity) { if (item.SkipTurnback || item.IsScaleCapacity) { - if (item.Current is IDisposable disposable) - { - disposable.Dispose(); - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Renew Buffer Disposed Item")); - } if (item.SkipTurnback) { - _blockrenewBuffer.Add(new RingBufferValue(1)); + if (item.Current is not null && item.Current is IDisposable disposable) + { + disposable.Dispose(); + WriteLogDebug(DateTime.Now, $"{Name} Renew Buffer Disposed Item"); + } + _blockrenewBuffer.Add(new RingBufferValue(1, ScaleMode.None)); } else if (item.IsScaleCapacity) { - using var ctstimeout = CancellationTokenSource.CreateLinkedTokenSource(_managertoken.Token); - try + if (_swithTo is not null && item.ScaleMode != ScaleMode.None) { - if (item.DiffCapacity > 0) + + WriteLogDebug(DateTime.Now, $"{Name} SemaphoremasterSlave wait"); + SemaphoremasterSlave.Wait(); + WriteLogDebug(DateTime.Now, $"{Name} SemaphoremasterSlave done"); + + var slavename = ((IRingBufferCallback)_swithTo).Name; + WriteLogDebug(DateTime.Now, $"Master({Name}) to Slave({slavename}) swith to {item.ScaleMode}"); + if (_swithTo.SwithTo(item.ScaleMode)) { - for (int i = 0; i < item.DiffCapacity; i++) + WriteLogDebug(DateTime.Now, $"{Name} SemaphoremasterSlave wait {slavename} Release"); + SemaphoremasterSlave.Wait(); + var diff = _currentCapacityBuffer - _availableBuffer.Count; + RemoveBuffer(_currentCapacityBuffer); + await TryLoadBufferAsync(diff); + if (_currentCapacityBuffer == MinCapacity && _swithTo is not null && _MetricBuffer.Count == 0) { - ctstimeout.CancelAfter(_ringBufferOptions.FactoryTimeout); - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Renew Factory Handler Invoked")); - await Task.Run(() => - { - var value = _ringBufferOptions.FactoryHandler.Invoke(ctstimeout.Token); - if (RehydrateBuffer(value)) - { - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Renew Added New Item To Buffer")); - } - }, ctstimeout.Token); + //clear all buffer + RemoveBuffer(0); + //add buffer (MinCapacity) + await TryLoadBufferAsync(MinCapacity); } } - else + if (SemaphoremasterSlave.CurrentCount == 0) { - for (int i = 0; i < item.DiffCapacity * -1; i++) - { - _availableBuffer.TryDequeue(out var value); - if (value is IDisposable disposablevalue) - { - disposablevalue.Dispose(); - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Renew Buffer Disposed Item")); - } - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Renew Buffer removed Item")); - } + SemaphoremasterSlave.Release(); } } - catch (OperationCanceledException) + else { - if (ctstimeout.IsCancellationRequested && !_managertoken.IsCancellationRequested) + var diff = _currentCapacityBuffer - _availableBuffer.Count; + RemoveBuffer(_currentCapacityBuffer); + await TryLoadBufferAsync(diff); + if (diff != 0) { - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Renew Buffer Send Timeout to OnError Handler")); - //Send error - _blockexceptionsBuffer.Add(new RingBufferException(_ringBufferOptions.Name, "Renew Buffer Factory Timeout, Send Factory to Retry")); - //Send to retry - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Renew Buffer Send Factory to Retry")); - _blockRetryFactoryBuffer.Add(DateTime.Now.Add(_ringBufferOptions.FactoryTimeout)); - } - } - catch (Exception ex) - { - - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Renew Buffer Send Exception to OnError Handler")); - //error - if (!_managertoken.Token.IsCancellationRequested) - { - _blockexceptionsBuffer.Add(new RingBufferException(_ringBufferOptions.Name, "Renew Buffer Factory Error", ex)); - } - //timeout - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Renew Buffer Send Factory to Retry")); - if (!_managertoken.Token.IsCancellationRequested) - { - _blockRetryFactoryBuffer.Add(DateTime.Now.Add(_ringBufferOptions.FactoryTimeout)); + if (_swithFrom is not null) + { + if (((IRingBufferCallback)_swithFrom).SemaphoremasterSlave.CurrentCount == 0) + { + var master = ((IRingBufferCallback)_swithFrom).Name; + WriteLogDebug(DateTime.Now, $"{Name}: From Master({master}) SemaphoremasterSlave Release"); + ((IRingBufferCallback)_swithFrom).SemaphoremasterSlave.Release(); + } + } } } } } else { - if (RehydrateBuffer(item.Current)) + if (!item.SkipTurnback && !item.IsScaleCapacity && RehydrateBuffer(item.Current)) { - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Renew Rehydrated Buffer")); + WriteLogDebug(DateTime.Now, $"{Name} Renew Rehydrated Buffer"); } } } @@ -456,40 +510,47 @@ await Task.Run(() => } finally { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Renew Buffer Thread Stoped")); + WriteLogInfo(DateTime.Now, $"{Name} Renew Buffer Thread Stoped"); } }, _managertoken.Token); - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Renew Buffer Thread Created")); + WriteLogInfo(DateTime.Now, $"{Name} Renew Buffer Thread Created"); _renewBufferThread.Start(); _logErrorBufferThread = new Task(() => { - try + if (_errorHandler != null) { - foreach (var item in _blockexceptionsBuffer.GetConsumingEnumerable(_managertoken.Token)) + try { - try + foreach (var item in _blockexceptionsBuffer.GetConsumingEnumerable(_managertoken.Token)) { - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} OnError Handler Invoked")); - _ringBufferOptions.ErrorHandler?.Invoke(_ringBufferOptions.Logger, item); - } - catch (Exception ex) - { - var dtref = DateTime.Now; - _ringBufferOptions.Logger?.Log(LogLevel.Error, "[{dtref}] Error again! OnError Handler : {ex}", dtref, ex); + try + { + WriteLogDebug(DateTime.Now, $"{Name} OnError Handler Invoked"); + _errorHandler?.Invoke(_logger, item); + } + catch (Exception ex) + { + var dtref = DateTime.Now; + _logger?.Log(LogLevel.Error, "[{dtref}] Error again! OnError Handler : {ex}", dtref, ex); + } } } + catch (OperationCanceledException) + { + //none + } + finally + { + WriteLogInfo(DateTime.Now, $"{Name} Log Error Buffer Thread Stoped"); + } } - catch (OperationCanceledException) - { - //none - } - finally + else { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Log Error Buffer Thread Stoped")); + WriteLogInfo(DateTime.Now, $"{Name} Log Error Buffer Thread Stoped"); } }, _managertoken.Token); - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Log Error Buffer Thread Created")); + WriteLogInfo(DateTime.Now, $"{Name} Log Error Buffer Thread Created"); _logErrorBufferThread.Start(); _retryFactoryThread = new Task(() => @@ -503,12 +564,12 @@ await Task.Run(() => { diff = item - DateTime.Now; } - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Wait Retry Factory {diff}")); + WriteLogDebug(DateTime.Now, $"{Name} Wait Retry Factory {diff}"); _managertoken.Token.WaitHandle.WaitOne(diff); if (!_managertoken.Token.IsCancellationRequested) { - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Retry Factory Send Message Create to Renew Buffer Thread")); - _blockrenewBuffer.Add(new RingBufferValue(10)); + WriteLogDebug(DateTime.Now, $"{Name} Retry Factory Send Message Create to Renew Buffer Thread"); + _blockrenewBuffer.Add(new RingBufferValue(1, ScaleMode.None)); } } } @@ -518,17 +579,21 @@ await Task.Run(() => } finally { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Retry Factory Thread Stoped")); + WriteLogInfo(DateTime.Now, $"{Name} Retry Factory Thread Stoped"); } }, _managertoken.Token); - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Retry Factory Thread Created")); + WriteLogInfo(DateTime.Now, $"{Name} Retry Factory Thread Created"); _retryFactoryThread.Start(); _metricBufferThread = new Task(() => { - while (_ringBufferOptions.HasScaleCapacity && !_managertoken.Token.IsCancellationRequested) + while (!_managertoken.Token.IsCancellationRequested && !_WarmupComplete) + { + _managertoken.Token.WaitHandle.WaitOne(10); + } + while (ScaleCapacity && !_managertoken.Token.IsCancellationRequested) { - _managertoken.Token.WaitHandle.WaitOne(_ringBufferOptions.SampleDelay); + _managertoken.Token.WaitHandle.WaitOne(SampleUnit); if (_managertoken.Token.IsCancellationRequested) { continue; @@ -537,67 +602,70 @@ await Task.Run(() => { var available = _availableBuffer.Count; _MetricBuffer.Add(available); - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Metric Added Available information: {available}, {_MetricBuffer.Count}/{_ringBufferOptions.SampleUnit}")); - if (_MetricBuffer.Count >= _ringBufferOptions.SampleUnit) + WriteLogDebug(DateTime.Now, $"{Name} Metric Added Available information: {available}, {_MetricBuffer.Count}/{SamplesCount}"); + if (_MetricBuffer.Count >= SamplesCount) { var avg = _MetricBuffer.ToArray().Average(); _MetricBuffer.Clear(); if (!_managertoken.Token.IsCancellationRequested) { _blockScaleBuffer.Add(Convert.ToInt32(avg)); - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Metric Resume Created({Convert.ToInt32(avg)})")); + WriteLogDebug(DateTime.Now, $"{Name} Metric Resume Created({Convert.ToInt32(avg)})"); } } } } - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Metric Buffer Thread Stoped")); + WriteLogInfo(DateTime.Now, $"{Name} Metric Buffer Thread Stoped"); }, _managertoken.Token); - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Metric Buffer Thread Created")); + WriteLogInfo(DateTime.Now, $"{Name} Metric Buffer Thread Created"); _metricBufferThread.Start(); _scaleCapacityThread = new Task(() => { - if (_ringBufferOptions.HasScaleCapacity) + if (ScaleCapacity) { try { - foreach (var item in _blockScaleBuffer.GetConsumingEnumerable(_managertoken.Token)) + foreach (var freebuffer in _blockScaleBuffer.GetConsumingEnumerable(_managertoken.Token)) { var newcap = _currentCapacityBuffer; var currentcap = newcap; var diff = 0; ScaleMode? mode = null; - if (_ringBufferOptions.ScaleToMaxLessEq.HasValue && item <= _ringBufferOptions.ScaleToMaxLessEq.Value && currentcap == _ringBufferOptions.Capacity) + if (ScaleToMax.HasValue && freebuffer <= ScaleToMax.Value && currentcap == Capacity) { - newcap = _ringBufferOptions.MaxCapacity; + newcap = MaxCapacity; mode = ScaleMode.ToMaxCapacity; diff = newcap - _availableBuffer.Count; } - else if (_ringBufferOptions.ScaleToMinGreaterEq.HasValue && item >= _ringBufferOptions.ScaleToMinGreaterEq.Value && currentcap == _ringBufferOptions.Capacity) + else if (ScaleToMin.HasValue && freebuffer >= ScaleToMin.Value && currentcap == Capacity) { - newcap = _ringBufferOptions.MinCapacity; + newcap = MinCapacity; mode = ScaleMode.ToMinCapacity; - diff = (currentcap - _availableBuffer.Count) - newcap; + diff = currentcap - _availableBuffer.Count - newcap; } - else if (_ringBufferOptions.MinRollbackWhenFreeLessEq.HasValue && item <= _ringBufferOptions.MinRollbackWhenFreeLessEq.Value && currentcap == _ringBufferOptions.MinCapacity) + else if (RollbackFromMin.HasValue && freebuffer <= RollbackFromMin.Value && currentcap == MinCapacity) { - newcap = _ringBufferOptions.Capacity; + newcap = Capacity; mode = ScaleMode.ToDefaultCapacity; diff = newcap - _availableBuffer.Count; } - else if (_ringBufferOptions.MaxRollbackWhenFreeGreaterEq.HasValue && item >= _ringBufferOptions.MaxRollbackWhenFreeGreaterEq.Value && currentcap == _ringBufferOptions.MaxCapacity) + else if (RollbackFromMax.HasValue && freebuffer >= RollbackFromMax.Value && currentcap == MaxCapacity) { - newcap = _ringBufferOptions.Capacity; + newcap = Capacity; mode = ScaleMode.ToDefaultCapacity; diff = _availableBuffer.Count - newcap; } if (mode.HasValue && !_managertoken.Token.IsCancellationRequested) { - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Scale Capacity Invoked {mode} : {newcap} and Send Metric To Report Thread")); - _blockreportBuffer.Add((mode.Value, new RingBufferMetric(SourceTrigger.AutoScale, currentcap, newcap, _ringBufferOptions.Capacity, _ringBufferOptions.MinCapacity, _ringBufferOptions.MaxCapacity, item, DateTime.Now))); + if (_reportHandler != null) + { + WriteLogDebug(DateTime.Now, $"{Name} Scale Capacity Invoked {mode} : {newcap} and Send Metric To Report Thread"); + _blockreportBuffer.Add((mode.Value, new RingBufferMetric(SourceTrigger.AutoScale, currentcap, newcap))); + } _currentCapacityBuffer = newcap; - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Scale Capacity Send Message Create to Renew Buffer Thread")); - _blockrenewBuffer.Add(new RingBufferValue(diff)); + WriteLogDebug(DateTime.Now, $"{Name} Scale Capacity Send Message Create to Renew Buffer Thread"); + _blockrenewBuffer.Add(new RingBufferValue(diff,mode.Value)); lock (_lockMetric) { _MetricBuffer.Clear(); @@ -610,49 +678,53 @@ await Task.Run(() => //none } } - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Scale Capacity Thread Stoped")); + WriteLogInfo(DateTime.Now, $"{Name} Scale Capacity Thread Stoped"); }, _managertoken.Token); - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Scale Capacity Thread Created")); + WriteLogInfo(DateTime.Now, $"{Name} Scale Capacity Thread Created"); _scaleCapacityThread.Start(); _reportscaleCapacityThread = new Task(() => { - try + if (_reportHandler != null) { - if (_ringBufferOptions.HasScaleCapacity) + try { foreach ((ScaleMode Mode, RingBufferMetric Metric) in _blockreportBuffer.GetConsumingEnumerable(_managertoken.Token)) { try { - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} ReportHandler Invoked")); - _ringBufferOptions.ReportHandler?.Invoke(Mode, _ringBufferOptions.Logger, Metric, _managertoken.Token); + WriteLogDebug(DateTime.Now, $"{Name} ReportHandler Invoked"); + _reportHandler?.Invoke(Mode, _logger, Metric, _managertoken.Token); } catch (Exception ex) { var dtref = DateTime.Now; - _ringBufferOptions.Logger?.Log(LogLevel.Error, "[{dtref}] Error ReportScale Handler : {ex}", dtref, ex); + _logger?.Log(LogLevel.Error, "[{dtref}] Error ReportScale Handler : {ex}", dtref, ex); } } } + catch (OperationCanceledException) + { + //none + } + finally + { + WriteLogInfo(DateTime.Now, $"{Name} Report Scale Capacity Thread Stoped"); + } } - catch (OperationCanceledException) - { - //none - } - finally + else { - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Report Scale Capacity Thread Stoped")); + WriteLogInfo(DateTime.Now, $"{Name} Report Scale Capacity Thread Stoped"); } }, _managertoken.Token); - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Report Scale Capacity Thread Created")); + WriteLogInfo(DateTime.Now, $"{Name} Report Scale Capacity Thread Created"); _reportscaleCapacityThread.Start(); using var warmupcts = CancellationTokenSource.CreateLinkedTokenSource(_managertoken.Token); - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Creating {_ringBufferOptions.Capacity} items")); + WriteLogInfo(DateTime.Now, $"{Name} Creating {Capacity} items"); - _blockrenewBuffer.Add(new RingBufferValue(_ringBufferOptions.Capacity)); + _blockrenewBuffer.Add(new RingBufferValue(Capacity, ScaleMode.None)); warmupcts.CancelAfter(timeoutfullcapacity); if (waitfullcapacity) @@ -660,18 +732,18 @@ await Task.Run(() => while (!warmupcts.Token.IsCancellationRequested && _availableBuffer.Count != _currentCapacityBuffer) { - warmupcts.Token.WaitHandle.WaitOne(100); + warmupcts.Token.WaitHandle.WaitOne(10); } } else { while (!warmupcts.Token.IsCancellationRequested && _availableBuffer.Count >= 2) { - warmupcts.Token.WaitHandle.WaitOne(100); + warmupcts.Token.WaitHandle.WaitOne(10); } } - WriteLogInfo(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Created {_availableBuffer.Count} items")); + WriteLogInfo(DateTime.Now, $"{Name} Created {_availableBuffer.Count} items"); lock (_lockWarmup) { @@ -680,7 +752,7 @@ await Task.Run(() => } } - private void RenewBuffer(RingBufferValue value) + private void DisposeBuffer(RingBufferValue value) { if (!_managertoken.IsCancellationRequested) { @@ -691,19 +763,88 @@ private void RenewBuffer(RingBufferValue value) if (value.Current is IDisposable disposablevalue) { disposablevalue.Dispose(); - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} Renew Buffer Disposed Item")); + WriteLogDebug(DateTime.Now, $"{Name} Renew Buffer Disposed Item"); + } + } + } + + private void RemoveBuffer(int newcapacity) + { + while (_availableBuffer.Count > newcapacity) + { + if (_availableBuffer.TryDequeue(out var value)) + { + if (value is IDisposable disposablevalue) + { + disposablevalue.Dispose(); + WriteLogDebug(DateTime.Now, $"{Name} RemoveBuffer Disposed Item"); + } + WriteLogDebug(DateTime.Now, $"{Name} RemoveBuffer removed Item"); + } + } + } + + private async Task TryLoadBufferAsync(int diff) + { + for (int i = 0; i < diff; i++) + { + using var ctstimeout = CancellationTokenSource.CreateLinkedTokenSource(_managertoken.Token); + ctstimeout.CancelAfter(FactoryTimeout); + WriteLogDebug(DateTime.Now, $"{Name} TryLoadBufferAsync Handler Invoked"); + try + { + await Task.Run(() => + { + try + { + var value = _factory.Invoke(ctstimeout.Token); + if (value != null && RehydrateBuffer(value)) + { + WriteLogDebug(DateTime.Now, $"{Name} TryLoadBufferAsync Added New Item To Buffer"); + } + } + catch (Exception ex) + { + + WriteLogDebug(DateTime.Now, $"{Name} TryLoadBufferAsync Send Exception to OnError Handler"); + //error + if (!_managertoken.Token.IsCancellationRequested) + { + _blockexceptionsBuffer.Add(new RingBufferException(Name, "TryLoadBufferAsync Factory Error", ex)); + } + //timeout + WriteLogDebug(DateTime.Now, $"{Name} TryLoadBufferAsync Send Factory to Retry"); + if (!_managertoken.Token.IsCancellationRequested) + { + _blockRetryFactoryBuffer.Add(DateTime.Now.Add(FactoryIdleRetry)); + } + } + + }, ctstimeout.Token); + } + catch (OperationCanceledException) + { + if (ctstimeout.IsCancellationRequested && !_managertoken.IsCancellationRequested) + { + WriteLogDebug(DateTime.Now, $"{Name}TryLoadBufferAsync Send Timeout to OnError Handler"); + //Send error + _blockexceptionsBuffer.Add(new RingBufferException(Name, "TryLoadBufferAsync Factory Timeout, Send Factory to Retry")); + //Send to retry + WriteLogDebug(DateTime.Now, $"{Name} TryLoadBufferAsync Send Factory to Retry"); + _blockRetryFactoryBuffer.Add(DateTime.Now.Add(FactoryTimeout)); + } } } } private void WriteLogDebug(DateTime dtref, string message) { - _ringBufferOptions.Logger?.Log( LogLevel.Debug, "[{dtref}] {message}", dtref,message); + _logger?.Log( LogLevel.Debug, "[{dtref}] {message}", dtref,message); } private void WriteLogInfo(DateTime dtref, string message) { - _ringBufferOptions.Logger?.Log(LogLevel.Information, "[{dtref}] {message}", dtref, message); + _logger?.Log(LogLevel.Information, "[{dtref}] {message}", dtref, message); } private bool RehydrateBuffer(T value) @@ -716,9 +857,10 @@ private bool RehydrateBuffer(T value) if (value is IDisposable disposable) { disposable.Dispose(); - WriteLogDebug(DateTime.Now, string.Format($"{_ringBufferOptions.Name} RehydrateBuffer Disposed Item")); + WriteLogDebug(DateTime.Now, $"{Name} RehydrateBuffer Disposed Item"); } return false; } + } }