Skip to content

Commit

Permalink
Implement UDP exchange in Kotlin/Native
Browse files Browse the repository at this point in the history
  • Loading branch information
stoyicker committed Mar 15, 2024
1 parent 376a0cf commit ef958eb
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 207 deletions.
4 changes: 1 addition & 3 deletions .idea/artifacts/library_jvm.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions .idea/artifacts/samples_multiplatform_kotlin_shared_jvm.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import kotlinx.cinterop.ByteVar
import kotlinx.cinterop.ExperimentalForeignApi
import kotlinx.cinterop.alloc
import kotlinx.cinterop.allocArray
import kotlinx.cinterop.convert
import kotlinx.cinterop.memScoped
import kotlinx.cinterop.pointed
import kotlinx.cinterop.ptr
Expand Down Expand Up @@ -51,8 +52,8 @@ internal actual class HostNameResolver {
timeout: Duration,
includeINET: Boolean,
includeINET6: Boolean,
): Iterable<Pair<String, ProtocolFamily>> {
var ret: Iterable<Pair<String, ProtocolFamily>>? = emptySet()
): Iterable<String> {
var ret: Iterable<String>? = emptySet()
try {
ret = withTimeoutOrNull(timeout) { invokeInternal(hostName, includeINET, includeINET6) }
} finally {
Expand All @@ -70,7 +71,7 @@ internal actual class HostNameResolver {
hostName: String,
includeINET: Boolean,
includeINET6: Boolean,
): Iterable<Pair<String, ProtocolFamily>> {
): Iterable<String> {
hostReference = CFBridgingRetain(hostName as NSString)
cfHost = CFHostCreateWithName(kCFAllocatorDefault, hostReference as CFStringRef)
CFHostStartInfoResolution(cfHost, kCFHostAddresses, null)
Expand All @@ -82,21 +83,21 @@ internal actual class HostNameResolver {
addresses.takeIf { hasResolved.value }
addresses ?: return emptySet()
val count = CFArrayGetCount(addresses)
val ret = mutableSetOf<Pair<String, ProtocolFamily>>()
val ret = mutableSetOf<String>()
(0 until count).forEach {
val socketAddressData = CFArrayGetValueAtIndex(addresses, it) as CFDataRef
val sockAddr = CFDataGetBytePtr(socketAddressData)!!.reinterpret<sockaddr>().pointed
val addrPrettyToProtocolFamily = when (sockAddr.sa_family.toInt()) {
val addrPretty = when (sockAddr.sa_family.toInt()) {
AF_INET -> {
if (includeINET) {
val buffer = allocArray<ByteVar>(INET_ADDRSTRLEN)
inet_ntop(
AF_INET,
sockAddr.reinterpret<sockaddr_in>().sin_addr.readValue(),
buffer,
INET_ADDRSTRLEN.toUInt(),
INET_ADDRSTRLEN.convert(),
)
buffer.toKString() to ProtocolFamily.INET
buffer.toKString()
} else {
null
}
Expand All @@ -109,9 +110,9 @@ internal actual class HostNameResolver {
AF_INET6,
sockAddr.reinterpret<sockaddr_in6>().sin6_addr.readValue(),
buffer,
INET6_ADDRSTRLEN.toUInt(),
INET6_ADDRSTRLEN.convert(),
)
buffer.toKString() to ProtocolFamily.INET6
buffer.toKString()
} else {
null
}
Expand All @@ -121,15 +122,16 @@ internal actual class HostNameResolver {
null
}
}
if (addrPrettyToProtocolFamily != null) {
ret.add(addrPrettyToProtocolFamily)
if (addrPretty != null) {
ret.add(addrPretty)
}
}
ret
}
}

private fun clear() {
cfHost?.let { CFRelease(it) }
cfHost = null
hostReference?.let { CFRelease(it) }
hostReference = null
Expand Down
Original file line number Diff line number Diff line change
@@ -1,158 +1,106 @@
package com.tidal.networktime.internal

import com.tidal.networktime.ProtocolFamily
import kotlinx.cinterop.ByteVar
import kotlinx.cinterop.CPointer
import kotlinx.cinterop.ExperimentalForeignApi
import kotlinx.cinterop.StableRef
import kotlinx.cinterop.alloc
import kotlinx.cinterop.asStableRef
import kotlinx.cinterop.allocArray
import kotlinx.cinterop.convert
import kotlinx.cinterop.memScoped
import kotlinx.cinterop.pointed
import kotlinx.cinterop.ptr
import kotlinx.cinterop.refTo
import kotlinx.cinterop.reinterpret
import kotlinx.cinterop.sizeOf
import kotlinx.cinterop.staticCFunction
import kotlinx.cinterop.toCPointer
import kotlinx.cinterop.toLong
import kotlinx.cinterop.value
import kotlinx.coroutines.delay
import kotlinx.cinterop.set
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.withTimeout
import platform.CoreFoundation.CFDataCreate
import platform.CoreFoundation.CFDataGetBytes
import platform.CoreFoundation.CFDataGetLength
import platform.CoreFoundation.CFDataRefVar
import platform.CoreFoundation.CFRangeMake
import platform.CoreFoundation.CFSocketCallBack
import platform.CoreFoundation.CFSocketConnectToAddress
import platform.CoreFoundation.CFSocketContext
import platform.CoreFoundation.CFSocketCreate
import platform.CoreFoundation.CFSocketInvalidate
import platform.CoreFoundation.CFSocketRef
import platform.CoreFoundation.CFSocketSendData
import platform.CoreFoundation.kCFAllocatorDefault
import platform.CoreFoundation.kCFSocketDataCallBack
import platform.darwin.inet_aton
import platform.darwin.inet_pton
import platform.posix.AF_INET
import platform.posix.AF_INET6
import platform.posix.IPPROTO_UDP
import platform.posix.PF_INET
import platform.posix.PF_INET6
import platform.posix.SIGPIPE
import platform.posix.SIG_IGN
import platform.posix.SOCK_DGRAM
import platform.posix.signal
import platform.posix.sockaddr_in
import platform.posix.sockaddr_in6
import platform.Network.NW_CONNECTION_FINAL_MESSAGE_CONTEXT
import platform.Network.NW_PARAMETERS_DEFAULT_CONFIGURATION
import platform.Network.NW_PARAMETERS_DISABLE_PROTOCOL
import platform.Network.nw_connection_create
import platform.Network.nw_connection_force_cancel
import platform.Network.nw_connection_receive
import platform.Network.nw_connection_send
import platform.Network.nw_connection_set_queue
import platform.Network.nw_connection_set_state_changed_handler
import platform.Network.nw_connection_start
import platform.Network.nw_connection_state_cancelled
import platform.Network.nw_connection_state_failed
import platform.Network.nw_connection_state_ready
import platform.Network.nw_connection_state_t
import platform.Network.nw_connection_t
import platform.Network.nw_endpoint_create_host
import platform.Network.nw_error_t
import platform.Network.nw_parameters_create_secure_udp
import platform.darwin._dispatch_data_destructor_free
import platform.darwin.dispatch_data_apply
import platform.darwin.dispatch_data_create
import platform.darwin.dispatch_data_t
import platform.darwin.dispatch_get_current_queue
import platform.posix.memcpy
import kotlin.test.assertEquals
import kotlin.test.assertNull
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.DurationUnit

@OptIn(ExperimentalForeignApi::class)
@Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING")
internal actual class NTPUDPSocketOperations {
private var cfSocket: CFSocketRef? = null
private var userDataRef: StableRef<UserData>? = null
private var connection: nw_connection_t = null

actual suspend fun prepareSocket(
address: String,
protocolFamily: ProtocolFamily,
portNumber: Int,
connectTimeout: Duration,
) {
userDataRef = StableRef.create(UserData())
val callback: CFSocketCallBack = staticCFunction { _, callbackType, _, data, info ->
val userData = info!!.asStableRef<UserData>().get()
if (callbackType != kCFSocketDataCallBack) {
return@staticCFunction
}
val reinterpretedData = data!!.reinterpret<CFDataRefVar>().pointed.value
val length = CFDataGetLength(reinterpretedData)
val range = CFRangeMake(0, length)
val bridgeBuffer = UByteArray(length.toInt())
CFDataGetBytes(reinterpretedData, range, bridgeBuffer.refTo(0))
userData.apply {
buffer = bridgeBuffer.toByteArray()
exchangeCompleted = true
actual suspend fun prepare(address: String, portNumber: Int, connectTimeout: Duration) {
val parameters = nw_parameters_create_secure_udp(
NW_PARAMETERS_DISABLE_PROTOCOL,
NW_PARAMETERS_DEFAULT_CONFIGURATION
)
val endpoint = nw_endpoint_create_host(address, portNumber.toString())
connection = nw_connection_create(endpoint, parameters)
nw_connection_set_queue(connection, dispatch_get_current_queue())
val connectionStateDeferred = CompletableDeferred<nw_connection_state_t>()
nw_connection_set_state_changed_handler(connection) { state: nw_connection_state_t, _ ->
when (state) {
nw_connection_state_ready, nw_connection_state_failed, nw_connection_state_cancelled ->
connectionStateDeferred.complete(state)
}
}
signal(SIGPIPE, SIG_IGN)
cfSocket = memScoped {
val socketContext = alloc<CFSocketContext> {
version = 0
info = userDataRef!!.asCPointer()
retain = null
release = null
copyDescription = null
}
val socket = CFSocketCreate(
kCFAllocatorDefault,
when (protocolFamily) {
ProtocolFamily.INET -> PF_INET
ProtocolFamily.INET6 -> PF_INET6
},
SOCK_DGRAM,
IPPROTO_UDP,
kCFSocketDataCallBack,
callback,
socketContext.ptr,
)
val addrCFDataRef = when (protocolFamily) {
ProtocolFamily.INET -> alloc<sockaddr_in> {
sin_family = AF_INET.toUByte()
sin_port = portNumber.toUShort()
inet_aton(address, sin_addr.ptr)
}.run {
CFDataCreate(kCFAllocatorDefault, ptr.toLong().toCPointer(), sizeOf<sockaddr_in>())
}

ProtocolFamily.INET6 -> {
alloc<sockaddr_in6> {
sin6_family = AF_INET6.toUByte()
sin6_port = portNumber.toUShort()
inet_pton(AF_INET6, address, sin6_addr.ptr)
}.run {
CFDataCreate(kCFAllocatorDefault, ptr.toLong().toCPointer(), sizeOf<sockaddr_in6>())
}
}
}
CFSocketConnectToAddress(
socket,
addrCFDataRef,
connectTimeout.toDouble(DurationUnit.MILLISECONDS),
)
socket
nw_connection_start(connection)
withTimeout(connectTimeout) {
assertEquals(nw_connection_state_ready, connectionStateDeferred.await())
}
}

actual suspend fun exchangeInPlace(buffer: ByteArray, readTimeout: Duration) {
val bufferCFDataRef = CFDataCreate(
kCFAllocatorDefault,
buffer.asUByteArray().refTo(0),
buffer.size.toLong(),
)
CFSocketSendData(
cfSocket,
null,
bufferCFDataRef,
Duration.INFINITE.toDouble(DurationUnit.MILLISECONDS),
)
withTimeout(readTimeout) {
while (!userDataRef!!.get().exchangeCompleted) {
delay(1.seconds)
@OptIn(ExperimentalForeignApi::class)
actual suspend fun exchange(buffer: ByteArray, readTimeout: Duration) {
val toSendData = memScoped {
val cArray = allocArray<ByteVar>(buffer.size)
buffer.forEachIndexed { i, it ->
cArray[i] = it
}
cArray
}
nw_connection_send(
connection,
dispatch_data_create(toSendData, buffer.size.convert(), null, _dispatch_data_destructor_free),
NW_CONNECTION_FINAL_MESSAGE_CONTEXT,
true
) {
assertNull(it)
}
val connectionReceptionDeferred = CompletableDeferred<dispatch_data_t>()
nw_connection_receive(
connection,
1.convert(),
buffer.size.convert()
) { content: dispatch_data_t, _, _, error: nw_error_t ->
assertNull(error)
connectionReceptionDeferred.complete(content)
}
val receivedData = withTimeout(readTimeout) {
connectionReceptionDeferred.await()
}
dispatch_data_apply(receivedData) { _, _, regionPointer, _ ->
memcpy(buffer.refTo(0), regionPointer!!.reinterpret<ByteVar>(), buffer.size.convert())
false
}
}

actual fun closeSocket() {
CFSocketInvalidate(cfSocket)
cfSocket = null
userDataRef?.dispose()
userDataRef = null
}

private class UserData {
var exchangeCompleted = false
var buffer: ByteArray = byteArrayOf()
actual fun tearDown() {
nw_connection_force_cancel(connection)
connection = null
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package com.tidal.networktime

import com.tidal.networktime.internal.SNTPClientImpl
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import okio.Path.Companion.toPath
import kotlin.time.Duration
Expand All @@ -14,7 +11,6 @@ import kotlin.time.Duration.Companion.seconds
* [ntpServers] to obtain information about their provided time.
*
* @param ntpServers Representation of supported unicast NTP sources.
* @param coroutineScope The scope where synchronization will run on.
* @param synchronizationInterval The amount of time to wait between a sync finishing and the next
* one being started.
* @param backupFilePath A path to a file that will be used to save the selected received NTP
Expand All @@ -23,17 +19,13 @@ import kotlin.time.Duration.Companion.seconds
* packet has been received and processed. If not `null` but writing or reading fail when attempted,
* program execution will continue as if it had been `null` until the next attempt.
*/
class SNTPClient
@OptIn(DelicateCoroutinesApi::class)
constructor(
class SNTPClient(
vararg val ntpServers: NTPServer,
val coroutineScope: CoroutineScope = GlobalScope,
val synchronizationInterval: Duration = 64.seconds,
val backupFilePath: String? = null,
) {
private val delegate = SNTPClientImpl(
ntpServers,
coroutineScope,
backupFilePath?.toPath(),
synchronizationInterval,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.tidal.networktime.internal

import com.tidal.networktime.ProtocolFamily
import kotlin.time.Duration

@Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING")
Expand All @@ -10,5 +9,5 @@ internal expect class HostNameResolver() {
timeout: Duration,
includeINET: Boolean,
includeINET6: Boolean,
): Iterable<Pair<String, ProtocolFamily>>
): Iterable<String>
}
Loading

0 comments on commit ef958eb

Please sign in to comment.