mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-11 20:08:37 +00:00
feat(server): cleanup gateway code (#6118)
This commit is contained in:
@@ -38,27 +38,21 @@ export const GatewayErrorWrapper = (): MethodDecorator => {
|
||||
return desc;
|
||||
}
|
||||
|
||||
desc.value = function (...args: any[]) {
|
||||
let result: any;
|
||||
desc.value = async function (...args: any[]) {
|
||||
try {
|
||||
result = originalMethod.apply(this, args);
|
||||
return await originalMethod.apply(this, args);
|
||||
} catch (e) {
|
||||
metrics.socketio.counter('unhandled_errors').add(1);
|
||||
return {
|
||||
error: new InternalError(e as Error),
|
||||
};
|
||||
}
|
||||
|
||||
if (result instanceof Promise) {
|
||||
return result.catch(e => {
|
||||
metrics.socketio.counter('unhandled_errors').add(1);
|
||||
new Logger('EventsGateway').error(e, e.stack);
|
||||
if (e instanceof EventError) {
|
||||
return {
|
||||
error: new InternalError(e),
|
||||
error: e,
|
||||
};
|
||||
});
|
||||
} else {
|
||||
return result;
|
||||
} else {
|
||||
metrics.socketio.counter('unhandled_errors').add(1);
|
||||
new Logger('EventsGateway').error(e, (e as Error).stack);
|
||||
return {
|
||||
error: new InternalError(e as Error),
|
||||
};
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -85,6 +79,14 @@ type EventResponse<Data = any> =
|
||||
data: Data;
|
||||
});
|
||||
|
||||
function Sync(workspaceId: string): `${string}:sync` {
|
||||
return `${workspaceId}:sync`;
|
||||
}
|
||||
|
||||
function Awareness(workspaceId: string): `${string}:awareness` {
|
||||
return `${workspaceId}:awareness`;
|
||||
}
|
||||
|
||||
@WebSocketGateway({
|
||||
cors: !AFFiNE.node.prod,
|
||||
transports: ['websocket'],
|
||||
@@ -113,7 +115,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
|
||||
metrics.socketio.gauge('realtime_connections').record(this.connectionCount);
|
||||
}
|
||||
|
||||
checkVersion(client: Socket, version?: string) {
|
||||
assertVersion(client: Socket, version?: string) {
|
||||
if (
|
||||
// @todo(@darkskygit): remove this flag after 0.12 goes stable
|
||||
AFFiNE.featureFlags.syncClientVersionCheck &&
|
||||
@@ -126,14 +128,48 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
|
||||
version ? ` ${version}` : ''
|
||||
} is outdated, please update to ${AFFiNE.version}`,
|
||||
});
|
||||
return {
|
||||
error: new EventError(
|
||||
EventErrorCode.VERSION_REJECTED,
|
||||
`Client version ${version} is outdated, please update to ${AFFiNE.version}`
|
||||
),
|
||||
};
|
||||
|
||||
throw new EventError(
|
||||
EventErrorCode.VERSION_REJECTED,
|
||||
`Client version ${version} is outdated, please update to ${AFFiNE.version}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async joinWorkspace(
|
||||
client: Socket,
|
||||
room: `${string}:${'sync' | 'awareness'}`
|
||||
) {
|
||||
await client.join(room);
|
||||
}
|
||||
|
||||
async leaveWorkspace(
|
||||
client: Socket,
|
||||
room: `${string}:${'sync' | 'awareness'}`
|
||||
) {
|
||||
await client.leave(room);
|
||||
}
|
||||
|
||||
assertInWorkspace(client: Socket, room: `${string}:${'sync' | 'awareness'}`) {
|
||||
if (!client.rooms.has(room)) {
|
||||
throw new NotInWorkspaceError(room);
|
||||
}
|
||||
}
|
||||
|
||||
async assertWorkspaceAccessible(
|
||||
workspaceId: string,
|
||||
userId: string,
|
||||
permission: Permission = Permission.Read
|
||||
) {
|
||||
if (
|
||||
!(await this.permissions.isWorkspaceMember(
|
||||
workspaceId,
|
||||
userId,
|
||||
permission
|
||||
))
|
||||
) {
|
||||
throw new AccessDeniedError(workspaceId);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Auth()
|
||||
@@ -144,29 +180,19 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
|
||||
@MessageBody('version') version: string | undefined,
|
||||
@ConnectedSocket() client: Socket
|
||||
): Promise<EventResponse<{ clientId: string }>> {
|
||||
const versionError = this.checkVersion(client, version);
|
||||
if (versionError) {
|
||||
return versionError;
|
||||
}
|
||||
|
||||
const canWrite = await this.permissions.tryCheckWorkspace(
|
||||
this.assertVersion(client, version);
|
||||
await this.assertWorkspaceAccessible(
|
||||
workspaceId,
|
||||
user.id,
|
||||
Permission.Write
|
||||
);
|
||||
|
||||
if (canWrite) {
|
||||
await client.join(`${workspaceId}:sync`);
|
||||
return {
|
||||
data: {
|
||||
clientId: client.id,
|
||||
},
|
||||
};
|
||||
} else {
|
||||
return {
|
||||
error: new AccessDeniedError(workspaceId),
|
||||
};
|
||||
}
|
||||
await this.joinWorkspace(client, Sync(workspaceId));
|
||||
return {
|
||||
data: {
|
||||
clientId: client.id,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@Auth()
|
||||
@@ -177,47 +203,18 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
|
||||
@MessageBody('version') version: string | undefined,
|
||||
@ConnectedSocket() client: Socket
|
||||
): Promise<EventResponse<{ clientId: string }>> {
|
||||
const versionError = this.checkVersion(client, version);
|
||||
if (versionError) {
|
||||
return versionError;
|
||||
}
|
||||
|
||||
const canWrite = await this.permissions.tryCheckWorkspace(
|
||||
this.assertVersion(client, version);
|
||||
await this.assertWorkspaceAccessible(
|
||||
workspaceId,
|
||||
user.id,
|
||||
Permission.Write
|
||||
);
|
||||
|
||||
if (canWrite) {
|
||||
await client.join(`${workspaceId}:awareness`);
|
||||
return {
|
||||
data: {
|
||||
clientId: client.id,
|
||||
},
|
||||
};
|
||||
} else {
|
||||
return {
|
||||
error: new AccessDeniedError(workspaceId),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use `client-handshake-sync` and `client-handshake-awareness` instead
|
||||
*/
|
||||
@Auth()
|
||||
@SubscribeMessage('client-handshake')
|
||||
async handleClientHandShake(
|
||||
@MessageBody() workspaceId: string,
|
||||
@ConnectedSocket() client: Socket
|
||||
): Promise<EventResponse<{ clientId: string }>> {
|
||||
const versionError = this.checkVersion(client);
|
||||
if (versionError) {
|
||||
return versionError;
|
||||
}
|
||||
// should unreachable
|
||||
await this.joinWorkspace(client, Awareness(workspaceId));
|
||||
return {
|
||||
error: new AccessDeniedError(workspaceId),
|
||||
data: {
|
||||
clientId: client.id,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -226,14 +223,9 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
|
||||
@MessageBody() workspaceId: string,
|
||||
@ConnectedSocket() client: Socket
|
||||
): Promise<EventResponse> {
|
||||
if (client.rooms.has(`${workspaceId}:sync`)) {
|
||||
await client.leave(`${workspaceId}:sync`);
|
||||
return {};
|
||||
} else {
|
||||
return {
|
||||
error: new NotInWorkspaceError(workspaceId),
|
||||
};
|
||||
}
|
||||
this.assertInWorkspace(client, Sync(workspaceId));
|
||||
await this.leaveWorkspace(client, Sync(workspaceId));
|
||||
return {};
|
||||
}
|
||||
|
||||
@SubscribeMessage('client-leave-awareness')
|
||||
@@ -241,14 +233,9 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
|
||||
@MessageBody() workspaceId: string,
|
||||
@ConnectedSocket() client: Socket
|
||||
): Promise<EventResponse> {
|
||||
if (client.rooms.has(`${workspaceId}:awareness`)) {
|
||||
await client.leave(`${workspaceId}:awareness`);
|
||||
return {};
|
||||
} else {
|
||||
return {
|
||||
error: new NotInWorkspaceError(workspaceId),
|
||||
};
|
||||
}
|
||||
this.assertInWorkspace(client, Awareness(workspaceId));
|
||||
await this.leaveWorkspace(client, Awareness(workspaceId));
|
||||
return {};
|
||||
}
|
||||
|
||||
@SubscribeMessage('client-pre-sync')
|
||||
@@ -257,11 +244,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
|
||||
@MessageBody()
|
||||
{ workspaceId, timestamp }: { workspaceId: string; timestamp?: number }
|
||||
): Promise<EventResponse<Record<string, number>>> {
|
||||
if (!client.rooms.has(`${workspaceId}:sync`)) {
|
||||
return {
|
||||
error: new NotInWorkspaceError(workspaceId),
|
||||
};
|
||||
}
|
||||
this.assertInWorkspace(client, Sync(workspaceId));
|
||||
|
||||
const stats = await this.docManager.getStats(workspaceId, timestamp);
|
||||
|
||||
@@ -284,11 +267,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
|
||||
},
|
||||
@ConnectedSocket() client: Socket
|
||||
): Promise<EventResponse<{ accepted: true; timestamp?: number }>> {
|
||||
if (!client.rooms.has(`${workspaceId}:sync`)) {
|
||||
return {
|
||||
error: new NotInWorkspaceError(workspaceId),
|
||||
};
|
||||
}
|
||||
this.assertInWorkspace(client, Sync(workspaceId));
|
||||
|
||||
const docId = new DocID(guid, workspaceId);
|
||||
const buffers = updates.map(update => Buffer.from(update, 'base64'));
|
||||
@@ -299,7 +278,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
|
||||
);
|
||||
|
||||
client
|
||||
.to(`${docId.workspace}:sync`)
|
||||
.to(Sync(workspaceId))
|
||||
.emit('server-updates', { workspaceId, guid, updates, timestamp });
|
||||
|
||||
return {
|
||||
@@ -310,11 +289,9 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
|
||||
};
|
||||
}
|
||||
|
||||
@Auth()
|
||||
@SubscribeMessage('doc-load-v2')
|
||||
async loadDocV2(
|
||||
@ConnectedSocket() client: Socket,
|
||||
@CurrentUser() user: CurrentUser,
|
||||
@MessageBody()
|
||||
{
|
||||
workspaceId,
|
||||
@@ -326,17 +303,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
|
||||
stateVector?: string;
|
||||
}
|
||||
): Promise<EventResponse<{ missing: string; state?: string }>> {
|
||||
if (!client.rooms.has(`${workspaceId}:sync`)) {
|
||||
const canRead = await this.permissions.tryCheckWorkspace(
|
||||
workspaceId,
|
||||
user.id
|
||||
);
|
||||
if (!canRead) {
|
||||
return {
|
||||
error: new AccessDeniedError(workspaceId),
|
||||
};
|
||||
}
|
||||
}
|
||||
this.assertInWorkspace(client, Sync(workspaceId));
|
||||
|
||||
const docId = new DocID(guid, workspaceId);
|
||||
const doc = await this.docManager.get(docId.workspace, docId.guid);
|
||||
@@ -363,40 +330,33 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
|
||||
};
|
||||
}
|
||||
|
||||
@Auth()
|
||||
@SubscribeMessage('awareness-init')
|
||||
async handleInitAwareness(
|
||||
@MessageBody() workspaceId: string,
|
||||
@ConnectedSocket() client: Socket
|
||||
): Promise<EventResponse<{ clientId: string }>> {
|
||||
if (client.rooms.has(`${workspaceId}:awareness`)) {
|
||||
client.to(`${workspaceId}:awareness`).emit('new-client-awareness-init');
|
||||
return {
|
||||
data: {
|
||||
clientId: client.id,
|
||||
},
|
||||
};
|
||||
} else {
|
||||
return {
|
||||
error: new NotInWorkspaceError(workspaceId),
|
||||
};
|
||||
}
|
||||
this.assertInWorkspace(client, Awareness(workspaceId));
|
||||
client.to(Awareness(workspaceId)).emit('new-client-awareness-init');
|
||||
return {
|
||||
data: {
|
||||
clientId: client.id,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@SubscribeMessage('awareness-update')
|
||||
async handleHelpGatheringAwareness(
|
||||
@MessageBody() message: { workspaceId: string; awarenessUpdate: string },
|
||||
@MessageBody()
|
||||
{
|
||||
workspaceId,
|
||||
awarenessUpdate,
|
||||
}: { workspaceId: string; awarenessUpdate: string },
|
||||
@ConnectedSocket() client: Socket
|
||||
): Promise<EventResponse> {
|
||||
if (client.rooms.has(`${message.workspaceId}:awareness`)) {
|
||||
client
|
||||
.to(`${message.workspaceId}:awareness`)
|
||||
.emit('server-awareness-broadcast', message);
|
||||
return {};
|
||||
} else {
|
||||
return {
|
||||
error: new NotInWorkspaceError(message.workspaceId),
|
||||
};
|
||||
}
|
||||
this.assertInWorkspace(client, Awareness(workspaceId));
|
||||
client
|
||||
.to(Awareness(workspaceId))
|
||||
.emit('server-awareness-broadcast', { workspaceId, awarenessUpdate });
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,6 +73,28 @@ export class PermissionService {
|
||||
return this.tryCheckPage(ws, id, user);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether a given user is a member of a workspace and has the given or higher permission.
|
||||
*/
|
||||
async isWorkspaceMember(
|
||||
ws: string,
|
||||
user: string,
|
||||
permission: Permission
|
||||
): Promise<boolean> {
|
||||
const count = await this.prisma.workspaceUserPermission.count({
|
||||
where: {
|
||||
workspaceId: ws,
|
||||
userId: user,
|
||||
accepted: true,
|
||||
type: {
|
||||
gte: permission,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
return count !== 0;
|
||||
}
|
||||
|
||||
async checkWorkspace(
|
||||
ws: string,
|
||||
user?: string,
|
||||
|
||||
@@ -18,11 +18,16 @@ export const CallTimer = (
|
||||
return desc;
|
||||
}
|
||||
|
||||
desc.value = function (...args: any[]) {
|
||||
desc.value = async function (...args: any[]) {
|
||||
const timer = metrics[scope].histogram(name, {
|
||||
description: `function call time costs of ${name}`,
|
||||
unit: 'ms',
|
||||
});
|
||||
metrics[scope]
|
||||
.counter(`${name}_calls`, {
|
||||
description: `function call counts of ${name}`,
|
||||
})
|
||||
.add(1, attrs);
|
||||
|
||||
const start = Date.now();
|
||||
|
||||
@@ -30,19 +35,10 @@ export const CallTimer = (
|
||||
timer.record(Date.now() - start, attrs);
|
||||
};
|
||||
|
||||
let result: any;
|
||||
try {
|
||||
result = originalMethod.apply(this, args);
|
||||
} catch (e) {
|
||||
return await originalMethod.apply(this, args);
|
||||
} finally {
|
||||
end();
|
||||
throw e;
|
||||
}
|
||||
|
||||
if (result instanceof Promise) {
|
||||
return result.finally(end);
|
||||
} else {
|
||||
end();
|
||||
return result;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user