Skip to content

Commit

Permalink
fix: WebSocket data race crash (#578)
Browse files Browse the repository at this point in the history
  • Loading branch information
BobaFetters authored Jan 21, 2025
1 parent 0d46066 commit 055fbb9
Showing 2 changed files with 126 additions and 116 deletions.
Original file line number Diff line number Diff line change
@@ -207,6 +207,7 @@ public final class WebSocket: NSObject, WebSocketClient, StreamDelegate, WebSock
private var connected = false
private var isConnecting = false
private let mutex = NSLock()
private let serialQueue = DispatchQueue(label: "com.apollographql.WebSocket.serial", qos: .background)
private var compressionState = CompressionState()
private var writeQueue = OperationQueue()
private var readStack = [WSResponse]()
@@ -278,10 +279,12 @@ public final class WebSocket: NSObject, WebSocketClient, StreamDelegate, WebSock
Connect to the WebSocket server on a background thread.
*/
public func connect() {
guard !isConnecting else { return }
didDisconnect = false
isConnecting = true
createHTTPRequest()
serialQueue.sync {
guard !self.isConnecting else { return }
self.didDisconnect = false
self.isConnecting = true
self.createHTTPRequest()
}
}

/**
@@ -1106,19 +1109,21 @@ public final class WebSocket: NSObject, WebSocketClient, StreamDelegate, WebSock
Used to preform the disconnect delegate
*/
private func doDisconnect(_ error: (any Error)?) {
guard !didDisconnect else { return }
didDisconnect = true
isConnecting = false
mutex.lock()
connected = false
mutex.unlock()
guard canDispatch else {return}
callbackQueue.async { [weak self] in
guard let self = self else { return }
self.onDisconnect?(error)
self.delegate?.websocketDidDisconnect(socket: self, error: error)
let userInfo = error.map{ [Constants.WebsocketDisconnectionErrorKeyName: $0] }
NotificationCenter.default.post(name: NSNotification.Name(Constants.Notifications.WebsocketDidDisconnect), object: self, userInfo: userInfo)
serialQueue.sync {
guard !self.didDisconnect else { return }
self.didDisconnect = true
self.isConnecting = false
self.mutex.lock()
self.connected = false
self.mutex.unlock()
guard self.canDispatch else {return}
self.callbackQueue.async { [weak self] in
guard let self = self else { return }
self.onDisconnect?(error)
self.delegate?.websocketDidDisconnect(socket: self, error: error)
let userInfo = error.map{ [Constants.WebsocketDisconnectionErrorKeyName: $0] }
NotificationCenter.default.post(name: NSNotification.Name(Constants.Notifications.WebsocketDidDisconnect), object: self, userInfo: userInfo)
}
}
}

Original file line number Diff line number Diff line change
@@ -46,115 +46,118 @@ class FoundationStream : NSObject, WebSocketStream, StreamDelegate, SOCKSProxyab
private let workQueue = DispatchQueue(label: "com.apollographql.websocket", attributes: [])
private var inputStream: InputStream?
private var outputStream: OutputStream?
private let serialQueue = DispatchQueue(label: "com.apollographql.WebSocketStream.serial", qos: .background)
weak var delegate: (any WebSocketStreamDelegate)?
let BUFFER_MAX = 4096

var enableSOCKSProxy = false

func connect(url: URL, port: Int, timeout: TimeInterval, ssl: SSLSettings, completion: @escaping (((any Error)?) -> Void)) {
var readStream: Unmanaged<CFReadStream>?
var writeStream: Unmanaged<CFWriteStream>?
let h = url.host! as NSString
CFStreamCreatePairWithSocketToHost(nil, h, UInt32(port), &readStream, &writeStream)
inputStream = readStream!.takeRetainedValue()
outputStream = writeStream!.takeRetainedValue()

#if os(watchOS) //watchOS us unfortunately is missing the kCFStream properties to make this work
#else
if enableSOCKSProxy {
let proxyDict = CFNetworkCopySystemProxySettings()
let socksConfig = CFDictionaryCreateMutableCopy(nil, 0, proxyDict!.takeRetainedValue())
let propertyKey = CFStreamPropertyKey(rawValue: kCFStreamPropertySOCKSProxy)
CFWriteStreamSetProperty(outputStream, propertyKey, socksConfig)
CFReadStreamSetProperty(inputStream, propertyKey, socksConfig)
}
#endif

guard let inStream = inputStream, let outStream = outputStream else { return }
inStream.delegate = self
outStream.delegate = self
if ssl.useSSL {
inStream.setProperty(StreamSocketSecurityLevel.negotiatedSSL as AnyObject, forKey: Stream.PropertyKey.socketSecurityLevelKey)
outStream.setProperty(StreamSocketSecurityLevel.negotiatedSSL as AnyObject, forKey: Stream.PropertyKey.socketSecurityLevelKey)
serialQueue.sync {
var readStream: Unmanaged<CFReadStream>?
var writeStream: Unmanaged<CFWriteStream>?
let h = url.host! as NSString
CFStreamCreatePairWithSocketToHost(nil, h, UInt32(port), &readStream, &writeStream)
inputStream = readStream!.takeRetainedValue()
outputStream = writeStream!.takeRetainedValue()

#if os(watchOS) //watchOS us unfortunately is missing the kCFStream properties to make this work
#else
var settings = [NSObject: NSObject]()
if ssl.disableCertValidation {
settings[kCFStreamSSLValidatesCertificateChain] = NSNumber(value: false)
}
if ssl.overrideTrustHostname {
if let hostname = ssl.desiredTrustHostname {
settings[kCFStreamSSLPeerName] = hostname as NSString
} else {
settings[kCFStreamSSLPeerName] = kCFNull
}
}
if let sslClientCertificate = ssl.sslClientCertificate {
settings[kCFStreamSSLCertificates] = sslClientCertificate.streamSSLCertificates
if enableSOCKSProxy {
let proxyDict = CFNetworkCopySystemProxySettings()
let socksConfig = CFDictionaryCreateMutableCopy(nil, 0, proxyDict!.takeRetainedValue())
let propertyKey = CFStreamPropertyKey(rawValue: kCFStreamPropertySOCKSProxy)
CFWriteStreamSetProperty(outputStream, propertyKey, socksConfig)
CFReadStreamSetProperty(inputStream, propertyKey, socksConfig)
}

inStream.setProperty(settings, forKey: kCFStreamPropertySSLSettings as Stream.PropertyKey)
outStream.setProperty(settings, forKey: kCFStreamPropertySSLSettings as Stream.PropertyKey)
#endif

#if os(Linux)
#else
if let cipherSuites = ssl.cipherSuites {
guard let inStream = inputStream, let outStream = outputStream else { return }
inStream.delegate = self
outStream.delegate = self
if ssl.useSSL {
inStream.setProperty(StreamSocketSecurityLevel.negotiatedSSL as AnyObject, forKey: Stream.PropertyKey.socketSecurityLevelKey)
outStream.setProperty(StreamSocketSecurityLevel.negotiatedSSL as AnyObject, forKey: Stream.PropertyKey.socketSecurityLevelKey)
#if os(watchOS) //watchOS us unfortunately is missing the kCFStream properties to make this work
#else
if let sslContextIn = CFReadStreamCopyProperty(inputStream, CFStreamPropertyKey(rawValue: kCFStreamPropertySSLContext)) as! SSLContext?,
let sslContextOut = CFWriteStreamCopyProperty(outputStream, CFStreamPropertyKey(rawValue: kCFStreamPropertySSLContext)) as! SSLContext? {
let resIn = SSLSetEnabledCiphers(sslContextIn, cipherSuites, cipherSuites.count)
let resOut = SSLSetEnabledCiphers(sslContextOut, cipherSuites, cipherSuites.count)
if resIn != errSecSuccess {
completion(WebSocket.WSError(
type: .invalidSSLError,
message: "Error setting ingoing cypher suites",
code: Int(resIn)))
var settings = [NSObject: NSObject]()
if ssl.disableCertValidation {
settings[kCFStreamSSLValidatesCertificateChain] = NSNumber(value: false)
}
if ssl.overrideTrustHostname {
if let hostname = ssl.desiredTrustHostname {
settings[kCFStreamSSLPeerName] = hostname as NSString
} else {
settings[kCFStreamSSLPeerName] = kCFNull
}
if resOut != errSecSuccess {
completion(WebSocket.WSError(
type: .invalidSSLError,
message: "Error setting outgoing cypher suites",
code: Int(resOut)))
}
if let sslClientCertificate = ssl.sslClientCertificate {
settings[kCFStreamSSLCertificates] = sslClientCertificate.streamSSLCertificates
}

inStream.setProperty(settings, forKey: kCFStreamPropertySSLSettings as Stream.PropertyKey)
outStream.setProperty(settings, forKey: kCFStreamPropertySSLSettings as Stream.PropertyKey)
#endif

#if os(Linux)
#else
if let cipherSuites = ssl.cipherSuites {
#if os(watchOS) //watchOS us unfortunately is missing the kCFStream properties to make this work
#else
if let sslContextIn = CFReadStreamCopyProperty(inputStream, CFStreamPropertyKey(rawValue: kCFStreamPropertySSLContext)) as! SSLContext?,
let sslContextOut = CFWriteStreamCopyProperty(outputStream, CFStreamPropertyKey(rawValue: kCFStreamPropertySSLContext)) as! SSLContext? {
let resIn = SSLSetEnabledCiphers(sslContextIn, cipherSuites, cipherSuites.count)
let resOut = SSLSetEnabledCiphers(sslContextOut, cipherSuites, cipherSuites.count)
if resIn != errSecSuccess {
completion(WebSocket.WSError(
type: .invalidSSLError,
message: "Error setting ingoing cypher suites",
code: Int(resIn)))
}
if resOut != errSecSuccess {
completion(WebSocket.WSError(
type: .invalidSSLError,
message: "Error setting outgoing cypher suites",
code: Int(resOut)))
}
}
#endif
}
#endif
}
#endif
}

CFReadStreamSetDispatchQueue(inStream, workQueue)
CFWriteStreamSetDispatchQueue(outStream, workQueue)
inStream.open()
outStream.open()

var out = timeout// wait X seconds before giving up
workQueue.async { [weak self] in
while !outStream.hasSpaceAvailable {
usleep(100) // wait until the socket is ready
out -= 100
if out < 0 {
completion(
WebSocket.WSError(
type: .writeTimeoutError,
message: "Timed out waiting for the socket to be ready for a write",
code: 0))
return

} else if let error = outStream.streamError {
completion(error)
return // disconnectStream will be called.

} else if self == nil {
completion(WebSocket.WSError(
type: .closeError,
message: "socket object has been dereferenced",
code: 0))
return
CFReadStreamSetDispatchQueue(inStream, workQueue)
CFWriteStreamSetDispatchQueue(outStream, workQueue)
inStream.open()
outStream.open()

var out = timeout// wait X seconds before giving up
workQueue.async { [weak self] in
while !outStream.hasSpaceAvailable {
usleep(100) // wait until the socket is ready
out -= 100
if out < 0 {
completion(
WebSocket.WSError(
type: .writeTimeoutError,
message: "Timed out waiting for the socket to be ready for a write",
code: 0))
return

} else if let error = outStream.streamError {
completion(error)
return // disconnectStream will be called.

} else if self == nil {
completion(WebSocket.WSError(
type: .closeError,
message: "socket object has been dereferenced",
code: 0))
return
}
}
completion(nil) //success!
}
completion(nil) //success!
}
}

@@ -176,18 +179,20 @@ class FoundationStream : NSObject, WebSocketStream, StreamDelegate, SOCKSProxyab
}

func cleanup() {
if let stream = inputStream {
stream.delegate = nil
CFReadStreamSetDispatchQueue(stream, nil)
stream.close()
}
if let stream = outputStream {
stream.delegate = nil
CFWriteStreamSetDispatchQueue(stream, nil)
stream.close()
serialQueue.sync {
if let stream = inputStream {
stream.delegate = nil
CFReadStreamSetDispatchQueue(stream, nil)
stream.close()
}
if let stream = outputStream {
stream.delegate = nil
CFWriteStreamSetDispatchQueue(stream, nil)
stream.close()
}
outputStream = nil
inputStream = nil
}
outputStream = nil
inputStream = nil
}

#if os(Linux) || os(watchOS)

0 comments on commit 055fbb9

Please sign in to comment.