mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-26 18:55:57 +08:00
fix: static file handle & ws connect
This commit is contained in:
@@ -10,6 +10,16 @@ import { isMobileRequest } from '../utils/user-agent';
|
||||
|
||||
const staticPathRegex = /^\/(_plugin|assets|imgs|js|plugins|static)\//;
|
||||
|
||||
function isMissingStaticAssetError(error: unknown) {
|
||||
if (!error || typeof error !== 'object') {
|
||||
return false;
|
||||
}
|
||||
|
||||
const err = error as { code?: string; status?: number; statusCode?: number };
|
||||
|
||||
return err.code === 'ENOENT' || err.status === 404 || err.statusCode === 404;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class StaticFilesResolver implements OnModuleInit {
|
||||
constructor(
|
||||
@@ -86,7 +96,18 @@ export class StaticFilesResolver implements OnModuleInit {
|
||||
next();
|
||||
return;
|
||||
}
|
||||
routeByUA(req, res, next, true);
|
||||
routeByUA(
|
||||
req,
|
||||
res,
|
||||
error => {
|
||||
if (isMissingStaticAssetError(error)) {
|
||||
res.status(404).end();
|
||||
return;
|
||||
}
|
||||
next(error);
|
||||
},
|
||||
true
|
||||
);
|
||||
});
|
||||
|
||||
// /
|
||||
|
||||
@@ -210,6 +210,9 @@ export class SpaceSyncGateway
|
||||
private readonly server!: Server;
|
||||
|
||||
private connectionCount = 0;
|
||||
private readonly socketUsers = new Map<string, string>();
|
||||
private readonly localUserConnectionCounts = new Map<string, number>();
|
||||
private unresolvedPresenceSockets = 0;
|
||||
private flushTimer?: NodeJS.Timeout;
|
||||
|
||||
constructor(
|
||||
@@ -224,7 +227,9 @@ export class SpaceSyncGateway
|
||||
onModuleInit() {
|
||||
this.flushTimer = setInterval(() => {
|
||||
this.flushActiveUsersMinute().catch(error => {
|
||||
this.logger.warn('Failed to flush active users minute', error as Error);
|
||||
this.logger.warn(
|
||||
`Failed to flush active users minute: ${this.formatError(error)}`
|
||||
);
|
||||
});
|
||||
}, 60_000);
|
||||
this.flushTimer.unref?.();
|
||||
@@ -278,8 +283,7 @@ export class SpaceSyncGateway
|
||||
};
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
'Failed to merge updates for broadcast, falling back to batch',
|
||||
error as Error
|
||||
`Failed to merge updates for broadcast, falling back to batch: ${this.formatError(error)}`
|
||||
);
|
||||
return {
|
||||
spaceType,
|
||||
@@ -302,14 +306,20 @@ export class SpaceSyncGateway
|
||||
this.connectionCount++;
|
||||
this.logger.debug(`New connection, total: ${this.connectionCount}`);
|
||||
metrics.socketio.gauge('connections').record(this.connectionCount);
|
||||
this.attachPresenceUserId(client);
|
||||
this.flushActiveUsersMinute().catch(error => {
|
||||
this.logger.warn('Failed to flush active users minute', error as Error);
|
||||
const userId = this.attachPresenceUserId(client);
|
||||
this.trackConnectedSocket(client.id, userId);
|
||||
void this.flushActiveUsersMinute({
|
||||
aggregateAcrossCluster: false,
|
||||
}).catch(error => {
|
||||
this.logger.warn(
|
||||
`Failed to flush active users minute: ${this.formatError(error)}`
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
handleDisconnect(_client: Socket) {
|
||||
handleDisconnect(client: Socket) {
|
||||
this.connectionCount = Math.max(0, this.connectionCount - 1);
|
||||
this.trackDisconnectedSocket(client.id);
|
||||
this.logger.debug(
|
||||
`Connection disconnected, total: ${this.connectionCount}`
|
||||
);
|
||||
@@ -317,21 +327,24 @@ export class SpaceSyncGateway
|
||||
void this.flushActiveUsersMinute({
|
||||
aggregateAcrossCluster: false,
|
||||
}).catch(error => {
|
||||
this.logger.warn('Failed to flush active users minute', error as Error);
|
||||
this.logger.warn(
|
||||
`Failed to flush active users minute: ${this.formatError(error)}`
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
private attachPresenceUserId(client: Socket) {
|
||||
private attachPresenceUserId(client: Socket): string | null {
|
||||
const request = client.request as Request;
|
||||
const userId = request.session?.user.id ?? request.token?.user.id;
|
||||
if (typeof userId !== 'string' || !userId) {
|
||||
this.logger.warn(
|
||||
`Unable to resolve authenticated user id for socket ${client.id}`
|
||||
);
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
|
||||
client.data[SOCKET_PRESENCE_USER_ID_KEY] = userId;
|
||||
return userId;
|
||||
}
|
||||
|
||||
private resolvePresenceUserId(socket: { data?: unknown }) {
|
||||
@@ -345,6 +358,60 @@ export class SpaceSyncGateway
|
||||
return typeof userId === 'string' && userId ? userId : null;
|
||||
}
|
||||
|
||||
private trackConnectedSocket(socketId: string, userId: string | null) {
|
||||
if (!userId) {
|
||||
this.unresolvedPresenceSockets++;
|
||||
return;
|
||||
}
|
||||
|
||||
this.socketUsers.set(socketId, userId);
|
||||
const prev = this.localUserConnectionCounts.get(userId) ?? 0;
|
||||
this.localUserConnectionCounts.set(userId, prev + 1);
|
||||
}
|
||||
|
||||
private trackDisconnectedSocket(socketId: string) {
|
||||
const userId = this.socketUsers.get(socketId);
|
||||
if (!userId) {
|
||||
this.unresolvedPresenceSockets = Math.max(
|
||||
0,
|
||||
this.unresolvedPresenceSockets - 1
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
this.socketUsers.delete(socketId);
|
||||
const next = (this.localUserConnectionCounts.get(userId) ?? 1) - 1;
|
||||
if (next <= 0) {
|
||||
this.localUserConnectionCounts.delete(userId);
|
||||
} else {
|
||||
this.localUserConnectionCounts.set(userId, next);
|
||||
}
|
||||
}
|
||||
|
||||
private resolveLocalActiveUsers() {
|
||||
if (this.unresolvedPresenceSockets > 0) {
|
||||
return Math.max(0, this.connectionCount);
|
||||
}
|
||||
|
||||
return this.localUserConnectionCounts.size;
|
||||
}
|
||||
|
||||
private formatError(error: unknown) {
|
||||
if (error instanceof Error) {
|
||||
return error.stack ?? error.message;
|
||||
}
|
||||
|
||||
if (typeof error === 'string') {
|
||||
return error;
|
||||
}
|
||||
|
||||
try {
|
||||
return JSON.stringify(error);
|
||||
} catch {
|
||||
return String(error);
|
||||
}
|
||||
}
|
||||
|
||||
private async flushActiveUsersMinute(options?: {
|
||||
aggregateAcrossCluster?: boolean;
|
||||
}) {
|
||||
@@ -352,7 +419,7 @@ export class SpaceSyncGateway
|
||||
minute.setSeconds(0, 0);
|
||||
|
||||
const aggregateAcrossCluster = options?.aggregateAcrossCluster ?? true;
|
||||
let activeUsers = Math.max(0, this.connectionCount);
|
||||
let activeUsers = this.resolveLocalActiveUsers();
|
||||
if (aggregateAcrossCluster) {
|
||||
try {
|
||||
const sockets = await this.server.fetchSockets();
|
||||
@@ -377,8 +444,7 @@ export class SpaceSyncGateway
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
'Failed to aggregate active users from sockets, using local value',
|
||||
error as Error
|
||||
`Failed to aggregate active users from sockets, using local value: ${this.formatError(error)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user