feat(server): allow prefetch doc stats before sync (#6115)

This commit is contained in:
liuyi
2024-03-14 17:34:32 +00:00
parent 7fdb1f2d97
commit 79ffca314d
2 changed files with 109 additions and 24 deletions

View File

@@ -229,12 +229,12 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
update: Buffer, update: Buffer,
retryTimes = 10 retryTimes = 10
) { ) {
await new Promise<void>((resolve, reject) => { const timestamp = await new Promise<number>((resolve, reject) => {
defer(async () => { defer(async () => {
const seq = await this.getUpdateSeq(workspaceId, guid); const seq = await this.getUpdateSeq(workspaceId, guid);
await this.db.update.create({ const { createdAt } = await this.db.update.create({
select: { select: {
seq: true, createdAt: true,
}, },
data: { data: {
workspaceId, workspaceId,
@@ -243,23 +243,27 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
blob: update, blob: update,
}, },
}); });
return createdAt.getTime();
}) })
.pipe(retry(retryTimes)) // retry until seq num not conflict .pipe(retry(retryTimes)) // retry until seq num not conflict
.subscribe({ .subscribe({
next: () => { next: timestamp => {
this.logger.debug( this.logger.debug(
`pushed 1 update for ${guid} in workspace ${workspaceId}` `pushed 1 update for ${guid} in workspace ${workspaceId}`
); );
resolve(); resolve(timestamp);
}, },
error: e => { error: e => {
this.logger.error('Failed to push updates', e); this.logger.error('Failed to push updates', e);
reject(new Error('Failed to push update')); reject(new Error('Failed to push update'));
}, },
}); });
}).then(() => {
return this.updateCachedUpdatesCount(workspaceId, guid, 1);
}); });
await this.updateCachedUpdatesCount(workspaceId, guid, 1);
return timestamp;
} }
async batchPush( async batchPush(
@@ -268,24 +272,34 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
updates: Buffer[], updates: Buffer[],
retryTimes = 10 retryTimes = 10
) { ) {
const lastSeq = await this.getUpdateSeq(workspaceId, guid, updates.length);
const now = Date.now();
let timestamp = now;
await new Promise<void>((resolve, reject) => { await new Promise<void>((resolve, reject) => {
defer(async () => { defer(async () => {
const seq = await this.getUpdateSeq(workspaceId, guid, updates.length);
let turn = 0; let turn = 0;
const batchCount = 10; const batchCount = 10;
for (const batch of chunk(updates, batchCount)) { for (const batch of chunk(updates, batchCount)) {
await this.db.update.createMany({ await this.db.update.createMany({
data: batch.map((update, i) => ({ data: batch.map((update, i) => {
workspaceId, const subSeq = turn * batchCount + i + 1;
id: guid,
// `seq` is the last seq num of the batch // `seq` is the last seq num of the batch
// example for 11 batched updates, start from seq num 20 // example for 11 batched updates, start from seq num 20
// seq for first update in the batch should be: // seq for first update in the batch should be:
// 31 - 11 + 0 * 10 + 0 + 1 = 21 // 31 - 11 + subSeq(0 * 10 + 0 + 1) = 21
// ^ last seq num ^ updates.length ^ turn ^ batchCount ^i // ^ last seq num ^ updates.length ^ turn ^ batchCount ^i
seq: seq - updates.length + turn * batchCount + i + 1, const seq = lastSeq - updates.length + subSeq;
blob: update, const createdAt = now + subSeq;
})), timestamp = Math.max(timestamp, createdAt);
return {
workspaceId,
id: guid,
blob: update,
seq,
createdAt: new Date(createdAt), // make sure the updates can be ordered by create time
};
}),
}); });
turn++; turn++;
} }
@@ -303,9 +317,56 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
reject(new Error('Failed to push update')); reject(new Error('Failed to push update'));
}, },
}); });
}).then(() => {
return this.updateCachedUpdatesCount(workspaceId, guid, updates.length);
}); });
await this.updateCachedUpdatesCount(workspaceId, guid, updates.length);
return timestamp;
}
/**
* Get latest timestamp of all docs in the workspace.
*/
@CallTimer('doc', 'get_stats')
async getStats(workspaceId: string, after: number | undefined = 0) {
const snapshots = await this.db.snapshot.findMany({
where: {
workspaceId,
updatedAt: {
gt: new Date(after),
},
},
select: {
id: true,
updatedAt: true,
},
});
const updates = await this.db.update.groupBy({
where: {
workspaceId,
createdAt: {
gt: new Date(after),
},
},
by: ['id'],
_max: {
createdAt: true,
},
});
const result: Record<string, number> = {};
snapshots.forEach(s => {
result[s.id] = s.updatedAt.getTime();
});
updates.forEach(u => {
if (u._max.createdAt) {
result[u.id] = u._max.createdAt.getTime();
}
});
return result;
} }
/** /**

View File

@@ -86,7 +86,7 @@ type EventResponse<Data = any> =
}); });
@WebSocketGateway({ @WebSocketGateway({
cors: process.env.NODE_ENV !== 'production', cors: !AFFiNE.node.prod,
transports: ['websocket'], transports: ['websocket'],
// see: https://socket.io/docs/v4/server-options/#maxhttpbuffersize // see: https://socket.io/docs/v4/server-options/#maxhttpbuffersize
maxHttpBufferSize: 1e8, // 100 MB maxHttpBufferSize: 1e8, // 100 MB
@@ -251,6 +251,25 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
} }
} }
@SubscribeMessage('client-pre-sync')
async loadDocStats(
@ConnectedSocket() client: Socket,
@MessageBody()
{ workspaceId, timestamp }: { workspaceId: string; timestamp?: number }
): Promise<EventResponse<Record<string, number>>> {
if (!client.rooms.has(`${workspaceId}:sync`)) {
return {
error: new NotInWorkspaceError(workspaceId),
};
}
const stats = await this.docManager.getStats(workspaceId, timestamp);
return {
data: stats,
};
}
@SubscribeMessage('client-update-v2') @SubscribeMessage('client-update-v2')
async handleClientUpdateV2( async handleClientUpdateV2(
@MessageBody() @MessageBody()
@@ -264,7 +283,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
updates: string[]; updates: string[];
}, },
@ConnectedSocket() client: Socket @ConnectedSocket() client: Socket
): Promise<EventResponse<{ accepted: true }>> { ): Promise<EventResponse<{ accepted: true; timestamp?: number }>> {
if (!client.rooms.has(`${workspaceId}:sync`)) { if (!client.rooms.has(`${workspaceId}:sync`)) {
return { return {
error: new NotInWorkspaceError(workspaceId), error: new NotInWorkspaceError(workspaceId),
@@ -272,16 +291,21 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
} }
const docId = new DocID(guid, workspaceId); const docId = new DocID(guid, workspaceId);
const buffers = updates.map(update => Buffer.from(update, 'base64'));
const timestamp = await this.docManager.batchPush(
docId.workspace,
docId.guid,
buffers
);
client client
.to(`${docId.workspace}:sync`) .to(`${docId.workspace}:sync`)
.emit('server-updates', { workspaceId, guid, updates }); .emit('server-updates', { workspaceId, guid, updates, timestamp });
const buffers = updates.map(update => Buffer.from(update, 'base64'));
await this.docManager.batchPush(docId.workspace, docId.guid, buffers);
return { return {
data: { data: {
accepted: true, accepted: true,
timestamp,
}, },
}; };
} }