mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-04 16:44:56 +00:00
Compare commits
4 Commits
l-sun/enab
...
v0.20.5-be
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b427a89c9a | ||
|
|
00398fc63a | ||
|
|
cbef681125 | ||
|
|
bf42a4ddb2 |
@@ -31,6 +31,7 @@ type CreateProxyOptions = {
|
||||
transform?: Transform;
|
||||
onDispose: Slot;
|
||||
shouldByPassSignal: () => boolean;
|
||||
shouldByPassYjs: () => boolean;
|
||||
byPassSignalUpdate: (fn: () => void) => void;
|
||||
stashed: Set<string | number>;
|
||||
initialized: () => boolean;
|
||||
@@ -58,6 +59,7 @@ function createProxy(
|
||||
const {
|
||||
onDispose,
|
||||
shouldByPassSignal,
|
||||
shouldByPassYjs,
|
||||
byPassSignalUpdate,
|
||||
basePath,
|
||||
onChange,
|
||||
@@ -141,6 +143,9 @@ function createProxy(
|
||||
|
||||
if (isPureObject(value)) {
|
||||
const syncYMap = () => {
|
||||
if (shouldByPassYjs()) {
|
||||
return;
|
||||
}
|
||||
yMap.forEach((_, key) => {
|
||||
if (initialized() && keyWithoutPrefix(key).startsWith(fullPath)) {
|
||||
yMap.delete(key);
|
||||
@@ -185,7 +190,7 @@ function createProxy(
|
||||
|
||||
const yValue = native2Y(value);
|
||||
const next = transform(firstKey, value, yValue);
|
||||
if (!isStashed && initialized()) {
|
||||
if (!isStashed && initialized() && !shouldByPassYjs()) {
|
||||
yMap.doc?.transact(
|
||||
() => {
|
||||
yMap.set(keyWithPrefix(fullPath), yValue);
|
||||
@@ -238,7 +243,7 @@ function createProxy(
|
||||
});
|
||||
};
|
||||
|
||||
if (!isStashed && initialized()) {
|
||||
if (!isStashed && initialized() && !shouldByPassYjs()) {
|
||||
yMap.doc?.transact(
|
||||
() => {
|
||||
const fullKey = keyWithPrefix(fullPath);
|
||||
@@ -292,12 +297,17 @@ export class ReactiveFlatYMap extends BaseReactiveYData<
|
||||
if (this._stashed.has(firstKey)) {
|
||||
return;
|
||||
}
|
||||
void keys.reduce((acc, key, index, arr) => {
|
||||
if (index === arr.length - 1) {
|
||||
acc[key] = y2Native(value);
|
||||
}
|
||||
return acc[key] as UnRecord;
|
||||
}, proxy as UnRecord);
|
||||
this._updateWithYjsSkip(() => {
|
||||
void keys.reduce((acc, key, index, arr) => {
|
||||
if (!acc[key] && index !== arr.length - 1) {
|
||||
acc[key] = {};
|
||||
}
|
||||
if (index === arr.length - 1) {
|
||||
acc[key] = y2Native(value);
|
||||
}
|
||||
return acc[key] as UnRecord;
|
||||
}, proxy as UnRecord);
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (type.action === 'delete') {
|
||||
@@ -307,12 +317,26 @@ export class ReactiveFlatYMap extends BaseReactiveYData<
|
||||
if (this._stashed.has(firstKey)) {
|
||||
return;
|
||||
}
|
||||
void keys.reduce((acc, key, index, arr) => {
|
||||
if (index === arr.length - 1) {
|
||||
delete acc[key];
|
||||
}
|
||||
return acc[key] as UnRecord;
|
||||
}, proxy as UnRecord);
|
||||
this._updateWithYjsSkip(() => {
|
||||
void keys.reduce((acc, key, index, arr) => {
|
||||
if (index === arr.length - 1) {
|
||||
delete acc[key];
|
||||
let i = index - 1;
|
||||
let curr = acc;
|
||||
while (i > 0) {
|
||||
const parentPath = keys.slice(0, i);
|
||||
const parentKey = keys[i];
|
||||
const parent = parentPath.reduce((acc, key) => {
|
||||
return acc[key] as UnRecord;
|
||||
}, proxy as UnRecord);
|
||||
deleteEmptyObject(curr, parentKey, parent);
|
||||
curr = parent;
|
||||
i--;
|
||||
}
|
||||
}
|
||||
return acc[key] as UnRecord;
|
||||
}, proxy as UnRecord);
|
||||
});
|
||||
return;
|
||||
}
|
||||
});
|
||||
@@ -393,6 +417,8 @@ export class ReactiveFlatYMap extends BaseReactiveYData<
|
||||
return root;
|
||||
};
|
||||
|
||||
private _byPassYjs = false;
|
||||
|
||||
private readonly _getProxy = (
|
||||
source: UnRecord,
|
||||
root: UnRecord,
|
||||
@@ -402,6 +428,7 @@ export class ReactiveFlatYMap extends BaseReactiveYData<
|
||||
onDispose: this._onDispose,
|
||||
shouldByPassSignal: () => this._skipNext,
|
||||
byPassSignalUpdate: this._updateWithSkip,
|
||||
shouldByPassYjs: () => this._byPassYjs,
|
||||
basePath: path,
|
||||
onChange: this._onChange,
|
||||
transform: this._transform,
|
||||
@@ -410,6 +437,12 @@ export class ReactiveFlatYMap extends BaseReactiveYData<
|
||||
});
|
||||
};
|
||||
|
||||
private readonly _updateWithYjsSkip = (fn: () => void) => {
|
||||
this._byPassYjs = true;
|
||||
fn();
|
||||
this._byPassYjs = false;
|
||||
};
|
||||
|
||||
constructor(
|
||||
protected readonly _ySource: YMap<unknown>,
|
||||
private readonly _onDispose: Slot,
|
||||
@@ -453,3 +486,9 @@ export class ReactiveFlatYMap extends BaseReactiveYData<
|
||||
this._stashed.add(prop);
|
||||
};
|
||||
}
|
||||
|
||||
function deleteEmptyObject(obj: UnRecord, key: string, parent: UnRecord): void {
|
||||
if (Object.keys(obj).length === 0) {
|
||||
delete parent[key];
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Cron, CronExpression } from '@nestjs/schedule';
|
||||
import { PrismaClient } from '@prisma/client';
|
||||
|
||||
import { OnJob } from '../../base';
|
||||
import { JobQueue, OnJob } from '../../base';
|
||||
import { PgWorkspaceDocStorageAdapter } from '../doc';
|
||||
|
||||
declare global {
|
||||
@@ -14,7 +16,11 @@ declare global {
|
||||
|
||||
@Injectable()
|
||||
export class DocServiceCronJob {
|
||||
constructor(private readonly workspace: PgWorkspaceDocStorageAdapter) {}
|
||||
constructor(
|
||||
private readonly workspace: PgWorkspaceDocStorageAdapter,
|
||||
private readonly prisma: PrismaClient,
|
||||
private readonly job: JobQueue
|
||||
) {}
|
||||
|
||||
@OnJob('doc.mergePendingDocUpdates')
|
||||
async mergePendingDocUpdates({
|
||||
@@ -23,4 +29,29 @@ export class DocServiceCronJob {
|
||||
}: Jobs['doc.mergePendingDocUpdates']) {
|
||||
await this.workspace.getDoc(workspaceId, docId);
|
||||
}
|
||||
|
||||
@Cron(CronExpression.EVERY_10_SECONDS)
|
||||
async schedule() {
|
||||
const group = await this.prisma.update.groupBy({
|
||||
by: ['workspaceId', 'id'],
|
||||
_count: true,
|
||||
});
|
||||
|
||||
for (const update of group) {
|
||||
if (update._count > 100) {
|
||||
await this.job.add(
|
||||
'doc.mergePendingDocUpdates',
|
||||
{
|
||||
workspaceId: update.workspaceId,
|
||||
docId: update.id,
|
||||
},
|
||||
{
|
||||
jobId: `doc:merge-pending-updates:${update.workspaceId}:${update.id}`,
|
||||
priority: update._count,
|
||||
delay: 0,
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,6 +92,7 @@ export class DocModel extends BaseModel {
|
||||
orderBy: {
|
||||
createdAt: 'asc',
|
||||
},
|
||||
take: 100,
|
||||
});
|
||||
return rows.map(r => this.updateToDocRecord(r));
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ export class AwarenessFrontend {
|
||||
return;
|
||||
}
|
||||
|
||||
applyAwarenessUpdate(awareness, update.bin, origin);
|
||||
applyAwarenessUpdate(awareness, update.bin, uniqueId);
|
||||
};
|
||||
const handleSyncCollect = () => {
|
||||
return Promise.resolve({
|
||||
|
||||
Reference in New Issue
Block a user