Skip to content

Commit

Permalink
Extend Firebase SDK with new APIs to consume streaming callable funct…
Browse files Browse the repository at this point in the history
…ion response.

- Handling the server-sent event (SSE) parsing internally
- Providing proper error handling and connection management
- Maintaining memory efficiency for long-running streams
  • Loading branch information
MustafaJadid2025 committed Dec 26, 2024
1 parent 50eacb7 commit 9e13ef7
Show file tree
Hide file tree
Showing 5 changed files with 536 additions and 30 deletions.
116 changes: 86 additions & 30 deletions firebase-functions/src/androidTest/backend/functions/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.

const assert = require('assert');
const functions = require('firebase-functions');
const assert = require("assert");
const functions = require("firebase-functions");

exports.dataTest = functions.https.onRequest((request, response) => {
assert.deepEqual(request.body, {
data: {
bool: true,
int: 2,
long: {
value: '3',
'@type': 'type.googleapis.com/google.protobuf.Int64Value',
"bool": true,
"int": 2,
"long": {
"value": "3",
"@type": "type.googleapis.com/google.protobuf.Int64Value",
},
string: 'four',
array: [5, 6],
'null': null,
}
"string": "four",
"array": [5, 6],
"null": null,
},
});
response.send({
data: {
message: 'stub response',
message: "stub response",
code: 42,
long: {
value: '420',
'@type': 'type.googleapis.com/google.protobuf.Int64Value',
"value": "420",
"@type": "type.googleapis.com/google.protobuf.Int64Value",
},
}
},
});
});

Expand All @@ -47,28 +47,29 @@ exports.scalarTest = functions.https.onRequest((request, response) => {
});

exports.tokenTest = functions.https.onRequest((request, response) => {
assert.equal(request.get('Authorization'), 'Bearer token');
assert.equal(request.get("Authorization"), "Bearer token");
assert.deepEqual(request.body, {data: {}});
response.send({data: {}});
});

exports.instanceIdTest = functions.https.onRequest((request, response) => {
assert.equal(request.get('Firebase-Instance-ID-Token'), 'iid');
assert.equal(request.get("Firebase-Instance-ID-Token"), "iid");
assert.deepEqual(request.body, {data: {}});
response.send({data: {}});
});

exports.appCheckTest = functions.https.onRequest((request, response) => {
assert.equal(request.get('X-Firebase-AppCheck'), 'appCheck');
assert.equal(request.get("X-Firebase-AppCheck"), "appCheck");
assert.deepEqual(request.body, {data: {}});
response.send({data: {}});
});

exports.appCheckLimitedUseTest = functions.https.onRequest((request, response) => {
assert.equal(request.get('X-Firebase-AppCheck'), 'appCheck-limited-use');
assert.deepEqual(request.body, {data: {}});
response.send({data: {}});
});
exports.appCheckLimitedUseTest = functions.https.onRequest(
(request, response) => {
assert.equal(request.get("X-Firebase-AppCheck"), "appCheck-limited-use");
assert.deepEqual(request.body, {data: {}});
response.send({data: {}});
});

exports.nullTest = functions.https.onRequest((request, response) => {
assert.deepEqual(request.body, {data: null});
Expand All @@ -82,15 +83,15 @@ exports.missingResultTest = functions.https.onRequest((request, response) => {

exports.unhandledErrorTest = functions.https.onRequest((request, response) => {
// Fail in a way that the client shouldn't see.
throw 'nope';
throw new Error("nope");
});

exports.unknownErrorTest = functions.https.onRequest((request, response) => {
// Send an http error with a body with an explicit code.
response.status(400).send({
error: {
status: 'THIS_IS_NOT_VALID',
message: 'this should be ignored',
status: "THIS_IS_NOT_VALID",
message: "this should be ignored",
},
});
});
Expand All @@ -99,14 +100,14 @@ exports.explicitErrorTest = functions.https.onRequest((request, response) => {
// Send an http error with a body with an explicit code.
response.status(400).send({
error: {
status: 'OUT_OF_RANGE',
message: 'explicit nope',
status: "OUT_OF_RANGE",
message: "explicit nope",
details: {
start: 10,
end: 20,
long: {
value: '30',
'@type': 'type.googleapis.com/google.protobuf.Int64Value',
"value": "30",
"@type": "type.googleapis.com/google.protobuf.Int64Value",
},
},
},
Expand All @@ -122,3 +123,58 @@ exports.timeoutTest = functions.https.onRequest((request, response) => {
// Wait for longer than 500ms.
setTimeout(() => response.send({data: true}), 500);
});

const data = ["hello", "world", "this", "is", "cool"];

/**
* Pauses the execution for a specified amount of time.
* @param {number} ms - The number of milliseconds to sleep.
* @return {Promise<void>} A promise that resolves after the specified time.
*/
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
* Generates chunks of text asynchronously, yielding one chunk at a time.
* @async
* @generator
* @yields {string} A chunk of text from the data array.
*/
async function* generateText() {
for (const chunk of data) {
yield chunk;
await sleep(1000);
}
}

exports.genStream = functions.https.onCall(async (request, response) => {
if (response && response.acceptsStreaming) {
for await (const chunk of generateText()) {
console.log("got chunk", chunk);
response.write({chunk});
}
}
return data.join(" ");
});

exports.genStreamError = functions.https.onCall(async (request, response) => {
if (response && response.acceptsStreaming) {
for await (const chunk of generateText()) {
console.log("got chunk", chunk);
response.write({chunk});
}
throw new Error("BOOM");
}
});

exports.genStreamNoReturn = functions.https.onCall(
async (request, response) => {
if (response && response.acceptsStreaming) {
for await (const chunk of generateText()) {
console.log("got chunk", chunk);
response.write({chunk});
}
}
},
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package com.google.firebase.functions.ktx

import androidx.test.InstrumentationRegistry
import androidx.test.runner.AndroidJUnit4
import com.google.android.gms.tasks.Tasks
import com.google.common.truth.Truth.assertThat
import com.google.firebase.FirebaseApp
import com.google.firebase.functions.FirebaseFunctions
import com.google.firebase.functions.FirebaseFunctionsException
import com.google.firebase.functions.SSETaskListener
import com.google.firebase.ktx.Firebase
import com.google.firebase.ktx.initialize
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import org.junit.After
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith

@RunWith(AndroidJUnit4::class)
class StreamTests {

private lateinit var app: FirebaseApp
private lateinit var listener: SSETaskListener

private lateinit var functions: FirebaseFunctions
var onNext = mutableListOf<Any>()
var onError: Any? = null
var onComplete: Any? = null

@Before
fun setup() {
app = Firebase.initialize(InstrumentationRegistry.getContext())!!
functions = FirebaseFunctions.getInstance()
listener =
object : SSETaskListener {
override fun onNext(event: Any) {
onNext.add(event)
}

override fun onError(event: Any) {
onError = event
}

override fun onComplete(event: Any) {
onComplete = event
}
}
}

@After
fun clear() {
onNext.clear()
onError = null
onComplete = null
}

@Test
fun testGenStream() {
val input = hashMapOf("data" to "Why is the sky blue")

val function = functions.getHttpsCallable("genStream")
val httpsCallableResult = Tasks.await(function.stream(input, listener))

val onNextStringList = onNext.map { it.toString() }
assertThat(onNextStringList)
.containsExactly(
"{chunk=hello}",
"{chunk=world}",
"{chunk=this}",
"{chunk=is}",
"{chunk=cool}"
)
assertThat(onError).isNull()
assertThat(onComplete).isEqualTo("hello world this is cool")
assertThat(httpsCallableResult.data).isEqualTo("hello world this is cool")
}

@Test
fun testGenStreamError() {
val input = hashMapOf("data" to "Why is the sky blue")
val function = functions.getHttpsCallable("genStreamError").withTimeout(7, TimeUnit.SECONDS)

try {
Tasks.await(function.stream(input, listener))
} catch (exception: Exception) {
onError = exception
}

val onNextStringList = onNext.map { it.toString() }
assertThat(onNextStringList)
.containsExactly(
"{chunk=hello}",
"{chunk=world}",
"{chunk=this}",
"{chunk=is}",
"{chunk=cool}"
)
assertThat(onError).isInstanceOf(ExecutionException::class.java)
val cause = (onError as ExecutionException).cause
assertThat(cause).isInstanceOf(FirebaseFunctionsException::class.java)
assertThat((cause as FirebaseFunctionsException).message).contains("stream was reset: CANCEL")
assertThat(onComplete).isNull()
}

@Test
fun testGenStreamNoReturn() {
val input = hashMapOf("data" to "Why is the sky blue")

val function = functions.getHttpsCallable("genStreamNoReturn")
try {
Tasks.await(function.stream(input, listener), 7, TimeUnit.SECONDS)
} catch (_: Exception) {}

val onNextStringList = onNext.map { it.toString() }
assertThat(onNextStringList)
.containsExactly(
"{chunk=hello}",
"{chunk=world}",
"{chunk=this}",
"{chunk=is}",
"{chunk=cool}"
)
assertThat(onError).isNull()
assertThat(onComplete).isNull()
}
}
Loading

0 comments on commit 9e13ef7

Please sign in to comment.