refactor(infra): migrate to new infra (#5565)

This commit is contained in:
EYHN
2024-01-30 07:16:39 +00:00
parent 1e3499c323
commit 329fc19852
170 changed files with 2007 additions and 4354 deletions

View File

@@ -1,6 +1,7 @@
import { DebugLogger } from '@affine/debug';
import { fetchWithTraceReport } from '@affine/graphql';
import type { SyncStorage } from '@affine/workspace';
import { type SyncStorage } from '@toeverything/infra';
import type { CleanupService } from '@toeverything/infra/lifecycle';
import { getIoManager } from '../../utils/affine-io';
import { base64ToUint8Array, uint8ArrayToBase64 } from '../../utils/base64';
@@ -8,22 +9,21 @@ import { MultipleBatchSyncSender } from './batch-sync-sender';
const logger = new DebugLogger('affine:storage:socketio');
export function createAffineStorage(
workspaceId: string
): SyncStorage & { disconnect: () => void } {
logger.debug('createAffineStorage', workspaceId);
const socket = getIoManager().socket('/');
export class AffineSyncStorage implements SyncStorage {
name = 'affine-cloud';
const syncSender = new MultipleBatchSyncSender(async (guid, updates) => {
socket = getIoManager().socket('/');
syncSender = new MultipleBatchSyncSender(async (guid, updates) => {
const payload = await Promise.all(
updates.map(update => uint8ArrayToBase64(update))
);
return new Promise(resolve => {
socket.emit(
this.socket.emit(
'client-update-v2',
{
workspaceId,
workspaceId: this.workspaceId,
guid,
updates: payload,
},
@@ -35,7 +35,7 @@ export function createAffineStorage(
// TODO: raise error with different code to users
if (response.error) {
logger.error('client-update-v2 error', {
workspaceId,
workspaceId: this.workspaceId,
guid,
response,
});
@@ -51,145 +51,160 @@ export function createAffineStorage(
});
});
function handleConnect() {
socket.emit(
constructor(
private readonly workspaceId: string,
cleanupService: CleanupService
) {
this.socket.on('connect', this.handleConnect);
this.socket.connect();
this.socket.emit(
'client-handshake-sync',
workspaceId,
this.workspaceId,
(response: { error?: any }) => {
if (!response.error) {
syncSender.start();
this.syncSender.start();
}
}
);
cleanupService.add(() => {
this.cleanup();
});
}
socket.on('connect', handleConnect);
socket.connect();
socket.emit(
'client-handshake-sync',
workspaceId,
(response: { error?: any }) => {
if (!response.error) {
syncSender.start();
handleConnect = () => {
this.socket.emit(
'client-handshake-sync',
this.workspaceId,
(response: { error?: any }) => {
if (!response.error) {
this.syncSender.start();
}
}
}
);
);
};
return {
name: 'affine-cloud',
async pull(docId, state) {
const stateVector = state ? await uint8ArrayToBase64(state) : undefined;
async pull(
docId: string,
state: Uint8Array
): Promise<{ data: Uint8Array; state?: Uint8Array } | null> {
const stateVector = state ? await uint8ArrayToBase64(state) : undefined;
return new Promise((resolve, reject) => {
logger.debug('doc-load-v2', {
workspaceId: workspaceId,
return new Promise((resolve, reject) => {
logger.debug('doc-load-v2', {
workspaceId: this.workspaceId,
guid: docId,
stateVector,
});
this.socket.emit(
'doc-load-v2',
{
workspaceId: this.workspaceId,
guid: docId,
stateVector,
});
socket.emit(
'doc-load-v2',
{
workspaceId: workspaceId,
},
(
response: // TODO: reuse `EventError` with server
{ error: any } | { data: { missing: string; state: string } }
) => {
logger.debug('doc-load callback', {
workspaceId: this.workspaceId,
guid: docId,
stateVector,
},
(
response: // TODO: reuse `EventError` with server
{ error: any } | { data: { missing: string; state: string } }
) => {
logger.debug('doc-load callback', {
workspaceId: workspaceId,
guid: docId,
stateVector,
response,
});
if ('error' in response) {
// TODO: result `EventError` with server
if (response.error.code === 'DOC_NOT_FOUND') {
resolve(null);
} else {
reject(new Error(response.error.message));
}
} else {
resolve({
data: base64ToUint8Array(response.data.missing),
state: response.data.state
? base64ToUint8Array(response.data.state)
: undefined,
});
}
}
);
});
},
async push(docId, update) {
logger.debug('client-update-v2', {
workspaceId,
guid: docId,
update,
});
await syncSender.send(docId, update);
},
async subscribe(cb, disconnect) {
const handleUpdate = async (message: {
workspaceId: string;
guid: string;
updates: string[];
}) => {
if (message.workspaceId === workspaceId) {
message.updates.forEach(update => {
cb(message.guid, base64ToUint8Array(update));
response,
});
}
};
socket.on('server-updates', handleUpdate);
socket.on('disconnect', reason => {
socket.off('server-updates', handleUpdate);
disconnect(reason);
});
return () => {
socket.off('server-updates', handleUpdate);
};
},
disconnect() {
syncSender.stop();
socket.emit('client-leave-sync', workspaceId);
socket.off('connect', handleConnect);
},
};
}
export function createAffineStaticStorage(workspaceId: string): SyncStorage {
logger.debug('createAffineStaticStorage', workspaceId);
return {
name: 'affine-cloud-static',
async pull(docId) {
const response = await fetchWithTraceReport(
`/api/workspaces/${workspaceId}/docs/${docId}`,
{
priority: 'high',
if ('error' in response) {
// TODO: result `EventError` with server
if (response.error.code === 'DOC_NOT_FOUND') {
resolve(null);
} else {
reject(new Error(response.error.message));
}
} else {
resolve({
data: base64ToUint8Array(response.data.missing),
state: response.data.state
? base64ToUint8Array(response.data.state)
: undefined,
});
}
}
);
if (response.ok) {
const arrayBuffer = await response.arrayBuffer();
});
}
return { data: new Uint8Array(arrayBuffer) };
async push(docId: string, update: Uint8Array) {
logger.debug('client-update-v2', {
workspaceId: this.workspaceId,
guid: docId,
update,
});
await this.syncSender.send(docId, update);
}
async subscribe(
cb: (docId: string, data: Uint8Array) => void,
disconnect: (reason: string) => void
) {
const handleUpdate = async (message: {
workspaceId: string;
guid: string;
updates: string[];
}) => {
if (message.workspaceId === this.workspaceId) {
message.updates.forEach(update => {
cb(message.guid, base64ToUint8Array(update));
});
}
};
this.socket.on('server-updates', handleUpdate);
return null;
},
async push() {
throw new Error('Not implemented');
},
async subscribe() {
throw new Error('Not implemented');
},
};
this.socket.on('disconnect', reason => {
this.socket.off('server-updates', handleUpdate);
disconnect(reason);
});
return () => {
this.socket.off('server-updates', handleUpdate);
};
}
cleanup() {
this.syncSender.stop();
this.socket.emit('client-leave-sync', this.workspaceId);
this.socket.off('connect', this.handleConnect);
}
}
export class AffineStaticSyncStorage implements SyncStorage {
name = 'affine-cloud-static';
constructor(private readonly workspaceId: string) {}
async pull(
docId: string
): Promise<{ data: Uint8Array; state?: Uint8Array | undefined } | null> {
const response = await fetchWithTraceReport(
`/api/workspaces/${this.workspaceId}/docs/${docId}`,
{
priority: 'high',
}
);
if (response.ok) {
const arrayBuffer = await response.arrayBuffer();
return { data: new Uint8Array(arrayBuffer) };
}
return null;
}
push(): Promise<void> {
throw new Error('Method not implemented.');
}
subscribe(): Promise<() => void> {
throw new Error('Method not implemented.');
}
}