feat(server): lightweight s3 client (#14348)


## Summary by CodeRabbit

* **New Features**
  * Added a dedicated S3-compatible client package and expanded the S3-compatible storage config (endpoint, region, forcePathStyle, requestTimeoutMs, minPartSize, presign options, sessionToken).
  * Document sync now broadcasts batched/compressed doc updates for more efficient real-time syncing.

* **Tests**
  * New unit and benchmark tests for base64 utilities and S3 multipart listing; updated storage-related tests to match new formats.

Authored by DarkSky on 2026-02-01 21:54:39 +08:00, committed by GitHub.
Commit f1a6e409cb (parent 059d3aa04a)
37 changed files with 1539 additions and 1712 deletions

View File

@@ -26,6 +26,7 @@
"postinstall": "prisma generate"
},
"dependencies": {
+"@affine/s3-compat": "workspace:*",
"@affine/server-native": "workspace:*",
"@ai-sdk/anthropic": "^2.0.54",
"@ai-sdk/google": "^2.0.45",
@@ -34,8 +35,6 @@
"@ai-sdk/openai-compatible": "^1.0.28",
"@ai-sdk/perplexity": "^2.0.21",
"@apollo/server": "^4.12.2",
-"@aws-sdk/client-s3": "^3.948.0",
-"@aws-sdk/s3-request-presigner": "^3.948.0",
"@fal-ai/serverless-client": "^0.15.0",
"@google-cloud/opentelemetry-cloud-trace-exporter": "^3.0.0",
"@google-cloud/opentelemetry-resource-util": "^3.0.0",

View File

@@ -41,9 +41,7 @@ class MockR2Provider extends R2StorageProvider {
super(config, bucket);
}
-destroy() {
-this.client.destroy();
-}
+destroy() {}
// @ts-ignore expect override
override async proxyPutObject(
@@ -66,7 +64,7 @@ class MockR2Provider extends R2StorageProvider {
body: any,
options: { contentLength?: number } = {}
) {
-const etag = `"etag-${partNumber}"`;
+const etag = `etag-${partNumber}`;
this.partCalls.push({
key,
uploadId,
@@ -322,7 +320,7 @@ e2e('should proxy multipart upload and return etag', async t => {
.send(payload);
t.is(res.status, 200);
-t.is(res.get('etag'), '"etag-1"');
+t.is(res.get('etag'), 'etag-1');
const calls = getProvider().partCalls;
t.is(calls.length, 1);
@@ -356,7 +354,7 @@ e2e('should resume multipart upload and return uploaded parts', async t => {
const init2 = await createBlobUpload(workspace.id, key, totalSize, 'bin');
t.is(init2.method, 'MULTIPART');
t.is(init2.uploadId, 'upload-id');
-t.deepEqual(init2.uploadedParts, [{ partNumber: 1, etag: '"etag-1"' }]);
+t.deepEqual(init2.uploadedParts, [{ partNumber: 1, etag: 'etag-1' }]);
t.is(getProvider().createMultipartCalls, 1);
});

View File

@@ -141,7 +141,7 @@ test('should override correctly', t => {
config: {
credentials: {
accessKeyId: '1',
-accessKeySecret: '1',
+secretAccessKey: '1',
},
},
},
@@ -169,7 +169,7 @@ test('should override correctly', t => {
config: {
credentials: {
accessKeyId: '1',
-accessKeySecret: '1',
+secretAccessKey: '1',
},
},
});

View File

@@ -0,0 +1,49 @@
import { parseListPartsXml } from '@affine/s3-compat';
import test from 'ava';
test('parseListPartsXml handles array parts and pagination', t => {
const xml = `<?xml version="1.0" encoding="UTF-8"?>
<ListPartsResult>
<Bucket>test</Bucket>
<Key>key</Key>
<UploadId>upload-id</UploadId>
<PartNumberMarker>0</PartNumberMarker>
<NextPartNumberMarker>3</NextPartNumberMarker>
<IsTruncated>true</IsTruncated>
<Part>
<PartNumber>1</PartNumber>
<ETag>"etag-1"</ETag>
</Part>
<Part>
<PartNumber>2</PartNumber>
<ETag>etag-2</ETag>
</Part>
</ListPartsResult>`;
const result = parseListPartsXml(xml);
t.deepEqual(result.parts, [
{ partNumber: 1, etag: 'etag-1' },
{ partNumber: 2, etag: 'etag-2' },
]);
t.true(result.isTruncated);
t.is(result.nextPartNumberMarker, '3');
});
test('parseListPartsXml handles single part', t => {
const xml = `<?xml version="1.0" encoding="UTF-8"?>
<ListPartsResult>
<Bucket>test</Bucket>
<Key>key</Key>
<UploadId>upload-id</UploadId>
<IsTruncated>false</IsTruncated>
<Part>
<PartNumber>5</PartNumber>
<ETag>"etag-5"</ETag>
</Part>
</ListPartsResult>`;
const result = parseListPartsXml(xml);
t.deepEqual(result.parts, [{ partNumber: 5, etag: 'etag-5' }]);
t.false(result.isTruncated);
t.is(result.nextPartNumberMarker, undefined);
});
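The two tests above pin down a classic S3 XML quirk: most XML parsers return a lone `<Part>` element as a single object rather than a one-element array, and providers disagree on whether `ETag` values come wrapped in quotes. A minimal sketch of the normalization these tests imply (the helper name and shape are illustrative, not the actual `@affine/s3-compat` internals):

```ts
// Hypothetical sketch of the part normalization exercised by the tests
// above; the real parseListPartsXml implementation may differ.
interface ListedPart {
  partNumber: number;
  etag: string;
}

// `raw` is whatever the XML parser produced for <Part>: an array when
// several parts exist, a bare object for a single part, or undefined.
function normalizeParts(raw: unknown): ListedPart[] {
  const items = raw === undefined ? [] : Array.isArray(raw) ? raw : [raw];
  return items.map((part: any) => ({
    partNumber: Number(part.PartNumber),
    // Strip surrounding quotes so `"etag-1"` and `etag-2` both
    // normalize to bare values, matching the assertions above.
    etag: String(part.ETag).replace(/^"(.*)"$/, '$1'),
  }));
}
```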

View File

@@ -4,7 +4,8 @@ import { S3StorageProvider } from '../providers/s3';
import { SIGNED_URL_EXPIRED } from '../providers/utils';
const config = {
-region: 'auto',
+region: 'us-east-1',
+endpoint: 'https://s3.us-east-1.amazonaws.com',
credentials: {
accessKeyId: 'test',
secretAccessKey: 'test',
@@ -24,6 +25,8 @@ test('presignPut should return url and headers', async t => {
t.truthy(result);
t.true(result!.url.length > 0);
t.true(result!.url.includes('X-Amz-Algorithm=AWS4-HMAC-SHA256'));
+t.true(result!.url.includes('X-Amz-SignedHeaders='));
+t.true(result!.url.includes('content-type'));
t.deepEqual(result!.headers, { 'Content-Type': 'text/plain' });
const now = Date.now();
t.true(result!.expiresAt.getTime() >= now + SIGNED_URL_EXPIRED * 1000 - 2000);
@@ -41,12 +44,15 @@ test('presignUploadPart should return url', async t => {
test('createMultipartUpload should return uploadId', async t => {
const provider = createProvider();
-let receivedCommand: any;
-const sendStub = async (command: any) => {
-receivedCommand = command;
-return { UploadId: 'upload-1' };
+let receivedKey: string | undefined;
+let receivedMeta: any;
+(provider as any).client = {
+createMultipartUpload: async (key: string, meta: any) => {
+receivedKey = key;
+receivedMeta = meta;
+return { uploadId: 'upload-1' };
+},
};
-(provider as any).client = { send: sendStub };
const now = Date.now();
const result = await provider.createMultipartUpload('key', {
@@ -56,25 +62,29 @@ test('createMultipartUpload should return uploadId', async t => {
t.is(result?.uploadId, 'upload-1');
t.true(result!.expiresAt.getTime() >= now + SIGNED_URL_EXPIRED * 1000 - 2000);
t.true(result!.expiresAt.getTime() <= now + SIGNED_URL_EXPIRED * 1000 + 2000);
-t.is(receivedCommand.input.Key, 'key');
-t.is(receivedCommand.input.ContentType, 'text/plain');
+t.is(receivedKey, 'key');
+t.is(receivedMeta.contentType, 'text/plain');
});
test('completeMultipartUpload should order parts', async t => {
const provider = createProvider();
-let called = false;
-const sendStub = async (command: any) => {
-called = true;
-t.deepEqual(command.input.MultipartUpload.Parts, [
-{ ETag: 'a', PartNumber: 1 },
-{ ETag: 'b', PartNumber: 2 },
-]);
+let receivedParts: any;
+(provider as any).client = {
+completeMultipartUpload: async (
+_key: string,
+_uploadId: string,
+parts: any
+) => {
+receivedParts = parts;
+},
};
-(provider as any).client = { send: sendStub };
await provider.completeMultipartUpload('key', 'upload-1', [
{ partNumber: 2, etag: 'b' },
{ partNumber: 1, etag: 'a' },
]);
-t.true(called);
+t.deepEqual(receivedParts, [
+{ partNumber: 1, etag: 'a' },
+{ partNumber: 2, etag: 'b' },
+]);
});

View File

@@ -33,9 +33,44 @@ export type StorageProviderConfig = { bucket: string } & (
const S3ConfigSchema: JSONSchema = {
type: 'object',
-description:
-'The config for the s3 compatible storage provider. directly passed to aws-sdk client.\n@link https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html',
+description: 'The config for the S3 compatible storage provider.',
properties: {
+endpoint: {
+type: 'string',
+description:
+'The S3 compatible endpoint. Example: "https://s3.us-east-1.amazonaws.com" or "https://<account>.r2.cloudflarestorage.com".',
+},
+region: {
+type: 'string',
+description:
+'The region for the storage provider. Example: "us-east-1" or "auto" for R2.',
+},
+forcePathStyle: {
+type: 'boolean',
+description: 'Whether to use path-style bucket addressing.',
+},
+requestTimeoutMs: {
+type: 'number',
+description: 'Request timeout in milliseconds.',
+},
+minPartSize: {
+type: 'number',
+description: 'Minimum multipart part size in bytes.',
+},
+presign: {
+type: 'object',
+description: 'Presigned URL behavior configuration.',
+properties: {
+expiresInSeconds: {
+type: 'number',
+description: 'Expiration time in seconds for presigned URLs.',
+},
+signContentTypeForPut: {
+type: 'boolean',
+description: 'Whether to sign Content-Type for presigned PUT.',
+},
+},
+},
credentials: {
type: 'object',
description: 'The credentials for the s3 compatible storage provider.',
@@ -46,6 +81,9 @@ const S3ConfigSchema: JSONSchema = {
secretAccessKey: {
type: 'string',
},
+sessionToken: {
+type: 'string',
+},
},
},
},
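Putting the new schema fields together, a provider entry might look like the following. Values are illustrative placeholders, and the `bucket`/`config` nesting is approximated from the override tests earlier in this diff:

```ts
// Illustrative S3-compatible storage config exercising the new fields.
const storageConfig = {
  bucket: 'blobs',
  config: {
    endpoint: 'https://<account>.r2.cloudflarestorage.com', // placeholder
    region: 'auto', // 'auto' for R2; e.g. 'us-east-1' for AWS
    forcePathStyle: true, // path-style bucket addressing
    requestTimeoutMs: 60_000, // per-request timeout
    minPartSize: 8 * 1024 * 1024, // minimum multipart part size (assumed value)
    presign: {
      expiresInSeconds: 3600, // presigned URL lifetime (assumed value)
      signContentTypeForPut: true, // include Content-Type in PUT signatures
    },
    credentials: {
      accessKeyId: '<access-key-id>',
      secretAccessKey: '<secret-access-key>',
      sessionToken: '<optional-sts-session-token>', // only for temporary credentials
    },
  },
};
```

When `endpoint` is omitted, the provider falls back to a region-derived AWS endpoint; see `resolveEndpoint` in the S3 provider diff below.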

View File

@@ -1,7 +1,6 @@
import assert from 'node:assert';
import { Readable } from 'node:stream';
-import { PutObjectCommand, UploadPartCommand } from '@aws-sdk/client-s3';
import { Logger } from '@nestjs/common';
import {
@@ -39,9 +38,6 @@ export class R2StorageProvider extends S3StorageProvider {
...config,
forcePathStyle: true,
endpoint: `https://${config.accountId}.r2.cloudflarestorage.com`,
-// see https://github.com/aws/aws-sdk-js-v3/issues/6810
-requestChecksumCalculation: 'WHEN_REQUIRED',
-responseChecksumValidation: 'WHEN_REQUIRED',
},
bucket
);
@@ -179,15 +175,10 @@ export class R2StorageProvider extends S3StorageProvider {
body: Readable | Buffer | Uint8Array | string,
options: { contentType?: string; contentLength?: number } = {}
) {
-return this.client.send(
-new PutObjectCommand({
-Bucket: this.bucket,
-Key: key,
-Body: body,
-ContentType: options.contentType,
-ContentLength: options.contentLength,
-})
-);
+return this.client.putObject(key, body as any, {
+contentType: options.contentType,
+contentLength: options.contentLength,
+});
}
async proxyUploadPart(
@@ -197,18 +188,15 @@ export class R2StorageProvider extends S3StorageProvider {
body: Readable | Buffer | Uint8Array | string,
options: { contentLength?: number } = {}
) {
-const result = await this.client.send(
-new UploadPartCommand({
-Bucket: this.bucket,
-Key: key,
-UploadId: uploadId,
-PartNumber: partNumber,
-Body: body,
-ContentLength: options.contentLength,
-})
+const result = await this.client.uploadPart(
+key,
+uploadId,
+partNumber,
+body as any,
+{ contentLength: options.contentLength }
);
-return result.ETag;
+return result.etag;
}
override async get(

View File

@@ -1,24 +1,12 @@
/* oxlint-disable @typescript-eslint/no-non-null-assertion */
import { Readable } from 'node:stream';
-import {
-AbortMultipartUploadCommand,
-CompleteMultipartUploadCommand,
-CreateMultipartUploadCommand,
-DeleteObjectCommand,
-GetObjectCommand,
-HeadObjectCommand,
-ListObjectsV2Command,
-ListPartsCommand,
-NoSuchKey,
-NoSuchUpload,
-NotFound,
-PutObjectCommand,
-S3Client,
-S3ClientConfig,
-UploadPartCommand,
-} from '@aws-sdk/client-s3';
-import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
+import type {
+S3CompatClient,
+S3CompatConfig,
+S3CompatCredentials,
+} from '@affine/s3-compat';
+import { createS3CompatClient } from '@affine/s3-compat';
import { Logger } from '@nestjs/common';
import {
@@ -33,30 +21,55 @@ import {
} from './provider';
import { autoMetadata, SIGNED_URL_EXPIRED, toBuffer } from './utils';
-export interface S3StorageConfig extends S3ClientConfig {
+export interface S3StorageConfig {
+endpoint?: string;
+region: string;
+credentials: S3CompatCredentials;
+forcePathStyle?: boolean;
+requestTimeoutMs?: number;
+minPartSize?: number;
+presign?: {
+expiresInSeconds?: number;
+signContentTypeForPut?: boolean;
+};
usePresignedURL?: {
enabled: boolean;
};
}
+function resolveEndpoint(config: S3StorageConfig) {
+if (config.endpoint) {
+return config.endpoint;
+}
+if (config.region === 'us-east-1') {
+return 'https://s3.amazonaws.com';
+}
+return `https://s3.${config.region}.amazonaws.com`;
+}
export class S3StorageProvider implements StorageProvider {
protected logger: Logger;
-protected client: S3Client;
+protected client: S3CompatClient;
private readonly usePresignedURL: boolean;
constructor(
config: S3StorageConfig,
public readonly bucket: string
) {
-const { usePresignedURL, ...clientConfig } = config;
-this.client = new S3Client({
-region: 'auto',
-// s3 client uses keep-alive by default to accelerate requests, and max requests queue is 50.
-// If some of them are long holding or dead without response, the whole queue will block.
-// By default no timeout is set for requests or connections, so we set them here.
-requestHandler: { requestTimeout: 60_000, connectionTimeout: 10_000 },
+const { usePresignedURL, presign, credentials, ...clientConfig } = config;
+const compatConfig: S3CompatConfig = {
...clientConfig,
-});
+endpoint: resolveEndpoint(config),
+bucket,
+requestTimeoutMs: clientConfig.requestTimeoutMs ?? 60_000,
+presign: {
+expiresInSeconds: presign?.expiresInSeconds ?? SIGNED_URL_EXPIRED,
+signContentTypeForPut: presign?.signContentTypeForPut ?? true,
+},
+};
+this.client = createS3CompatClient(compatConfig, credentials);
this.usePresignedURL = usePresignedURL?.enabled ?? false;
this.logger = new Logger(`${S3StorageProvider.name}:${bucket}`);
}
@@ -71,19 +84,10 @@ export class S3StorageProvider implements StorageProvider {
metadata = autoMetadata(blob, metadata);
try {
-await this.client.send(
-new PutObjectCommand({
-Bucket: this.bucket,
-Key: key,
-Body: blob,
-// metadata
-ContentType: metadata.contentType,
-ContentLength: metadata.contentLength,
-// TODO(@forehalo): Cloudflare doesn't support CRC32, use md5 instead later.
-// ChecksumCRC32: metadata.checksumCRC32,
-})
-);
+await this.client.putObject(key, blob, {
+contentType: metadata.contentType,
+contentLength: metadata.contentLength,
+});
this.logger.verbose(`Object \`${key}\` put`);
} catch (e) {
@@ -104,20 +108,12 @@ export class S3StorageProvider implements StorageProvider {
): Promise<PresignedUpload | undefined> {
try {
const contentType = metadata.contentType ?? 'application/octet-stream';
-const url = await getSignedUrl(
-this.client,
-new PutObjectCommand({
-Bucket: this.bucket,
-Key: key,
-ContentType: contentType,
-}),
-{ expiresIn: SIGNED_URL_EXPIRED }
-);
+const result = await this.client.presignPutObject(key, { contentType });
return {
-url,
-headers: { 'Content-Type': contentType },
-expiresAt: new Date(Date.now() + SIGNED_URL_EXPIRED * 1000),
+url: result.url,
+headers: result.headers,
+expiresAt: result.expiresAt,
};
} catch (e) {
this.logger.error(
@@ -137,20 +133,16 @@ export class S3StorageProvider implements StorageProvider {
): Promise<MultipartUploadInit | undefined> {
try {
const contentType = metadata.contentType ?? 'application/octet-stream';
-const response = await this.client.send(
-new CreateMultipartUploadCommand({
-Bucket: this.bucket,
-Key: key,
-ContentType: contentType,
-})
-);
+const response = await this.client.createMultipartUpload(key, {
+contentType,
+});
-if (!response.UploadId) {
+if (!response.uploadId) {
return;
}
return {
-uploadId: response.UploadId,
+uploadId: response.uploadId,
expiresAt: new Date(Date.now() + SIGNED_URL_EXPIRED * 1000),
};
} catch (e) {
@@ -171,20 +163,15 @@ export class S3StorageProvider implements StorageProvider {
partNumber: number
): Promise<PresignedUpload | undefined> {
try {
-const url = await getSignedUrl(
-this.client,
-new UploadPartCommand({
-Bucket: this.bucket,
-Key: key,
-UploadId: uploadId,
-PartNumber: partNumber,
-}),
-{ expiresIn: SIGNED_URL_EXPIRED }
+const result = await this.client.presignUploadPart(
+key,
+uploadId,
+partNumber
);
return {
-url,
-expiresAt: new Date(Date.now() + SIGNED_URL_EXPIRED * 1000),
+url: result.url,
+expiresAt: result.expiresAt,
};
} catch (e) {
this.logger.error(
@@ -198,47 +185,9 @@ export class S3StorageProvider implements StorageProvider {
key: string,
uploadId: string
): Promise<MultipartUploadPart[] | undefined> {
-const parts: MultipartUploadPart[] = [];
-let partNumberMarker: string | undefined;
try {
-// ListParts is paginated by part number marker
-// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html
-// R2 follows S3 semantics here.
-while (true) {
-const response = await this.client.send(
-new ListPartsCommand({
-Bucket: this.bucket,
-Key: key,
-UploadId: uploadId,
-PartNumberMarker: partNumberMarker,
-})
-);
-for (const part of response.Parts ?? []) {
-if (!part.PartNumber || !part.ETag) {
-continue;
-}
-parts.push({ partNumber: part.PartNumber, etag: part.ETag });
-}
-if (!response.IsTruncated) {
-break;
-}
-if (response.NextPartNumberMarker === undefined) {
-break;
-}
-partNumberMarker = response.NextPartNumberMarker;
-}
-return parts;
+return await this.client.listParts(key, uploadId);
} catch (e) {
-// the upload may have been aborted/expired by provider lifecycle rules
-if (e instanceof NoSuchUpload || e instanceof NotFound) {
-return undefined;
-}
this.logger.error(`Failed to list multipart upload parts for \`${key}\``);
throw e;
}
@@ -254,19 +203,7 @@ export class S3StorageProvider implements StorageProvider {
(left, right) => left.partNumber - right.partNumber
);
-await this.client.send(
-new CompleteMultipartUploadCommand({
-Bucket: this.bucket,
-Key: key,
-UploadId: uploadId,
-MultipartUpload: {
-Parts: orderedParts.map(part => ({
-ETag: part.etag,
-PartNumber: part.partNumber,
-})),
-},
-})
-);
+await this.client.completeMultipartUpload(key, uploadId, orderedParts);
} catch (e) {
this.logger.error(`Failed to complete multipart upload for \`${key}\``);
throw e;
@@ -275,13 +212,7 @@ export class S3StorageProvider implements StorageProvider {
async abortMultipartUpload(key: string, uploadId: string): Promise<void> {
try {
-await this.client.send(
-new AbortMultipartUploadCommand({
-Bucket: this.bucket,
-Key: key,
-UploadId: uploadId,
-})
-);
+await this.client.abortMultipartUpload(key, uploadId);
} catch (e) {
this.logger.error(`Failed to abort multipart upload for \`${key}\``);
throw e;
@@ -290,25 +221,19 @@ export class S3StorageProvider implements StorageProvider {
async head(key: string) {
try {
-const obj = await this.client.send(
-new HeadObjectCommand({
-Bucket: this.bucket,
-Key: key,
-})
-);
-return {
-contentType: obj.ContentType!,
-contentLength: obj.ContentLength!,
-lastModified: obj.LastModified!,
-checksumCRC32: obj.ChecksumCRC32,
-};
-} catch (e) {
-// 404
-if (e instanceof NoSuchKey || e instanceof NotFound) {
+const obj = await this.client.headObject(key);
+if (!obj) {
this.logger.verbose(`Object \`${key}\` not found`);
return undefined;
}
+return {
+contentType: obj.contentType ?? 'application/octet-stream',
+contentLength: obj.contentLength ?? 0,
+lastModified: obj.lastModified ?? new Date(0),
+checksumCRC32: obj.checksumCRC32,
+};
+} catch (e) {
this.logger.error(`Failed to head object \`${key}\``);
throw e;
}
@@ -323,25 +248,13 @@ export class S3StorageProvider implements StorageProvider {
redirectUrl?: string;
}> {
try {
-const command = new GetObjectCommand({
-Bucket: this.bucket,
-Key: key,
-});
if (this.usePresignedURL && signedUrl) {
const metadata = await this.head(key);
if (metadata) {
-const url = await getSignedUrl(
-this.client,
-new GetObjectCommand({
-Bucket: this.bucket,
-Key: key,
-}),
-{ expiresIn: SIGNED_URL_EXPIRED }
-);
+const result = await this.client.presignGetObject(key);
return {
-redirectUrl: url,
+redirectUrl: result.url,
metadata,
};
}
@@ -350,68 +263,41 @@ export class S3StorageProvider implements StorageProvider {
return {};
}
-const obj = await this.client.send(command);
-if (!obj.Body) {
+const obj = await this.client.getObjectResponse(key);
+if (!obj || !obj.body) {
this.logger.verbose(`Object \`${key}\` not found`);
return {};
}
+const contentType = obj.headers.get('content-type') ?? undefined;
+const contentLengthHeader = obj.headers.get('content-length');
+const contentLength = contentLengthHeader
+? Number(contentLengthHeader)
+: undefined;
+const lastModifiedHeader = obj.headers.get('last-modified');
+const lastModified = lastModifiedHeader
+? new Date(lastModifiedHeader)
+: undefined;
this.logger.verbose(`Read object \`${key}\``);
return {
-// @ts-expect-errors ignore browser response type `Blob`
-body: obj.Body,
+body: Readable.fromWeb(obj.body as any),
metadata: {
-// always set when putting object
-contentType: obj.ContentType ?? 'application/octet-stream',
-contentLength: obj.ContentLength!,
-lastModified: obj.LastModified!,
-checksumCRC32: obj.ChecksumCRC32,
+contentType: contentType ?? 'application/octet-stream',
+contentLength: contentLength ?? 0,
+lastModified: lastModified ?? new Date(0),
+checksumCRC32: obj.headers.get('x-amz-checksum-crc32') ?? undefined,
},
};
} catch (e) {
-// 404
-if (e instanceof NoSuchKey) {
-this.logger.verbose(`Object \`${key}\` not found`);
-return {};
-}
this.logger.error(`Failed to read object \`${key}\``);
throw e;
}
}
async list(prefix?: string): Promise<ListObjectsMetadata[]> {
-// continuationToken should be `string | undefined`,
-// but TypeScript will fail on type infer in the code below.
-// Seems to be a bug in TypeScript
-let continuationToken: any = undefined;
-let hasMore = true;
-let result: ListObjectsMetadata[] = [];
try {
-while (hasMore) {
-const listResult = await this.client.send(
-new ListObjectsV2Command({
-Bucket: this.bucket,
-Prefix: prefix,
-ContinuationToken: continuationToken,
-})
-);
-if (listResult.Contents?.length) {
-result = result.concat(
-listResult.Contents.map(r => ({
-key: r.Key!,
-lastModified: r.LastModified!,
-contentLength: r.Size!,
-}))
-);
-}
-// has more items not listed
-hasMore = !!listResult.IsTruncated;
-continuationToken = listResult.NextContinuationToken;
-}
+const result = await this.client.listObjectsV2(prefix);
this.logger.verbose(
`List ${result.length} objects with prefix \`${prefix}\``
@@ -425,12 +311,7 @@ export class S3StorageProvider implements StorageProvider {
async delete(key: string): Promise<void> {
try {
-await this.client.send(
-new DeleteObjectCommand({
-Bucket: this.bucket,
-Key: key,
-})
-);
+await this.client.deleteObject(key);
this.logger.verbose(`Deleted object \`${key}\``);
} catch (e) {
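Read together, the call sites in this file imply a compact client surface. A reconstruction of the `S3CompatClient` shape as this provider uses it (inferred from usage in this diff; the package's actual declarations may differ):

```ts
import type { Readable } from 'node:stream';

// S3CompatClient surface as inferred from call sites in this diff.
interface PresignedPut {
  url: string;
  headers: Record<string, string>;
  expiresAt: Date;
}
interface PresignedUrl {
  url: string;
  expiresAt: Date;
}
interface UploadedPart {
  partNumber: number;
  etag: string;
}

interface S3CompatClientSketch {
  putObject(
    key: string,
    body: Readable | Buffer | Uint8Array | string,
    options?: { contentType?: string; contentLength?: number }
  ): Promise<void>;
  uploadPart(
    key: string,
    uploadId: string,
    partNumber: number,
    body: Readable | Buffer | Uint8Array | string,
    options?: { contentLength?: number }
  ): Promise<{ etag?: string }>;
  presignPutObject(key: string, options: { contentType: string }): Promise<PresignedPut>;
  presignGetObject(key: string): Promise<PresignedUrl>;
  presignUploadPart(key: string, uploadId: string, partNumber: number): Promise<PresignedUrl>;
  createMultipartUpload(key: string, meta: { contentType: string }): Promise<{ uploadId?: string }>;
  listParts(key: string, uploadId: string): Promise<UploadedPart[] | undefined>;
  completeMultipartUpload(key: string, uploadId: string, parts: UploadedPart[]): Promise<void>;
  abortMultipartUpload(key: string, uploadId: string): Promise<void>;
  headObject(key: string): Promise<
    | {
        contentType?: string;
        contentLength?: number;
        lastModified?: Date;
        checksumCRC32?: string;
      }
    | undefined
  >;
  getObjectResponse(key: string): Promise<{ body: unknown; headers: Headers } | undefined>;
  listObjectsV2(prefix?: string): Promise<
    { key: string; lastModified: Date; contentLength: number }[]
  >;
  deleteObject(key: string): Promise<void>;
}
```

Notably, `headObject` and `getObjectResponse` resolve to `undefined` on 404 instead of throwing, which is what lets the provider drop the AWS SDK's `NoSuchKey`/`NotFound`/`NoSuchUpload` error-class checks.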

View File

@@ -23,6 +23,7 @@ import {
SpaceAccessDenied,
} from '../../base';
import { Models } from '../../models';
+import { mergeUpdatesInApplyWay } from '../../native';
import { CurrentUser } from '../auth';
import {
DocReader,
@@ -48,8 +49,9 @@ type EventResponse<Data = any> = Data extends never
data: Data;
};
-// 019 only receives space:broadcast-doc-updates and send space:push-doc-updates
-// 020 only receives space:broadcast-doc-update and send space:push-doc-update
+// sync-019: legacy 0.19.x clients (broadcast-doc-updates/push-doc-updates).
+// Remove after 2026-06-30 once metrics show 0 usage for 30 days.
+// 020+: receives space:broadcast-doc-updates (batch) and sends space:push-doc-update.
type RoomType = 'sync' | `${string}:awareness` | 'sync-019';
function Room(
@@ -105,6 +107,16 @@ interface PushDocUpdateMessage {
update: string;
}
+interface BroadcastDocUpdatesMessage {
+spaceType: SpaceType;
+spaceId: string;
+docId: string;
+updates: string[];
+timestamp: number;
+editor?: string;
+compressed?: boolean;
+}
interface LoadDocMessage {
spaceType: SpaceType;
spaceId: string;
@@ -157,6 +169,62 @@ export class SpaceSyncGateway
private readonly models: Models
) {}
+private encodeUpdates(updates: Uint8Array[]) {
+return updates.map(update => Buffer.from(update).toString('base64'));
+}
+private buildBroadcastPayload(
+spaceType: SpaceType,
+spaceId: string,
+docId: string,
+updates: Uint8Array[],
+timestamp: number,
+editor?: string
+): BroadcastDocUpdatesMessage {
+const encodedUpdates = this.encodeUpdates(updates);
+if (updates.length <= 1) {
+return {
+spaceType,
+spaceId,
+docId,
+updates: encodedUpdates,
+timestamp,
+editor,
+compressed: false,
+};
+}
+try {
+const merged = mergeUpdatesInApplyWay(
+updates.map(update => Buffer.from(update))
+);
+metrics.socketio.counter('doc_updates_compressed').add(1);
+return {
+spaceType,
+spaceId,
+docId,
+updates: [Buffer.from(merged).toString('base64')],
+timestamp,
+editor,
+compressed: true,
+};
+} catch (error) {
+this.logger.warn(
+'Failed to merge updates for broadcast, falling back to batch',
+error as Error
+);
+return {
+spaceType,
+spaceId,
+docId,
+updates: encodedUpdates,
+timestamp,
+editor,
+compressed: false,
+};
+}
+}
handleConnection() {
this.connectionCount++;
this.logger.debug(`New connection, total: ${this.connectionCount}`);
@@ -184,9 +252,7 @@ export class SpaceSyncGateway
return;
}
-const encodedUpdates = updates.map(update =>
-Buffer.from(update).toString('base64')
-);
+const encodedUpdates = this.encodeUpdates(updates);
this.server
.to(Room(spaceId, 'sync-019'))
@@ -196,19 +262,27 @@ export class SpaceSyncGateway
docId,
updates: encodedUpdates,
timestamp,
});
-const room = `${spaceType}:${Room(spaceId)}`;
-encodedUpdates.forEach(update => {
-this.server.to(room).emit('space:broadcast-doc-update', {
-spaceType,
-spaceId,
-docId,
-update,
-timestamp,
-editor,
-});
-});
+metrics.socketio
+.counter('sync_019_broadcast')
+.add(encodedUpdates.length, { event: 'doc_updates_pushed' });
+const room = `${spaceType}:${Room(spaceId)}`;
+const payload = this.buildBroadcastPayload(
+spaceType as SpaceType,
+spaceId,
+docId,
+updates,
+timestamp,
+editor
+);
+this.server.to(room).emit('space:broadcast-doc-updates', payload);
+metrics.socketio
+.counter('doc_updates_broadcast')
+.add(payload.updates.length, {
+mode: payload.compressed ? 'compressed' : 'batch',
+});
}
selectAdapter(client: Socket, spaceType: SpaceType): SyncSocketAdapter {
@@ -330,19 +404,35 @@ export class SpaceSyncGateway
user.id
);
+metrics.socketio
+.counter('sync_019_event')
+.add(1, { event: 'push-doc-updates' });
// broadcast to 0.19.x clients
-client
-.to(Room(spaceId, 'sync-019'))
-.emit('space:broadcast-doc-updates', { ...message, timestamp });
+client.to(Room(spaceId, 'sync-019')).emit('space:broadcast-doc-updates', {
+...message,
+timestamp,
+editor: user.id,
+});
// broadcast to new clients
-updates.forEach(update => {
-client.to(adapter.room(spaceId)).emit('space:broadcast-doc-update', {
-...message,
-update,
-timestamp,
+const decodedUpdates = updates.map(update => Buffer.from(update, 'base64'));
+const payload = this.buildBroadcastPayload(
+spaceType,
+spaceId,
+docId,
+decodedUpdates,
+timestamp,
+user.id
+);
+client
+.to(adapter.room(spaceId))
+.emit('space:broadcast-doc-updates', payload);
+metrics.socketio
+.counter('doc_updates_broadcast')
+.add(payload.updates.length, {
+mode: payload.compressed ? 'compressed' : 'batch',
+});
-});
return {
data: {
@@ -378,16 +468,25 @@ export class SpaceSyncGateway
docId,
updates: [update],
timestamp,
+editor: user.id,
});
-client.to(adapter.room(spaceId)).emit('space:broadcast-doc-update', {
+const payload = this.buildBroadcastPayload(
+spaceType,
+spaceId,
+docId,
-update,
+[Buffer.from(update, 'base64')],
timestamp,
-editor: user.id,
-});
+user.id
+);
+client
+.to(adapter.room(spaceId))
+.emit('space:broadcast-doc-updates', payload);
+metrics.socketio
+.counter('doc_updates_broadcast')
+.add(payload.updates.length, {
+mode: payload.compressed ? 'compressed' : 'batch',
+});
return {
data: {
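With this change, 0.20+ clients receive a single `space:broadcast-doc-updates` event whose `updates` array carries base64-encoded payloads, pre-merged into one update when `compressed` is true. A hedged sketch of the receiving side (field names from `BroadcastDocUpdatesMessage` above; `applyUpdate` stands in for whatever Yjs-side hook the client actually uses):

```ts
// Hypothetical client-side handler for the batched/compressed broadcast.
interface BroadcastDocUpdatesMessage {
  spaceType: string;
  spaceId: string;
  docId: string;
  updates: string[]; // base64-encoded doc updates
  timestamp: number;
  editor?: string;
  compressed?: boolean; // true when the server merged the batch into one update
}

function onBroadcastDocUpdates(
  message: BroadcastDocUpdatesMessage,
  applyUpdate: (docId: string, update: Uint8Array) => void // assumed hook
) {
  // Whether the server sent one merged update (compressed) or a batch,
  // applying every entry in order converges to the same document state,
  // since merging Yjs updates is order-insensitive.
  for (const encoded of message.updates) {
    applyUpdate(message.docId, Buffer.from(encoded, 'base64'));
  }
}
```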

View File

@@ -12,6 +12,7 @@
},
"include": ["./src"],
"references": [
{ "path": "../../common/s3-compat" },
{ "path": "../native" },
{ "path": "../../../tools/cli" },
{ "path": "../../../tools/utils" },