Skip to content

feat(cloud_functions): add support for cloud functions stream #17214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 39 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
5bbf13d
chore: add platform interface and method channel implementation for C…
SelaseKay Mar 18, 2025
24acdda
chore: add `httpsCallableStreamFromUrl` and `httpsStreamCallableWithUri`
SelaseKay Mar 18, 2025
fa00dd9
chore: resolve comments
SelaseKay Mar 21, 2025
fa750ed
chore: add Android implementation for Cloud Functions stream
SelaseKay Mar 21, 2025
3e8ac29
chore: resolve formatting issues
SelaseKay Mar 21, 2025
c248cd6
chore: correct variable name
SelaseKay Mar 21, 2025
1ac4533
chore: add support for Cloud Functions Stream(Android)
SelaseKay Mar 23, 2025
d8e0fce
Merge branch 'main' into feat/cloud_functions_stream_support
SelaseKay Mar 24, 2025
95236c8
chore: create dedicated StreamHandler class
SelaseKay Mar 24, 2025
785e019
Merge branch 'main' into feat/cloud_functions_stream_support
SelaseKay Mar 24, 2025
6161988
chore: add streamhandler implementation for ios
SelaseKay Mar 25, 2025
debdc46
Merge branch 'main' into feat/cloud_functions_stream_support
SelaseKay Mar 26, 2025
4d0c10e
chore: add iOS implementation for Cloud Functions stream
SelaseKay Mar 28, 2025
1190fde
chore: add license header to stream handler files
SelaseKay Mar 28, 2025
6bbde2f
chore: web Cloud Functions stream wip
SelaseKay Apr 1, 2025
66be89b
chore: push all
SelaseKay Apr 1, 2025
4804ab9
chore: update functions based on API Doc modification
SelaseKay Apr 2, 2025
4f83c36
chore: clean up code
SelaseKay Apr 3, 2025
213e283
chore: add web package
SelaseKay Apr 3, 2025
de10bcc
Merge branch 'main' into feat/cloud_functions_stream_support
SelaseKay Apr 3, 2025
794d441
chore: add streaming example
SelaseKay Apr 3, 2025
560e3eb
Merge branch 'feat/cloud_functions_stream_support' of github.com:fire…
SelaseKay Apr 3, 2025
6ad0820
chore: fix ci issues
SelaseKay Apr 3, 2025
a9819db
chore: fix ci
SelaseKay Apr 3, 2025
295c6c6
chore: fix cloud function test
SelaseKay Apr 3, 2025
7a5ad4e
chore: add missing doc
SelaseKay Apr 4, 2025
45fea6f
chore: fixes and clean up
SelaseKay Apr 9, 2025
047354a
chore: add e2e for Cloud Functions Stream
SelaseKay Apr 10, 2025
045150d
chore: fix formatting issue
SelaseKay Apr 10, 2025
69ef58f
chore: add more tests and fix timeout for Android
SelaseKay Apr 11, 2025
693aa89
chore: add test for map and list
SelaseKay Apr 11, 2025
aa77160
chore: fix test
SelaseKay Apr 11, 2025
06c1c9c
chore: update year to 2025 in files
SelaseKay Apr 16, 2025
1404359
chore(web): add support for abort signal
SelaseKay Apr 17, 2025
43c223c
chore: resolve comments and add test for Abort
SelaseKay Apr 17, 2025
df743ee
chore: fix test
SelaseKay Apr 17, 2025
c5136d6
chore: fix test
SelaseKay Apr 17, 2025
3964f3b
chore: update copyright year
SelaseKay Apr 18, 2025
f3af9e8
chore: print error to console
SelaseKay Apr 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: clean up code
  • Loading branch information
SelaseKay committed Apr 3, 2025
commit 4f83c36c2fa4c683e575df3ef5921794cc311cd0
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@

import android.net.Uri;
import com.google.firebase.functions.FirebaseFunctions;
import com.google.firebase.functions.HttpsCallableOptions;
import com.google.firebase.functions.HttpsCallableReference;
import com.google.firebase.functions.StreamResponse;
import io.flutter.plugin.common.EventChannel;
import io.flutter.plugin.common.EventChannel.StreamHandler;
import java.net.URL;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;

public class FirebaseFunctionsStreamHandler implements StreamHandler {
Expand Down Expand Up @@ -41,21 +45,37 @@ private void httpsStreamCall(Map<String, Object> arguments, EventChannel.EventSi
String functionName = (String) arguments.get("functionName");
String functionUri = (String) arguments.get("functionUri");
String origin = (String) arguments.get("origin");
Integer timeout = (Integer) arguments.get("timeout");
Object parameters = arguments.get("parameters");
boolean limitedUseAppCheckToken =
(boolean) Objects.requireNonNull(arguments.get("limitedUseAppCheckToken"));

if (origin != null) {
Uri originUri = Uri.parse(origin);
firebaseFunctions.useEmulator(originUri.getHost(), originUri.getPort());
}

HttpsCallableReference httpsCallableReference;
HttpsCallableOptions options =
new HttpsCallableOptions.Builder()
.setLimitedUseAppCheckTokens(limitedUseAppCheckToken)
.build();

Publisher<StreamResponse> publisher;
if (functionName != null) {
publisher = firebaseFunctions.getHttpsCallable(functionName).stream(parameters);
httpsCallableReference = firebaseFunctions.getHttpsCallable(functionName, options);
publisher = httpsCallableReference.stream(parameters);
} else if (functionUri != null) {
publisher = firebaseFunctions.getHttpsCallableFromUrl(new URL(functionUri)).stream();
httpsCallableReference =
firebaseFunctions.getHttpsCallableFromUrl(new URL(functionUri), options);
publisher = httpsCallableReference.stream();
} else {
throw new IllegalArgumentException("Either functionName or functionUri must be set");
}

if (timeout != null) {
httpsCallableReference.setTimeout(timeout.longValue(), TimeUnit.MILLISECONDS);
}
subscriber = new StreamResponseSubscriber(events);
publisher.subscribe(subscriber);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(StreamResponse streamResponse) {
Map<String, Object> messageMap = new HashMap<>();
Map<String, Object> responseMap = new HashMap<>();
if (streamResponse instanceof StreamResponse.Message) {
Object message = ((StreamResponse.Message) streamResponse).getMessage().getData();
messageMap.put("message", message);
mainThreadHandler.post(() -> eventSink.success(messageMap));
responseMap.put("message", message);
mainThreadHandler.post(() -> eventSink.success(responseMap));
} else {
Object result = ((StreamResponse.Result) streamResponse).getResult().getData();
messageMap.put("result", result);
mainThreadHandler.post(() -> eventSink.success(messageMap));
responseMap.put("result", result);
mainThreadHandler.post(() -> eventSink.success(responseMap));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class FunctionsStreamHandler: NSObject, FlutterStreamHandler {
let functionUri = arguments["functionUri"] as? String
let origin = arguments["origin"] as? String
let parameters = arguments["arguments"]
let timeout = arguments["timeout"] as? Double
let limitedUseAppCheckToken = arguments["limitedUseAppCheckToken"] as? Bool ?? false

if let origin,
let url = URL(string: origin),
Expand All @@ -53,14 +55,16 @@ class FunctionsStreamHandler: NSObject, FlutterStreamHandler {
functions.useEmulator(withHost: host, port: port)
}

let options = HTTPSCallableOptions(requireLimitedUseAppCheckTokens: limitedUseAppCheckToken)

// Stream handling for iOS 15+
if #available(iOS 15.0, *) {
let function: Callable<AnyEncodable, StreamResponse<AnyDecodable, AnyDecodable>>
var function: Callable<AnyEncodable, StreamResponse<AnyDecodable, AnyDecodable>>

if let functionName {
function = self.functions.httpsCallable(functionName)
function = self.functions.httpsCallable(functionName, options: options)
} else if let functionUri, let url = URL(string: functionUri) {
function = self.functions.httpsCallable(url)
function = self.functions.httpsCallable(url, options: options)
} else {
await MainActor.run {
events(FlutterError(code: "IllegalArgumentException",
Expand All @@ -70,6 +74,10 @@ class FunctionsStreamHandler: NSObject, FlutterStreamHandler {
return
}

if let timeout {
function.timeoutInterval = timeout / 1000
}

do {
let encodedParameters = AnyEncodable(parameters)

Expand All @@ -79,11 +87,9 @@ class FunctionsStreamHandler: NSObject, FlutterStreamHandler {
await MainActor.run {
switch response {
case let .message(message):
var wrappedMessage: [String: Any?] = ["message": message.value]
events(wrappedMessage)
events(["message": message.value])
case let .result(result):
var wrappedResult: [String: Any?] = ["result": result.value]
events(wrappedResult)
events(["result": result.value])
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,27 +100,38 @@ class FirebaseFunctions extends FirebasePluginPlatform {
///
/// Should be the name of the Callable function in Firebase that supports streaming.
HttpsCallableStream httpsStreamCallable(
String name,
) {
String name, {
HttpsCallableOptions? options,
}) {
options ??= HttpsCallableOptions();
assert(name.isNotEmpty);
return HttpsCallableStream._(delegate.httpsStreamCallable(_origin, name));
return HttpsCallableStream._(
delegate.httpsStreamCallable(_origin, name, options));
}

/// A reference to the streaming Callable HTTPS trigger with the given name.
///
/// Should be URL of the 2nd gen Callable function in Firebase.
HttpsCallableStream httpsCallableStreamFromUrl(String url) {
HttpsCallableStream httpsCallableStreamFromUrl(
String url, {
HttpsCallableOptions? options,
}) {
options ??= HttpsCallableOptions();
final uri = Uri.parse(url);
return HttpsCallableStream._(
delegate.httpsStreamCallableWithUri(_origin, uri));
delegate.httpsStreamCallableWithUri(_origin, uri, options));
}

/// A reference to the streaming Callable HTTPS trigger with the given name.
///
/// Should be Uri of the 2nd gen Callable function in Firebase.
HttpsCallableStream httpsCallableStreamFromUri(Uri uri) {
HttpsCallableStream httpsCallableStreamFromUri(
Uri uri, {
HttpsCallableOptions? options,
}) {
options ??= HttpsCallableOptions();
return HttpsCallableStream._(
delegate.httpsStreamCallableWithUri(_origin, uri));
delegate.httpsStreamCallableWithUri(_origin, uri, options));
}

/// Changes this instance to point to a Cloud Functions emulator running locally.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ class MethodChannelFirebaseFunctions extends FirebaseFunctionsPlatform {

@override
HttpsCallableStreamsPlatform httpsStreamCallable(
String? origin, String name) {
return MethodChannelHttpsCallableStreams(this, origin, name, null);
String? origin, String name, HttpsCallableOptions options) {
return MethodChannelHttpsCallableStreams(this, origin, name, options, null);
}

@override
HttpsCallableStreamsPlatform httpsStreamCallableWithUri(
String? origin, Uri uri) {
return MethodChannelHttpsCallableStreams(this, origin, null, uri);
String? origin, Uri uri, HttpsCallableOptions options) {
return MethodChannelHttpsCallableStreams(this, origin, null, options, uri);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import '../../cloud_functions_platform_interface.dart';
class MethodChannelHttpsCallableStreams<R>
extends HttpsCallableStreamsPlatform<R> {
MethodChannelHttpsCallableStreams(FirebaseFunctionsPlatform functions,
String? origin, String? name, Uri? uri)
String? origin, String? name, HttpsCallableOptions options, Uri? uri)
: _transformedUri = uri?.pathSegments.join('_').replaceAll('.', '_'),
super(functions, origin, name, uri) {
super(functions, origin, name, options, uri) {
_eventChannelId = name ?? _transformedUri ?? '';
_channel =
EventChannel('plugins.flutter.io/firebase_functions/$_eventChannelId');
Expand All @@ -32,6 +32,8 @@ class MethodChannelHttpsCallableStreams<R>
'functionUri': uri?.toString(),
'origin': origin,
'parameters': parameters,
'limitedUseAppCheckToken': options.limitedUseAppCheckToken,
'timeout': options.timeout.inMilliseconds,
};
yield* _channel.receiveBroadcastStream(eventData).map((message) {
if (message is Map) {
Expand All @@ -52,14 +54,4 @@ class MethodChannelHttpsCallableStreams<R>
'region': functions.region,
});
}

@override
Future<dynamic> get data async {
final result = await MethodChannelFirebaseFunctions.channel
.invokeMethod('FirebaseFunctions#getCompleteResult');
if (result is Map) {
return Map<String, dynamic>.from(result);
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,12 @@ abstract class FirebaseFunctionsPlatform extends PlatformInterface {
}

HttpsCallableStreamsPlatform httpsStreamCallable(
String? origin,
String name,
) {
String? origin, String name, HttpsCallableOptions options) {
throw UnimplementedError('httpsStreamCallable() is not implemented');
}

HttpsCallableStreamsPlatform httpsStreamCallableWithUri(
String? origin, Uri uri) {
String? origin, Uri uri, HttpsCallableOptions options) {
throw UnimplementedError('httpsStreamCallableWithUri() is not implemented');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ abstract class HttpsCallableStreamsPlatform<R> extends PlatformInterface {
this.functions,
this.origin,
this.name,
this.options,
this.uri,
) : assert(name != null || uri != null),
super(token: _token);
Expand All @@ -33,7 +34,8 @@ abstract class HttpsCallableStreamsPlatform<R> extends PlatformInterface {
/// The URI of the function for 2nd gen functions
final Uri? uri;

Stream<dynamic> stream(Object? parameters);
/// Used to set the options for this instance.
HttpsCallableOptions options;

Future<dynamic> get data;
Stream<dynamic> stream(Object? parameters);
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,14 @@ class FirebaseFunctionsWeb extends FirebaseFunctionsPlatform {
}

@override
HttpsCallableStreamsPlatform httpsStreamCallable(String? origin, String name) {
return HttpsCallableStreamWeb(this, _delegate, origin, name, null);
HttpsCallableStreamsPlatform httpsStreamCallable(
String? origin, String name, HttpsCallableOptions options) {
return HttpsCallableStreamWeb(this, _delegate, origin, name, options, null);
}

@override
HttpsCallableStreamsPlatform httpsStreamCallableWithUri(
String? origin, Uri uri, HttpsCallableOptions options) {
return HttpsCallableStreamWeb(this, _delegate, origin, null, options, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,23 @@
import 'dart:js_interop';

import 'package:cloud_functions_platform_interface/cloud_functions_platform_interface.dart';
import 'package:cloud_functions_web/interop/functions_interop.dart' as interop;

import 'interop/functions.dart' as functions_interop;
import 'utils.dart';

class HttpsCallableStreamWeb extends HttpsCallableStreamsPlatform {
HttpsCallableStreamWeb(
super.functions, this._webFunctions, super.origin, super.name, super.uri);
FirebaseFunctionsPlatform functions,
this._webFunctions,
String? origin,
String? name,
HttpsCallableOptions options,
Uri? uri)
: super(functions, origin, name, options, uri);

final functions_interop.Functions _webFunctions;

@override
Future get data => throw UnimplementedError();

@override
Stream<dynamic> stream(Object? parameters) async* {
if (origin != null) {
Expand All @@ -37,9 +42,16 @@ class HttpsCallableStreamWeb extends HttpsCallableStreamsPlatform {
}

final JSAny? parametersJS = parameters?.jsify();

await for (final value in callable.stream(parametersJS)){
yield value;
interop.HttpsCallableStreamOptions callableStreamOptions =
interop.HttpsCallableStreamOptions(
limitedUseAppCheckTokens: options.limitedUseAppCheckToken.toJS);
try {
await for (final value
in callable.stream(parametersJS, callableStreamOptions)) {
yield value;
}
} catch (e, s) {
throw convertFirebaseFunctionsException(e as JSObject, s);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,26 @@ class HttpsCallable extends JsObjectWrapper<JSFunction> {
);
}

Stream<HttpsCallableResult> stream(JSAny? data) async* {
final streamCallable =
await (jsObject as functions_interop.HttpsCallable).stream().toDart;
Stream<dynamic> stream(JSAny? data,
functions_interop.HttpsCallableStreamOptions? options) async* {
final streamCallable = await (jsObject as functions_interop.HttpsCallable)
.stream(data, options)
.toDart;
final streamResult =
streamCallable! as functions_interop.HttpsCallableStreamResultJsImpl;

await for (final value in streamResult.stream.asStream()) {
yield HttpsCallableResult.getInstance(
value as functions_interop.HttpsCallableResultJsImpl,
);
// ignore: invalid_runtime_check_with_js_interop_types
final message = value is JSObject
? HttpsCallableResult.getInstance(
value as functions_interop.HttpsCallableResultJsImpl,
)
: value;
yield {'message': message};
}

final result = await streamResult.data.toDart;
yield {'result': result};
}
}

Expand Down
Loading
Loading