feat(server): make server storage adapters (#7902)

This commit is contained in:
forehalo
2024-08-21 05:30:26 +00:00
parent 6f9f579e5d
commit e20bdbf925
29 changed files with 1987 additions and 2111 deletions

View File

@@ -0,0 +1,175 @@
import { Injectable } from '@nestjs/common';
import { PrismaClient } from '@prisma/client';
import { Mutex } from '../../../fundamentals';
import { DocStorageOptions } from '../options';
import { DocRecord, DocStorageAdapter } from '../storage';
/**
 * Doc storage adapter for user-space docs (docs owned by a single user
 * rather than a workspace), persisted in the `userSnapshot` table.
 *
 * Userspace docs are intentionally simpler than workspace docs:
 * - incoming updates are squashed into the snapshot in place, so there is
 *   no pending-updates queue;
 * - no history records are kept.
 */
@Injectable()
export class PgUserspaceDocStorageAdapter extends DocStorageAdapter {
  constructor(
    private readonly db: PrismaClient,
    private readonly mutex: Mutex,
    options: DocStorageOptions
  ) {
    super(options);
  }

  // no updates queue for userspace, directly merge them inplace
  // no history record for userspace
  protected async getDocUpdates() {
    return [];
  }

  protected async markUpdatesMerged() {
    return 0;
  }

  async listDocHistories() {
    return [];
  }

  async getDocHistory() {
    return null;
  }

  protected async createDocHistory() {
    return false;
  }

  /** A userspace doc is always fully materialized in its snapshot row. */
  override async getDoc(spaceId: string, docId: string) {
    return this.getDocSnapshot(spaceId, docId);
  }

  /**
   * Merge the given updates into the user's doc snapshot.
   *
   * Takes the per-doc lock, squashes the existing snapshot (if any) together
   * with the incoming updates, then writes the merged result back.
   *
   * @param userId owner of the doc (used as the space id)
   * @param docId doc guid
   * @param updates encoded updates to merge, in arrival order
   * @returns the timestamp of the merged snapshot, or 0 when `updates` is empty
   */
  async pushDocUpdates(userId: string, docId: string, updates: Uint8Array[]) {
    if (!updates.length) {
      return 0;
    }

    // released automatically at end of scope via `await using`
    await using _lock = await this.lockDocForUpdate(userId, docId);
    const snapshot = await this.getDocSnapshot(userId, docId);
    const now = Date.now();
    // stagger timestamps by index so updates keep their relative order
    const pendings = updates.map((update, i) => ({
      bin: update,
      timestamp: now + i,
    }));

    const { timestamp, bin } = await this.squash(
      snapshot ? [snapshot, ...pendings] : pendings
    );

    await this.setDocSnapshot({
      spaceId: userId,
      docId,
      bin,
      timestamp,
    });

    return timestamp;
  }

  /** Delete a single userspace doc snapshot. */
  async deleteDoc(userId: string, docId: string) {
    await this.db.userSnapshot.deleteMany({
      where: {
        userId,
        id: docId,
      },
    });
  }

  /** Delete every doc snapshot belonging to the user. */
  async deleteSpace(userId: string) {
    await this.db.userSnapshot.deleteMany({
      where: {
        userId,
      },
    });
  }

  /**
   * Latest-change timestamp per doc for the user.
   *
   * @param after only include docs updated strictly after this epoch-ms value
   *   (note: `0` is falsy and behaves like "no filter")
   * @returns map of docId -> updatedAt epoch-ms
   */
  async getSpaceDocTimestamps(userId: string, after?: number) {
    const snapshots = await this.db.userSnapshot.findMany({
      select: {
        id: true,
        updatedAt: true,
      },
      where: {
        userId,
        ...(after
          ? {
              updatedAt: {
                gt: new Date(after),
              },
            }
          : {}),
      },
    });

    const result: Record<string, number> = {};

    snapshots.forEach(s => {
      result[s.id] = s.updatedAt.getTime();
    });

    return result;
  }

  /** Read the user's doc snapshot, or null if it does not exist. */
  protected async getDocSnapshot(userId: string, docId: string) {
    const snapshot = await this.db.userSnapshot.findUnique({
      where: {
        userId_id: {
          userId,
          id: docId,
        },
      },
    });

    if (!snapshot) {
      return null;
    }

    return {
      spaceId: userId,
      docId,
      bin: snapshot.blob,
      timestamp: snapshot.updatedAt.getTime(),
    };
  }

  /**
   * Persist the merged snapshot.
   *
   * BUGFIX: this previously upserted into the workspace `snapshot` table
   * (keyed by `id_workspaceId` with `workspaceId: snapshot.spaceId`) even
   * though every other method of this adapter — and the comment below —
   * works against the `userSnapshot` table keyed by `userId_id`.
   */
  protected async setDocSnapshot(snapshot: DocRecord) {
    // we always get lock before writing to user snapshot table,
    // so a simple upsert without testing on updatedAt is safe
    await this.db.userSnapshot.upsert({
      where: {
        userId_id: {
          userId: snapshot.spaceId,
          id: snapshot.docId,
        },
      },
      update: {
        blob: Buffer.from(snapshot.bin),
        updatedAt: new Date(snapshot.timestamp),
      },
      create: {
        userId: snapshot.spaceId,
        id: snapshot.docId,
        blob: Buffer.from(snapshot.bin),
        createdAt: new Date(snapshot.timestamp),
        updatedAt: new Date(snapshot.timestamp),
      },
    });

    return true;
  }

  /**
   * Acquire the per-doc mutex for userspace docs.
   *
   * @throws Error when the lock cannot be acquired (too much contention)
   */
  protected override async lockDocForUpdate(
    workspaceId: string,
    docId: string
  ) {
    const lock = await this.mutex.lock(`userspace:${workspaceId}:${docId}`);

    if (!lock) {
      throw new Error('Too many concurrent writings');
    }

    return lock;
  }
}

View File

@@ -0,0 +1,594 @@
import { Injectable, Logger } from '@nestjs/common';
import { PrismaClient } from '@prisma/client';
import { chunk } from 'lodash-es';
import {
Cache,
DocHistoryNotFound,
DocNotFound,
FailedToSaveUpdates,
FailedToUpsertSnapshot,
metrics,
Mutex,
} from '../../../fundamentals';
import { retryable } from '../../../fundamentals/utils/promise';
import { DocStorageOptions } from '../options';
import {
DocRecord,
DocStorageAdapter,
DocUpdate,
HistoryFilter,
} from '../storage';
// cache key of the map<`workspaceId::docId`, pending-update-count> used by
// `randomDoc()` to pick candidates for background auto merging
const UPDATES_QUEUE_CACHE_KEY = 'doc:manager:updates';

/**
 * Doc storage adapter for workspace docs, backed by the Prisma `snapshot`,
 * `update` and `snapshotHistory` tables.
 *
 * Incoming updates are appended to the `update` table (the pending queue)
 * and later squashed into `snapshot`; history records are created from
 * snapshots according to the adapter options (min interval / max age).
 */
@Injectable()
export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
  private readonly logger = new Logger(PgWorkspaceDocStorageAdapter.name);

  constructor(
    private readonly db: PrismaClient,
    private readonly mutex: Mutex,
    private readonly cache: Cache,
    protected override readonly options: DocStorageOptions
  ) {
    super(options);
  }

  /**
   * Append client updates for a doc to the pending-updates queue.
   *
   * @returns the timestamp assigned to the newest persisted update,
   *   or 0 when `updates` is empty
   * @throws FailedToSaveUpdates when inserts keep failing after retries
   */
  async pushDocUpdates(
    workspaceId: string,
    docId: string,
    updates: Uint8Array[]
  ) {
    if (!updates.length) {
      return 0;
    }

    let pendings = updates;
    let done = 0;
    let timestamp = Date.now();
    try {
      await retryable(async () => {
        // skip updates already inserted by a previous, partially failed attempt
        // NOTE(review): `done` is cumulative across attempts while `pendings`
        // is re-sliced on every retry; a second retry would therefore slice
        // off too many updates — verify against `retryable`'s semantics.
        if (done !== 0) {
          pendings = pendings.slice(done);
        }

        // TODO(@forehalo): remove in next release
        const lastSeq = await this.getUpdateSeq(
          workspaceId,
          docId,
          updates.length
        );

        let turn = 0;
        const batchCount = 10;
        for (const batch of chunk(pendings, batchCount)) {
          const now = Date.now();
          await this.db.update.createMany({
            data: batch.map((update, i) => {
              const subSeq = turn * batchCount + i + 1;
              // `seq` is the last seq num of the batch
              // example for 11 batched updates, start from seq num 20
              // seq for first update in the batch should be:
              // 31 - 11 + subSeq(0 * 10 + 0 + 1) = 21
              // ^ last seq num ^ updates.length ^ turn ^ batchCount ^i
              const seq = lastSeq - updates.length + subSeq;
              // offset createdAt by subSeq so rows stay ordered by insert time
              const createdAt = now + subSeq;
              timestamp = Math.max(timestamp, createdAt);

              return {
                workspaceId,
                id: docId,
                blob: Buffer.from(update),
                seq,
                createdAt: new Date(createdAt),
              };
            }),
          });
          turn++;
          done += batch.length;
          await this.updateCachedUpdatesCount(workspaceId, docId, batch.length);
        }
      });
    } catch (e) {
      this.logger.error('Failed to insert doc updates', e);
      metrics.doc.counter('doc_update_insert_failed').add(1);
      throw new FailedToSaveUpdates();
    }
    return timestamp;
  }

  /** Pending (unmerged) updates for a doc, oldest first. */
  protected async getDocUpdates(workspaceId: string, docId: string) {
    const rows = await this.db.update.findMany({
      where: {
        workspaceId,
        id: docId,
      },
      orderBy: {
        createdAt: 'asc',
      },
    });

    return rows.map(row => ({
      bin: row.blob,
      timestamp: row.createdAt.getTime(),
    }));
  }

  /** Delete a doc's snapshot, pending updates and history in one transaction. */
  async deleteDoc(workspaceId: string, docId: string) {
    const ident = { where: { workspaceId, id: docId } };
    await this.db.$transaction([
      this.db.snapshot.deleteMany(ident),
      this.db.update.deleteMany(ident),
      this.db.snapshotHistory.deleteMany(ident),
    ]);
  }

  /** Delete the workspace row plus all of its doc data in one transaction. */
  async deleteSpace(workspaceId: string) {
    const ident = { where: { workspaceId } };
    await this.db.$transaction([
      this.db.workspace.deleteMany({
        where: {
          id: workspaceId,
        },
      }),
      this.db.snapshot.deleteMany(ident),
      this.db.update.deleteMany(ident),
      this.db.snapshotHistory.deleteMany(ident),
    ]);
  }

  /**
   * Latest-change timestamp per doc in the workspace, merged from snapshots
   * (`updatedAt`) and still-pending updates (max `createdAt`); pending
   * updates win because they are written into the result map last.
   *
   * @param after only include docs changed strictly after this epoch-ms
   *   value (note: `0` is falsy and behaves like "no filter")
   * @returns map of docId -> epoch-ms timestamp
   */
  async getSpaceDocTimestamps(workspaceId: string, after?: number) {
    const snapshots = await this.db.snapshot.findMany({
      select: {
        id: true,
        updatedAt: true,
      },
      where: {
        workspaceId,
        ...(after
          ? {
              updatedAt: {
                gt: new Date(after),
              },
            }
          : {}),
      },
    });

    const updates = await this.db.update.groupBy({
      where: {
        workspaceId,
        ...(after
          ? {
              createdAt: {
                gt: new Date(after),
              },
            }
          : {}),
      },
      by: ['id'],
      _max: {
        createdAt: true,
      },
    });

    const result: Record<string, number> = {};

    snapshots.forEach(s => {
      result[s.id] = s.updatedAt.getTime();
    });

    updates.forEach(u => {
      if (u._max.createdAt) {
        result[u.id] = u._max.createdAt.getTime();
      }
    });

    return result;
  }

  /**
   * Remove merged updates (matched by their createdAt timestamps) from the
   * queue and decrease the cached pending count accordingly.
   *
   * @returns number of update rows actually deleted
   */
  protected async markUpdatesMerged(
    workspaceId: string,
    docId: string,
    updates: DocUpdate[]
  ) {
    const result = await this.db.update.deleteMany({
      where: {
        workspaceId,
        id: docId,
        createdAt: {
          in: updates.map(u => new Date(u.timestamp)),
        },
      },
    });

    await this.updateCachedUpdatesCount(workspaceId, docId, -result.count);
    return result.count;
  }

  /**
   * History timestamps for a doc, newest first, strictly before
   * `query.before` (defaulting to now), limited to `query.limit` rows.
   */
  async listDocHistories(
    workspaceId: string,
    docId: string,
    query: HistoryFilter
  ) {
    const histories = await this.db.snapshotHistory.findMany({
      select: {
        timestamp: true,
      },
      where: {
        workspaceId,
        id: docId,
        timestamp: {
          lt: query.before ? new Date(query.before) : new Date(),
        },
      },
      orderBy: {
        timestamp: 'desc',
      },
      take: query.limit,
    });

    return histories.map(h => h.timestamp.getTime());
  }

  /** Fetch a single history record by its exact timestamp, or null. */
  async getDocHistory(workspaceId: string, docId: string, timestamp: number) {
    const history = await this.db.snapshotHistory.findUnique({
      where: {
        workspaceId_id_timestamp: {
          workspaceId,
          id: docId,
          timestamp: new Date(timestamp),
        },
      },
    });

    if (!history) {
      return null;
    }

    return {
      spaceId: workspaceId,
      docId,
      bin: history.blob,
      timestamp,
    };
  }

  /**
   * "Rollback" only snapshots the current doc state into history; it never
   * mutates the snapshot itself — per the WARN below, reverting happens
   * client-side through the sync system.
   *
   * @throws DocHistoryNotFound when no history exists at `timestamp`
   * @throws DocNotFound when the doc has no current snapshot
   */
  override async rollbackDoc(
    spaceId: string,
    docId: string,
    timestamp: number
  ): Promise<void> {
    await using _lock = await this.lockDocForUpdate(spaceId, docId);
    const toSnapshot = await this.getDocHistory(spaceId, docId, timestamp);
    if (!toSnapshot) {
      throw new DocHistoryNotFound({ spaceId, docId, timestamp });
    }

    const fromSnapshot = await this.getDocSnapshot(spaceId, docId);

    if (!fromSnapshot) {
      throw new DocNotFound({ spaceId, docId });
    }

    // force create a new history record after rollback
    await this.createDocHistory(fromSnapshot, true);
    // WARN:
    // we should never do the snapshot updating in recovering,
    // which is not the solution in CRDT.
    // let user revert in client and update the data in sync system
    // const change = this.generateChangeUpdate(fromSnapshot.bin, toSnapshot.bin);
    // await this.pushDocUpdates(spaceId, docId, [change]);
    metrics.doc
      .counter('history_recovered_counter', {
        description: 'How many times history recovered request happened',
      })
      .add(1);
  }

  /**
   * Create a history record from a snapshot when the last record is older
   * than the configured min interval (or when `force` is set), skipping
   * unchanged and empty snapshots.
   *
   * @returns whether a history record was created
   */
  protected async createDocHistory(snapshot: DocRecord, force = false) {
    const last = await this.lastDocHistory(snapshot.spaceId, snapshot.docId);

    let shouldCreateHistory = false;

    if (!last) {
      // never created
      shouldCreateHistory = true;
    } else {
      const lastHistoryTimestamp = last.timestamp.getTime();
      if (lastHistoryTimestamp === snapshot.timestamp) {
        // no change
        shouldCreateHistory = false;
      } else if (
        // force
        force ||
        // last history created before interval in configs
        // NOTE(review): `historyMinInterval` is used without `await` while
        // `historyMaxAge` below is awaited — confirm it is synchronous.
        lastHistoryTimestamp <
          snapshot.timestamp - this.options.historyMinInterval(snapshot.spaceId)
      ) {
        shouldCreateHistory = true;
      }
    }

    if (shouldCreateHistory) {
      if (this.isEmptyBin(snapshot.bin)) {
        this.logger.debug(
          `Doc is empty, skip creating history record for ${snapshot.docId} in workspace ${snapshot.spaceId}`
        );
        return false;
      }

      await this.db.snapshotHistory
        .create({
          select: {
            timestamp: true,
          },
          data: {
            workspaceId: snapshot.spaceId,
            id: snapshot.docId,
            timestamp: new Date(snapshot.timestamp),
            blob: Buffer.from(snapshot.bin),
            expiredAt: new Date(
              Date.now() + (await this.options.historyMaxAge(snapshot.spaceId))
            ),
          },
        })
        .catch(() => {
          // safe to ignore
          // only happens when duplicated history record created in multi processes
        });
      metrics.doc
        .counter('history_created_counter', {
          description: 'How many times the snapshot history created',
        })
        .add(1);
      this.logger.debug(
        `History created for ${snapshot.docId} in workspace ${snapshot.spaceId}.`
      );
      return true;
    }

    return false;
  }

  /** Current snapshot of a doc, or null if none exists. */
  protected async getDocSnapshot(workspaceId: string, docId: string) {
    const snapshot = await this.db.snapshot.findUnique({
      where: {
        id_workspaceId: {
          workspaceId,
          id: docId,
        },
      },
    });

    if (!snapshot) {
      return null;
    }

    return {
      spaceId: workspaceId,
      docId,
      bin: snapshot.blob,
      timestamp: snapshot.updatedAt.getTime(),
    };
  }

  /**
   * Upsert the doc snapshot, but only when the stored row is not newer than
   * the one being written (`updated_at <= :updatedAt` guard in raw SQL,
   * because Prisma's upsert cannot express the extra condition — see
   * CONCERNS below).
   *
   * @returns whether the write actually happened (false when the bin is
   *   empty or the stored snapshot was already newer)
   * @throws FailedToUpsertSnapshot on database errors
   */
  protected async setDocSnapshot(snapshot: DocRecord) {
    const { spaceId, docId, bin, timestamp } = snapshot;

    if (this.isEmptyBin(bin)) {
      return false;
    }

    const updatedAt = new Date(timestamp);

    // CONCERNS:
    // i. Because we save the real user's last seen action time as `updatedAt`,
    //    it's possible to simply compare the `updatedAt` to determine if the snapshot is older than the one we are going to save.
    //
    // ii. Prisma doesn't support `upsert` with additional `where` condition along side unique constraint.
    //     In our case, we need to manually check the `updatedAt` to avoid overriding the newer snapshot.
    //     where: { id_workspaceId: {}, updatedAt: { lt: updatedAt } }
    //                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    try {
      const result: { updatedAt: Date }[] = await this.db.$queryRaw`
        INSERT INTO "snapshots" ("workspace_id", "guid", "blob", "created_at", "updated_at")
        VALUES (${spaceId}, ${docId}, ${bin}, DEFAULT, ${updatedAt})
        ON CONFLICT ("workspace_id", "guid")
        DO UPDATE SET "blob" = ${bin}, "updated_at" = ${updatedAt}
        WHERE "snapshots"."workspace_id" = ${spaceId} AND "snapshots"."guid" = ${docId} AND "snapshots"."updated_at" <= ${updatedAt}
        RETURNING "snapshots"."workspace_id" as "workspaceId", "snapshots"."guid" as "id", "snapshots"."updated_at" as "updatedAt"
      `;
      // const result = await this.db.snapshot.upsert({
      //   select: {
      //     updatedAt: true,
      //     seq: true,
      //   },
      //   where: {
      //     id_workspaceId: {
      //       workspaceId,
      //       id: guid,
      //     },
      //     ⬇️ NOT SUPPORTED BY PRISMA YET
      //     updatedAt: {
      //       lt: updatedAt,
      //     },
      //   },
      //   update: {
      //     blob,
      //     state,
      //     updatedAt,
      //   },
      //   create: {
      //     workspaceId,
      //     id: guid,
      //     blob,
      //     state,
      //     updatedAt,
      //     seq,
      //   },
      // });

      // if the condition `snapshot.updatedAt > updatedAt` is true, by which means the snapshot has already been updated by other process,
      // the updates has been applied to current `doc` must have been seen by the other process as well.
      // The `updatedSnapshot` will be `undefined` in this case.
      const updatedSnapshot = result.at(0);

      return !!updatedSnapshot;
    } catch (e) {
      metrics.doc.counter('snapshot_upsert_failed').add(1);
      this.logger.error('Failed to upsert snapshot', e);
      throw new FailedToUpsertSnapshot();
    }
  }

  /**
   * Acquire the per-doc mutex for workspace docs.
   *
   * @throws Error when the lock cannot be acquired (too much contention)
   */
  protected override async lockDocForUpdate(
    workspaceId: string,
    docId: string
  ) {
    const lock = await this.mutex.lock(`doc:update:${workspaceId}:${docId}`);

    if (!lock) {
      throw new Error('Too many concurrent writings');
    }

    return lock;
  }

  /** Most recent history record (timestamp + state) for a doc, or null. */
  protected async lastDocHistory(workspaceId: string, id: string) {
    return this.db.snapshotHistory.findFirst({
      where: {
        workspaceId,
        id,
      },
      select: {
        timestamp: true,
        state: true,
      },
      orderBy: {
        timestamp: 'desc',
      },
    });
  }

  // for auto merging
  /**
   * Pick a random doc that still has pending updates, using the cached
   * per-doc update counts; clears stale cache entries whose docs have no
   * rows left in the `update` table.
   *
   * @returns `{ workspaceId, docId }` or null when nothing is pending
   */
  async randomDoc() {
    const key = await this.cache.mapRandomKey(UPDATES_QUEUE_CACHE_KEY);

    if (key) {
      // increase by 0 == read the current cached count
      const cachedCount = await this.cache.mapIncrease(
        UPDATES_QUEUE_CACHE_KEY,
        key,
        0
      );

      if (cachedCount > 0) {
        // cache key format is `${workspaceId}::${docId}`
        const [workspaceId, id] = key.split('::');
        const count = await this.db.update.count({
          where: {
            workspaceId,
            id,
          },
        });

        // FIXME(@forehalo): somehow the update count in cache is not accurate
        if (count === 0) {
          metrics.doc
            .counter('doc_update_count_inconsistent_with_cache')
            .add(1);
          await this.cache.mapDelete(UPDATES_QUEUE_CACHE_KEY, key);

          return null;
        }
        return { workspaceId, docId: id };
      }
    }

    return null;
  }

  /**
   * Adjust the cached pending-update count by `count` (may be negative);
   * drop the cache entry entirely once the count reaches zero or below.
   */
  private async updateCachedUpdatesCount(
    workspaceId: string,
    guid: string,
    count: number
  ) {
    const result = await this.cache.mapIncrease(
      UPDATES_QUEUE_CACHE_KEY,
      `${workspaceId}::${guid}`,
      count
    );

    if (result <= 0) {
      await this.cache.mapDelete(
        UPDATES_QUEUE_CACHE_KEY,
        `${workspaceId}::${guid}`
      );
    }
  }

  /**
   * @deprecated
   */
  // in-memory fallback seq counter for docs that have no snapshot row yet
  private readonly seqMap = new Map<string, number>();

  /**
   *
   * @deprecated updates do not rely on seq number anymore
   *
   * keep in next release to avoid downtime when upgrading instances
   */
  private async getUpdateSeq(workspaceId: string, guid: string, batch = 1) {
    const MAX_SEQ_NUM = 0x3fffffff; // u31

    try {
      const { seq } = await this.db.snapshot.update({
        select: {
          seq: true,
        },
        where: {
          id_workspaceId: {
            workspaceId,
            id: guid,
          },
        },
        data: {
          seq: {
            increment: batch,
          },
        },
      });

      if (!seq) {
        return batch;
      }

      // reset
      if (seq >= MAX_SEQ_NUM) {
        await this.db.snapshot.update({
          select: {
            seq: true,
          },
          where: {
            id_workspaceId: {
              workspaceId,
              id: guid,
            },
          },
          data: {
            seq: 0,
          },
        });
      }

      return seq;
    } catch {
      // not existing snapshot just count it from 1
      const last = this.seqMap.get(workspaceId + guid) ?? 0;
      this.seqMap.set(workspaceId + guid, last + batch);
      return last + batch;
    }
  }
}

View File

@@ -1,253 +0,0 @@
import { isDeepStrictEqual } from 'node:util';
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { PrismaClient } from '@prisma/client';
import type { EventPayload } from '../../fundamentals';
import {
Config,
DocHistoryNotFound,
DocNotFound,
metrics,
OnEvent,
} from '../../fundamentals';
import { PermissionService } from '../permission';
import { QuotaService } from '../quota';
import { isEmptyBuffer } from './manager';
/**
 * Maintains per-doc snapshot history: creates history records when a doc's
 * snapshot changes (rate-limited by `config.doc.history.interval`), serves
 * list/count/get queries (non-expired records only), records the current
 * snapshot on recovery, and purges expired records nightly.
 */
@Injectable()
export class DocHistoryManager {
  private readonly logger = new Logger(DocHistoryManager.name);

  constructor(
    private readonly config: Config,
    private readonly db: PrismaClient,
    private readonly quota: QuotaService,
    private readonly permission: PermissionService
  ) {}

  /** Drop all history records of a deleted workspace. */
  @OnEvent('workspace.deleted')
  onWorkspaceDeleted(workspaceId: EventPayload<'workspace.deleted'>) {
    return this.db.snapshotHistory.deleteMany({
      where: {
        workspaceId,
      },
    });
  }

  /** Drop all history records of a deleted doc. */
  @OnEvent('snapshot.deleted')
  onSnapshotDeleted({ workspaceId, id }: EventPayload<'snapshot.deleted'>) {
    return this.db.snapshotHistory.deleteMany({
      where: {
        workspaceId,
        id,
      },
    });
  }

  /**
   * Possibly record the *previous* snapshot as a history entry when a doc's
   * snapshot is updated.
   *
   * A record is created when there is no history yet, when `forceCreate` is
   * set, or when the last record is older than the configured interval —
   * unless the previous snapshot is unchanged (same state) or empty.
   */
  @OnEvent('snapshot.updated')
  async onDocUpdated(
    { workspaceId, id, previous }: EventPayload<'snapshot.updated'>,
    forceCreate = false
  ) {
    const last = await this.last(workspaceId, id);

    let shouldCreateHistory = false;

    if (!last) {
      // never created
      shouldCreateHistory = true;
    } else if (last.timestamp.getTime() === previous.updatedAt.getTime()) {
      // no change
      // BUGFIX: previously compared the two `Date` objects with `===`, which
      // checks object identity and therefore never matched for distinct
      // instances, so this branch was unreachable.
      shouldCreateHistory = false;
    } else if (
      // force
      forceCreate ||
      // last history created before interval in configs
      last.timestamp.getTime() <
        previous.updatedAt.getTime() - this.config.doc.history.interval
    ) {
      shouldCreateHistory = true;
    }

    if (shouldCreateHistory) {
      // skip the history recording when no actual update on snapshot happened
      if (last && isDeepStrictEqual(last.state, previous.state)) {
        this.logger.debug(
          `State matches, skip creating history record for ${id} in workspace ${workspaceId}`
        );
        return;
      }

      if (isEmptyBuffer(previous.blob)) {
        this.logger.debug(
          `Doc is empty, skip creating history record for ${id} in workspace ${workspaceId}`
        );
        return;
      }

      await this.db.snapshotHistory
        .create({
          select: {
            timestamp: true,
          },
          data: {
            workspaceId,
            id,
            timestamp: previous.updatedAt,
            blob: previous.blob,
            state: previous.state,
            expiredAt: await this.getExpiredDateFromNow(workspaceId),
          },
        })
        .catch(() => {
          // safe to ignore
          // only happens when duplicated history record created in multi processes
        });
      metrics.doc
        .counter('history_created_counter', {
          description: 'How many times the snapshot history created',
        })
        .add(1);
      this.logger.debug(
        `History created for ${id} in workspace ${workspaceId}.`
      );
    }
  }

  /**
   * History timestamps of a doc, newest first, strictly before `before`,
   * excluding expired records.
   */
  async list(
    workspaceId: string,
    id: string,
    before: Date = new Date(),
    take: number = 10
  ) {
    return this.db.snapshotHistory.findMany({
      select: {
        timestamp: true,
      },
      where: {
        workspaceId,
        id,
        timestamp: {
          lt: before,
        },
        // only include the ones that have not expired
        expiredAt: {
          gt: new Date(),
        },
      },
      orderBy: {
        timestamp: 'desc',
      },
      take,
    });
  }

  /** Count non-expired history records of a doc. */
  async count(workspaceId: string, id: string) {
    return this.db.snapshotHistory.count({
      where: {
        workspaceId,
        id,
        expiredAt: {
          gt: new Date(),
        },
      },
    });
  }

  /** Fetch one non-expired history record by its exact timestamp. */
  async get(workspaceId: string, id: string, timestamp: Date) {
    return this.db.snapshotHistory.findUnique({
      where: {
        workspaceId_id_timestamp: {
          workspaceId,
          id,
          timestamp,
        },
        // NOTE(review): extra non-unique filter inside `findUnique.where`
        // relies on Prisma's extended where-unique support — confirm it is
        // enabled for this Prisma version.
        expiredAt: {
          gt: new Date(),
        },
      },
    });
  }

  /** Most recent history record (timestamp + state) of a doc, or null. */
  async last(workspaceId: string, id: string) {
    return this.db.snapshotHistory.findFirst({
      where: {
        workspaceId,
        id,
      },
      select: {
        timestamp: true,
        state: true,
      },
      orderBy: {
        timestamp: 'desc',
      },
    });
  }

  /**
   * Recover a doc to a history point: snapshots the *current* state into
   * history (forced), but never touches the snapshot itself — per the WARN
   * below, reverting flows through the client/sync system.
   *
   * @returns the timestamp of the recovered history record
   * @throws DocHistoryNotFound when no history exists at `timestamp`
   * @throws DocNotFound when the doc has no current snapshot
   */
  async recover(workspaceId: string, id: string, timestamp: Date) {
    const history = await this.db.snapshotHistory.findUnique({
      where: {
        workspaceId_id_timestamp: {
          workspaceId,
          id,
          timestamp,
        },
      },
    });

    if (!history) {
      throw new DocHistoryNotFound({
        workspaceId,
        docId: id,
        timestamp: timestamp.getTime(),
      });
    }

    const oldSnapshot = await this.db.snapshot.findUnique({
      where: {
        id_workspaceId: {
          id,
          workspaceId,
        },
      },
    });

    if (!oldSnapshot) {
      throw new DocNotFound({ workspaceId, docId: id });
    }

    // save old snapshot as one history record
    await this.onDocUpdated({ workspaceId, id, previous: oldSnapshot }, true);
    // WARN:
    // we should never do the snapshot updating in recovering,
    // which is not the solution in CRDT.
    // let user revert in client and update the data in sync system
    // `await this.db.snapshot.update();`
    metrics.doc
      .counter('history_recovered_counter', {
        description: 'How many times history recovered request happened',
      })
      .add(1);

    return history.timestamp;
  }

  /**
   * Expiration date for a new history record, derived from the workspace
   * owner's quota (`historyPeriodFromNow`).
   */
  async getExpiredDateFromNow(workspaceId: string) {
    const owner = await this.permission.getWorkspaceOwner(workspaceId);
    const quota = await this.quota.getUserQuota(owner.id);
    return quota.feature.historyPeriodFromNow;
  }

  /** Nightly purge of history records whose expiration has passed. */
  @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT /* everyday at 12am */)
  async cleanupExpiredHistory() {
    await this.db.snapshotHistory.deleteMany({
      where: {
        expiredAt: {
          lte: new Date(),
        },
      },
    });
  }
}

View File

@@ -4,14 +4,22 @@ import { Module } from '@nestjs/common';
import { PermissionModule } from '../permission';
import { QuotaModule } from '../quota';
import { DocHistoryManager } from './history';
import { DocManager } from './manager';
import { PgUserspaceDocStorageAdapter } from './adapters/userspace';
import { PgWorkspaceDocStorageAdapter } from './adapters/workspace';
import { DocStorageCronJob } from './job';
import { DocStorageOptions } from './options';
// NOTE(review): this hunk is a rendered diff (`@@ -4,14 +4,22 @@`) whose
// removed and added lines are interleaved without +/- markers; as plain
// source it would not compile (duplicate `providers`/`exports` keys, two
// class declarations). Code is kept as-is and only annotated.
@Module({
imports: [QuotaModule, PermissionModule],
// pre-change providers/exports (removed by this commit)
providers: [DocManager, DocHistoryManager],
exports: [DocManager, DocHistoryManager],
// post-change providers/exports (added by this commit)
providers: [
DocStorageOptions,
PgWorkspaceDocStorageAdapter,
PgUserspaceDocStorageAdapter,
DocStorageCronJob,
],
exports: [PgWorkspaceDocStorageAdapter, PgUserspaceDocStorageAdapter],
})
// pre-change module name (removed)
export class DocModule {}
// post-change module name (added)
export class DocStorageModule {}
export { PgUserspaceDocStorageAdapter, PgWorkspaceDocStorageAdapter };
export { DocHistoryManager, DocManager };
export { DocStorageAdapter } from './storage';

View File

@@ -0,0 +1,76 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Cron, CronExpression, SchedulerRegistry } from '@nestjs/schedule';
import { PrismaClient } from '@prisma/client';
import { CallTimer, Config, metrics } from '../../fundamentals';
import { PgWorkspaceDocStorageAdapter } from './adapters/workspace';
/**
 * Background jobs for the doc storage layer: a polling loop that squashes
 * pending updates into snapshots, a nightly history purge, and a per-minute
 * queue-size metric.
 */
@Injectable()
export class DocStorageCronJob implements OnModuleInit {
  private readonly logger = new Logger(DocStorageCronJob.name);
  // guards against overlapping auto-merge runs
  private busy = false;

  constructor(
    private readonly registry: SchedulerRegistry,
    private readonly config: Config,
    private readonly db: PrismaClient,
    private readonly workspace: PgWorkspaceDocStorageAdapter
  ) {}

  onModuleInit() {
    if (!this.config.doc.manager.enableUpdateAutoMerging) {
      return;
    }

    const tick = () => {
      if (this.busy) {
        return;
      }

      this.busy = true;
      this.autoMergePendingDocUpdates()
        .catch(() => {
          /* never fail */
        })
        .finally(() => {
          this.busy = false;
        });
    };

    // scheduler registry will clean up the interval when the app is stopped
    this.registry.addInterval(
      this.autoMergePendingDocUpdates.name,
      setInterval(tick, this.config.doc.manager.updatePollInterval)
    );
    this.logger.log('Updates pending queue auto merging cron started');
  }

  @CallTimer('doc', 'auto_merge_pending_doc_updates')
  async autoMergePendingDocUpdates() {
    try {
      const candidate = await this.workspace.randomDoc();
      if (!candidate) {
        return;
      }

      // reading the doc forces its pending updates to be merged
      await this.workspace.getDoc(candidate.workspaceId, candidate.docId);
    } catch (e) {
      metrics.doc.counter('auto_merge_pending_doc_updates_error').add(1);
      this.logger.error('Failed to auto merge pending doc updates', e);
    }
  }

  /** Purge history records whose expiration date has passed. */
  @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT /* everyday at 12am */)
  async cleanupExpiredHistory() {
    const now = new Date();
    await this.db.snapshotHistory.deleteMany({
      where: {
        expiredAt: {
          lte: now,
        },
      },
    });
  }

  /** Report how many update rows are waiting to be merged. */
  @Cron(CronExpression.EVERY_MINUTE)
  async reportUpdatesQueueCount() {
    const pending = await this.db.update.count();
    metrics.doc.gauge('updates_queue_count').record(pending);
  }
}

View File

@@ -1,853 +0,0 @@
import {
Injectable,
Logger,
OnModuleDestroy,
OnModuleInit,
} from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { PrismaClient, Snapshot, Update } from '@prisma/client';
import { chunk } from 'lodash-es';
import { defer, retry } from 'rxjs';
import {
applyUpdate,
Doc,
encodeStateAsUpdate,
encodeStateVector,
transact,
} from 'yjs';
import type { EventPayload } from '../../fundamentals';
import {
Cache,
CallTimer,
Config,
EventEmitter,
mergeUpdatesInApplyWay as jwstMergeUpdates,
metrics,
OnEvent,
} from '../../fundamentals';
/**
 * Compare a yjs-produced merge result against a jwst-produced one.
 *
 * On a byte-level mismatch in non-strict mode, the jwst binary is round-tripped
 * through a yjs `Doc` once (normalizing encoding differences) and the
 * comparison is retried strictly.
 */
function compare(yBinary: Buffer, jwstBinary: Buffer, strict = false): boolean {
  if (yBinary.equals(jwstBinary)) {
    return true;
  }

  if (strict) {
    return false;
  }

  // re-encode the jwst result via yjs, then require an exact byte match
  const doc = new Doc();
  applyUpdate(doc, jwstBinary);
  return compare(yBinary, Buffer.from(encodeStateAsUpdate(doc)), true);
}
/**
 * Whether the buffer is considered an empty doc payload: either zero-length
 * or exactly the two bytes `0x00 0x00`.
 */
export function isEmptyBuffer(buf: Buffer): boolean {
  switch (buf.length) {
    case 0:
      return true;
    case 2:
      // 0x0000
      return buf[0] === 0 && buf[1] === 0;
    default:
      return false;
  }
}
// upper bound before the per-doc seq counter is reset (see getUpdateSeq)
const MAX_SEQ_NUM = 0x3fffffff; // u31
// cache key of the map<`workspaceId::docId`, pending-update-count>
const UPDATES_QUEUE_CACHE_KEY = 'doc:manager:updates';

// a doc materialized as a live yjs Doc, plus its last-change timestamp
interface DocResponse {
doc: Doc;
timestamp: number;
}

// a doc materialized as an encoded binary, plus its last-change timestamp
interface BinaryResponse {
binary: Buffer;
timestamp: number;
}
/**
* Since we can't directly save all client updates into database, in which way the database will overload,
* we need to buffer the updates and merge them to reduce db write.
*
* And also, if a new client join, it would be nice to see the latest doc asap,
* so we need to at least store a snapshot of the doc and return quickly,
* along side all the updates that have not been applies to that snapshot(timestamp).
*/
@Injectable()
export class DocManager implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(DocManager.name);
private job: NodeJS.Timeout | null = null;
private readonly seqMap = new Map<string, number>();
private busy = false;
constructor(
private readonly db: PrismaClient,
private readonly config: Config,
private readonly cache: Cache,
private readonly event: EventEmitter
) {}
onModuleInit() {
if (this.config.doc.manager.enableUpdateAutoMerging) {
this.logger.log('Use Database');
this.setup();
}
}
onModuleDestroy() {
this.destroy();
}
@CallTimer('doc', 'yjs_recover_updates_to_doc')
private recoverDoc(...updates: Buffer[]): Promise<Doc> {
const doc = new Doc();
const chunks = chunk(updates, 10);
return new Promise(resolve => {
const next = () => {
const updates = chunks.shift();
if (updates?.length) {
transact(doc, () => {
updates.forEach(u => {
try {
applyUpdate(doc, u);
} catch (e) {
this.logger.error('Failed to apply update', e);
}
});
});
// avoid applying too many updates in single round which will take the whole cpu time like dead lock
setImmediate(() => {
next();
});
} else {
resolve(doc);
}
};
next();
});
}
private async applyUpdates(guid: string, ...updates: Buffer[]): Promise<Doc> {
const doc = await this.recoverDoc(...updates);
const useYocto = await this.config.runtime.fetch(
'doc/experimentalMergeWithYOcto'
);
// test jwst codec
if (useYocto) {
metrics.jwst.counter('codec_merge_counter').add(1);
const yjsResult = Buffer.from(encodeStateAsUpdate(doc));
let log = false;
try {
const jwstResult = jwstMergeUpdates(updates);
if (!compare(yjsResult, jwstResult)) {
metrics.jwst.counter('codec_not_match').add(1);
this.logger.warn(
`jwst codec result doesn't match yjs codec result for: ${guid}`
);
log = true;
if (this.config.node.dev) {
this.logger.warn(`Expected:\n ${yjsResult.toString('hex')}`);
this.logger.warn(`Result:\n ${jwstResult.toString('hex')}`);
}
}
} catch (e) {
metrics.jwst.counter('codec_fails_counter').add(1);
this.logger.warn(`jwst apply update failed for ${guid}: ${e}`);
log = true;
} finally {
if (log && this.config.node.dev) {
this.logger.warn(
`Updates: ${updates.map(u => u.toString('hex')).join('\n')}`
);
}
}
}
return doc;
}
/**
* setup pending update processing loop
*/
setup() {
this.job = setInterval(() => {
if (!this.busy) {
this.busy = true;
this.autoSquash()
.catch(() => {
/* we handle all errors in work itself */
})
.finally(() => {
this.busy = false;
});
}
}, this.config.doc.manager.updatePollInterval);
this.logger.log('Automation started');
}
/**
* stop pending update processing loop
*/
destroy() {
if (this.job) {
clearInterval(this.job);
this.job = null;
this.logger.log('Automation stopped');
}
}
@OnEvent('workspace.deleted')
async onWorkspaceDeleted(workspaceId: string) {
await this.db.snapshot.deleteMany({
where: {
workspaceId,
},
});
await this.db.update.deleteMany({
where: {
workspaceId,
},
});
}
@OnEvent('snapshot.deleted')
async onSnapshotDeleted({
id,
workspaceId,
}: EventPayload<'snapshot.deleted'>) {
await this.db.update.deleteMany({
where: {
id,
workspaceId,
},
});
}
/**
* add update to manager for later processing.
*/
async push(
workspaceId: string,
guid: string,
update: Buffer,
retryTimes = 10
) {
const timestamp = await new Promise<number>((resolve, reject) => {
defer(async () => {
const seq = await this.getUpdateSeq(workspaceId, guid);
const { createdAt } = await this.db.update.create({
select: {
createdAt: true,
},
data: {
workspaceId,
id: guid,
seq,
blob: update,
},
});
return createdAt.getTime();
})
.pipe(retry(retryTimes)) // retry until seq num not conflict
.subscribe({
next: timestamp => {
this.logger.debug(
`pushed 1 update for ${guid} in workspace ${workspaceId}`
);
resolve(timestamp);
},
error: e => {
this.logger.error('Failed to push updates', e);
reject(new Error('Failed to push update'));
},
});
});
await this.updateCachedUpdatesCount(workspaceId, guid, 1);
return timestamp;
}
async batchPush(
workspaceId: string,
guid: string,
updates: Buffer[],
retryTimes = 10
) {
const timestamp = await new Promise<number>((resolve, reject) => {
defer(async () => {
const lastSeq = await this.getUpdateSeq(
workspaceId,
guid,
updates.length
);
const now = Date.now();
let timestamp = now;
let turn = 0;
const batchCount = 10;
for (const batch of chunk(updates, batchCount)) {
await this.db.update.createMany({
data: batch.map((update, i) => {
const subSeq = turn * batchCount + i + 1;
// `seq` is the last seq num of the batch
// example for 11 batched updates, start from seq num 20
// seq for first update in the batch should be:
// 31 - 11 + subSeq(0 * 10 + 0 + 1) = 21
// ^ last seq num ^ updates.length ^ turn ^ batchCount ^i
const seq = lastSeq - updates.length + subSeq;
const createdAt = now + subSeq;
timestamp = Math.max(timestamp, createdAt);
return {
workspaceId,
id: guid,
blob: update,
seq,
createdAt: new Date(createdAt), // make sure the updates can be ordered by create time
};
}),
});
turn++;
}
return timestamp;
})
.pipe(retry(retryTimes)) // retry until seq num not conflict
.subscribe({
next: timestamp => {
this.logger.debug(
`pushed ${updates.length} updates for ${guid} in workspace ${workspaceId}`
);
resolve(timestamp);
},
error: e => {
this.logger.error('Failed to push updates', e);
reject(new Error('Failed to push update'));
},
});
});
await this.updateCachedUpdatesCount(workspaceId, guid, updates.length);
return timestamp;
}
/**
* Get latest timestamp of all docs in the workspace.
*/
@CallTimer('doc', 'get_doc_timestamps')
async getDocTimestamps(workspaceId: string, after: number | undefined = 0) {
const snapshots = await this.db.snapshot.findMany({
where: {
workspaceId,
updatedAt: {
gt: new Date(after),
},
},
select: {
id: true,
updatedAt: true,
},
});
const updates = await this.db.update.groupBy({
where: {
workspaceId,
createdAt: {
gt: new Date(after),
},
},
by: ['id'],
_max: {
createdAt: true,
},
});
const result: Record<string, number> = {};
snapshots.forEach(s => {
result[s.id] = s.updatedAt.getTime();
});
updates.forEach(u => {
if (u._max.createdAt) {
result[u.id] = u._max.createdAt.getTime();
}
});
return result;
}
/**
* get the latest doc with all update applied.
*/
async get(workspaceId: string, guid: string): Promise<DocResponse | null> {
const result = await this._get(workspaceId, guid);
if (result) {
if ('doc' in result) {
return result;
} else {
const doc = await this.recoverDoc(result.binary);
return {
doc,
timestamp: result.timestamp,
};
}
}
return null;
}
/**
* get the latest doc binary with all update applied.
*/
async getBinary(
workspaceId: string,
guid: string
): Promise<BinaryResponse | null> {
const result = await this._get(workspaceId, guid);
if (result) {
if ('doc' in result) {
return {
binary: Buffer.from(encodeStateAsUpdate(result.doc)),
timestamp: result.timestamp,
};
} else {
return result;
}
}
return null;
}
/**
 * Get the latest doc state vector with all pending updates applied.
 */
async getDocState(
  workspaceId: string,
  guid: string
): Promise<BinaryResponse | null> {
  const snapshot = await this.getSnapshot(workspaceId, guid);
  const updates = await this.getUpdates(workspaceId, guid);
  // pending updates force a squash so the vector reflects them
  if (updates.length) {
    const { doc, timestamp } = await this.squash(snapshot, updates);
    return {
      binary: Buffer.from(encodeStateVector(doc)),
      timestamp,
    };
  }
  if (snapshot?.state) {
    return {
      binary: snapshot.state,
      timestamp: snapshot.updatedAt.getTime(),
    };
  }
  return null;
}
/**
 * Load the stored snapshot row of a doc, if any.
 */
async getSnapshot(workspaceId: string, guid: string) {
  const key = {
    workspaceId,
    id: guid,
  };
  return this.db.snapshot.findUnique({
    where: {
      id_workspaceId: key,
    },
  });
}
/**
 * Get pending (not yet merged) updates of a doc, ordered by creation time.
 */
async getUpdates(workspaceId: string, guid: string) {
  const updates = await this.db.update.findMany({
    where: {
      workspaceId,
      id: guid,
    },
    // take it ease, we don't want to overload db and or cpu
    // if we limit the taken number here,
    // user will never see the latest doc if there are too many updates pending to be merged.
    take: this.config.doc.manager.maxUpdatesPullCount,
  });
  // perf(memory): avoid sorting in db
  // use a proper three-way comparator: the previous `a < b ? -1 : 1` form
  // never reported equality, violating the Array#sort comparator contract
  // (cmp(a,b) and cmp(b,a) both returned 1 for equal timestamps), which
  // makes the ordering of ties implementation-defined
  return updates.sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime());
}
/**
 * Merge pending updates into snapshots, one candidate doc at a time.
 */
private async autoSquash() {
  const candidate = await this.getAutoSquashCandidate();
  if (!candidate) {
    // no pending updates
    return;
  }
  const { id, workspaceId } = candidate;
  // hold the per-doc lock so concurrent squashers don't race on the same doc
  await this.lockUpdatesForAutoSquash(workspaceId, id, async () => {
    try {
      await this._get(workspaceId, id);
    } catch (e) {
      this.logger.error(
        `Failed to apply updates for workspace: ${workspaceId}, guid: ${id}`
      );
      this.logger.error(e);
    }
  });
}
/**
 * Pick a doc with pending updates, preferring the cached queue and
 * falling back to any pending update row in the database.
 */
private async getAutoSquashCandidate() {
  const cached = await this.getAutoSquashCandidateFromCache();
  if (cached) {
    return cached;
  }
  return this.db.update.findFirst({
    select: {
      id: true,
      workspaceId: true,
    },
  });
}
/**
 * Persist the squashed doc as the new snapshot, guarded against clobbering
 * a newer snapshot written by a concurrent process.
 *
 * @param updatedAt the user's real last-seen action time; snapshot writes are
 *        delayed, so the db's auto-updated column would be inaccurate
 * @param seq seq number, only effective when the snapshot row is created
 * @returns whether the snapshot is updated to the latest, `undefined` means the doc to be upserted is outdated.
 */
@CallTimer('doc', 'upsert')
private async upsert(
  workspaceId: string,
  guid: string,
  doc: Doc,
  // we always delay the snapshot update to avoid db overload,
  // so the value of auto updated `updatedAt` by db will never be accurate to user's real action time
  updatedAt: Date,
  seq: number
) {
  const blob = Buffer.from(encodeStateAsUpdate(doc));
  // an empty doc encodes to an empty update; nothing worth storing
  if (isEmptyBuffer(blob)) {
    return undefined;
  }
  const state = Buffer.from(encodeStateVector(doc));
  // CONCERNS:
  //   i. Because we save the real user's last seen action time as `updatedAt`,
  //      it's possible to simply compare the `updatedAt` to determine if the snapshot is older than the one we are going to save.
  //
  //  ii. Prisma doesn't support `upsert` with additional `where` condition along side unique constraint.
  //      In our case, we need to manually check the `updatedAt` to avoid overriding the newer snapshot.
  //        where: { id_workspaceId: {}, updatedAt: { lt: updatedAt } }
  //                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  //
  // iii. Only set the seq number when creating the snapshot.
  //      For updating scenario, the seq number will be updated when updates pushed to db.
  try {
    const result: { updatedAt: Date }[] = await this.db.$queryRaw`
      INSERT INTO "snapshots" ("workspace_id", "guid", "blob", "state", "seq", "created_at", "updated_at")
      VALUES (${workspaceId}, ${guid}, ${blob}, ${state}, ${seq}, DEFAULT, ${updatedAt})
      ON CONFLICT ("workspace_id", "guid")
      DO UPDATE SET "blob" = ${blob}, "state" = ${state}, "updated_at" = ${updatedAt}, "seq" = ${seq}
      WHERE "snapshots"."workspace_id" = ${workspaceId} AND "snapshots"."guid" = ${guid} AND "snapshots"."updated_at" <= ${updatedAt}
      RETURNING "snapshots"."workspace_id" as "workspaceId", "snapshots"."guid" as "id", "snapshots"."updated_at" as "updatedAt"
    `;
    // const result = await this.db.snapshot.upsert({
    //   select: {
    //     updatedAt: true,
    //     seq: true,
    //   },
    //   where: {
    //     id_workspaceId: {
    //       workspaceId,
    //       id: guid,
    //     },
    //     ⬇️ NOT SUPPORTED BY PRISMA YET
    //     updatedAt: {
    //       lt: updatedAt,
    //     },
    //   },
    //   update: {
    //     blob,
    //     state,
    //     updatedAt,
    //   },
    //   create: {
    //     workspaceId,
    //     id: guid,
    //     blob,
    //     state,
    //     updatedAt,
    //     seq,
    //   },
    // });
    // if the condition `snapshot.updatedAt > updatedAt` is true, by which means the snapshot has already been updated by other process,
    // the updates has been applied to current `doc` must have been seen by the other process as well.
    // The `updatedSnapshot` will be `undefined` in this case.
    const updatedSnapshot = result.at(0);
    if (!updatedSnapshot) {
      return undefined;
    }
    return true;
  } catch (e) {
    this.logger.error('Failed to upsert snapshot', e);
    return false;
  }
}
/**
 * Resolve the freshest representation of a doc: a squashed doc when
 * pending updates exist, otherwise the raw snapshot binary.
 */
private async _get(
  workspaceId: string,
  guid: string
): Promise<DocResponse | BinaryResponse | null> {
  const snapshot = await this.getSnapshot(workspaceId, guid);
  const updates = await this.getUpdates(workspaceId, guid);
  if (updates.length) {
    return this.squash(snapshot, updates);
  }
  if (!snapshot) {
    return null;
  }
  return {
    binary: snapshot.blob,
    timestamp: snapshot.updatedAt.getTime(),
  };
}
/**
 * Squash updates into a single update and save it as snapshot,
 * and delete the updates records at the same time.
 *
 * @param snapshot current snapshot, or null when the doc has none yet
 * @param updates pending updates, already sorted by creation time
 * @returns the merged doc plus the timestamp of the newest applied update
 * @throws Error when called with an empty updates list
 */
@CallTimer('doc', 'squash')
private async squash(
  snapshot: Snapshot | null,
  updates: Update[]
): Promise<DocResponse> {
  if (!updates.length) {
    throw new Error('No updates to squash');
  }
  const last = updates[updates.length - 1];
  const { id, workspaceId } = last;
  const doc = await this.applyUpdates(
    id,
    // Buffer.from([0, 0]) is the empty-doc base used when no snapshot exists
    snapshot ? snapshot.blob : Buffer.from([0, 0]),
    ...updates.map(u => u.blob)
  );
  const done = await this.upsert(
    workspaceId,
    id,
    doc,
    last.createdAt,
    last.seq
  );
  if (done) {
    if (snapshot) {
      // let listeners (e.g. history recording) see the previous snapshot
      this.event.emit('snapshot.updated', {
        id,
        workspaceId,
        previous: {
          blob: snapshot.blob,
          state: snapshot.state,
          updatedAt: snapshot.updatedAt,
        },
      });
    }
    this.logger.debug(
      `Squashed ${updates.length} updates for ${id} in workspace ${workspaceId}`
    );
  }
  // we will keep the updates only if the upsert failed on unknown reason
  // `done === undefined` means the updates is outdated(have already been merged by other process), safe to be deleted
  // `done === true` means the upsert is successful, safe to be deleted
  if (done !== false) {
    // always delete updates
    // the upsert will return false if the state is not newer, so we don't need to worry about it
    const { count } = await this.db.update.deleteMany({
      where: {
        id,
        workspaceId,
        seq: {
          in: updates.map(u => u.seq),
        },
      },
    });
    await this.updateCachedUpdatesCount(workspaceId, id, -count);
  }
  return { doc, timestamp: last.createdAt.getTime() };
}
/**
 * Reserve `batch` seq numbers for a doc by incrementing the snapshot's
 * counter in the database; returns the post-increment value.
 *
 * Falls back to an in-memory counter when the snapshot row doesn't exist
 * yet (first updates of a brand-new doc make the db update throw).
 */
private async getUpdateSeq(workspaceId: string, guid: string, batch = 1) {
  try {
    const { seq } = await this.db.snapshot.update({
      select: {
        seq: true,
      },
      where: {
        id_workspaceId: {
          workspaceId,
          id: guid,
        },
      },
      data: {
        seq: {
          increment: batch,
        },
      },
    });
    // reset
    // wrap the counter back to 0 once it reaches MAX_SEQ_NUM to avoid overflow
    if (seq >= MAX_SEQ_NUM) {
      await this.db.snapshot.update({
        select: {
          seq: true,
        },
        where: {
          id_workspaceId: {
            workspaceId,
            id: guid,
          },
        },
        data: {
          seq: 0,
        },
      });
    }
    return seq;
  } catch {
    // not existing snapshot just count it from 1
    // NOTE(review): this in-process map is not shared between server
    // instances — concurrent processes could hand out overlapping seqs for
    // a new doc; confirm this is acceptable before the snapshot row exists
    const last = this.seqMap.get(workspaceId + guid) ?? 0;
    this.seqMap.set(workspaceId + guid, last + batch);
    return last + batch;
  }
}
/**
 * Adjust the cached pending-updates counter of a doc, dropping the cache
 * entry entirely once the count reaches zero.
 */
private async updateCachedUpdatesCount(
  workspaceId: string,
  guid: string,
  count: number
) {
  const field = `${workspaceId}::${guid}`;
  const total = await this.cache.mapIncrease(
    UPDATES_QUEUE_CACHE_KEY,
    field,
    count
  );
  if (total <= 0) {
    await this.cache.mapDelete(UPDATES_QUEUE_CACHE_KEY, field);
  }
}
/**
 * Sample the cached updates queue for a doc worth squashing.
 * Returns null when the queue is empty or the cached count turns out stale.
 */
private async getAutoSquashCandidateFromCache() {
  const key = await this.cache.mapRandomKey(UPDATES_QUEUE_CACHE_KEY);
  if (!key) {
    return null;
  }
  // increase-by-zero acts as a read of the cached count
  const cachedCount = await this.cache.mapIncrease(
    UPDATES_QUEUE_CACHE_KEY,
    key,
    0
  );
  if (cachedCount <= 0) {
    return null;
  }
  const [workspaceId, id] = key.split('::');
  const count = await this.db.update.count({
    where: {
      workspaceId,
      id,
    },
  });
  // FIXME(@forehalo): somehow the update count in cache is not accurate
  if (count === 0) {
    await this.cache.mapDelete(UPDATES_QUEUE_CACHE_KEY, key);
    return null;
  }
  return { id, workspaceId };
}
/**
 * Run `job` under a best-effort distributed cache lock.
 *
 * When the lock is already held by someone else the job is skipped entirely
 * (resolves to undefined). The lock auto-expires via ttl, so a failed
 * release cannot deadlock the system.
 */
private async doWithLock<T>(
  lockScope: string,
  lockResource: string,
  job: () => Promise<T>
) {
  const lock = `lock:${lockScope}:${lockResource}`;
  // setnx fails (returns falsy) when another holder owns the key
  const acquired = await this.cache.setnx(lock, 1, {
    ttl: 60 * 1000,
  });
  metrics.doc.counter('lock').add(1, { scope: lockScope });
  if (!acquired) {
    metrics.doc.counter('lock_failed').add(1, { scope: lockScope });
    return;
  }
  metrics.doc.counter('lock_required').add(1, { scope: lockScope });
  try {
    return await job();
  } finally {
    await this.cache
      .delete(lock)
      .then(() => {
        metrics.doc.counter('lock_released').add(1, { scope: lockScope });
      })
      .catch(e => {
        metrics.doc
          .counter('lock_release_failed')
          .add(1, { scope: lockScope });
        // safe, the lock will be expired when ttl ends
        this.logger.error(`Failed to release lock ${lock}`, e);
      });
  }
}
/**
 * Run `job` while holding the distributed lock that guards a doc's
 * pending-updates queue.
 */
private async lockUpdatesForAutoSquash<T>(
  workspaceId: string,
  guid: string,
  job: () => Promise<T>
) {
  const resource = `${workspaceId}::${guid}`;
  return this.doWithLock('doc:manager:updates', resource, job);
}
/**
 * Publish the total pending-updates backlog as a gauge once per minute.
 */
@Cron(CronExpression.EVERY_MINUTE)
async reportUpdatesQueueCount() {
  const backlog = await this.db.update.count();
  metrics.doc.gauge('updates_queue_count').record(backlog);
}
}

View File

@@ -0,0 +1,130 @@
import { Injectable, Logger } from '@nestjs/common';
import { chunk } from 'lodash-es';
import * as Y from 'yjs';
import {
CallTimer,
Config,
mergeUpdatesInApplyWay as yotcoMergeUpdates,
metrics,
} from '../../fundamentals';
import { PermissionService } from '../permission';
import { QuotaService } from '../quota';
import { DocStorageOptions as IDocStorageOptions } from './storage';
// Compare two encoded doc binaries for equivalence. On a byte-level
// mismatch in non-strict mode, normalize the jwst binary by one
// decode/re-encode round through yjs and retry once (strictly), since
// equivalent docs may serialize to different bytes.
function compare(yBinary: Buffer, jwstBinary: Buffer, strict = false): boolean {
  if (yBinary.equals(jwstBinary)) {
    return true;
  }
  if (strict) {
    return false;
  }
  const normalized = new Y.Doc();
  Y.applyUpdate(normalized, jwstBinary);
  const reencoded = Buffer.from(Y.encodeStateAsUpdate(normalized));
  return compare(yBinary, reencoded, true);
}
/**
 * Doc storage tuning knobs backed by runtime config, permissions and quota.
 */
@Injectable()
export class DocStorageOptions implements IDocStorageOptions {
  private readonly logger = new Logger('DocStorageOptions');
  constructor(
    private readonly config: Config,
    private readonly permission: PermissionService,
    private readonly quota: QuotaService
  ) {}
  /**
   * Merge a batch of doc updates into a single binary.
   *
   * When the `doc/experimentalMergeWithYOcto` runtime flag is on, merge via
   * yjs and cross-check the result against the y-octo codec, recording
   * metrics and logging mismatches; the yjs result is always the one
   * returned. Otherwise fall back to a plain yjs merge.
   */
  mergeUpdates = async (updates: Uint8Array[]) => {
    const useYocto = await this.config.runtime.fetch(
      'doc/experimentalMergeWithYOcto'
    );
    if (useYocto) {
      const doc = await this.recoverDoc(updates);
      metrics.jwst.counter('codec_merge_counter').add(1);
      const yjsResult = Buffer.from(Y.encodeStateAsUpdate(doc));
      let log = false;
      try {
        const yocto = yotcoMergeUpdates(updates.map(Buffer.from));
        if (!compare(yjsResult, yocto)) {
          metrics.jwst.counter('codec_not_match').add(1);
          this.logger.warn(`yocto codec result doesn't match yjs codec result`);
          log = true;
          if (this.config.node.dev) {
            this.logger.warn(`Expected:\n ${yjsResult.toString('hex')}`);
            this.logger.warn(`Result:\n ${yocto.toString('hex')}`);
          }
        }
      } catch (e) {
        metrics.jwst.counter('codec_fails_counter').add(1);
        this.logger.warn(`jwst apply update failed: ${e}`);
        log = true;
      }
      if (log && this.config.node.dev) {
        this.logger.warn(
          `Updates: ${updates.map(u => Buffer.from(u).toString('hex')).join('\n')}`
        );
      }
      return yjsResult;
    } else {
      return this.simpleMergeUpdates(updates);
    }
  };
  // history retention window of a space, taken from the owner's quota
  historyMaxAge = async (spaceId: string) => {
    const owner = await this.permission.getWorkspaceOwner(spaceId);
    const quota = await this.quota.getUserQuota(owner.id);
    return quota.feature.historyPeriod;
  };
  // minimum interval between history records, from static config
  historyMinInterval = (_spaceId: string) => {
    return this.config.doc.history.interval;
  };
  // plain yjs merge without the y-octo cross-check
  @CallTimer('doc', 'yjs_merge_updates')
  private simpleMergeUpdates(updates: Uint8Array[]) {
    return Y.mergeUpdates(updates);
  }
  /**
   * Apply updates to a fresh Y.Doc in chunks of 10, yielding to the event
   * loop between chunks so a huge batch can't monopolize the CPU.
   * Individual bad updates are logged and skipped.
   */
  @CallTimer('doc', 'yjs_recover_updates_to_doc')
  private recoverDoc(updates: Uint8Array[]): Promise<Y.Doc> {
    const doc = new Y.Doc();
    const chunks = chunk(updates, 10);
    let i = 0;
    return new Promise(resolve => {
      Y.transact(doc, () => {
        const next = () => {
          const updates = chunks.at(i++);
          if (updates?.length) {
            updates.forEach(u => {
              try {
                Y.applyUpdate(doc, u);
              } catch (e) {
                this.logger.error('Failed to apply update', e);
              }
            });
            // avoid applying too many updates in single round which will take the whole cpu time like dead lock
            setImmediate(() => {
              next();
            });
          } else {
            resolve(doc);
          }
        };
        next();
      });
    });
  }
}

View File

@@ -4,8 +4,8 @@ import { PrismaClient } from '@prisma/client';
import {
DocAccessDenied,
WorkspaceAccessDenied,
WorkspaceOwnerNotFound,
SpaceAccessDenied,
SpaceOwnerNotFound,
} from '../../fundamentals';
import { Permission, PublicPageMode } from './types';
@@ -69,7 +69,7 @@ export class PermissionService {
});
if (!owner) {
throw new WorkspaceOwnerNotFound({ workspaceId });
throw new SpaceOwnerNotFound({ spaceId: workspaceId });
}
return owner.user;
@@ -157,7 +157,7 @@ export class PermissionService {
permission: Permission = Permission.Read
) {
if (!(await this.tryCheckWorkspace(ws, user, permission))) {
throw new WorkspaceAccessDenied({ workspaceId: ws });
throw new SpaceAccessDenied({ spaceId: ws });
}
}
@@ -340,7 +340,7 @@ export class PermissionService {
permission = Permission.Read
) {
if (!(await this.tryCheckPage(ws, page, user, permission))) {
throw new DocAccessDenied({ workspaceId: ws, docId: page });
throw new DocAccessDenied({ spaceId: ws, docId: page });
}
}

View File

@@ -71,10 +71,6 @@ export class QuotaConfig {
return this.config.configs.historyPeriod;
}
get historyPeriodFromNow() {
return new Date(Date.now() + this.historyPeriod);
}
get memberLimit() {
return this.config.configs.memberLimit;
}

View File

@@ -1,327 +0,0 @@
import { applyDecorators, Logger } from '@nestjs/common';
import {
ConnectedSocket,
MessageBody,
OnGatewayConnection,
OnGatewayDisconnect,
SubscribeMessage as RawSubscribeMessage,
WebSocketGateway,
WebSocketServer,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { encodeStateAsUpdate, encodeStateVector } from 'yjs';
import {
CallTimer,
Config,
DocNotFound,
GatewayErrorWrapper,
metrics,
NotInWorkspace,
VersionRejected,
WorkspaceAccessDenied,
} from '../../../fundamentals';
import { Auth, CurrentUser } from '../../auth';
import { DocManager } from '../../doc';
import { Permission, PermissionService } from '../../permission';
import { DocID } from '../../utils/doc';
// Wrap the raw socket.io @SubscribeMessage decorator with gateway error
// mapping and a per-event duration timer.
const SubscribeMessage = (event: string) =>
  applyDecorators(
    GatewayErrorWrapper(event),
    CallTimer('socketio', 'event_duration', { event }),
    RawSubscribeMessage(event)
  );
// Ack payload shape for gateway handlers: `data` is required unless the
// event carries no payload (Data = never).
type EventResponse<Data = any> = Data extends never
  ? {
      data?: never;
    }
  : {
      data: Data;
    };
/** Build the name of a workspace's document-sync room. */
function Sync(workspaceId: string): `${string}:sync` {
  const room: `${string}:sync` = `${workspaceId}:sync`;
  return room;
}
/** Build the name of a workspace's awareness-broadcast room. */
function Awareness(workspaceId: string): `${string}:awareness` {
  const room: `${string}:awareness` = `${workspaceId}:awareness`;
  return room;
}
/**
 * Legacy realtime gateway for workspace doc sync and awareness broadcast.
 *
 * Clients handshake (version + permission check) into per-workspace rooms
 * (`<id>:sync` / `<id>:awareness`); doc updates are persisted through
 * DocManager and fanned out to the room.
 */
@WebSocketGateway()
export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
  protected logger = new Logger(EventsGateway.name);
  // number of currently connected sockets, reported as a gauge
  private connectionCount = 0;
  constructor(
    private readonly config: Config,
    private readonly docManager: DocManager,
    private readonly permissions: PermissionService
  ) {}
  @WebSocketServer()
  server!: Server;
  handleConnection() {
    this.connectionCount++;
    metrics.socketio.gauge('realtime_connections').record(this.connectionCount);
  }
  handleDisconnect() {
    this.connectionCount--;
    metrics.socketio.gauge('realtime_connections').record(this.connectionCount);
  }
  /**
   * Reject clients whose version differs from the server build, when the
   * `flags/syncClientVersionCheck` runtime flag is enabled.
   * @throws VersionRejected after notifying the client
   */
  async assertVersion(client: Socket, version?: string) {
    const shouldCheckClientVersion = await this.config.runtime.fetch(
      'flags/syncClientVersionCheck'
    );
    if (
      // @todo(@darkskygit): remove this flag after 0.12 goes stable
      shouldCheckClientVersion &&
      version !== AFFiNE.version
    ) {
      client.emit('server-version-rejected', {
        currentVersion: version,
        requiredVersion: AFFiNE.version,
        reason: `Client version${
          version ? ` ${version}` : ''
        } is outdated, please update to ${AFFiNE.version}`,
      });
      throw new VersionRejected({
        version: version || 'unknown',
        serverVersion: AFFiNE.version,
      });
    }
  }
  async joinWorkspace(
    client: Socket,
    room: `${string}:${'sync' | 'awareness'}`
  ) {
    await client.join(room);
  }
  async leaveWorkspace(
    client: Socket,
    room: `${string}:${'sync' | 'awareness'}`
  ) {
    await client.leave(room);
  }
  // throws when the socket hasn't joined the room (i.e. skipped handshake)
  assertInWorkspace(client: Socket, room: `${string}:${'sync' | 'awareness'}`) {
    if (!client.rooms.has(room)) {
      throw new NotInWorkspace({ workspaceId: room.split(':')[0] });
    }
  }
  async assertWorkspaceAccessible(
    workspaceId: string,
    userId: string,
    permission: Permission = Permission.Read
  ) {
    if (
      !(await this.permissions.isWorkspaceMember(
        workspaceId,
        userId,
        permission
      ))
    ) {
      throw new WorkspaceAccessDenied({ workspaceId });
    }
  }
  // handshake: version check + write permission, then join the sync room
  @Auth()
  @SubscribeMessage('client-handshake-sync')
  async handleClientHandshakeSync(
    @CurrentUser() user: CurrentUser,
    @MessageBody('workspaceId') workspaceId: string,
    @MessageBody('version') version: string | undefined,
    @ConnectedSocket() client: Socket
  ): Promise<EventResponse<{ clientId: string }>> {
    await this.assertVersion(client, version);
    await this.assertWorkspaceAccessible(
      workspaceId,
      user.id,
      Permission.Write
    );
    await this.joinWorkspace(client, Sync(workspaceId));
    return {
      data: {
        clientId: client.id,
      },
    };
  }
  // handshake for the awareness channel, mirroring the sync handshake
  @Auth()
  @SubscribeMessage('client-handshake-awareness')
  async handleClientHandshakeAwareness(
    @CurrentUser() user: CurrentUser,
    @MessageBody('workspaceId') workspaceId: string,
    @MessageBody('version') version: string | undefined,
    @ConnectedSocket() client: Socket
  ): Promise<EventResponse<{ clientId: string }>> {
    await this.assertVersion(client, version);
    await this.assertWorkspaceAccessible(
      workspaceId,
      user.id,
      Permission.Write
    );
    await this.joinWorkspace(client, Awareness(workspaceId));
    return {
      data: {
        clientId: client.id,
      },
    };
  }
  @SubscribeMessage('client-leave-sync')
  async handleLeaveSync(
    @MessageBody() workspaceId: string,
    @ConnectedSocket() client: Socket
  ): Promise<EventResponse> {
    this.assertInWorkspace(client, Sync(workspaceId));
    await this.leaveWorkspace(client, Sync(workspaceId));
    return {};
  }
  @SubscribeMessage('client-leave-awareness')
  async handleLeaveAwareness(
    @MessageBody() workspaceId: string,
    @ConnectedSocket() client: Socket
  ): Promise<EventResponse> {
    this.assertInWorkspace(client, Awareness(workspaceId));
    await this.leaveWorkspace(client, Awareness(workspaceId));
    return {};
  }
  // pre-sync: report docs whose timestamps changed since `timestamp`
  // so the client knows which docs to pull
  @SubscribeMessage('client-pre-sync')
  async loadDocStats(
    @ConnectedSocket() client: Socket,
    @MessageBody()
    { workspaceId, timestamp }: { workspaceId: string; timestamp?: number }
  ): Promise<EventResponse<Record<string, number>>> {
    this.assertInWorkspace(client, Sync(workspaceId));
    const stats = await this.docManager.getDocTimestamps(
      workspaceId,
      timestamp
    );
    return {
      data: stats,
    };
  }
  // persist base64-encoded updates and broadcast them to the sync room
  @SubscribeMessage('client-update-v2')
  async handleClientUpdateV2(
    @MessageBody()
    {
      workspaceId,
      guid,
      updates,
    }: {
      workspaceId: string;
      guid: string;
      updates: string[];
    },
    @ConnectedSocket() client: Socket
  ): Promise<EventResponse<{ accepted: true; timestamp?: number }>> {
    this.assertInWorkspace(client, Sync(workspaceId));
    const docId = new DocID(guid, workspaceId);
    const buffers = updates.map(update => Buffer.from(update, 'base64'));
    const timestamp = await this.docManager.batchPush(
      docId.workspace,
      docId.guid,
      buffers
    );
    client
      .to(Sync(workspaceId))
      .emit('server-updates', { workspaceId, guid, updates, timestamp });
    return {
      data: {
        accepted: true,
        timestamp,
      },
    };
  }
  // load a doc and return the diff against the client's state vector
  // (base64 in and out)
  @SubscribeMessage('doc-load-v2')
  async loadDocV2(
    @ConnectedSocket() client: Socket,
    @MessageBody()
    {
      workspaceId,
      guid,
      stateVector,
    }: {
      workspaceId: string;
      guid: string;
      stateVector?: string;
    }
  ): Promise<
    EventResponse<{ missing: string; state?: string; timestamp: number }>
  > {
    this.assertInWorkspace(client, Sync(workspaceId));
    const docId = new DocID(guid, workspaceId);
    const res = await this.docManager.get(docId.workspace, docId.guid);
    if (!res) {
      throw new DocNotFound({ workspaceId, docId: docId.guid });
    }
    const missing = Buffer.from(
      encodeStateAsUpdate(
        res.doc,
        stateVector ? Buffer.from(stateVector, 'base64') : undefined
      )
    ).toString('base64');
    const state = Buffer.from(encodeStateVector(res.doc)).toString('base64');
    return {
      data: {
        missing,
        state,
        timestamp: res.timestamp,
      },
    };
  }
  // ask peers in the room to (re)send their awareness states
  @SubscribeMessage('awareness-init')
  async handleInitAwareness(
    @MessageBody() workspaceId: string,
    @ConnectedSocket() client: Socket
  ): Promise<EventResponse<{ clientId: string }>> {
    this.assertInWorkspace(client, Awareness(workspaceId));
    client.to(Awareness(workspaceId)).emit('new-client-awareness-init');
    return {
      data: {
        clientId: client.id,
      },
    };
  }
  // relay an awareness update to everyone else in the room
  @SubscribeMessage('awareness-update')
  async handleHelpGatheringAwareness(
    @MessageBody()
    {
      workspaceId,
      awarenessUpdate,
    }: { workspaceId: string; awarenessUpdate: string },
    @ConnectedSocket() client: Socket
  ): Promise<EventResponse> {
    this.assertInWorkspace(client, Awareness(workspaceId));
    client
      .to(Awareness(workspaceId))
      .emit('server-awareness-broadcast', { workspaceId, awarenessUpdate });
    return {};
  }
}

View File

@@ -1,11 +0,0 @@
import { Module } from '@nestjs/common';
import { DocModule } from '../../doc';
import { PermissionModule } from '../../permission';
import { EventsGateway } from './events.gateway';
// Wires the realtime events gateway together with the doc and permission
// services it depends on.
@Module({
  imports: [DocModule, PermissionModule],
  providers: [EventsGateway],
})
export class EventsModule {}

View File

@@ -0,0 +1,666 @@
import { applyDecorators, Logger } from '@nestjs/common';
import {
ConnectedSocket,
MessageBody,
OnGatewayConnection,
OnGatewayDisconnect,
SubscribeMessage as RawSubscribeMessage,
WebSocketGateway,
} from '@nestjs/websockets';
import { Socket } from 'socket.io';
import { diffUpdate, encodeStateVectorFromUpdate } from 'yjs';
import {
CallTimer,
Config,
DocNotFound,
GatewayErrorWrapper,
metrics,
NotInSpace,
SpaceAccessDenied,
VersionRejected,
} from '../../fundamentals';
import { Auth, CurrentUser } from '../auth';
import {
DocStorageAdapter,
PgUserspaceDocStorageAdapter,
PgWorkspaceDocStorageAdapter,
} from '../doc';
import { Permission, PermissionService } from '../permission';
import { DocID } from '../utils/doc';
// Wrap the raw socket.io @SubscribeMessage decorator with gateway error
// mapping and a per-event duration timer.
const SubscribeMessage = (event: string) =>
  applyDecorators(
    GatewayErrorWrapper(event),
    CallTimer('socketio', 'event_duration', { event }),
    RawSubscribeMessage(event)
  );
// Ack payload shape for gateway handlers: `data` is required unless the
// event carries no payload (Data = never).
type EventResponse<Data = any> = Data extends never
  ? {
      data?: never;
    }
  : {
      data: Data;
    };
// A room is either the space-wide `sync` channel or a per-doc awareness
// channel named `<docId>:awareness`.
type RoomType = 'sync' | `${string}:awareness`;
/** Build the fully-qualified room name for a space channel. */
function Room(
  spaceId: string,
  type: RoomType = 'sync'
): `${string}:${RoomType}` {
  const room: `${string}:${RoomType}` = `${spaceId}:${type}`;
  return room;
}
// Which storage backend a space belongs to: a shared workspace or a single
// user's private userspace.
enum SpaceType {
  Workspace = 'workspace',
  Userspace = 'userspace',
}
/** Payload of `space:join`: handshake into a space's sync room. */
interface JoinSpaceMessage {
  spaceType: SpaceType;
  spaceId: string;
  clientVersion: string;
}
/** Payload of `space:join-awareness`: join a per-doc awareness room. */
interface JoinSpaceAwarenessMessage {
  spaceType: SpaceType;
  spaceId: string;
  docId: string;
  clientVersion: string;
}
/** Payload of `space:leave`. */
interface LeaveSpaceMessage {
  spaceType: SpaceType;
  spaceId: string;
}
/** Payload of `space:leave-awareness`. */
interface LeaveSpaceAwarenessMessage {
  spaceType: SpaceType;
  spaceId: string;
  docId: string;
}
/** Payload of `space:push-doc-updates`; updates are base64-encoded. */
interface PushDocUpdatesMessage {
  spaceType: SpaceType;
  spaceId: string;
  docId: string;
  updates: string[];
}
/** Payload of `space:load-doc`; stateVector is base64-encoded when given. */
interface LoadDocMessage {
  spaceType: SpaceType;
  spaceId: string;
  docId: string;
  stateVector?: string;
}
/** Payload of `space:load-doc-timestamps`; timestamp is the lower bound. */
interface LoadDocTimestampsMessage {
  spaceType: SpaceType;
  spaceId: string;
  timestamp?: number;
}
/** Payload of `space:load-awarenesses`. */
interface LoadSpaceAwarenessesMessage {
  spaceType: SpaceType;
  spaceId: string;
  docId: string;
}
/** Payload of `space:update-awareness`, relayed verbatim to the room. */
interface UpdateAwarenessMessage {
  spaceType: SpaceType;
  spaceId: string;
  docId: string;
  awarenessUpdate: string;
}
@WebSocketGateway()
export class SpaceSyncGateway
implements OnGatewayConnection, OnGatewayDisconnect
{
protected logger = new Logger(SpaceSyncGateway.name);
private connectionCount = 0;
constructor(
private readonly config: Config,
private readonly permissions: PermissionService,
private readonly workspace: PgWorkspaceDocStorageAdapter,
private readonly userspace: PgUserspaceDocStorageAdapter
) {}
handleConnection() {
this.connectionCount++;
metrics.socketio.gauge('realtime_connections').record(this.connectionCount);
}
handleDisconnect() {
this.connectionCount--;
metrics.socketio.gauge('realtime_connections').record(this.connectionCount);
}
selectAdapter(client: Socket, spaceType: SpaceType): SyncSocketAdapter {
let adapters: Record<SpaceType, SyncSocketAdapter> = (client as any)
.affineSyncAdapters;
if (!adapters) {
const workspace = new WorkspaceSyncAdapter(
client,
this.workspace,
this.permissions
);
const userspace = new UserspaceSyncAdapter(client, this.userspace);
adapters = { workspace, userspace };
(client as any).affineSyncAdapters = adapters;
}
return adapters[spaceType];
}
async assertVersion(client: Socket, version?: string) {
const shouldCheckClientVersion = await this.config.runtime.fetch(
'flags/syncClientVersionCheck'
);
if (
// @todo(@darkskygit): remove this flag after 0.12 goes stable
shouldCheckClientVersion &&
version !== AFFiNE.version
) {
client.emit('server-version-rejected', {
currentVersion: version,
requiredVersion: AFFiNE.version,
reason: `Client version${
version ? ` ${version}` : ''
} is outdated, please update to ${AFFiNE.version}`,
});
throw new VersionRejected({
version: version || 'unknown',
serverVersion: AFFiNE.version,
});
}
}
async joinWorkspace(
client: Socket,
room: `${string}:${'sync' | 'awareness'}`
) {
await client.join(room);
}
async leaveWorkspace(
client: Socket,
room: `${string}:${'sync' | 'awareness'}`
) {
await client.leave(room);
}
assertInWorkspace(client: Socket, room: `${string}:${'sync' | 'awareness'}`) {
if (!client.rooms.has(room)) {
throw new NotInSpace({ spaceId: room.split(':')[0] });
}
}
// v3
@Auth()
@SubscribeMessage('space:join')
async onJoinSpace(
@CurrentUser() user: CurrentUser,
@ConnectedSocket() client: Socket,
@MessageBody()
{ spaceType, spaceId, clientVersion }: JoinSpaceMessage
): Promise<EventResponse<{ clientId: string; success: true }>> {
await this.assertVersion(client, clientVersion);
await this.selectAdapter(client, spaceType).join(user.id, spaceId);
return { data: { clientId: client.id, success: true } };
}
@SubscribeMessage('space:leave')
async onLeaveSpace(
@ConnectedSocket() client: Socket,
@MessageBody() { spaceType, spaceId }: LeaveSpaceMessage
): Promise<EventResponse<{ clientId: string; success: true }>> {
await this.selectAdapter(client, spaceType).leave(spaceId);
return { data: { clientId: client.id, success: true } };
}
@SubscribeMessage('space:load-doc')
async onLoadSpaceDoc(
@ConnectedSocket() client: Socket,
@MessageBody()
{ spaceType, spaceId, docId, stateVector }: LoadDocMessage
): Promise<
EventResponse<{ missing: string; state?: string; timestamp: number }>
> {
const adapter = this.selectAdapter(client, spaceType);
adapter.assertIn(spaceId);
const doc = await adapter.get(spaceId, docId);
if (!doc) {
throw new DocNotFound({ spaceId, docId });
}
const missing = Buffer.from(
stateVector
? diffUpdate(doc.bin, Buffer.from(stateVector, 'base64'))
: doc.bin
).toString('base64');
const state = Buffer.from(encodeStateVectorFromUpdate(doc.bin)).toString(
'base64'
);
return {
data: {
missing,
state,
timestamp: doc.timestamp,
},
};
}
@SubscribeMessage('space:push-doc-updates')
async onReceiveDocUpdates(
@ConnectedSocket() client: Socket,
@MessageBody()
message: PushDocUpdatesMessage
): Promise<EventResponse<{ accepted: true; timestamp?: number }>> {
const { spaceType, spaceId, docId, updates } = message;
const adapter = this.selectAdapter(client, spaceType);
// TODO(@forehalo): we might need to check write permission before push updates
const timestamp = await adapter.push(
spaceId,
docId,
updates.map(update => Buffer.from(update, 'base64'))
);
// could be put in [adapter.push]
// but the event should be kept away from adapter
// so
client
.to(adapter.room(spaceId))
.emit('space:broadcast-doc-updates', { ...message, timestamp });
// TODO(@forehalo): remove backward compatibility
if (spaceType === SpaceType.Workspace) {
const id = new DocID(docId, spaceId);
client.to(adapter.room(spaceId)).emit('server-updates', {
workspaceId: spaceId,
guid: id.guid,
updates,
timestamp,
});
}
return {
data: {
accepted: true,
timestamp,
},
};
}
@SubscribeMessage('space:load-doc-timestamps')
async onLoadDocTimestamps(
@ConnectedSocket() client: Socket,
@MessageBody()
{ spaceType, spaceId, timestamp }: LoadDocTimestampsMessage
): Promise<EventResponse<Record<string, number>>> {
const adapter = this.selectAdapter(client, spaceType);
const stats = await adapter.getTimestamps(spaceId, timestamp);
return {
data: stats ?? {},
};
}
@Auth()
@SubscribeMessage('space:join-awareness')
async onJoinAwareness(
@ConnectedSocket() client: Socket,
@CurrentUser() user: CurrentUser,
@MessageBody()
{ spaceType, spaceId, docId, clientVersion }: JoinSpaceAwarenessMessage
) {
await this.assertVersion(client, clientVersion);
await this.selectAdapter(client, spaceType).join(
user.id,
spaceId,
`${docId}:awareness`
);
return { data: { clientId: client.id, success: true } };
}
@SubscribeMessage('space:leave-awareness')
async onLeaveAwareness(
@ConnectedSocket() client: Socket,
@MessageBody()
{ spaceType, spaceId, docId }: LeaveSpaceAwarenessMessage
) {
await this.selectAdapter(client, spaceType).leave(
spaceId,
`${docId}:awareness`
);
return { data: { clientId: client.id, success: true } };
}
@SubscribeMessage('space:load-awarenesses')
async onLoadAwareness(
@ConnectedSocket() client: Socket,
@MessageBody()
{ spaceType, spaceId, docId }: LoadSpaceAwarenessesMessage
) {
const adapter = this.selectAdapter(client, spaceType);
const roomType = `${docId}:awareness` as const;
adapter.assertIn(spaceId, roomType);
client
.to(adapter.room(spaceId, roomType))
.emit('space:collect-awareness', { spaceType, spaceId, docId });
// TODO(@forehalo): remove backward compatibility
if (spaceType === SpaceType.Workspace) {
client
.to(adapter.room(spaceId, roomType))
.emit('new-client-awareness-init');
}
return { data: { clientId: client.id } };
}
@SubscribeMessage('space:update-awareness')
async onUpdateAwareness(
@ConnectedSocket() client: Socket,
@MessageBody() message: UpdateAwarenessMessage
) {
const { spaceType, spaceId, docId } = message;
const adapter = this.selectAdapter(client, spaceType);
const roomType = `${docId}:awareness` as const;
adapter.assertIn(spaceId, roomType);
client
.to(adapter.room(spaceId, roomType))
.emit('space:broadcast-awareness-update', message);
// TODO(@forehalo): remove backward compatibility
if (spaceType === SpaceType.Workspace) {
client
.to(adapter.room(spaceId, roomType))
.emit('server-awareness-broadcast', {
workspaceId: spaceId,
awarenessUpdate: message.awarenessUpdate,
});
}
return {};
}
// TODO(@forehalo): remove
// deprecated section
@Auth()
@SubscribeMessage('client-handshake-sync')
async handleClientHandshakeSync(
@CurrentUser() user: CurrentUser,
@MessageBody('workspaceId') workspaceId: string,
@MessageBody('version') version: string,
@ConnectedSocket() client: Socket
): Promise<EventResponse<{ clientId: string }>> {
await this.assertVersion(client, version);
return this.onJoinSpace(user, client, {
spaceType: SpaceType.Workspace,
spaceId: workspaceId,
clientVersion: version,
});
}
@SubscribeMessage('client-leave-sync')
async handleLeaveSync(
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
): Promise<EventResponse> {
return this.onLeaveSpace(client, {
spaceType: SpaceType.Workspace,
spaceId: workspaceId,
});
}
@SubscribeMessage('client-pre-sync')
async loadDocStats(
@ConnectedSocket() client: Socket,
@MessageBody()
{ workspaceId, timestamp }: { workspaceId: string; timestamp?: number }
): Promise<EventResponse<Record<string, number>>> {
return this.onLoadDocTimestamps(client, {
spaceType: SpaceType.Workspace,
spaceId: workspaceId,
timestamp,
});
}
@SubscribeMessage('client-update-v2')
async handleClientUpdateV2(
@MessageBody()
{
workspaceId,
guid,
updates,
}: {
workspaceId: string;
guid: string;
updates: string[];
},
@ConnectedSocket() client: Socket
): Promise<EventResponse<{ accepted: true; timestamp?: number }>> {
return this.onReceiveDocUpdates(client, {
spaceType: SpaceType.Workspace,
spaceId: workspaceId,
docId: guid,
updates,
});
}
@SubscribeMessage('doc-load-v2')
async loadDocV2(
  @ConnectedSocket() client: Socket,
  @MessageBody()
  message: {
    workspaceId: string;
    guid: string;
    stateVector?: string;
  }
): Promise<
  EventResponse<{ missing: string; state?: string; timestamp: number }>
> {
  // Deprecated doc-load channel: delegate to the unified space doc loader,
  // mapping the legacy `guid` field to `docId`.
  return this.onLoadSpaceDoc(client, {
    spaceType: SpaceType.Workspace,
    spaceId: message.workspaceId,
    docId: message.guid,
    stateVector: message.stateVector,
  });
}
@Auth()
@SubscribeMessage('client-handshake-awareness')
async handleClientHandshakeAwareness(
  @ConnectedSocket() client: Socket,
  @CurrentUser() user: CurrentUser,
  @MessageBody('workspaceId') workspaceId: string,
  @MessageBody('version') version: string
): Promise<EventResponse<{ clientId: string }>> {
  // Deprecated awareness handshake. Workspace awareness rooms reuse the
  // workspace id as the doc id.
  const joinMessage = {
    spaceType: SpaceType.Workspace,
    spaceId: workspaceId,
    docId: workspaceId,
    clientVersion: version,
  };
  return this.onJoinAwareness(client, user, joinMessage);
}
@SubscribeMessage('client-leave-awareness')
async handleLeaveAwareness(
  @MessageBody() workspaceId: string,
  @ConnectedSocket() client: Socket
): Promise<EventResponse> {
  // Deprecated: leave the workspace awareness room (doc id == workspace id).
  const leaveMessage = {
    spaceType: SpaceType.Workspace,
    spaceId: workspaceId,
    docId: workspaceId,
  };
  return this.onLeaveAwareness(client, leaveMessage);
}
@SubscribeMessage('awareness-init')
async handleInitAwareness(
  @MessageBody() workspaceId: string,
  @ConnectedSocket() client: Socket
): Promise<EventResponse<{ clientId: string }>> {
  // Deprecated: request the current awareness state of a workspace
  // (doc id == workspace id for workspace awareness).
  const loadMessage = {
    spaceType: SpaceType.Workspace,
    spaceId: workspaceId,
    docId: workspaceId,
  };
  return this.onLoadAwareness(client, loadMessage);
}
@SubscribeMessage('awareness-update')
async handleHelpGatheringAwareness(
  @MessageBody()
  message: { workspaceId: string; awarenessUpdate: string },
  @ConnectedSocket() client: Socket
): Promise<EventResponse> {
  // Deprecated: broadcast a client's awareness update to the workspace
  // awareness room (doc id == workspace id).
  return this.onUpdateAwareness(client, {
    spaceType: SpaceType.Workspace,
    spaceId: message.workspaceId,
    docId: message.workspaceId,
    awarenessUpdate: message.awarenessUpdate,
  });
}
}
/**
 * Binds one socket.io client to a doc storage backend and tracks which
 * space rooms (sync/awareness) the client has joined.
 */
abstract class SyncSocketAdapter {
  constructor(
    private readonly spaceType: SpaceType,
    public readonly client: Socket,
    public readonly storage: DocStorageAdapter
  ) {}

  /** Room name for a space, namespaced by space type. */
  room(spaceId: string, roomType: RoomType = 'sync') {
    return `${this.spaceType}:${Room(spaceId, roomType)}`;
  }

  /** Join the space room after verifying the user may read the space. */
  async join(userId: string, spaceId: string, roomType: RoomType = 'sync') {
    this.assertNotIn(spaceId, roomType);
    await this.assertAccessible(spaceId, userId, Permission.Read);
    return this.client.join(this.room(spaceId, roomType));
  }

  /** Leave a space room the client previously joined. */
  async leave(spaceId: string, roomType: RoomType = 'sync') {
    this.assertIn(spaceId, roomType);
    return this.client.leave(this.room(spaceId, roomType));
  }

  /** Whether this client is currently in the given space room. */
  in(spaceId: string, roomType: RoomType = 'sync') {
    return this.client.rooms.has(this.room(spaceId, roomType));
  }

  /** Throws if the client already joined the room. */
  assertNotIn(spaceId: string, roomType: RoomType = 'sync') {
    if (this.in(spaceId, roomType)) {
      // TODO(@forehalo): use new AlreadyInSpace({ spaceId }) instead
      throw new NotInSpace({ spaceId });
    }
  }

  /** Throws if the client has not joined the room. */
  assertIn(spaceId: string, roomType: RoomType = 'sync') {
    if (!this.in(spaceId, roomType)) {
      throw new NotInSpace({ spaceId });
    }
  }

  /** Per-space-type access control; implemented by subclasses. */
  abstract assertAccessible(
    spaceId: string,
    userId: string,
    permission?: Permission
  ): Promise<void>;

  /** Persist doc updates; requires room membership. */
  push(spaceId: string, docId: string, updates: Buffer[]) {
    this.assertIn(spaceId);
    return this.storage.pushDocUpdates(spaceId, docId, updates);
  }

  /** Load a doc snapshot; requires room membership. */
  get(spaceId: string, docId: string) {
    this.assertIn(spaceId);
    return this.storage.getDoc(spaceId, docId);
  }

  /** Per-doc update timestamps of the space; requires room membership. */
  getTimestamps(spaceId: string, timestamp?: number) {
    this.assertIn(spaceId);
    return this.storage.getSpaceDocTimestamps(spaceId, timestamp);
  }
}
/**
 * Sync adapter for workspace spaces: access is granted by workspace
 * membership, and legacy fully-qualified doc ids are normalized via DocID.
 */
class WorkspaceSyncAdapter extends SyncSocketAdapter {
  constructor(
    client: Socket,
    storage: DocStorageAdapter,
    private readonly permission: PermissionService
  ) {
    super(SpaceType.Workspace, client, storage);
  }

  // backward compatibility: legacy workspace rooms are not prefixed with
  // the space type.
  override room(spaceId: string, roomType: RoomType = 'sync') {
    return Room(spaceId, roomType);
  }

  override push(spaceId: string, docId: string, updates: Buffer[]) {
    // Legacy clients may send 'workspaceId:guid' doc ids; normalize to the
    // bare guid before delegating.
    const id = new DocID(docId, spaceId);
    return super.push(spaceId, id.guid, updates);
  }

  override get(spaceId: string, docId: string) {
    const id = new DocID(docId, spaceId);
    // Delegate to the base implementation so the room-membership check
    // (assertIn) is enforced, consistent with `push` above. Previously this
    // called `this.storage.getDoc` directly and skipped the assertion.
    return super.get(spaceId, id.guid);
  }

  /**
   * A workspace is accessible when the user is a member holding at least
   * the requested permission (defaults to read).
   */
  async assertAccessible(
    spaceId: string,
    userId: string,
    permission: Permission = Permission.Read
  ) {
    const allowed = await this.permission.isWorkspaceMember(
      spaceId,
      userId,
      permission
    );
    if (!allowed) {
      throw new SpaceAccessDenied({ spaceId });
    }
  }
}
/**
 * Sync adapter for userspace spaces: a user's space is private, so access
 * is granted only when the space id equals the requesting user's id.
 */
class UserspaceSyncAdapter extends SyncSocketAdapter {
  constructor(client: Socket, storage: DocStorageAdapter) {
    super(SpaceType.Userspace, client, storage);
  }

  async assertAccessible(
    spaceId: string,
    userId: string,
    _permission: Permission = Permission.Read
  ) {
    // Only the owner may touch their userspace; permission level is ignored.
    const isOwner = spaceId === userId;
    if (!isOwner) {
      throw new SpaceAccessDenied({ spaceId });
    }
  }
}

View File

@@ -1,8 +1,11 @@
import { Module } from '@nestjs/common';
import { EventsModule } from './events/events.module';
import { DocStorageModule } from '../doc';
import { PermissionModule } from '../permission';
import { SpaceSyncGateway } from './gateway';
@Module({
imports: [EventsModule],
imports: [DocStorageModule, PermissionModule],
providers: [SpaceSyncGateway],
})
export class SyncModule {}

View File

@@ -12,7 +12,7 @@ import {
InvalidHistoryTimestamp,
} from '../../fundamentals';
import { CurrentUser, Public } from '../auth';
import { DocHistoryManager, DocManager } from '../doc';
import { PgWorkspaceDocStorageAdapter } from '../doc';
import { Permission, PermissionService, PublicPageMode } from '../permission';
import { WorkspaceBlobStorage } from '../storage';
import { DocID } from '../utils/doc';
@@ -23,8 +23,7 @@ export class WorkspacesController {
constructor(
private readonly storage: WorkspaceBlobStorage,
private readonly permission: PermissionService,
private readonly docManager: DocManager,
private readonly historyManager: DocHistoryManager,
private readonly workspace: PgWorkspaceDocStorageAdapter,
private readonly prisma: PrismaClient
) {}
@@ -56,7 +55,7 @@ export class WorkspacesController {
if (!body) {
throw new BlobNotFound({
workspaceId,
spaceId: workspaceId,
blobId: name,
});
}
@@ -96,14 +95,14 @@ export class WorkspacesController {
throw new AccessDenied();
}
const binResponse = await this.docManager.getBinary(
const binResponse = await this.workspace.getDoc(
docId.workspace,
docId.guid
);
if (!binResponse) {
throw new DocNotFound({
workspaceId: docId.workspace,
spaceId: docId.workspace,
docId: docId.guid,
});
}
@@ -125,7 +124,7 @@ export class WorkspacesController {
}
res.setHeader('content-type', 'application/octet-stream');
res.send(binResponse.binary);
res.send(binResponse.bin);
}
@Get('/:id/docs/:guid/histories/:timestamp')
@@ -152,19 +151,19 @@ export class WorkspacesController {
Permission.Write
);
const history = await this.historyManager.get(
const history = await this.workspace.getDocHistory(
docId.workspace,
docId.guid,
ts
ts.getTime()
);
if (history) {
res.setHeader('content-type', 'application/octet-stream');
res.setHeader('cache-control', 'private, max-age=2592000, immutable');
res.send(history.blob);
res.send(history.bin);
} else {
throw new DocHistoryNotFound({
workspaceId: docId.workspace,
spaceId: docId.workspace,
docId: guid,
timestamp: ts.getTime(),
});

View File

@@ -1,6 +1,6 @@
import { Module } from '@nestjs/common';
import { DocModule } from '../doc';
import { DocStorageModule } from '../doc';
import { FeatureModule } from '../features';
import { PermissionModule } from '../permission';
import { QuotaModule } from '../quota';
@@ -17,7 +17,7 @@ import {
@Module({
imports: [
DocModule,
DocStorageModule,
FeatureModule,
QuotaModule,
StorageModule,

View File

@@ -12,7 +12,7 @@ import {
import type { SnapshotHistory } from '@prisma/client';
import { CurrentUser } from '../../auth';
import { DocHistoryManager } from '../../doc';
import { PgWorkspaceDocStorageAdapter } from '../../doc';
import { Permission, PermissionService } from '../../permission';
import { DocID } from '../../utils/doc';
import { WorkspaceType } from '../types';
@@ -32,7 +32,7 @@ class DocHistoryType implements Partial<SnapshotHistory> {
@Resolver(() => WorkspaceType)
export class DocHistoryResolver {
constructor(
private readonly historyManager: DocHistoryManager,
private readonly workspace: PgWorkspaceDocStorageAdapter,
private readonly permission: PermissionService
) {}
@@ -47,17 +47,19 @@ export class DocHistoryResolver {
): Promise<DocHistoryType[]> {
const docId = new DocID(guid, workspace.id);
return this.historyManager
.list(workspace.id, docId.guid, timestamp, take)
.then(rows =>
rows.map(({ timestamp }) => {
return {
workspaceId: workspace.id,
id: docId.guid,
timestamp,
};
})
);
const timestamps = await this.workspace.listDocHistories(
workspace.id,
docId.guid,
{ before: timestamp.getTime(), limit: take }
);
return timestamps.map(timestamp => {
return {
workspaceId: workspace.id,
id: docId.guid,
timestamp: new Date(timestamp),
};
});
}
@Mutation(() => Date)
@@ -76,6 +78,12 @@ export class DocHistoryResolver {
Permission.Write
);
return this.historyManager.recover(docId.workspace, docId.guid, timestamp);
await this.workspace.rollbackDoc(
docId.workspace,
docId.guid,
timestamp.getTime()
);
return timestamp;
}
}

View File

@@ -15,17 +15,17 @@ import { applyUpdate, Doc } from 'yjs';
import type { FileUpload } from '../../../fundamentals';
import {
CantChangeWorkspaceOwner,
CantChangeSpaceOwner,
EventEmitter,
InternalServerError,
MailService,
MemberQuotaExceeded,
RequestMutex,
SpaceAccessDenied,
SpaceNotFound,
Throttle,
TooManyRequest,
UserNotFound,
WorkspaceAccessDenied,
WorkspaceNotFound,
} from '../../../fundamentals';
import { CurrentUser, Public } from '../../auth';
import { Permission, PermissionService } from '../../permission';
@@ -76,7 +76,7 @@ export class WorkspaceResolver {
const permission = await this.permissions.get(workspace.id, user.id);
if (!permission) {
throw new WorkspaceAccessDenied({ workspaceId: workspace.id });
throw new SpaceAccessDenied({ spaceId: workspace.id });
}
return permission;
@@ -193,7 +193,7 @@ export class WorkspaceResolver {
const workspace = await this.prisma.workspace.findUnique({ where: { id } });
if (!workspace) {
throw new WorkspaceNotFound({ workspaceId: id });
throw new SpaceNotFound({ spaceId: id });
}
return workspace;
@@ -304,7 +304,7 @@ export class WorkspaceResolver {
);
if (permission === Permission.Owner) {
throw new CantChangeWorkspaceOwner();
throw new CantChangeSpaceOwner();
}
try {