feat: multipart blob sync support (#14138)

## Summary by CodeRabbit

* **New Features**
  * Flexible blob uploads: GraphQL, presigned, and multipart flows with per‑part URLs, abort/complete operations, presigned proxy endpoints, and nightly cleanup of expired pending uploads.

* **API / Schema**
  * GraphQL additions: new types, mutations, an enum, and an error to manage the upload lifecycle (create, complete, abort, get part URL).

* **Database**
  * New blob status enum and columns (status, upload_id); listing now defaults to completed blobs.

* **Localization**
  * Added user-facing message: "Blob is invalid."

* **Tests**
  * Expanded unit and end‑to‑end coverage for upload flows, proxy behavior, multipart, and provider integrations.

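The new mutations compose into a short client-side flow. Below is a minimal TypeScript sketch, assuming a generic `gqlRequest` helper and `fetch`-based PUTs; the helper, its signature, and the error handling are illustrative and not part of this change:

```ts
// Hypothetical helper: POSTs a GraphQL query and returns the `data` payload.
declare function gqlRequest<T>(
  query: string,
  variables: Record<string, unknown>
): Promise<T>;

interface BlobUploadInit {
  method: 'GRAPHQL' | 'PRESIGNED' | 'MULTIPART';
  blobKey: string;
  alreadyUploaded?: boolean;
  uploadUrl?: string;
  headers?: Record<string, string>;
  uploadId?: string;
  partSize?: number;
  uploadedParts?: { partNumber: number; etag: string }[];
}

// `key` is the blob's content hash (URL-safe base64 SHA-256), as in the tests.
async function uploadBlob(
  workspaceId: string,
  key: string,
  blob: Blob,
  mime: string
): Promise<string> {
  // 1. Ask the server how to upload (GraphQL fallback, single presigned PUT,
  //    or multipart), and whether the blob already exists.
  const { createBlobUpload: init } = await gqlRequest<{
    createBlobUpload: BlobUploadInit;
  }>(
    `mutation($workspaceId: String!, $key: String!, $size: Int!, $mime: String!) {
      createBlobUpload(workspaceId: $workspaceId, key: $key, size: $size, mime: $mime) {
        method blobKey alreadyUploaded uploadUrl headers uploadId partSize
        uploadedParts { partNumber etag }
      }
    }`,
    { workspaceId, key, size: blob.size, mime }
  );
  if (init.alreadyUploaded) return init.blobKey;

  if (init.method === 'PRESIGNED' && init.uploadUrl) {
    // 2a. Single PUT to the presigned or proxied URL, then mark it completed.
    await fetch(init.uploadUrl, { method: 'PUT', headers: init.headers, body: blob });
    await gqlRequest(
      `mutation($workspaceId: String!, $key: String!) {
        completeBlobUpload(workspaceId: $workspaceId, key: $key)
      }`,
      { workspaceId, key }
    );
  } else if (init.method === 'MULTIPART' && init.uploadId && init.partSize) {
    // 2b. Upload each missing part (resuming skips parts the server already has),
    //     then complete with the collected etags.
    const uploaded = init.uploadedParts ?? [];
    const done = new Set(uploaded.map(p => p.partNumber));
    const parts = [...uploaded];
    const partCount = Math.ceil(blob.size / init.partSize);
    for (let partNumber = 1; partNumber <= partCount; partNumber++) {
      if (done.has(partNumber)) continue;
      const { getBlobUploadPartUrl: part } = await gqlRequest<{
        getBlobUploadPartUrl: { uploadUrl: string; headers?: Record<string, string> };
      }>(
        `mutation($workspaceId: String!, $key: String!, $uploadId: String!, $partNumber: Int!) {
          getBlobUploadPartUrl(workspaceId: $workspaceId, key: $key, uploadId: $uploadId, partNumber: $partNumber) {
            uploadUrl headers
          }
        }`,
        { workspaceId, key, uploadId: init.uploadId, partNumber }
      );
      const body = blob.slice((partNumber - 1) * init.partSize, partNumber * init.partSize);
      const res = await fetch(part.uploadUrl, { method: 'PUT', headers: part.headers, body });
      parts.push({ partNumber, etag: res.headers.get('etag') ?? '' });
    }
    await gqlRequest(
      `mutation($workspaceId: String!, $key: String!, $uploadId: String, $parts: [BlobUploadPartInput!]) {
        completeBlobUpload(workspaceId: $workspaceId, key: $key, uploadId: $uploadId, parts: $parts)
      }`,
      { workspaceId, key, uploadId: init.uploadId, parts }
    );
  } else {
    // 2c. GraphQL fallback: push the bytes through the existing setBlob mutation.
    throw new Error('fall back to the setBlob GraphQL upload (omitted in this sketch)');
  }
  return init.blobKey;
}
```

The `/api/storage/*` proxy path only activates for the Cloudflare R2 provider when `usePresignedURL` is fully configured; plain S3 falls back to direct presigned URLs and the filesystem provider falls back to GraphQL. A sketch of an R2 blob-storage entry that enables the proxy, mirroring the test fixture in this PR (bucket, domain, and secrets are placeholders):

```ts
// Illustrative R2 blob-storage entry (StorageProviderConfig shape); values are placeholders.
const blobStorage = {
  provider: 'cloudflare-r2',
  bucket: 'affine-blobs',
  config: {
    accountId: '<r2-account-id>',
    region: 'auto',
    credentials: { accessKeyId: '<key-id>', secretAccessKey: '<secret>' },
    usePresignedURL: {
      // with enabled + urlPrefix + signKey set, GETs redirect to the custom
      // domain and uploads go through the /api/storage/* proxy endpoints
      enabled: true,
      urlPrefix: 'https://storage.example.com',
      signKey: '<hmac-secret>',
    },
  },
};
```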
DarkSky authored on 2025-12-23 22:09:21 +08:00 · committed by GitHub
parent a9937e18b6 · commit 76524084d1
36 changed files with 2880 additions and 33 deletions

View File

@@ -0,0 +1,460 @@
import { createHash } from 'node:crypto';
import { mock } from 'node:test';
import {
Config,
ConfigFactory,
PROXY_MULTIPART_PATH,
PROXY_UPLOAD_PATH,
StorageProviderConfig,
StorageProviderFactory,
toBuffer,
} from '../../../base';
import {
R2StorageConfig,
R2StorageProvider,
} from '../../../base/storage/providers/r2';
import { SIGNED_URL_EXPIRED } from '../../../base/storage/providers/utils';
import { WorkspaceBlobStorage } from '../../../core/storage';
import { MULTIPART_THRESHOLD } from '../../../core/storage/constants';
import { R2UploadController } from '../../../core/storage/r2-proxy';
import { app, e2e, Mockers } from '../test';
class MockR2Provider extends R2StorageProvider {
createMultipartCalls = 0;
putCalls: {
key: string;
body: Buffer;
contentType?: string;
contentLength?: number;
}[] = [];
partCalls: {
key: string;
uploadId: string;
partNumber: number;
etag: string;
body: Buffer;
contentLength?: number;
}[] = [];
constructor(config: R2StorageConfig, bucket: string) {
super(config, bucket);
}
destroy() {
this.client.destroy();
}
// @ts-ignore expect override
override async proxyPutObject(
key: string,
body: any,
options: { contentType?: string; contentLength?: number } = {}
) {
this.putCalls.push({
key,
body: await toBuffer(body),
contentType: options.contentType,
contentLength: options.contentLength,
});
}
override async proxyUploadPart(
key: string,
uploadId: string,
partNumber: number,
body: any,
options: { contentLength?: number } = {}
) {
const etag = `"etag-${partNumber}"`;
this.partCalls.push({
key,
uploadId,
partNumber,
etag,
body: await toBuffer(body),
contentLength: options.contentLength,
});
return etag;
}
override async createMultipartUpload() {
this.createMultipartCalls += 1;
return {
uploadId: 'upload-id',
expiresAt: new Date(Date.now() + SIGNED_URL_EXPIRED * 1000),
};
}
override async listMultipartUploadParts(key: string, uploadId: string) {
const latest = new Map<number, string>();
for (const part of this.partCalls) {
if (part.key !== key || part.uploadId !== uploadId) {
continue;
}
latest.set(part.partNumber, part.etag);
}
return [...latest.entries()]
.sort((left, right) => left[0] - right[0])
.map(([partNumber, etag]) => ({ partNumber, etag }));
}
}
const baseR2Storage: StorageProviderConfig = {
provider: 'cloudflare-r2',
bucket: 'test-bucket',
config: {
accountId: 'test-account',
region: 'auto',
credentials: {
accessKeyId: 'test',
secretAccessKey: 'test',
},
usePresignedURL: {
enabled: true,
urlPrefix: 'https://cdn.example.com',
signKey: 'r2-sign-key',
},
},
};
let defaultBlobStorage: StorageProviderConfig;
let provider: MockR2Provider | null = null;
let factoryCreateUnmocked: StorageProviderFactory['create'];
e2e.before(() => {
defaultBlobStorage = structuredClone(app.get(Config).storages.blob.storage);
const factory = app.get(StorageProviderFactory);
factoryCreateUnmocked = factory.create.bind(factory);
});
e2e.beforeEach(async () => {
provider?.destroy();
provider = null;
const factory = app.get(StorageProviderFactory);
mock.method(factory, 'create', (config: StorageProviderConfig) => {
if (config.provider === 'cloudflare-r2') {
if (!provider) {
provider = new MockR2Provider(
config.config as R2StorageConfig,
config.bucket
);
}
return provider;
}
return factoryCreateUnmocked(config);
});
await useR2Storage();
});
e2e.afterEach.always(async () => {
await setBlobStorage(defaultBlobStorage);
provider?.destroy();
provider = null;
mock.reset();
});
async function setBlobStorage(storage: StorageProviderConfig) {
provider?.destroy();
provider = null;
const configFactory = app.get(ConfigFactory);
configFactory.override({ storages: { blob: { storage } } });
const blobStorage = app.get(WorkspaceBlobStorage);
await blobStorage.onConfigInit();
const controller = app.get(R2UploadController);
// reset cached provider in controller
(controller as any).provider = null;
}
async function useR2Storage(
overrides?: Partial<R2StorageConfig['usePresignedURL']>
) {
const storage = structuredClone(baseR2Storage) as StorageProviderConfig;
const usePresignedURL = {
...(structuredClone(
((baseR2Storage as StorageProviderConfig).config as R2StorageConfig)
.usePresignedURL ?? {}
) as R2StorageConfig['usePresignedURL']),
...overrides,
};
(storage.config as R2StorageConfig).usePresignedURL =
usePresignedURL as R2StorageConfig['usePresignedURL'];
await setBlobStorage(storage);
return storage;
}
function getProvider(): MockR2Provider {
if (!provider) {
throw new Error('R2 provider is not initialized');
}
return provider;
}
async function createBlobUpload(
workspaceId: string,
key: string,
size: number,
mime: string
) {
const data = await gql(
`
mutation createBlobUpload($workspaceId: String!, $key: String!, $size: Int!, $mime: String!) {
createBlobUpload(workspaceId: $workspaceId, key: $key, size: $size, mime: $mime) {
method
blobKey
alreadyUploaded
uploadUrl
uploadId
partSize
uploadedParts {
partNumber
etag
}
}
}
`,
{ workspaceId, key, size, mime },
'createBlobUpload'
);
return data.createBlobUpload;
}
async function getBlobUploadPartUrl(
workspaceId: string,
key: string,
uploadId: string,
partNumber: number
) {
const data = await gql(
`
mutation getBlobUploadPartUrl($workspaceId: String!, $key: String!, $uploadId: String!, $partNumber: Int!) {
getBlobUploadPartUrl(workspaceId: $workspaceId, key: $key, uploadId: $uploadId, partNumber: $partNumber) {
uploadUrl
headers
expiresAt
}
}
`,
{ workspaceId, key, uploadId, partNumber },
'getBlobUploadPartUrl'
);
return data.getBlobUploadPartUrl;
}
async function setupWorkspace() {
const owner = await app.signup({ feature: 'pro_plan_v1' });
const workspace = await app.create(Mockers.Workspace, { owner });
return { owner, workspace };
}
async function gql<QueryData = any>(
query: string,
variables: Record<string, any>,
operationName: string
): Promise<QueryData> {
const res = await app
.POST('/graphql')
.set({ 'x-request-id': 'test', 'x-operation-name': operationName })
.send({ query, variables })
.expect(200);
if (res.body.errors?.length) {
throw new Error(res.body.errors[0].message);
}
return res.body.data;
}
e2e('should proxy single upload with valid signature', async t => {
const { workspace } = await setupWorkspace();
const buffer = Buffer.from('r2-proxy');
const key = sha256Base64urlWithPadding(buffer);
const init = await createBlobUpload(
workspace.id,
key,
buffer.length,
'text/plain'
);
t.is(init.method, 'PRESIGNED');
t.truthy(init.uploadUrl);
const uploadUrl = new URL(init.uploadUrl, app.url);
t.is(uploadUrl.pathname, PROXY_UPLOAD_PATH);
const res = await app
.PUT(uploadUrl.pathname + uploadUrl.search)
.set('content-type', 'text/plain')
.set('content-length', buffer.length.toString())
.send(buffer);
t.is(res.status, 200);
const calls = getProvider().putCalls;
t.is(calls.length, 1);
t.is(calls[0].key, `${workspace.id}/${key}`);
t.is(calls[0].contentType, 'text/plain');
t.is(calls[0].contentLength, buffer.length);
t.deepEqual(calls[0].body, buffer);
});
e2e('should proxy multipart upload and return etag', async t => {
const { workspace } = await setupWorkspace();
const key = 'multipart-object';
const totalSize = MULTIPART_THRESHOLD + 1024;
const init = await createBlobUpload(workspace.id, key, totalSize, 'bin');
t.is(init.method, 'MULTIPART');
t.is(init.uploadId, 'upload-id');
t.deepEqual(init.uploadedParts, []);
const part = await getBlobUploadPartUrl(workspace.id, key, init.uploadId, 1);
const partUrl = new URL(part.uploadUrl, app.url);
t.is(partUrl.pathname, PROXY_MULTIPART_PATH);
const payload = Buffer.from('part-body');
const res = await app
.PUT(partUrl.pathname + partUrl.search)
.set('content-length', payload.length.toString())
.send(payload);
t.is(res.status, 200);
t.is(res.get('etag'), '"etag-1"');
const calls = getProvider().partCalls;
t.is(calls.length, 1);
t.is(calls[0].key, `${workspace.id}/${key}`);
t.is(calls[0].uploadId, 'upload-id');
t.is(calls[0].partNumber, 1);
t.is(calls[0].contentLength, payload.length);
t.deepEqual(calls[0].body, payload);
});
e2e('should resume multipart upload and return uploaded parts', async t => {
const { workspace } = await setupWorkspace();
const key = 'multipart-resume';
const totalSize = MULTIPART_THRESHOLD + 1024;
const init1 = await createBlobUpload(workspace.id, key, totalSize, 'bin');
t.is(init1.method, 'MULTIPART');
t.is(init1.uploadId, 'upload-id');
t.deepEqual(init1.uploadedParts, []);
t.is(getProvider().createMultipartCalls, 1);
const part = await getBlobUploadPartUrl(workspace.id, key, init1.uploadId, 1);
const payload = Buffer.from('part-body');
const partUrl = new URL(part.uploadUrl, app.url);
await app
.PUT(partUrl.pathname + partUrl.search)
.set('content-length', payload.length.toString())
.send(payload)
.expect(200);
const init2 = await createBlobUpload(workspace.id, key, totalSize, 'bin');
t.is(init2.method, 'MULTIPART');
t.is(init2.uploadId, 'upload-id');
t.deepEqual(init2.uploadedParts, [{ partNumber: 1, etag: '"etag-1"' }]);
t.is(getProvider().createMultipartCalls, 1);
});
e2e('should reject upload when token is invalid', async t => {
const { workspace } = await setupWorkspace();
const buffer = Buffer.from('payload');
const init = await createBlobUpload(
workspace.id,
sha256Base64urlWithPadding(buffer),
buffer.length,
'text/plain'
);
const uploadUrl = new URL(init.uploadUrl, app.url);
uploadUrl.searchParams.set('token', 'invalid-token');
const res = await app
.PUT(uploadUrl.pathname + uploadUrl.search)
.set('content-type', 'text/plain')
.set('content-length', buffer.length.toString())
.send(buffer);
t.is(res.status, 400);
t.is(res.body.message, 'Invalid upload token');
t.is(getProvider().putCalls.length, 0);
});
e2e('should reject upload when url is expired', async t => {
const { workspace } = await setupWorkspace();
const buffer = Buffer.from('expired');
const init = await createBlobUpload(
workspace.id,
sha256Base64urlWithPadding(buffer),
buffer.length,
'text/plain'
);
const uploadUrl = new URL(init.uploadUrl, app.url);
uploadUrl.searchParams.set(
'exp',
(Math.floor(Date.now() / 1000) - 1).toString()
);
const res = await app
.PUT(uploadUrl.pathname + uploadUrl.search)
.set('content-type', 'text/plain')
.set('content-length', buffer.length.toString())
.send(buffer);
t.is(res.status, 400);
t.is(res.body.message, 'Upload URL expired');
t.is(getProvider().putCalls.length, 0);
});
e2e(
'should fall back to direct presign when custom domain is disabled',
async t => {
await useR2Storage({
enabled: false,
urlPrefix: undefined,
signKey: undefined,
});
const { workspace } = await setupWorkspace();
const buffer = Buffer.from('plain');
const init = await createBlobUpload(
workspace.id,
sha256Base64urlWithPadding(buffer),
buffer.length,
'text/plain'
);
t.is(init.method, 'PRESIGNED');
t.truthy(init.uploadUrl.includes('X-Amz-Algorithm=AWS4-HMAC-SHA256'));
t.not(new URL(init.uploadUrl, app.url).pathname, PROXY_UPLOAD_PATH);
}
);
e2e(
'should still fallback to graphql when provider does not support presign',
async t => {
await setBlobStorage(defaultBlobStorage);
const { workspace } = await setupWorkspace();
const buffer = Buffer.from('graph');
const init = await createBlobUpload(
workspace.id,
sha256Base64urlWithPadding(buffer),
buffer.length,
'text/plain'
);
t.is(init.method, 'GRAPHQL');
}
);
// Blob keys are the content's SHA-256 digest, base64-encoded with the URL-safe
// alphabet ('+' -> '-', '/' -> '_') while keeping the '=' padding.
function sha256Base64urlWithPadding(buffer: Buffer) {
return createHash('sha256')
.update(buffer)
.digest('base64')
.replace(/\+/g, '-')
.replace(/\//g, '_');
}

View File

@@ -0,0 +1,106 @@
import { ScheduleModule } from '@nestjs/schedule';
import { PrismaClient } from '@prisma/client';
import ava, { TestFn } from 'ava';
import Sinon from 'sinon';
import { OneDay } from '../../base';
import { StorageModule, WorkspaceBlobStorage } from '../../core/storage';
import { BlobUploadCleanupJob } from '../../core/storage/job';
import { MockUser, MockWorkspace } from '../mocks';
import { createTestingModule, TestingModule } from '../utils';
interface Context {
module: TestingModule;
db: PrismaClient;
job: BlobUploadCleanupJob;
storage: WorkspaceBlobStorage;
}
const test = ava as TestFn<Context>;
test.before(async t => {
t.context.module = await createTestingModule({
imports: [ScheduleModule.forRoot(), StorageModule],
});
t.context.db = t.context.module.get(PrismaClient);
t.context.job = t.context.module.get(BlobUploadCleanupJob);
t.context.storage = t.context.module.get(WorkspaceBlobStorage);
});
test.beforeEach(async t => {
await t.context.module.initTestingDB();
});
test.after.always(async t => {
await t.context.module.close();
});
test('should cleanup expired pending blobs', async t => {
const user = await t.context.module.create(MockUser);
const workspace = await t.context.module.create(MockWorkspace, {
owner: { id: user.id },
});
const expiredAt = new Date(Date.now() - OneDay - 1000);
const activeAt = new Date();
await t.context.db.blob.createMany({
data: [
{
workspaceId: workspace.id,
key: 'expired-pending',
size: 4,
mime: 'text/plain',
status: 'pending',
uploadId: null,
createdAt: expiredAt,
},
{
workspaceId: workspace.id,
key: 'expired-multipart',
size: 4,
mime: 'text/plain',
status: 'pending',
uploadId: 'upload-1',
createdAt: expiredAt,
},
{
workspaceId: workspace.id,
key: 'pending-active',
size: 4,
mime: 'text/plain',
status: 'pending',
uploadId: null,
createdAt: activeAt,
},
{
workspaceId: workspace.id,
key: 'completed-keep',
size: 4,
mime: 'text/plain',
status: 'completed',
uploadId: null,
createdAt: expiredAt,
},
],
});
const abortSpy = Sinon.spy(t.context.storage, 'abortMultipartUpload');
const deleteSpy = Sinon.spy(t.context.storage, 'delete');
t.teardown(() => {
abortSpy.restore();
deleteSpy.restore();
});
await t.context.job.cleanExpiredPendingBlobs();
t.is(abortSpy.callCount, 1);
t.is(deleteSpy.callCount, 2);
const remaining = await t.context.db.blob.findMany({
where: { workspaceId: workspace.id },
});
const remainingKeys = remaining.map(record => record.key).sort();
t.deepEqual(remainingKeys, ['completed-keep', 'pending-active']);
});

View File

@@ -88,3 +88,82 @@ export async function setBlob(
}
return res.body.data.setBlob;
}
export async function createBlobUpload(
app: TestingApp,
workspaceId: string,
key: string,
size: number,
mime: string
) {
const res = await app.gql(
`
mutation createBlobUpload($workspaceId: String!, $key: String!, $size: Int!, $mime: String!) {
createBlobUpload(workspaceId: $workspaceId, key: $key, size: $size, mime: $mime) {
method
blobKey
uploadUrl
uploadId
partSize
}
}
`,
{
workspaceId,
key,
size,
mime,
}
);
return res.createBlobUpload;
}
export async function completeBlobUpload(
app: TestingApp,
workspaceId: string,
key: string,
options?: {
uploadId?: string;
parts?: { partNumber: number; etag: string }[];
}
) {
const res = await app.gql(
`
mutation completeBlobUpload($workspaceId: String!, $key: String!, $uploadId: String, $parts: [BlobUploadPartInput!]) {
completeBlobUpload(workspaceId: $workspaceId, key: $key, uploadId: $uploadId, parts: $parts)
}
`,
{
workspaceId,
key,
uploadId: options?.uploadId,
parts: options?.parts,
}
);
return res.completeBlobUpload;
}
export async function getBlobUploadPartUrl(
app: TestingApp,
workspaceId: string,
key: string,
uploadId: string,
partNumber: number
) {
const res = await app.gql(
`
mutation getBlobUploadPartUrl($workspaceId: String!, $key: String!, $uploadId: String!, $partNumber: Int!) {
getBlobUploadPartUrl(workspaceId: $workspaceId, key: $key, uploadId: $uploadId, partNumber: $partNumber) {
uploadUrl
}
}
`,
{
workspaceId,
key,
uploadId,
partNumber,
}
);
return res.getBlobUploadPartUrl;
}

View File

@@ -1,13 +1,19 @@
import { createHash } from 'node:crypto';
import test from 'ava';
import Sinon from 'sinon';
import { Config, StorageProviderFactory } from '../../base';
import { WorkspaceBlobStorage } from '../../core/storage/wrappers/blob';
import { WorkspaceFeatureModel } from '../../models';
import { BlobModel, WorkspaceFeatureModel } from '../../models';
import {
collectAllBlobSizes,
completeBlobUpload,
createBlobUpload,
createTestingApp,
createWorkspace,
deleteWorkspace,
getBlobUploadPartUrl,
getWorkspaceBlobsSize,
listBlobs,
setBlob,
@@ -80,6 +86,95 @@ test('should list blobs', async t => {
t.deepEqual(ret.map(x => x.key).sort(), [hash1, hash2].sort());
});
test('should create pending blob upload with graphql fallback', async t => {
await app.signupV1('u1@affine.pro');
const workspace = await createWorkspace(app);
const key = `upload-${Math.random().toString(16).slice(2, 8)}`;
const size = 4;
const mime = 'text/plain';
const init = await createBlobUpload(app, workspace.id, key, size, mime);
t.is(init.method, 'GRAPHQL');
t.is(init.blobKey, key);
const blobModel = app.get(BlobModel);
const record = await blobModel.get(workspace.id, key);
t.truthy(record);
t.is(record?.status, 'pending');
const listed = await listBlobs(app, workspace.id);
t.is(listed.length, 0);
});
test('should complete pending blob upload', async t => {
await app.signupV1('u1@affine.pro');
const workspace = await createWorkspace(app);
const buffer = Buffer.from('done');
const mime = 'text/plain';
const key = sha256Base64urlWithPadding(buffer);
await createBlobUpload(app, workspace.id, key, buffer.length, mime);
const config = app.get(Config);
const factory = app.get(StorageProviderFactory);
const provider = factory.create(config.storages.blob.storage);
await provider.put(`${workspace.id}/${key}`, buffer, {
contentType: mime,
contentLength: buffer.length,
});
const completed = await completeBlobUpload(app, workspace.id, key);
t.is(completed, key);
const blobModel = app.get(BlobModel);
const record = await blobModel.get(workspace.id, key);
t.truthy(record);
t.is(record?.status, 'completed');
const listed = await listBlobs(app, workspace.id);
t.is(listed.length, 1);
});
test('should reject complete when blob key mismatched', async t => {
await app.signupV1('u1@affine.pro');
const workspace = await createWorkspace(app);
const buffer = Buffer.from('mismatch');
const mime = 'text/plain';
const wrongKey = sha256Base64urlWithPadding(Buffer.from('other'));
await createBlobUpload(app, workspace.id, wrongKey, buffer.length, mime);
const config = app.get(Config);
const factory = app.get(StorageProviderFactory);
const provider = factory.create(config.storages.blob.storage);
await provider.put(`${workspace.id}/${wrongKey}`, buffer, {
contentType: mime,
contentLength: buffer.length,
});
await t.throwsAsync(() => completeBlobUpload(app, workspace.id, wrongKey), {
message: 'Blob key mismatch',
});
});
test('should reject multipart upload part url on fs provider', async t => {
await app.signupV1('u1@affine.pro');
const workspace = await createWorkspace(app);
await t.throwsAsync(
() => getBlobUploadPartUrl(app, workspace.id, 'blob-key', 'upload', 1),
{
message: 'Multipart upload is not supported',
}
);
});
test('should auto delete blobs when workspace is deleted', async t => {
await app.signupV1('u1@affine.pro');
@@ -185,3 +280,11 @@ test('should throw error when blob size large than max file size', async t => {
'HTTP request error, message: File truncated as it exceeds the 10485760 byte size limit.',
});
});
function sha256Base64urlWithPadding(buffer: Buffer) {
return createHash('sha256')
.update(buffer)
.digest('base64')
.replace(/\+/g, '-')
.replace(/\//g, '_');
}

View File

@@ -506,6 +506,10 @@ export const USER_FRIENDLY_ERRORS = {
message: ({ spaceId, blobId }) =>
`Blob ${blobId} not found in Space ${spaceId}.`,
},
blob_invalid: {
type: 'invalid_input',
message: 'Blob is invalid.',
},
expect_to_publish_doc: {
type: 'invalid_input',
message: 'Expected to publish a doc, not a Space.',

View File

@@ -437,6 +437,12 @@ export class BlobNotFound extends UserFriendlyError {
}
}
export class BlobInvalid extends UserFriendlyError {
constructor(message?: string) {
super('invalid_input', 'blob_invalid', message);
}
}
export class ExpectToPublishDoc extends UserFriendlyError {
constructor(message?: string) {
super('invalid_input', 'expect_to_publish_doc', message);
@@ -1166,6 +1172,7 @@ export enum ErrorNames {
INVALID_HISTORY_TIMESTAMP,
DOC_HISTORY_NOT_FOUND,
BLOB_NOT_FOUND,
BLOB_INVALID,
EXPECT_TO_PUBLISH_DOC,
EXPECT_TO_REVOKE_PUBLIC_DOC,
EXPECT_TO_GRANT_DOC_USER_ROLES,

View File

@@ -19,8 +19,12 @@ class Redis extends IORedis implements OnModuleInit, OnModuleDestroy {
this.on('error', this.errorHandler);
}
onModuleDestroy() {
this.disconnect();
async onModuleDestroy() {
try {
await this.quit();
} catch {
this.disconnect();
}
}
override duplicate(override?: Partial<RedisOptions>): IORedis {

View File

@@ -0,0 +1,80 @@
import test from 'ava';
import { S3StorageProvider } from '../providers/s3';
import { SIGNED_URL_EXPIRED } from '../providers/utils';
const config = {
region: 'auto',
credentials: {
accessKeyId: 'test',
secretAccessKey: 'test',
},
};
function createProvider() {
return new S3StorageProvider(config, 'test-bucket');
}
test('presignPut should return url and headers', async t => {
const provider = createProvider();
const result = await provider.presignPut('key', {
contentType: 'text/plain',
});
t.truthy(result);
t.true(result!.url.length > 0);
t.true(result!.url.includes('X-Amz-Algorithm=AWS4-HMAC-SHA256'));
t.deepEqual(result!.headers, { 'Content-Type': 'text/plain' });
const now = Date.now();
t.true(result!.expiresAt.getTime() >= now + SIGNED_URL_EXPIRED * 1000 - 2000);
t.true(result!.expiresAt.getTime() <= now + SIGNED_URL_EXPIRED * 1000 + 2000);
});
test('presignUploadPart should return url', async t => {
const provider = createProvider();
const result = await provider.presignUploadPart('key', 'upload-1', 3);
t.truthy(result);
t.true(result!.url.length > 0);
t.true(result!.url.includes('X-Amz-Algorithm=AWS4-HMAC-SHA256'));
});
test('createMultipartUpload should return uploadId', async t => {
const provider = createProvider();
let receivedCommand: any;
const sendStub = async (command: any) => {
receivedCommand = command;
return { UploadId: 'upload-1' };
};
(provider as any).client = { send: sendStub };
const now = Date.now();
const result = await provider.createMultipartUpload('key', {
contentType: 'text/plain',
});
t.is(result?.uploadId, 'upload-1');
t.true(result!.expiresAt.getTime() >= now + SIGNED_URL_EXPIRED * 1000 - 2000);
t.true(result!.expiresAt.getTime() <= now + SIGNED_URL_EXPIRED * 1000 + 2000);
t.is(receivedCommand.input.Key, 'key');
t.is(receivedCommand.input.ContentType, 'text/plain');
});
test('completeMultipartUpload should order parts', async t => {
const provider = createProvider();
let called = false;
const sendStub = async (command: any) => {
called = true;
t.deepEqual(command.input.MultipartUpload.Parts, [
{ ETag: 'a', PartNumber: 1 },
{ ETag: 'b', PartNumber: 2 },
]);
};
(provider as any).client = { send: sendStub };
await provider.completeMultipartUpload('key', 'upload-1', [
{ partNumber: 2, etag: 'b' },
{ partNumber: 1, etag: 'a' },
]);
t.true(called);
});

View File

@@ -118,7 +118,7 @@ export const StorageJSONSchema: JSONSchema = {
urlPrefix: {
type: 'string' as const,
description:
'The presigned url prefix for the cloudflare r2 storage provider.\nsee https://developers.cloudflare.com/waf/custom-rules/use-cases/configure-token-authentication/ to configure it.\nExample value: "https://storage.example.com"\nExample rule: is_timed_hmac_valid_v0("your_secret", http.request.uri, 10800, http.request.timestamp.sec, 6)',
'The custom domain URL prefix for the cloudflare r2 storage provider.\nWhen `enabled=true` and `urlPrefix` + `signKey` are provided, the server will:\n- Redirect GET requests to this custom domain with an HMAC token.\n- Return upload URLs under `/api/storage/*` for uploads.\nPresigned/upload proxy TTL is 1 hour.\nsee https://developers.cloudflare.com/waf/custom-rules/use-cases/configure-token-authentication/ to configure it.\nExample value: "https://storage.example.com"\nExample rule: is_timed_hmac_valid_v0("your_secret", http.request.uri, 10800, http.request.timestamp.sec, 6)',
},
signKey: {
type: 'string' as const,
@@ -135,4 +135,12 @@ export const StorageJSONSchema: JSONSchema = {
};
export type * from './provider';
export { applyAttachHeaders, autoMetadata, sniffMime, toBuffer } from './utils';
export {
applyAttachHeaders,
autoMetadata,
PROXY_MULTIPART_PATH,
PROXY_UPLOAD_PATH,
sniffMime,
STORAGE_PROXY_ROOT,
toBuffer,
} from './utils';

View File

@@ -25,12 +25,51 @@ export interface ListObjectsMetadata {
export type BlobInputType = Buffer | Readable | string;
export type BlobOutputType = Readable;
export interface PresignedUpload {
url: string;
headers?: Record<string, string>;
expiresAt: Date;
}
export interface MultipartUploadInit {
uploadId: string;
expiresAt: Date;
}
export interface MultipartUploadPart {
partNumber: number;
etag: string;
}
export interface StorageProvider {
put(
key: string,
body: BlobInputType,
metadata?: PutObjectMetadata
): Promise<void>;
presignPut?(
key: string,
metadata?: PutObjectMetadata
): Promise<PresignedUpload | undefined>;
createMultipartUpload?(
key: string,
metadata?: PutObjectMetadata
): Promise<MultipartUploadInit | undefined>;
presignUploadPart?(
key: string,
uploadId: string,
partNumber: number
): Promise<PresignedUpload | undefined>;
listMultipartUploadParts?(
key: string,
uploadId: string
): Promise<MultipartUploadPart[] | undefined>;
completeMultipartUpload?(
key: string,
uploadId: string,
parts: MultipartUploadPart[]
): Promise<void>;
abortMultipartUpload?(key: string, uploadId: string): Promise<void>;
head(key: string): Promise<GetObjectMetadata | undefined>;
get(
key: string,

View File

@@ -1,10 +1,20 @@
import assert from 'node:assert';
import { Readable } from 'node:stream';
import { PutObjectCommand, UploadPartCommand } from '@aws-sdk/client-s3';
import { Logger } from '@nestjs/common';
import { GetObjectMetadata } from './provider';
import {
GetObjectMetadata,
PresignedUpload,
PutObjectMetadata,
} from './provider';
import { S3StorageConfig, S3StorageProvider } from './s3';
import {
PROXY_MULTIPART_PATH,
PROXY_UPLOAD_PATH,
SIGNED_URL_EXPIRED,
} from './utils';
export interface R2StorageConfig extends S3StorageConfig {
accountId: string;
@@ -39,8 +49,24 @@ export class R2StorageProvider extends S3StorageProvider {
this.key = this.encoder.encode(config.usePresignedURL?.signKey ?? '');
}
private async signUrl(url: URL): Promise<string> {
const timestamp = Math.floor(Date.now() / 1000);
private get shouldUseProxyUpload() {
const { usePresignedURL } = this.config;
return (
!!usePresignedURL?.enabled &&
!!usePresignedURL.signKey &&
this.key.length > 0
);
}
private parseWorkspaceKey(fullKey: string) {
const [workspaceId, ...rest] = fullKey.split('/');
if (!workspaceId || rest.length !== 1) {
return null;
}
return { workspaceId, key: rest.join('/') };
}
private async signPayload(payload: string) {
const key = await crypto.subtle.importKey(
'raw',
this.key,
@@ -51,14 +77,140 @@ export class R2StorageProvider extends S3StorageProvider {
const mac = await crypto.subtle.sign(
'HMAC',
key,
this.encoder.encode(`${url.pathname}${timestamp}`)
this.encoder.encode(payload)
);
const base64Mac = Buffer.from(mac).toString('base64');
return Buffer.from(mac).toString('base64');
}
private async signUrl(url: URL): Promise<string> {
const timestamp = Math.floor(Date.now() / 1000);
const base64Mac = await this.signPayload(`${url.pathname}${timestamp}`);
url.searchParams.set('sign', `${timestamp}-${base64Mac}`);
return url.toString();
}
// Builds a relative `/api/storage/...` URL carrying an `exp` timestamp and a
// `token` of the form `<exp>-<base64 HMAC-SHA256(signKey, path\n...fields\nexp)>`,
// which the R2UploadController verifies before proxying the body to R2.
private async createProxyUrl(
path: string,
canonicalFields: (string | number | undefined)[],
query: Record<string, string | number | undefined>
) {
const exp = Math.floor(Date.now() / 1000) + SIGNED_URL_EXPIRED;
const canonical = [
path,
...canonicalFields.map(field =>
field === undefined ? '' : field.toString()
),
exp.toString(),
].join('\n');
const token = await this.signPayload(canonical);
const url = new URL(`http://localhost${path}`);
for (const [key, value] of Object.entries(query)) {
if (value === undefined) continue;
url.searchParams.set(key, value.toString());
}
url.searchParams.set('exp', exp.toString());
url.searchParams.set('token', `${exp}-${token}`);
return { url: url.pathname + url.search, expiresAt: new Date(exp * 1000) };
}
override async presignPut(
key: string,
metadata: PutObjectMetadata = {}
): Promise<PresignedUpload | undefined> {
if (!this.shouldUseProxyUpload) {
return super.presignPut(key, metadata);
}
const parsed = this.parseWorkspaceKey(key);
if (!parsed) {
return super.presignPut(key, metadata);
}
const contentType = metadata.contentType ?? 'application/octet-stream';
const { url, expiresAt } = await this.createProxyUrl(
PROXY_UPLOAD_PATH,
[parsed.workspaceId, parsed.key, contentType, metadata.contentLength],
{
workspaceId: parsed.workspaceId,
key: parsed.key,
contentType,
contentLength: metadata.contentLength,
}
);
return {
url,
headers: { 'Content-Type': contentType },
expiresAt,
};
}
override async presignUploadPart(
key: string,
uploadId: string,
partNumber: number
): Promise<PresignedUpload | undefined> {
if (!this.shouldUseProxyUpload) {
return super.presignUploadPart(key, uploadId, partNumber);
}
const parsed = this.parseWorkspaceKey(key);
if (!parsed) {
return super.presignUploadPart(key, uploadId, partNumber);
}
return this.createProxyUrl(
PROXY_MULTIPART_PATH,
[parsed.workspaceId, parsed.key, uploadId, partNumber],
{
workspaceId: parsed.workspaceId,
key: parsed.key,
uploadId,
partNumber,
}
);
}
async proxyPutObject(
key: string,
body: Readable | Buffer | Uint8Array | string,
options: { contentType?: string; contentLength?: number } = {}
) {
return this.client.send(
new PutObjectCommand({
Bucket: this.bucket,
Key: key,
Body: body,
ContentType: options.contentType,
ContentLength: options.contentLength,
})
);
}
async proxyUploadPart(
key: string,
uploadId: string,
partNumber: number,
body: Readable | Buffer | Uint8Array | string,
options: { contentLength?: number } = {}
) {
const result = await this.client.send(
new UploadPartCommand({
Bucket: this.bucket,
Key: key,
UploadId: uploadId,
PartNumber: partNumber,
Body: body,
ContentLength: options.contentLength,
})
);
return result.ETag;
}
override async get(
key: string,
signedUrl?: boolean

View File

@@ -2,15 +2,21 @@
import { Readable } from 'node:stream';
import {
AbortMultipartUploadCommand,
CompleteMultipartUploadCommand,
CreateMultipartUploadCommand,
DeleteObjectCommand,
GetObjectCommand,
HeadObjectCommand,
ListObjectsV2Command,
ListPartsCommand,
NoSuchKey,
NoSuchUpload,
NotFound,
PutObjectCommand,
S3Client,
S3ClientConfig,
UploadPartCommand,
} from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { Logger } from '@nestjs/common';
@@ -19,6 +25,9 @@ import {
BlobInputType,
GetObjectMetadata,
ListObjectsMetadata,
MultipartUploadInit,
MultipartUploadPart,
PresignedUpload,
PutObjectMetadata,
StorageProvider,
} from './provider';
@@ -89,6 +98,196 @@ export class S3StorageProvider implements StorageProvider {
}
}
async presignPut(
key: string,
metadata: PutObjectMetadata = {}
): Promise<PresignedUpload | undefined> {
try {
const contentType = metadata.contentType ?? 'application/octet-stream';
const url = await getSignedUrl(
this.client,
new PutObjectCommand({
Bucket: this.bucket,
Key: key,
ContentType: contentType,
}),
{ expiresIn: SIGNED_URL_EXPIRED }
);
return {
url,
headers: { 'Content-Type': contentType },
expiresAt: new Date(Date.now() + SIGNED_URL_EXPIRED * 1000),
};
} catch (e) {
this.logger.error(
`Failed to presign put object (${JSON.stringify({
key,
bucket: this.bucket,
metadata,
})})`
);
throw e;
}
}
async createMultipartUpload(
key: string,
metadata: PutObjectMetadata = {}
): Promise<MultipartUploadInit | undefined> {
try {
const contentType = metadata.contentType ?? 'application/octet-stream';
const response = await this.client.send(
new CreateMultipartUploadCommand({
Bucket: this.bucket,
Key: key,
ContentType: contentType,
})
);
if (!response.UploadId) {
return;
}
return {
uploadId: response.UploadId,
expiresAt: new Date(Date.now() + SIGNED_URL_EXPIRED * 1000),
};
} catch (e) {
this.logger.error(
`Failed to create multipart upload (${JSON.stringify({
key,
bucket: this.bucket,
metadata,
})})`
);
throw e;
}
}
async presignUploadPart(
key: string,
uploadId: string,
partNumber: number
): Promise<PresignedUpload | undefined> {
try {
const url = await getSignedUrl(
this.client,
new UploadPartCommand({
Bucket: this.bucket,
Key: key,
UploadId: uploadId,
PartNumber: partNumber,
}),
{ expiresIn: SIGNED_URL_EXPIRED }
);
return {
url,
expiresAt: new Date(Date.now() + SIGNED_URL_EXPIRED * 1000),
};
} catch (e) {
this.logger.error(
`Failed to presign upload part (${JSON.stringify({ key, bucket: this.bucket, uploadId, partNumber })})`
);
throw e;
}
}
async listMultipartUploadParts(
key: string,
uploadId: string
): Promise<MultipartUploadPart[] | undefined> {
const parts: MultipartUploadPart[] = [];
let partNumberMarker: string | undefined;
try {
// ListParts is paginated by part number marker
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html
// R2 follows S3 semantics here.
while (true) {
const response = await this.client.send(
new ListPartsCommand({
Bucket: this.bucket,
Key: key,
UploadId: uploadId,
PartNumberMarker: partNumberMarker,
})
);
for (const part of response.Parts ?? []) {
if (!part.PartNumber || !part.ETag) {
continue;
}
parts.push({ partNumber: part.PartNumber, etag: part.ETag });
}
if (!response.IsTruncated) {
break;
}
if (response.NextPartNumberMarker === undefined) {
break;
}
partNumberMarker = response.NextPartNumberMarker;
}
return parts;
} catch (e) {
// the upload may have been aborted/expired by provider lifecycle rules
if (e instanceof NoSuchUpload || e instanceof NotFound) {
return undefined;
}
this.logger.error(`Failed to list multipart upload parts for \`${key}\``);
throw e;
}
}
async completeMultipartUpload(
key: string,
uploadId: string,
parts: MultipartUploadPart[]
): Promise<void> {
try {
const orderedParts = [...parts].sort(
(left, right) => left.partNumber - right.partNumber
);
await this.client.send(
new CompleteMultipartUploadCommand({
Bucket: this.bucket,
Key: key,
UploadId: uploadId,
MultipartUpload: {
Parts: orderedParts.map(part => ({
ETag: part.etag,
PartNumber: part.partNumber,
})),
},
})
);
} catch (e) {
this.logger.error(`Failed to complete multipart upload for \`${key}\``);
throw e;
}
}
async abortMultipartUpload(key: string, uploadId: string): Promise<void> {
try {
await this.client.send(
new AbortMultipartUploadCommand({
Bucket: this.bucket,
Key: key,
UploadId: uploadId,
})
);
} catch (e) {
this.logger.error(`Failed to abort multipart upload for \`${key}\``);
throw e;
}
}
async head(key: string) {
try {
const obj = await this.client.send(
@@ -164,7 +363,7 @@ export class S3StorageProvider implements StorageProvider {
body: obj.Body,
metadata: {
// always set when putting object
contentType: obj.ContentType!,
contentType: obj.ContentType ?? 'application/octet-stream',
contentLength: obj.ContentLength!,
lastModified: obj.LastModified!,
checksumCRC32: obj.ChecksumCRC32,

View File

@@ -94,3 +94,7 @@ export function sniffMime(
}
export const SIGNED_URL_EXPIRED = 60 * 60; // 1 hour
export const STORAGE_PROXY_ROOT = '/api/storage';
export const PROXY_UPLOAD_PATH = `${STORAGE_PROXY_ROOT}/upload`;
export const PROXY_MULTIPART_PATH = `${STORAGE_PROXY_ROOT}/multipart`;

View File

@@ -0,0 +1,4 @@
import { OneMB } from '../../base';
// part size handed out to clients for multipart uploads
export const MULTIPART_PART_SIZE = 5 * OneMB;
// blobs at or above this size are initialized as multipart uploads
export const MULTIPART_THRESHOLD = 10 * OneMB;

View File

@@ -2,6 +2,8 @@ import './config';
import { Module } from '@nestjs/common';
import { BlobUploadCleanupJob } from './job';
import { R2UploadController } from './r2-proxy';
import {
AvatarStorage,
CommentAttachmentStorage,
@@ -9,7 +11,13 @@ import {
} from './wrappers';
@Module({
providers: [WorkspaceBlobStorage, AvatarStorage, CommentAttachmentStorage],
controllers: [R2UploadController],
providers: [
WorkspaceBlobStorage,
AvatarStorage,
CommentAttachmentStorage,
BlobUploadCleanupJob,
],
exports: [WorkspaceBlobStorage, AvatarStorage, CommentAttachmentStorage],
})
export class StorageModule {}

View File

@@ -0,0 +1,54 @@
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { JobQueue, OneDay, OnJob } from '../../base';
import { Models } from '../../models';
import { WorkspaceBlobStorage } from './wrappers/blob';
declare global {
interface Jobs {
'nightly.cleanExpiredPendingBlobs': {};
}
}
@Injectable()
export class BlobUploadCleanupJob {
private readonly logger = new Logger(BlobUploadCleanupJob.name);
constructor(
private readonly models: Models,
private readonly storage: WorkspaceBlobStorage,
private readonly queue: JobQueue
) {}
@Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT)
async nightlyJob() {
await this.queue.add(
'nightly.cleanExpiredPendingBlobs',
{},
{
jobId: 'nightly-blob-clean-expired-pending',
}
);
}
@OnJob('nightly.cleanExpiredPendingBlobs')
async cleanExpiredPendingBlobs() {
const cutoff = new Date(Date.now() - OneDay);
const pending = await this.models.blob.listPendingExpired(cutoff);
for (const blob of pending) {
if (blob.uploadId) {
await this.storage.abortMultipartUpload(
blob.workspaceId,
blob.key,
blob.uploadId
);
}
await this.storage.delete(blob.workspaceId, blob.key, true);
}
this.logger.log(`cleaned ${pending.length} expired pending blobs`);
}
}

View File

@@ -0,0 +1,331 @@
import { createHmac, timingSafeEqual } from 'node:crypto';
import { Controller, Logger, Put, Req, Res } from '@nestjs/common';
import type { Request, Response } from 'express';
import {
BlobInvalid,
CallMetric,
Config,
OnEvent,
PROXY_MULTIPART_PATH,
PROXY_UPLOAD_PATH,
STORAGE_PROXY_ROOT,
StorageProviderConfig,
StorageProviderFactory,
} from '../../base';
import {
R2StorageConfig,
R2StorageProvider,
} from '../../base/storage/providers/r2';
import { Models } from '../../models';
import { Public } from '../auth/guard';
import { MULTIPART_PART_SIZE } from './constants';
type R2BlobStorageConfig = StorageProviderConfig & {
provider: 'cloudflare-r2';
config: R2StorageConfig;
};
type QueryValue = Request['query'][string];
type R2Config = {
storage: R2BlobStorageConfig;
signKey: string;
};
@Controller(STORAGE_PROXY_ROOT)
export class R2UploadController {
private readonly logger = new Logger(R2UploadController.name);
private provider: R2StorageProvider | null = null;
constructor(
private readonly config: Config,
private readonly models: Models,
private readonly storageFactory: StorageProviderFactory
) {}
@OnEvent('config.changed')
onConfigChanged(event: Events['config.changed']) {
if (event.updates.storages?.blob?.storage) {
this.provider = null;
}
}
private getR2Config(): R2Config {
const storage = this.config.storages.blob.storage as StorageProviderConfig;
if (storage.provider !== 'cloudflare-r2') {
throw new BlobInvalid('Invalid endpoint');
}
const r2Config = storage.config as R2StorageConfig;
const signKey = r2Config.usePresignedURL?.signKey;
if (
!r2Config.usePresignedURL?.enabled ||
!r2Config.usePresignedURL.urlPrefix ||
!signKey
) {
throw new BlobInvalid('Invalid endpoint');
}
return { storage: storage as R2BlobStorageConfig, signKey };
}
private getProvider(storage: R2BlobStorageConfig) {
if (!this.provider) {
const candidate = this.storageFactory.create(storage);
if (candidate instanceof R2StorageProvider) {
this.provider = candidate;
}
}
return this.provider;
}
private sign(canonical: string, signKey: string) {
return createHmac('sha256', signKey).update(canonical).digest('base64');
}
private safeEqual(expected: string, actual: string) {
const a = Buffer.from(expected);
const b = Buffer.from(actual);
if (a.length !== b.length) {
return false;
}
return timingSafeEqual(a, b);
}
private verifyToken(
path: string,
canonicalFields: (string | number | undefined)[],
exp: number,
token: string,
signKey: string
) {
const canonical = [
path,
...canonicalFields.map(field =>
field === undefined ? '' : field.toString()
),
exp.toString(),
].join('\n');
const expected = `${exp}-${this.sign(canonical, signKey)}`;
return this.safeEqual(expected, token);
}
private expectString(value: QueryValue, field: string): string {
if (Array.isArray(value)) {
return String(value[0]);
}
if (typeof value === 'string' && value.length > 0) {
return value;
}
throw new BlobInvalid(`Missing ${field}.`);
}
private optionalString(value: QueryValue) {
if (Array.isArray(value)) {
return String(value[0]);
}
return typeof value === 'string' && value.length > 0 ? value : undefined;
}
private number(value: QueryValue, field: string): number {
const str = this.expectString(value, field);
const num = Number(str);
if (!Number.isFinite(num)) {
throw new BlobInvalid(`Invalid ${field}.`);
}
return num;
}
private optionalNumber(value: QueryValue, field: string): number | undefined {
if (value === undefined) {
return undefined;
}
const num = Number(Array.isArray(value) ? value[0] : value);
if (!Number.isFinite(num)) {
throw new BlobInvalid(`Invalid ${field}.`);
}
return num;
}
private parseContentLength(req: Request) {
const raw = req.header('content-length');
if (!raw) {
return undefined;
}
const num = Number(raw);
if (!Number.isFinite(num) || num < 0) {
throw new BlobInvalid('Invalid Content-Length header');
}
return num;
}
private ensureNotExpired(exp: number) {
const now = Math.floor(Date.now() / 1000);
if (exp < now) {
throw new BlobInvalid('Upload URL expired');
}
}
@Public()
@Put('upload')
@CallMetric('controllers', 'r2_proxy_upload')
async upload(@Req() req: Request, @Res() res: Response) {
const { storage, signKey } = this.getR2Config();
const workspaceId = this.expectString(req.query.workspaceId, 'workspaceId');
const key = this.expectString(req.query.key, 'key');
const token = this.expectString(req.query.token, 'token');
const exp = this.number(req.query.exp, 'exp');
const contentType = this.optionalString(req.query.contentType);
const contentLengthFromQuery = this.optionalNumber(
req.query.contentLength,
'contentLength'
);
this.ensureNotExpired(exp);
if (
!this.verifyToken(
PROXY_UPLOAD_PATH,
[workspaceId, key, contentType, contentLengthFromQuery],
exp,
token,
signKey
)
) {
throw new BlobInvalid('Invalid upload token');
}
const record = await this.models.blob.get(workspaceId, key);
if (!record) {
throw new BlobInvalid('Blob upload is not initialized');
}
if (record.status === 'completed') {
throw new BlobInvalid('Blob upload is already completed');
}
const contentLengthHeader = this.parseContentLength(req);
if (
contentLengthFromQuery !== undefined &&
contentLengthHeader !== undefined &&
contentLengthFromQuery !== contentLengthHeader
) {
throw new BlobInvalid('Content length mismatch');
}
const contentLength = contentLengthHeader ?? contentLengthFromQuery;
if (contentLength === undefined) {
throw new BlobInvalid('Missing Content-Length header');
}
if (record.size && contentLength !== record.size) {
throw new BlobInvalid('Content length does not match upload metadata');
}
const mime = contentType ?? record.mime;
if (record.mime && mime && record.mime !== mime) {
throw new BlobInvalid('Mime type mismatch');
}
const provider = this.getProvider(storage);
if (!provider) {
throw new BlobInvalid('R2 provider is not available');
}
try {
await provider.proxyPutObject(`${workspaceId}/${key}`, req, {
contentType: mime,
contentLength,
});
} catch (error) {
this.logger.error('Failed to proxy upload', error as Error);
throw new BlobInvalid('Upload failed');
}
res.status(200).end();
}
@Public()
@Put('multipart')
@CallMetric('controllers', 'r2_proxy_multipart')
async uploadPart(@Req() req: Request, @Res() res: Response) {
const { storage, signKey } = this.getR2Config();
const workspaceId = this.expectString(req.query.workspaceId, 'workspaceId');
const key = this.expectString(req.query.key, 'key');
const uploadId = this.expectString(req.query.uploadId, 'uploadId');
const token = this.expectString(req.query.token, 'token');
const exp = this.number(req.query.exp, 'exp');
const partNumber = this.number(req.query.partNumber, 'partNumber');
if (partNumber < 1) {
throw new BlobInvalid('Invalid part number');
}
this.ensureNotExpired(exp);
if (
!this.verifyToken(
PROXY_MULTIPART_PATH,
[workspaceId, key, uploadId, partNumber],
exp,
token,
signKey
)
) {
throw new BlobInvalid('Invalid upload token');
}
const record = await this.models.blob.get(workspaceId, key);
if (!record) {
throw new BlobInvalid('Multipart upload is not initialized');
}
if (record.status === 'completed') {
throw new BlobInvalid('Blob upload is already completed');
}
if (record.uploadId !== uploadId) {
throw new BlobInvalid('Upload id mismatch');
}
const contentLength = this.parseContentLength(req);
if (contentLength === undefined || contentLength === 0) {
throw new BlobInvalid('Missing Content-Length header');
}
const maxPartNumber = Math.ceil(record.size / MULTIPART_PART_SIZE);
if (partNumber > maxPartNumber) {
throw new BlobInvalid('Part number exceeds upload size');
}
if (
record.size &&
(partNumber - 1) * MULTIPART_PART_SIZE + contentLength > record.size
) {
throw new BlobInvalid('Part size exceeds upload metadata');
}
const provider = this.getProvider(storage);
if (!provider) {
throw new BlobInvalid('R2 provider is not available');
}
try {
const etag = await provider.proxyUploadPart(
`${workspaceId}/${key}`,
uploadId,
partNumber,
req,
{ contentLength }
);
if (etag) {
res.setHeader('etag', etag);
}
} catch (error) {
this.logger.error('Failed to proxy multipart upload', error as Error);
throw new BlobInvalid('Upload failed');
}
res.status(200).end();
}
}

View File

@@ -1,3 +1,5 @@
import { createHash } from 'node:crypto';
import { Injectable, Logger } from '@nestjs/common';
import {
@@ -27,6 +29,17 @@ declare global {
}
}
type BlobCompleteResult =
| { ok: true; metadata: GetObjectMetadata }
| {
ok: false;
reason:
| 'not_found'
| 'size_mismatch'
| 'mime_mismatch'
| 'checksum_mismatch';
};
@Injectable()
export class WorkspaceBlobStorage {
private readonly logger = new Logger(WorkspaceBlobStorage.name);
@@ -71,6 +84,142 @@ export class WorkspaceBlobStorage {
return this.provider.get(`${workspaceId}/${key}`, signedUrl);
}
async presignPut(
workspaceId: string,
key: string,
metadata?: PutObjectMetadata
) {
return this.provider.presignPut?.(`${workspaceId}/${key}`, metadata);
}
async createMultipartUpload(
workspaceId: string,
key: string,
metadata?: PutObjectMetadata
) {
return this.provider.createMultipartUpload?.(
`${workspaceId}/${key}`,
metadata
);
}
async presignUploadPart(
workspaceId: string,
key: string,
uploadId: string,
partNumber: number
) {
return this.provider.presignUploadPart?.(
`${workspaceId}/${key}`,
uploadId,
partNumber
);
}
async listMultipartUploadParts(
workspaceId: string,
key: string,
uploadId: string
) {
return this.provider.listMultipartUploadParts?.(
`${workspaceId}/${key}`,
uploadId
);
}
async completeMultipartUpload(
workspaceId: string,
key: string,
uploadId: string,
parts: { partNumber: number; etag: string }[]
) {
if (!this.provider.completeMultipartUpload) {
return false;
}
await this.provider.completeMultipartUpload(
`${workspaceId}/${key}`,
uploadId,
parts
);
return true;
}
async abortMultipartUpload(
workspaceId: string,
key: string,
uploadId: string
) {
if (!this.provider.abortMultipartUpload) {
return false;
}
await this.provider.abortMultipartUpload(`${workspaceId}/${key}`, uploadId);
return true;
}
async head(workspaceId: string, key: string) {
return this.provider.head(`${workspaceId}/${key}`);
}
async complete(
workspaceId: string,
key: string,
expected: { size: number; mime: string }
): Promise<BlobCompleteResult> {
const metadata = await this.head(workspaceId, key);
if (!metadata) {
return { ok: false, reason: 'not_found' };
}
if (metadata.contentLength !== expected.size) {
return { ok: false, reason: 'size_mismatch' };
}
if (expected.mime && metadata.contentType !== expected.mime) {
return { ok: false, reason: 'mime_mismatch' };
}
const object = await this.provider.get(`${workspaceId}/${key}`);
if (!object.body) {
return { ok: false, reason: 'not_found' };
}
const checksum = createHash('sha256');
try {
for await (const chunk of object.body) {
checksum.update(chunk as Buffer);
}
} catch (e) {
this.logger.error('failed to read blob for checksum verification', e);
return { ok: false, reason: 'checksum_mismatch' };
}
// the blob key must equal the content's SHA-256 digest in URL-safe base64
// ('+' -> '-', '/' -> '_', '=' padding kept); recompute it to verify the upload
const base64 = checksum.digest('base64');
const base64urlWithPadding = base64.replace(/\+/g, '-').replace(/\//g, '_');
if (base64urlWithPadding !== key) {
try {
await this.provider.delete(`${workspaceId}/${key}`);
} catch (e) {
// never throw
this.logger.error('failed to delete invalid blob', e);
}
return { ok: false, reason: 'checksum_mismatch' };
}
await this.models.blob.upsert({
workspaceId,
key,
mime: metadata.contentType,
size: metadata.contentLength,
status: 'completed',
uploadId: null,
});
return { ok: true, metadata };
}
async list(workspaceId: string, syncBlobMeta = true) {
const blobsInDb = await this.models.blob.list(workspaceId);
@@ -78,6 +227,12 @@ export class WorkspaceBlobStorage {
return blobsInDb;
}
// blob records exist but none are completed yet (all still uploading), so
// return the empty list instead of syncing from the provider
const hasDbBlobs = await this.models.blob.hasAny(workspaceId);
if (hasDbBlobs) {
return blobsInDb;
}
const blobs = await this.provider.list(workspaceId + '/');
blobs.forEach(blob => {
blob.key = blob.key.slice(workspaceId.length + 1);
@@ -147,6 +302,8 @@ export class WorkspaceBlobStorage {
key,
mime: meta.contentType,
size: meta.contentLength,
status: 'completed',
uploadId: null,
});
}

View File

@@ -2,29 +2,110 @@ import { Logger, UseGuards } from '@nestjs/common';
import {
Args,
Field,
InputType,
Int,
Mutation,
ObjectType,
Parent,
Query,
registerEnumType,
ResolveField,
Resolver,
} from '@nestjs/graphql';
import { GraphQLJSONObject } from 'graphql-scalars';
import GraphQLUpload from 'graphql-upload/GraphQLUpload.mjs';
import type { FileUpload } from '../../../base';
import {
BlobInvalid,
BlobNotFound,
BlobQuotaExceeded,
CloudThrottlerGuard,
readBuffer,
StorageQuotaExceeded,
} from '../../../base';
import { Models } from '../../../models';
import { CurrentUser } from '../../auth';
import { AccessController } from '../../permission';
import { QuotaService } from '../../quota';
import { WorkspaceBlobStorage } from '../../storage';
import {
MULTIPART_PART_SIZE,
MULTIPART_THRESHOLD,
} from '../../storage/constants';
import { WorkspaceBlobSizes, WorkspaceType } from '../types';
enum BlobUploadMethod {
GRAPHQL = 'GRAPHQL',
PRESIGNED = 'PRESIGNED',
MULTIPART = 'MULTIPART',
}
registerEnumType(BlobUploadMethod, {
name: 'BlobUploadMethod',
description: 'Blob upload method',
});
@ObjectType()
class BlobUploadedPart {
@Field(() => Int)
partNumber!: number;
@Field()
etag!: string;
}
@ObjectType()
class BlobUploadInit {
@Field(() => BlobUploadMethod)
method!: BlobUploadMethod;
@Field()
blobKey!: string;
@Field(() => Boolean, { nullable: true })
alreadyUploaded?: boolean;
@Field(() => String, { nullable: true })
uploadUrl?: string;
@Field(() => GraphQLJSONObject, { nullable: true })
headers?: Record<string, string>;
@Field(() => Date, { nullable: true })
expiresAt?: Date;
@Field(() => String, { nullable: true })
uploadId?: string;
@Field(() => Int, { nullable: true })
partSize?: number;
@Field(() => [BlobUploadedPart], { nullable: true })
uploadedParts?: BlobUploadedPart[];
}
@ObjectType()
class BlobUploadPart {
@Field()
uploadUrl!: string;
@Field(() => GraphQLJSONObject, { nullable: true })
headers?: Record<string, string>;
@Field(() => Date, { nullable: true })
expiresAt?: Date;
}
@InputType()
class BlobUploadPartInput {
@Field(() => Int)
partNumber!: number;
@Field()
etag!: string;
}
@ObjectType()
class ListedBlob {
@Field()
@@ -47,7 +128,8 @@ export class WorkspaceBlobResolver {
constructor(
private readonly ac: AccessController,
private readonly quota: QuotaService,
private readonly storage: WorkspaceBlobStorage
private readonly storage: WorkspaceBlobStorage,
private readonly models: Models
) {}
@ResolveField(() => [ListedBlob], {
@@ -110,6 +192,258 @@ export class WorkspaceBlobResolver {
return blob.filename;
}
@Mutation(() => BlobUploadInit)
async createBlobUpload(
@CurrentUser() user: CurrentUser,
@Args('workspaceId') workspaceId: string,
@Args('key') key: string,
@Args('size', { type: () => Int }) size: number,
@Args('mime') mime: string
): Promise<BlobUploadInit> {
await this.ac
.user(user.id)
.workspace(workspaceId)
.assert('Workspace.Blobs.Write');
let record = await this.models.blob.get(workspaceId, key);
mime = mime || 'application/octet-stream';
if (record) {
if (record.size !== size) {
throw new BlobInvalid('Blob size mismatch');
}
if (record.mime !== mime) {
throw new BlobInvalid('Blob mime mismatch');
}
if (record.status === 'completed') {
const existingMetadata = await this.storage.head(workspaceId, key);
if (!existingMetadata) {
// record exists but object is missing, treat as a new upload
record = null;
} else if (existingMetadata.contentLength !== size) {
throw new BlobInvalid('Blob size mismatch');
} else if (existingMetadata.contentType !== mime) {
throw new BlobInvalid('Blob mime mismatch');
} else {
return {
method: BlobUploadMethod.GRAPHQL,
blobKey: key,
alreadyUploaded: true,
};
}
}
}
const checkExceeded =
await this.quota.getWorkspaceQuotaCalculator(workspaceId);
const result = checkExceeded(record ? 0 : size);
if (result?.blobQuotaExceeded) {
throw new BlobQuotaExceeded();
} else if (result?.storageQuotaExceeded) {
throw new StorageQuotaExceeded();
}
const metadata = { contentType: mime, contentLength: size };
let init: BlobUploadInit | null = null;
let uploadIdForRecord: string | null = null;
// try to resume multipart uploads
if (record && record.uploadId) {
const uploadedParts = await this.storage.listMultipartUploadParts(
workspaceId,
key,
record.uploadId
);
if (uploadedParts) {
return {
method: BlobUploadMethod.MULTIPART,
blobKey: key,
uploadId: record.uploadId,
partSize: MULTIPART_PART_SIZE,
uploadedParts,
};
}
}
if (size >= MULTIPART_THRESHOLD) {
const multipart = await this.storage.createMultipartUpload(
workspaceId,
key,
metadata
);
if (multipart) {
uploadIdForRecord = multipart.uploadId;
init = {
method: BlobUploadMethod.MULTIPART,
blobKey: key,
uploadId: multipart.uploadId,
partSize: MULTIPART_PART_SIZE,
expiresAt: multipart.expiresAt,
uploadedParts: [],
};
}
}
if (!init) {
const presigned = await this.storage.presignPut(
workspaceId,
key,
metadata
);
if (presigned) {
init = {
method: BlobUploadMethod.PRESIGNED,
blobKey: key,
uploadUrl: presigned.url,
headers: presigned.headers,
expiresAt: presigned.expiresAt,
};
}
}
if (!init) {
init = {
method: BlobUploadMethod.GRAPHQL,
blobKey: key,
};
}
await this.models.blob.upsert({
workspaceId,
key,
mime,
size,
status: 'pending',
uploadId: uploadIdForRecord,
});
return init;
}
@Mutation(() => String)
async completeBlobUpload(
@CurrentUser() user: CurrentUser,
@Args('workspaceId') workspaceId: string,
@Args('key') key: string,
@Args('uploadId', { nullable: true }) uploadId?: string,
@Args({
name: 'parts',
type: () => [BlobUploadPartInput],
nullable: true,
})
parts?: BlobUploadPartInput[]
): Promise<string> {
await this.ac
.user(user.id)
.workspace(workspaceId)
.assert('Workspace.Blobs.Write');
const record = await this.models.blob.get(workspaceId, key);
if (!record) {
throw new BlobNotFound({ spaceId: workspaceId, blobId: key });
}
if (record.status === 'completed') {
return key;
}
const hasMultipartInput =
uploadId !== undefined || (parts?.length ?? 0) > 0;
const hasMultipartRecord = !!record.uploadId;
if (hasMultipartRecord) {
if (!uploadId || !parts || parts.length === 0) {
throw new BlobInvalid(
'Multipart upload requires both uploadId and parts'
);
}
if (uploadId !== record.uploadId) {
throw new BlobInvalid('Upload id mismatch');
}
const metadata = await this.storage.head(workspaceId, key);
if (!metadata) {
const completed = await this.storage.completeMultipartUpload(
workspaceId,
key,
uploadId,
parts
);
if (!completed) {
throw new BlobInvalid('Multipart upload is not supported');
}
}
} else if (hasMultipartInput) {
throw new BlobInvalid('Multipart upload is not initialized');
}
const result = await this.storage.complete(workspaceId, key, {
size: record.size,
mime: record.mime,
});
if (!result.ok) {
if (result.reason === 'not_found') {
throw new BlobNotFound({
spaceId: workspaceId,
blobId: key,
});
}
if (result.reason === 'size_mismatch') {
throw new BlobInvalid('Blob size mismatch');
}
if (result.reason === 'mime_mismatch') {
throw new BlobInvalid('Blob mime mismatch');
}
throw new BlobInvalid('Blob key mismatch');
}
return key;
}
@Mutation(() => BlobUploadPart)
async getBlobUploadPartUrl(
@CurrentUser() user: CurrentUser,
@Args('workspaceId') workspaceId: string,
@Args('key') key: string,
@Args('uploadId') uploadId: string,
@Args('partNumber', { type: () => Int }) partNumber: number
): Promise<BlobUploadPart> {
await this.ac
.user(user.id)
.workspace(workspaceId)
.assert('Workspace.Blobs.Write');
const part = await this.storage.presignUploadPart(
workspaceId,
key,
uploadId,
partNumber
);
if (!part) {
throw new BlobInvalid('Multipart upload is not supported');
}
return {
uploadUrl: part.url,
headers: part.headers,
expiresAt: part.expiresAt,
};
}
@Mutation(() => Boolean)
async abortBlobUpload(
@CurrentUser() user: CurrentUser,
@Args('workspaceId') workspaceId: string,
@Args('key') key: string,
@Args('uploadId') uploadId: string
) {
await this.ac
.user(user.id)
.workspace(workspaceId)
.assert('Workspace.Blobs.Write');
return this.storage.abortMultipartUpload(workspaceId, key, uploadId);
}
@Mutation(() => Boolean)
async deleteBlob(
@CurrentUser() user: CurrentUser,

View File

@@ -21,12 +21,16 @@ export class BlobModel extends BaseModel {
update: {
mime: blob.mime,
size: blob.size,
status: blob.status,
uploadId: blob.uploadId,
},
create: {
workspaceId: blob.workspaceId,
key: blob.key,
mime: blob.mime,
size: blob.size,
status: blob.status,
uploadId: blob.uploadId,
},
});
}
@@ -76,11 +80,39 @@ export class BlobModel extends BaseModel {
...options?.where,
workspaceId,
deletedAt: null,
status: 'completed',
},
select: options?.select,
});
}
async hasAny(workspaceId: string) {
const count = await this.db.blob.count({
where: {
workspaceId,
deletedAt: null,
},
});
return count > 0;
}
async listPendingExpired(before: Date) {
return await this.db.blob.findMany({
where: {
status: 'pending',
deletedAt: null,
createdAt: {
lt: before,
},
},
select: {
workspaceId: true,
key: true,
uploadId: true,
},
});
}
async listDeleted(workspaceId: string) {
return await this.db.blob.findMany({
where: {

View File

@@ -96,6 +96,41 @@ type BlobNotFoundDataType {
spaceId: String!
}
type BlobUploadInit {
alreadyUploaded: Boolean
blobKey: String!
expiresAt: DateTime
headers: JSONObject
method: BlobUploadMethod!
partSize: Int
uploadId: String
uploadUrl: String
uploadedParts: [BlobUploadedPart!]
}
"""Blob upload method"""
enum BlobUploadMethod {
GRAPHQL
MULTIPART
PRESIGNED
}
type BlobUploadPart {
expiresAt: DateTime
headers: JSONObject
uploadUrl: String!
}
input BlobUploadPartInput {
etag: String!
partNumber: Int!
}
type BlobUploadedPart {
etag: String!
partNumber: Int!
}
enum ChatHistoryOrder {
asc
desc
@@ -659,6 +694,7 @@ enum ErrorNames {
ALREADY_IN_SPACE
AUTHENTICATION_REQUIRED
BAD_REQUEST
BLOB_INVALID
BLOB_NOT_FOUND
BLOB_QUOTA_EXCEEDED
CANNOT_DELETE_ACCOUNT_WITH_OWNED_TEAM_WORKSPACE
@@ -1194,6 +1230,7 @@ type MissingOauthQueryParameterDataType {
}
type Mutation {
abortBlobUpload(key: String!, uploadId: String!, workspaceId: String!): Boolean!
acceptInviteById(inviteId: String!, sendAcceptMail: Boolean @deprecated(reason: "never used"), workspaceId: String @deprecated(reason: "never used")): Boolean!
activateLicense(license: String!, workspaceId: String!): License!
@@ -1223,6 +1260,8 @@ type Mutation {
"""Cleanup sessions"""
cleanupCopilotSession(options: DeleteSessionInput!): [String!]!
completeBlobUpload(key: String!, parts: [BlobUploadPartInput!], uploadId: String, workspaceId: String!): String!
createBlobUpload(key: String!, mime: String!, size: Int!, workspaceId: String!): BlobUploadInit!
"""Create change password url"""
createChangePasswordUrl(callbackUrl: String!, userId: String!): String!
@@ -1275,6 +1314,7 @@ type Mutation {
forkCopilotSession(options: ForkChatSessionInput!): String!
generateLicenseKey(sessionId: String!): String!
generateUserAccessToken(input: GenerateAccessTokenInput!): RevealedAccessToken!
getBlobUploadPartUrl(key: String!, partNumber: Int!, uploadId: String!, workspaceId: String!): BlobUploadPart!
grantDocUserRoles(input: GrantDocUserRolesInput!): Boolean!
grantMember(permission: Permission!, userId: String!, workspaceId: String!): Boolean!