Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: consolidate event processing and optimize application performance #23

Merged
merged 1 commit into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ COPY --from=build /app/build/libs/app.jar app.jar
RUN java -Djarmode=layertools -jar app.jar extract

# JVM memory and garbage collection optimizations for containers
ENTRYPOINT ["java", "-Dsun.net.inetaddr.ttl=60", "-Dsun.net.inetaddr.negative.ttl=10", "-Xms512m", "-Xmx1024m", "-XX:+UseG1GC", "-XX:MaxGCPauseMillis=200", "-XX:+UseContainerSupport", "-jar", "/app.jar"]
ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -Dsun.net.inetaddr.ttl=60 -Dsun.net.inetaddr.negative.ttl=10 -XX:+UseContainerSupport --add-opens java.base/java.math=ALL-UNNAMED -jar /app.jar"]
26 changes: 6 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ Alternatively, you can build and run the application using Docker:
This project is a Kotlin-based Spring Boot application designed to connect to a Server-Sent Events (
SSE) endpoint, process incoming events, and store data in an OpenSearch index.

It leverages Spring Boot's reactive WebFlux framework and integrates custom health indicators using
Spring Boot Actuator to provide insights into application performance.

### System Flow Overview

The main loop of this service is illustrated in the following diagram:
Expand All @@ -72,15 +69,13 @@ The main loop of this service is illustrated in the following diagram:
sequenceDiagram
participant DataTransfer
participant SSEEndpoint
participant LockManager
participant OpenSearch
DataTransfer ->> OpenSearch: Initialize index
DataTransfer ->> SSEEndpoint: Connects and listens to events
loop Listen for events
SSEEndpoint --) DataTransfer: Send Event
DataTransfer ->> LockManager: Acquire session lock
DataTransfer ->> OpenSearch: Store event
DataTransfer -->> LockManager: Release session lock
end
```

Expand All @@ -91,14 +86,10 @@ system:

- [PillarboxDataTransferApplication.kt][main-entry-point]: The main entry point of the application
that bootstraps and configures the service.
- [BenchmarkHealthIndicator.kt][health-indicator]: Monitors the performance of key operations,
providing real-time health metrics for the application.
- [LockManager.kt][lock-manager]: Ensures concurrency control by managing locks for different
sessions, enabling thread-safe operations.
- [SetupService.kt][setup-service]: Manages the initial setup of the OpenSearch index and the
application’s configuration for SSE processing.
- [SseClient.kt][sse-client]: Listens to the SSE endpoint, handling incoming events and managing
retries in case of connection failures.
- [OpenSearchSetupService.kt][setup-service]: Manages the initial setup of the OpenSearch index and
the application’s configuration for SSE processing.
- [EventDispatcherClient.kt][sse-client]: Listens to the SSE endpoint, handling incoming events and
managing retries in case of connection failures.

Here’s a more concise description of the GitHub Actions setup without listing the steps:

Expand Down Expand Up @@ -162,11 +153,6 @@ Refer to our [Contribution Guide](docs/CONTRIBUTING.md) for more detailed inform
This project is licensed under the [MIT License](LICENSE).

[main-entry-point]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/PillarboxDataTransferApplication.kt
[setup-service]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupService.kt
[sse-client]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt

[health-indicator]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/BenchmarkHealthIndicator.kt

[lock-manager]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/concurrent/LockManager.kt

[setup-service]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/SetupService.kt

[sse-client]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/SseClient.kt
23 changes: 12 additions & 11 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import java.util.Properties
plugins {
kotlin("jvm") version "2.0.10"
kotlin("plugin.spring") version "2.0.10"
id("org.springframework.boot") version "3.3.4"
id("io.spring.dependency-management") version "1.1.6"
id("org.springframework.boot") version "3.4.1"
id("io.spring.dependency-management") version "1.1.7"
id("io.gitlab.arturbosch.detekt") version "1.23.7"
id("org.jlleitschuh.gradle.ktlint") version "12.1.1"
id("org.jetbrains.kotlinx.kover") version "0.8.3"
id("org.jlleitschuh.gradle.ktlint") version "12.1.2"
id("org.jetbrains.kotlinx.kover") version "0.9.0"
id("com.github.ben-manes.versions") version "0.51.0"
}

group = "ch.srgssr.pillarbox"
Expand All @@ -24,23 +25,23 @@ repositories {
}

dependencies {
implementation("org.springframework.boot:spring-boot-starter-aop")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.opensearch.client:spring-data-opensearch-starter:1.5.3")
// Dependencies
implementation("org.opensearch.client:spring-data-opensearch-starter:1.6.0")
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("nl.basjes.parse.useragent:yauaa:7.29.0")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
implementation("com.github.ben-manes.caffeine:caffeine")
implementation("nl.basjes.parse.useragent:yauaa:7.28.1")

// Test Dependencies
testImplementation("io.kotest:kotest-runner-junit5:5.9.1")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("io.kotest.extensions:kotest-extensions-spring:1.3.0")
testImplementation("org.jetbrains.kotlin:kotlin-test-junit5")
testImplementation("io.mockk:mockk:1.13.13")
testImplementation("com.squareup.okhttp3:mockwebserver")
testImplementation("com.squareup.okhttp3:okhttp")
testImplementation("com.squareup.okhttp3:mockwebserver:4.12.0")
testImplementation("com.squareup.okhttp3:okhttp:4.12.0")
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package ch.srgssr.pillarbox.monitoring

import ch.srgssr.pillarbox.monitoring.event.EventDispatcherClient
import ch.srgssr.pillarbox.monitoring.event.setup.OpenSearchSetupService
import ch.srgssr.pillarbox.monitoring.exception.RetryExhaustedException
import ch.srgssr.pillarbox.monitoring.log.logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.runBlocking
import org.springframework.boot.ApplicationArguments
import org.springframework.boot.ApplicationRunner
import org.springframework.context.annotation.Profile
Expand Down Expand Up @@ -43,33 +45,22 @@ class DataTransferApplicationRunner(
*
* @param args Application arguments.
*/
override fun run(args: ApplicationArguments?) {
openSearchSetupService.start().subscribe(
// Success
{ this.startSseClient() },
// Error
{
logger.error("Failed to connect to OpenSearch:", it)
CoroutineScope(Dispatchers.IO).launch {
terminationService.terminateApplication()
}
},
)
}
override fun run(args: ApplicationArguments?) =
runBlocking {
try {
openSearchSetupService
.start()
.asFlow()
.catch {
logger.error("Failed to connect to OpenSearch:", it)
throw it
}.launchIn(CoroutineScope(Dispatchers.Default))
.join()

private fun startSseClient() {
eventDispatcherClient.start().subscribe(
// Success
{ },
// Error
{
if (it is RetryExhaustedException) {
logger.error("Failed to connect to SSE after retries, terminating application.", it)
CoroutineScope(Dispatchers.IO).launch {
terminationService.terminateApplication()
}
}
},
)
}
logger.info("All setup tasks are completed, starting SSE client...")
eventDispatcherClient.start().join()
} finally {
terminationService.terminateApplication()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration
import org.springframework.boot.context.properties.ConfigurationPropertiesScan
import org.springframework.boot.runApplication
import org.springframework.scheduling.annotation.EnableScheduling

/**
* Main entry point for the Pillarbox QoS Data Transfer application.
*/
@SpringBootApplication(exclude = [ElasticsearchDataAutoConfiguration::class])
@ConfigurationPropertiesScan
@EnableScheduling
class PillarboxDataTransferApplication

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package ch.srgssr.pillarbox.monitoring.benchmark

import ch.srgssr.pillarbox.monitoring.log.info
import ch.srgssr.pillarbox.monitoring.log.logger
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.util.concurrent.TimeUnit

/**
* A scheduled logger that periodically logs the average execution times
* and stats regarding the number of processed events.
*/
@Component
class BenchmarkScheduledLogger {
private companion object {
/**
* Logger instance for logging within this component.
*/
private val logger = logger()
}

/**
* The scheduled logging function, executes every minute.
*/
@Scheduled(fixedRate = 1, timeUnit = TimeUnit.MINUTES)
fun logBenchmarkAverages() {
logger.info { "Benchmark averages: ${TimeTracker.averages}" }
logger.info { "Latest stats per minute: ${StatsTracker.getAndResetAll()}" }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package ch.srgssr.pillarbox.monitoring.benchmark

import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.LongAdder

/**
* A thread-safe class for calculating the moving average of a stream of long values.
*
* This implementation accumulates values until the average is calculated, at which point
* the internal state (sum and count) is reset.
*/
internal class MovingAverageCalculator {
private val sum = LongAdder()
private val count = AtomicLong()

/**
* Adds a new value to the moving average calculation.
*
* @param n The value to be added to the moving average calculation.
*/
fun add(n: Long) {
sum.add(n)
count.incrementAndGet()
}

/**
* Calculates the average of the accumulated values and resets the internal state.
*
* This method atomically retrieves and resets both the accumulated sum and count.
* If no values have been added, the method returns `Double.NaN`.
*
* @return The average of the accumulated values, or `NaN` if no values were added.
*/
val average: Double
get() =
synchronized(this) {
val totalCount = count.getAndSet(0)
val total = sum.sumThenReset().toDouble()
if (totalCount == 0L) Double.NaN else total / totalCount
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package ch.srgssr.pillarbox.monitoring.benchmark

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

/**
* Utility object for tracking and aggregating statistics.
*
* This class uses a [ConcurrentHashMap] to store statistics counters identified by unique keys.
* It provides methods to increment, retrieve, and reset these counters.
*/
object StatsTracker {
private val stats = ConcurrentHashMap<String, AtomicLong>()

/**
* Increments the count for a specific key by the specified delta.
* If the key does not exist, it is initialized to 0 before incrementing.
*
* @param key The unique identifier for the statistic.
* @param delta The amount to increment the counter by. Defaults to 1.
*/
fun increment(
key: String,
delta: Long = 1,
) {
stats.computeIfAbsent(key) { AtomicLong(0) }.addAndGet(delta)
}

/**
* Increments the count for a specific key by the specified delta.
*
* @param key The unique identifier for the statistic.
* @param delta The amount to increment the counter by, as an [Int].
*/
fun increment(
key: String,
delta: Int,
) {
increment(key, delta.toLong())
}

/**
* Retrieves the current count for a specific key.
*
* @param key The unique identifier for the statistic.
* @return The current count for the key, or 0 if the key does not exist.
*/
operator fun get(key: String): Long = stats[key]?.get() ?: 0L

/**
* Retrieves all current statistics as a map and resets their counters to 0.
*
* @return A map containing the keys and their corresponding counts before reset.
*/
fun getAndResetAll(): Map<String, Long> = stats.mapValues { it.value.getAndSet(0) }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package ch.srgssr.pillarbox.monitoring.benchmark

import ch.srgssr.pillarbox.monitoring.log.logger
import ch.srgssr.pillarbox.monitoring.log.trace
import java.util.concurrent.ConcurrentHashMap
import kotlin.time.measureTimedValue

/**
* Utility object for tracking and logging execution times of code blocks with support for moving averages.
*/
object TimeTracker {
private val logger = logger()

private val movingAverages = ConcurrentHashMap<String, MovingAverageCalculator>()

/**
* Tracks the execution time of a code block.
*
* @param T The return type of the code block.
* @param signature A unique identifier (e.g., method name) for tracking and logging purposes.
* @param block The suspending code block to measure.
*
* @return The result of the code block.
*/
suspend fun <T> track(
signature: String,
block: suspend () -> T,
): T {
val (result, executionTime) = measureTimedValue { block() }
val calculator =
movingAverages.computeIfAbsent(signature) {
MovingAverageCalculator()
}

calculator.add(executionTime.inWholeMilliseconds)
logger.trace { "$signature took $executionTime" }

return result
}

/**
* Provides the average execution times for all monitored methods.
*
* @return A map where the keys are method signatures and the values are the average execution times.
*/
val averages get() = movingAverages.mapValues { it.value.average }
}

/**
* Convenience function to track execution time of a code block using [TimeTracker].
*
* @param T The return type of the code block.
* @param signature A unique identifier (e.g., method name) for tracking and logging purposes.
* @param block The suspending code block to measure.
*
* @return The result of the code block.
*/
suspend fun <T> timed(
signature: String,
block: suspend () -> T,
): T = TimeTracker.track(signature, block)
Loading