feat(core): orm (#6536)

This commit is contained in:
forehalo
2024-04-25 03:03:45 +00:00
parent 31b284a2d0
commit a697ebe340
29 changed files with 1980 additions and 118 deletions

View File

@@ -1,125 +1,14 @@
import { nanoid } from 'nanoid';
import { describe, expect, test, vitest } from 'vitest';
import {
diffUpdate,
Doc as YDoc,
encodeStateAsUpdate,
encodeStateVectorFromUpdate,
mergeUpdates,
} from 'yjs';
import { Doc as YDoc, encodeStateAsUpdate } from 'yjs';
import { AsyncLock } from '../../../utils';
import { DocEngine } from '..';
import type { DocServer } from '../server';
import { MemoryStorage } from '../storage';
import { isEmptyUpdate } from '../utils';
class MiniServer {
lock = new AsyncLock();
db = new Map<string, { data: Uint8Array; clock: number }>();
listeners = new Set<{
cb: (updates: {
docId: string;
data: Uint8Array;
serverClock: number;
}) => void;
clientId: string;
}>();
client() {
return new MiniServerClient(nanoid(), this);
}
}
class MiniServerClient implements DocServer {
constructor(
private readonly id: string,
private readonly server: MiniServer
) {}
async pullDoc(docId: string, stateVector: Uint8Array) {
using _lock = await this.server.lock.acquire();
const doc = this.server.db.get(docId);
if (!doc) {
return null;
}
const data = doc.data;
return {
data:
!isEmptyUpdate(data) && stateVector.length > 0
? diffUpdate(data, stateVector)
: data,
serverClock: 0,
stateVector: !isEmptyUpdate(data)
? encodeStateVectorFromUpdate(data)
: new Uint8Array(),
};
}
async pushDoc(
docId: string,
data: Uint8Array
): Promise<{ serverClock: number }> {
using _lock = await this.server.lock.acquire();
const doc = this.server.db.get(docId);
const oldData = doc?.data ?? new Uint8Array();
const newClock = (doc?.clock ?? 0) + 1;
this.server.db.set(docId, {
data: !isEmptyUpdate(data)
? !isEmptyUpdate(oldData)
? mergeUpdates([oldData, data])
: data
: oldData,
clock: newClock,
});
for (const { clientId, cb } of this.server.listeners) {
if (clientId !== this.id) {
cb({
docId,
data,
serverClock: newClock,
});
}
}
return { serverClock: newClock };
}
async loadServerClock(after: number): Promise<Map<string, number>> {
using _lock = await this.server.lock.acquire();
const map = new Map<string, number>();
for (const [docId, { clock }] of this.server.db) {
if (clock > after) {
map.set(docId, clock);
}
}
return map;
}
async subscribeAllDocs(
cb: (updates: {
docId: string;
data: Uint8Array;
serverClock: number;
}) => void
): Promise<() => void> {
const listener = { cb, clientId: this.id };
this.server.listeners.add(listener);
return () => {
this.server.listeners.delete(listener);
};
}
async waitForConnectingServer(): Promise<void> {}
disconnectServer(): void {}
onInterrupted(_cb: (reason: string) => void): void {}
}
import { MiniSyncServer } from './utils';
describe('sync', () => {
test('basic sync', async () => {
const storage = new MemoryStorage();
const server = new MiniServer();
const server = new MiniSyncServer();
const engine = new DocEngine(storage, server.client()).start();
const doc = new YDoc({ guid: 'a' });
engine.addDoc(doc);
@@ -132,7 +21,7 @@ describe('sync', () => {
});
test('can pull from server', async () => {
const server = new MiniServer();
const server = new MiniSyncServer();
{
const engine = new DocEngine(
new MemoryStorage(),
@@ -158,7 +47,7 @@ describe('sync', () => {
});
test('2 client', async () => {
const server = new MiniServer();
const server = new MiniSyncServer();
await Promise.all([
(async () => {
const engine = new DocEngine(
@@ -190,7 +79,7 @@ describe('sync', () => {
});
test('2 client share storage and eventBus (simulate different tabs in same browser)', async () => {
const server = new MiniServer();
const server = new MiniSyncServer();
const storage = new MemoryStorage();
await Promise.all([
@@ -215,7 +104,7 @@ describe('sync', () => {
});
test('legacy data', async () => {
const server = new MiniServer();
const server = new MiniSyncServer();
const storage = new MemoryStorage();
{

View File

@@ -0,0 +1,108 @@
import { nanoid } from 'nanoid';
import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs';
import { AsyncLock } from '../../../utils';
import type { DocServer } from '../server';
import { isEmptyUpdate } from '../utils';
export class MiniSyncServer {
lock = new AsyncLock();
db = new Map<string, { data: Uint8Array; clock: number }>();
listeners = new Set<{
cb: (updates: {
docId: string;
data: Uint8Array;
serverClock: number;
}) => void;
clientId: string;
}>();
client() {
return new MiniServerClient(nanoid(), this);
}
}
export class MiniServerClient implements DocServer {
constructor(
private readonly id: string,
private readonly server: MiniSyncServer
) {}
async pullDoc(docId: string, stateVector: Uint8Array) {
using _lock = await this.server.lock.acquire();
const doc = this.server.db.get(docId);
if (!doc) {
return null;
}
const data = doc.data;
return {
data:
!isEmptyUpdate(data) && stateVector.length > 0
? diffUpdate(data, stateVector)
: data,
serverClock: 0,
stateVector: !isEmptyUpdate(data)
? encodeStateVectorFromUpdate(data)
: new Uint8Array(),
};
}
async pushDoc(
docId: string,
data: Uint8Array
): Promise<{ serverClock: number }> {
using _lock = await this.server.lock.acquire();
const doc = this.server.db.get(docId);
const oldData = doc?.data ?? new Uint8Array();
const newClock = (doc?.clock ?? 0) + 1;
this.server.db.set(docId, {
data: !isEmptyUpdate(data)
? !isEmptyUpdate(oldData)
? mergeUpdates([oldData, data])
: data
: oldData,
clock: newClock,
});
for (const { clientId, cb } of this.server.listeners) {
if (clientId !== this.id) {
cb({
docId,
data,
serverClock: newClock,
});
}
}
return { serverClock: newClock };
}
async loadServerClock(after: number): Promise<Map<string, number>> {
using _lock = await this.server.lock.acquire();
const map = new Map<string, number>();
for (const [docId, { clock }] of this.server.db) {
if (clock > after) {
map.set(docId, clock);
}
}
return map;
}
async subscribeAllDocs(
cb: (updates: {
docId: string;
data: Uint8Array;
serverClock: number;
}) => void
): Promise<() => void> {
const listener = { cb, clientId: this.id };
this.server.listeners.add(listener);
return () => {
this.server.listeners.delete(listener);
};
}
async waitForConnectingServer(): Promise<void> {}
disconnectServer(): void {}
onInterrupted(_cb: (reason: string) => void): void {}
}