mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-15 05:37:32 +00:00
Compare commits
4 Commits
v0.26.3-be
...
v0.20.5-be
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b427a89c9a | ||
|
|
00398fc63a | ||
|
|
cbef681125 | ||
|
|
bf42a4ddb2 |
@@ -31,6 +31,7 @@ type CreateProxyOptions = {
|
|||||||
transform?: Transform;
|
transform?: Transform;
|
||||||
onDispose: Slot;
|
onDispose: Slot;
|
||||||
shouldByPassSignal: () => boolean;
|
shouldByPassSignal: () => boolean;
|
||||||
|
shouldByPassYjs: () => boolean;
|
||||||
byPassSignalUpdate: (fn: () => void) => void;
|
byPassSignalUpdate: (fn: () => void) => void;
|
||||||
stashed: Set<string | number>;
|
stashed: Set<string | number>;
|
||||||
initialized: () => boolean;
|
initialized: () => boolean;
|
||||||
@@ -58,6 +59,7 @@ function createProxy(
|
|||||||
const {
|
const {
|
||||||
onDispose,
|
onDispose,
|
||||||
shouldByPassSignal,
|
shouldByPassSignal,
|
||||||
|
shouldByPassYjs,
|
||||||
byPassSignalUpdate,
|
byPassSignalUpdate,
|
||||||
basePath,
|
basePath,
|
||||||
onChange,
|
onChange,
|
||||||
@@ -141,6 +143,9 @@ function createProxy(
|
|||||||
|
|
||||||
if (isPureObject(value)) {
|
if (isPureObject(value)) {
|
||||||
const syncYMap = () => {
|
const syncYMap = () => {
|
||||||
|
if (shouldByPassYjs()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
yMap.forEach((_, key) => {
|
yMap.forEach((_, key) => {
|
||||||
if (initialized() && keyWithoutPrefix(key).startsWith(fullPath)) {
|
if (initialized() && keyWithoutPrefix(key).startsWith(fullPath)) {
|
||||||
yMap.delete(key);
|
yMap.delete(key);
|
||||||
@@ -185,7 +190,7 @@ function createProxy(
|
|||||||
|
|
||||||
const yValue = native2Y(value);
|
const yValue = native2Y(value);
|
||||||
const next = transform(firstKey, value, yValue);
|
const next = transform(firstKey, value, yValue);
|
||||||
if (!isStashed && initialized()) {
|
if (!isStashed && initialized() && !shouldByPassYjs()) {
|
||||||
yMap.doc?.transact(
|
yMap.doc?.transact(
|
||||||
() => {
|
() => {
|
||||||
yMap.set(keyWithPrefix(fullPath), yValue);
|
yMap.set(keyWithPrefix(fullPath), yValue);
|
||||||
@@ -238,7 +243,7 @@ function createProxy(
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
if (!isStashed && initialized()) {
|
if (!isStashed && initialized() && !shouldByPassYjs()) {
|
||||||
yMap.doc?.transact(
|
yMap.doc?.transact(
|
||||||
() => {
|
() => {
|
||||||
const fullKey = keyWithPrefix(fullPath);
|
const fullKey = keyWithPrefix(fullPath);
|
||||||
@@ -292,12 +297,17 @@ export class ReactiveFlatYMap extends BaseReactiveYData<
|
|||||||
if (this._stashed.has(firstKey)) {
|
if (this._stashed.has(firstKey)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
void keys.reduce((acc, key, index, arr) => {
|
this._updateWithYjsSkip(() => {
|
||||||
if (index === arr.length - 1) {
|
void keys.reduce((acc, key, index, arr) => {
|
||||||
acc[key] = y2Native(value);
|
if (!acc[key] && index !== arr.length - 1) {
|
||||||
}
|
acc[key] = {};
|
||||||
return acc[key] as UnRecord;
|
}
|
||||||
}, proxy as UnRecord);
|
if (index === arr.length - 1) {
|
||||||
|
acc[key] = y2Native(value);
|
||||||
|
}
|
||||||
|
return acc[key] as UnRecord;
|
||||||
|
}, proxy as UnRecord);
|
||||||
|
});
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (type.action === 'delete') {
|
if (type.action === 'delete') {
|
||||||
@@ -307,12 +317,26 @@ export class ReactiveFlatYMap extends BaseReactiveYData<
|
|||||||
if (this._stashed.has(firstKey)) {
|
if (this._stashed.has(firstKey)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
void keys.reduce((acc, key, index, arr) => {
|
this._updateWithYjsSkip(() => {
|
||||||
if (index === arr.length - 1) {
|
void keys.reduce((acc, key, index, arr) => {
|
||||||
delete acc[key];
|
if (index === arr.length - 1) {
|
||||||
}
|
delete acc[key];
|
||||||
return acc[key] as UnRecord;
|
let i = index - 1;
|
||||||
}, proxy as UnRecord);
|
let curr = acc;
|
||||||
|
while (i > 0) {
|
||||||
|
const parentPath = keys.slice(0, i);
|
||||||
|
const parentKey = keys[i];
|
||||||
|
const parent = parentPath.reduce((acc, key) => {
|
||||||
|
return acc[key] as UnRecord;
|
||||||
|
}, proxy as UnRecord);
|
||||||
|
deleteEmptyObject(curr, parentKey, parent);
|
||||||
|
curr = parent;
|
||||||
|
i--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return acc[key] as UnRecord;
|
||||||
|
}, proxy as UnRecord);
|
||||||
|
});
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -393,6 +417,8 @@ export class ReactiveFlatYMap extends BaseReactiveYData<
|
|||||||
return root;
|
return root;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
private _byPassYjs = false;
|
||||||
|
|
||||||
private readonly _getProxy = (
|
private readonly _getProxy = (
|
||||||
source: UnRecord,
|
source: UnRecord,
|
||||||
root: UnRecord,
|
root: UnRecord,
|
||||||
@@ -402,6 +428,7 @@ export class ReactiveFlatYMap extends BaseReactiveYData<
|
|||||||
onDispose: this._onDispose,
|
onDispose: this._onDispose,
|
||||||
shouldByPassSignal: () => this._skipNext,
|
shouldByPassSignal: () => this._skipNext,
|
||||||
byPassSignalUpdate: this._updateWithSkip,
|
byPassSignalUpdate: this._updateWithSkip,
|
||||||
|
shouldByPassYjs: () => this._byPassYjs,
|
||||||
basePath: path,
|
basePath: path,
|
||||||
onChange: this._onChange,
|
onChange: this._onChange,
|
||||||
transform: this._transform,
|
transform: this._transform,
|
||||||
@@ -410,6 +437,12 @@ export class ReactiveFlatYMap extends BaseReactiveYData<
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
private readonly _updateWithYjsSkip = (fn: () => void) => {
|
||||||
|
this._byPassYjs = true;
|
||||||
|
fn();
|
||||||
|
this._byPassYjs = false;
|
||||||
|
};
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
protected readonly _ySource: YMap<unknown>,
|
protected readonly _ySource: YMap<unknown>,
|
||||||
private readonly _onDispose: Slot,
|
private readonly _onDispose: Slot,
|
||||||
@@ -453,3 +486,9 @@ export class ReactiveFlatYMap extends BaseReactiveYData<
|
|||||||
this._stashed.add(prop);
|
this._stashed.add(prop);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function deleteEmptyObject(obj: UnRecord, key: string, parent: UnRecord): void {
|
||||||
|
if (Object.keys(obj).length === 0) {
|
||||||
|
delete parent[key];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
import { Injectable } from '@nestjs/common';
|
import { Injectable } from '@nestjs/common';
|
||||||
|
import { Cron, CronExpression } from '@nestjs/schedule';
|
||||||
|
import { PrismaClient } from '@prisma/client';
|
||||||
|
|
||||||
import { OnJob } from '../../base';
|
import { JobQueue, OnJob } from '../../base';
|
||||||
import { PgWorkspaceDocStorageAdapter } from '../doc';
|
import { PgWorkspaceDocStorageAdapter } from '../doc';
|
||||||
|
|
||||||
declare global {
|
declare global {
|
||||||
@@ -14,7 +16,11 @@ declare global {
|
|||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class DocServiceCronJob {
|
export class DocServiceCronJob {
|
||||||
constructor(private readonly workspace: PgWorkspaceDocStorageAdapter) {}
|
constructor(
|
||||||
|
private readonly workspace: PgWorkspaceDocStorageAdapter,
|
||||||
|
private readonly prisma: PrismaClient,
|
||||||
|
private readonly job: JobQueue
|
||||||
|
) {}
|
||||||
|
|
||||||
@OnJob('doc.mergePendingDocUpdates')
|
@OnJob('doc.mergePendingDocUpdates')
|
||||||
async mergePendingDocUpdates({
|
async mergePendingDocUpdates({
|
||||||
@@ -23,4 +29,29 @@ export class DocServiceCronJob {
|
|||||||
}: Jobs['doc.mergePendingDocUpdates']) {
|
}: Jobs['doc.mergePendingDocUpdates']) {
|
||||||
await this.workspace.getDoc(workspaceId, docId);
|
await this.workspace.getDoc(workspaceId, docId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Cron(CronExpression.EVERY_10_SECONDS)
|
||||||
|
async schedule() {
|
||||||
|
const group = await this.prisma.update.groupBy({
|
||||||
|
by: ['workspaceId', 'id'],
|
||||||
|
_count: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
for (const update of group) {
|
||||||
|
if (update._count > 100) {
|
||||||
|
await this.job.add(
|
||||||
|
'doc.mergePendingDocUpdates',
|
||||||
|
{
|
||||||
|
workspaceId: update.workspaceId,
|
||||||
|
docId: update.id,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
jobId: `doc:merge-pending-updates:${update.workspaceId}:${update.id}`,
|
||||||
|
priority: update._count,
|
||||||
|
delay: 0,
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -92,6 +92,7 @@ export class DocModel extends BaseModel {
|
|||||||
orderBy: {
|
orderBy: {
|
||||||
createdAt: 'asc',
|
createdAt: 'asc',
|
||||||
},
|
},
|
||||||
|
take: 100,
|
||||||
});
|
});
|
||||||
return rows.map(r => this.updateToDocRecord(r));
|
return rows.map(r => this.updateToDocRecord(r));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ export class AwarenessFrontend {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
applyAwarenessUpdate(awareness, update.bin, origin);
|
applyAwarenessUpdate(awareness, update.bin, uniqueId);
|
||||||
};
|
};
|
||||||
const handleSyncCollect = () => {
|
const handleSyncCollect = () => {
|
||||||
return Promise.resolve({
|
return Promise.resolve({
|
||||||
|
|||||||
Reference in New Issue
Block a user