Skip to content

Commit

Permalink
Implement UDP exchange in Kotlin/Native
Browse files Browse the repository at this point in the history
  • Loading branch information
stoyicker committed Jan 23, 2024
1 parent 376a0cf commit d0c557e
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ internal actual class HostNameResolver {
}

private fun clear() {
cfHost?.let { CFRelease(it) }
cfHost = null
hostReference?.let { CFRelease(it) }
hostReference = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,22 @@ package com.tidal.networktime.internal

import com.tidal.networktime.ProtocolFamily
import kotlinx.cinterop.ExperimentalForeignApi
import kotlinx.cinterop.StableRef
import kotlinx.cinterop.alloc
import kotlinx.cinterop.asStableRef
import kotlinx.cinterop.memScoped
import kotlinx.cinterop.pointed
import kotlinx.cinterop.ptr
import kotlinx.cinterop.refTo
import kotlinx.cinterop.reinterpret
import kotlinx.cinterop.sizeOf
import kotlinx.cinterop.staticCFunction
import kotlinx.cinterop.toCPointer
import kotlinx.cinterop.toLong
import kotlinx.cinterop.value
import kotlinx.coroutines.delay
import kotlinx.coroutines.withTimeout
import platform.CoreFoundation.CFDataCreate
import platform.CoreFoundation.CFDataGetBytes
import platform.CoreFoundation.CFDataGetLength
import platform.CoreFoundation.CFDataRefVar
import platform.CoreFoundation.CFRangeMake
import platform.CoreFoundation.CFSocketCallBack
import platform.CoreFoundation.CFRelease
import platform.CoreFoundation.CFSocketConnectToAddress
import platform.CoreFoundation.CFSocketContext
import platform.CoreFoundation.CFSocketCreate
import platform.CoreFoundation.CFSocketInvalidate
import platform.CoreFoundation.CFSocketRef
import platform.CoreFoundation.CFSocketSendData
import platform.CoreFoundation.kCFAllocatorDefault
import platform.CoreFoundation.kCFSocketDataCallBack
import platform.darwin.inet_aton
import platform.darwin.inet_pton
import platform.posix.AF_INET
import platform.posix.AF_INET6
Expand All @@ -45,63 +31,39 @@ import platform.posix.signal
import platform.posix.sockaddr_in
import platform.posix.sockaddr_in6
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.DurationUnit

@OptIn(ExperimentalForeignApi::class)
@Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING")
internal actual class NTPUDPSocketOperations {
private var cfSocket: CFSocketRef? = null
private var userDataRef: StableRef<UserData>? = null

actual suspend fun prepareSocket(
actual suspend fun prepare(
address: String,
protocolFamily: ProtocolFamily,
portNumber: Int,
connectTimeout: Duration,
) {
userDataRef = StableRef.create(UserData())
val callback: CFSocketCallBack = staticCFunction { _, callbackType, _, data, info ->
val userData = info!!.asStableRef<UserData>().get()
if (callbackType != kCFSocketDataCallBack) {
return@staticCFunction
}
val reinterpretedData = data!!.reinterpret<CFDataRefVar>().pointed.value
val length = CFDataGetLength(reinterpretedData)
val range = CFRangeMake(0, length)
val bridgeBuffer = UByteArray(length.toInt())
CFDataGetBytes(reinterpretedData, range, bridgeBuffer.refTo(0))
userData.apply {
buffer = bridgeBuffer.toByteArray()
exchangeCompleted = true
}
}
signal(SIGPIPE, SIG_IGN)
cfSocket = memScoped {
val socketContext = alloc<CFSocketContext> {
version = 0
info = userDataRef!!.asCPointer()
retain = null
release = null
copyDescription = null
}
val socket = CFSocketCreate(
kCFAllocatorDefault,
when (protocolFamily) {
ProtocolFamily.INET -> PF_INET
ProtocolFamily.INET6 -> PF_INET6
},
SOCK_DGRAM,
IPPROTO_UDP,
kCFSocketDataCallBack,
callback,
socketContext.ptr,
)
val addrCFDataRef = when (protocolFamily) {
val address = "127.0.0.1"
val portNumber = 31337
cfSocket = CFSocketCreate(
kCFAllocatorDefault,
when (protocolFamily) {
ProtocolFamily.INET -> PF_INET
ProtocolFamily.INET6 -> PF_INET6
},
SOCK_DGRAM,
IPPROTO_UDP,
kCFSocketDataCallBack,
null,
null,
)
val addrCfDataRef = memScoped {
when (protocolFamily) {
ProtocolFamily.INET -> alloc<sockaddr_in> {
sin_family = AF_INET.toUByte()
sin_port = portNumber.toUShort()
inet_aton(address, sin_addr.ptr)
inet_pton(AF_INET, address, sin_addr.ptr)
}.run {
CFDataCreate(kCFAllocatorDefault, ptr.toLong().toCPointer(), sizeOf<sockaddr_in>())
}
Expand All @@ -116,16 +78,16 @@ internal actual class NTPUDPSocketOperations {
}
}
}
CFSocketConnectToAddress(
socket,
addrCFDataRef,
connectTimeout.toDouble(DurationUnit.MILLISECONDS),
)
socket
}
CFSocketConnectToAddress(
cfSocket,
addrCfDataRef,
connectTimeout.toDouble(DurationUnit.SECONDS)
)
}

actual suspend fun exchangeInPlace(buffer: ByteArray, readTimeout: Duration) {
actual suspend fun exchange(buffer: ByteArray, readTimeout: Duration) {
signal(SIGPIPE, SIG_IGN)
val bufferCFDataRef = CFDataCreate(
kCFAllocatorDefault,
buffer.asUByteArray().refTo(0),
Expand All @@ -137,22 +99,18 @@ internal actual class NTPUDPSocketOperations {
bufferCFDataRef,
Duration.INFINITE.toDouble(DurationUnit.MILLISECONDS),
)
withTimeout(readTimeout) {
while (!userDataRef!!.get().exchangeCompleted) {
delay(1.seconds)
}
}
/* val reinterpretedData = data!!.reinterpret<CFDataRefVar>().pointed.value
val length = CFDataGetLength(reinterpretedData)
val range = CFRangeMake(0, length)
val bridgeBuffer = UByteArray(length.toInt())
CFDataGetBytes(reinterpretedData, range, bridgeBuffer.refTo(0)) */
}

actual fun closeSocket() {
CFSocketInvalidate(cfSocket)
cfSocket?.let {
CFSocketInvalidate(it)
CFRelease(it)
}
cfSocket = null
userDataRef?.dispose()
userDataRef = null
}

private class UserData {
var exchangeCompleted = false
var buffer: ByteArray = byteArrayOf()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ internal class NTPExchanger(
): NTPExchangeResult? {
val ntpUdpSocketOperations = NTPUDPSocketOperations()
return try {
ntpUdpSocketOperations.prepareSocket(address, protocolFamily, NTP_PORT_NUMBER, connectTimeout)
ntpUdpSocketOperations.prepare(address, protocolFamily, NTP_PORT_NUMBER, connectTimeout)
val ntpPacket = NTPPacket(versionNumber = ntpVersion.toInt(), mode = NTP_MODE_CLIENT)
val requestTime = referenceClock.referenceEpochTime
ntpPacket.transmitEpochTimestamp = EpochTimestamp(requestTime).asNTPTimestamp
val buffer = ntpPacketSerializer(ntpPacket)
ntpUdpSocketOperations.exchangeInPlace(buffer, queryReadTimeout)
ntpUdpSocketOperations.exchange(buffer, queryReadTimeout)
val returnTime = referenceClock.referenceEpochTime
ntpPacketDeserializer(buffer)?.let { NTPExchangeResult(returnTime, it) }
} catch (_: Throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import kotlin.time.Duration

@Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING")
internal expect class NTPUDPSocketOperations() {
suspend fun prepareSocket(
suspend fun prepare(
address: String,
protocolFamily: ProtocolFamily,
portNumber: Int,
connectTimeout: Duration,
)

suspend fun exchangeInPlace(buffer: ByteArray, readTimeout: Duration)
suspend fun exchange(buffer: ByteArray, readTimeout: Duration)

fun closeSocket()
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import kotlin.time.DurationUnit
internal actual class NTPUDPSocketOperations {
private lateinit var datagramSocket: DatagramSocket

actual suspend fun prepareSocket(
actual suspend fun prepare(
address: String,
protocolFamily: ProtocolFamily,
portNumber: Int,
Expand All @@ -24,7 +24,7 @@ internal actual class NTPUDPSocketOperations {
datagramSocket.connect(InetAddress.getByName(address), portNumber)
}

actual suspend fun exchangeInPlace(buffer: ByteArray, readTimeout: Duration) {
actual suspend fun exchange(buffer: ByteArray, readTimeout: Duration) {
val exchangePacket = DatagramPacket(buffer, buffer.size)
withContext(Dispatchers.IO) {
datagramSocket.send(exchangePacket)
Expand Down
Binary file not shown.

0 comments on commit d0c557e

Please sign in to comment.