Skip to content

Commit

Permalink
STITCH-2989 Support additional watch() options in Swift SDK (#189)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamchel authored Jul 5, 2019
1 parent 32de5fa commit c369bf8
Show file tree
Hide file tree
Showing 7 changed files with 371 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .evg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ functions:
cd stitch-ios-sdk
export GEM_HOME=`pwd`
export GEM_PATH=`pwd`
/opt/chef/embedded/bin/gem install cocoapods -v 1.6.0.beta.1 --install-dir ./
/opt/chef/embedded/bin/gem install cocoapods -v 1.7.3 --install-dir ./
bin/pod install --repo-update || (rm -rf ~/.cocoapods/repos && bin/pod install --repo-update)
tasks:
- name: lint
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// swiftlint:disable file_length
// swiftlint:disable type_body_length

import Foundation
import MongoSwift
Expand Down Expand Up @@ -398,14 +399,58 @@ public class CoreRemoteMongoCollection<T: Codable>: Closable {
withRequestTimeout: nil)
}

/**
* Opens a MongoDB change stream against the collection to watch for changes. The resulting stream will be notified
* of all events on this collection that the active user is authorized to see based on the configured MongoDB
* rules.
*
* - Parameters:
* - delegate: The delegate that will react to events and errors from the resulting change stream.
*
* - Returns: A reference to the change stream opened by this method.
*/
public func watch(delegate: SSEStreamDelegate) throws -> RawSSEStream {
var args = baseOperationArgs

args["useCompactEvents"] = false

let stream = try service.streamFunction(withName: "watch", withArgs: [args], delegate: delegate)
self.streams.append(WeakReference(stream))
return stream
}

/**
* Opens a MongoDB change stream against the collection to watch for changes. The provided BSON document will be
* used as a match expression filter on the change events coming from the stream.
*
* See https://docs.mongodb.com/manual/reference/operator/aggregation/match/ for documentation around how to define
* a match filter.
*
* Defining the match expression to filter ChangeEvents is similar to defining the match expression for triggers:
* https://docs.mongodb.com/stitch/triggers/database-triggers/
*
* - Parameters:
* - matchFilter: The $match filter to apply to incoming change events
* - delegate: The delegate that will react to events and errors from the resulting change stream.
*
* - Returns: A reference to the change stream opened by this method.
*/
public func watch(matchFilter: Document, delegate: SSEStreamDelegate) throws -> RawSSEStream {
var args = baseOperationArgs

args["filter"] = matchFilter
args["useCompactEvents"] = false

let stream = try service.streamFunction(withName: "watch", withArgs: [args], delegate: delegate)
self.streams.append(WeakReference(stream))
return stream
}

/**
* Opens a MongoDB change stream against the collection to watch for changes
* made to specific documents. The documents to watch must be explicitly
* specified by their _id.
*
* - This method does not support opening change streams on an entire collection
* or a specific query.
*
* - Parameters:
* - ids: The list of _ids in the collection to watch.
* - delegate: The delegate that will react to events and errors from the resulting change stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,90 @@ final class CoreRemoteMongoCollectionUnitTests: XCMongoMobileTestCase {

class UnitTestWatchDelegate: SSEStreamDelegate { }

func testWatch() throws {
func testWatchFullCollection() throws {
let coll = try remoteCollection()

mockServiceClient.streamFunctionMock.doReturn(
result: RawSSEStream.init(), forArg1: .any, forArg2: .any, forArg3: .any
)

_ = try coll.watch(delegate: UnitTestWatchDelegate.init())

var (funcNameArg, funcArgsArg, _) = mockServiceClient.streamFunctionMock.capturedInvocations.last!

XCTAssertEqual("watch", funcNameArg)
XCTAssertEqual(1, funcArgsArg.count)

let expectedArgs: Document = [
"database": namespace.databaseName,
"collection": namespace.collectionName,
"useCompactEvents": false
]

XCTAssertEqual(expectedArgs, funcArgsArg[0] as? Document)

// should pass along errors
mockServiceClient.streamFunctionMock.doThrow(
error: StitchError.serviceError(withMessage: "whoops", withServiceErrorCode: .unknown),
forArg1: .any,
forArg2: .any,
forArg3: .any
)

do {
_ = try coll.watch(delegate: UnitTestWatchDelegate.init())
XCTFail("function did not fail where expected")
} catch {
// do nothing
}
}

func testWatchWithFilter() throws {
let coll = try remoteCollection()

mockServiceClient.streamFunctionMock.doReturn(
result: RawSSEStream.init(), forArg1: .any, forArg2: .any, forArg3: .any
)

_ = try coll.watch(
matchFilter: ["fullDocument.someField": "someValue"] as Document,
delegate: UnitTestWatchDelegate.init()
)

var (funcNameArg, funcArgsArg, _) = mockServiceClient.streamFunctionMock.capturedInvocations.last!

XCTAssertEqual("watch", funcNameArg)
XCTAssertEqual(1, funcArgsArg.count)

let expectedArgs: Document = [
"database": namespace.databaseName,
"collection": namespace.collectionName,
"filter": ["fullDocument.someField": "someValue"] as Document,
"useCompactEvents": false
]

XCTAssertEqual(expectedArgs, funcArgsArg[0] as? Document)

// should pass along errors
mockServiceClient.streamFunctionMock.doThrow(
error: StitchError.serviceError(withMessage: "whoops", withServiceErrorCode: .unknown),
forArg1: .any,
forArg2: .any,
forArg3: .any
)

do {
_ = try coll.watch(
matchFilter: ["fullDocument.someField": "someValue"] as Document,
delegate: UnitTestWatchDelegate.init()
)
XCTFail("function did not fail where expected")
} catch {
// do nothing
}
}

func testWatchIds() throws {
let coll = try remoteCollection()

mockServiceClient.streamFunctionMock.doReturn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,16 +384,83 @@ public class RemoteMongoCollection<T: Codable> {
}

/**
* Opens a MongoDB change stream against the collection to watch for changes
* made to specific documents. The documents to watch must be explicitly
* specified by their _id.
* Opens a MongoDB change stream against the collection to watch for changes. The resulting stream will be notified
* of all events on this collection that the active user is authorized to see based on the configured MongoDB
* rules.
*
* - This method has a generic type parameter of DelegateT, which is the type of the delegate that will react to
* events on the stream. This can be any type as long as it conforms to the `ChangeStreamDelegate` protocol, and
* the `DocumentT` type parameter on the delegate matches the `T` parameter of this collection.
*
* - When this method returns the `ChangeStreamSession`, the change stream may not yet be open. The stream is
* not open until the `didOpen` method is called on the provided delegate. This means that events that happen
* after this method returns will not necessarily be received by this stream until the `didOpen` method is
* called.
*
* - Parameters:
* - delegate: The delegate that will react to events and errors from the resulting change stream.
*
* - Returns: A reference to the change stream opened by this method.
*/
public func watch<DelegateT: ChangeStreamDelegate>(
delegate: DelegateT
) throws -> ChangeStreamSession<T> where DelegateT.DocumentT == T {
let session = ChangeStreamSession.init(changeEventType: .fullDocument(withDelegate: delegate))

let rawStream = try self.proxy.watch(delegate: session.internalDelegate)
session.rawStream = rawStream

return session
}

/**
* Opens a MongoDB change stream against the collection to watch for changes. The provided BSON document will be
* used as a match expression filter on the change events coming from the stream.
*
* - See https://docs.mongodb.com/manual/reference/operator/aggregation/match/ for documentation around how to
* define a match filter.
*
* - Defining the match expression to filter ChangeEvents is similar to defining the match expression for triggers:
* https://docs.mongodb.com/stitch/triggers/database-triggers/
*
* - This method has a generic type parameter of DelegateT, which is the type of the delegate that will react to
* events on the stream. This can be any type as long as it conforms to the `ChangeStreamDelegate` protocol, and
* the `DocumentT` type parameter on the delegate matches the `T` parameter of this collection.
*
* - This method does not support opening change streams on an entire collection
* or a specific query.
* - When this method returns the `ChangeStreamSession`, the change stream may not yet be open. The stream is
* not open until the `didOpen` method is called on the provided delegate. This means that events that happen
* after this method returns will not necessarily be received by this stream until the `didOpen` method is
* called.
*
* - Parameters:
* - matchFilter: The $match filter to apply to incoming change events
* - delegate: The delegate that will react to events and errors from the resulting change stream.
*
* - Returns: A reference to the change stream opened by this method.
*/
public func watch<DelegateT: ChangeStreamDelegate >(
matchFilter: Document,
delegate: DelegateT
) throws -> ChangeStreamSession<T> where DelegateT.DocumentT == T {
let session = ChangeStreamSession.init(changeEventType: .fullDocument(withDelegate: delegate))

let rawStream = try self.proxy.watch(matchFilter: matchFilter,
delegate: session.internalDelegate)
session.rawStream = rawStream

return session
}

/**
* Opens a MongoDB change stream against the collection to watch for changes
* made to specific documents. The documents to watch must be explicitly
* specified by their _id.
*
* - This method's forStreamType can be initialized with generic type parameters of FullDelegateT or
* CompactDelegateT, which are the type of the delegate that will react to events on the stream. These can be
* any type as long as they conform to either the `ChangeStreamDelegate` protocol or
* `CompactChangeStreamDelegate` protocol respectively, and the `DocumentT` type parameter on the delegate
* matches the `T` parameter of this collection.
*
* - When this method returns the `ChangeStreamSession`, the change stream may not yet be open. The stream is
* not open until the `didOpen` method is called on the provided delegate. This means that events that happen
Expand Down
Loading

0 comments on commit c369bf8

Please sign in to comment.