feat(server): sync data with ack (#4791)

This commit is contained in:
liuyi
2023-11-02 17:05:28 +08:00
committed by GitHub
parent af9663d3e7
commit 6a93203d68
11 changed files with 697 additions and 133 deletions

View File

@@ -1,4 +1,8 @@
import { Counter, Gauge, Summary } from 'prom-client';
import { Counter, Gauge, register, Summary } from 'prom-client';
function getOr<T>(name: string, or: () => T): T {
return (register.getSingleMetric(name) as T) || or();
}
type LabelValues<T extends string> = Partial<Record<T, string | number>>;
type MetricsCreator<T extends string> = (
@@ -14,11 +18,15 @@ export const metricsCreatorGenerator = () => {
name: string,
labelNames?: T[]
): MetricsCreator<T> => {
const counter = new Counter({
const counter = getOr(
name,
help: name,
...(labelNames ? { labelNames } : {}),
});
() =>
new Counter({
name,
help: name,
...(labelNames ? { labelNames } : {}),
})
);
return (value: number, labels: LabelValues<T>) => {
counter.inc(labels, value);
@@ -29,11 +37,15 @@ export const metricsCreatorGenerator = () => {
name: string,
labelNames?: T[]
): MetricsCreator<T> => {
const gauge = new Gauge({
const gauge = getOr(
name,
help: name,
...(labelNames ? { labelNames } : {}),
});
() =>
new Gauge({
name,
help: name,
...(labelNames ? { labelNames } : {}),
})
);
return (value: number, labels: LabelValues<T>) => {
gauge.set(labels, value);
@@ -44,11 +56,15 @@ export const metricsCreatorGenerator = () => {
name: string,
labelNames?: T[]
): TimerMetricsCreator<T> => {
const summary = new Summary({
const summary = getOr(
name,
help: name,
...(labelNames ? { labelNames } : {}),
});
() =>
new Summary({
name,
help: name,
...(labelNames ? { labelNames } : {}),
})
);
return (labels: LabelValues<T>) => {
const now = process.hrtime();
@@ -71,3 +87,68 @@ export const metricsCreatorGenerator = () => {
};
export const metricsCreator = metricsCreatorGenerator();
export const CallTimer = (
name: string,
labels: Record<string, any> = {}
): MethodDecorator => {
const timer = metricsCreator.timer(name, Object.keys(labels));
// @ts-expect-error allow
return (
_target,
_key,
desc: TypedPropertyDescriptor<(...args: any[]) => any>
) => {
const originalMethod = desc.value;
if (!originalMethod) {
return desc;
}
desc.value = function (...args: any[]) {
const endTimer = timer(labels);
let result: any;
try {
result = originalMethod.apply(this, args);
} catch (e) {
endTimer();
throw e;
}
if (result instanceof Promise) {
return result.finally(endTimer);
} else {
endTimer();
return result;
}
};
return desc;
};
};
export const CallCounter = (
name: string,
labels: Record<string, any> = {}
): MethodDecorator => {
const count = metricsCreator.counter(name, Object.keys(labels));
// @ts-expect-error allow
return (
_target,
_key,
desc: TypedPropertyDescriptor<(...args: any[]) => any>
) => {
const originalMethod = desc.value;
if (!originalMethod) {
return desc;
}
desc.value = function (...args: any[]) {
count(1, labels);
return originalMethod.apply(this, args);
};
return desc;
};
};

View File

@@ -6,6 +6,7 @@ import {
OnModuleInit,
} from '@nestjs/common';
import { Snapshot, Update } from '@prisma/client';
import { chunk } from 'lodash-es';
import { defer, retry } from 'rxjs';
import { applyUpdate, Doc, encodeStateAsUpdate, encodeStateVector } from 'yjs';
@@ -89,10 +90,10 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
protected applyUpdates(guid: string, ...updates: Buffer[]): Doc {
const doc = this.recoverDoc(...updates);
this.metrics.jwstCodecMerge(1, {});
// test jwst codec
if (this.config.doc.manager.experimentalMergeWithJwstCodec) {
this.metrics.jwstCodecMerge(1, {});
const yjsResult = Buffer.from(encodeStateAsUpdate(doc));
let log = false;
try {
@@ -163,7 +164,12 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
/**
* add update to manager for later processing.
*/
async push(workspaceId: string, guid: string, update: Buffer) {
async push(
workspaceId: string,
guid: string,
update: Buffer,
retryTimes = 10
) {
await new Promise<void>((resolve, reject) => {
defer(async () => {
const seq = await this.getUpdateSeq(workspaceId, guid);
@@ -176,7 +182,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
},
});
})
.pipe(retry(MAX_SEQ_NUM)) // retry until seq num not conflict
.pipe(retry(retryTimes)) // retry until seq num not conflict
.subscribe({
next: () => {
this.logger.verbose(
@@ -184,7 +190,54 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
);
resolve();
},
error: reject,
error: e => {
this.logger.error('Failed to push updates', e);
reject(new Error('Failed to push update'));
},
});
});
}
async batchPush(
workspaceId: string,
guid: string,
updates: Buffer[],
retryTimes = 10
) {
await new Promise<void>((resolve, reject) => {
defer(async () => {
const seq = await this.getUpdateSeq(workspaceId, guid, updates.length);
let turn = 0;
const batchCount = 10;
for (const batch of chunk(updates, batchCount)) {
await this.db.update.createMany({
data: batch.map((update, i) => ({
workspaceId,
id: guid,
// `seq` is the last seq num of the batch
// example for 11 batched updates, start from seq num 20
// seq for first update in the batch should be:
// 31 - 11 + 0 * 10 + 0 + 1 = 21
// ^ last seq num ^ updates.length ^ turn ^ batchCount ^i
seq: seq - updates.length + turn * batchCount + i + 1,
blob: update,
})),
});
turn++;
}
})
.pipe(retry(retryTimes)) // retry until seq num not conflict
.subscribe({
next: () => {
this.logger.verbose(
`pushed updates for workspace: ${workspaceId}, guid: ${guid}`
);
resolve();
},
error: e => {
this.logger.error('Failed to push updates', e);
reject(new Error('Failed to push update'));
},
});
});
}
@@ -370,7 +423,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
return doc;
}
private async getUpdateSeq(workspaceId: string, guid: string) {
private async getUpdateSeq(workspaceId: string, guid: string, batch = 1) {
try {
const { seq } = await this.db.snapshot.update({
select: {
@@ -384,13 +437,13 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
},
data: {
seq: {
increment: 1,
increment: batch,
},
},
});
// reset
if (seq === MAX_SEQ_NUM) {
if (seq >= MAX_SEQ_NUM) {
await this.db.snapshot.update({
where: {
id_workspaceId: {
@@ -406,9 +459,10 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
return seq;
} catch {
// not existing snapshot just count it from 1
const last = this.seqMap.get(workspaceId + guid) ?? 0;
this.seqMap.set(workspaceId + guid, last + 1);
return last + 1;
this.seqMap.set(workspaceId + guid, last + batch);
return last + batch;
}
}
}

View File

@@ -0,0 +1,81 @@
enum EventErrorCode {
WORKSPACE_NOT_FOUND = 'WORKSPACE_NOT_FOUND',
DOC_NOT_FOUND = 'DOC_NOT_FOUND',
NOT_IN_WORKSPACE = 'NOT_IN_WORKSPACE',
ACCESS_DENIED = 'ACCESS_DENIED',
INTERNAL = 'INTERNAL',
VERSION_REJECTED = 'VERSION_REJECTED',
}
// Such errore are generally raised from the gateway handling to user,
// the stack must be full of internal code,
// so there is no need to inherit from `Error` class.
export class EventError {
constructor(
public readonly code: EventErrorCode,
public readonly message: string
) {}
toJSON() {
return {
code: this.code,
message: this.message,
};
}
}
export class WorkspaceNotFoundError extends EventError {
constructor(public readonly workspaceId: string) {
super(
EventErrorCode.WORKSPACE_NOT_FOUND,
`You are trying to access an unknown workspace ${workspaceId}.`
);
}
}
export class DocNotFoundError extends EventError {
constructor(
public readonly workspaceId: string,
public readonly docId: string
) {
super(
EventErrorCode.DOC_NOT_FOUND,
`You are trying to access an unknown doc ${docId} under workspace ${workspaceId}.`
);
}
}
export class NotInWorkspaceError extends EventError {
constructor(public readonly workspaceId: string) {
super(
EventErrorCode.NOT_IN_WORKSPACE,
`You should join in workspace ${workspaceId} before broadcasting messages.`
);
}
}
export class AccessDeniedError extends EventError {
constructor(public readonly workspaceId: string) {
super(
EventErrorCode.ACCESS_DENIED,
`You have no permission to access workspace ${workspaceId}.`
);
}
}
export class InternalError extends EventError {
constructor(public readonly error: Error) {
super(EventErrorCode.INTERNAL, `Internal error happened: ${error.message}`);
}
}
export class VersionRejectedError extends EventError {
constructor(public readonly version: number) {
super(
EventErrorCode.VERSION_REJECTED,
// TODO: Too general error message,
// need to be more specific when versioning system is implemented.
`The version ${version} is rejected by server.`
);
}
}

View File

@@ -1,10 +1,10 @@
import { Logger } from '@nestjs/common';
import { applyDecorators, Logger } from '@nestjs/common';
import {
ConnectedSocket,
MessageBody,
OnGatewayConnection,
OnGatewayDisconnect,
SubscribeMessage,
SubscribeMessage as RawSubscribeMessage,
WebSocketGateway,
WebSocketServer,
} from '@nestjs/websockets';
@@ -12,12 +12,40 @@ import { Server, Socket } from 'socket.io';
import { encodeStateAsUpdate, encodeStateVector } from 'yjs';
import { Metrics } from '../../../metrics/metrics';
import { CallCounter, CallTimer } from '../../../metrics/utils';
import { DocID } from '../../../utils/doc';
import { Auth, CurrentUser } from '../../auth';
import { DocManager } from '../../doc';
import { UserType } from '../../users';
import { PermissionService } from '../../workspaces/permission';
import { Permission } from '../../workspaces/types';
import {
AccessDeniedError,
DocNotFoundError,
EventError,
InternalError,
NotInWorkspaceError,
WorkspaceNotFoundError,
} from './error';
const SubscribeMessage = (event: string) =>
applyDecorators(
CallCounter('socket_io_counter', { event }),
CallTimer('socket_io_timer', { event }),
RawSubscribeMessage(event)
);
type EventResponse<Data = any> =
| {
error: EventError;
}
| (Data extends never
? {
data?: never;
}
: {
data: Data;
});
@WebSocketGateway({
cors: process.env.NODE_ENV !== 'production',
@@ -52,38 +80,50 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
@CurrentUser() user: UserType,
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
) {
this.metric.socketIOEventCounter(1, { event: 'client-handshake' });
const endTimer = this.metric.socketIOEventTimer({
event: 'client-handshake',
});
): Promise<EventResponse<{ clientId: string }>> {
const canWrite = await this.permissions.tryCheck(
workspaceId,
user.id,
Permission.Write
);
if (canWrite) await client.join(workspaceId);
endTimer();
return canWrite;
if (canWrite) {
await client.join(workspaceId);
return {
data: {
clientId: client.id,
},
};
} else {
return {
error: new AccessDeniedError(workspaceId),
};
}
}
@SubscribeMessage('client-leave')
async handleClientLeave(
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
) {
this.metric.socketIOEventCounter(1, { event: 'client-leave' });
const endTimer = this.metric.socketIOEventTimer({
event: 'client-leave',
});
await client.leave(workspaceId);
endTimer();
): Promise<EventResponse> {
if (client.rooms.has(workspaceId)) {
await client.leave(workspaceId);
return {};
} else {
return {
error: new NotInWorkspaceError(workspaceId),
};
}
}
/**
* This is the old version of the `client-update` event without any data protocol.
* It only exists for backwards compatibility to adapt older clients.
*
* @deprecated
*/
@SubscribeMessage('client-update')
async handleClientUpdate(
async handleClientUpdateV1(
@MessageBody()
{
workspaceId,
@@ -96,31 +136,37 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
},
@ConnectedSocket() client: Socket
) {
this.metric.socketIOEventCounter(1, { event: 'client-update' });
const endTimer = this.metric.socketIOEventTimer({ event: 'client-update' });
if (!client.rooms.has(workspaceId)) {
this.logger.verbose(
`Client ${client.id} tried to push update to workspace ${workspaceId} without joining it first`
);
endTimer();
return;
}
const docId = new DocID(guid, workspaceId);
client
.to(docId.workspace)
.emit('server-update', { workspaceId, guid, update });
const buf = Buffer.from(update, 'base64');
// broadcast to all clients with newer version that only listen to `server-updates`
client
.to(docId.workspace)
.emit('server-updates', { workspaceId, guid, updates: [update] });
const buf = Buffer.from(update, 'base64');
await this.docManager.push(docId.workspace, docId.guid, buf);
endTimer();
}
/**
* This is the old version of the `doc-load` event without any data protocol.
* It only exists for backwards compatibility to adapt older clients.
*
* @deprecated
*/
@Auth()
@SubscribeMessage('doc-load')
async loadDoc(
async loadDocV1(
@ConnectedSocket() client: Socket,
@CurrentUser() user: UserType,
@MessageBody()
@@ -134,12 +180,9 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
stateVector?: string;
}
): Promise<{ missing: string; state?: string } | false> {
this.metric.socketIOEventCounter(1, { event: 'doc-load' });
const endTimer = this.metric.socketIOEventTimer({ event: 'doc-load' });
if (!client.rooms.has(workspaceId)) {
const canRead = await this.permissions.tryCheck(workspaceId, user.id);
if (!canRead) {
endTimer();
return false;
}
}
@@ -148,7 +191,6 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
const doc = await this.docManager.get(docId.workspace, docId.guid);
if (!doc) {
endTimer();
return false;
}
@@ -160,53 +202,138 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
).toString('base64');
const state = Buffer.from(encodeStateVector(doc)).toString('base64');
endTimer();
return {
missing,
state,
};
}
@SubscribeMessage('client-update-v2')
async handleClientUpdateV2(
@MessageBody()
{
workspaceId,
guid,
updates,
}: {
workspaceId: string;
guid: string;
updates: string[];
},
@ConnectedSocket() client: Socket
): Promise<EventResponse<{ accepted: true }>> {
if (!client.rooms.has(workspaceId)) {
return {
error: new NotInWorkspaceError(workspaceId),
};
}
try {
const docId = new DocID(guid, workspaceId);
client
.to(docId.workspace)
.emit('server-updates', { workspaceId, guid, updates });
const buffers = updates.map(update => Buffer.from(update, 'base64'));
await this.docManager.batchPush(docId.workspace, docId.guid, buffers);
return {
data: {
accepted: true,
},
};
} catch (e) {
return {
error: new InternalError(e as Error),
};
}
}
@Auth()
@SubscribeMessage('doc-load-v2')
async loadDocV2(
@ConnectedSocket() client: Socket,
@CurrentUser() user: UserType,
@MessageBody()
{
workspaceId,
guid,
stateVector,
}: {
workspaceId: string;
guid: string;
stateVector?: string;
}
): Promise<EventResponse<{ missing: string; state?: string }>> {
if (!client.rooms.has(workspaceId)) {
const canRead = await this.permissions.tryCheck(workspaceId, user.id);
if (!canRead) {
return {
error: new AccessDeniedError(workspaceId),
};
}
}
const docId = new DocID(guid, workspaceId);
const doc = await this.docManager.get(docId.workspace, docId.guid);
if (!doc) {
return {
error: docId.isWorkspace
? new WorkspaceNotFoundError(workspaceId)
: new DocNotFoundError(workspaceId, docId.guid),
};
}
const missing = Buffer.from(
encodeStateAsUpdate(
doc,
stateVector ? Buffer.from(stateVector, 'base64') : undefined
)
).toString('base64');
const state = Buffer.from(encodeStateVector(doc)).toString('base64');
return {
data: {
missing,
state,
},
};
}
@SubscribeMessage('awareness-init')
async handleInitAwareness(
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
) {
this.metric.socketIOEventCounter(1, { event: 'awareness-init' });
const endTimer = this.metric.socketIOEventTimer({
event: 'init-awareness',
});
): Promise<EventResponse<{ clientId: string }>> {
if (client.rooms.has(workspaceId)) {
client.to(workspaceId).emit('new-client-awareness-init');
return {
data: {
clientId: client.id,
},
};
} else {
this.logger.verbose(
`Client ${client.id} tried to init awareness for workspace ${workspaceId} without joining it first`
);
return {
error: new NotInWorkspaceError(workspaceId),
};
}
endTimer();
}
@SubscribeMessage('awareness-update')
async handleHelpGatheringAwareness(
@MessageBody() message: { workspaceId: string; awarenessUpdate: string },
@ConnectedSocket() client: Socket
) {
this.metric.socketIOEventCounter(1, { event: 'awareness-update' });
const endTimer = this.metric.socketIOEventTimer({
event: 'awareness-update',
});
): Promise<EventResponse> {
if (client.rooms.has(message.workspaceId)) {
client.to(message.workspaceId).emit('server-awareness-broadcast', {
...message,
});
client
.to(message.workspaceId)
.emit('server-awareness-broadcast', message);
return {};
} else {
this.logger.verbose(
`Client ${client.id} tried to update awareness for workspace ${message.workspaceId} without joining it first`
);
return {
error: new NotInWorkspaceError(message.workspaceId),
};
}
endTimer();
return 'ack';
}
}

View File

@@ -225,6 +225,31 @@ test('should have sequential update number', async t => {
t.not(records.length, 0);
});
test('should have correct sequential update number with batching push', async t => {
const manager = m.get(DocManager);
const doc = new YDoc();
const text = doc.getText('content');
const updates: Buffer[] = [];
doc.on('update', update => {
updates.push(Buffer.from(update));
});
text.insert(0, 'hello');
text.insert(5, 'world');
text.insert(5, ' ');
await manager.batchPush('2', '2', updates);
// [1,2,3]
const records = await manager.getUpdates('2', '2');
t.deepEqual(
records.map(({ seq }) => seq),
[1, 2, 3]
);
});
test('should retry if seq num conflict', async t => {
const manager = m.get(DocManager);
@@ -240,3 +265,19 @@ test('should retry if seq num conflict', async t => {
t.is(stub.callCount, 3);
});
test('should throw if meet max retry times', async t => {
const manager = m.get(DocManager);
// @ts-expect-error private method
const stub = Sinon.stub(manager, 'getUpdateSeq');
stub.resolves(1);
await t.notThrowsAsync(() => manager.push('1', '1', Buffer.from([0, 0])));
await t.throwsAsync(
() => manager.push('1', '1', Buffer.from([0, 0]), 3 /* retry 3 times */),
{ message: 'Failed to push update' }
);
t.is(stub.callCount, 5);
});