diff --git a/packages/backend/server/src/__tests__/sync/gateway.spec.ts b/packages/backend/server/src/__tests__/sync/gateway.spec.ts index c81fd0e0e7..6a644963ae 100644 --- a/packages/backend/server/src/__tests__/sync/gateway.spec.ts +++ b/packages/backend/server/src/__tests__/sync/gateway.spec.ts @@ -176,6 +176,31 @@ function createYjsUpdateBase64() { return Buffer.from(update).toString('base64'); } +async function createSnapshot( + db: PrismaClient, + input: { + workspaceId: string; + docId: string; + userId: string; + blob?: Buffer; + state?: Buffer; + updatedAt?: Date; + } +) { + await db.snapshot.create({ + data: { + id: input.docId, + workspaceId: input.workspaceId, + blob: input.blob ?? Buffer.from([1, 1]), + state: input.state ?? Buffer.from([1, 1]), + createdAt: input.updatedAt ?? new Date(), + updatedAt: input.updatedAt ?? new Date(), + createdBy: input.userId, + updatedBy: input.userId, + }, + }); +} + async function ensureSyncActiveUsersTable(db: PrismaClient) { await db.$executeRawUnsafe(` CREATE TABLE IF NOT EXISTS sync_active_users_minutely ( @@ -612,17 +637,10 @@ test('workspace sync delete-doc should enforce doc permissions', async t => { } ); await models.doc.setDefaultRole(workspace.id, docId, DocRole.None); - await db.snapshot.create({ - data: { - id: docId, - workspaceId: workspace.id, - blob: Buffer.from([1, 1]), - state: Buffer.from([1, 1]), - createdAt: new Date(), - updatedAt: new Date(), - createdBy: owner.id, - updatedBy: owner.id, - }, + await createSnapshot(db, { + workspaceId: workspace.id, + docId, + userId: owner.id, }); const socket = createClient(url, cookieHeader); @@ -657,3 +675,206 @@ test('workspace sync delete-doc should enforce doc permissions', async t => { socket.disconnect(); } }); + +test('workspace sync load-doc should enforce doc read permissions', async t => { + const db = app.get(PrismaClient); + const models = app.get(Models); + const { user: owner } = await login(app); + const { user: collaborator, cookieHeader } = await login(app); + const workspace = await models.workspace.create(owner.id); + const docId = 'private-load-doc'; + + await models.workspaceUser.set( + workspace.id, + collaborator.id, + WorkspaceRole.Collaborator, + { + status: WorkspaceMemberStatus.Accepted, + } + ); + await models.doc.setDefaultRole(workspace.id, docId, DocRole.None); + await createSnapshot(db, { + workspaceId: workspace.id, + docId, + userId: owner.id, + }); + + const socket = createClient(url, cookieHeader); + + try { + await waitForConnect(socket); + + const join = unwrapResponse( + t, + await emitWithAck<{ clientId: string; success: boolean }>( + socket, + 'space:join', + { + spaceType: 'workspace', + spaceId: workspace.id, + clientVersion: '0.26.0', + } + ) + ); + t.true(join.success); + + const error = getErrorResponse( + t, + await emitWithAck(socket, 'space:load-doc', { + spaceType: 'workspace', + spaceId: workspace.id, + docId, + }) + ); + t.true(error.message.includes('Doc.Read')); + } finally { + socket.disconnect(); + } +}); + +test('workspace sync push-doc-update should enforce doc update permissions', async t => { + const db = app.get(PrismaClient); + const models = app.get(Models); + const { user: owner } = await login(app); + const { user: collaborator, cookieHeader } = await login(app); + const workspace = await models.workspace.create(owner.id); + const docId = 'readonly-push-doc'; + + await models.workspaceUser.set( + workspace.id, + collaborator.id, + WorkspaceRole.Collaborator, + { + status: WorkspaceMemberStatus.Accepted, + } + ); + await models.doc.setDefaultRole(workspace.id, docId, DocRole.None); + await models.docUser.set( + workspace.id, + docId, + collaborator.id, + DocRole.Reader + ); + await createSnapshot(db, { + workspaceId: workspace.id, + docId, + userId: owner.id, + }); + + const socket = createClient(url, cookieHeader); + + try { + await waitForConnect(socket); + + const join = unwrapResponse( + t, + await emitWithAck<{ clientId: string; success: boolean }>( + socket, + 'space:join', + { + spaceType: 'workspace', + spaceId: workspace.id, + clientVersion: '0.26.0', + } + ) + ); + t.true(join.success); + + const error = getErrorResponse( + t, + await emitWithAck(socket, 'space:push-doc-update', { + spaceType: 'workspace', + spaceId: workspace.id, + docId, + update: createYjsUpdateBase64(), + }) + ); + t.true(error.message.includes('Doc.Update')); + + const updates = await db.update.count({ + where: { + workspaceId: workspace.id, + id: docId, + }, + }); + t.is(updates, 0); + } finally { + socket.disconnect(); + } +}); + +test('workspace sync load-doc-timestamps should filter unreadable docs', async t => { + const db = app.get(PrismaClient); + const models = app.get(Models); + const { user: owner } = await login(app); + const { user: collaborator, cookieHeader } = await login(app); + const workspace = await models.workspace.create(owner.id); + const privateDocId = 'private-timestamp-doc'; + const readableDocId = 'readable-timestamp-doc'; + + await models.workspaceUser.set( + workspace.id, + collaborator.id, + WorkspaceRole.Collaborator, + { + status: WorkspaceMemberStatus.Accepted, + } + ); + await models.doc.setDefaultRole(workspace.id, privateDocId, DocRole.None); + await models.doc.setDefaultRole(workspace.id, readableDocId, DocRole.None); + await models.docUser.set( + workspace.id, + readableDocId, + collaborator.id, + DocRole.Reader + ); + await createSnapshot(db, { + workspaceId: workspace.id, + docId: privateDocId, + userId: owner.id, + updatedAt: new Date('2026-01-01T00:00:00.000Z'), + }); + await createSnapshot(db, { + workspaceId: workspace.id, + docId: readableDocId, + userId: owner.id, + updatedAt: new Date('2026-01-02T00:00:00.000Z'), + }); + + const socket = createClient(url, cookieHeader); + + try { + await waitForConnect(socket); + + const join = unwrapResponse( + t, + await emitWithAck<{ clientId: string; success: boolean }>( + socket, + 'space:join', + { + spaceType: 'workspace', + spaceId: workspace.id, + clientVersion: '0.26.0', + } + ) + ); + t.true(join.success); + + const timestamps = unwrapResponse( + t, + await emitWithAck>( + socket, + 'space:load-doc-timestamps', + { + spaceType: 'workspace', + spaceId: workspace.id, + } + ) + ); + + t.false(privateDocId in timestamps); + t.true(readableDocId in timestamps); + } finally { + socket.disconnect(); + } +}); diff --git a/packages/backend/server/src/core/sync/gateway.ts b/packages/backend/server/src/core/sync/gateway.ts index cfcfb51db8..0b2015a45c 100644 --- a/packages/backend/server/src/core/sync/gateway.ts +++ b/packages/backend/server/src/core/sync/gateway.ts @@ -633,6 +633,7 @@ export class SpaceSyncGateway @SubscribeMessage('space:load-doc') async onLoadSpaceDoc( @ConnectedSocket() client: Socket, + @CurrentUser() user: CurrentUser, @MessageBody() { spaceType, spaceId, docId, stateVector }: LoadDocMessage ): Promise< @@ -641,6 +642,13 @@ export class SpaceSyncGateway const id = new DocID(docId, spaceId); const adapter = this.selectAdapter(client, spaceType); adapter.assertIn(spaceId); + await this.assertDocActionAllowed( + spaceType, + user.id, + spaceId, + id.guid, + 'Doc.Read' + ); const doc = await adapter.diff( spaceId, @@ -666,7 +674,7 @@ export class SpaceSyncGateway @ConnectedSocket() client: Socket, @CurrentUser() user: CurrentUser, @MessageBody() { spaceType, spaceId, docId }: DeleteDocMessage - ) { + ): Promise> { const adapter = this.selectAdapter(client, spaceType); await this.assertDocActionAllowed( spaceType, @@ -676,6 +684,7 @@ export class SpaceSyncGateway 'Doc.Delete' ); await adapter.delete(spaceId, docId); + return { data: { success: true } }; } /** @@ -692,8 +701,13 @@ export class SpaceSyncGateway const adapter = this.selectAdapter(client, spaceType); // Quota recovery mode is intentionally not applied to sync in this phase. - // TODO(@forehalo): enable after frontend supporting doc revert - // await this.ac.user(user.id).doc(spaceId, docId).assert('Doc.Update'); + await this.assertDocActionAllowed( + spaceType, + user.id, + spaceId, + docId, + 'Doc.Update' + ); const timestamp = await adapter.push( spaceId, docId, @@ -740,15 +754,32 @@ export class SpaceSyncGateway @SubscribeMessage('space:load-doc-timestamps') async onLoadDocTimestamps( @ConnectedSocket() client: Socket, + @CurrentUser() user: CurrentUser, @MessageBody() { spaceType, spaceId, timestamp }: LoadDocTimestampsMessage ): Promise>> { const adapter = this.selectAdapter(client, spaceType); const stats = await adapter.getTimestamps(spaceId, timestamp); + if (!stats || spaceType === SpaceType.Userspace) { + return { + data: stats ?? {}, + }; + } + + const readableDocs = await this.ac + .user(user.id) + .workspace(spaceId) + .docs( + Object.keys(stats).map(docId => ({ docId })), + 'Doc.Read' + ); + const readableDocIds = new Set(readableDocs.map(doc => doc.docId)); return { - data: stats ?? {}, + data: Object.fromEntries( + Object.entries(stats).filter(([docId]) => readableDocIds.has(docId)) + ), }; } diff --git a/packages/common/nbstore/src/__tests__/sync.spec.ts b/packages/common/nbstore/src/__tests__/sync.spec.ts index 45a2313c03..29235e5234 100644 --- a/packages/common/nbstore/src/__tests__/sync.spec.ts +++ b/packages/common/nbstore/src/__tests__/sync.spec.ts @@ -33,6 +33,7 @@ import { SpaceStorage, } from '../storage'; import { Sync } from '../sync'; +import { DocSyncPeer } from '../sync/doc/peer'; import { IndexerSyncImpl } from '../sync/indexer'; import { expectYjsEqual } from './utils'; @@ -112,6 +113,64 @@ class TestDocStorage implements DocStorage { } } +class PermissionDeniedRemoteDocStorage implements DocStorage { + readonly storageType = 'doc' as const; + readonly connection = new DummyConnection(); + readonly isReadonly = false; + pushCount = 0; + + constructor(readonly spaceId: string) {} + + async getDoc(_docId: string): Promise { + return null; + } + + async getDocDiff( + _docId: string, + _state?: Uint8Array + ): Promise { + return null; + } + + async pushDocUpdate(_update: DocUpdate): Promise { + this.pushCount++; + const error = new Error('No permission to update doc'); + error.name = 'DOC_ACTION_DENIED'; + throw error; + } + + async getDocTimestamp(_docId: string): Promise { + return null; + } + + async getDocTimestamps(): Promise { + return {}; + } + + async deleteDoc(_docId: string): Promise { + return; + } + + subscribeDocUpdate(_callback: (update: DocRecord, origin?: string) => void) { + return () => {}; + } +} + +class PermissionDeniedConnection extends DummyConnection { + waitCount = 0; + + override async waitForConnected(_signal?: AbortSignal): Promise { + this.waitCount++; + const error = new Error('No permission to access space'); + error.name = 'SPACE_ACCESS_DENIED'; + throw error; + } +} + +class PermissionDeniedConnectionDocStorage extends PermissionDeniedRemoteDocStorage { + override readonly connection = new PermissionDeniedConnection(); +} + class TrackingIndexerStorage extends IndexerStorageBase { override readonly connection = new DummyConnection(); override readonly isReadonly = false; @@ -425,6 +484,201 @@ test('blob', async () => { } }); +test('doc sync peer stops retrying a doc when remote denies permission', async () => { + const local = new IndexedDBDocStorage({ + id: 'ws-denied', + flavour: 'local-denied', + type: 'workspace', + }); + const syncMetadata = new IndexedDBDocSyncStorage({ + id: 'ws-denied', + flavour: 'local-denied', + type: 'workspace', + }); + const remote = new PermissionDeniedRemoteDocStorage('ws-denied'); + const peer = new DocSyncPeer('remote-denied', local, syncMetadata, remote); + const abort = new AbortController(); + + local.connection.connect(); + syncMetadata.connection.connect(); + await local.connection.waitForConnected(); + await syncMetadata.connection.waitForConnected(); + + const doc = new YDoc(); + doc.getMap('test').set('hello', 'world'); + await local.pushDocUpdate({ + docId: 'doc-denied', + bin: encodeStateAsUpdate(doc), + }); + + try { + void peer.mainLoop(abort.signal); + + await vi.waitFor(() => { + expect(remote.pushCount).toBe(1); + }); + + await vi.waitFor(() => { + let state: + | { + syncing: boolean; + synced: boolean; + retrying: boolean; + errorMessage: string | null; + } + | undefined; + const dispose = peer.docState$('doc-denied').subscribe(next => { + state = next; + }); + dispose.unsubscribe(); + + expect(state).toMatchObject({ + syncing: false, + synced: false, + retrying: false, + errorMessage: expect.stringContaining('No permission'), + }); + }); + + await vi.waitFor(() => { + let state: + | { + synced: boolean; + errorMessage: string | null; + } + | undefined; + const dispose = peer.peerState$.subscribe(next => { + state = next; + }); + dispose.unsubscribe(); + + expect(state).toMatchObject({ + synced: false, + errorMessage: expect.stringContaining('No permission'), + }); + }); + + await new Promise(resolve => setTimeout(resolve, 1200)); + expect(remote.pushCount).toBe(1); + } finally { + abort.abort(); + local.connection.disconnect(); + syncMetadata.connection.disconnect(); + } +}); + +test('doc sync peer stops retrying when remote connection denies permission', async () => { + const local = new IndexedDBDocStorage({ + id: 'ws-connection-denied', + flavour: 'local-connection-denied', + type: 'workspace', + }); + const syncMetadata = new IndexedDBDocSyncStorage({ + id: 'ws-connection-denied', + flavour: 'local-connection-denied', + type: 'workspace', + }); + const remote = new PermissionDeniedConnectionDocStorage( + 'ws-connection-denied' + ); + const peer = new DocSyncPeer( + 'remote-connection-denied', + local, + syncMetadata, + remote + ); + const abort = new AbortController(); + + local.connection.connect(); + syncMetadata.connection.connect(); + await local.connection.waitForConnected(); + await syncMetadata.connection.waitForConnected(); + + try { + void peer.mainLoop(abort.signal); + + await vi.waitFor(() => { + expect(remote.connection.waitCount).toBe(1); + }); + + await vi.waitFor(() => { + let state: + | { + retrying: boolean; + errorMessage: string | null; + } + | undefined; + const dispose = peer.peerState$.subscribe(next => { + state = next; + }); + dispose.unsubscribe(); + + expect(state).toMatchObject({ + retrying: false, + errorMessage: expect.stringContaining('No permission'), + }); + }); + + await new Promise(resolve => setTimeout(resolve, 1200)); + expect(remote.connection.waitCount).toBe(1); + } finally { + abort.abort(); + local.connection.disconnect(); + syncMetadata.connection.disconnect(); + } +}); + +test('doc sync peer resolves on terminal permission error without abort signal', async () => { + const local = new IndexedDBDocStorage({ + id: 'ws-connection-denied-no-signal', + flavour: 'local-connection-denied-no-signal', + type: 'workspace', + }); + const syncMetadata = new IndexedDBDocSyncStorage({ + id: 'ws-connection-denied-no-signal', + flavour: 'local-connection-denied-no-signal', + type: 'workspace', + }); + const remote = new PermissionDeniedConnectionDocStorage( + 'ws-connection-denied-no-signal' + ); + const peer = new DocSyncPeer( + 'remote-connection-denied-no-signal', + local, + syncMetadata, + remote + ); + + local.connection.connect(); + syncMetadata.connection.connect(); + await local.connection.waitForConnected(); + await syncMetadata.connection.waitForConnected(); + + try { + await expect(peer.mainLoop()).resolves.toBeUndefined(); + expect(remote.connection.waitCount).toBe(1); + + let state: + | { + retrying: boolean; + errorMessage: string | null; + } + | undefined; + const dispose = peer.peerState$.subscribe(next => { + state = next; + }); + dispose.unsubscribe(); + + expect(state).toMatchObject({ + retrying: false, + errorMessage: expect.stringContaining('No permission'), + }); + } finally { + local.connection.disconnect(); + syncMetadata.connection.disconnect(); + } +}); + test('indexer defers indexed clock persistence until a refresh happens on delayed refresh storages', async () => { const calls: string[] = []; const docsInRootDoc = new Map([['doc1', { title: 'Doc 1' }]]); diff --git a/packages/common/nbstore/src/impls/cloud/doc.ts b/packages/common/nbstore/src/impls/cloud/doc.ts index 1bb13a8b48..84c20cb36f 100644 --- a/packages/common/nbstore/src/impls/cloud/doc.ts +++ b/packages/common/nbstore/src/impls/cloud/doc.ts @@ -22,6 +22,12 @@ interface CloudDocStorageOptions extends DocStorageOptions { type: SpaceType; } +function createWebsocketError(error: { name: string; message: string }) { + const err = new Error(error.message); + err.name = error.name; + return err; +} + export class CloudDocStorage extends DocStorageBase { static readonly identifier = 'CloudDocStorage'; @@ -88,7 +94,7 @@ export class CloudDocStorage extends DocStorageBase { return null; } // TODO: use [UserFriendlyError] - throw new Error(response.error.message); + throw createWebsocketError(response.error); } return { @@ -111,7 +117,7 @@ export class CloudDocStorage extends DocStorageBase { return null; } // TODO: use [UserFriendlyError] - throw new Error(response.error.message); + throw createWebsocketError(response.error); } return { @@ -132,7 +138,7 @@ export class CloudDocStorage extends DocStorageBase { if ('error' in response) { // TODO(@forehalo): use [UserFriendlyError] - throw new Error(response.error.message); + throw createWebsocketError(response.error); } return { @@ -153,7 +159,7 @@ export class CloudDocStorage extends DocStorageBase { if ('error' in response) { // TODO: use [UserFriendlyError] - throw new Error(response.error.message); + throw createWebsocketError(response.error); } return { @@ -174,7 +180,7 @@ export class CloudDocStorage extends DocStorageBase { if ('error' in response) { // TODO(@forehalo): use [UserFriendlyError] - throw new Error(response.error.message); + throw createWebsocketError(response.error); } return Object.entries(response.data).reduce((ret, [docId, timestamp]) => { @@ -184,11 +190,16 @@ export class CloudDocStorage extends DocStorageBase { } override async deleteDoc(docId: string) { - this.socket.emit('space:delete-doc', { + const response = await this.socket.emitWithAck('space:delete-doc', { spaceType: this.spaceType, spaceId: this.spaceId, docId: this.idConverter.newIdToOldId(docId), }); + + if ('error' in response) { + // TODO(@forehalo): use [UserFriendlyError] + throw createWebsocketError(response.error); + } } protected async setDocSnapshot() { @@ -224,7 +235,7 @@ class CloudDocStorageConnection extends SocketConnection { }); if ('error' in res) { - throw new Error(res.error.message); + throw createWebsocketError(res.error); } if (!this.idConverter) { @@ -272,7 +283,7 @@ class CloudDocStorageConnection extends SocketConnection { return null; } // TODO: use [UserFriendlyError] - throw new Error(response.error.message); + throw createWebsocketError(response.error); } return base64ToUint8Array(response.data.missing); diff --git a/packages/common/nbstore/src/impls/cloud/socket.ts b/packages/common/nbstore/src/impls/cloud/socket.ts index d0961d5868..0da5776d97 100644 --- a/packages/common/nbstore/src/impls/cloud/socket.ts +++ b/packages/common/nbstore/src/impls/cloud/socket.ts @@ -121,7 +121,10 @@ interface ClientEvents { timestamp: number; }, ]; - 'space:delete-doc': { spaceType: string; spaceId: string; docId: string }; + 'space:delete-doc': [ + { spaceType: string; spaceId: string; docId: string }, + { success?: true }, + ]; 'telemetry:batch': [TelemetryBatch, TelemetryAck]; diff --git a/packages/common/nbstore/src/sync/doc/peer.ts b/packages/common/nbstore/src/sync/doc/peer.ts index c94f6162f8..c44a67aede 100644 --- a/packages/common/nbstore/src/sync/doc/peer.ts +++ b/packages/common/nbstore/src/sync/doc/peer.ts @@ -38,6 +38,7 @@ type Job = interface Status { docs: Set; connectedDocs: Set; + docErrors: Map; jobDocQueue: AsyncPriorityQueue; jobMap: Map; remoteClocks: ClockMap; @@ -78,9 +79,12 @@ function createJobErrorCatcher< await fn(docId, ...args); } catch (err) { if (err instanceof Error) { - throw new Error( + const wrapped = new Error( `Error in job "${k}": ${err.stack || err.message}` ); + wrapped.name = err.name; + (wrapped as Error & { cause?: unknown }).cause = err; + throw wrapped; } else { throw err; } @@ -91,6 +95,14 @@ function createJobErrorCatcher< ) as Jobs; } +function isRemotePermissionError(error: unknown) { + if (!(error instanceof Error)) { + return false; + } + const name = error.name.toUpperCase(); + return name === 'DOC_ACTION_DENIED' || name === 'SPACE_ACCESS_DENIED'; +} + function isEqualUint8Arrays(a: Uint8Array, b: Uint8Array) { if (a.length !== b.length) { return false; @@ -155,6 +167,7 @@ export class DocSyncPeer { private status: Status = { docs: new Set(), connectedDocs: new Set(), + docErrors: new Map(), jobDocQueue: new AsyncPriorityQueue(), jobMap: new Map(), remoteClocks: new ClockMap(new Map()), @@ -165,6 +178,14 @@ export class DocSyncPeer { }; private readonly statusUpdatedSubject$ = new Subject(); + private get currentErrorMessage() { + return ( + this.status.errorMessage ?? + this.status.docErrors.values().next().value ?? + null + ); + } + peerState$ = new Observable(subscribe => { const next = () => { if (this.status.skipped) { @@ -182,7 +203,7 @@ export class DocSyncPeer { syncing: this.status.docs.size, synced: false, retrying: this.status.retrying, - errorMessage: this.status.errorMessage, + errorMessage: this.currentErrorMessage, }); } else { const syncing = this.status.jobMap.size; @@ -190,8 +211,8 @@ export class DocSyncPeer { total: this.status.docs.size, syncing: syncing, retrying: this.status.retrying, - errorMessage: this.status.errorMessage, - synced: syncing === 0, + errorMessage: this.currentErrorMessage, + synced: syncing === 0 && this.status.docErrors.size === 0, }); } }; @@ -211,6 +232,7 @@ export class DocSyncPeer { docState$(docId: string) { return new Observable(subscribe => { const next = () => { + const docErrorMessage = this.status.docErrors.get(docId) ?? null; if (this.status.skipped) { subscribe.next({ syncing: false, @@ -218,14 +240,16 @@ export class DocSyncPeer { retrying: false, errorMessage: null, }); + return; } subscribe.next({ syncing: - !this.status.connectedDocs.has(docId) || - this.status.jobMap.has(docId), - synced: !this.status.jobMap.has(docId), + !docErrorMessage && + (!this.status.connectedDocs.has(docId) || + this.status.jobMap.has(docId)), + synced: !docErrorMessage && !this.status.jobMap.has(docId), retrying: this.status.retrying, - errorMessage: this.status.errorMessage, + errorMessage: docErrorMessage ?? this.status.errorMessage, }); }; next(); @@ -469,6 +493,9 @@ export class DocSyncPeer { private readonly actions = { updateRemoteClock: (docId: string, remoteClock: Date) => { + if (this.status.docErrors.has(docId)) { + return; + } this.status.remoteClocks.setIfBigger(docId, remoteClock); this.statusUpdatedSubject$.next(docId); }, @@ -494,6 +521,10 @@ export class DocSyncPeer { update: Uint8Array; clock: Date; }) => { + if (this.status.docErrors.has(docId)) { + return; + } + // try add doc for new doc this.actions.addDoc(docId); @@ -514,6 +545,10 @@ export class DocSyncPeer { update: Uint8Array; remoteClock: Date; }) => { + if (this.status.docErrors.has(docId)) { + return; + } + // try add doc for new doc this.actions.addDoc(docId); this.actions.updateRemoteClock(docId, remoteClock); @@ -530,33 +565,45 @@ export class DocSyncPeer { async mainLoop(signal?: AbortSignal) { while (true) { + let shouldRetry = true; try { await this.retryLoop(signal); } catch (err) { if (signal?.aborted) { return; } - console.warn('Sync error, retry in 5s', err); + shouldRetry = !isRemotePermissionError(err); + console.warn( + shouldRetry + ? 'Sync error, retry in 5s' + : 'Sync stopped due to remote permission error', + err + ); this.status.errorMessage = err instanceof Error ? err.message : `${err}`; + this.status.retrying = shouldRetry; this.statusUpdatedSubject$.next(true); } finally { // reset all status this.status = { docs: new Set(), connectedDocs: new Set(), + docErrors: new Map(), jobDocQueue: new AsyncPriorityQueue(), jobMap: new Map(), remoteClocks: new ClockMap(new Map()), syncing: false, skipped: false, // tell ui to show retrying status - retrying: true, + retrying: shouldRetry, // error message from last retry errorMessage: this.status.errorMessage, }; this.statusUpdatedSubject$.next(true); } + if (!shouldRetry) { + return; + } // wait for 5s before next retry await Promise.race([ new Promise(resolve => { @@ -725,29 +772,53 @@ export class DocSyncPeer { const connect = remove(jobs, j => j.type === 'connect'); if (connect && connect.length > 0) { - await this.jobs.connect(docId, signal); + if ( + !(await this.runRemoteDocJob(docId, () => + this.jobs.connect(docId, signal) + )) + ) { + break; + } continue; } const pullAndPush = remove(jobs, j => j.type === 'pullAndPush'); if (pullAndPush && pullAndPush.length > 0) { - await this.jobs.pullAndPush(docId, signal); + if ( + !(await this.runRemoteDocJob(docId, () => + this.jobs.pullAndPush(docId, signal) + )) + ) { + break; + } continue; } const pull = remove(jobs, j => j.type === 'pull'); if (pull && pull.length > 0) { - await this.jobs.pull(docId, signal); + if ( + !(await this.runRemoteDocJob(docId, () => + this.jobs.pull(docId, signal) + )) + ) { + break; + } continue; } const push = remove(jobs, j => j.type === 'push'); if (push && push.length > 0) { - await this.jobs.push( - docId, - push as (Job & { type: 'push' })[], - signal - ); + if ( + !(await this.runRemoteDocJob(docId, () => + this.jobs.push( + docId, + push as (Job & { type: 'push' })[], + signal + ) + )) + ) { + break; + } continue; } @@ -771,7 +842,40 @@ export class DocSyncPeer { } } + private async runRemoteDocJob(docId: string, job: () => Promise) { + try { + await job(); + return true; + } catch (error) { + if (!isRemotePermissionError(error)) { + throw error; + } + + const message = error instanceof Error ? error.message : String(error); + console.warn('Sync skipped for doc due to remote permission error', { + docId, + error, + }); + this.status.docErrors.set(docId, message); + this.status.connectedDocs.delete(docId); + this.status.jobMap.delete(docId); + this.statusUpdatedSubject$.next(docId); + this.statusUpdatedSubject$.next(true); + return false; + } + } + private schedule(job: Job) { + if ( + this.status.docErrors.has(job.docId) && + (job.type === 'connect' || + job.type === 'push' || + job.type === 'pull' || + job.type === 'pullAndPush') + ) { + return; + } + const priority = this.prioritySettings.get(job.docId) ?? 0; this.status.jobDocQueue.push(job.docId, priority);