fix(server): sync permission check (#15123)

fix #15121



#### PR Dependency Tree


* **PR #15123** 👈

This tree was auto-generated by
[Charcoal](https://github.com/danerwilliams/charcoal)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **Security Improvements**
* Enforced document-level `Doc.Read`/`Doc.Update` checks for key sync
websocket operations, including filtering workspace doc timestamp
results to only readable documents.
* Improved remote permission handling: once a remote denies access,
syncing stops for the affected document and retry behavior is
suppressed.
* **Improvements**
* `delete-doc` now relies on server acknowledgment and returns an
explicit `{ success: true }`.
* Websocket acknowledgment errors are now normalized for consistent
error details.
* **Tests**
* Expanded permission-denied and websocket error-handling coverage,
including timestamp filtering and no-retry behavior after permission
denial.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
DarkSky
2026-06-18 02:43:25 +08:00
committed by GitHub
parent da7781a751
commit 1256d66938
6 changed files with 666 additions and 42 deletions
@@ -176,6 +176,31 @@ function createYjsUpdateBase64() {
return Buffer.from(update).toString('base64');
}
async function createSnapshot(
db: PrismaClient,
input: {
workspaceId: string;
docId: string;
userId: string;
blob?: Buffer;
state?: Buffer;
updatedAt?: Date;
}
) {
await db.snapshot.create({
data: {
id: input.docId,
workspaceId: input.workspaceId,
blob: input.blob ?? Buffer.from([1, 1]),
state: input.state ?? Buffer.from([1, 1]),
createdAt: input.updatedAt ?? new Date(),
updatedAt: input.updatedAt ?? new Date(),
createdBy: input.userId,
updatedBy: input.userId,
},
});
}
async function ensureSyncActiveUsersTable(db: PrismaClient) {
await db.$executeRawUnsafe(`
CREATE TABLE IF NOT EXISTS sync_active_users_minutely (
@@ -612,17 +637,10 @@ test('workspace sync delete-doc should enforce doc permissions', async t => {
}
);
await models.doc.setDefaultRole(workspace.id, docId, DocRole.None);
await db.snapshot.create({
data: {
id: docId,
workspaceId: workspace.id,
blob: Buffer.from([1, 1]),
state: Buffer.from([1, 1]),
createdAt: new Date(),
updatedAt: new Date(),
createdBy: owner.id,
updatedBy: owner.id,
},
await createSnapshot(db, {
workspaceId: workspace.id,
docId,
userId: owner.id,
});
const socket = createClient(url, cookieHeader);
@@ -657,3 +675,206 @@ test('workspace sync delete-doc should enforce doc permissions', async t => {
socket.disconnect();
}
});
test('workspace sync load-doc should enforce doc read permissions', async t => {
const db = app.get(PrismaClient);
const models = app.get(Models);
const { user: owner } = await login(app);
const { user: collaborator, cookieHeader } = await login(app);
const workspace = await models.workspace.create(owner.id);
const docId = 'private-load-doc';
await models.workspaceUser.set(
workspace.id,
collaborator.id,
WorkspaceRole.Collaborator,
{
status: WorkspaceMemberStatus.Accepted,
}
);
await models.doc.setDefaultRole(workspace.id, docId, DocRole.None);
await createSnapshot(db, {
workspaceId: workspace.id,
docId,
userId: owner.id,
});
const socket = createClient(url, cookieHeader);
try {
await waitForConnect(socket);
const join = unwrapResponse(
t,
await emitWithAck<{ clientId: string; success: boolean }>(
socket,
'space:join',
{
spaceType: 'workspace',
spaceId: workspace.id,
clientVersion: '0.26.0',
}
)
);
t.true(join.success);
const error = getErrorResponse(
t,
await emitWithAck(socket, 'space:load-doc', {
spaceType: 'workspace',
spaceId: workspace.id,
docId,
})
);
t.true(error.message.includes('Doc.Read'));
} finally {
socket.disconnect();
}
});
test('workspace sync push-doc-update should enforce doc update permissions', async t => {
const db = app.get(PrismaClient);
const models = app.get(Models);
const { user: owner } = await login(app);
const { user: collaborator, cookieHeader } = await login(app);
const workspace = await models.workspace.create(owner.id);
const docId = 'readonly-push-doc';
await models.workspaceUser.set(
workspace.id,
collaborator.id,
WorkspaceRole.Collaborator,
{
status: WorkspaceMemberStatus.Accepted,
}
);
await models.doc.setDefaultRole(workspace.id, docId, DocRole.None);
await models.docUser.set(
workspace.id,
docId,
collaborator.id,
DocRole.Reader
);
await createSnapshot(db, {
workspaceId: workspace.id,
docId,
userId: owner.id,
});
const socket = createClient(url, cookieHeader);
try {
await waitForConnect(socket);
const join = unwrapResponse(
t,
await emitWithAck<{ clientId: string; success: boolean }>(
socket,
'space:join',
{
spaceType: 'workspace',
spaceId: workspace.id,
clientVersion: '0.26.0',
}
)
);
t.true(join.success);
const error = getErrorResponse(
t,
await emitWithAck(socket, 'space:push-doc-update', {
spaceType: 'workspace',
spaceId: workspace.id,
docId,
update: createYjsUpdateBase64(),
})
);
t.true(error.message.includes('Doc.Update'));
const updates = await db.update.count({
where: {
workspaceId: workspace.id,
id: docId,
},
});
t.is(updates, 0);
} finally {
socket.disconnect();
}
});
test('workspace sync load-doc-timestamps should filter unreadable docs', async t => {
const db = app.get(PrismaClient);
const models = app.get(Models);
const { user: owner } = await login(app);
const { user: collaborator, cookieHeader } = await login(app);
const workspace = await models.workspace.create(owner.id);
const privateDocId = 'private-timestamp-doc';
const readableDocId = 'readable-timestamp-doc';
await models.workspaceUser.set(
workspace.id,
collaborator.id,
WorkspaceRole.Collaborator,
{
status: WorkspaceMemberStatus.Accepted,
}
);
await models.doc.setDefaultRole(workspace.id, privateDocId, DocRole.None);
await models.doc.setDefaultRole(workspace.id, readableDocId, DocRole.None);
await models.docUser.set(
workspace.id,
readableDocId,
collaborator.id,
DocRole.Reader
);
await createSnapshot(db, {
workspaceId: workspace.id,
docId: privateDocId,
userId: owner.id,
updatedAt: new Date('2026-01-01T00:00:00.000Z'),
});
await createSnapshot(db, {
workspaceId: workspace.id,
docId: readableDocId,
userId: owner.id,
updatedAt: new Date('2026-01-02T00:00:00.000Z'),
});
const socket = createClient(url, cookieHeader);
try {
await waitForConnect(socket);
const join = unwrapResponse(
t,
await emitWithAck<{ clientId: string; success: boolean }>(
socket,
'space:join',
{
spaceType: 'workspace',
spaceId: workspace.id,
clientVersion: '0.26.0',
}
)
);
t.true(join.success);
const timestamps = unwrapResponse(
t,
await emitWithAck<Record<string, number>>(
socket,
'space:load-doc-timestamps',
{
spaceType: 'workspace',
spaceId: workspace.id,
}
)
);
t.false(privateDocId in timestamps);
t.true(readableDocId in timestamps);
} finally {
socket.disconnect();
}
});
@@ -633,6 +633,7 @@ export class SpaceSyncGateway
@SubscribeMessage('space:load-doc')
async onLoadSpaceDoc(
@ConnectedSocket() client: Socket,
@CurrentUser() user: CurrentUser,
@MessageBody()
{ spaceType, spaceId, docId, stateVector }: LoadDocMessage
): Promise<
@@ -641,6 +642,13 @@ export class SpaceSyncGateway
const id = new DocID(docId, spaceId);
const adapter = this.selectAdapter(client, spaceType);
adapter.assertIn(spaceId);
await this.assertDocActionAllowed(
spaceType,
user.id,
spaceId,
id.guid,
'Doc.Read'
);
const doc = await adapter.diff(
spaceId,
@@ -666,7 +674,7 @@ export class SpaceSyncGateway
@ConnectedSocket() client: Socket,
@CurrentUser() user: CurrentUser,
@MessageBody() { spaceType, spaceId, docId }: DeleteDocMessage
) {
): Promise<EventResponse<{ success: true }>> {
const adapter = this.selectAdapter(client, spaceType);
await this.assertDocActionAllowed(
spaceType,
@@ -676,6 +684,7 @@ export class SpaceSyncGateway
'Doc.Delete'
);
await adapter.delete(spaceId, docId);
return { data: { success: true } };
}
/**
@@ -692,8 +701,13 @@ export class SpaceSyncGateway
const adapter = this.selectAdapter(client, spaceType);
// Quota recovery mode is intentionally not applied to sync in this phase.
// TODO(@forehalo): enable after frontend supporting doc revert
// await this.ac.user(user.id).doc(spaceId, docId).assert('Doc.Update');
await this.assertDocActionAllowed(
spaceType,
user.id,
spaceId,
docId,
'Doc.Update'
);
const timestamp = await adapter.push(
spaceId,
docId,
@@ -740,15 +754,32 @@ export class SpaceSyncGateway
@SubscribeMessage('space:load-doc-timestamps')
async onLoadDocTimestamps(
@ConnectedSocket() client: Socket,
@CurrentUser() user: CurrentUser,
@MessageBody()
{ spaceType, spaceId, timestamp }: LoadDocTimestampsMessage
): Promise<EventResponse<Record<string, number>>> {
const adapter = this.selectAdapter(client, spaceType);
const stats = await adapter.getTimestamps(spaceId, timestamp);
if (!stats || spaceType === SpaceType.Userspace) {
return {
data: stats ?? {},
};
}
const readableDocs = await this.ac
.user(user.id)
.workspace(spaceId)
.docs(
Object.keys(stats).map(docId => ({ docId })),
'Doc.Read'
);
const readableDocIds = new Set(readableDocs.map(doc => doc.docId));
return {
data: stats ?? {},
data: Object.fromEntries(
Object.entries(stats).filter(([docId]) => readableDocIds.has(docId))
),
};
}
@@ -33,6 +33,7 @@ import {
SpaceStorage,
} from '../storage';
import { Sync } from '../sync';
import { DocSyncPeer } from '../sync/doc/peer';
import { IndexerSyncImpl } from '../sync/indexer';
import { expectYjsEqual } from './utils';
@@ -112,6 +113,64 @@ class TestDocStorage implements DocStorage {
}
}
class PermissionDeniedRemoteDocStorage implements DocStorage {
readonly storageType = 'doc' as const;
readonly connection = new DummyConnection();
readonly isReadonly = false;
pushCount = 0;
constructor(readonly spaceId: string) {}
async getDoc(_docId: string): Promise<DocRecord | null> {
return null;
}
async getDocDiff(
_docId: string,
_state?: Uint8Array
): Promise<DocDiff | null> {
return null;
}
async pushDocUpdate(_update: DocUpdate): Promise<DocClock> {
this.pushCount++;
const error = new Error('No permission to update doc');
error.name = 'DOC_ACTION_DENIED';
throw error;
}
async getDocTimestamp(_docId: string): Promise<DocClock | null> {
return null;
}
async getDocTimestamps(): Promise<DocClocks> {
return {};
}
async deleteDoc(_docId: string): Promise<void> {
return;
}
subscribeDocUpdate(_callback: (update: DocRecord, origin?: string) => void) {
return () => {};
}
}
class PermissionDeniedConnection extends DummyConnection {
waitCount = 0;
override async waitForConnected(_signal?: AbortSignal): Promise<void> {
this.waitCount++;
const error = new Error('No permission to access space');
error.name = 'SPACE_ACCESS_DENIED';
throw error;
}
}
class PermissionDeniedConnectionDocStorage extends PermissionDeniedRemoteDocStorage {
override readonly connection = new PermissionDeniedConnection();
}
class TrackingIndexerStorage extends IndexerStorageBase {
override readonly connection = new DummyConnection();
override readonly isReadonly = false;
@@ -425,6 +484,201 @@ test('blob', async () => {
}
});
test('doc sync peer stops retrying a doc when remote denies permission', async () => {
const local = new IndexedDBDocStorage({
id: 'ws-denied',
flavour: 'local-denied',
type: 'workspace',
});
const syncMetadata = new IndexedDBDocSyncStorage({
id: 'ws-denied',
flavour: 'local-denied',
type: 'workspace',
});
const remote = new PermissionDeniedRemoteDocStorage('ws-denied');
const peer = new DocSyncPeer('remote-denied', local, syncMetadata, remote);
const abort = new AbortController();
local.connection.connect();
syncMetadata.connection.connect();
await local.connection.waitForConnected();
await syncMetadata.connection.waitForConnected();
const doc = new YDoc();
doc.getMap('test').set('hello', 'world');
await local.pushDocUpdate({
docId: 'doc-denied',
bin: encodeStateAsUpdate(doc),
});
try {
void peer.mainLoop(abort.signal);
await vi.waitFor(() => {
expect(remote.pushCount).toBe(1);
});
await vi.waitFor(() => {
let state:
| {
syncing: boolean;
synced: boolean;
retrying: boolean;
errorMessage: string | null;
}
| undefined;
const dispose = peer.docState$('doc-denied').subscribe(next => {
state = next;
});
dispose.unsubscribe();
expect(state).toMatchObject({
syncing: false,
synced: false,
retrying: false,
errorMessage: expect.stringContaining('No permission'),
});
});
await vi.waitFor(() => {
let state:
| {
synced: boolean;
errorMessage: string | null;
}
| undefined;
const dispose = peer.peerState$.subscribe(next => {
state = next;
});
dispose.unsubscribe();
expect(state).toMatchObject({
synced: false,
errorMessage: expect.stringContaining('No permission'),
});
});
await new Promise(resolve => setTimeout(resolve, 1200));
expect(remote.pushCount).toBe(1);
} finally {
abort.abort();
local.connection.disconnect();
syncMetadata.connection.disconnect();
}
});
test('doc sync peer stops retrying when remote connection denies permission', async () => {
const local = new IndexedDBDocStorage({
id: 'ws-connection-denied',
flavour: 'local-connection-denied',
type: 'workspace',
});
const syncMetadata = new IndexedDBDocSyncStorage({
id: 'ws-connection-denied',
flavour: 'local-connection-denied',
type: 'workspace',
});
const remote = new PermissionDeniedConnectionDocStorage(
'ws-connection-denied'
);
const peer = new DocSyncPeer(
'remote-connection-denied',
local,
syncMetadata,
remote
);
const abort = new AbortController();
local.connection.connect();
syncMetadata.connection.connect();
await local.connection.waitForConnected();
await syncMetadata.connection.waitForConnected();
try {
void peer.mainLoop(abort.signal);
await vi.waitFor(() => {
expect(remote.connection.waitCount).toBe(1);
});
await vi.waitFor(() => {
let state:
| {
retrying: boolean;
errorMessage: string | null;
}
| undefined;
const dispose = peer.peerState$.subscribe(next => {
state = next;
});
dispose.unsubscribe();
expect(state).toMatchObject({
retrying: false,
errorMessage: expect.stringContaining('No permission'),
});
});
await new Promise(resolve => setTimeout(resolve, 1200));
expect(remote.connection.waitCount).toBe(1);
} finally {
abort.abort();
local.connection.disconnect();
syncMetadata.connection.disconnect();
}
});
test('doc sync peer resolves on terminal permission error without abort signal', async () => {
const local = new IndexedDBDocStorage({
id: 'ws-connection-denied-no-signal',
flavour: 'local-connection-denied-no-signal',
type: 'workspace',
});
const syncMetadata = new IndexedDBDocSyncStorage({
id: 'ws-connection-denied-no-signal',
flavour: 'local-connection-denied-no-signal',
type: 'workspace',
});
const remote = new PermissionDeniedConnectionDocStorage(
'ws-connection-denied-no-signal'
);
const peer = new DocSyncPeer(
'remote-connection-denied-no-signal',
local,
syncMetadata,
remote
);
local.connection.connect();
syncMetadata.connection.connect();
await local.connection.waitForConnected();
await syncMetadata.connection.waitForConnected();
try {
await expect(peer.mainLoop()).resolves.toBeUndefined();
expect(remote.connection.waitCount).toBe(1);
let state:
| {
retrying: boolean;
errorMessage: string | null;
}
| undefined;
const dispose = peer.peerState$.subscribe(next => {
state = next;
});
dispose.unsubscribe();
expect(state).toMatchObject({
retrying: false,
errorMessage: expect.stringContaining('No permission'),
});
} finally {
local.connection.disconnect();
syncMetadata.connection.disconnect();
}
});
test('indexer defers indexed clock persistence until a refresh happens on delayed refresh storages', async () => {
const calls: string[] = [];
const docsInRootDoc = new Map([['doc1', { title: 'Doc 1' }]]);
+19 -8
View File
@@ -22,6 +22,12 @@ interface CloudDocStorageOptions extends DocStorageOptions {
type: SpaceType;
}
function createWebsocketError(error: { name: string; message: string }) {
const err = new Error(error.message);
err.name = error.name;
return err;
}
export class CloudDocStorage extends DocStorageBase<CloudDocStorageOptions> {
static readonly identifier = 'CloudDocStorage';
@@ -88,7 +94,7 @@ export class CloudDocStorage extends DocStorageBase<CloudDocStorageOptions> {
return null;
}
// TODO: use [UserFriendlyError]
throw new Error(response.error.message);
throw createWebsocketError(response.error);
}
return {
@@ -111,7 +117,7 @@ export class CloudDocStorage extends DocStorageBase<CloudDocStorageOptions> {
return null;
}
// TODO: use [UserFriendlyError]
throw new Error(response.error.message);
throw createWebsocketError(response.error);
}
return {
@@ -132,7 +138,7 @@ export class CloudDocStorage extends DocStorageBase<CloudDocStorageOptions> {
if ('error' in response) {
// TODO(@forehalo): use [UserFriendlyError]
throw new Error(response.error.message);
throw createWebsocketError(response.error);
}
return {
@@ -153,7 +159,7 @@ export class CloudDocStorage extends DocStorageBase<CloudDocStorageOptions> {
if ('error' in response) {
// TODO: use [UserFriendlyError]
throw new Error(response.error.message);
throw createWebsocketError(response.error);
}
return {
@@ -174,7 +180,7 @@ export class CloudDocStorage extends DocStorageBase<CloudDocStorageOptions> {
if ('error' in response) {
// TODO(@forehalo): use [UserFriendlyError]
throw new Error(response.error.message);
throw createWebsocketError(response.error);
}
return Object.entries(response.data).reduce((ret, [docId, timestamp]) => {
@@ -184,11 +190,16 @@ export class CloudDocStorage extends DocStorageBase<CloudDocStorageOptions> {
}
override async deleteDoc(docId: string) {
this.socket.emit('space:delete-doc', {
const response = await this.socket.emitWithAck('space:delete-doc', {
spaceType: this.spaceType,
spaceId: this.spaceId,
docId: this.idConverter.newIdToOldId(docId),
});
if ('error' in response) {
// TODO(@forehalo): use [UserFriendlyError]
throw createWebsocketError(response.error);
}
}
protected async setDocSnapshot() {
@@ -224,7 +235,7 @@ class CloudDocStorageConnection extends SocketConnection {
});
if ('error' in res) {
throw new Error(res.error.message);
throw createWebsocketError(res.error);
}
if (!this.idConverter) {
@@ -272,7 +283,7 @@ class CloudDocStorageConnection extends SocketConnection {
return null;
}
// TODO: use [UserFriendlyError]
throw new Error(response.error.message);
throw createWebsocketError(response.error);
}
return base64ToUint8Array(response.data.missing);
@@ -121,7 +121,10 @@ interface ClientEvents {
timestamp: number;
},
];
'space:delete-doc': { spaceType: string; spaceId: string; docId: string };
'space:delete-doc': [
{ spaceType: string; spaceId: string; docId: string },
{ success?: true },
];
'telemetry:batch': [TelemetryBatch, TelemetryAck];
+122 -18
View File
@@ -38,6 +38,7 @@ type Job =
interface Status {
docs: Set<string>;
connectedDocs: Set<string>;
docErrors: Map<string, string>;
jobDocQueue: AsyncPriorityQueue;
jobMap: Map<string, Job[]>;
remoteClocks: ClockMap;
@@ -78,9 +79,12 @@ function createJobErrorCatcher<
await fn(docId, ...args);
} catch (err) {
if (err instanceof Error) {
throw new Error(
const wrapped = new Error(
`Error in job "${k}": ${err.stack || err.message}`
);
wrapped.name = err.name;
(wrapped as Error & { cause?: unknown }).cause = err;
throw wrapped;
} else {
throw err;
}
@@ -91,6 +95,14 @@ function createJobErrorCatcher<
) as Jobs;
}
function isRemotePermissionError(error: unknown) {
if (!(error instanceof Error)) {
return false;
}
const name = error.name.toUpperCase();
return name === 'DOC_ACTION_DENIED' || name === 'SPACE_ACCESS_DENIED';
}
function isEqualUint8Arrays(a: Uint8Array, b: Uint8Array) {
if (a.length !== b.length) {
return false;
@@ -155,6 +167,7 @@ export class DocSyncPeer {
private status: Status = {
docs: new Set<string>(),
connectedDocs: new Set<string>(),
docErrors: new Map<string, string>(),
jobDocQueue: new AsyncPriorityQueue(),
jobMap: new Map(),
remoteClocks: new ClockMap(new Map()),
@@ -165,6 +178,14 @@ export class DocSyncPeer {
};
private readonly statusUpdatedSubject$ = new Subject<string | true>();
private get currentErrorMessage() {
return (
this.status.errorMessage ??
this.status.docErrors.values().next().value ??
null
);
}
peerState$ = new Observable<PeerState>(subscribe => {
const next = () => {
if (this.status.skipped) {
@@ -182,7 +203,7 @@ export class DocSyncPeer {
syncing: this.status.docs.size,
synced: false,
retrying: this.status.retrying,
errorMessage: this.status.errorMessage,
errorMessage: this.currentErrorMessage,
});
} else {
const syncing = this.status.jobMap.size;
@@ -190,8 +211,8 @@ export class DocSyncPeer {
total: this.status.docs.size,
syncing: syncing,
retrying: this.status.retrying,
errorMessage: this.status.errorMessage,
synced: syncing === 0,
errorMessage: this.currentErrorMessage,
synced: syncing === 0 && this.status.docErrors.size === 0,
});
}
};
@@ -211,6 +232,7 @@ export class DocSyncPeer {
docState$(docId: string) {
return new Observable<PeerDocState>(subscribe => {
const next = () => {
const docErrorMessage = this.status.docErrors.get(docId) ?? null;
if (this.status.skipped) {
subscribe.next({
syncing: false,
@@ -218,14 +240,16 @@ export class DocSyncPeer {
retrying: false,
errorMessage: null,
});
return;
}
subscribe.next({
syncing:
!this.status.connectedDocs.has(docId) ||
this.status.jobMap.has(docId),
synced: !this.status.jobMap.has(docId),
!docErrorMessage &&
(!this.status.connectedDocs.has(docId) ||
this.status.jobMap.has(docId)),
synced: !docErrorMessage && !this.status.jobMap.has(docId),
retrying: this.status.retrying,
errorMessage: this.status.errorMessage,
errorMessage: docErrorMessage ?? this.status.errorMessage,
});
};
next();
@@ -469,6 +493,9 @@ export class DocSyncPeer {
private readonly actions = {
updateRemoteClock: (docId: string, remoteClock: Date) => {
if (this.status.docErrors.has(docId)) {
return;
}
this.status.remoteClocks.setIfBigger(docId, remoteClock);
this.statusUpdatedSubject$.next(docId);
},
@@ -494,6 +521,10 @@ export class DocSyncPeer {
update: Uint8Array;
clock: Date;
}) => {
if (this.status.docErrors.has(docId)) {
return;
}
// try add doc for new doc
this.actions.addDoc(docId);
@@ -514,6 +545,10 @@ export class DocSyncPeer {
update: Uint8Array;
remoteClock: Date;
}) => {
if (this.status.docErrors.has(docId)) {
return;
}
// try add doc for new doc
this.actions.addDoc(docId);
this.actions.updateRemoteClock(docId, remoteClock);
@@ -530,33 +565,45 @@ export class DocSyncPeer {
async mainLoop(signal?: AbortSignal) {
while (true) {
let shouldRetry = true;
try {
await this.retryLoop(signal);
} catch (err) {
if (signal?.aborted) {
return;
}
console.warn('Sync error, retry in 5s', err);
shouldRetry = !isRemotePermissionError(err);
console.warn(
shouldRetry
? 'Sync error, retry in 5s'
: 'Sync stopped due to remote permission error',
err
);
this.status.errorMessage =
err instanceof Error ? err.message : `${err}`;
this.status.retrying = shouldRetry;
this.statusUpdatedSubject$.next(true);
} finally {
// reset all status
this.status = {
docs: new Set(),
connectedDocs: new Set(),
docErrors: new Map(),
jobDocQueue: new AsyncPriorityQueue(),
jobMap: new Map(),
remoteClocks: new ClockMap(new Map()),
syncing: false,
skipped: false,
// tell ui to show retrying status
retrying: true,
retrying: shouldRetry,
// error message from last retry
errorMessage: this.status.errorMessage,
};
this.statusUpdatedSubject$.next(true);
}
if (!shouldRetry) {
return;
}
// wait for 5s before next retry
await Promise.race([
new Promise<void>(resolve => {
@@ -725,29 +772,53 @@ export class DocSyncPeer {
const connect = remove(jobs, j => j.type === 'connect');
if (connect && connect.length > 0) {
await this.jobs.connect(docId, signal);
if (
!(await this.runRemoteDocJob(docId, () =>
this.jobs.connect(docId, signal)
))
) {
break;
}
continue;
}
const pullAndPush = remove(jobs, j => j.type === 'pullAndPush');
if (pullAndPush && pullAndPush.length > 0) {
await this.jobs.pullAndPush(docId, signal);
if (
!(await this.runRemoteDocJob(docId, () =>
this.jobs.pullAndPush(docId, signal)
))
) {
break;
}
continue;
}
const pull = remove(jobs, j => j.type === 'pull');
if (pull && pull.length > 0) {
await this.jobs.pull(docId, signal);
if (
!(await this.runRemoteDocJob(docId, () =>
this.jobs.pull(docId, signal)
))
) {
break;
}
continue;
}
const push = remove(jobs, j => j.type === 'push');
if (push && push.length > 0) {
await this.jobs.push(
docId,
push as (Job & { type: 'push' })[],
signal
);
if (
!(await this.runRemoteDocJob(docId, () =>
this.jobs.push(
docId,
push as (Job & { type: 'push' })[],
signal
)
))
) {
break;
}
continue;
}
@@ -771,7 +842,40 @@ export class DocSyncPeer {
}
}
private async runRemoteDocJob(docId: string, job: () => Promise<void>) {
try {
await job();
return true;
} catch (error) {
if (!isRemotePermissionError(error)) {
throw error;
}
const message = error instanceof Error ? error.message : String(error);
console.warn('Sync skipped for doc due to remote permission error', {
docId,
error,
});
this.status.docErrors.set(docId, message);
this.status.connectedDocs.delete(docId);
this.status.jobMap.delete(docId);
this.statusUpdatedSubject$.next(docId);
this.statusUpdatedSubject$.next(true);
return false;
}
}
private schedule(job: Job) {
if (
this.status.docErrors.has(job.docId) &&
(job.type === 'connect' ||
job.type === 'push' ||
job.type === 'pull' ||
job.type === 'pullAndPush')
) {
return;
}
const priority = this.prioritySettings.get(job.docId) ?? 0;
this.status.jobDocQueue.push(job.docId, priority);