diff --git a/Amplify/Categories/DataStore/Model/Internal/ModelProviderDecoder.swift b/Amplify/Categories/DataStore/Model/Internal/ModelProviderDecoder.swift index cf8ebab4e0..8470cba587 100644 --- a/Amplify/Categories/DataStore/Model/Internal/ModelProviderDecoder.swift +++ b/Amplify/Categories/DataStore/Model/Internal/ModelProviderDecoder.swift @@ -29,6 +29,17 @@ extension ModelProviderRegistry { } } +/// Extension to hold the decoder sources +public extension ModelProviderRegistry { + + /// Static decoder sources that will be referenced to initialize different type of decoders having source as + /// a metadata. + struct DecoderSource { + public static let dataStore = "DataStore" + public static let appSync = "AppSync" + } +} + /// `ModelProviderDecoder` provides decoding and lazy reference functionality. /// /// - Warning: Although this has `public` access, it is intended for internal & codegen use and should not be used diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Core/AppSyncModelDecoder.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Core/AppSyncModelDecoder.swift index 9fc1da75b7..b70728b052 100644 --- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Core/AppSyncModelDecoder.swift +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Core/AppSyncModelDecoder.swift @@ -13,15 +13,13 @@ import Amplify /// can be decoded to the Model, then the model provider is created as a "loaded" reference. public struct AppSyncModelDecoder: ModelProviderDecoder { - public static let AppSyncSource = "AppSync" - /// Metadata that contains metadata of a model, specifically the identifiers used to hydrate the model. struct Metadata: Codable { let identifiers: [LazyReferenceIdentifier] let apiName: String? let source: String - init(identifiers: [LazyReferenceIdentifier], apiName: String?, source: String = AppSyncSource) { + init(identifiers: [LazyReferenceIdentifier], apiName: String?, source: String = ModelProviderRegistry.DecoderSource.appSync) { self.identifiers = identifiers self.apiName = apiName self.source = source @@ -30,7 +28,7 @@ public struct AppSyncModelDecoder: ModelProviderDecoder { public static func decode(modelType: ModelType.Type, decoder: Decoder) -> AnyModelProvider? { if let metadata = try? Metadata(from: decoder) { - if metadata.source == AppSyncSource { + if metadata.source == ModelProviderRegistry.DecoderSource.appSync { log.verbose("Creating not loaded model \(modelType.modelName) with metadata \(metadata)") return AppSyncModelProvider(metadata: metadata).eraseToAnyModelProvider() } else { diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Core/AppSyncModelMetadata.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Core/AppSyncModelMetadata.swift index 31cb93303a..d03f8623ac 100644 --- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Core/AppSyncModelMetadata.swift +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Core/AppSyncModelMetadata.swift @@ -31,8 +31,11 @@ public struct AppSyncModelMetadataUtils { } } - static func addMetadata(toModel graphQLData: JSONValue, - apiName: String?) -> JSONValue { + static func addMetadata( + toModel graphQLData: JSONValue, + apiName: String?, + source: String = ModelProviderRegistry.DecoderSource.appSync) -> JSONValue { + guard case var .object(modelJSON) = graphQLData else { Amplify.API.log.debug("Not an model object: \(graphQLData)") return graphQLData @@ -85,7 +88,8 @@ public struct AppSyncModelMetadataUtils { // Scenario: Belongs-To Primary Keys only are available for lazy loading if let modelIdentifierMetadata = createModelIdentifierMetadata(associatedModelType, modelObject: modelObject, - apiName: apiName) { + apiName: apiName, + source: source) { if let serializedMetadata = try? encoder.encode(modelIdentifierMetadata), let metadataJSON = try? decoder.decode(JSONValue.self, from: serializedMetadata) { Amplify.API.log.verbose("Adding [\(modelField.name): \(metadataJSON)]") @@ -166,7 +170,8 @@ public struct AppSyncModelMetadataUtils { /// are more keys in the `modelObject` which means it was eager loaded. static func createModelIdentifierMetadata(_ associatedModel: Model.Type, modelObject: [String: JSONValue], - apiName: String?) -> AppSyncModelDecoder.Metadata? { + apiName: String?, + source: String) -> AppSyncModelDecoder.Metadata? { let primarykeys = associatedModel.schema.primaryKey var identifiers = [LazyReferenceIdentifier]() for identifierField in primarykeys.fields { @@ -180,7 +185,10 @@ public struct AppSyncModelMetadataUtils { modelObject["_deleted"] = nil modelObject["_version"] = nil if !identifiers.isEmpty && (identifiers.count) == modelObject.keys.count { - return AppSyncModelDecoder.Metadata(identifiers: identifiers, apiName: apiName) + return AppSyncModelDecoder.Metadata( + identifiers: identifiers, + apiName: apiName, + source: source) } else { return nil } diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Core/AppSyncModelProvider.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Core/AppSyncModelProvider.swift index ed717eec64..680c5afd14 100644 --- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Core/AppSyncModelProvider.swift +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Core/AppSyncModelProvider.swift @@ -12,19 +12,21 @@ import AWSPluginsCore public class AppSyncModelProvider: ModelProvider { let apiName: String? - + let source: String var loadedState: ModelProviderState // Creates a "not loaded" provider init(metadata: AppSyncModelDecoder.Metadata) { self.loadedState = .notLoaded(identifiers: metadata.identifiers) self.apiName = metadata.apiName + self.source = metadata.source } // Creates a "loaded" provider init(model: ModelType?) { self.loadedState = .loaded(model: model) self.apiName = nil + self.source = ModelProviderRegistry.DecoderSource.appSync } // MARK: - APIs @@ -62,8 +64,12 @@ public class AppSyncModelProvider: ModelProvider { public func encode(to encoder: Encoder) throws { switch loadedState { case .notLoaded(let identifiers): - var container = encoder.singleValueContainer() - try container.encode(identifiers) + let metadata = AppSyncModelDecoder.Metadata( + identifiers: identifiers ?? [], + apiName: apiName, + source: source) + try metadata.encode(to: encoder) + case .loaded(let element): try element.encode(to: encoder) } diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLOperation.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLOperation.swift index a6115ff171..d57c2ba1c4 100644 --- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLOperation.swift +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLOperation.swift @@ -153,7 +153,7 @@ final public class AWSGraphQLOperation: GraphQLOperation { private func getEndpointInterceptors(from request: GraphQLOperationRequest) -> Result { getEndpointConfig(from: request).flatMap { endpointConfig in do { - if let pluginOptions = request.options.pluginOptions as? AWSPluginOptions, + if let pluginOptions = request.options.pluginOptions as? AWSAPIPluginDataStoreOptions, let authType = pluginOptions.authType { return .success(try pluginConfig.interceptorsForEndpoint( withConfig: endpointConfig, diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift index b8e42644cc..bd457fcc3c 100644 --- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift @@ -81,7 +81,7 @@ public class AWSGraphQLSubscriptionTaskRunner: InternalTaskRunner, // Retrieve request plugin option and // auth type in case of a multi-auth setup - let pluginOptions = request.options.pluginOptions as? AWSPluginOptions + let pluginOptions = request.options.pluginOptions as? AWSAPIPluginDataStoreOptions let urlRequest = generateSubscriptionURLRequest(from: endpointConfig) // Retrieve the subscription connection @@ -280,7 +280,7 @@ final public class AWSGraphQLSubscriptionOperation: GraphQLSubscri // Retrieve request plugin option and // auth type in case of a multi-auth setup - let pluginOptions = request.options.pluginOptions as? AWSPluginOptions + let pluginOptions = request.options.pluginOptions as? AWSAPIPluginDataStoreOptions let urlRequest = generateSubscriptionURLRequest(from: endpointConfig) // Retrieve the subscription connection diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Support/Decode/GraphQLResponseDecoder+DecodeData.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Support/Decode/GraphQLResponseDecoder+DecodeData.swift index bfaf333a7f..ea37cc8343 100644 --- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Support/Decode/GraphQLResponseDecoder+DecodeData.swift +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Support/Decode/GraphQLResponseDecoder+DecodeData.swift @@ -47,8 +47,17 @@ extension GraphQLResponseDecoder { case .object(var graphQLDataObject) = graphQLData, case .array(var graphQLDataArray) = graphQLDataObject["items"] { for (index, item) in graphQLDataArray.enumerated() { - let modelJSON = AppSyncModelMetadataUtils.addMetadata(toModel: item, - apiName: request.apiName) + let modelJSON: JSONValue + if let _ = (request.options.pluginOptions as? AWSAPIPluginDataStoreOptions) { + modelJSON = AppSyncModelMetadataUtils.addMetadata( + toModel: item, + apiName: request.apiName, + source: ModelProviderRegistry.DecoderSource.dataStore) + } else { + modelJSON = AppSyncModelMetadataUtils.addMetadata( + toModel: item, + apiName: request.apiName) + } graphQLDataArray[index] = modelJSON } graphQLDataObject["items"] = JSONValue.array(graphQLDataArray) @@ -107,7 +116,7 @@ extension GraphQLResponseDecoder { // latest version of the developer's app will continue to work because the the mutation request sent from the // latest library continues to have the typename field. private func shouldAddTypename(to graphQLData: JSONValue) -> JSONValue? { - if let modelName = modelName, + if let modelName = dataStorePluginOptions?.modelName, request.responseType == MutationSync.self, case var .object(modelJSON) = graphQLData, // No need to replace existing response payloads that have it already diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Support/Decode/GraphQLResponseDecoder.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Support/Decode/GraphQLResponseDecoder.swift index 8b0afc0af6..a80a16e4b3 100644 --- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Support/Decode/GraphQLResponseDecoder.swift +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Support/Decode/GraphQLResponseDecoder.swift @@ -15,18 +15,17 @@ class GraphQLResponseDecoder { var response: Data let decoder = JSONDecoder() let encoder = JSONEncoder() - let modelName: String? + let dataStorePluginOptions: AWSAPIPluginDataStoreOptions? public init(request: GraphQLOperationRequest, response: Data = Data()) { self.request = request self.response = response decoder.dateDecodingStrategy = ModelDateFormatting.decodingStrategy encoder.dateEncodingStrategy = ModelDateFormatting.encodingStrategy - if let pluginOptions = request.options.pluginOptions as? AWSPluginOptions, - let modelName = pluginOptions.modelName { - self.modelName = modelName + if let pluginOptions = request.options.pluginOptions as? AWSAPIPluginDataStoreOptions { + self.dataStorePluginOptions = pluginOptions } else { - self.modelName = nil + self.dataStorePluginOptions = nil } } diff --git a/AmplifyPlugins/API/Tests/AWSAPIPluginTests/Support/Decode/GraphQLResponseDecoderLazyPostComment4V2Tests.swift b/AmplifyPlugins/API/Tests/AWSAPIPluginTests/Support/Decode/GraphQLResponseDecoderLazyPostComment4V2Tests.swift index 38a58e716f..fe72e229af 100644 --- a/AmplifyPlugins/API/Tests/AWSAPIPluginTests/Support/Decode/GraphQLResponseDecoderLazyPostComment4V2Tests.swift +++ b/AmplifyPlugins/API/Tests/AWSAPIPluginTests/Support/Decode/GraphQLResponseDecoderLazyPostComment4V2Tests.swift @@ -376,7 +376,12 @@ class GraphQLResponseDecoderLazyPostComment4V2Tests: XCTestCase, SharedTestCases ] ] + let expectedEncodedCommentModel = """ + {"content":"content","createdAt":null,"id":"id","post":{"identifiers":[{"name":"id","value":"\(post.id)"}],"source":"AppSync"},"updatedAt":null} + """ + let commentWithLazyLoadPost = try decoder.decodeToResponseType(graphQLData) + XCTAssertEqual(try commentWithLazyLoadPost.toJSON(), expectedEncodedCommentModel) XCTAssertEqual(commentWithLazyLoadPost.id, "id") XCTAssertEqual(commentWithLazyLoadPost.content, "content") switch commentWithLazyLoadPost._post.modelProvider.getState() { diff --git a/AmplifyPlugins/Core/AWSPluginsCore/AWSAPIPluginDataStoreOptions.swift b/AmplifyPlugins/Core/AWSPluginsCore/AWSAPIPluginDataStoreOptions.swift new file mode 100644 index 0000000000..f3035b76f0 --- /dev/null +++ b/AmplifyPlugins/Core/AWSPluginsCore/AWSAPIPluginDataStoreOptions.swift @@ -0,0 +1,28 @@ +// +// Copyright Amazon.com Inc. or its affiliates. +// All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + +import Foundation +import Amplify + +/// Plugin specific options type +/// +/// - Warning: Although this has `public` access, it is intended for internal use and should not be used directly +/// by host applications. The behavior of this may change without warning. +public struct AWSAPIPluginDataStoreOptions { + + /// authorization type + public let authType: AWSAuthorizationType? + + /// name of the model + public let modelName: String + + public init(authType: AWSAuthorizationType?, + modelName: String) { + self.authType = authType + self.modelName = modelName + } +} diff --git a/AmplifyPlugins/Core/AWSPluginsCore/AWSPluginOptions.swift b/AmplifyPlugins/Core/AWSPluginsCore/AWSPluginOptions.swift index e0aac8de36..b6f83ec2b3 100644 --- a/AmplifyPlugins/Core/AWSPluginsCore/AWSPluginOptions.swift +++ b/AmplifyPlugins/Core/AWSPluginsCore/AWSPluginOptions.swift @@ -9,6 +9,26 @@ import Foundation import Amplify /// Plugin specific options type +/// +/// - Warning: Although this has `public` access, it is intended for internal use and should not be used directly +/// by host applications. The behavior of this may change without warning. +/// +/// This method was used internally by DataStore to pass information to APIPlugin, it +/// has since been renamed to `AWSDataStorePluginOptions`. For customers +/// looking to use the runtime authType parameter, this is a feature that should result in +/// an options object on APIPlugin as something like `AWSAPIPluginOptions`, ie. +/// +///```swift +///public struct AWSAPIPluginOptions { +/// /// authorization type +/// public let authType: AWSAuthorizationType? +/// +/// public init(authType: AWSAuthorizationType?) { +/// self.authType = authType +/// } +///} +///``` +@available(*, deprecated, message: "Intended for internal use.") public struct AWSPluginOptions { /// authorization type @@ -17,7 +37,8 @@ public struct AWSPluginOptions { /// name of the model public let modelName: String? - public init(authType: AWSAuthorizationType?, modelName: String?) { + public init(authType: AWSAuthorizationType?, + modelName: String) { self.authType = authType self.modelName = modelName } diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Model/GraphQLRequest/GraphQLRequest+AnyModelWithSync.swift b/AmplifyPlugins/Core/AWSPluginsCore/Model/GraphQLRequest/GraphQLRequest+AnyModelWithSync.swift index cdc367dde3..44af846765 100644 --- a/AmplifyPlugins/Core/AWSPluginsCore/Model/GraphQLRequest/GraphQLRequest+AnyModelWithSync.swift +++ b/AmplifyPlugins/Core/AWSPluginsCore/Model/GraphQLRequest/GraphQLRequest+AnyModelWithSync.swift @@ -70,7 +70,7 @@ extension GraphQLRequest: ModelSyncGraphQLRequestFactory { documentBuilder.add(decorator: AuthRuleDecorator(.query, authType: authType)) let document = documentBuilder.build() - let awsPluginOptions = AWSPluginOptions(authType: authType, modelName: modelName) + let awsPluginOptions = AWSAPIPluginDataStoreOptions(authType: authType, modelName: modelName) let requestOptions = GraphQLRequest.Options(pluginOptions: awsPluginOptions) return GraphQLRequest(document: document.stringValue, @@ -158,7 +158,7 @@ extension GraphQLRequest: ModelSyncGraphQLRequestFactory { documentBuilder.add(decorator: AuthRuleDecorator(.mutation, authType: authType)) let document = documentBuilder.build() - let awsPluginOptions = AWSPluginOptions(authType: authType, modelName: modelSchema.name) + let awsPluginOptions = AWSAPIPluginDataStoreOptions(authType: authType, modelName: modelSchema.name) let requestOptions = GraphQLRequest.Options(pluginOptions: awsPluginOptions) return GraphQLRequest(document: document.stringValue, @@ -180,7 +180,7 @@ extension GraphQLRequest: ModelSyncGraphQLRequestFactory { documentBuilder.add(decorator: AuthRuleDecorator(.subscription(subscriptionType, nil), authType: authType)) let document = documentBuilder.build() - let awsPluginOptions = AWSPluginOptions(authType: authType, modelName: modelSchema.name) + let awsPluginOptions = AWSAPIPluginDataStoreOptions(authType: authType, modelName: modelSchema.name) let requestOptions = GraphQLRequest.Options(pluginOptions: awsPluginOptions) return GraphQLRequest(document: document.stringValue, variables: document.variables, @@ -202,7 +202,10 @@ extension GraphQLRequest: ModelSyncGraphQLRequestFactory { documentBuilder.add(decorator: AuthRuleDecorator(.subscription(subscriptionType, claims), authType: authType)) let document = documentBuilder.build() - let awsPluginOptions = AWSPluginOptions(authType: authType, modelName: modelSchema.name) + let awsPluginOptions = AWSAPIPluginDataStoreOptions( + authType: authType, + modelName: modelSchema.name + ) let requestOptions = GraphQLRequest.Options(pluginOptions: awsPluginOptions) return GraphQLRequest(document: document.stringValue, variables: document.variables, @@ -229,7 +232,7 @@ extension GraphQLRequest: ModelSyncGraphQLRequestFactory { documentBuilder.add(decorator: AuthRuleDecorator(.query, authType: authType)) let document = documentBuilder.build() - let awsPluginOptions = AWSPluginOptions(authType: authType, modelName: modelSchema.name) + let awsPluginOptions = AWSAPIPluginDataStoreOptions(authType: authType, modelName: modelSchema.name) let requestOptions = GraphQLRequest.Options(pluginOptions: awsPluginOptions) return GraphQLRequest(document: document.stringValue, @@ -259,7 +262,7 @@ extension GraphQLRequest: ModelSyncGraphQLRequestFactory { documentBuilder.add(decorator: AuthRuleDecorator(.mutation, authType: authType)) let document = documentBuilder.build() - let awsPluginOptions = AWSPluginOptions(authType: authType, modelName: modelSchema.name) + let awsPluginOptions = AWSAPIPluginDataStoreOptions(authType: authType, modelName: modelSchema.name) let requestOptions = GraphQLRequest.Options(pluginOptions: awsPluginOptions) return GraphQLRequest(document: document.stringValue, diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Core/DataStoreModelDecoder.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Core/DataStoreModelDecoder.swift index 54ede2c97c..ea4cd42ac9 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Core/DataStoreModelDecoder.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Core/DataStoreModelDecoder.swift @@ -8,17 +8,17 @@ import Foundation import Amplify import SQLite +import AWSPluginsCore public struct DataStoreModelDecoder: ModelProviderDecoder { - public static let DataStoreSource = "DataStore" - /// Metadata that contains the foreign key value of a parent model, which is the primary key of the model to be loaded. struct Metadata: Codable { let identifiers: [LazyReferenceIdentifier] let source: String - init(identifiers: [LazyReferenceIdentifier], source: String = DataStoreSource) { + init(identifiers: [LazyReferenceIdentifier], + source: String = ModelProviderRegistry.DecoderSource.dataStore) { self.identifiers = identifiers self.source = source } @@ -38,7 +38,7 @@ public struct DataStoreModelDecoder: ModelProviderDecoder { public static func decode(modelType: ModelType.Type, decoder: Decoder) -> AnyModelProvider? { if let metadata = try? DataStoreModelDecoder.Metadata(from: decoder) { - if metadata.source == DataStoreSource { + if metadata.source == ModelProviderRegistry.DecoderSource.dataStore { return DataStoreModelProvider(metadata: metadata).eraseToAnyModelProvider() } else { return nil diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Core/DataStoreModelProvider.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Core/DataStoreModelProvider.swift index 4b43c7de2f..64878140f9 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Core/DataStoreModelProvider.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Core/DataStoreModelProvider.swift @@ -55,9 +55,7 @@ public class DataStoreModelProvider: ModelProvider { switch loadedState { case .notLoaded(let identifiers): let metadata = DataStoreModelDecoder.Metadata(identifiers: identifiers ?? []) - var container = encoder.singleValueContainer() - try container.encode(metadata) - + try metadata.encode(to: encoder) case .loaded(let element): try element.encode(to: encoder) } diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift index e7ea029f6f..e66eb2df70 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift @@ -191,7 +191,8 @@ class SyncMutationToCloudOperation: AsynchronousOperation { return nil } - let awsPluginOptions = AWSPluginOptions(authType: authType, modelName: mutationEvent.modelName) + let awsPluginOptions = AWSAPIPluginDataStoreOptions(authType: authType, + modelName: mutationEvent.modelName) request.options = GraphQLRequest.Options(pluginOptions: awsPluginOptions) return request } @@ -247,7 +248,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation { } resolveReachabilityPublisher(request: request) - if let pluginOptions = request.options?.pluginOptions as? AWSPluginOptions, pluginOptions.authType != nil, + if let pluginOptions = request.options?.pluginOptions as? AWSAPIPluginDataStoreOptions, pluginOptions.authType != nil, let nextAuthType = authTypesIterator?.next() { scheduleRetry(advice: advice, withAuthType: nextAuthType) } else { diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift index c45ba79650..d5dae69b37 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift @@ -43,7 +43,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { private let awsAuthService: AWSAuthServiceBehavior private let consistencyQueue: DispatchQueue - + private let taskQueue: TaskQueue private let modelName: ModelName init(modelSchema: ModelSchema, @@ -58,6 +58,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { self.consistencyQueue = DispatchQueue( label: "com.amazonaws.Amplify.RemoteSyncEngine.\(modelSchema.name)" ) + self.taskQueue = TaskQueue() self.modelName = modelSchema.name self.connectionStatusQueue = OperationQueue() @@ -170,7 +171,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { func sendConnectionEventIfConnected(event: Event) { if combinedConnectionStatusIsConnected { - incomingSubscriptionEvents.send(event) + send(event) } } @@ -178,18 +179,18 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { if case .connection = event { connectionStatusQueue.addOperation(cancelAwareBlock) } else { - incomingSubscriptionEvents.send(event) + send(event) } } func genericCompletionListenerHandler(result: Result) { switch result { case .success: - incomingSubscriptionEvents.send(completion: .finished) + send(completion: .finished) case .failure(let apiError): log.verbose("[InitializeSubscription.1] API.subscribe failed for `\(modelName)` error: \(apiError.errorDescription)") let dataStoreError = DataStoreError(error: apiError) - incomingSubscriptionEvents.send(completion: .failure(dataStoreError)) + send(completion: .failure(dataStoreError)) } } @@ -237,6 +238,20 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { incomingSubscriptionEvents.subscribe(subscriber) } + func send(_ event: Event) { + taskQueue.async { [weak self] in + guard let self else { return } + self.incomingSubscriptionEvents.send(event) + } + } + + func send(completion: Subscribers.Completion) { + taskQueue.async { [weak self] in + guard let self else { return } + self.incomingSubscriptionEvents.send(completion: completion) + } + } + func cancel() { consistencyQueue.sync { genericCompletionListenerHandler(result: .successfulVoid) diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Storage/StorageEngineTestsLazyPostComment4V2Tests.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Storage/StorageEngineTestsLazyPostComment4V2Tests.swift index f6801b0394..62f06849b6 100644 --- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Storage/StorageEngineTestsLazyPostComment4V2Tests.swift +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Storage/StorageEngineTestsLazyPostComment4V2Tests.swift @@ -257,6 +257,13 @@ final class StorageEngineTestsLazyPostComment4V2Tests: StorageEngineTestsBase, S XCTFail("Failed to query saved comment") return } + + let expectedJSONOutput = """ + {\"content\":\"content\",\"createdAt\":null,\"id\":\"\(comment.id)\",\"post\":{\"identifiers\":[{\"name\":\"id\",\"value\":\"\(post.id)\"}],\"source\":\"DataStore\"},\"updatedAt\":null} + """ + + XCTAssertEqual(expectedJSONOutput, try queriedCommentLazyLoadedPost.toJSON()) + switch queriedCommentLazyLoadedPost._post.modelProvider.getState() { case .notLoaded(let identifiers): guard let identifiers = identifiers else { diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisherTests.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisherTests.swift new file mode 100644 index 0000000000..48100ba687 --- /dev/null +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisherTests.swift @@ -0,0 +1,109 @@ +// +// Copyright Amazon.com Inc. or its affiliates. +// All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + +import XCTest +@testable import Amplify +@testable import AmplifyTestCommon +@testable import AWSPluginsCore +@testable import AWSDataStorePlugin + +final class IncomingAsyncSubscriptionEventPublisherTests: XCTestCase { + var apiPlugin: MockAPICategoryPlugin! + override func setUp() { + apiPlugin = MockAPICategoryPlugin() + ModelRegistry.register(modelType: Post.self) + } + + /// This test was written to to reproduce a bug where the subscribe would miss events emitted by the publisher. + /// The pattern in this test using the publisher (`IncomingAsyncSubscriptionEventPublisher`) and subscriber + /// (`IncomingAsyncSubscriptionEventToAnyModelMapper`) are identical to the usage in `AWSModelReconciliationQueue.init()`. + /// + /// See the changes in this PR: https://github.com/aws-amplify/amplify-swift/pull/3489 + /// + /// Before the PR changes, the publisher would emit events concurrently which caused some of them to be missed + /// by the subscriber even though the subscriber applied back pressure to process one event at a time (demand + /// of `max(1)`). For more details regarding back-pressure, see + /// https://developer.apple.com/documentation/combine/processing-published-elements-with-subscribers + /// + /// The change, to publish the events though the same TaskQueue ensures that the events are properly buffered + /// and sent only when the subscriber demands for it. + func testSubscriberRecievedEvents() async throws { + let expectedEvents = expectation(description: "Expected number of ") + let numberOfEvents = 50 + expectedEvents.expectedFulfillmentCount = numberOfEvents + let asyncEvents = await IncomingAsyncSubscriptionEventPublisher( + modelSchema: Post.schema, + api: apiPlugin, + modelPredicate: nil, + auth: nil, + authModeStrategy: AWSDefaultAuthModeStrategy(), + awsAuthService: nil) + let mapper = IncomingAsyncSubscriptionEventToAnyModelMapper() + asyncEvents.subscribe(subscriber: mapper) + let sink = mapper + .publisher + .sink( + receiveCompletion: { _ in }, + receiveValue: { _ in + expectedEvents.fulfill() + } + ) + DispatchQueue.concurrentPerform(iterations: numberOfEvents) { index in + asyncEvents.send(.connection(.connected)) + } + + await fulfillment(of: [expectedEvents], timeout: 2) + sink.cancel() + } + + /// Ensure that the publisher-subscriber with back pressure is receiving all the events in the order in which they were sent. + func testSubscriberRecievedEventsInOrder() async throws { + let expectedEvents = expectation(description: "Expected number of ") + let expectedOrder = AtomicValue<[String]>(initialValue: []) + let actualOrder = AtomicValue<[String]>(initialValue: []) + let numberOfEvents = 50 + expectedEvents.expectedFulfillmentCount = numberOfEvents + let asyncEvents = await IncomingAsyncSubscriptionEventPublisher( + modelSchema: Post.schema, + api: apiPlugin, + modelPredicate: nil, + auth: nil, + authModeStrategy: AWSDefaultAuthModeStrategy(), + awsAuthService: nil) + let mapper = IncomingAsyncSubscriptionEventToAnyModelMapper() + asyncEvents.subscribe(subscriber: mapper) + let sink = mapper + .publisher + .sink( + receiveCompletion: { _ in }, + receiveValue: { event in + switch event { + case .payload(let mutationSync): + actualOrder.append(mutationSync.syncMetadata.modelId) + default: + break + } + expectedEvents.fulfill() + } + ) + + for index in 0...deleteMutation( + of: comment, + modelSchema: Comment.schema, + version: 1) + do { + let result = try await Amplify.API.mutate(request: deleteRequest) + print(result) + } catch { + XCTFail("Failed to send mutation request \(error)") + } + + await fulfillment(of: [mutationEventReceived], timeout: 60) + mutationEvents.cancel() + } } extension AWSDataStoreLazyLoadBlogPostComment8V2Tests {