Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel tracker #658

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/src/main/java/graphql/nadel/NextgenEngine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import graphql.nadel.engine.blueprint.NadelExecutionBlueprintFactory
import graphql.nadel.engine.blueprint.NadelIntrospectionRunnerFactory
import graphql.nadel.engine.blueprint.NadelOverallExecutionBlueprintMigrator
import graphql.nadel.engine.document.DocumentPredicates
import graphql.nadel.engine.instrumentation.NadelInstrumentationTimer
import graphql.nadel.time.NadelInstrumentationTimer
import graphql.nadel.engine.plan.NadelExecutionPlan
import graphql.nadel.engine.plan.NadelExecutionPlanFactory
import graphql.nadel.engine.transform.NadelTransform
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import graphql.GraphQLContext
import graphql.execution.instrumentation.InstrumentationState
import graphql.nadel.NadelExecutionHints
import graphql.nadel.ServiceExecutionHydrationDetails
import graphql.nadel.engine.instrumentation.NadelInstrumentationTimer
import graphql.nadel.hooks.NadelExecutionHooks
import graphql.nadel.result.NadelResultTracker
import graphql.nadel.time.NadelInstrumentationTimer
import graphql.normalized.ExecutableNormalizedOperation
import kotlinx.coroutines.CoroutineScope
import java.util.concurrent.CompletableFuture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import graphql.nadel.Service
import graphql.nadel.engine.NadelExecutionContext
import graphql.nadel.engine.NadelServiceExecutionContext
import graphql.nadel.engine.blueprint.NadelOverallExecutionBlueprint
import graphql.nadel.engine.instrumentation.NadelInstrumentationTimer
import graphql.nadel.engine.plan.NadelExecutionPlan
import graphql.nadel.engine.transform.NadelTransform
import graphql.nadel.engine.transform.NadelTransformFieldResult
import graphql.nadel.engine.util.toBuilder
import graphql.nadel.time.NadelInstrumentationTimer
import graphql.normalized.ExecutableNormalizedField

class NadelQueryTransformer private constructor(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package graphql.nadel.engine.instrumentation
package graphql.nadel.time

import graphql.execution.instrumentation.InstrumentationState
import graphql.nadel.instrumentation.NadelInstrumentation
Expand All @@ -7,7 +7,6 @@ import graphql.nadel.instrumentation.parameters.NadelInstrumentationTimingParame
import java.io.Closeable
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference

internal class NadelInstrumentationTimer(
private val isEnabled: Boolean,
Expand Down Expand Up @@ -89,7 +88,7 @@ internal class NadelInstrumentationTimer(
}

inner class BatchTimer internal constructor() : Closeable {
private val timings: MutableMap<Step, AtomicReference<Duration>> = ConcurrentHashMap()
private val timings: MutableMap<Step, NadelParallelElapsedCalculator> = ConcurrentHashMap()

private var exception: Throwable? = null

Expand All @@ -98,10 +97,6 @@ internal class NadelInstrumentationTimer(
return function()
}

timings.computeIfAbsent(step) {
AtomicReference(Duration.ZERO)
}

val start = ticker()

return try {
Expand All @@ -112,16 +107,15 @@ internal class NadelInstrumentationTimer(
} finally {
val end = ticker()

timings[step]!!.getAndUpdate { current ->
// Just get the max
current.coerceAtLeast(end - start)
}
timings.computeIfAbsent(step) {
NadelParallelElapsedCalculator()
}.submit(start, end)
}
}

override fun close() {
timings.forEach { (step, durationNs) ->
emit(step, durationNs.get(), exception)
timings.forEach { (step, calculator) ->
emit(step, calculator.calculate(), exception)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package graphql.nadel.time

import java.time.Duration

/**
* This calculates the time elapsed while considering that times can overlap.
*
* Timings are submitted to [submit] as soon as they finish i.e. earliest end first
*/
internal class NadelParallelElapsedCalculator {
/**
* Effectively a LinkedList
*/
private data class TimeNode(
val prev: TimeNode?,
@Volatile
var start: Duration,
@Volatile
var end: Duration,
@Volatile
var next: TimeNode?,
)

@Volatile
private var tail: TimeNode? = null

@Synchronized
fun submit(start: Duration, end: Duration) {
val headOverlapNode = getHeadOverlapNode(start)

if (headOverlapNode == null) {
val currentTail = tail
val newTail = TimeNode(
prev = currentTail,
start,
end,
next = null,
)
currentTail?.next = newTail
tail = newTail
} else {
headOverlapNode.start = min(start, headOverlapNode.start)
headOverlapNode.end = max(end, headOverlapNode.end)
headOverlapNode.next = null
tail = headOverlapNode
}
}

fun calculate(): Duration {
var sum = 0L

var cursor = tail
while (cursor != null) {
sum += cursor.end.toNanos() - cursor.start.toNanos()
cursor = cursor.prev
}

return Duration.ofNanos(sum)
}

/**
* We are finding the [TimeNode] closest to the head that still overlaps.
*/
private fun getHeadOverlapNode(
start: Duration,
): TimeNode? {
var best: TimeNode? = null

var cursor = tail
while (cursor != null) {
if (start < cursor.end) {
best = cursor
} else {
break
}
cursor = cursor.prev
}

return best
}

private fun min(a: Duration, b: Duration): Duration {
return if (a < b) a else b
}

private fun max(a: Duration, b: Duration): Duration {
return if (a > b) a else b
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import graphql.nadel.instrumentation.parameters.NadelInstrumentationIsTimingEnab
import graphql.nadel.instrumentation.parameters.NadelInstrumentationTimingParameters
import graphql.nadel.instrumentation.parameters.NadelInstrumentationTimingParameters.ChildStep
import graphql.nadel.instrumentation.parameters.NadelInstrumentationTimingParameters.RootStep
import graphql.nadel.time.NadelInstrumentationTimer
import io.kotest.core.spec.style.DescribeSpec
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import org.junit.jupiter.api.assertThrows
import java.time.Duration

Expand Down Expand Up @@ -383,7 +383,7 @@ class NadelInstrumentationTimerTest : DescribeSpec({
assert(instrumentationParams.isNotEmpty())
}

it("takes the highest time in a batch") {
it("combines times together") {
// given
var time = 10L
val ticker = { Duration.ofMillis(time) }
Expand All @@ -409,33 +409,32 @@ class NadelInstrumentationTimerTest : DescribeSpec({
// when
timer.batch { batchTimer ->
coroutineScope {
launch {
batchTimer.time(
ChildStep(parent = RootStep.ExecutionPlanning, "NadelHydrationTransform"),
) { time += 64 }
batchTimer.time(
ChildStep(parent = RootStep.ExecutionPlanning, "NadelHydrationTransform"),
) { time += 128 }
batchTimer.time(
ChildStep(parent = RootStep.ExecutionPlanning, "NadelHydrationTransform"),
) { time += 128 }
}
launch {
batchTimer.time(
ChildStep(parent = RootStep.ExecutionPlanning, "NadelBatchHydrationTransform"),
) { time += 32 }
batchTimer.time(
ChildStep(parent = RootStep.ExecutionPlanning, "NadelBatchHydrationTransform"),
) { time += 32 }
}
launch {
batchTimer.time(
ChildStep(parent = RootStep.ResultTransforming, "NadelBatchHydrationTransform"),
) { time += 256 }
batchTimer.time(
ChildStep(parent = RootStep.ResultTransforming, "NadelBatchHydrationTransform"),
) { time += 64 }
}
batchTimer.time(
ChildStep(parent = RootStep.ExecutionPlanning, "NadelHydrationTransform"),
) { time += 64 }
time -= 32 // Go back in time to cause overlap
batchTimer.time(
ChildStep(parent = RootStep.ExecutionPlanning, "NadelHydrationTransform"),
) { time += 128 }
batchTimer.time(
ChildStep(parent = RootStep.ExecutionPlanning, "NadelHydrationTransform"),
) { time += 128 }

batchTimer.time(
ChildStep(parent = RootStep.ExecutionPlanning, "NadelBatchHydrationTransform"),
) { time += 32 }
time -= 31 // Go back in time to cause overlap
batchTimer.time(
ChildStep(parent = RootStep.ExecutionPlanning, "NadelBatchHydrationTransform"),
) { time += 32 }

batchTimer.time(
ChildStep(parent = RootStep.ResultTransforming, "NadelBatchHydrationTransform"),
) { time += 256 }
time += 20 // Go forward in time, timings are distinct
batchTimer.time(
ChildStep(parent = RootStep.ResultTransforming, "NadelBatchHydrationTransform"),
) { time += 64 }
}
}

Expand All @@ -444,19 +443,19 @@ class NadelInstrumentationTimerTest : DescribeSpec({
it.step.getFullName() == "ExecutionPlanning.NadelHydrationTransform"
}
assert(planHydration.exception == null)
assert(planHydration.internalLatency == Duration.ofMillis(128))
assert(planHydration.internalLatency == Duration.ofMillis(64 + 128 - 32 + 128))

val planBatchHydration = instrumentationParams.single {
it.step.getFullName() == "ExecutionPlanning.NadelBatchHydrationTransform"
}
assert(planBatchHydration.exception == null)
assert(planBatchHydration.internalLatency == Duration.ofMillis(32))
assert(planBatchHydration.internalLatency == Duration.ofMillis(32 - 31 + 32))

val resultTransformBatchHydration = instrumentationParams.single {
it.step.getFullName() == "ResultTransforming.NadelBatchHydrationTransform"
}
assert(resultTransformBatchHydration.exception == null)
assert(resultTransformBatchHydration.internalLatency == Duration.ofMillis(256))
assert(resultTransformBatchHydration.internalLatency == Duration.ofMillis(256 + 64))
}
}
})
Loading
Loading