Skip to content

Commit 82a784e

Browse files
Merge remote-tracking branch 'origin/main' into release/v1.9.1
2 parents 47cdccf + 1987882 commit 82a784e

File tree

4 files changed

+116
-51
lines changed

4 files changed

+116
-51
lines changed

packages/powersync/lib/src/database/web/web_powersync_database.dart

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,11 +148,11 @@ class PowerSyncDatabaseImpl
148148
// duplicating work across tabs.
149149
try {
150150
sync = await SyncWorkerHandle.start(
151-
this,
152-
connector,
153-
crudThrottleTime.inMilliseconds,
154-
Uri.base.resolve('/powersync_sync.worker.js'),
155-
);
151+
database: this,
152+
connector: connector,
153+
crudThrottleTimeMs: crudThrottleTime.inMilliseconds,
154+
workerUri: Uri.base.resolve('/powersync_sync.worker.js'),
155+
syncParams: params);
156156
} catch (e) {
157157
logger.warning(
158158
'Could not use shared worker for synchronization, falling back to locks.',

packages/powersync/lib/src/web/sync_controller.dart

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,29 @@ import '../streaming_sync.dart';
1010
import 'sync_worker_protocol.dart';
1111

1212
class SyncWorkerHandle implements StreamingSync {
13-
final PowerSyncDatabaseImpl _database;
14-
final PowerSyncBackendConnector _connector;
15-
final int _crudThrottleTimeMs;
13+
final PowerSyncDatabaseImpl database;
14+
final PowerSyncBackendConnector connector;
15+
final int crudThrottleTimeMs;
16+
final Map<String, dynamic>? syncParams;
1617

1718
late final WorkerCommunicationChannel _channel;
1819

1920
final StreamController<SyncStatus> _status = StreamController.broadcast();
2021

21-
SyncWorkerHandle._(this._database, this._connector, this._crudThrottleTimeMs,
22-
MessagePort sendToWorker, SharedWorker worker) {
22+
SyncWorkerHandle._(
23+
{required this.database,
24+
required this.connector,
25+
required this.crudThrottleTimeMs,
26+
required MessagePort sendToWorker,
27+
required SharedWorker worker,
28+
this.syncParams}) {
2329
_channel = WorkerCommunicationChannel(
2430
port: sendToWorker,
2531
errors: EventStreamProviders.errorEvent.forTarget(worker),
2632
requestHandler: (type, payload) async {
2733
switch (type) {
2834
case SyncWorkerMessageType.requestEndpoint:
29-
final endpoint = await (_database.database as WebSqliteConnection)
35+
final endpoint = await (database.database as WebSqliteConnection)
3036
.exposeEndpoint();
3137

3238
return (
@@ -38,18 +44,18 @@ class SyncWorkerHandle implements StreamingSync {
3844
[endpoint.connectPort].toJS
3945
);
4046
case SyncWorkerMessageType.uploadCrud:
41-
await _connector.uploadData(_database);
47+
await connector.uploadData(database);
4248
return (JSObject(), null);
4349
case SyncWorkerMessageType.invalidCredentialsCallback:
44-
final credentials = await _connector.fetchCredentials();
50+
final credentials = await connector.fetchCredentials();
4551
return (
4652
credentials != null
4753
? SerializedCredentials.from(credentials)
4854
: null,
4955
null
5056
);
5157
case SyncWorkerMessageType.credentialsCallback:
52-
final credentials = await _connector.getCredentialsCached();
58+
final credentials = await connector.getCredentialsCached();
5359
return (
5460
credentials != null
5561
? SerializedCredentials.from(credentials)
@@ -71,13 +77,19 @@ class SyncWorkerHandle implements StreamingSync {
7177
}
7278

7379
static Future<SyncWorkerHandle> start(
74-
PowerSyncDatabaseImpl database,
75-
PowerSyncBackendConnector connector,
76-
int crudThrottleTimeMs,
77-
Uri workerUri) async {
80+
{required PowerSyncDatabaseImpl database,
81+
required PowerSyncBackendConnector connector,
82+
required int crudThrottleTimeMs,
83+
required Uri workerUri,
84+
Map<String, dynamic>? syncParams}) async {
7885
final worker = SharedWorker(workerUri.toString().toJS);
7986
final handle = SyncWorkerHandle._(
80-
database, connector, crudThrottleTimeMs, worker.port, worker);
87+
database: database,
88+
connector: connector,
89+
crudThrottleTimeMs: crudThrottleTimeMs,
90+
sendToWorker: worker.port,
91+
worker: worker,
92+
syncParams: syncParams);
8193

8294
// Make sure that the worker is working, or throw immediately.
8395
await handle._channel.ping();
@@ -101,6 +113,6 @@ class SyncWorkerHandle implements StreamingSync {
101113
@override
102114
Future<void> streamingSync() async {
103115
await _channel.startSynchronization(
104-
_database.openFactory.path, _crudThrottleTimeMs);
116+
database.openFactory.path, crudThrottleTimeMs, syncParams);
105117
}
106118
}

packages/powersync/lib/src/web/sync_worker.dart

Lines changed: 69 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
library;
55

66
import 'dart:async';
7+
import 'dart:convert';
78
import 'dart:js_interop';
89

910
import 'package:async/async.dart';
@@ -41,12 +42,15 @@ class _SyncWorker {
4142
});
4243
}
4344

44-
_SyncRunner referenceSyncTask(String databaseIdentifier,
45-
int crudThrottleTimeMs, _ConnectedClient client) {
45+
_SyncRunner referenceSyncTask(
46+
String databaseIdentifier,
47+
int crudThrottleTimeMs,
48+
String? syncParamsEncoded,
49+
_ConnectedClient client) {
4650
return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () {
47-
return _SyncRunner(databaseIdentifier, crudThrottleTimeMs);
51+
return _SyncRunner(databaseIdentifier);
4852
})
49-
..registerClient(client);
53+
..registerClient(client, crudThrottleTimeMs, syncParamsEncoded);
5054
}
5155
}
5256

@@ -64,11 +68,11 @@ class _ConnectedClient {
6468
switch (type) {
6569
case SyncWorkerMessageType.startSynchronization:
6670
final request = payload as StartSynchronization;
67-
_runner = _worker.referenceSyncTask(
68-
request.databaseName, request.crudThrottleTimeMs, this);
71+
_runner = _worker.referenceSyncTask(request.databaseName,
72+
request.crudThrottleTimeMs, request.syncParamsEncoded, this);
6973
return (JSObject(), null);
7074
case SyncWorkerMessageType.abortSynchronization:
71-
_runner?.unregisterClient(this);
75+
_runner?.disconnectClient(this);
7276
_runner = null;
7377
return (JSObject(), null);
7478
default:
@@ -105,7 +109,8 @@ class _ConnectedClient {
105109

106110
class _SyncRunner {
107111
final String identifier;
108-
final int crudThrottleTimeMs;
112+
int crudThrottleTimeMs = 1;
113+
String? syncParamsEncoded;
109114

110115
final StreamGroup<_RunnerEvent> _group = StreamGroup();
111116
final StreamController<_RunnerEvent> _mainEvents = StreamController();
@@ -114,24 +119,46 @@ class _SyncRunner {
114119
_ConnectedClient? databaseHost;
115120
final connections = <_ConnectedClient>[];
116121

117-
_SyncRunner(this.identifier, this.crudThrottleTimeMs) {
122+
_SyncRunner(this.identifier) {
118123
_group.add(_mainEvents.stream);
119124

120125
Future(() async {
121126
await for (final event in _group.stream) {
122127
try {
123128
switch (event) {
124-
case _AddConnection(:final client):
129+
case _AddConnection(
130+
:final client,
131+
:final crudThrottleTimeMs,
132+
:final syncParamsEncoded
133+
):
125134
connections.add(client);
135+
var reconnect = false;
136+
if (this.crudThrottleTimeMs != crudThrottleTimeMs) {
137+
this.crudThrottleTimeMs = crudThrottleTimeMs;
138+
reconnect = true;
139+
}
140+
if (this.syncParamsEncoded != syncParamsEncoded) {
141+
this.syncParamsEncoded = syncParamsEncoded;
142+
reconnect = true;
143+
}
126144
if (sync == null) {
127145
await _requestDatabase(client);
146+
} else if (reconnect) {
147+
// Parameters changed - reconnect.
148+
sync?.abort();
149+
sync = null;
150+
await _requestDatabase(client);
128151
}
129152
case _RemoveConnection(:final client):
130153
connections.remove(client);
131154
if (connections.isEmpty) {
132155
await sync?.abort();
133156
sync = null;
134157
}
158+
case _DisconnectClient(:final client):
159+
connections.remove(client);
160+
await sync?.abort();
161+
sync = null;
135162
case _ActiveDatabaseClosed():
136163
_logger.info('Remote database closed, finding a new client');
137164
sync?.abort();
@@ -226,16 +253,20 @@ class _SyncRunner {
226253
);
227254
}
228255

256+
final syncParams = syncParamsEncoded == null
257+
? null
258+
: jsonDecode(syncParamsEncoded!) as Map<String, dynamic>;
259+
229260
sync = StreamingSyncImplementation(
230-
adapter: BucketStorage(database),
231-
credentialsCallback: client.channel.credentialsCallback,
232-
invalidCredentialsCallback: client.channel.invalidCredentialsCallback,
233-
uploadCrud: client.channel.uploadCrud,
234-
crudUpdateTriggerStream: crudStream,
235-
retryDelay: Duration(seconds: 3),
236-
client: FetchClient(mode: RequestMode.cors),
237-
identifier: identifier,
238-
);
261+
adapter: BucketStorage(database),
262+
credentialsCallback: client.channel.credentialsCallback,
263+
invalidCredentialsCallback: client.channel.invalidCredentialsCallback,
264+
uploadCrud: client.channel.uploadCrud,
265+
crudUpdateTriggerStream: crudStream,
266+
retryDelay: Duration(seconds: 3),
267+
client: FetchClient(mode: RequestMode.cors),
268+
identifier: identifier,
269+
syncParameters: syncParams);
239270
sync!.statusStream.listen((event) {
240271
_logger.fine('Broadcasting sync event: $event');
241272
for (final client in connections) {
@@ -246,21 +277,31 @@ class _SyncRunner {
246277
sync!.streamingSync();
247278
}
248279

249-
void registerClient(_ConnectedClient client) {
250-
_mainEvents.add(_AddConnection(client));
280+
void registerClient(_ConnectedClient client, int currentCrudThrottleTimeMs,
281+
String? currentSyncParamsEncoded) {
282+
_mainEvents.add(_AddConnection(
283+
client, currentCrudThrottleTimeMs, currentSyncParamsEncoded));
251284
}
252285

286+
/// Remove a client, disconnecting if no clients remain..
253287
void unregisterClient(_ConnectedClient client) {
254288
_mainEvents.add(_RemoveConnection(client));
255289
}
290+
291+
/// Remove a client, and immediately disconnect.
292+
void disconnectClient(_ConnectedClient client) {
293+
_mainEvents.add(_DisconnectClient(client));
294+
}
256295
}
257296

258297
sealed class _RunnerEvent {}
259298

260299
final class _AddConnection implements _RunnerEvent {
261300
final _ConnectedClient client;
301+
final int crudThrottleTimeMs;
302+
final String? syncParamsEncoded;
262303

263-
_AddConnection(this.client);
304+
_AddConnection(this.client, this.crudThrottleTimeMs, this.syncParamsEncoded);
264305
}
265306

266307
final class _RemoveConnection implements _RunnerEvent {
@@ -269,6 +310,12 @@ final class _RemoveConnection implements _RunnerEvent {
269310
_RemoveConnection(this.client);
270311
}
271312

313+
final class _DisconnectClient implements _RunnerEvent {
314+
final _ConnectedClient client;
315+
316+
_DisconnectClient(this.client);
317+
}
318+
272319
final class _ActiveDatabaseClosed implements _RunnerEvent {
273320
const _ActiveDatabaseClosed();
274321
}

packages/powersync/lib/src/web/sync_worker_protocol.dart

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import 'dart:async';
2+
import 'dart:convert';
23
import 'dart:js_interop';
34

45
import 'package:web/web.dart';
@@ -13,10 +14,12 @@ enum SyncWorkerMessageType {
1314

1415
/// Sent from client to the sync worker to request the synchronization
1516
/// starting.
17+
/// If parameters change, the sync worker reconnects.
1618
startSynchronization,
1719

18-
/// Te [SyncWorkerMessage.payload] for the request is a numeric id, the
20+
/// The [SyncWorkerMessage.payload] for the request is a numeric id, the
1921
/// response can be anything (void).
22+
/// This disconnects immediately, even if other clients are still open.
2023
abortSynchronization,
2124

2225
/// Sent from the sync worker to the client when it needs an endpoint to
@@ -60,15 +63,16 @@ extension type SyncWorkerMessage._(JSObject _) implements JSObject {
6063

6164
@anonymous
6265
extension type StartSynchronization._(JSObject _) implements JSObject {
63-
external factory StartSynchronization({
64-
required String databaseName,
65-
required int crudThrottleTimeMs,
66-
required int requestId,
67-
});
66+
external factory StartSynchronization(
67+
{required String databaseName,
68+
required int crudThrottleTimeMs,
69+
required int requestId,
70+
String? syncParamsEncoded});
6871

6972
external String get databaseName;
7073
external int get requestId;
7174
external int get crudThrottleTimeMs;
75+
external String? get syncParamsEncoded;
7276
}
7377

7478
@anonymous
@@ -315,15 +319,17 @@ final class WorkerCommunicationChannel {
315319
await _numericRequest(SyncWorkerMessageType.ping);
316320
}
317321

318-
Future<void> startSynchronization(
319-
String databaseName, int crudThrottleTimeMs) async {
322+
Future<void> startSynchronization(String databaseName, int crudThrottleTimeMs,
323+
Map<String, dynamic>? syncParams) async {
320324
final (id, completion) = _newRequest();
321325
port.postMessage(SyncWorkerMessage(
322326
type: SyncWorkerMessageType.startSynchronization.name,
323327
payload: StartSynchronization(
324328
databaseName: databaseName,
325329
crudThrottleTimeMs: crudThrottleTimeMs,
326-
requestId: id),
330+
requestId: id,
331+
syncParamsEncoded:
332+
syncParams == null ? null : jsonEncode(syncParams)),
327333
));
328334
await completion;
329335
}

0 commit comments

Comments
 (0)