Skip to content

Commit

Permalink
Extract timeout logic to common
Browse files Browse the repository at this point in the history
  • Loading branch information
stoyicker committed Apr 9, 2024
1 parent af75726 commit 7c7bbbf
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import kotlinx.cinterop.convert
import kotlinx.cinterop.pin
import kotlinx.cinterop.usePinned
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.withTimeout
import platform.Network.nw_connection_create
import platform.Network.nw_connection_force_cancel
import platform.Network.nw_connection_receive
Expand All @@ -29,14 +28,13 @@ import platform.darwin.dispatch_get_current_queue
import platform.posix.memcpy
import kotlin.test.assertEquals
import kotlin.test.assertNull
import kotlin.time.Duration

@OptIn(ExperimentalForeignApi::class)
@Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING")
internal actual class NTPUDPSocketOperations {
private var connection: nw_connection_t = null

actual suspend fun prepare(address: String, portNumber: Int, connectTimeout: Duration) {
actual suspend fun prepare(address: String, portNumber: Int) {
val parameters = nw_parameters_create_secure_udp_disable_protocol()
val endpoint = nw_endpoint_create_host(address, portNumber.toString())
connection = nw_connection_create(endpoint, parameters)
Expand All @@ -49,12 +47,10 @@ internal actual class NTPUDPSocketOperations {
}
}
nw_connection_start(connection)
withTimeout(connectTimeout) {
assertEquals(nw_connection_state_ready, connectionStateDeferred.await())
}
assertEquals(nw_connection_state_ready, connectionStateDeferred.await())
}

actual suspend fun exchange(buffer: ByteArray, readTimeout: Duration) {
actual suspend fun exchange(buffer: ByteArray) {
val data = buffer.pin().run {
dispatch_data_create(
addressOf(0),
Expand All @@ -79,9 +75,7 @@ internal actual class NTPUDPSocketOperations {
assertNull(error)
connectionReceptionDeferred.complete(content)
}
val receivedData = withTimeout(readTimeout) {
connectionReceptionDeferred.await()
}
val receivedData = connectionReceptionDeferred.await()
buffer.usePinned {
dispatch_data_apply(receivedData) { _, offset, src, size ->
memcpy(it.addressOf(offset.toInt()), src, size)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.tidal.networktime.internal

import kotlinx.coroutines.withTimeout
import kotlin.time.Duration

internal class NTPExchangeCoordinator(
Expand All @@ -15,12 +16,16 @@ internal class NTPExchangeCoordinator(
): NTPExchangeResult? {
val ntpUdpSocketOperations = NTPUDPSocketOperations()
return try {
ntpUdpSocketOperations.prepare(address, NTP_PORT_NUMBER, connectTimeout)
withTimeout(connectTimeout) {
ntpUdpSocketOperations.prepare(address, NTP_PORT_NUMBER)
}
val ntpPacket = NTPPacket(versionNumber = ntpVersion.toInt(), mode = NTP_MODE_CLIENT)
val requestTime = referenceClock.referenceEpochTime
ntpPacket.transmitEpochTimestamp = EpochTimestamp(requestTime).asNTPTimestamp
val buffer = ntpPacketSerializer(ntpPacket)
ntpUdpSocketOperations.exchange(buffer, queryReadTimeout)
withTimeout(queryReadTimeout) {
ntpUdpSocketOperations.exchange(buffer)
}
val returnTime = referenceClock.referenceEpochTime
ntpPacketDeserializer(buffer)?.let { NTPExchangeResult(returnTime, it) }
} catch (_: Throwable) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package com.tidal.networktime.internal

import kotlin.time.Duration

@Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING")
internal expect class NTPUDPSocketOperations() {
suspend fun prepare(address: String, portNumber: Int, connectTimeout: Duration)
suspend fun prepare(address: String, portNumber: Int)

suspend fun exchange(buffer: ByteArray, readTimeout: Duration)
suspend fun exchange(buffer: ByteArray)

fun tearDown()
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,21 @@
package com.tidal.networktime.internal

import kotlinx.coroutines.withTimeout
import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.InetAddress
import kotlin.time.Duration
import kotlin.time.DurationUnit

@Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING", "BlockingMethodInNonBlockingContext")
internal actual class NTPUDPSocketOperations {
private var datagramSocket: DatagramSocket? = null

actual suspend fun prepare(address: String, portNumber: Int, connectTimeout: Duration) {
actual suspend fun prepare(address: String, portNumber: Int) {
datagramSocket = DatagramSocket()
withTimeout(connectTimeout) {
datagramSocket!!.connect(InetAddress.getByName(address), portNumber)
}
datagramSocket!!.connect(InetAddress.getByName(address), portNumber)
}

actual suspend fun exchange(buffer: ByteArray, readTimeout: Duration) {
actual suspend fun exchange(buffer: ByteArray) {
val exchangePacket = DatagramPacket(buffer, buffer.size)
datagramSocket!!.send(exchangePacket)
datagramSocket!!.soTimeout = readTimeout.toInt(DurationUnit.MILLISECONDS)
datagramSocket!!.receive(exchangePacket)
}

Expand Down

0 comments on commit 7c7bbbf

Please sign in to comment.