Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Streamable functions #14395

Open
wants to merge 55 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
231d602
[Infra] Update functions workflow to use macOS 15 for Xcode 16 jobs (…
eBlender Nov 8, 2024
a14d964
Stremable Functions.
eBlender Dec 20, 2024
a92d7c2
Changed return type.
eBlender Dec 20, 2024
10bec1d
Lint test
eBlender Dec 20, 2024
53a2aab
Remove test function
eBlender Dec 20, 2024
758fbed
Remove old test.
eBlender Dec 20, 2024
93b6c8b
Updated function, add full test.
eBlender Dec 27, 2024
a7e8fe8
Update functions
eBlender Jan 2, 2025
6d59fcd
Update FunctionsTests.swift
eBlender Jan 2, 2025
51f02b8
Cleanup HTTPCallable
eBlender Jan 2, 2025
7b61076
Add documentation for processResponseDataForStreamableContent
eBlender Jan 2, 2025
a95449e
Update Functions.swift
eBlender Jan 2, 2025
cdc49ee
Update Functions.swift
eBlender Jan 2, 2025
426b6bc
Update FunctionsTests.swift
eBlender Jan 2, 2025
9cb0a5e
Update and Cleanup
eBlender Jan 3, 2025
ad31052
Update IntegrationTests.swift
eBlender Jan 3, 2025
6ee9000
Clean up
eBlender Jan 3, 2025
1ffe73d
Update check.sh
eBlender Jan 3, 2025
9fcd91e
Bump to Main.
eBlender Jan 6, 2025
177aa8e
Merge branch 'main' into iOS-Stremable-Functions
eBlender Jan 6, 2025
f4d678b
Cleanup
eBlender Jan 6, 2025
74557e7
Merge branch 'iOS-Stremable-Functions' of https://github.com/eBlender…
eBlender Jan 6, 2025
18f748b
Update Functions.swift
eBlender Jan 7, 2025
4f956fb
Lint check
eBlender Jan 7, 2025
4edc0ad
Function concurrency error
eBlender Jan 7, 2025
e50f69c
Update .github/workflows/functions.yml
eBlender Jan 15, 2025
7356cf9
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 15, 2025
aed47d6
Delete firebase-database-emulator.log
eBlender Jan 15, 2025
f6c6cff
Delete firebase-database-emulator.pid
eBlender Jan 15, 2025
75a7574
Update function error handling.
eBlender Jan 15, 2025
adf7366
Merge branch 'iOS-Stremable-Functions' of https://github.com/eBlender…
eBlender Jan 15, 2025
9ef7411
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 16, 2025
4ee820e
Update FunctionsTests.swift
eBlender Jan 16, 2025
fd68f01
Merge branch 'iOS-Stremable-Functions' of https://github.com/eBlender…
eBlender Jan 16, 2025
f27bf07
Update FunctionsTests.swift
eBlender Jan 16, 2025
1ffa4f0
Format and refactoring.
eBlender Jan 16, 2025
80f0991
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 16, 2025
756dc26
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 16, 2025
f031c1f
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 16, 2025
0df7f8d
Update FirebaseFunctions/Tests/Unit/FunctionsTests.swift
eBlender Jan 16, 2025
3e325aa
[WIP] Add generic and basic streaming implementation
ncooke3 Jan 23, 2025
95fc340
Merge branch 'main' into iOS-Stremable-Functions
ncooke3 Jan 23, 2025
a712525
Merge branch 'main' into iOS-Stremable-Functions
ncooke3 Jan 23, 2025
7b42c35
Move unit tests
ncooke3 Jan 23, 2025
6dc3959
Merge remote-tracking branch 'origin/main' into iOS-Stremable-Functions
ncooke3 Jan 27, 2025
ca53153
Post main sync checkpoint
ncooke3 Jan 27, 2025
d3e476c
Copy over more structure from vertex
ncooke3 Jan 27, 2025
e33f74c
Updated to changes in 14376 so the impl is closer to Vertex on how th…
ncooke3 Jan 29, 2025
b98f71d
Update - remove tests
eBlender Jan 28, 2025
0ea118f
Add Documentation.
eBlender Jan 29, 2025
20892fd
Update Functions.swift
eBlender Jan 29, 2025
83eae53
Update IntegrationTests.swift
eBlender Jan 29, 2025
749b52d
Add more integration tests
ncooke3 Jan 29, 2025
480f7c3
Remove unneeded code
ncooke3 Jan 29, 2025
5587eb7
Fix comments
ncooke3 Jan 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 34 additions & 4 deletions FirebaseFunctions/Backend/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ const assert = require('assert');
const functionsV1 = require('firebase-functions/v1');
const functionsV2 = require('firebase-functions/v2');

// MARK: - Utilities

function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
};

// MARK: - Callable Functions

exports.dataTest = functionsV1.https.onRequest((request, response) => {
assert.deepEqual(request.body, {
data: {
Expand Down Expand Up @@ -121,10 +129,6 @@ exports.timeoutTest = functionsV1.https.onRequest((request, response) => {

const streamData = ["hello", "world", "this", "is", "cool"]

function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
};

async function* generateText() {
for (const chunk of streamData) {
yield chunk;
Expand Down Expand Up @@ -153,3 +157,29 @@ exports.genStreamError = functionsV2.https.onCall(
}
}
);

const weatherForecasts = {
Toronto: { conditions: 'snowy', temperature: 25 },
London: { conditions: 'rainy', temperature: 50 },
Dubai: { conditions: 'sunny', temperature: 75 }
};

async function* generateForecast(locations) {
for (const location of locations) {
yield { 'location': location, ...weatherForecasts[location.name] };
await sleep(500);
}
};

exports.genStreamWeather = functionsV2.https.onCall(
async (request, response) => {
if (request.acceptsStreaming) {
for await (const chunk of generateForecast(request.data)) {
response.sendChunk({ chunk });
}
}
return "Number of forecasts generated: " + request.data.length;
}
);

// TODO: Maybe a function that returns Void?
1 change: 1 addition & 0 deletions FirebaseFunctions/Backend/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ FUNCTIONS_BIN="./node_modules/.bin/functions"
"${FUNCTIONS_BIN}" deploy timeoutTest --trigger-http
"${FUNCTIONS_BIN}" deploy genStream --trigger-http
"${FUNCTIONS_BIN}" deploy genStreamError --trigger-http
"${FUNCTIONS_BIN}" deploy genStreamWeather --trigger-http

if [ "$1" != "synchronous" ]; then
# Wait for the user to tell us to stop the server.
Expand Down
23 changes: 23 additions & 0 deletions FirebaseFunctions/Sources/Callable+Codable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,27 @@ public struct Callable<Request: Encodable, Response: Decodable> {
public func callAsFunction(_ data: Request) async throws -> Response {
return try await call(data)
}

// TODO: Look into handling parameter-less functions.
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
public func stream(_ data: Request) -> AsyncThrowingStream<Response, Error> {
return AsyncThrowingStream { continuation in
Task {
do {
let encoded = try encoder.encode(data)
for try await result in callable.stream(encoded) {
if let response = try? decoder.decode([String: Response].self, from: result.data) {
continuation.yield(response["chunk"]!)
} else if let response = try? decoder.decode(Response.self, from: result.data) {
continuation.yield(response)
}
// TODO: Silently failing. The response cannot be decoded to the given type.
}
} catch {
continuation.finish(throwing: error)
}
continuation.finish()
}
}
}
}
167 changes: 165 additions & 2 deletions FirebaseFunctions/Sources/Functions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,167 @@ enum FunctionsConstants {
}
}

/// Function to initialize a streamaing event for an HTTPCallable
/// - Parameters:
/// - url: The url of the Callable HTTPS trigger.
/// - data: Object to be sent in the request.
/// - options: The options with which to customize the Callable HTTPS trigger.
/// - timeout: timeout for the HTTPSCallableResult request.
/// - Returns: HTTPSCallableResult Streaming.
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
func stream(at url: URL,
withObject data: Any?,
options: HTTPSCallableOptions?,
timeout: TimeInterval)
-> AsyncThrowingStream<HTTPSCallableResult, Error> {
AsyncThrowingStream { continuation in
// TODO: Vertex prints the curl command. Should this?

Task {
// TODO: This API does not throw. Should the throwing request
// setup be in the stream or one level up?
let urlRequest: URLRequest
do {
let context = try await contextProvider.context(options: options)
urlRequest = try makeRequestForStreamableContent(
url: url,
data: data,
options: options,
timeout: timeout,
context: context
)
} catch {
continuation.finish(throwing: error)
return
}

let stream: URLSession.AsyncBytes
let rawResponse: URLResponse
do {
// TODO: Look into injecting URLSession for unit tests.
(stream, rawResponse) = try await URLSession.shared.bytes(for: urlRequest)
} catch {
continuation.finish(throwing: error)
return
}

// Verify the status code is an HTTP response.
guard let response = rawResponse as? HTTPURLResponse else {
continuation.finish(
throwing: FunctionsError(
.internal,
userInfo: [NSLocalizedDescriptionKey: "Response was not an HTTP response."]
)
)
return
}

// Verify the status code is a 200.
guard response.statusCode == 200 else {
continuation.finish(
throwing: FunctionsError(
.internal,
userInfo: [NSLocalizedDescriptionKey: "Response is not a successful 200."]
)
)
return
}

for try await line in stream.lines {
if line.hasPrefix("data:") {
// We can assume 5 characters since it's utf-8 encoded, removing `data:`.
let jsonText = String(line.dropFirst(5))
let data: Data
do {
// TODO: The error potentially thrown here is not a Functions error.
data = try jsonData(jsonText: jsonText)
} catch {
continuation.finish(throwing: error)
return
}

// Handle the content and parse it.
do {
let content = try callableResult(fromResponseData: data)
continuation.yield(content)
} catch {
continuation.finish(throwing: error)
return
}
} else {
continuation.finish(
throwing: FunctionsError(
.internal,
userInfo: [NSLocalizedDescriptionKey: "Unexpected format for streamed response."]
)
)
}
}

continuation.finish(throwing: nil)
}
}
}

private func jsonData(jsonText: String) throws -> Data {
guard let data = jsonText.data(using: .utf8) else {
throw DecodingError.dataCorrupted(DecodingError.Context(
codingPath: [],
debugDescription: "Could not parse response as UTF8."
))
}
return data
}

private func makeRequestForStreamableContent(url: URL,
data: Any?,
options: HTTPSCallableOptions?,
timeout: TimeInterval,
context: FunctionsContext) throws
-> URLRequest {
var urlRequest = URLRequest(
url: url,
cachePolicy: .useProtocolCachePolicy,
timeoutInterval: timeout
)

let data = data ?? NSNull()
let encoded = try serializer.encode(data)
let body = ["data": encoded]
let payload = try JSONSerialization.data(withJSONObject: body, options: [.fragmentsAllowed])
urlRequest.httpBody = payload

// Set the headers for starting a streaming session.
urlRequest.setValue("application/json", forHTTPHeaderField: "Content-Type")
urlRequest.setValue("text/event-stream", forHTTPHeaderField: "Accept")
urlRequest.httpMethod = "POST"

if let authToken = context.authToken {
let value = "Bearer \(authToken)"
urlRequest.setValue(value, forHTTPHeaderField: "Authorization")
}

if let fcmToken = context.fcmToken {
urlRequest.setValue(fcmToken, forHTTPHeaderField: Constants.fcmTokenHeader)
}

if options?.requireLimitedUseAppCheckTokens == true {
if let appCheckToken = context.limitedUseAppCheckToken {
urlRequest.setValue(
appCheckToken,
forHTTPHeaderField: Constants.appCheckTokenHeader
)
}
} else if let appCheckToken = context.appCheckToken {
urlRequest.setValue(
appCheckToken,
forHTTPHeaderField: Constants.appCheckTokenHeader
)
}

return urlRequest
}

private func makeFetcher(url: URL,
data: Any?,
options: HTTPSCallableOptions?,
Expand Down Expand Up @@ -564,8 +725,10 @@ enum FunctionsConstants {
throw FunctionsError(.internal, userInfo: userInfo)
}

// `result` is checked for backwards compatibility:
guard let dataJSON = responseJSON["data"] ?? responseJSON["result"] else {
// `result` is checked for backwards compatibility,
// `message` is checked for StreamableContent:
guard let dataJSON = responseJSON["data"] ?? responseJSON["result"] ?? responseJSON["message"]
else {
let userInfo = [NSLocalizedDescriptionKey: "Response is missing data field."]
throw FunctionsError(.internal, userInfo: userInfo)
}
Expand Down
7 changes: 6 additions & 1 deletion FirebaseFunctions/Sources/HTTPSCallable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ open class HTTPSCallable: NSObject {
// The functions client to use for making calls.
private let functions: Functions

private let url: URL
let url: URL

private let options: HTTPSCallableOptions?

Expand Down Expand Up @@ -143,4 +143,9 @@ open class HTTPSCallable: NSObject {
try await functions
.callFunction(at: url, withObject: data, options: options, timeout: timeoutInterval)
}

@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
func stream(_ data: Any? = nil) -> AsyncThrowingStream<HTTPSCallableResult, Error> {
functions.stream(at: url, withObject: data, options: options, timeout: timeoutInterval)
}
}
Loading
Loading