diff --git a/src/NBomber/Constants.fs b/src/NBomber/Constants.fs index 8eb82911..8c3b6103 100644 --- a/src/NBomber/Constants.fs +++ b/src/NBomber/Constants.fs @@ -51,8 +51,6 @@ let SchedulerTimerDriftMs = 10.0 let SchedulerTickIntervalMs = 1_000.0 [] let ReportingTimerCompleteMs = 3_000 -[] -let WarmUpFinishPause = 1_000 /// Default status codes diff --git a/src/NBomber/Domain/ClientPool.fs b/src/NBomber/Domain/ClientPool.fs index 5fcd6660..c4a370ae 100644 --- a/src/NBomber/Domain/ClientPool.fs +++ b/src/NBomber/Domain/ClientPool.fs @@ -10,7 +10,7 @@ open NBomber.Extensions.Internal open NBomber.Contracts type ClientPoolEvent = - | StartedInit of poolName:string + | StartedInit of poolName:string * clientCount:int | StartedDispose of poolName:string | ClientInitialized of poolName:string * clientNumber:int | ClientDisposed of poolName:string * clientNumber:int * error:exn option @@ -46,7 +46,7 @@ type ClientPool(factory: ClientFactory) = | Error ex -> yield Error ex } - _eventStream.OnNext(StartedInit factory.FactoryName) + _eventStream.OnNext(StartedInit(factory.FactoryName, factory.ClientCount)) let result = initClients(0, factory.ClientCount, context) |> Result.sequence match result with diff --git a/src/NBomber/Domain/Concurrency/Scheduler/ScenarioScheduler.fs b/src/NBomber/Domain/Concurrency/Scheduler/ScenarioScheduler.fs index 15c482cd..b6e579b4 100644 --- a/src/NBomber/Domain/Concurrency/Scheduler/ScenarioScheduler.fs +++ b/src/NBomber/Domain/Concurrency/Scheduler/ScenarioScheduler.fs @@ -3,8 +3,6 @@ module internal NBomber.Domain.Concurrency.Scheduler.ScenarioScheduler open System open System.Threading.Tasks -open FSharp.Control.Reactive - open NBomber.Contracts open NBomber.Contracts.Stats open NBomber.Domain @@ -21,16 +19,6 @@ type SchedulerCommand = | InjectOneTimeActors of scheduledCount:int | DoNothing -type ScenarioProgressInfo = { - ConstantActorCount: int - OneTimeActorCount: int - CurrentSimulation: LoadSimulation -} - -type SchedulerEvent = - | ScenarioStarted - | ProgressUpdated of ScenarioProgressInfo - let calcScheduleByTime (copiesCount: int, prevSegmentCopiesCount: int, timeSegmentProgress: int) = let value = copiesCount - prevSegmentCopiesCount let result = (float value / 100.0 * float timeSegmentProgress) + float prevSegmentCopiesCount @@ -90,7 +78,6 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) = if _scenario.IsEnabled then new OneTimeActorScheduler(dep, OneTimeActorScheduler.exec) else new OneTimeActorScheduler(dep, emptyExec) - let _eventStream = Subject.broadcast let _tcs = TaskCompletionSource() let _randomGen = Random() @@ -104,7 +91,7 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) = _cachedSimulationStats <- getCurrentSimulationStats() _scnDep.ScenarioStatsActor.Publish StartUseTempBuffer - let buildRealtimeStats (duration) = + let buildRealtimeStats (duration: TimeSpan) = let simulationStats = getCurrentSimulationStats() let reply = TaskCompletionSource() _scnDep.ScenarioStatsActor.Publish(BuildRealtimeStats(reply, simulationStats, duration)) @@ -116,9 +103,13 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) = _scnDep.ScenarioStatsActor.Publish FlushTempBuffer reply.Task - let getFinalStats () = + let getStats (isFinal: bool) = let simulationStats = getCurrentSimulationStats() - let duration = Scenario.getExecutedDuration _scenario + + let duration = + if isFinal then Scenario.getExecutedDuration _scenario + else _scnDep.ScenarioTimer.Elapsed + let reply = TaskCompletionSource() _scnDep.ScenarioStatsActor.Publish(GetFinalStats(reply, simulationStats, duration)) reply.Task @@ -135,7 +126,6 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) = _scnDep.ScenarioTimer.Stop() _constantScheduler.Stop() _oneTimeScheduler.Stop() - _eventStream.OnCompleted() let execScheduler () = if _isWorking && _scnDep.ScenarioStatsActor.FailCount > _scnDep.MaxFailCount then @@ -169,40 +159,28 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) = | None -> stop() - let updateProgress () = - let progressInfo = { - ConstantActorCount = getConstantActorCount() - OneTimeActorCount = getOneTimeActorCount() - CurrentSimulation = _currentSimulation - } - _eventStream.OnNext(ProgressUpdated progressInfo) - let start () = _isWorking <- true _scnDep.ScenarioTimer.Restart() execScheduler() - _eventStream.OnNext(ScenarioStarted) _tcs.Task :> Task member _.Working = _isWorking - member _.EventStream = _eventStream :> IObservable<_> member _.Scenario = _scenario member _.AllRealtimeStats = _scnDep.ScenarioStatsActor.AllRealtimeStats member _.Start() = start() member _.Stop() = stop() member _.ExecScheduler() = execScheduler() - member _.UpdateProgress() = updateProgress() member _.AddStatsFromAgent(stats) = _scnDep.ScenarioStatsActor.Publish(AddFromAgent stats) member _.PrepareForRealtimeStats() = prepareForRealtimeStats() member _.CommitRealtimeStats(duration) = commitRealtimeStats duration member _.BuildRealtimeStats(duration) = buildRealtimeStats duration - member _.GetFinalStats() = getFinalStats() + member _.GetFinalStats() = getStats true + member _.GetCurrentStats() = getStats false interface IDisposable with member _.Dispose() = stop() - _eventStream.Dispose() _log.Verbose $"{nameof ScenarioScheduler} disposed" - diff --git a/src/NBomber/Domain/Scenario.fs b/src/NBomber/Domain/Scenario.fs index 7a5e385b..008d7c97 100644 --- a/src/NBomber/Domain/Scenario.fs +++ b/src/NBomber/Domain/Scenario.fs @@ -288,3 +288,9 @@ let defaultClusterCount = fun _ -> 1 let getScenariosForWarmUp (scenarios: Scenario list) = scenarios |> List.filter(fun x -> x.WarmUpDuration.IsSome) + +let getMaxDuration (scenarios: Scenario list) = + scenarios |> List.map(fun x -> x.PlanedDuration) |> List.max + +let getMaxWarmUpDuration (scenarios: Scenario list) = + scenarios |> List.choose(fun x -> x.WarmUpDuration) |> List.max diff --git a/src/NBomber/DomainServices/NBomberContext.fs b/src/NBomber/DomainServices/NBomberContext.fs index de18637b..3fb41d46 100644 --- a/src/NBomber/DomainServices/NBomberContext.fs +++ b/src/NBomber/DomainServices/NBomberContext.fs @@ -2,7 +2,6 @@ open System open System.IO -open System.Globalization open FsToolkit.ErrorHandling diff --git a/src/NBomber/DomainServices/TestHost/TestHost.fs b/src/NBomber/DomainServices/TestHost/TestHost.fs index 58d815d4..c1c11909 100644 --- a/src/NBomber/DomainServices/TestHost/TestHost.fs +++ b/src/NBomber/DomainServices/TestHost/TestHost.fs @@ -8,6 +8,7 @@ open System.Runtime.InteropServices open Microsoft.FSharp.Collections open Serilog +open Spectre.Console open FsToolkit.ErrorHandling open NBomber @@ -43,7 +44,6 @@ type internal TestHost(dep: IGlobalDependency, let mutable _targetScenarios = List.empty let mutable _sessionArgs = SessionArgs.empty let mutable _currentOperation = OperationType.None - let mutable _globalCancelToken = new CancellationTokenSource() let _defaultNodeInfo = NodeInfo.init None let getCurrentNodeInfo () = @@ -92,85 +92,59 @@ type internal TestHost(dep: IGlobalDependency, let stopSchedulers (schedulers: ScenarioScheduler list) = schedulers |> List.iter(fun x -> x.Stop()) - let initScenarios (sessionArgs: SessionArgs) (cancelToken: CancellationToken) = taskResult { - - let targetScenarios = regScenarios |> TestHostScenario.getTargetScenarios sessionArgs + let initScenarios (consoleStatus: StatusContext) (cancelToken: CancellationToken) (sessionArgs: SessionArgs) = taskResult { let baseContext = NBomberContext.createBaseContext(sessionArgs.TestInfo, getCurrentNodeInfo, cancelToken, _log) - let defaultScnContext = Scenario.ScenarioContext.create baseContext do! dep.WorkerPlugins |> WorkerPlugins.init dep baseContext do! dep.ReportingSinks |> ReportingSinks.init dep baseContext - return! TestHostScenario.initScenarios(dep, baseContext, defaultScnContext, sessionArgs, targetScenarios) + return! TestHostScenario.initScenarios(dep, consoleStatus, baseContext, sessionArgs, regScenarios) } - let startWarmUp (schedulers: ScenarioScheduler list) = backgroundTask { - let isWarmUp = true - TestHostConsole.displayBombingProgress(dep.ApplicationType, schedulers, isWarmUp) - - let schedulersArray = schedulers |> List.toArray - use schedulerTimer = new System.Timers.Timer(Constants.SchedulerTickIntervalMs) - schedulerTimer.Elapsed.Add(fun _ -> - schedulersArray - |> Array.Parallel.iter(fun x -> - x.ExecScheduler() - x.UpdateProgress() - ) - ) - - schedulerTimer.Start() - do! schedulers |> List.map(fun x -> x.Start()) |> Task.WhenAll - schedulerTimer.Stop() - - // wait on warmup progress bar to finish rendering - do! Task.Delay Constants.WarmUpFinishPause - } + let startScenarios (schedulers: ScenarioScheduler list) (reportingManager: IReportingManager option) = backgroundTask { - let startBombing (schedulers: ScenarioScheduler list) - (reportingManager: IReportingManager) = backgroundTask { + let isWarmUp = reportingManager.IsNone - let isWarmUp = false - TestHostConsole.displayBombingProgress(dep.ApplicationType, schedulers, isWarmUp) + if not isWarmUp then + dep.WorkerPlugins |> WorkerPlugins.start _log + dep.ReportingSinks |> ReportingSinks.start _log - dep.WorkerPlugins |> WorkerPlugins.start _log - dep.ReportingSinks |> ReportingSinks.start _log + use cancelToken = new CancellationTokenSource() + schedulers |> TestHostConsole.LiveStatusTable.display cancelToken.Token isWarmUp - reportingManager.Start() + if reportingManager.IsSome then reportingManager.Value.Start() // waiting on all scenarios to finish let schedulersArray = schedulers |> List.toArray use schedulerTimer = new System.Timers.Timer(Constants.SchedulerTickIntervalMs) - schedulerTimer.Elapsed.Add(fun _ -> - schedulersArray - |> Array.Parallel.iter(fun x -> - x.ExecScheduler() - x.UpdateProgress() - ) - ) + schedulerTimer.Elapsed.Add(fun _ -> schedulersArray |> Array.Parallel.iter(fun x -> x.ExecScheduler())) schedulerTimer.Start() do! schedulers |> List.map(fun x -> x.Start()) |> Task.WhenAll + cancelToken.Cancel() schedulerTimer.Stop() - // wait on final metrics and reporting tick - do! Task.Delay Constants.ReportingTimerCompleteMs + if not isWarmUp then + // wait on final metrics and reporting tick + do! Task.Delay Constants.ReportingTimerCompleteMs - // waiting (in case of cluster) on all raw stats - do! reportingManager.Stop() + // waiting (in case of cluster) on all raw stats + do! reportingManager.Value.Stop() - do! dep.WorkerPlugins |> WorkerPlugins.stop _log - do! dep.ReportingSinks |> ReportingSinks.stop _log + do! dep.WorkerPlugins |> WorkerPlugins.stop _log + do! dep.ReportingSinks |> ReportingSinks.stop _log } let cleanScenarios (sessionArgs: SessionArgs, + consoleStatus: StatusContext, cancelToken: CancellationToken, scenarios: Scenario list) = let baseContext = NBomberContext.createBaseContext(sessionArgs.TestInfo, getCurrentNodeInfo, cancelToken, _log) let defaultScnContext = Scenario.ScenarioContext.create baseContext let enabledScenarios = scenarios |> List.filter(fun x -> x.IsEnabled) - TestHostScenario.cleanScenarios dep baseContext defaultScnContext enabledScenarios + TestHostScenario.cleanScenarios dep consoleStatus baseContext defaultScnContext enabledScenarios member _.SessionArgs = _sessionArgs member _.CurrentOperation = _currentOperation @@ -179,28 +153,30 @@ type internal TestHost(dep: IGlobalDependency, member _.TargetScenarios = _targetScenarios member _.CurrentSchedulers = _currentSchedulers - member _.StartInit(sessionArgs: SessionArgs) = backgroundTask { + member _.StartInit(sessionArgs: SessionArgs) = _stopped <- false _currentOperation <- OperationType.Init TestHostConsole.printContextInfo dep _log.Information "Starting init..." - _globalCancelToken.Dispose() - _globalCancelToken <- new CancellationTokenSource() - - match! initScenarios sessionArgs _globalCancelToken.Token with - | Ok initializedScenarios -> - _log.Information "Init finished" - _targetScenarios <- initializedScenarios - _sessionArgs <- sessionArgs - _currentOperation <- OperationType.None - return Ok _targetScenarios - - | Error appError -> - _log.Error "Init failed" - _currentOperation <- OperationType.Stop - return AppError.createResult appError - } + + TestHostConsole.displayStatus "Initializing scenarios..." (fun consoleStatus -> backgroundTask { + use cancelToken = new CancellationTokenSource() + match! initScenarios consoleStatus cancelToken.Token sessionArgs with + | Ok initializedScenarios -> + _log.Information "Init finished" + cancelToken.Cancel() + _targetScenarios <- initializedScenarios + _sessionArgs <- sessionArgs + _currentOperation <- OperationType.None + return Ok _targetScenarios + + | Error appError -> + _log.Error "Init failed" + cancelToken.Cancel() + _currentOperation <- OperationType.Stop + return AppError.createResult appError + }) member _.StartWarmUp(scenarios: Scenario list, ?getScenarioClusterCount: ScenarioName -> int) = backgroundTask { _stopped <- false @@ -220,7 +196,7 @@ type internal TestHost(dep: IGlobalDependency, _currentSchedulers <- warmUpSchedulers - do! startWarmUp warmUpSchedulers + do! startScenarios warmUpSchedulers None stopSchedulers warmUpSchedulers _currentOperation <- OperationType.None @@ -232,13 +208,13 @@ type internal TestHost(dep: IGlobalDependency, _currentSchedulers <- schedulers _log.Information "Starting bombing..." - do! startBombing schedulers reportingManager + do! startScenarios schedulers (Some reportingManager) do! this.StopScenarios() _currentOperation <- OperationType.Complete } - member _.StopScenarios([]reason: string) = backgroundTask { + member _.StopScenarios([]reason: string) = if _currentOperation <> OperationType.Stop && not _stopped then _currentOperation <- OperationType.Stop @@ -247,12 +223,17 @@ type internal TestHost(dep: IGlobalDependency, else _log.Information "Stopping scenarios..." - stopSchedulers _currentSchedulers - do! cleanScenarios(_sessionArgs, _globalCancelToken.Token, _targetScenarios) + TestHostConsole.displayStatus "Cleaning scenarios..." (fun consoleStatus -> backgroundTask { + use cancelToken = new CancellationTokenSource() + stopSchedulers _currentSchedulers - _stopped <- true - _currentOperation <- OperationType.None - } + do! cleanScenarios(_sessionArgs, consoleStatus, cancelToken.Token, _targetScenarios) + + _stopped <- true + _currentOperation <- OperationType.None + }) + else + Task.FromResult() member _.CreateScenarioSchedulers(scenarios: Scenario list, operation: ScenarioOperation, diff --git a/src/NBomber/DomainServices/TestHost/TestHostConsole.fs b/src/NBomber/DomainServices/TestHost/TestHostConsole.fs index 1b894e49..599f3fb1 100644 --- a/src/NBomber/DomainServices/TestHost/TestHostConsole.fs +++ b/src/NBomber/DomainServices/TestHost/TestHostConsole.fs @@ -1,176 +1,59 @@ module internal NBomber.DomainServices.TestHost.TestHostConsole +open System.Diagnostics +open System.Threading open System.Threading.Tasks open FSharp.Control.Reactive open FsToolkit.ErrorHandling open Spectre.Console -open NBomber -open NBomber.Contracts +open NBomber.Extensions.Data open NBomber.Domain open NBomber.Domain.DomainTypes +open NBomber.Domain.Stats.Statistics open NBomber.Domain.Concurrency.Scheduler.ScenarioScheduler open NBomber.Domain.ClientPool open NBomber.Extensions.Internal open NBomber.Infra open NBomber.Infra.Dependency -open NBomber.Infra.ProgressBar let printTargetScenarios (dep: IGlobalDependency) (targetScns: Scenario list) = targetScns |> List.map(fun x -> x.ScenarioName) |> fun targets -> dep.Logger.Information("Target scenarios: {TargetScenarios}", String.concatWithComma targets) -let displayBombingProgress (applicationType: ApplicationType, scnSchedulers: ScenarioScheduler list, isWarmUp: bool) = - - let calcTickCount (scn: Scenario) = - if isWarmUp then int(scn.WarmUpDuration.Value.TotalMilliseconds / Constants.SchedulerTickIntervalMs) - else int(scn.PlanedDuration.TotalMilliseconds / Constants.SchedulerTickIntervalMs) - - let calcTotalTickCount (schedulers: ScenarioScheduler list) = - schedulers |> Seq.map(fun scheduler -> scheduler.Scenario) |> Seq.map(calcTickCount) |> Seq.sum - - let getSimulationValue (progressInfo: ScenarioProgressInfo) = - match progressInfo.CurrentSimulation with - | RampConstant _ - | KeepConstant _ -> progressInfo.ConstantActorCount - - | RampPerSec _ - | InjectPerSec _ - | InjectPerSecRandom _ -> progressInfo.OneTimeActorCount - - let createSimulationDescription (simulation: LoadSimulation) (simulationValue: int) = - let simulationName = LoadTimeLine.getSimulationName simulation - - match simulation with - | RampConstant _ - | KeepConstant _ -> - $"load: {Console.blueColor simulationName}, copies: {Console.blueColor simulationValue}" - - | RampPerSec _ - | InjectPerSec _ - | InjectPerSecRandom _ -> - $"load: {Console.blueColor simulationName}, rate: {Console.blueColor simulationValue}" - - let createScenarioDescription (scenarioName: string) (simulation: LoadSimulation) (simulationValue: int) = - let simulationDescription = createSimulationDescription simulation simulationValue - $"{Console.okEscColor scenarioName}{MultilineColumn.NewLine}{simulationDescription}" - - let createProgressTaskConfig (scheduler: ScenarioScheduler) = - let scenarioName = scheduler.Scenario.ScenarioName - let simulation = scheduler.Scenario.LoadTimeLine.Head.LoadSimulation - let description = createScenarioDescription scenarioName simulation 0 - let ticks = scheduler.Scenario |> calcTickCount |> float - { Description = description; Ticks = ticks } - - let tickProgressTask (pbTask: ProgressTask) (scenarioName: string) (progressInfo: ScenarioProgressInfo) = - - let description = - progressInfo - |> getSimulationValue - |> createScenarioDescription scenarioName progressInfo.CurrentSimulation - - pbTask - |> ProgressBar.setDescription description - |> ProgressBar.defaultTick - - let displayProgressForConcurrentScenarios (schedulers: ScenarioScheduler list) = - schedulers - |> List.map createProgressTaskConfig - |> List.append [ - { Description = $"All Scenarios{MultilineColumn.NewLine}"; Ticks = schedulers |> calcTotalTickCount |> float } - ] - |> ProgressBar.create - (fun progressTasks -> - let pbTotalTask = progressTasks.Head - - progressTasks - |> List.iteri(fun i pbTask -> - if i > 0 then - schedulers[i - 1].EventStream - |> Observable.choose(function ProgressUpdated info -> Some info | _ -> None) - |> Observable.subscribeWithCompletion - - (fun progressInfo -> - let scenarioName = schedulers[i - 1].Scenario.ScenarioName - tickProgressTask pbTask scenarioName progressInfo - pbTotalTask |> ProgressBar.defaultTick) - - (fun () -> ProgressBar.stop pbTask) - - |> ignore - ) - ) - - let displayProgressForOneScenario (scheduler: ScenarioScheduler) = - scheduler - |> createProgressTaskConfig - |> List.singleton - |> ProgressBar.create - (fun tasks -> - scheduler.EventStream - |> Observable.choose(function ProgressUpdated info -> Some info | _ -> None) - |> Observable.subscribeWithCompletion - (fun progressInfo -> - let scenarioName = scheduler.Scenario.ScenarioName - tickProgressTask tasks.Head scenarioName progressInfo) - - (fun () -> tasks |> List.iter ProgressBar.stop) - - |> ignore - ) - - match applicationType with - | ApplicationType.Console -> - if scnSchedulers.Length > 1 then - scnSchedulers |> displayProgressForConcurrentScenarios |> ignore - else - scnSchedulers.Head |> displayProgressForOneScenario |> ignore - | _ -> () - -let displayClientPoolProgress (dep: IGlobalDependency, pool: ClientPool) = - - let pbHandler (pbTasks: ProgressTask list) = - pbTasks - |> List.iteri(fun i pbTask -> - pool.EventStream - |> Observable.takeWhile(function - | InitFinished -> false - | InitFailed -> false - | _ -> true - ) - |> Observable.subscribe(function - | StartedInit poolName -> - dep.Logger.Information("Start init client factory: {ClientFactory}", poolName) - - | StartedDispose poolName -> - dep.Logger.Information("Start disposing client factory: {ClientFactory}", poolName) - - | ClientInitialized (poolName,number) -> - pbTask - |> ProgressBar.setDescription $"{Console.okColor poolName}{MultilineColumn.NewLine}initialized client: {Console.blueColor number}" - |> ProgressBar.defaultTick - - | ClientDisposed (poolName,number,error) -> - pbTask - |> ProgressBar.setDescription $"{Console.okColor poolName}{MultilineColumn.NewLine}disposed client: {Console.blueColor number}" - |> ProgressBar.defaultTick - - error |> Option.iter(fun ex -> dep.Logger.Error(ex, "Client exception occurred")) - - | InitFinished - | InitFailed -> () - ) - |> ignore - ) - - match dep.ApplicationType with - | ApplicationType.Console -> - let pbConfig = { Description = pool.PoolName; Ticks = pool.ClientCount |> float } - ProgressBar.create pbHandler [pbConfig] - - | _ -> Task.FromResult() +let displayStatus (msg: string) (runAction: StatusContext -> Task<'T>) = + let status = AnsiConsole.Status() + status.StartAsync(msg, runAction) + +let displayClientPoolProgress (dep: IGlobalDependency, consoleStatus: StatusContext, pool: ClientPool) = + pool.EventStream + |> Observable.takeWhile(function + | InitFinished -> false + | InitFailed -> false + | _ -> true + ) + |> Observable.subscribe(function + | StartedInit (poolName, clientCount) -> + dep.Logger.Information("Start init client factory: {0}, client count: {1}", poolName, clientCount) + + | StartedDispose poolName -> + dep.Logger.Information("Start disposing client factory: {0}", poolName) + + | ClientInitialized (poolName,number) -> + consoleStatus.Status <- $"Initializing client factory: {Console.okColor poolName}, initialized client: {Console.blueColor number}" + consoleStatus.Refresh() + + | ClientDisposed (poolName,number,error) -> + consoleStatus.Status <- $"Disposing client factory: {Console.okColor poolName}, disposed client: {Console.blueColor number}" + consoleStatus.Refresh() + error |> Option.iter(fun ex -> dep.Logger.Error(ex, "Client exception occurred")) + + | InitFinished + | InitFailed -> () + ) + |> ignore let printContextInfo (dep: IGlobalDependency) = dep.Logger.Verbose("NBomberConfig: {NBomberConfig}", $"%A{dep.NBomberConfig}") @@ -186,3 +69,85 @@ let printContextInfo (dep: IGlobalDependency) = else dep.ReportingSinks |> List.iter(fun sink -> dep.Logger.Information("Reporting sink loaded: {SinkName}", sink.SinkName)) + +module LiveStatusTable = + + let private buildTable () = + let table = Table() + table.Border <- TableBorder.Square + + TableColumn("scenario") |> table.AddColumn |> ignore + TableColumn("step") |> table.AddColumn |> ignore + TableColumn("load simulation") |> table.AddColumn |> ignore + TableColumn("latency stats (ms)") |> table.AddColumn |> ignore + TableColumn("data transfer stats (bytes)") |> table.AddColumn + + let private renderTable (table: Table) (scnSchedulers: ScenarioScheduler list) = backgroundTask { + let! stats = + scnSchedulers + |> List.map(fun x -> x.GetCurrentStats() |> Task.map ScenarioStats.round) + |> Task.WhenAll + + let mutable rowIndex = 0 + let updateOperation = table.Rows.Count > 0 + + for scnStats in stats do + for stepStats in scnStats.StepStats do + let ok = stepStats.Ok + let req = ok.Request + let lt = ok.Latency + let data = ok.DataTransfer + + if updateOperation then + table.UpdateCell(rowIndex, 2, $"{scnStats.LoadSimulationStats.SimulationName}: {Console.blueColor scnStats.LoadSimulationStats.Value}") |> ignore + table.UpdateCell(rowIndex, 3, $"ok: {Console.okColor req.Count}, fail: {Console.errorColor stepStats.Fail.Request.Count}, RPS: {Console.okColor req.RPS}, p50 = {Console.okColor lt.Percent50}, p99 = {Console.okColor lt.Percent99}") |> ignore + table.UpdateCell(rowIndex, 4, $"min: {Console.blueColor data.MinBytes}, max: {Console.blueColor data.MaxBytes}, all: {data.AllBytes |> Converter.fromBytesToMb |> Console.blueColor} MB") |> ignore + rowIndex <- rowIndex + 1 + else + table.AddRow( + scnStats.ScenarioName, + stepStats.StepName, + $"{scnStats.LoadSimulationStats.SimulationName}: {Console.blueColor scnStats.LoadSimulationStats.Value}", + $"ok: {Console.okColor req.Count}, fail: {Console.errorColor stepStats.Fail.Request.Count}, RPS: {Console.okColor req.RPS}, p50 = {Console.okColor lt.Percent50}, p99 = {Console.okColor lt.Percent99}", + $"min: {Console.blueColor data.MinBytes}, max: {Console.blueColor data.MaxBytes}, all: {data.AllBytes |> Converter.fromBytesToMb |> Console.blueColor} MB") + |> ignore + } + + let display (cancelToken: CancellationToken) (isWarmUp: bool) (scnSchedulers: ScenarioScheduler list) = + let stopWatch = Stopwatch() + let mutable refreshTableCounter = 0 + + let maxDuration = + if isWarmUp then scnSchedulers |> List.map(fun x -> x.Scenario) |> Scenario.getMaxWarmUpDuration + else scnSchedulers |> List.map(fun x -> x.Scenario) |> Scenario.getMaxDuration + + let table = buildTable () + + let liveTable = AnsiConsole.Live(table) + liveTable.AutoClear <- false + liveTable.Overflow <- VerticalOverflow.Ellipsis + liveTable.Cropping <- VerticalOverflowCropping.Bottom + + stopWatch.Start() + + liveTable.StartAsync(fun ctx -> backgroundTask { + while not cancelToken.IsCancellationRequested do + try + let currentTime = stopWatch.Elapsed + if currentTime < maxDuration && refreshTableCounter = 0 then + + do! renderTable table scnSchedulers + + table.Title <- TableTitle($"duration: ({currentTime:``hh\:mm\:ss``} - {maxDuration:``hh\:mm\:ss``})") + ctx.Refresh() + do! Task.Delay(1_000, cancelToken) + + refreshTableCounter <- refreshTableCounter + 1 + if refreshTableCounter = 5 then refreshTableCounter <- 0 + with + | _ -> () + + table.Title <- TableTitle($"duration: ({maxDuration:``hh\:mm\:ss``} - {maxDuration:``hh\:mm\:ss``})") + ctx.Refresh() + }) + |> ignore diff --git a/src/NBomber/DomainServices/TestHost/TestHostScenario.fs b/src/NBomber/DomainServices/TestHost/TestHostScenario.fs index b4b7667e..0652b274 100644 --- a/src/NBomber/DomainServices/TestHost/TestHostScenario.fs +++ b/src/NBomber/DomainServices/TestHost/TestHostScenario.fs @@ -1,6 +1,7 @@ module internal NBomber.DomainServices.TestHost.TestHostScenario open FsToolkit.ErrorHandling +open Spectre.Console open NBomber.Contracts open NBomber.Contracts.Internal @@ -8,6 +9,7 @@ open NBomber.Domain open NBomber.Domain.DomainTypes open NBomber.Domain.ClientPool open NBomber.Errors +open NBomber.Infra open NBomber.Infra.Dependency let getTargetScenarios (sessionArgs: SessionArgs) (regScenarios: Scenario list) = @@ -15,23 +17,26 @@ let getTargetScenarios (sessionArgs: SessionArgs) (regScenarios: Scenario list) |> Scenario.filterTargetScenarios (sessionArgs.GetTargetScenarios()) |> Scenario.applySettings (sessionArgs.GetScenariosSettings()) (sessionArgs.GetDefaultStepTimeout()) -let initClientPools (dep: IGlobalDependency) (context: IBaseContext) (pools: ClientPool list) = taskResult { +let initClientPools (dep: IGlobalDependency) (consoleStatus: StatusContext) (context: IBaseContext) (pools: ClientPool list) = taskResult { try for pool in pools do - let progressTask = TestHostConsole.displayClientPoolProgress(dep, pool) - do! pool.Init(context) |> TaskResult.mapError(InitScenarioError >> AppError.create) - progressTask.Wait() + TestHostConsole.displayClientPoolProgress(dep, consoleStatus, pool) + + do! pool.Init(context) + |> TaskResult.mapError(InitScenarioError >> AppError.create) return pools with | ex -> return! AppError.createResult(InitScenarioError ex) } -let initDataFeeds (dep: IGlobalDependency) (context: IBaseContext) (feeds: IFeed list) = taskResult { +let initDataFeeds (dep: IGlobalDependency) (consoleStatus: StatusContext) (context: IBaseContext) (feeds: IFeed list) = taskResult { try for feed in feeds do + consoleStatus.Status <- $"Initializing data feed: {Console.okColor feed.FeedName}" + consoleStatus.Refresh() do! feed.Init(context) - dep.Logger.Information("Initialized data feed: {FeedName}", feed.FeedName) + dep.Logger.Information("Initialized data feed: {0}", feed.FeedName) return feeds with @@ -39,11 +44,14 @@ let initDataFeeds (dep: IGlobalDependency) (context: IBaseContext) (feeds: IFeed } let initScenarios (dep: IGlobalDependency, + consoleStatus: StatusContext, baseContext: IBaseContext, - defaultScnContext: IScenarioContext, sessionArgs: SessionArgs, - targetScenarios: Scenario list) = taskResult { + regScenarios: Scenario list) = taskResult { try + let targetScenarios = regScenarios |> getTargetScenarios sessionArgs + let defaultScnContext = Scenario.ScenarioContext.create baseContext + let enabledScenarios = targetScenarios |> List.filter(fun x -> x.IsEnabled) let disabledScenarios = targetScenarios |> List.filter(fun x -> not x.IsEnabled) @@ -53,8 +61,9 @@ let initScenarios (dep: IGlobalDependency, for scn in enabledScenarios do match scn.Init with | Some initFunc -> - dep.Logger.Information("Start init scenario: {Scenario}", scn.ScenarioName) + consoleStatus.Status <- $"Initializing scenario: {Console.okColor scn.ScenarioName}" + consoleStatus.Refresh() let scnContext = Scenario.ScenarioContext.setCustomSettings defaultScnContext scn.CustomSettings do! initFunc scnContext @@ -64,12 +73,12 @@ let initScenarios (dep: IGlobalDependency, let! pools = enabledScenarios |> Scenario.ClientPool.createPools sessionArgs.UpdatedClientFactorySettings - |> initClientPools dep baseContext + |> initClientPools dep consoleStatus baseContext // data feed init do! enabledScenarios |> Scenario.Feed.filterDistinctFeeds - |> initDataFeeds dep baseContext + |> initDataFeeds dep consoleStatus baseContext |> TaskResult.ignore return @@ -81,20 +90,20 @@ let initScenarios (dep: IGlobalDependency, | ex -> return! AppError.createResult(InitScenarioError ex) } -let disposeClientPools (dep: IGlobalDependency) (baseContext: IBaseContext) (pools: ClientPool list) = +let disposeClientPools (dep: IGlobalDependency) (consoleStatus: StatusContext) (baseContext: IBaseContext) (pools: ClientPool list) = for pool in pools do - let progressTask = TestHostConsole.displayClientPoolProgress(dep, pool) + TestHostConsole.displayClientPoolProgress(dep, consoleStatus, pool) pool.DisposePool(baseContext) - progressTask.Wait() let cleanScenarios (dep: IGlobalDependency) + (consoleStatus: StatusContext) (baseContext: IBaseContext) (defaultScnContext: IScenarioContext) (scenarios: Scenario list) = backgroundTask { scenarios |> Scenario.ClientPool.filterDistinct - |> disposeClientPools dep baseContext + |> disposeClientPools dep consoleStatus baseContext for scn in scenarios do match scn.Clean with diff --git a/src/NBomber/Extensions/Data.fs b/src/NBomber/Extensions/Data.fs index 9b1129a9..32795492 100644 --- a/src/NBomber/Extensions/Data.fs +++ b/src/NBomber/Extensions/Data.fs @@ -46,7 +46,7 @@ module Converter = let inline fromBytesToKb (bytes) = Math.Round(float bytes / 1024.0, 3) [] - let inline fromBytesToMb (bytes) = Math.Round(decimal bytes / 1024.0M / 1024.0M, 4) + let inline fromBytesToMb (bytes) = Math.Round(decimal bytes / 1024.0M / 1024.0M, 1) [] let inline round (digits: int) (value: float) = Math.Round(value, digits) diff --git a/src/NBomber/Infra/Console.fs b/src/NBomber/Infra/Console.fs index 1fe308ed..084f6947 100644 --- a/src/NBomber/Infra/Console.fs +++ b/src/NBomber/Infra/Console.fs @@ -9,7 +9,7 @@ let escapeMarkup (text: string) = Markup.Escape(text) let render (renderable: IRenderable) = - AnsiConsole.Render(renderable) + AnsiConsole.Write(renderable) let okColor (text: 'T) = $"[lime]{text}[/]" let okEscColor: (obj -> string) = string >> escapeMarkup >> okColor diff --git a/src/NBomber/Infra/ProgressBar.fs b/src/NBomber/Infra/ProgressBar.fs deleted file mode 100644 index 448076ec..00000000 --- a/src/NBomber/Infra/ProgressBar.fs +++ /dev/null @@ -1,95 +0,0 @@ -module internal NBomber.Infra.ProgressBar - -open System -open System.Threading.Tasks - -open Spectre.Console -open Spectre.Console.Rendering - -type MultilineColumn () = - inherit ProgressColumn() - - static member val NewLine = "|" with get - - override _.NoWrap = false - - override _.Render(context: RenderContext, task: ProgressTask, deltaTime: TimeSpan) = - let text = task.Description.Replace(MultilineColumn.NewLine, Environment.NewLine) - Markup(text).RightAligned() :> IRenderable - -type CustomElapsedTimeColumn () = - inherit ProgressColumn() - - override _.NoWrap = false - - override _.Render(context: RenderContext, task: ProgressTask, deltaTime: TimeSpan) = - let elapsedTime = - task.ElapsedTime - |> ValueOption.ofNullable - |> ValueOption.map(fun x -> TimeSpan(days = x.Days, hours = x.Hours, minutes = x.Minutes, seconds = x.Seconds)) - |> ValueOption.defaultValue TimeSpan.Zero - - let maxTime = TimeSpan.FromSeconds task.MaxValue - - Markup($"({elapsedTime:g}-{maxTime:g})").RightAligned() :> IRenderable - -let defaultColumns: ProgressColumn[] = - [| MultilineColumn() - ProgressBarColumn() - PercentageColumn() - CustomElapsedTimeColumn() |] - -type ProgressTaskConfig = { - Description: string - Ticks: float -} - -let private createProgressTask (ctx: ProgressContext) (config: ProgressTaskConfig) = - let task = ctx.AddTask(config.Description) - - if config.Ticks > 0.0 then - task.MaxValue <- config.Ticks - task.Increment(0.0) - else - // set 100% if number of ticks equal to 0 - task.MaxValue <- 1.0 - task.Increment(1.0) - - task - -let create (pbHandler: ProgressTask list -> unit) (config: ProgressTaskConfig list) = - AnsiConsole.Progress() - |> fun progressBar -> ProgressExtensions.Columns(progressBar, defaultColumns) - |> fun progressBar -> - progressBar.AutoRefresh <- true - progressBar.RefreshRate <- TimeSpan.FromSeconds 1 - progressBar.AutoClear <- false - progressBar - |> fun progressBar -> - progressBar.StartAsync(fun ctx -> - backgroundTask { - config |> List.map(createProgressTask ctx) |> pbHandler - - while not ctx.IsFinished do - do! Task.Delay(TimeSpan.FromSeconds 1) - } - ) - -let setDescription (description: string) (pbTask: ProgressTask) = - pbTask.Description <- description - pbTask - -let tick (progressTickInterval: float) (pbTask: ProgressTask) = - pbTask.Increment(progressTickInterval) - -let maxTick (pbTask: ProgressTask) = - pbTask.Increment(Double.MaxValue) - -let defaultTick (pbTask: ProgressTask) = - pbTask.Increment(1.0) - -let getRemainTicks (pbTask: ProgressTask) = - (pbTask.MaxValue - pbTask.Value) - -let stop (pbTask: ProgressTask) = - pbTask.StopTask() diff --git a/src/NBomber/NBomber.fsproj b/src/NBomber/NBomber.fsproj index f2438302..b6340b7c 100644 --- a/src/NBomber/NBomber.fsproj +++ b/src/NBomber/NBomber.fsproj @@ -48,7 +48,6 @@ - @@ -90,8 +89,8 @@ - - + +