Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: WebSocket data race crash #578

Merged
merged 5 commits into from
Jan 21, 2025
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@

import Foundation
import CommonCrypto
import Apollo
BobaFetters marked this conversation as resolved.
Show resolved Hide resolved

//Standard WebSocket close codes
enum CloseCode : UInt16 {
@@ -207,6 +208,7 @@ public final class WebSocket: NSObject, WebSocketClient, StreamDelegate, WebSock
private var connected = false
private var isConnecting = false
private let mutex = NSLock()
private let serialQueue = DispatchQueue(label: "com.apollographql.WebSocket.serial", qos: .background)
private var compressionState = CompressionState()
private var writeQueue = OperationQueue()
private var readStack = [WSResponse]()
@@ -278,10 +280,12 @@ public final class WebSocket: NSObject, WebSocketClient, StreamDelegate, WebSock
Connect to the WebSocket server on a background thread.
*/
public func connect() {
guard !isConnecting else { return }
didDisconnect = false
isConnecting = true
createHTTPRequest()
serialQueue.sync {
guard !self.isConnecting else { return }
self.didDisconnect = false
self.isConnecting = true
self.createHTTPRequest()
}
}

/**
@@ -1106,19 +1110,21 @@ public final class WebSocket: NSObject, WebSocketClient, StreamDelegate, WebSock
Used to preform the disconnect delegate
*/
private func doDisconnect(_ error: (any Error)?) {
guard !didDisconnect else { return }
didDisconnect = true
isConnecting = false
mutex.lock()
connected = false
mutex.unlock()
guard canDispatch else {return}
callbackQueue.async { [weak self] in
guard let self = self else { return }
self.onDisconnect?(error)
self.delegate?.websocketDidDisconnect(socket: self, error: error)
let userInfo = error.map{ [Constants.WebsocketDisconnectionErrorKeyName: $0] }
NotificationCenter.default.post(name: NSNotification.Name(Constants.Notifications.WebsocketDidDisconnect), object: self, userInfo: userInfo)
serialQueue.sync {
guard !self.didDisconnect else { return }
self.didDisconnect = true
self.isConnecting = false
self.mutex.lock()
self.connected = false
self.mutex.unlock()
guard self.canDispatch else {return}
self.callbackQueue.async { [weak self] in
guard let self = self else { return }
self.onDisconnect?(error)
self.delegate?.websocketDidDisconnect(socket: self, error: error)
let userInfo = error.map{ [Constants.WebsocketDisconnectionErrorKeyName: $0] }
NotificationCenter.default.post(name: NSNotification.Name(Constants.Notifications.WebsocketDidDisconnect), object: self, userInfo: userInfo)
}
}
}

Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
// Derived Work License: https://github.com/apollographql/apollo-ios/blob/main/LICENSE

import Foundation
import Apollo

protocol WebSocketStreamDelegate: AnyObject {
func newBytesInStream()
@@ -44,8 +45,8 @@ protocol WebSocketStream {

class FoundationStream : NSObject, WebSocketStream, StreamDelegate, SOCKSProxyable {
private let workQueue = DispatchQueue(label: "com.apollographql.websocket", attributes: [])
private var inputStream: InputStream?
private var outputStream: OutputStream?
@Atomic private var inputStream: InputStream?
@Atomic private var outputStream: OutputStream?
weak var delegate: (any WebSocketStreamDelegate)?
let BUFFER_MAX = 4096

@@ -56,8 +57,8 @@ class FoundationStream : NSObject, WebSocketStream, StreamDelegate, SOCKSProxyab
var writeStream: Unmanaged<CFWriteStream>?
let h = url.host! as NSString
CFStreamCreatePairWithSocketToHost(nil, h, UInt32(port), &readStream, &writeStream)
inputStream = readStream!.takeRetainedValue()
outputStream = writeStream!.takeRetainedValue()
$inputStream.mutate { $0 = readStream!.takeRetainedValue() }
$outputStream.mutate { $0 = writeStream!.takeRetainedValue() }

#if os(watchOS) //watchOS us unfortunately is missing the kCFStream properties to make this work
#else
@@ -186,8 +187,8 @@ class FoundationStream : NSObject, WebSocketStream, StreamDelegate, SOCKSProxyab
CFWriteStreamSetDispatchQueue(stream, nil)
stream.close()
}
outputStream = nil
inputStream = nil
$outputStream.mutate { $0 = nil }
$inputStream.mutate { $0 = nil }
}

#if os(Linux) || os(watchOS)
Loading