Skip to content

Commit 8ff9ffc

Browse files
committed
Fix reconnecting after checksum failure
1 parent 6e4ac91 commit 8ff9ffc

File tree

3 files changed

+50
-13
lines changed

3 files changed

+50
-13
lines changed

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class StreamingSyncImplementation implements StreamingSync {
3535
final InternalConnector connector;
3636
final ResolvedSyncOptions options;
3737

38-
final Logger logger = isolateLogger;
38+
final Logger logger;
3939

4040
final Stream<void> crudUpdateTriggerStream;
4141

@@ -68,14 +68,16 @@ class StreamingSyncImplementation implements StreamingSync {
6868
required http.Client client,
6969
Mutex? syncMutex,
7070
Mutex? crudMutex,
71+
Logger? logger,
7172

7273
/// A unique identifier for this streaming sync implementation
7374
/// A good value is typically the DB file path which it will mutate when syncing.
7475
String? identifier = "unknown",
7576
}) : _client = client,
7677
syncMutex = syncMutex ?? Mutex(identifier: "sync-$identifier"),
7778
crudMutex = crudMutex ?? Mutex(identifier: "crud-$identifier"),
78-
_userAgentHeaders = userAgentHeaders();
79+
_userAgentHeaders = userAgentHeaders(),
80+
logger = logger ?? isolateLogger;
7981

8082
Duration get _retryDelay => options.retryDelay;
8183

@@ -122,6 +124,7 @@ class StreamingSyncImplementation implements StreamingSync {
122124
@override
123125
Future<void> streamingSync() async {
124126
try {
127+
assert(_abort == null);
125128
_abort = AbortController();
126129
clientId = await adapter.getClientId();
127130
_crudLoop();
@@ -310,7 +313,7 @@ class StreamingSyncImplementation implements StreamingSync {
310313
var merged = addBroadcast(requestStream, _nonLineSyncEvents.stream);
311314

312315
Future<void>? credentialsInvalidation;
313-
bool haveInvalidated = false;
316+
bool shouldStopIteration = false;
314317

315318
// Trigger a CRUD upload on reconnect
316319
_internalCrudTriggerController.add(null);
@@ -336,6 +339,7 @@ class StreamingSyncImplementation implements StreamingSync {
336339
case StreamingSyncCheckpointComplete():
337340
final result = await _applyCheckpoint(targetCheckpoint!, _abort);
338341
if (result.abort) {
342+
shouldStopIteration = true;
339343
return;
340344
}
341345
case StreamingSyncCheckpointPartiallyComplete(:final bucketPriority):
@@ -345,6 +349,7 @@ class StreamingSyncImplementation implements StreamingSync {
345349
// This means checksums failed. Start again with a new checkpoint.
346350
// TODO: better back-off
347351
// await new Promise((resolve) => setTimeout(resolve, 50));
352+
shouldStopIteration = true;
348353
return;
349354
} else if (!result.ready) {
350355
// If we have pending uploads, we can't complete new checkpoints
@@ -404,7 +409,7 @@ class StreamingSyncImplementation implements StreamingSync {
404409
credentialsInvalidation ??=
405410
connector.prefetchCredentials().then((_) {
406411
// Token has been refreshed - we should restart the connection.
407-
haveInvalidated = true;
412+
shouldStopIteration = true;
408413
// trigger next loop iteration ASAP, don't wait for another
409414
// message from the server.
410415
if (!aborted) {
@@ -421,7 +426,7 @@ class StreamingSyncImplementation implements StreamingSync {
421426
}
422427

423428
await for (var line in merged) {
424-
if (aborted || haveInvalidated) {
429+
if (aborted || shouldStopIteration) {
425430
break;
426431
}
427432

@@ -434,10 +439,10 @@ class StreamingSyncImplementation implements StreamingSync {
434439
break;
435440
case TokenRefreshComplete():
436441
// We have a new token, so stop the iteration.
437-
haveInvalidated = true;
442+
shouldStopIteration = true;
438443
}
439444

440-
if (haveInvalidated) {
445+
if (shouldStopIteration) {
441446
// Stop this connection, so that a new one will be started
442447
break;
443448
}

packages/powersync_core/test/in_memory_sync_test.dart

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import 'package:async/async.dart';
44
import 'package:logging/logging.dart';
55
import 'package:powersync_core/powersync_core.dart';
66
import 'package:powersync_core/sqlite3_common.dart';
7-
import 'package:powersync_core/src/log_internal.dart';
87
import 'package:powersync_core/src/sync/streaming_sync.dart';
98
import 'package:powersync_core/src/sync/protocol.dart';
109
import 'package:test/test.dart';
@@ -25,6 +24,7 @@ void main() {
2524
late CommonDatabase raw;
2625
late PowerSyncDatabase database;
2726
late MockSyncService syncService;
27+
late Logger logger;
2828

2929
late StreamingSync syncClient;
3030
var credentialsCallbackCount = 0;
@@ -34,7 +34,7 @@ void main() {
3434
final (client, server) = inMemoryServer();
3535
server.mount(syncService.router.call);
3636

37-
syncClient = database.connectWithMockService(
37+
final thisSyncClient = syncClient = database.connectWithMockService(
3838
client,
3939
TestConnector(() async {
4040
credentialsCallbackCount++;
@@ -44,10 +44,17 @@ void main() {
4444
expiresAt: DateTime.now(),
4545
);
4646
}, uploadData: (db) => uploadData(db)),
47+
options: const SyncOptions(retryDelay: Duration(milliseconds: 200)),
48+
logger: logger,
4749
);
50+
51+
addTearDown(() async {
52+
await thisSyncClient.abort();
53+
});
4854
}
4955

5056
setUp(() async {
57+
logger = Logger.detached('powersync.active')..level = Level.ALL;
5158
credentialsCallbackCount = 0;
5259
syncService = MockSyncService();
5360

@@ -58,17 +65,16 @@ void main() {
5865
});
5966

6067
tearDown(() async {
61-
await syncClient.abort();
6268
await database.close();
6369
await syncService.stop();
6470
});
6571

6672
Future<StreamQueue<SyncStatus>> waitForConnection(
6773
{bool expectNoWarnings = true}) async {
6874
if (expectNoWarnings) {
69-
isolateLogger.onRecord.listen((e) {
75+
logger.onRecord.listen((e) {
7076
if (e.level >= Level.WARNING) {
71-
fail('Unexpected log: $e');
77+
fail('Unexpected log: $e, ${e.stackTrace}');
7278
}
7379
});
7480
}
@@ -655,7 +661,7 @@ void main() {
655661
});
656662
});
657663

658-
test('stopping closes connections', () async {
664+
test('stopping tions', () async {
659665
final status = await waitForConnection();
660666

661667
syncService.addLine({
@@ -671,6 +677,30 @@ void main() {
671677

672678
expect(syncService.controller.hasListener, isFalse);
673679
});
680+
681+
test('closes connection after failed checksum', () async {
682+
final status = await waitForConnection(expectNoWarnings: false);
683+
syncService.addLine({
684+
'checkpoint': Checkpoint(
685+
lastOpId: '4',
686+
writeCheckpoint: null,
687+
checksums: [checksum(bucket: 'a', checksum: 10)],
688+
)
689+
});
690+
691+
await expectLater(status, emits(isSyncStatus(downloading: true)));
692+
syncService.addLine({
693+
'checkpoint_complete': {'last_op_id': '10'}
694+
});
695+
696+
await pumpEventQueue();
697+
expect(syncService.controller.hasListener, isFalse);
698+
syncService.endCurrentListener();
699+
700+
// Should reconnect after delay.
701+
await Future<void>.delayed(const Duration(milliseconds: 500));
702+
expect(syncService.controller.hasListener, isTrue);
703+
});
674704
});
675705
}
676706

packages/powersync_core/test/utils/abstract_test_utils.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,13 +149,15 @@ extension MockSync on PowerSyncDatabase {
149149
StreamingSyncImplementation connectWithMockService(
150150
Client client,
151151
PowerSyncBackendConnector connector, {
152+
Logger? logger,
152153
SyncOptions options = const SyncOptions(retryDelay: Duration(seconds: 5)),
153154
}) {
154155
final impl = StreamingSyncImplementation(
155156
adapter: BucketStorage(this),
156157
client: client,
157158
options: ResolvedSyncOptions(options),
158159
connector: InternalConnector.wrap(connector, this),
160+
logger: logger,
159161
crudUpdateTriggerStream: database
160162
.onChange(['ps_crud'], throttle: const Duration(milliseconds: 10)),
161163
);

0 commit comments

Comments
 (0)