Skip to content

Commit

Permalink
Live status table (#481)
Browse files Browse the repository at this point in the history
* init

* replaced progress bar on live table for console

* cosmetic changes
  • Loading branch information
AntyaDev authored Sep 20, 2022
1 parent b31eacd commit 2fe1a39
Show file tree
Hide file tree
Showing 12 changed files with 216 additions and 376 deletions.
2 changes: 0 additions & 2 deletions src/NBomber/Constants.fs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ let SchedulerTimerDriftMs = 10.0
let SchedulerTickIntervalMs = 1_000.0
[<Literal>]
let ReportingTimerCompleteMs = 3_000
[<Literal>]
let WarmUpFinishPause = 1_000

/// Default status codes
Expand Down
4 changes: 2 additions & 2 deletions src/NBomber/Domain/ClientPool.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ open NBomber.Extensions.Internal
open NBomber.Contracts

type ClientPoolEvent =
| StartedInit of poolName:string
| StartedInit of poolName:string * clientCount:int
| StartedDispose of poolName:string
| ClientInitialized of poolName:string * clientNumber:int
| ClientDisposed of poolName:string * clientNumber:int * error:exn option
Expand Down Expand Up @@ -46,7 +46,7 @@ type ClientPool(factory: ClientFactory<obj>) =
| Error ex -> yield Error ex
}

_eventStream.OnNext(StartedInit factory.FactoryName)
_eventStream.OnNext(StartedInit(factory.FactoryName, factory.ClientCount))

let result = initClients(0, factory.ClientCount, context) |> Result.sequence
match result with
Expand Down
40 changes: 9 additions & 31 deletions src/NBomber/Domain/Concurrency/Scheduler/ScenarioScheduler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ module internal NBomber.Domain.Concurrency.Scheduler.ScenarioScheduler
open System
open System.Threading.Tasks

open FSharp.Control.Reactive

open NBomber.Contracts
open NBomber.Contracts.Stats
open NBomber.Domain
Expand All @@ -21,16 +19,6 @@ type SchedulerCommand =
| InjectOneTimeActors of scheduledCount:int
| DoNothing

type ScenarioProgressInfo = {
ConstantActorCount: int
OneTimeActorCount: int
CurrentSimulation: LoadSimulation
}

type SchedulerEvent =
| ScenarioStarted
| ProgressUpdated of ScenarioProgressInfo

let calcScheduleByTime (copiesCount: int, prevSegmentCopiesCount: int, timeSegmentProgress: int) =
let value = copiesCount - prevSegmentCopiesCount
let result = (float value / 100.0 * float timeSegmentProgress) + float prevSegmentCopiesCount
Expand Down Expand Up @@ -90,7 +78,6 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =
if _scenario.IsEnabled then new OneTimeActorScheduler(dep, OneTimeActorScheduler.exec)
else new OneTimeActorScheduler(dep, emptyExec)

let _eventStream = Subject.broadcast
let _tcs = TaskCompletionSource()
let _randomGen = Random()

Expand All @@ -104,7 +91,7 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =
_cachedSimulationStats <- getCurrentSimulationStats()
_scnDep.ScenarioStatsActor.Publish StartUseTempBuffer

let buildRealtimeStats (duration) =
let buildRealtimeStats (duration: TimeSpan) =
let simulationStats = getCurrentSimulationStats()
let reply = TaskCompletionSource<ScenarioStats>()
_scnDep.ScenarioStatsActor.Publish(BuildRealtimeStats(reply, simulationStats, duration))
Expand All @@ -116,9 +103,13 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =
_scnDep.ScenarioStatsActor.Publish FlushTempBuffer
reply.Task

let getFinalStats () =
let getStats (isFinal: bool) =
let simulationStats = getCurrentSimulationStats()
let duration = Scenario.getExecutedDuration _scenario

let duration =
if isFinal then Scenario.getExecutedDuration _scenario
else _scnDep.ScenarioTimer.Elapsed

let reply = TaskCompletionSource<ScenarioStats>()
_scnDep.ScenarioStatsActor.Publish(GetFinalStats(reply, simulationStats, duration))
reply.Task
Expand All @@ -135,7 +126,6 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =
_scnDep.ScenarioTimer.Stop()
_constantScheduler.Stop()
_oneTimeScheduler.Stop()
_eventStream.OnCompleted()

let execScheduler () =
if _isWorking && _scnDep.ScenarioStatsActor.FailCount > _scnDep.MaxFailCount then
Expand Down Expand Up @@ -169,40 +159,28 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =

| None -> stop()

let updateProgress () =
let progressInfo = {
ConstantActorCount = getConstantActorCount()
OneTimeActorCount = getOneTimeActorCount()
CurrentSimulation = _currentSimulation
}
_eventStream.OnNext(ProgressUpdated progressInfo)

let start () =
_isWorking <- true
_scnDep.ScenarioTimer.Restart()
execScheduler()
_eventStream.OnNext(ScenarioStarted)
_tcs.Task :> Task

member _.Working = _isWorking
member _.EventStream = _eventStream :> IObservable<_>
member _.Scenario = _scenario
member _.AllRealtimeStats = _scnDep.ScenarioStatsActor.AllRealtimeStats

member _.Start() = start()
member _.Stop() = stop()
member _.ExecScheduler() = execScheduler()
member _.UpdateProgress() = updateProgress()

member _.AddStatsFromAgent(stats) = _scnDep.ScenarioStatsActor.Publish(AddFromAgent stats)
member _.PrepareForRealtimeStats() = prepareForRealtimeStats()
member _.CommitRealtimeStats(duration) = commitRealtimeStats duration
member _.BuildRealtimeStats(duration) = buildRealtimeStats duration
member _.GetFinalStats() = getFinalStats()
member _.GetFinalStats() = getStats true
member _.GetCurrentStats() = getStats false

interface IDisposable with
member _.Dispose() =
stop()
_eventStream.Dispose()
_log.Verbose $"{nameof ScenarioScheduler} disposed"

6 changes: 6 additions & 0 deletions src/NBomber/Domain/Scenario.fs
Original file line number Diff line number Diff line change
Expand Up @@ -288,3 +288,9 @@ let defaultClusterCount = fun _ -> 1

let getScenariosForWarmUp (scenarios: Scenario list) =
scenarios |> List.filter(fun x -> x.WarmUpDuration.IsSome)

let getMaxDuration (scenarios: Scenario list) =
scenarios |> List.map(fun x -> x.PlanedDuration) |> List.max

let getMaxWarmUpDuration (scenarios: Scenario list) =
scenarios |> List.choose(fun x -> x.WarmUpDuration) |> List.max
1 change: 0 additions & 1 deletion src/NBomber/DomainServices/NBomberContext.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

open System
open System.IO
open System.Globalization

open FsToolkit.ErrorHandling

Expand Down
127 changes: 54 additions & 73 deletions src/NBomber/DomainServices/TestHost/TestHost.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ open System.Runtime.InteropServices
open Microsoft.FSharp.Collections

open Serilog
open Spectre.Console
open FsToolkit.ErrorHandling

open NBomber
Expand Down Expand Up @@ -43,7 +44,6 @@ type internal TestHost(dep: IGlobalDependency,
let mutable _targetScenarios = List.empty<Scenario>
let mutable _sessionArgs = SessionArgs.empty
let mutable _currentOperation = OperationType.None
let mutable _globalCancelToken = new CancellationTokenSource()
let _defaultNodeInfo = NodeInfo.init None

let getCurrentNodeInfo () =
Expand Down Expand Up @@ -92,85 +92,59 @@ type internal TestHost(dep: IGlobalDependency,
let stopSchedulers (schedulers: ScenarioScheduler list) =
schedulers |> List.iter(fun x -> x.Stop())

let initScenarios (sessionArgs: SessionArgs) (cancelToken: CancellationToken) = taskResult {

let targetScenarios = regScenarios |> TestHostScenario.getTargetScenarios sessionArgs
let initScenarios (consoleStatus: StatusContext) (cancelToken: CancellationToken) (sessionArgs: SessionArgs) = taskResult {

let baseContext = NBomberContext.createBaseContext(sessionArgs.TestInfo, getCurrentNodeInfo, cancelToken, _log)
let defaultScnContext = Scenario.ScenarioContext.create baseContext

do! dep.WorkerPlugins |> WorkerPlugins.init dep baseContext
do! dep.ReportingSinks |> ReportingSinks.init dep baseContext

return! TestHostScenario.initScenarios(dep, baseContext, defaultScnContext, sessionArgs, targetScenarios)
return! TestHostScenario.initScenarios(dep, consoleStatus, baseContext, sessionArgs, regScenarios)
}

let startWarmUp (schedulers: ScenarioScheduler list) = backgroundTask {
let isWarmUp = true
TestHostConsole.displayBombingProgress(dep.ApplicationType, schedulers, isWarmUp)

let schedulersArray = schedulers |> List.toArray
use schedulerTimer = new System.Timers.Timer(Constants.SchedulerTickIntervalMs)
schedulerTimer.Elapsed.Add(fun _ ->
schedulersArray
|> Array.Parallel.iter(fun x ->
x.ExecScheduler()
x.UpdateProgress()
)
)

schedulerTimer.Start()
do! schedulers |> List.map(fun x -> x.Start()) |> Task.WhenAll
schedulerTimer.Stop()

// wait on warmup progress bar to finish rendering
do! Task.Delay Constants.WarmUpFinishPause
}
let startScenarios (schedulers: ScenarioScheduler list) (reportingManager: IReportingManager option) = backgroundTask {

let startBombing (schedulers: ScenarioScheduler list)
(reportingManager: IReportingManager) = backgroundTask {
let isWarmUp = reportingManager.IsNone

let isWarmUp = false
TestHostConsole.displayBombingProgress(dep.ApplicationType, schedulers, isWarmUp)
if not isWarmUp then
dep.WorkerPlugins |> WorkerPlugins.start _log
dep.ReportingSinks |> ReportingSinks.start _log

dep.WorkerPlugins |> WorkerPlugins.start _log
dep.ReportingSinks |> ReportingSinks.start _log
use cancelToken = new CancellationTokenSource()
schedulers |> TestHostConsole.LiveStatusTable.display cancelToken.Token isWarmUp

reportingManager.Start()
if reportingManager.IsSome then reportingManager.Value.Start()

// waiting on all scenarios to finish
let schedulersArray = schedulers |> List.toArray
use schedulerTimer = new System.Timers.Timer(Constants.SchedulerTickIntervalMs)
schedulerTimer.Elapsed.Add(fun _ ->
schedulersArray
|> Array.Parallel.iter(fun x ->
x.ExecScheduler()
x.UpdateProgress()
)
)
schedulerTimer.Elapsed.Add(fun _ -> schedulersArray |> Array.Parallel.iter(fun x -> x.ExecScheduler()))

schedulerTimer.Start()
do! schedulers |> List.map(fun x -> x.Start()) |> Task.WhenAll
cancelToken.Cancel()
schedulerTimer.Stop()

// wait on final metrics and reporting tick
do! Task.Delay Constants.ReportingTimerCompleteMs
if not isWarmUp then
// wait on final metrics and reporting tick
do! Task.Delay Constants.ReportingTimerCompleteMs

// waiting (in case of cluster) on all raw stats
do! reportingManager.Stop()
// waiting (in case of cluster) on all raw stats
do! reportingManager.Value.Stop()

do! dep.WorkerPlugins |> WorkerPlugins.stop _log
do! dep.ReportingSinks |> ReportingSinks.stop _log
do! dep.WorkerPlugins |> WorkerPlugins.stop _log
do! dep.ReportingSinks |> ReportingSinks.stop _log
}

let cleanScenarios (sessionArgs: SessionArgs,
consoleStatus: StatusContext,
cancelToken: CancellationToken,
scenarios: Scenario list) =

let baseContext = NBomberContext.createBaseContext(sessionArgs.TestInfo, getCurrentNodeInfo, cancelToken, _log)
let defaultScnContext = Scenario.ScenarioContext.create baseContext
let enabledScenarios = scenarios |> List.filter(fun x -> x.IsEnabled)
TestHostScenario.cleanScenarios dep baseContext defaultScnContext enabledScenarios
TestHostScenario.cleanScenarios dep consoleStatus baseContext defaultScnContext enabledScenarios

member _.SessionArgs = _sessionArgs
member _.CurrentOperation = _currentOperation
Expand All @@ -179,28 +153,30 @@ type internal TestHost(dep: IGlobalDependency,
member _.TargetScenarios = _targetScenarios
member _.CurrentSchedulers = _currentSchedulers

member _.StartInit(sessionArgs: SessionArgs) = backgroundTask {
member _.StartInit(sessionArgs: SessionArgs) =
_stopped <- false
_currentOperation <- OperationType.Init

TestHostConsole.printContextInfo dep
_log.Information "Starting init..."
_globalCancelToken.Dispose()
_globalCancelToken <- new CancellationTokenSource()

match! initScenarios sessionArgs _globalCancelToken.Token with
| Ok initializedScenarios ->
_log.Information "Init finished"
_targetScenarios <- initializedScenarios
_sessionArgs <- sessionArgs
_currentOperation <- OperationType.None
return Ok _targetScenarios

| Error appError ->
_log.Error "Init failed"
_currentOperation <- OperationType.Stop
return AppError.createResult appError
}

TestHostConsole.displayStatus "Initializing scenarios..." (fun consoleStatus -> backgroundTask {
use cancelToken = new CancellationTokenSource()
match! initScenarios consoleStatus cancelToken.Token sessionArgs with
| Ok initializedScenarios ->
_log.Information "Init finished"
cancelToken.Cancel()
_targetScenarios <- initializedScenarios
_sessionArgs <- sessionArgs
_currentOperation <- OperationType.None
return Ok _targetScenarios

| Error appError ->
_log.Error "Init failed"
cancelToken.Cancel()
_currentOperation <- OperationType.Stop
return AppError.createResult appError
})

member _.StartWarmUp(scenarios: Scenario list, ?getScenarioClusterCount: ScenarioName -> int) = backgroundTask {
_stopped <- false
Expand All @@ -220,7 +196,7 @@ type internal TestHost(dep: IGlobalDependency,

_currentSchedulers <- warmUpSchedulers

do! startWarmUp warmUpSchedulers
do! startScenarios warmUpSchedulers None
stopSchedulers warmUpSchedulers

_currentOperation <- OperationType.None
Expand All @@ -232,13 +208,13 @@ type internal TestHost(dep: IGlobalDependency,
_currentSchedulers <- schedulers

_log.Information "Starting bombing..."
do! startBombing schedulers reportingManager
do! startScenarios schedulers (Some reportingManager)

do! this.StopScenarios()
_currentOperation <- OperationType.Complete
}

member _.StopScenarios([<Optional;DefaultParameterValue("":string)>]reason: string) = backgroundTask {
member _.StopScenarios([<Optional;DefaultParameterValue("":string)>]reason: string) =
if _currentOperation <> OperationType.Stop && not _stopped then
_currentOperation <- OperationType.Stop

Expand All @@ -247,12 +223,17 @@ type internal TestHost(dep: IGlobalDependency,
else
_log.Information "Stopping scenarios..."

stopSchedulers _currentSchedulers
do! cleanScenarios(_sessionArgs, _globalCancelToken.Token, _targetScenarios)
TestHostConsole.displayStatus "Cleaning scenarios..." (fun consoleStatus -> backgroundTask {
use cancelToken = new CancellationTokenSource()
stopSchedulers _currentSchedulers

_stopped <- true
_currentOperation <- OperationType.None
}
do! cleanScenarios(_sessionArgs, consoleStatus, cancelToken.Token, _targetScenarios)

_stopped <- true
_currentOperation <- OperationType.None
})
else
Task.FromResult()

member _.CreateScenarioSchedulers(scenarios: Scenario list,
operation: ScenarioOperation,
Expand Down
Loading

0 comments on commit 2fe1a39

Please sign in to comment.