feat(infra): new workspace infra (#5617)

This PR copying @affine/workspace into common/infra, and adding definitions for services and unit tests.
This commit is contained in:
EYHN
2024-01-30 06:31:23 +00:00
parent 4f7e0d012d
commit 2e71c980cf
37 changed files with 3082 additions and 4 deletions

View File

@@ -0,0 +1,45 @@
import { describe, expect, test, vi } from 'vitest';
import { AsyncQueue } from '../async-queue';
describe('async-queue', () => {
test('push & pop', async () => {
const queue = new AsyncQueue();
queue.push(1, 2, 3);
expect(queue.length).toBe(3);
expect(await queue.next()).toBe(1);
expect(await queue.next()).toBe(2);
expect(await queue.next()).toBe(3);
expect(queue.length).toBe(0);
});
test('await', async () => {
const queue = new AsyncQueue<number>();
queue.push(1, 2);
expect(await queue.next()).toBe(1);
expect(await queue.next()).toBe(2);
let v = -1;
// setup 2 pop tasks
queue.next().then(next => {
v = next;
});
queue.next().then(next => {
v = next;
});
// Wait for 100ms
await new Promise(resolve => setTimeout(resolve, 100));
// v should not be changed
expect(v).toBe(-1);
// push 3, should trigger the first pop task
queue.push(3);
await vi.waitFor(() => v === 3);
// push 4, should trigger the second pop task
queue.push(4);
await vi.waitFor(() => v === 4);
});
});

View File

@@ -0,0 +1,13 @@
import { describe, expect, test } from 'vitest';
import { throwIfAborted } from '../throw-if-aborted';
describe('throw-if-aborted', () => {
test('basic', async () => {
const abortController = new AbortController();
const abortSignal = abortController.signal;
expect(throwIfAborted(abortSignal)).toBe(true);
abortController.abort('TEST_ABORT');
expect(() => throwIfAborted(abortSignal)).toThrowError('TEST_ABORT');
});
});

View File

@@ -0,0 +1,101 @@
export class AsyncQueue<T> {
private _queue: T[];
private _resolveUpdate: (() => void) | null = null;
private _waitForUpdate: Promise<void> | null = null;
constructor(init: T[] = []) {
this._queue = init;
}
get length() {
return this._queue.length;
}
async next(
abort?: AbortSignal,
dequeue: (arr: T[]) => T | undefined = a => a.shift()
): Promise<T> {
const update = dequeue(this._queue);
if (update) {
return update;
} else {
if (!this._waitForUpdate) {
this._waitForUpdate = new Promise(resolve => {
this._resolveUpdate = resolve;
});
}
await Promise.race([
this._waitForUpdate,
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
return this.next(abort, dequeue);
}
}
push(...updates: T[]) {
this._queue.push(...updates);
if (this._resolveUpdate) {
const resolve = this._resolveUpdate;
this._resolveUpdate = null;
this._waitForUpdate = null;
resolve();
}
}
remove(predicate: (update: T) => boolean) {
const index = this._queue.findIndex(predicate);
if (index !== -1) {
this._queue.splice(index, 1);
}
}
find(predicate: (update: T) => boolean) {
return this._queue.find(predicate);
}
clear() {
this._queue = [];
}
}
export class PriorityAsyncQueue<
T extends { id: string },
> extends AsyncQueue<T> {
constructor(
init: T[] = [],
public readonly priorityTarget: SharedPriorityTarget = new SharedPriorityTarget()
) {
super(init);
}
override next(abort?: AbortSignal | undefined): Promise<T> {
return super.next(abort, arr => {
if (this.priorityTarget.priorityRule !== null) {
const index = arr.findIndex(
update => this.priorityTarget.priorityRule?.(update.id)
);
if (index !== -1) {
return arr.splice(index, 1)[0];
}
}
return arr.shift();
});
}
}
/**
* Shared priority target can be shared by multiple queues.
*/
export class SharedPriorityTarget {
public priorityRule: ((id: string) => boolean) | null = null;
}

View File

@@ -0,0 +1,5 @@
export * from './async-queue';
export * from './merge-updates';
export * from './object-pool';
export * from './stable-hash';
export * from './throw-if-aborted';

View File

@@ -0,0 +1,17 @@
import { applyUpdate, Doc, encodeStateAsUpdate } from 'yjs';
export function mergeUpdates(updates: Uint8Array[]) {
if (updates.length === 0) {
return new Uint8Array();
}
if (updates.length === 1) {
return updates[0];
}
const doc = new Doc();
doc.transact(() => {
updates.forEach(update => {
applyUpdate(doc, update);
});
});
return encodeStateAsUpdate(doc);
}

View File

@@ -0,0 +1,96 @@
import { Unreachable } from '@affine/env/constant';
export interface RcRef<T> {
obj: T;
release: () => void;
}
export class ObjectPool<Key, T> {
objects = new Map<Key, { obj: T; rc: number }>();
timeoutToGc: NodeJS.Timeout | null = null;
constructor(
private readonly options: {
onDelete?: (obj: T) => void;
onDangling?: (obj: T) => boolean;
} = {}
) {}
get(key: Key): RcRef<T> | null {
const exist = this.objects.get(key);
if (exist) {
exist.rc++;
let released = false;
return {
obj: exist.obj,
release: () => {
// avoid double release
if (released) {
return;
}
released = true;
exist.rc--;
this.requestGc();
},
};
}
return null;
}
put(key: Key, obj: T) {
const ref = { obj, rc: 0 };
this.objects.set(key, ref);
const r = this.get(key);
if (!r) {
throw new Unreachable();
}
return r;
}
private requestGc() {
if (this.timeoutToGc) {
clearInterval(this.timeoutToGc);
}
// do gc every 1s
this.timeoutToGc = setInterval(() => {
this.gc();
}, 1000);
}
private gc() {
for (const [key, { obj, rc }] of new Map(
this.objects /* clone the map, because the origin will be modified during iteration */
)) {
if (
rc === 0 &&
(!this.options.onDangling || this.options.onDangling(obj))
) {
this.options.onDelete?.(obj);
this.objects.delete(key);
}
}
for (const [_, { rc }] of this.objects) {
if (rc === 0) {
return;
}
}
// if all object has referrer, stop gc
if (this.timeoutToGc) {
clearInterval(this.timeoutToGc);
}
}
clear() {
for (const { obj } of this.objects.values()) {
this.options.onDelete?.(obj);
}
this.objects.clear();
}
}

View File

@@ -0,0 +1,9 @@
// because AbortSignal.throwIfAborted is not available in abortcontroller-polyfill
export function throwIfAborted(abort?: AbortSignal) {
if (abort?.aborted) {
throw new Error(abort.reason);
}
return true;
}
export const MANUALLY_STOP = 'manually-stop';