From 76524084d1938ab541639e91dd8e2e266da65ad0 Mon Sep 17 00:00:00 2001 From: DarkSky <25152247+darkskygit@users.noreply.github.com> Date: Tue, 23 Dec 2025 22:09:21 +0800 Subject: [PATCH] feat: multipart blob sync support (#14138) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary by CodeRabbit * **New Features** * Flexible blob uploads: GRAPHQL, presigned, and multipart flows with per‑part URLs, abort/complete operations, presigned proxy endpoints, and nightly cleanup of expired pending uploads. * **API / Schema** * GraphQL additions: new types, mutations, enum and error to manage upload lifecycle (create, complete, abort, get part URL). * **Database** * New blob status enum and columns (status, upload_id); listing now defaults to completed blobs. * **Localization** * Added user-facing message: "Blob is invalid." * **Tests** * Expanded unit and end‑to‑end coverage for upload flows, proxy behavior, multipart and provider integrations. ✏️ Tip: You can customize this high-level summary in your review settings. --- .docker/selfhost/schema.json | 6 +- .../migration.sql | 6 + packages/backend/server/schema.prisma | 19 +- .../__tests__/e2e/storage/r2-proxy.spec.ts | 460 ++++++++++++++++++ .../storage/blob-upload-cleanup.spec.ts | 106 ++++ .../server/src/__tests__/utils/blobs.ts | 79 +++ .../src/__tests__/workspace/blobs.e2e.ts | 105 +++- packages/backend/server/src/base/error/def.ts | 4 + .../server/src/base/error/errors.gen.ts | 7 + .../server/src/base/redis/instances.ts | 8 +- .../src/base/storage/__tests__/s3.spec.ts | 80 +++ .../src/base/storage/providers/index.ts | 12 +- .../src/base/storage/providers/provider.ts | 39 ++ .../server/src/base/storage/providers/r2.ts | 162 +++++- .../server/src/base/storage/providers/s3.ts | 201 +++++++- .../src/base/storage/providers/utils.ts | 4 + .../server/src/core/storage/constants.ts | 4 + .../backend/server/src/core/storage/index.ts | 10 +- .../backend/server/src/core/storage/job.ts | 54 ++ .../server/src/core/storage/r2-proxy.ts | 331 +++++++++++++ .../server/src/core/storage/wrappers/blob.ts | 157 ++++++ .../src/core/workspaces/resolvers/blob.ts | 336 ++++++++++++- packages/backend/server/src/models/blob.ts | 32 ++ packages/backend/server/src/schema.gql | 40 ++ .../graphql/src/graphql/blob-upload-abort.gql | 3 + .../src/graphql/blob-upload-complete.gql | 3 + .../src/graphql/blob-upload-create.gql | 16 + .../src/graphql/blob-upload-part-url.gql | 7 + packages/common/graphql/src/graphql/index.ts | 59 +++ packages/common/graphql/src/schema.ts | 157 ++++++ .../nbstore/src/__tests__/cloud-blob.spec.ts | 162 ++++++ .../common/nbstore/src/impls/cloud/blob.ts | 224 ++++++++- .../common/nbstore/src/impls/cloud/http.ts | 13 +- .../i18n/src/i18n-completenesses.json | 2 +- packages/frontend/i18n/src/i18n.gen.ts | 4 + packages/frontend/i18n/src/resources/en.json | 1 + 36 files changed, 2880 insertions(+), 33 deletions(-) create mode 100644 packages/backend/server/migrations/20251222184223_add_blob_status/migration.sql create mode 100644 packages/backend/server/src/__tests__/e2e/storage/r2-proxy.spec.ts create mode 100644 packages/backend/server/src/__tests__/storage/blob-upload-cleanup.spec.ts create mode 100644 packages/backend/server/src/base/storage/__tests__/s3.spec.ts create mode 100644 packages/backend/server/src/core/storage/constants.ts create mode 100644 packages/backend/server/src/core/storage/job.ts create mode 100644 packages/backend/server/src/core/storage/r2-proxy.ts create mode 100644 
packages/common/graphql/src/graphql/blob-upload-abort.gql create mode 100644 packages/common/graphql/src/graphql/blob-upload-complete.gql create mode 100644 packages/common/graphql/src/graphql/blob-upload-create.gql create mode 100644 packages/common/graphql/src/graphql/blob-upload-part-url.gql create mode 100644 packages/common/nbstore/src/__tests__/cloud-blob.spec.ts diff --git a/.docker/selfhost/schema.json b/.docker/selfhost/schema.json index 58ddd8bba2..8366e071f6 100644 --- a/.docker/selfhost/schema.json +++ b/.docker/selfhost/schema.json @@ -397,7 +397,7 @@ }, "urlPrefix": { "type": "string", - "description": "The presigned url prefix for the cloudflare r2 storage provider.\nsee https://developers.cloudflare.com/waf/custom-rules/use-cases/configure-token-authentication/ to configure it.\nExample value: \"https://storage.example.com\"\nExample rule: is_timed_hmac_valid_v0(\"your_secret\", http.request.uri, 10800, http.request.timestamp.sec, 6)" + "description": "The custom domain URL prefix for the cloudflare r2 storage provider.\nWhen `enabled=true` and `urlPrefix` + `signKey` are provided, the server will:\n- Redirect GET requests to this custom domain with an HMAC token.\n- Return upload URLs under `/api/storage/*` for uploads.\nPresigned/upload proxy TTL is 1 hour.\nsee https://developers.cloudflare.com/waf/custom-rules/use-cases/configure-token-authentication/ to configure it.\nExample value: \"https://storage.example.com\"\nExample rule: is_timed_hmac_valid_v0(\"your_secret\", http.request.uri, 10800, http.request.timestamp.sec, 6)" }, "signKey": { "type": "string", @@ -518,7 +518,7 @@ }, "urlPrefix": { "type": "string", - "description": "The presigned url prefix for the cloudflare r2 storage provider.\nsee https://developers.cloudflare.com/waf/custom-rules/use-cases/configure-token-authentication/ to configure it.\nExample value: \"https://storage.example.com\"\nExample rule: is_timed_hmac_valid_v0(\"your_secret\", http.request.uri, 10800, http.request.timestamp.sec, 6)" + "description": "The custom domain URL prefix for the cloudflare r2 storage provider.\nWhen `enabled=true` and `urlPrefix` + `signKey` are provided, the server will:\n- Redirect GET requests to this custom domain with an HMAC token.\n- Return upload URLs under `/api/storage/*` for uploads.\nPresigned/upload proxy TTL is 1 hour.\nsee https://developers.cloudflare.com/waf/custom-rules/use-cases/configure-token-authentication/ to configure it.\nExample value: \"https://storage.example.com\"\nExample rule: is_timed_hmac_valid_v0(\"your_secret\", http.request.uri, 10800, http.request.timestamp.sec, 6)" }, "signKey": { "type": "string", @@ -928,7 +928,7 @@ }, "urlPrefix": { "type": "string", - "description": "The presigned url prefix for the cloudflare r2 storage provider.\nsee https://developers.cloudflare.com/waf/custom-rules/use-cases/configure-token-authentication/ to configure it.\nExample value: \"https://storage.example.com\"\nExample rule: is_timed_hmac_valid_v0(\"your_secret\", http.request.uri, 10800, http.request.timestamp.sec, 6)" + "description": "The custom domain URL prefix for the cloudflare r2 storage provider.\nWhen `enabled=true` and `urlPrefix` + `signKey` are provided, the server will:\n- Redirect GET requests to this custom domain with an HMAC token.\n- Return upload URLs under `/api/storage/*` for uploads.\nPresigned/upload proxy TTL is 1 hour.\nsee https://developers.cloudflare.com/waf/custom-rules/use-cases/configure-token-authentication/ to configure it.\nExample value: 
\"https://storage.example.com\"\nExample rule: is_timed_hmac_valid_v0(\"your_secret\", http.request.uri, 10800, http.request.timestamp.sec, 6)" }, "signKey": { "type": "string", diff --git a/packages/backend/server/migrations/20251222184223_add_blob_status/migration.sql b/packages/backend/server/migrations/20251222184223_add_blob_status/migration.sql new file mode 100644 index 0000000000..e52991a52c --- /dev/null +++ b/packages/backend/server/migrations/20251222184223_add_blob_status/migration.sql @@ -0,0 +1,6 @@ +-- CreateEnum +CREATE TYPE "BlobStatus" AS ENUM ('pending', 'completed'); + +-- AlterTable +ALTER TABLE "blobs" ADD COLUMN "status" "BlobStatus" NOT NULL DEFAULT 'completed', +ADD COLUMN "upload_id" VARCHAR; diff --git a/packages/backend/server/schema.prisma b/packages/backend/server/schema.prisma index d33304f48e..25d2569ff6 100644 --- a/packages/backend/server/schema.prisma +++ b/packages/backend/server/schema.prisma @@ -836,16 +836,23 @@ model InstalledLicense { @@map("installed_licenses") } +enum BlobStatus { + pending + completed +} + // Blob table only exists for fast non-data queries. // like, total size of blobs in a workspace, or blob list for sync service. // it should only be a map of metadata of blobs stored anywhere else model Blob { - workspaceId String @map("workspace_id") @db.VarChar - key String @db.VarChar - size Int @db.Integer - mime String @db.VarChar - createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(3) - deletedAt DateTime? @map("deleted_at") @db.Timestamptz(3) + workspaceId String @map("workspace_id") @db.VarChar + key String @db.VarChar + size Int @db.Integer + mime String @db.VarChar + status BlobStatus @default(completed) + uploadId String? @map("upload_id") @db.VarChar + createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(3) + deletedAt DateTime? 
@map("deleted_at") @db.Timestamptz(3) workspace Workspace @relation(fields: [workspaceId], references: [id], onDelete: Cascade) AiWorkspaceBlobEmbedding AiWorkspaceBlobEmbedding[] diff --git a/packages/backend/server/src/__tests__/e2e/storage/r2-proxy.spec.ts b/packages/backend/server/src/__tests__/e2e/storage/r2-proxy.spec.ts new file mode 100644 index 0000000000..18fb9a2863 --- /dev/null +++ b/packages/backend/server/src/__tests__/e2e/storage/r2-proxy.spec.ts @@ -0,0 +1,460 @@ +import { createHash } from 'node:crypto'; +import { mock } from 'node:test'; + +import { + Config, + ConfigFactory, + PROXY_MULTIPART_PATH, + PROXY_UPLOAD_PATH, + StorageProviderConfig, + StorageProviderFactory, + toBuffer, +} from '../../../base'; +import { + R2StorageConfig, + R2StorageProvider, +} from '../../../base/storage/providers/r2'; +import { SIGNED_URL_EXPIRED } from '../../../base/storage/providers/utils'; +import { WorkspaceBlobStorage } from '../../../core/storage'; +import { MULTIPART_THRESHOLD } from '../../../core/storage/constants'; +import { R2UploadController } from '../../../core/storage/r2-proxy'; +import { app, e2e, Mockers } from '../test'; + +class MockR2Provider extends R2StorageProvider { + createMultipartCalls = 0; + putCalls: { + key: string; + body: Buffer; + contentType?: string; + contentLength?: number; + }[] = []; + partCalls: { + key: string; + uploadId: string; + partNumber: number; + etag: string; + body: Buffer; + contentLength?: number; + }[] = []; + + constructor(config: R2StorageConfig, bucket: string) { + super(config, bucket); + } + + destroy() { + this.client.destroy(); + } + + // @ts-ignore expect override + override async proxyPutObject( + key: string, + body: any, + options: { contentType?: string; contentLength?: number } = {} + ) { + this.putCalls.push({ + key, + body: await toBuffer(body), + contentType: options.contentType, + contentLength: options.contentLength, + }); + } + + override async proxyUploadPart( + key: string, + uploadId: string, + partNumber: number, + body: any, + options: { contentLength?: number } = {} + ) { + const etag = `"etag-${partNumber}"`; + this.partCalls.push({ + key, + uploadId, + partNumber, + etag, + body: await toBuffer(body), + contentLength: options.contentLength, + }); + return etag; + } + + override async createMultipartUpload() { + this.createMultipartCalls += 1; + return { + uploadId: 'upload-id', + expiresAt: new Date(Date.now() + SIGNED_URL_EXPIRED * 1000), + }; + } + + override async listMultipartUploadParts(key: string, uploadId: string) { + const latest = new Map(); + for (const part of this.partCalls) { + if (part.key !== key || part.uploadId !== uploadId) { + continue; + } + latest.set(part.partNumber, part.etag); + } + return [...latest.entries()] + .sort((left, right) => left[0] - right[0]) + .map(([partNumber, etag]) => ({ partNumber, etag })); + } +} + +const baseR2Storage: StorageProviderConfig = { + provider: 'cloudflare-r2', + bucket: 'test-bucket', + config: { + accountId: 'test-account', + region: 'auto', + credentials: { + accessKeyId: 'test', + secretAccessKey: 'test', + }, + usePresignedURL: { + enabled: true, + urlPrefix: 'https://cdn.example.com', + signKey: 'r2-sign-key', + }, + }, +}; + +let defaultBlobStorage: StorageProviderConfig; +let provider: MockR2Provider | null = null; +let factoryCreateUnmocked: StorageProviderFactory['create']; + +e2e.before(() => { + defaultBlobStorage = structuredClone(app.get(Config).storages.blob.storage); + const factory = app.get(StorageProviderFactory); + 
factoryCreateUnmocked = factory.create.bind(factory); +}); + +e2e.beforeEach(async () => { + provider?.destroy(); + provider = null; + + const factory = app.get(StorageProviderFactory); + mock.method(factory, 'create', (config: StorageProviderConfig) => { + if (config.provider === 'cloudflare-r2') { + if (!provider) { + provider = new MockR2Provider( + config.config as R2StorageConfig, + config.bucket + ); + } + return provider; + } + return factoryCreateUnmocked(config); + }); + + await useR2Storage(); +}); + +e2e.afterEach.always(async () => { + await setBlobStorage(defaultBlobStorage); + provider?.destroy(); + provider = null; + mock.reset(); +}); + +async function setBlobStorage(storage: StorageProviderConfig) { + provider?.destroy(); + provider = null; + const configFactory = app.get(ConfigFactory); + configFactory.override({ storages: { blob: { storage } } }); + const blobStorage = app.get(WorkspaceBlobStorage); + await blobStorage.onConfigInit(); + const controller = app.get(R2UploadController); + // reset cached provider in controller + (controller as any).provider = null; +} + +async function useR2Storage( + overrides?: Partial +) { + const storage = structuredClone(baseR2Storage) as StorageProviderConfig; + const usePresignedURL = { + ...(structuredClone( + ((baseR2Storage as StorageProviderConfig).config as R2StorageConfig) + .usePresignedURL ?? {} + ) as R2StorageConfig['usePresignedURL']), + ...overrides, + }; + (storage.config as R2StorageConfig).usePresignedURL = + usePresignedURL as R2StorageConfig['usePresignedURL']; + await setBlobStorage(storage); + return storage; +} + +function getProvider(): MockR2Provider { + if (!provider) { + throw new Error('R2 provider is not initialized'); + } + return provider; +} + +async function createBlobUpload( + workspaceId: string, + key: string, + size: number, + mime: string +) { + const data = await gql( + ` + mutation createBlobUpload($workspaceId: String!, $key: String!, $size: Int!, $mime: String!) { + createBlobUpload(workspaceId: $workspaceId, key: $key, size: $size, mime: $mime) { + method + blobKey + alreadyUploaded + uploadUrl + uploadId + partSize + uploadedParts { + partNumber + etag + } + } + } + `, + { workspaceId, key, size, mime }, + 'createBlobUpload' + ); + + return data.createBlobUpload; +} + +async function getBlobUploadPartUrl( + workspaceId: string, + key: string, + uploadId: string, + partNumber: number +) { + const data = await gql( + ` + mutation getBlobUploadPartUrl($workspaceId: String!, $key: String!, $uploadId: String!, $partNumber: Int!) 
{ + getBlobUploadPartUrl(workspaceId: $workspaceId, key: $key, uploadId: $uploadId, partNumber: $partNumber) { + uploadUrl + headers + expiresAt + } + } + `, + { workspaceId, key, uploadId, partNumber }, + 'getBlobUploadPartUrl' + ); + + return data.getBlobUploadPartUrl; +} + +async function setupWorkspace() { + const owner = await app.signup({ feature: 'pro_plan_v1' }); + const workspace = await app.create(Mockers.Workspace, { owner }); + return { owner, workspace }; +} + +async function gql( + query: string, + variables: Record, + operationName: string +): Promise { + const res = await app + .POST('/graphql') + .set({ 'x-request-id': 'test', 'x-operation-name': operationName }) + .send({ query, variables }) + .expect(200); + + if (res.body.errors?.length) { + throw new Error(res.body.errors[0].message); + } + + return res.body.data; +} + +e2e('should proxy single upload with valid signature', async t => { + const { workspace } = await setupWorkspace(); + const buffer = Buffer.from('r2-proxy'); + const key = sha256Base64urlWithPadding(buffer); + + const init = await createBlobUpload( + workspace.id, + key, + buffer.length, + 'text/plain' + ); + + t.is(init.method, 'PRESIGNED'); + t.truthy(init.uploadUrl); + const uploadUrl = new URL(init.uploadUrl, app.url); + t.is(uploadUrl.pathname, PROXY_UPLOAD_PATH); + + const res = await app + .PUT(uploadUrl.pathname + uploadUrl.search) + .set('content-type', 'text/plain') + .set('content-length', buffer.length.toString()) + .send(buffer); + + t.is(res.status, 200); + const calls = getProvider().putCalls; + t.is(calls.length, 1); + t.is(calls[0].key, `${workspace.id}/${key}`); + t.is(calls[0].contentType, 'text/plain'); + t.is(calls[0].contentLength, buffer.length); + t.deepEqual(calls[0].body, buffer); +}); + +e2e('should proxy multipart upload and return etag', async t => { + const { workspace } = await setupWorkspace(); + const key = 'multipart-object'; + const totalSize = MULTIPART_THRESHOLD + 1024; + const init = await createBlobUpload(workspace.id, key, totalSize, 'bin'); + + t.is(init.method, 'MULTIPART'); + t.is(init.uploadId, 'upload-id'); + t.deepEqual(init.uploadedParts, []); + + const part = await getBlobUploadPartUrl(workspace.id, key, init.uploadId, 1); + const partUrl = new URL(part.uploadUrl, app.url); + t.is(partUrl.pathname, PROXY_MULTIPART_PATH); + + const payload = Buffer.from('part-body'); + const res = await app + .PUT(partUrl.pathname + partUrl.search) + .set('content-length', payload.length.toString()) + .send(payload); + + t.is(res.status, 200); + t.is(res.get('etag'), '"etag-1"'); + + const calls = getProvider().partCalls; + t.is(calls.length, 1); + t.is(calls[0].key, `${workspace.id}/${key}`); + t.is(calls[0].uploadId, 'upload-id'); + t.is(calls[0].partNumber, 1); + t.is(calls[0].contentLength, payload.length); + t.deepEqual(calls[0].body, payload); +}); + +e2e('should resume multipart upload and return uploaded parts', async t => { + const { workspace } = await setupWorkspace(); + const key = 'multipart-resume'; + const totalSize = MULTIPART_THRESHOLD + 1024; + + const init1 = await createBlobUpload(workspace.id, key, totalSize, 'bin'); + t.is(init1.method, 'MULTIPART'); + t.is(init1.uploadId, 'upload-id'); + t.deepEqual(init1.uploadedParts, []); + t.is(getProvider().createMultipartCalls, 1); + + const part = await getBlobUploadPartUrl(workspace.id, key, init1.uploadId, 1); + const payload = Buffer.from('part-body'); + const partUrl = new URL(part.uploadUrl, app.url); + await app + .PUT(partUrl.pathname + partUrl.search) + 
.set('content-length', payload.length.toString()) + .send(payload) + .expect(200); + + const init2 = await createBlobUpload(workspace.id, key, totalSize, 'bin'); + t.is(init2.method, 'MULTIPART'); + t.is(init2.uploadId, 'upload-id'); + t.deepEqual(init2.uploadedParts, [{ partNumber: 1, etag: '"etag-1"' }]); + t.is(getProvider().createMultipartCalls, 1); +}); + +e2e('should reject upload when token is invalid', async t => { + const { workspace } = await setupWorkspace(); + const buffer = Buffer.from('payload'); + const init = await createBlobUpload( + workspace.id, + sha256Base64urlWithPadding(buffer), + buffer.length, + 'text/plain' + ); + const uploadUrl = new URL(init.uploadUrl, app.url); + uploadUrl.searchParams.set('token', 'invalid-token'); + + const res = await app + .PUT(uploadUrl.pathname + uploadUrl.search) + .set('content-type', 'text/plain') + .set('content-length', buffer.length.toString()) + .send(buffer); + + t.is(res.status, 400); + t.is(res.body.message, 'Invalid upload token'); + t.is(getProvider().putCalls.length, 0); +}); + +e2e('should reject upload when url is expired', async t => { + const { workspace } = await setupWorkspace(); + const buffer = Buffer.from('expired'); + const init = await createBlobUpload( + workspace.id, + sha256Base64urlWithPadding(buffer), + buffer.length, + 'text/plain' + ); + const uploadUrl = new URL(init.uploadUrl, app.url); + uploadUrl.searchParams.set( + 'exp', + (Math.floor(Date.now() / 1000) - 1).toString() + ); + + const res = await app + .PUT(uploadUrl.pathname + uploadUrl.search) + .set('content-type', 'text/plain') + .set('content-length', buffer.length.toString()) + .send(buffer); + + t.is(res.status, 400); + t.is(res.body.message, 'Upload URL expired'); + t.is(getProvider().putCalls.length, 0); +}); + +e2e( + 'should fall back to direct presign when custom domain is disabled', + async t => { + await useR2Storage({ + enabled: false, + urlPrefix: undefined, + signKey: undefined, + }); + const { workspace } = await setupWorkspace(); + const buffer = Buffer.from('plain'); + + const init = await createBlobUpload( + workspace.id, + sha256Base64urlWithPadding(buffer), + buffer.length, + 'text/plain' + ); + + t.is(init.method, 'PRESIGNED'); + t.truthy(init.uploadUrl.includes('X-Amz-Algorithm=AWS4-HMAC-SHA256')); + t.not(new URL(init.uploadUrl, app.url).pathname, PROXY_UPLOAD_PATH); + } +); + +e2e( + 'should still fallback to graphql when provider does not support presign', + async t => { + await setBlobStorage(defaultBlobStorage); + const { workspace } = await setupWorkspace(); + const buffer = Buffer.from('graph'); + + const init = await createBlobUpload( + workspace.id, + sha256Base64urlWithPadding(buffer), + buffer.length, + 'text/plain' + ); + + t.is(init.method, 'GRAPHQL'); + } +); + +function sha256Base64urlWithPadding(buffer: Buffer) { + return createHash('sha256') + .update(buffer) + .digest('base64') + .replace(/\+/g, '-') + .replace(/\//g, '_'); +} diff --git a/packages/backend/server/src/__tests__/storage/blob-upload-cleanup.spec.ts b/packages/backend/server/src/__tests__/storage/blob-upload-cleanup.spec.ts new file mode 100644 index 0000000000..7fb71e9a9e --- /dev/null +++ b/packages/backend/server/src/__tests__/storage/blob-upload-cleanup.spec.ts @@ -0,0 +1,106 @@ +import { ScheduleModule } from '@nestjs/schedule'; +import { PrismaClient } from '@prisma/client'; +import ava, { TestFn } from 'ava'; +import Sinon from 'sinon'; + +import { OneDay } from '../../base'; +import { StorageModule, WorkspaceBlobStorage } from 
'../../core/storage'; +import { BlobUploadCleanupJob } from '../../core/storage/job'; +import { MockUser, MockWorkspace } from '../mocks'; +import { createTestingModule, TestingModule } from '../utils'; + +interface Context { + module: TestingModule; + db: PrismaClient; + job: BlobUploadCleanupJob; + storage: WorkspaceBlobStorage; +} + +const test = ava as TestFn; + +test.before(async t => { + t.context.module = await createTestingModule({ + imports: [ScheduleModule.forRoot(), StorageModule], + }); + + t.context.db = t.context.module.get(PrismaClient); + t.context.job = t.context.module.get(BlobUploadCleanupJob); + t.context.storage = t.context.module.get(WorkspaceBlobStorage); +}); + +test.beforeEach(async t => { + await t.context.module.initTestingDB(); +}); + +test.after.always(async t => { + await t.context.module.close(); +}); + +test('should cleanup expired pending blobs', async t => { + const user = await t.context.module.create(MockUser); + const workspace = await t.context.module.create(MockWorkspace, { + owner: { id: user.id }, + }); + + const expiredAt = new Date(Date.now() - OneDay - 1000); + const activeAt = new Date(); + + await t.context.db.blob.createMany({ + data: [ + { + workspaceId: workspace.id, + key: 'expired-pending', + size: 4, + mime: 'text/plain', + status: 'pending', + uploadId: null, + createdAt: expiredAt, + }, + { + workspaceId: workspace.id, + key: 'expired-multipart', + size: 4, + mime: 'text/plain', + status: 'pending', + uploadId: 'upload-1', + createdAt: expiredAt, + }, + { + workspaceId: workspace.id, + key: 'pending-active', + size: 4, + mime: 'text/plain', + status: 'pending', + uploadId: null, + createdAt: activeAt, + }, + { + workspaceId: workspace.id, + key: 'completed-keep', + size: 4, + mime: 'text/plain', + status: 'completed', + uploadId: null, + createdAt: expiredAt, + }, + ], + }); + + const abortSpy = Sinon.spy(t.context.storage, 'abortMultipartUpload'); + const deleteSpy = Sinon.spy(t.context.storage, 'delete'); + t.teardown(() => { + abortSpy.restore(); + deleteSpy.restore(); + }); + + await t.context.job.cleanExpiredPendingBlobs(); + + t.is(abortSpy.callCount, 1); + t.is(deleteSpy.callCount, 2); + + const remaining = await t.context.db.blob.findMany({ + where: { workspaceId: workspace.id }, + }); + const remainingKeys = remaining.map(record => record.key).sort(); + t.deepEqual(remainingKeys, ['completed-keep', 'pending-active']); +}); diff --git a/packages/backend/server/src/__tests__/utils/blobs.ts b/packages/backend/server/src/__tests__/utils/blobs.ts index f92163af99..4bb2f340ec 100644 --- a/packages/backend/server/src/__tests__/utils/blobs.ts +++ b/packages/backend/server/src/__tests__/utils/blobs.ts @@ -88,3 +88,82 @@ export async function setBlob( } return res.body.data.setBlob; } + +export async function createBlobUpload( + app: TestingApp, + workspaceId: string, + key: string, + size: number, + mime: string +) { + const res = await app.gql( + ` + mutation createBlobUpload($workspaceId: String!, $key: String!, $size: Int!, $mime: String!) 
{ + createBlobUpload(workspaceId: $workspaceId, key: $key, size: $size, mime: $mime) { + method + blobKey + uploadUrl + uploadId + partSize + } + } + `, + { + workspaceId, + key, + size, + mime, + } + ); + return res.createBlobUpload; +} + +export async function completeBlobUpload( + app: TestingApp, + workspaceId: string, + key: string, + options?: { + uploadId?: string; + parts?: { partNumber: number; etag: string }[]; + } +) { + const res = await app.gql( + ` + mutation completeBlobUpload($workspaceId: String!, $key: String!, $uploadId: String, $parts: [BlobUploadPartInput!]) { + completeBlobUpload(workspaceId: $workspaceId, key: $key, uploadId: $uploadId, parts: $parts) + } + `, + { + workspaceId, + key, + uploadId: options?.uploadId, + parts: options?.parts, + } + ); + return res.completeBlobUpload; +} + +export async function getBlobUploadPartUrl( + app: TestingApp, + workspaceId: string, + key: string, + uploadId: string, + partNumber: number +) { + const res = await app.gql( + ` + mutation getBlobUploadPartUrl($workspaceId: String!, $key: String!, $uploadId: String!, $partNumber: Int!) { + getBlobUploadPartUrl(workspaceId: $workspaceId, key: $key, uploadId: $uploadId, partNumber: $partNumber) { + uploadUrl + } + } + `, + { + workspaceId, + key, + uploadId, + partNumber, + } + ); + return res.getBlobUploadPartUrl; +} diff --git a/packages/backend/server/src/__tests__/workspace/blobs.e2e.ts b/packages/backend/server/src/__tests__/workspace/blobs.e2e.ts index c6e2c431a7..4698413623 100644 --- a/packages/backend/server/src/__tests__/workspace/blobs.e2e.ts +++ b/packages/backend/server/src/__tests__/workspace/blobs.e2e.ts @@ -1,13 +1,19 @@ +import { createHash } from 'node:crypto'; + import test from 'ava'; import Sinon from 'sinon'; +import { Config, StorageProviderFactory } from '../../base'; import { WorkspaceBlobStorage } from '../../core/storage/wrappers/blob'; -import { WorkspaceFeatureModel } from '../../models'; +import { BlobModel, WorkspaceFeatureModel } from '../../models'; import { collectAllBlobSizes, + completeBlobUpload, + createBlobUpload, createTestingApp, createWorkspace, deleteWorkspace, + getBlobUploadPartUrl, getWorkspaceBlobsSize, listBlobs, setBlob, @@ -80,6 +86,95 @@ test('should list blobs', async t => { t.deepEqual(ret.map(x => x.key).sort(), [hash1, hash2].sort()); }); +test('should create pending blob upload with graphql fallback', async t => { + await app.signupV1('u1@affine.pro'); + + const workspace = await createWorkspace(app); + const key = `upload-${Math.random().toString(16).slice(2, 8)}`; + const size = 4; + const mime = 'text/plain'; + + const init = await createBlobUpload(app, workspace.id, key, size, mime); + t.is(init.method, 'GRAPHQL'); + t.is(init.blobKey, key); + + const blobModel = app.get(BlobModel); + const record = await blobModel.get(workspace.id, key); + t.truthy(record); + t.is(record?.status, 'pending'); + + const listed = await listBlobs(app, workspace.id); + t.is(listed.length, 0); +}); + +test('should complete pending blob upload', async t => { + await app.signupV1('u1@affine.pro'); + + const workspace = await createWorkspace(app); + const buffer = Buffer.from('done'); + const mime = 'text/plain'; + const key = sha256Base64urlWithPadding(buffer); + + await createBlobUpload(app, workspace.id, key, buffer.length, mime); + + const config = app.get(Config); + const factory = app.get(StorageProviderFactory); + const provider = factory.create(config.storages.blob.storage); + + await provider.put(`${workspace.id}/${key}`, buffer, { + 
contentType: mime, + contentLength: buffer.length, + }); + + const completed = await completeBlobUpload(app, workspace.id, key); + t.is(completed, key); + + const blobModel = app.get(BlobModel); + const record = await blobModel.get(workspace.id, key); + t.truthy(record); + t.is(record?.status, 'completed'); + + const listed = await listBlobs(app, workspace.id); + t.is(listed.length, 1); +}); + +test('should reject complete when blob key mismatched', async t => { + await app.signupV1('u1@affine.pro'); + + const workspace = await createWorkspace(app); + const buffer = Buffer.from('mismatch'); + const mime = 'text/plain'; + + const wrongKey = sha256Base64urlWithPadding(Buffer.from('other')); + await createBlobUpload(app, workspace.id, wrongKey, buffer.length, mime); + + const config = app.get(Config); + const factory = app.get(StorageProviderFactory); + const provider = factory.create(config.storages.blob.storage); + + await provider.put(`${workspace.id}/${wrongKey}`, buffer, { + contentType: mime, + contentLength: buffer.length, + }); + + await t.throwsAsync(() => completeBlobUpload(app, workspace.id, wrongKey), { + message: 'Blob key mismatch', + }); +}); + +test('should reject multipart upload part url on fs provider', async t => { + await app.signupV1('u1@affine.pro'); + + const workspace = await createWorkspace(app); + + await t.throwsAsync( + () => getBlobUploadPartUrl(app, workspace.id, 'blob-key', 'upload', 1), + { + message: 'Multipart upload is not supported', + } + ); +}); + test('should auto delete blobs when workspace is deleted', async t => { await app.signupV1('u1@affine.pro'); @@ -185,3 +280,11 @@ test('should throw error when blob size large than max file size', async t => { 'HTTP request error, message: File truncated as it exceeds the 10485760 byte size limit.', }); }); + +function sha256Base64urlWithPadding(buffer: Buffer) { + return createHash('sha256') + .update(buffer) + .digest('base64') + .replace(/\+/g, '-') + .replace(/\//g, '_'); +} diff --git a/packages/backend/server/src/base/error/def.ts b/packages/backend/server/src/base/error/def.ts index 05a2ec7c5c..bc34c4994d 100644 --- a/packages/backend/server/src/base/error/def.ts +++ b/packages/backend/server/src/base/error/def.ts @@ -506,6 +506,10 @@ export const USER_FRIENDLY_ERRORS = { message: ({ spaceId, blobId }) => `Blob ${blobId} not found in Space ${spaceId}.`, }, + blob_invalid: { + type: 'invalid_input', + message: 'Blob is invalid.', + }, expect_to_publish_doc: { type: 'invalid_input', message: 'Expected to publish a doc, not a Space.', diff --git a/packages/backend/server/src/base/error/errors.gen.ts b/packages/backend/server/src/base/error/errors.gen.ts index 7c6047d7ff..88fbe46e4e 100644 --- a/packages/backend/server/src/base/error/errors.gen.ts +++ b/packages/backend/server/src/base/error/errors.gen.ts @@ -437,6 +437,12 @@ export class BlobNotFound extends UserFriendlyError { } } +export class BlobInvalid extends UserFriendlyError { + constructor(message?: string) { + super('invalid_input', 'blob_invalid', message); + } +} + export class ExpectToPublishDoc extends UserFriendlyError { constructor(message?: string) { super('invalid_input', 'expect_to_publish_doc', message); @@ -1166,6 +1172,7 @@ export enum ErrorNames { INVALID_HISTORY_TIMESTAMP, DOC_HISTORY_NOT_FOUND, BLOB_NOT_FOUND, + BLOB_INVALID, EXPECT_TO_PUBLISH_DOC, EXPECT_TO_REVOKE_PUBLIC_DOC, EXPECT_TO_GRANT_DOC_USER_ROLES, diff --git a/packages/backend/server/src/base/redis/instances.ts b/packages/backend/server/src/base/redis/instances.ts index 
aca83c9d44..506d2cb7fb 100644 --- a/packages/backend/server/src/base/redis/instances.ts +++ b/packages/backend/server/src/base/redis/instances.ts @@ -19,8 +19,12 @@ class Redis extends IORedis implements OnModuleInit, OnModuleDestroy { this.on('error', this.errorHandler); } - onModuleDestroy() { - this.disconnect(); + async onModuleDestroy() { + try { + await this.quit(); + } catch { + this.disconnect(); + } } override duplicate(override?: Partial): IORedis { diff --git a/packages/backend/server/src/base/storage/__tests__/s3.spec.ts b/packages/backend/server/src/base/storage/__tests__/s3.spec.ts new file mode 100644 index 0000000000..8d789a73a7 --- /dev/null +++ b/packages/backend/server/src/base/storage/__tests__/s3.spec.ts @@ -0,0 +1,80 @@ +import test from 'ava'; + +import { S3StorageProvider } from '../providers/s3'; +import { SIGNED_URL_EXPIRED } from '../providers/utils'; + +const config = { + region: 'auto', + credentials: { + accessKeyId: 'test', + secretAccessKey: 'test', + }, +}; + +function createProvider() { + return new S3StorageProvider(config, 'test-bucket'); +} + +test('presignPut should return url and headers', async t => { + const provider = createProvider(); + const result = await provider.presignPut('key', { + contentType: 'text/plain', + }); + + t.truthy(result); + t.true(result!.url.length > 0); + t.true(result!.url.includes('X-Amz-Algorithm=AWS4-HMAC-SHA256')); + t.deepEqual(result!.headers, { 'Content-Type': 'text/plain' }); + const now = Date.now(); + t.true(result!.expiresAt.getTime() >= now + SIGNED_URL_EXPIRED * 1000 - 2000); + t.true(result!.expiresAt.getTime() <= now + SIGNED_URL_EXPIRED * 1000 + 2000); +}); + +test('presignUploadPart should return url', async t => { + const provider = createProvider(); + const result = await provider.presignUploadPart('key', 'upload-1', 3); + + t.truthy(result); + t.true(result!.url.length > 0); + t.true(result!.url.includes('X-Amz-Algorithm=AWS4-HMAC-SHA256')); +}); + +test('createMultipartUpload should return uploadId', async t => { + const provider = createProvider(); + let receivedCommand: any; + const sendStub = async (command: any) => { + receivedCommand = command; + return { UploadId: 'upload-1' }; + }; + (provider as any).client = { send: sendStub }; + + const now = Date.now(); + const result = await provider.createMultipartUpload('key', { + contentType: 'text/plain', + }); + + t.is(result?.uploadId, 'upload-1'); + t.true(result!.expiresAt.getTime() >= now + SIGNED_URL_EXPIRED * 1000 - 2000); + t.true(result!.expiresAt.getTime() <= now + SIGNED_URL_EXPIRED * 1000 + 2000); + t.is(receivedCommand.input.Key, 'key'); + t.is(receivedCommand.input.ContentType, 'text/plain'); +}); + +test('completeMultipartUpload should order parts', async t => { + const provider = createProvider(); + let called = false; + const sendStub = async (command: any) => { + called = true; + t.deepEqual(command.input.MultipartUpload.Parts, [ + { ETag: 'a', PartNumber: 1 }, + { ETag: 'b', PartNumber: 2 }, + ]); + }; + (provider as any).client = { send: sendStub }; + + await provider.completeMultipartUpload('key', 'upload-1', [ + { partNumber: 2, etag: 'b' }, + { partNumber: 1, etag: 'a' }, + ]); + t.true(called); +}); diff --git a/packages/backend/server/src/base/storage/providers/index.ts b/packages/backend/server/src/base/storage/providers/index.ts index 4617a0cdd7..6a9096d272 100644 --- a/packages/backend/server/src/base/storage/providers/index.ts +++ b/packages/backend/server/src/base/storage/providers/index.ts @@ -118,7 +118,7 @@ export const 
StorageJSONSchema: JSONSchema = { urlPrefix: { type: 'string' as const, description: - 'The presigned url prefix for the cloudflare r2 storage provider.\nsee https://developers.cloudflare.com/waf/custom-rules/use-cases/configure-token-authentication/ to configure it.\nExample value: "https://storage.example.com"\nExample rule: is_timed_hmac_valid_v0("your_secret", http.request.uri, 10800, http.request.timestamp.sec, 6)', + 'The custom domain URL prefix for the cloudflare r2 storage provider.\nWhen `enabled=true` and `urlPrefix` + `signKey` are provided, the server will:\n- Redirect GET requests to this custom domain with an HMAC token.\n- Return upload URLs under `/api/storage/*` for uploads.\nPresigned/upload proxy TTL is 1 hour.\nsee https://developers.cloudflare.com/waf/custom-rules/use-cases/configure-token-authentication/ to configure it.\nExample value: "https://storage.example.com"\nExample rule: is_timed_hmac_valid_v0("your_secret", http.request.uri, 10800, http.request.timestamp.sec, 6)', }, signKey: { type: 'string' as const, @@ -135,4 +135,12 @@ export const StorageJSONSchema: JSONSchema = { }; export type * from './provider'; -export { applyAttachHeaders, autoMetadata, sniffMime, toBuffer } from './utils'; +export { + applyAttachHeaders, + autoMetadata, + PROXY_MULTIPART_PATH, + PROXY_UPLOAD_PATH, + sniffMime, + STORAGE_PROXY_ROOT, + toBuffer, +} from './utils'; diff --git a/packages/backend/server/src/base/storage/providers/provider.ts b/packages/backend/server/src/base/storage/providers/provider.ts index 2c765e20d0..82ee5dbbc6 100644 --- a/packages/backend/server/src/base/storage/providers/provider.ts +++ b/packages/backend/server/src/base/storage/providers/provider.ts @@ -25,12 +25,51 @@ export interface ListObjectsMetadata { export type BlobInputType = Buffer | Readable | string; export type BlobOutputType = Readable; +export interface PresignedUpload { + url: string; + headers?: Record; + expiresAt: Date; +} + +export interface MultipartUploadInit { + uploadId: string; + expiresAt: Date; +} + +export interface MultipartUploadPart { + partNumber: number; + etag: string; +} + export interface StorageProvider { put( key: string, body: BlobInputType, metadata?: PutObjectMetadata ): Promise; + presignPut?( + key: string, + metadata?: PutObjectMetadata + ): Promise; + createMultipartUpload?( + key: string, + metadata?: PutObjectMetadata + ): Promise; + presignUploadPart?( + key: string, + uploadId: string, + partNumber: number + ): Promise; + listMultipartUploadParts?( + key: string, + uploadId: string + ): Promise; + completeMultipartUpload?( + key: string, + uploadId: string, + parts: MultipartUploadPart[] + ): Promise; + abortMultipartUpload?(key: string, uploadId: string): Promise; head(key: string): Promise; get( key: string, diff --git a/packages/backend/server/src/base/storage/providers/r2.ts b/packages/backend/server/src/base/storage/providers/r2.ts index e89c745226..5b20643143 100644 --- a/packages/backend/server/src/base/storage/providers/r2.ts +++ b/packages/backend/server/src/base/storage/providers/r2.ts @@ -1,10 +1,20 @@ import assert from 'node:assert'; import { Readable } from 'node:stream'; +import { PutObjectCommand, UploadPartCommand } from '@aws-sdk/client-s3'; import { Logger } from '@nestjs/common'; -import { GetObjectMetadata } from './provider'; +import { + GetObjectMetadata, + PresignedUpload, + PutObjectMetadata, +} from './provider'; import { S3StorageConfig, S3StorageProvider } from './s3'; +import { + PROXY_MULTIPART_PATH, + PROXY_UPLOAD_PATH, + 
SIGNED_URL_EXPIRED, +} from './utils'; export interface R2StorageConfig extends S3StorageConfig { accountId: string; @@ -39,8 +49,24 @@ export class R2StorageProvider extends S3StorageProvider { this.key = this.encoder.encode(config.usePresignedURL?.signKey ?? ''); } - private async signUrl(url: URL): Promise { - const timestamp = Math.floor(Date.now() / 1000); + private get shouldUseProxyUpload() { + const { usePresignedURL } = this.config; + return ( + !!usePresignedURL?.enabled && + !!usePresignedURL.signKey && + this.key.length > 0 + ); + } + + private parseWorkspaceKey(fullKey: string) { + const [workspaceId, ...rest] = fullKey.split('/'); + if (!workspaceId || rest.length !== 1) { + return null; + } + return { workspaceId, key: rest.join('/') }; + } + + private async signPayload(payload: string) { const key = await crypto.subtle.importKey( 'raw', this.key, @@ -51,14 +77,140 @@ export class R2StorageProvider extends S3StorageProvider { const mac = await crypto.subtle.sign( 'HMAC', key, - this.encoder.encode(`${url.pathname}${timestamp}`) + this.encoder.encode(payload) ); - const base64Mac = Buffer.from(mac).toString('base64'); + return Buffer.from(mac).toString('base64'); + } + + private async signUrl(url: URL): Promise { + const timestamp = Math.floor(Date.now() / 1000); + const base64Mac = await this.signPayload(`${url.pathname}${timestamp}`); url.searchParams.set('sign', `${timestamp}-${base64Mac}`); return url.toString(); } + private async createProxyUrl( + path: string, + canonicalFields: (string | number | undefined)[], + query: Record + ) { + const exp = Math.floor(Date.now() / 1000) + SIGNED_URL_EXPIRED; + const canonical = [ + path, + ...canonicalFields.map(field => + field === undefined ? '' : field.toString() + ), + exp.toString(), + ].join('\n'); + const token = await this.signPayload(canonical); + + const url = new URL(`http://localhost${path}`); + for (const [key, value] of Object.entries(query)) { + if (value === undefined) continue; + url.searchParams.set(key, value.toString()); + } + url.searchParams.set('exp', exp.toString()); + url.searchParams.set('token', `${exp}-${token}`); + + return { url: url.pathname + url.search, expiresAt: new Date(exp * 1000) }; + } + + override async presignPut( + key: string, + metadata: PutObjectMetadata = {} + ): Promise { + if (!this.shouldUseProxyUpload) { + return super.presignPut(key, metadata); + } + + const parsed = this.parseWorkspaceKey(key); + if (!parsed) { + return super.presignPut(key, metadata); + } + + const contentType = metadata.contentType ?? 
'application/octet-stream'; + const { url, expiresAt } = await this.createProxyUrl( + PROXY_UPLOAD_PATH, + [parsed.workspaceId, parsed.key, contentType, metadata.contentLength], + { + workspaceId: parsed.workspaceId, + key: parsed.key, + contentType, + contentLength: metadata.contentLength, + } + ); + + return { + url, + headers: { 'Content-Type': contentType }, + expiresAt, + }; + } + + override async presignUploadPart( + key: string, + uploadId: string, + partNumber: number + ): Promise { + if (!this.shouldUseProxyUpload) { + return super.presignUploadPart(key, uploadId, partNumber); + } + + const parsed = this.parseWorkspaceKey(key); + if (!parsed) { + return super.presignUploadPart(key, uploadId, partNumber); + } + + return this.createProxyUrl( + PROXY_MULTIPART_PATH, + [parsed.workspaceId, parsed.key, uploadId, partNumber], + { + workspaceId: parsed.workspaceId, + key: parsed.key, + uploadId, + partNumber, + } + ); + } + + async proxyPutObject( + key: string, + body: Readable | Buffer | Uint8Array | string, + options: { contentType?: string; contentLength?: number } = {} + ) { + return this.client.send( + new PutObjectCommand({ + Bucket: this.bucket, + Key: key, + Body: body, + ContentType: options.contentType, + ContentLength: options.contentLength, + }) + ); + } + + async proxyUploadPart( + key: string, + uploadId: string, + partNumber: number, + body: Readable | Buffer | Uint8Array | string, + options: { contentLength?: number } = {} + ) { + const result = await this.client.send( + new UploadPartCommand({ + Bucket: this.bucket, + Key: key, + UploadId: uploadId, + PartNumber: partNumber, + Body: body, + ContentLength: options.contentLength, + }) + ); + + return result.ETag; + } + override async get( key: string, signedUrl?: boolean diff --git a/packages/backend/server/src/base/storage/providers/s3.ts b/packages/backend/server/src/base/storage/providers/s3.ts index 188a1c18de..c6a693c883 100644 --- a/packages/backend/server/src/base/storage/providers/s3.ts +++ b/packages/backend/server/src/base/storage/providers/s3.ts @@ -2,15 +2,21 @@ import { Readable } from 'node:stream'; import { + AbortMultipartUploadCommand, + CompleteMultipartUploadCommand, + CreateMultipartUploadCommand, DeleteObjectCommand, GetObjectCommand, HeadObjectCommand, ListObjectsV2Command, + ListPartsCommand, NoSuchKey, + NoSuchUpload, NotFound, PutObjectCommand, S3Client, S3ClientConfig, + UploadPartCommand, } from '@aws-sdk/client-s3'; import { getSignedUrl } from '@aws-sdk/s3-request-presigner'; import { Logger } from '@nestjs/common'; @@ -19,6 +25,9 @@ import { BlobInputType, GetObjectMetadata, ListObjectsMetadata, + MultipartUploadInit, + MultipartUploadPart, + PresignedUpload, PutObjectMetadata, StorageProvider, } from './provider'; @@ -89,6 +98,196 @@ export class S3StorageProvider implements StorageProvider { } } + async presignPut( + key: string, + metadata: PutObjectMetadata = {} + ): Promise { + try { + const contentType = metadata.contentType ?? 
'application/octet-stream'; + const url = await getSignedUrl( + this.client, + new PutObjectCommand({ + Bucket: this.bucket, + Key: key, + ContentType: contentType, + }), + { expiresIn: SIGNED_URL_EXPIRED } + ); + + return { + url, + headers: { 'Content-Type': contentType }, + expiresAt: new Date(Date.now() + SIGNED_URL_EXPIRED * 1000), + }; + } catch (e) { + this.logger.error( + `Failed to presign put object (${JSON.stringify({ + key, + bucket: this.bucket, + metadata, + })}` + ); + throw e; + } + } + + async createMultipartUpload( + key: string, + metadata: PutObjectMetadata = {} + ): Promise { + try { + const contentType = metadata.contentType ?? 'application/octet-stream'; + const response = await this.client.send( + new CreateMultipartUploadCommand({ + Bucket: this.bucket, + Key: key, + ContentType: contentType, + }) + ); + + if (!response.UploadId) { + return; + } + + return { + uploadId: response.UploadId, + expiresAt: new Date(Date.now() + SIGNED_URL_EXPIRED * 1000), + }; + } catch (e) { + this.logger.error( + `Failed to create multipart upload (${JSON.stringify({ + key, + bucket: this.bucket, + metadata, + })}` + ); + throw e; + } + } + + async presignUploadPart( + key: string, + uploadId: string, + partNumber: number + ): Promise { + try { + const url = await getSignedUrl( + this.client, + new UploadPartCommand({ + Bucket: this.bucket, + Key: key, + UploadId: uploadId, + PartNumber: partNumber, + }), + { expiresIn: SIGNED_URL_EXPIRED } + ); + + return { + url, + expiresAt: new Date(Date.now() + SIGNED_URL_EXPIRED * 1000), + }; + } catch (e) { + this.logger.error( + `Failed to presign upload part (${JSON.stringify({ key, bucket: this.bucket, uploadId, partNumber })}` + ); + throw e; + } + } + + async listMultipartUploadParts( + key: string, + uploadId: string + ): Promise { + const parts: MultipartUploadPart[] = []; + let partNumberMarker: string | undefined; + + try { + // ListParts is paginated by part number marker + // https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html + // R2 follows S3 semantics here. + while (true) { + const response = await this.client.send( + new ListPartsCommand({ + Bucket: this.bucket, + Key: key, + UploadId: uploadId, + PartNumberMarker: partNumberMarker, + }) + ); + + for (const part of response.Parts ?? 
[]) { + if (!part.PartNumber || !part.ETag) { + continue; + } + parts.push({ partNumber: part.PartNumber, etag: part.ETag }); + } + + if (!response.IsTruncated) { + break; + } + + if (response.NextPartNumberMarker === undefined) { + break; + } + + partNumberMarker = response.NextPartNumberMarker; + } + + return parts; + } catch (e) { + // the upload may have been aborted/expired by provider lifecycle rules + if (e instanceof NoSuchUpload || e instanceof NotFound) { + return undefined; + } + this.logger.error(`Failed to list multipart upload parts for \`${key}\``); + throw e; + } + } + + async completeMultipartUpload( + key: string, + uploadId: string, + parts: MultipartUploadPart[] + ): Promise { + try { + const orderedParts = [...parts].sort( + (left, right) => left.partNumber - right.partNumber + ); + + await this.client.send( + new CompleteMultipartUploadCommand({ + Bucket: this.bucket, + Key: key, + UploadId: uploadId, + MultipartUpload: { + Parts: orderedParts.map(part => ({ + ETag: part.etag, + PartNumber: part.partNumber, + })), + }, + }) + ); + } catch (e) { + this.logger.error(`Failed to complete multipart upload for \`${key}\``); + throw e; + } + } + + async abortMultipartUpload(key: string, uploadId: string): Promise { + try { + await this.client.send( + new AbortMultipartUploadCommand({ + Bucket: this.bucket, + Key: key, + UploadId: uploadId, + }) + ); + } catch (e) { + this.logger.error(`Failed to abort multipart upload for \`${key}\``); + throw e; + } + } + async head(key: string) { try { const obj = await this.client.send( @@ -164,7 +363,7 @@ export class S3StorageProvider implements StorageProvider { body: obj.Body, metadata: { // always set when putting object - contentType: obj.ContentType!, + contentType: obj.ContentType ?? 'application/octet-stream', contentLength: obj.ContentLength!, lastModified: obj.LastModified!, checksumCRC32: obj.ChecksumCRC32, diff --git a/packages/backend/server/src/base/storage/providers/utils.ts b/packages/backend/server/src/base/storage/providers/utils.ts index 9ded0cd5b6..cda601bc76 100644 --- a/packages/backend/server/src/base/storage/providers/utils.ts +++ b/packages/backend/server/src/base/storage/providers/utils.ts @@ -94,3 +94,7 @@ export function sniffMime( } export const SIGNED_URL_EXPIRED = 60 * 60; // 1 hour + +export const STORAGE_PROXY_ROOT = '/api/storage'; +export const PROXY_UPLOAD_PATH = `${STORAGE_PROXY_ROOT}/upload`; +export const PROXY_MULTIPART_PATH = `${STORAGE_PROXY_ROOT}/multipart`; diff --git a/packages/backend/server/src/core/storage/constants.ts b/packages/backend/server/src/core/storage/constants.ts new file mode 100644 index 0000000000..bbbff8f0ca --- /dev/null +++ b/packages/backend/server/src/core/storage/constants.ts @@ -0,0 +1,4 @@ +import { OneMB } from '../../base'; + +export const MULTIPART_PART_SIZE = 5 * OneMB; +export const MULTIPART_THRESHOLD = 10 * OneMB; diff --git a/packages/backend/server/src/core/storage/index.ts b/packages/backend/server/src/core/storage/index.ts index 239f1e44ff..ffa3e1fb9b 100644 --- a/packages/backend/server/src/core/storage/index.ts +++ b/packages/backend/server/src/core/storage/index.ts @@ -2,6 +2,8 @@ import './config'; import { Module } from '@nestjs/common'; +import { BlobUploadCleanupJob } from './job'; +import { R2UploadController } from './r2-proxy'; import { AvatarStorage, CommentAttachmentStorage, @@ -9,7 +11,13 @@ import { } from './wrappers'; @Module({ - providers: [WorkspaceBlobStorage, AvatarStorage, CommentAttachmentStorage], + controllers: [R2UploadController], + 
providers: [ + WorkspaceBlobStorage, + AvatarStorage, + CommentAttachmentStorage, + BlobUploadCleanupJob, + ], exports: [WorkspaceBlobStorage, AvatarStorage, CommentAttachmentStorage], }) export class StorageModule {} diff --git a/packages/backend/server/src/core/storage/job.ts b/packages/backend/server/src/core/storage/job.ts new file mode 100644 index 0000000000..8fef65cddd --- /dev/null +++ b/packages/backend/server/src/core/storage/job.ts @@ -0,0 +1,54 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; + +import { JobQueue, OneDay, OnJob } from '../../base'; +import { Models } from '../../models'; +import { WorkspaceBlobStorage } from './wrappers/blob'; + +declare global { + interface Jobs { + 'nightly.cleanExpiredPendingBlobs': {}; + } +} + +@Injectable() +export class BlobUploadCleanupJob { + private readonly logger = new Logger(BlobUploadCleanupJob.name); + + constructor( + private readonly models: Models, + private readonly storage: WorkspaceBlobStorage, + private readonly queue: JobQueue + ) {} + + @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT) + async nightlyJob() { + await this.queue.add( + 'nightly.cleanExpiredPendingBlobs', + {}, + { + jobId: 'nightly-blob-clean-expired-pending', + } + ); + } + + @OnJob('nightly.cleanExpiredPendingBlobs') + async cleanExpiredPendingBlobs() { + const cutoff = new Date(Date.now() - OneDay); + const pending = await this.models.blob.listPendingExpired(cutoff); + + for (const blob of pending) { + if (blob.uploadId) { + await this.storage.abortMultipartUpload( + blob.workspaceId, + blob.key, + blob.uploadId + ); + } + + await this.storage.delete(blob.workspaceId, blob.key, true); + } + + this.logger.log(`cleaned ${pending.length} expired pending blobs`); + } +} diff --git a/packages/backend/server/src/core/storage/r2-proxy.ts b/packages/backend/server/src/core/storage/r2-proxy.ts new file mode 100644 index 0000000000..c93fff7ca3 --- /dev/null +++ b/packages/backend/server/src/core/storage/r2-proxy.ts @@ -0,0 +1,331 @@ +import { createHmac, timingSafeEqual } from 'node:crypto'; + +import { Controller, Logger, Put, Req, Res } from '@nestjs/common'; +import type { Request, Response } from 'express'; + +import { + BlobInvalid, + CallMetric, + Config, + OnEvent, + PROXY_MULTIPART_PATH, + PROXY_UPLOAD_PATH, + STORAGE_PROXY_ROOT, + StorageProviderConfig, + StorageProviderFactory, +} from '../../base'; +import { + R2StorageConfig, + R2StorageProvider, +} from '../../base/storage/providers/r2'; +import { Models } from '../../models'; +import { Public } from '../auth/guard'; +import { MULTIPART_PART_SIZE } from './constants'; + +type R2BlobStorageConfig = StorageProviderConfig & { + provider: 'cloudflare-r2'; + config: R2StorageConfig; +}; + +type QueryValue = Request['query'][string]; + +type R2Config = { + storage: R2BlobStorageConfig; + signKey: string; +}; + +@Controller(STORAGE_PROXY_ROOT) +export class R2UploadController { + private readonly logger = new Logger(R2UploadController.name); + private provider: R2StorageProvider | null = null; + + constructor( + private readonly config: Config, + private readonly models: Models, + private readonly storageFactory: StorageProviderFactory + ) {} + + @OnEvent('config.changed') + onConfigChanged(event: Events['config.changed']) { + if (event.updates.storages?.blob?.storage) { + this.provider = null; + } + } + + private getR2Config(): R2Config { + const storage = this.config.storages.blob.storage as StorageProviderConfig; + if (storage.provider !== 
'cloudflare-r2') { + throw new BlobInvalid('Invalid endpoint'); + } + const r2Config = storage.config as R2StorageConfig; + const signKey = r2Config.usePresignedURL?.signKey; + if ( + !r2Config.usePresignedURL?.enabled || + !r2Config.usePresignedURL.urlPrefix || + !signKey + ) { + throw new BlobInvalid('Invalid endpoint'); + } + return { storage: storage as R2BlobStorageConfig, signKey }; + } + + private getProvider(storage: R2BlobStorageConfig) { + if (!this.provider) { + const candidate = this.storageFactory.create(storage); + if (candidate instanceof R2StorageProvider) { + this.provider = candidate; + } + } + return this.provider; + } + + private sign(canonical: string, signKey: string) { + return createHmac('sha256', signKey).update(canonical).digest('base64'); + } + + private safeEqual(expected: string, actual: string) { + const a = Buffer.from(expected); + const b = Buffer.from(actual); + + if (a.length !== b.length) { + return false; + } + + return timingSafeEqual(a, b); + } + + private verifyToken( + path: string, + canonicalFields: (string | number | undefined)[], + exp: number, + token: string, + signKey: string + ) { + const canonical = [ + path, + ...canonicalFields.map(field => + field === undefined ? '' : field.toString() + ), + exp.toString(), + ].join('\n'); + const expected = `${exp}-${this.sign(canonical, signKey)}`; + + return this.safeEqual(expected, token); + } + + private expectString(value: QueryValue, field: string): string { + if (Array.isArray(value)) { + return String(value[0]); + } + if (typeof value === 'string' && value.length > 0) { + return value; + } + throw new BlobInvalid(`Missing ${field}.`); + } + + private optionalString(value: QueryValue) { + if (Array.isArray(value)) { + return String(value[0]); + } + return typeof value === 'string' && value.length > 0 ? value : undefined; + } + + private number(value: QueryValue, field: string): number { + const str = this.expectString(value, field); + const num = Number(str); + if (!Number.isFinite(num)) { + throw new BlobInvalid(`Invalid ${field}.`); + } + return num; + } + + private optionalNumber(value: QueryValue, field: string): number | undefined { + if (value === undefined) { + return undefined; + } + const num = Number(Array.isArray(value) ? 
value[0] : value); + if (!Number.isFinite(num)) { + throw new BlobInvalid(`Invalid ${field}.`); + } + return num; + } + + private parseContentLength(req: Request) { + const raw = req.header('content-length'); + if (!raw) { + return undefined; + } + const num = Number(raw); + if (!Number.isFinite(num) || num < 0) { + throw new BlobInvalid('Invalid Content-Length header'); + } + return num; + } + + private ensureNotExpired(exp: number) { + const now = Math.floor(Date.now() / 1000); + if (exp < now) { + throw new BlobInvalid('Upload URL expired'); + } + } + + @Public() + @Put('upload') + @CallMetric('controllers', 'r2_proxy_upload') + async upload(@Req() req: Request, @Res() res: Response) { + const { storage, signKey } = this.getR2Config(); + + const workspaceId = this.expectString(req.query.workspaceId, 'workspaceId'); + const key = this.expectString(req.query.key, 'key'); + const token = this.expectString(req.query.token, 'token'); + const exp = this.number(req.query.exp, 'exp'); + const contentType = this.optionalString(req.query.contentType); + const contentLengthFromQuery = this.optionalNumber( + req.query.contentLength, + 'contentLength' + ); + + this.ensureNotExpired(exp); + + if ( + !this.verifyToken( + PROXY_UPLOAD_PATH, + [workspaceId, key, contentType, contentLengthFromQuery], + exp, + token, + signKey + ) + ) { + throw new BlobInvalid('Invalid upload token'); + } + + const record = await this.models.blob.get(workspaceId, key); + if (!record) { + throw new BlobInvalid('Blob upload is not initialized'); + } + if (record.status === 'completed') { + throw new BlobInvalid('Blob upload is already completed'); + } + + const contentLengthHeader = this.parseContentLength(req); + if ( + contentLengthFromQuery !== undefined && + contentLengthHeader !== undefined && + contentLengthFromQuery !== contentLengthHeader + ) { + throw new BlobInvalid('Content length mismatch'); + } + + const contentLength = contentLengthHeader ?? contentLengthFromQuery; + if (contentLength === undefined) { + throw new BlobInvalid('Missing Content-Length header'); + } + if (record.size && contentLength !== record.size) { + throw new BlobInvalid('Content length does not match upload metadata'); + } + + const mime = contentType ?? 
record.mime; + if (record.mime && mime && record.mime !== mime) { + throw new BlobInvalid('Mime type mismatch'); + } + + const provider = this.getProvider(storage); + if (!provider) { + throw new BlobInvalid('R2 provider is not available'); + } + + try { + await provider.proxyPutObject(`${workspaceId}/${key}`, req, { + contentType: mime, + contentLength, + }); + } catch (error) { + this.logger.error('Failed to proxy upload', error as Error); + throw new BlobInvalid('Upload failed'); + } + + res.status(200).end(); + } + + @Public() + @Put('multipart') + @CallMetric('controllers', 'r2_proxy_multipart') + async uploadPart(@Req() req: Request, @Res() res: Response) { + const { storage, signKey } = this.getR2Config(); + + const workspaceId = this.expectString(req.query.workspaceId, 'workspaceId'); + const key = this.expectString(req.query.key, 'key'); + const uploadId = this.expectString(req.query.uploadId, 'uploadId'); + const token = this.expectString(req.query.token, 'token'); + const exp = this.number(req.query.exp, 'exp'); + const partNumber = this.number(req.query.partNumber, 'partNumber'); + + if (partNumber < 1) { + throw new BlobInvalid('Invalid part number'); + } + + this.ensureNotExpired(exp); + + if ( + !this.verifyToken( + PROXY_MULTIPART_PATH, + [workspaceId, key, uploadId, partNumber], + exp, + token, + signKey + ) + ) { + throw new BlobInvalid('Invalid upload token'); + } + + const record = await this.models.blob.get(workspaceId, key); + if (!record) { + throw new BlobInvalid('Multipart upload is not initialized'); + } + if (record.status === 'completed') { + throw new BlobInvalid('Blob upload is already completed'); + } + if (record.uploadId !== uploadId) { + throw new BlobInvalid('Upload id mismatch'); + } + + const contentLength = this.parseContentLength(req); + if (contentLength === undefined || contentLength === 0) { + throw new BlobInvalid('Missing Content-Length header'); + } + + const maxPartNumber = Math.ceil(record.size / MULTIPART_PART_SIZE); + if (partNumber > maxPartNumber) { + throw new BlobInvalid('Part number exceeds upload size'); + } + if ( + record.size && + (partNumber - 1) * MULTIPART_PART_SIZE + contentLength > record.size + ) { + throw new BlobInvalid('Part size exceeds upload metadata'); + } + + const provider = this.getProvider(storage); + if (!provider) { + throw new BlobInvalid('R2 provider is not available'); + } + + try { + const etag = await provider.proxyUploadPart( + `${workspaceId}/${key}`, + uploadId, + partNumber, + req, + { contentLength } + ); + if (etag) { + res.setHeader('etag', etag); + } + } catch (error) { + this.logger.error('Failed to proxy multipart upload', error as Error); + throw new BlobInvalid('Upload failed'); + } + + res.status(200).end(); + } +} diff --git a/packages/backend/server/src/core/storage/wrappers/blob.ts b/packages/backend/server/src/core/storage/wrappers/blob.ts index 851f587881..c528f754e8 100644 --- a/packages/backend/server/src/core/storage/wrappers/blob.ts +++ b/packages/backend/server/src/core/storage/wrappers/blob.ts @@ -1,3 +1,5 @@ +import { createHash } from 'node:crypto'; + import { Injectable, Logger } from '@nestjs/common'; import { @@ -27,6 +29,17 @@ declare global { } } +type BlobCompleteResult = + | { ok: true; metadata: GetObjectMetadata } + | { + ok: false; + reason: + | 'not_found' + | 'size_mismatch' + | 'mime_mismatch' + | 'checksum_mismatch'; + }; + @Injectable() export class WorkspaceBlobStorage { private readonly logger = new Logger(WorkspaceBlobStorage.name); @@ -71,6 +84,142 @@ export class 
WorkspaceBlobStorage { return this.provider.get(`${workspaceId}/${key}`, signedUrl); } + async presignPut( + workspaceId: string, + key: string, + metadata?: PutObjectMetadata + ) { + return this.provider.presignPut?.(`${workspaceId}/${key}`, metadata); + } + + async createMultipartUpload( + workspaceId: string, + key: string, + metadata?: PutObjectMetadata + ) { + return this.provider.createMultipartUpload?.( + `${workspaceId}/${key}`, + metadata + ); + } + + async presignUploadPart( + workspaceId: string, + key: string, + uploadId: string, + partNumber: number + ) { + return this.provider.presignUploadPart?.( + `${workspaceId}/${key}`, + uploadId, + partNumber + ); + } + + async listMultipartUploadParts( + workspaceId: string, + key: string, + uploadId: string + ) { + return this.provider.listMultipartUploadParts?.( + `${workspaceId}/${key}`, + uploadId + ); + } + + async completeMultipartUpload( + workspaceId: string, + key: string, + uploadId: string, + parts: { partNumber: number; etag: string }[] + ) { + if (!this.provider.completeMultipartUpload) { + return false; + } + + await this.provider.completeMultipartUpload( + `${workspaceId}/${key}`, + uploadId, + parts + ); + return true; + } + + async abortMultipartUpload( + workspaceId: string, + key: string, + uploadId: string + ) { + if (!this.provider.abortMultipartUpload) { + return false; + } + + await this.provider.abortMultipartUpload(`${workspaceId}/${key}`, uploadId); + return true; + } + + async head(workspaceId: string, key: string) { + return this.provider.head(`${workspaceId}/${key}`); + } + + async complete( + workspaceId: string, + key: string, + expected: { size: number; mime: string } + ): Promise { + const metadata = await this.head(workspaceId, key); + if (!metadata) { + return { ok: false, reason: 'not_found' }; + } + + if (metadata.contentLength !== expected.size) { + return { ok: false, reason: 'size_mismatch' }; + } + + if (expected.mime && metadata.contentType !== expected.mime) { + return { ok: false, reason: 'mime_mismatch' }; + } + + const object = await this.provider.get(`${workspaceId}/${key}`); + if (!object.body) { + return { ok: false, reason: 'not_found' }; + } + + const checksum = createHash('sha256'); + try { + for await (const chunk of object.body) { + checksum.update(chunk as Buffer); + } + } catch (e) { + this.logger.error('failed to read blob for checksum verification', e); + return { ok: false, reason: 'checksum_mismatch' }; + } + + const base64 = checksum.digest('base64'); + const base64urlWithPadding = base64.replace(/\+/g, '-').replace(/\//g, '_'); + + if (base64urlWithPadding !== key) { + try { + await this.provider.delete(`${workspaceId}/${key}`); + } catch (e) { + // never throw + this.logger.error('failed to delete invalid blob', e); + } + return { ok: false, reason: 'checksum_mismatch' }; + } + + await this.models.blob.upsert({ + workspaceId, + key, + mime: metadata.contentType, + size: metadata.contentLength, + status: 'completed', + uploadId: null, + }); + + return { ok: true, metadata }; + } + async list(workspaceId: string, syncBlobMeta = true) { const blobsInDb = await this.models.blob.list(workspaceId); @@ -78,6 +227,12 @@ export class WorkspaceBlobStorage { return blobsInDb; } + // all blobs are uploading but not completed yet + const hasDbBlobs = await this.models.blob.hasAny(workspaceId); + if (hasDbBlobs) { + return blobsInDb; + } + const blobs = await this.provider.list(workspaceId + '/'); blobs.forEach(blob => { blob.key = blob.key.slice(workspaceId.length + 1); @@ -147,6 +302,8 
@@ export class WorkspaceBlobStorage {
       key,
       mime: meta.contentType,
       size: meta.contentLength,
+      status: 'completed',
+      uploadId: null,
     });
   }
diff --git a/packages/backend/server/src/core/workspaces/resolvers/blob.ts b/packages/backend/server/src/core/workspaces/resolvers/blob.ts
index 9c51d1eeb7..a50bcbe0a7 100644
--- a/packages/backend/server/src/core/workspaces/resolvers/blob.ts
+++ b/packages/backend/server/src/core/workspaces/resolvers/blob.ts
@@ -2,29 +2,110 @@ import { Logger, UseGuards } from '@nestjs/common';
 import {
   Args,
   Field,
+  InputType,
   Int,
   Mutation,
   ObjectType,
   Parent,
   Query,
+  registerEnumType,
   ResolveField,
   Resolver,
 } from '@nestjs/graphql';
+import { GraphQLJSONObject } from 'graphql-scalars';
 import GraphQLUpload from 'graphql-upload/GraphQLUpload.mjs';
 
 import type { FileUpload } from '../../../base';
 import {
+  BlobInvalid,
+  BlobNotFound,
   BlobQuotaExceeded,
   CloudThrottlerGuard,
   readBuffer,
   StorageQuotaExceeded,
 } from '../../../base';
+import { Models } from '../../../models';
 import { CurrentUser } from '../../auth';
 import { AccessController } from '../../permission';
 import { QuotaService } from '../../quota';
 import { WorkspaceBlobStorage } from '../../storage';
+import {
+  MULTIPART_PART_SIZE,
+  MULTIPART_THRESHOLD,
+} from '../../storage/constants';
 import { WorkspaceBlobSizes, WorkspaceType } from '../types';
 
+enum BlobUploadMethod {
+  GRAPHQL = 'GRAPHQL',
+  PRESIGNED = 'PRESIGNED',
+  MULTIPART = 'MULTIPART',
+}
+
+registerEnumType(BlobUploadMethod, {
+  name: 'BlobUploadMethod',
+  description: 'Blob upload method',
+});
+
+@ObjectType()
+class BlobUploadedPart {
+  @Field(() => Int)
+  partNumber!: number;
+
+  @Field()
+  etag!: string;
+}
+
+@ObjectType()
+class BlobUploadInit {
+  @Field(() => BlobUploadMethod)
+  method!: BlobUploadMethod;
+
+  @Field()
+  blobKey!: string;
+
+  @Field(() => Boolean, { nullable: true })
+  alreadyUploaded?: boolean;
+
+  @Field(() => String, { nullable: true })
+  uploadUrl?: string;
+
+  @Field(() => GraphQLJSONObject, { nullable: true })
+  headers?: Record<string, string>;
+
+  @Field(() => Date, { nullable: true })
+  expiresAt?: Date;
+
+  @Field(() => String, { nullable: true })
+  uploadId?: string;
+
+  @Field(() => Int, { nullable: true })
+  partSize?: number;
+
+  @Field(() => [BlobUploadedPart], { nullable: true })
+  uploadedParts?: BlobUploadedPart[];
+}
+
+@ObjectType()
+class BlobUploadPart {
+  @Field()
+  uploadUrl!: string;
+
+  @Field(() => GraphQLJSONObject, { nullable: true })
+  headers?: Record<string, string>;
+
+  @Field(() => Date, { nullable: true })
+  expiresAt?: Date;
+}
+
+@InputType()
+class BlobUploadPartInput {
+  @Field(() => Int)
+  partNumber!: number;
+
+  @Field()
+  etag!: string;
+}
+
 @ObjectType()
 class ListedBlob {
   @Field()
@@ -47,7 +128,8 @@ export class WorkspaceBlobResolver {
   constructor(
     private readonly ac: AccessController,
     private readonly quota: QuotaService,
-    private readonly storage: WorkspaceBlobStorage
+    private readonly storage: WorkspaceBlobStorage,
+    private readonly models: Models
   ) {}
 
   @ResolveField(() => [ListedBlob], {
@@ -110,6 +192,258 @@ export class WorkspaceBlobResolver {
     return blob.filename;
   }
 
+  @Mutation(() => BlobUploadInit)
+  async createBlobUpload(
+    @CurrentUser() user: CurrentUser,
+    @Args('workspaceId') workspaceId: string,
+    @Args('key') key: string,
+    @Args('size', { type: () => Int }) size: number,
+    @Args('mime') mime: string
+  ): Promise<BlobUploadInit> {
+    await this.ac
+      .user(user.id)
+      .workspace(workspaceId)
+      .assert('Workspace.Blobs.Write');
+
+    let record = await this.models.blob.get(workspaceId, key);
+    mime =
mime || 'application/octet-stream'; + if (record) { + if (record.size !== size) { + throw new BlobInvalid('Blob size mismatch'); + } + if (record.mime !== mime) { + throw new BlobInvalid('Blob mime mismatch'); + } + + if (record.status === 'completed') { + const existingMetadata = await this.storage.head(workspaceId, key); + if (!existingMetadata) { + // record exists but object is missing, treat as a new upload + record = null; + } else if (existingMetadata.contentLength !== size) { + throw new BlobInvalid('Blob size mismatch'); + } else if (existingMetadata.contentType !== mime) { + throw new BlobInvalid('Blob mime mismatch'); + } else { + return { + method: BlobUploadMethod.GRAPHQL, + blobKey: key, + alreadyUploaded: true, + }; + } + } + } + + const checkExceeded = + await this.quota.getWorkspaceQuotaCalculator(workspaceId); + const result = checkExceeded(record ? 0 : size); + if (result?.blobQuotaExceeded) { + throw new BlobQuotaExceeded(); + } else if (result?.storageQuotaExceeded) { + throw new StorageQuotaExceeded(); + } + + const metadata = { contentType: mime, contentLength: size }; + let init: BlobUploadInit | null = null; + let uploadIdForRecord: string | null = null; + + // try to resume multipart uploads + if (record && record.uploadId) { + const uploadedParts = await this.storage.listMultipartUploadParts( + workspaceId, + key, + record.uploadId + ); + + if (uploadedParts) { + return { + method: BlobUploadMethod.MULTIPART, + blobKey: key, + uploadId: record.uploadId, + partSize: MULTIPART_PART_SIZE, + uploadedParts, + }; + } + } + + if (size >= MULTIPART_THRESHOLD) { + const multipart = await this.storage.createMultipartUpload( + workspaceId, + key, + metadata + ); + if (multipart) { + uploadIdForRecord = multipart.uploadId; + init = { + method: BlobUploadMethod.MULTIPART, + blobKey: key, + uploadId: multipart.uploadId, + partSize: MULTIPART_PART_SIZE, + expiresAt: multipart.expiresAt, + uploadedParts: [], + }; + } + } + + if (!init) { + const presigned = await this.storage.presignPut( + workspaceId, + key, + metadata + ); + if (presigned) { + init = { + method: BlobUploadMethod.PRESIGNED, + blobKey: key, + uploadUrl: presigned.url, + headers: presigned.headers, + expiresAt: presigned.expiresAt, + }; + } + } + + if (!init) { + init = { + method: BlobUploadMethod.GRAPHQL, + blobKey: key, + }; + } + + await this.models.blob.upsert({ + workspaceId, + key, + mime, + size, + status: 'pending', + uploadId: uploadIdForRecord, + }); + + return init; + } + + @Mutation(() => String) + async completeBlobUpload( + @CurrentUser() user: CurrentUser, + @Args('workspaceId') workspaceId: string, + @Args('key') key: string, + @Args('uploadId', { nullable: true }) uploadId?: string, + @Args({ + name: 'parts', + type: () => [BlobUploadPartInput], + nullable: true, + }) + parts?: BlobUploadPartInput[] + ): Promise { + await this.ac + .user(user.id) + .workspace(workspaceId) + .assert('Workspace.Blobs.Write'); + + const record = await this.models.blob.get(workspaceId, key); + if (!record) { + throw new BlobNotFound({ spaceId: workspaceId, blobId: key }); + } + if (record.status === 'completed') { + return key; + } + + const hasMultipartInput = + uploadId !== undefined || (parts?.length ?? 
0) > 0; + const hasMultipartRecord = !!record.uploadId; + if (hasMultipartRecord) { + if (!uploadId || !parts || parts.length === 0) { + throw new BlobInvalid( + 'Multipart upload requires both uploadId and parts' + ); + } + if (uploadId !== record.uploadId) { + throw new BlobInvalid('Upload id mismatch'); + } + + const metadata = await this.storage.head(workspaceId, key); + if (!metadata) { + const completed = await this.storage.completeMultipartUpload( + workspaceId, + key, + uploadId, + parts + ); + if (!completed) { + throw new BlobInvalid('Multipart upload is not supported'); + } + } + } else if (hasMultipartInput) { + throw new BlobInvalid('Multipart upload is not initialized'); + } + + const result = await this.storage.complete(workspaceId, key, { + size: record.size, + mime: record.mime, + }); + if (!result.ok) { + if (result.reason === 'not_found') { + throw new BlobNotFound({ + spaceId: workspaceId, + blobId: key, + }); + } + if (result.reason === 'size_mismatch') { + throw new BlobInvalid('Blob size mismatch'); + } + if (result.reason === 'mime_mismatch') { + throw new BlobInvalid('Blob mime mismatch'); + } + throw new BlobInvalid('Blob key mismatch'); + } + + return key; + } + + @Mutation(() => BlobUploadPart) + async getBlobUploadPartUrl( + @CurrentUser() user: CurrentUser, + @Args('workspaceId') workspaceId: string, + @Args('key') key: string, + @Args('uploadId') uploadId: string, + @Args('partNumber', { type: () => Int }) partNumber: number + ): Promise { + await this.ac + .user(user.id) + .workspace(workspaceId) + .assert('Workspace.Blobs.Write'); + + const part = await this.storage.presignUploadPart( + workspaceId, + key, + uploadId, + partNumber + ); + if (!part) { + throw new BlobInvalid('Multipart upload is not supported'); + } + + return { + uploadUrl: part.url, + headers: part.headers, + expiresAt: part.expiresAt, + }; + } + + @Mutation(() => Boolean) + async abortBlobUpload( + @CurrentUser() user: CurrentUser, + @Args('workspaceId') workspaceId: string, + @Args('key') key: string, + @Args('uploadId') uploadId: string + ) { + await this.ac + .user(user.id) + .workspace(workspaceId) + .assert('Workspace.Blobs.Write'); + + return this.storage.abortMultipartUpload(workspaceId, key, uploadId); + } + @Mutation(() => Boolean) async deleteBlob( @CurrentUser() user: CurrentUser, diff --git a/packages/backend/server/src/models/blob.ts b/packages/backend/server/src/models/blob.ts index 1800953658..0ec2704fcb 100644 --- a/packages/backend/server/src/models/blob.ts +++ b/packages/backend/server/src/models/blob.ts @@ -21,12 +21,16 @@ export class BlobModel extends BaseModel { update: { mime: blob.mime, size: blob.size, + status: blob.status, + uploadId: blob.uploadId, }, create: { workspaceId: blob.workspaceId, key: blob.key, mime: blob.mime, size: blob.size, + status: blob.status, + uploadId: blob.uploadId, }, }); } @@ -76,11 +80,39 @@ export class BlobModel extends BaseModel { ...options?.where, workspaceId, deletedAt: null, + status: 'completed', }, select: options?.select, }); } + async hasAny(workspaceId: string) { + const count = await this.db.blob.count({ + where: { + workspaceId, + deletedAt: null, + }, + }); + return count > 0; + } + + async listPendingExpired(before: Date) { + return await this.db.blob.findMany({ + where: { + status: 'pending', + deletedAt: null, + createdAt: { + lt: before, + }, + }, + select: { + workspaceId: true, + key: true, + uploadId: true, + }, + }); + } + async listDeleted(workspaceId: string) { return await this.db.blob.findMany({ where: { diff 
--git a/packages/backend/server/src/schema.gql b/packages/backend/server/src/schema.gql index 388eee5795..97c3b05c4e 100644 --- a/packages/backend/server/src/schema.gql +++ b/packages/backend/server/src/schema.gql @@ -96,6 +96,41 @@ type BlobNotFoundDataType { spaceId: String! } +type BlobUploadInit { + alreadyUploaded: Boolean + blobKey: String! + expiresAt: DateTime + headers: JSONObject + method: BlobUploadMethod! + partSize: Int + uploadId: String + uploadUrl: String + uploadedParts: [BlobUploadedPart!] +} + +"""Blob upload method""" +enum BlobUploadMethod { + GRAPHQL + MULTIPART + PRESIGNED +} + +type BlobUploadPart { + expiresAt: DateTime + headers: JSONObject + uploadUrl: String! +} + +input BlobUploadPartInput { + etag: String! + partNumber: Int! +} + +type BlobUploadedPart { + etag: String! + partNumber: Int! +} + enum ChatHistoryOrder { asc desc @@ -659,6 +694,7 @@ enum ErrorNames { ALREADY_IN_SPACE AUTHENTICATION_REQUIRED BAD_REQUEST + BLOB_INVALID BLOB_NOT_FOUND BLOB_QUOTA_EXCEEDED CANNOT_DELETE_ACCOUNT_WITH_OWNED_TEAM_WORKSPACE @@ -1194,6 +1230,7 @@ type MissingOauthQueryParameterDataType { } type Mutation { + abortBlobUpload(key: String!, uploadId: String!, workspaceId: String!): Boolean! acceptInviteById(inviteId: String!, sendAcceptMail: Boolean @deprecated(reason: "never used"), workspaceId: String @deprecated(reason: "never used")): Boolean! activateLicense(license: String!, workspaceId: String!): License! @@ -1223,6 +1260,8 @@ type Mutation { """Cleanup sessions""" cleanupCopilotSession(options: DeleteSessionInput!): [String!]! + completeBlobUpload(key: String!, parts: [BlobUploadPartInput!], uploadId: String, workspaceId: String!): String! + createBlobUpload(key: String!, mime: String!, size: Int!, workspaceId: String!): BlobUploadInit! """Create change password url""" createChangePasswordUrl(callbackUrl: String!, userId: String!): String! @@ -1275,6 +1314,7 @@ type Mutation { forkCopilotSession(options: ForkChatSessionInput!): String! generateLicenseKey(sessionId: String!): String! generateUserAccessToken(input: GenerateAccessTokenInput!): RevealedAccessToken! + getBlobUploadPartUrl(key: String!, partNumber: Int!, uploadId: String!, workspaceId: String!): BlobUploadPart! grantDocUserRoles(input: GrantDocUserRolesInput!): Boolean! grantMember(permission: Permission!, userId: String!, workspaceId: String!): Boolean! diff --git a/packages/common/graphql/src/graphql/blob-upload-abort.gql b/packages/common/graphql/src/graphql/blob-upload-abort.gql new file mode 100644 index 0000000000..da4e8a1ccf --- /dev/null +++ b/packages/common/graphql/src/graphql/blob-upload-abort.gql @@ -0,0 +1,3 @@ +mutation abortBlobUpload($workspaceId: String!, $key: String!, $uploadId: String!) 
{ + abortBlobUpload(workspaceId: $workspaceId, key: $key, uploadId: $uploadId) +} diff --git a/packages/common/graphql/src/graphql/blob-upload-complete.gql b/packages/common/graphql/src/graphql/blob-upload-complete.gql new file mode 100644 index 0000000000..f9cb1b264f --- /dev/null +++ b/packages/common/graphql/src/graphql/blob-upload-complete.gql @@ -0,0 +1,3 @@ +mutation completeBlobUpload($workspaceId: String!, $key: String!, $uploadId: String, $parts: [BlobUploadPartInput!]) { + completeBlobUpload(workspaceId: $workspaceId, key: $key, uploadId: $uploadId, parts: $parts) +} diff --git a/packages/common/graphql/src/graphql/blob-upload-create.gql b/packages/common/graphql/src/graphql/blob-upload-create.gql new file mode 100644 index 0000000000..2f6cd9c684 --- /dev/null +++ b/packages/common/graphql/src/graphql/blob-upload-create.gql @@ -0,0 +1,16 @@ +mutation createBlobUpload($workspaceId: String!, $key: String!, $size: Int!, $mime: String!) { + createBlobUpload(workspaceId: $workspaceId, key: $key, size: $size, mime: $mime) { + method + blobKey + alreadyUploaded + uploadUrl + headers + expiresAt + uploadId + partSize + uploadedParts { + partNumber + etag + } + } +} diff --git a/packages/common/graphql/src/graphql/blob-upload-part-url.gql b/packages/common/graphql/src/graphql/blob-upload-part-url.gql new file mode 100644 index 0000000000..b8b3f9131b --- /dev/null +++ b/packages/common/graphql/src/graphql/blob-upload-part-url.gql @@ -0,0 +1,7 @@ +mutation getBlobUploadPartUrl($workspaceId: String!, $key: String!, $uploadId: String!, $partNumber: Int!) { + getBlobUploadPartUrl(workspaceId: $workspaceId, key: $key, uploadId: $uploadId, partNumber: $partNumber) { + uploadUrl + headers + expiresAt + } +} diff --git a/packages/common/graphql/src/graphql/index.ts b/packages/common/graphql/src/graphql/index.ts index a7f2c85388..48a8aa5993 100644 --- a/packages/common/graphql/src/graphql/index.ts +++ b/packages/common/graphql/src/graphql/index.ts @@ -383,6 +383,65 @@ export const setBlobMutation = { file: true, }; +export const abortBlobUploadMutation = { + id: 'abortBlobUploadMutation' as const, + op: 'abortBlobUpload', + query: `mutation abortBlobUpload($workspaceId: String!, $key: String!, $uploadId: String!) { + abortBlobUpload(workspaceId: $workspaceId, key: $key, uploadId: $uploadId) +}`, +}; + +export const completeBlobUploadMutation = { + id: 'completeBlobUploadMutation' as const, + op: 'completeBlobUpload', + query: `mutation completeBlobUpload($workspaceId: String!, $key: String!, $uploadId: String, $parts: [BlobUploadPartInput!]) { + completeBlobUpload( + workspaceId: $workspaceId + key: $key + uploadId: $uploadId + parts: $parts + ) +}`, +}; + +export const createBlobUploadMutation = { + id: 'createBlobUploadMutation' as const, + op: 'createBlobUpload', + query: `mutation createBlobUpload($workspaceId: String!, $key: String!, $size: Int!, $mime: String!) { + createBlobUpload(workspaceId: $workspaceId, key: $key, size: $size, mime: $mime) { + method + blobKey + alreadyUploaded + uploadUrl + headers + expiresAt + uploadId + partSize + uploadedParts { + partNumber + etag + } + } +}`, +}; + +export const getBlobUploadPartUrlMutation = { + id: 'getBlobUploadPartUrlMutation' as const, + op: 'getBlobUploadPartUrl', + query: `mutation getBlobUploadPartUrl($workspaceId: String!, $key: String!, $uploadId: String!, $partNumber: Int!) 
{
+  getBlobUploadPartUrl(
+    workspaceId: $workspaceId
+    key: $key
+    uploadId: $uploadId
+    partNumber: $partNumber
+  ) {
+    uploadUrl
+    headers
+    expiresAt
+  }
+}`,
+};
+
 export const cancelSubscriptionMutation = {
   id: 'cancelSubscriptionMutation' as const,
   op: 'cancelSubscription',
diff --git a/packages/common/graphql/src/schema.ts b/packages/common/graphql/src/schema.ts
index f774e9bc34..d49f9b2e35 100644
--- a/packages/common/graphql/src/schema.ts
+++ b/packages/common/graphql/src/schema.ts
@@ -137,6 +137,44 @@ export interface BlobNotFoundDataType {
   spaceId: Scalars['String']['output'];
 }
 
+export interface BlobUploadInit {
+  __typename?: 'BlobUploadInit';
+  alreadyUploaded: Maybe<Scalars['Boolean']['output']>;
+  blobKey: Scalars['String']['output'];
+  expiresAt: Maybe<Scalars['DateTime']['output']>;
+  headers: Maybe<Scalars['JSONObject']['output']>;
+  method: BlobUploadMethod;
+  partSize: Maybe<Scalars['Int']['output']>;
+  uploadId: Maybe<Scalars['String']['output']>;
+  uploadUrl: Maybe<Scalars['String']['output']>;
+  uploadedParts: Maybe<Array<BlobUploadedPart>>;
+}
+
+/** Blob upload method */
+export enum BlobUploadMethod {
+  GRAPHQL = 'GRAPHQL',
+  MULTIPART = 'MULTIPART',
+  PRESIGNED = 'PRESIGNED',
+}
+
+export interface BlobUploadPart {
+  __typename?: 'BlobUploadPart';
+  expiresAt: Maybe<Scalars['DateTime']['output']>;
+  headers: Maybe<Scalars['JSONObject']['output']>;
+  uploadUrl: Scalars['String']['output'];
+}
+
+export interface BlobUploadPartInput {
+  etag: Scalars['String']['input'];
+  partNumber: Scalars['Int']['input'];
+}
+
+export interface BlobUploadedPart {
+  __typename?: 'BlobUploadedPart';
+  etag: Scalars['String']['output'];
+  partNumber: Scalars['Int']['output'];
+}
+
 export enum ChatHistoryOrder {
   asc = 'asc',
   desc = 'desc',
@@ -838,6 +876,7 @@ export enum ErrorNames {
   ALREADY_IN_SPACE = 'ALREADY_IN_SPACE',
   AUTHENTICATION_REQUIRED = 'AUTHENTICATION_REQUIRED',
   BAD_REQUEST = 'BAD_REQUEST',
+  BLOB_INVALID = 'BLOB_INVALID',
   BLOB_NOT_FOUND = 'BLOB_NOT_FOUND',
   BLOB_QUOTA_EXCEEDED = 'BLOB_QUOTA_EXCEEDED',
   CANNOT_DELETE_ACCOUNT_WITH_OWNED_TEAM_WORKSPACE = 'CANNOT_DELETE_ACCOUNT_WITH_OWNED_TEAM_WORKSPACE',
@@ -1370,6 +1409,7 @@ export interface MissingOauthQueryParameterDataType {
 
 export interface Mutation {
   __typename?: 'Mutation';
+  abortBlobUpload: Scalars['Boolean']['output'];
   acceptInviteById: Scalars['Boolean']['output'];
   activateLicense: License;
   /** add a blob to context */
@@ -1392,6 +1432,8 @@ export interface Mutation {
   claimAudioTranscription: Maybe;
   /** Cleanup sessions */
   cleanupCopilotSession: Array<Scalars['String']['output']>;
+  completeBlobUpload: Scalars['String']['output'];
+  createBlobUpload: BlobUploadInit;
   /** Create change password url */
   createChangePasswordUrl: Scalars['String']['output'];
   /** Create a subscription checkout link of stripe */
@@ -1430,6 +1472,7 @@ export interface Mutation {
   forkCopilotSession: Scalars['String']['output'];
   generateLicenseKey: Scalars['String']['output'];
   generateUserAccessToken: RevealedAccessToken;
+  getBlobUploadPartUrl: BlobUploadPart;
   grantDocUserRoles: Scalars['Boolean']['output'];
   grantMember: Scalars['Boolean']['output'];
   /** import users */
@@ -1527,6 +1570,12 @@ export interface Mutation {
   verifyEmail: Scalars['Boolean']['output'];
 }
 
+export interface MutationAbortBlobUploadArgs {
+  key: Scalars['String']['input'];
+  uploadId: Scalars['String']['input'];
+  workspaceId: Scalars['String']['input'];
+}
+
 export interface MutationAcceptInviteByIdArgs {
   inviteId: Scalars['String']['input'];
   sendAcceptMail?: InputMaybe<Scalars['Boolean']['input']>;
@@ -1599,6 +1648,20 @@ export interface MutationCleanupCopilotSessionArgs {
   options: DeleteSessionInput;
 }
 
+export interface MutationCompleteBlobUploadArgs {
+  key: Scalars['String']['input'];
+  parts?: InputMaybe<Array<BlobUploadPartInput> | BlobUploadPartInput>;
+  uploadId?: InputMaybe<Scalars['String']['input']>;
+  workspaceId: Scalars['String']['input'];
+}
+
+export interface MutationCreateBlobUploadArgs {
+  key: Scalars['String']['input'];
+  mime: Scalars['String']['input'];
+  size: Scalars['Int']['input'];
+  workspaceId: Scalars['String']['input'];
+}
+
 export interface MutationCreateChangePasswordUrlArgs {
   callbackUrl: Scalars['String']['input'];
   userId: Scalars['String']['input'];
@@ -1693,6 +1756,13 @@ export interface MutationGenerateUserAccessTokenArgs {
   input: GenerateAccessTokenInput;
 }
 
+export interface MutationGetBlobUploadPartUrlArgs {
+  key: Scalars['String']['input'];
+  partNumber: Scalars['Int']['input'];
+  uploadId: Scalars['String']['input'];
+  workspaceId: Scalars['String']['input'];
+}
+
 export interface MutationGrantDocUserRolesArgs {
   input: GrantDocUserRolesInput;
 }
@@ -3411,6 +3481,73 @@ export type SetBlobMutationVariables = Exact<{
 
 export type SetBlobMutation = { __typename?: 'Mutation'; setBlob: string };
 
+export type AbortBlobUploadMutationVariables = Exact<{
+  workspaceId: Scalars['String']['input'];
+  key: Scalars['String']['input'];
+  uploadId: Scalars['String']['input'];
+}>;
+
+export type AbortBlobUploadMutation = {
+  __typename?: 'Mutation';
+  abortBlobUpload: boolean;
+};
+
+export type CompleteBlobUploadMutationVariables = Exact<{
+  workspaceId: Scalars['String']['input'];
+  key: Scalars['String']['input'];
+  uploadId?: InputMaybe<Scalars['String']['input']>;
+  parts?: InputMaybe<Array<BlobUploadPartInput> | BlobUploadPartInput>;
+}>;
+
+export type CompleteBlobUploadMutation = {
+  __typename?: 'Mutation';
+  completeBlobUpload: string;
+};
+
+export type CreateBlobUploadMutationVariables = Exact<{
+  workspaceId: Scalars['String']['input'];
+  key: Scalars['String']['input'];
+  size: Scalars['Int']['input'];
+  mime: Scalars['String']['input'];
+}>;
+
+export type CreateBlobUploadMutation = {
+  __typename?: 'Mutation';
+  createBlobUpload: {
+    __typename?: 'BlobUploadInit';
+    method: BlobUploadMethod;
+    blobKey: string;
+    alreadyUploaded: boolean | null;
+    uploadUrl: string | null;
+    headers: any | null;
+    expiresAt: string | null;
+    uploadId: string | null;
+    partSize: number | null;
+    uploadedParts: Array<{
+      __typename?: 'BlobUploadedPart';
+      partNumber: number;
+      etag: string;
+    }> | null;
+  };
+};
+
+export type GetBlobUploadPartUrlMutationVariables = Exact<{
+  workspaceId: Scalars['String']['input'];
+  key: Scalars['String']['input'];
+  uploadId: Scalars['String']['input'];
+  partNumber: Scalars['Int']['input'];
+}>;
+
+export type GetBlobUploadPartUrlMutation = {
+  __typename?: 'Mutation';
+  getBlobUploadPartUrl: {
+    __typename?: 'BlobUploadPart';
+    uploadUrl: string;
+    headers: any | null;
+    expiresAt: string | null;
+  };
+};
+
 export type CancelSubscriptionMutationVariables = Exact<{
   plan?: InputMaybe;
   workspaceId?: InputMaybe;
@@ -6824,6 +6961,26 @@ export type Mutations =
       variables: SetBlobMutationVariables;
       response: SetBlobMutation;
     }
+  | {
+      name: 'abortBlobUploadMutation';
+      variables: AbortBlobUploadMutationVariables;
+      response: AbortBlobUploadMutation;
+    }
+  | {
+      name: 'completeBlobUploadMutation';
+      variables: CompleteBlobUploadMutationVariables;
+      response: CompleteBlobUploadMutation;
+    }
+  | {
+      name: 'createBlobUploadMutation';
+      variables: CreateBlobUploadMutationVariables;
+      response: CreateBlobUploadMutation;
+    }
+  | {
+      name: 'getBlobUploadPartUrlMutation';
+      variables: GetBlobUploadPartUrlMutationVariables;
+      response: GetBlobUploadPartUrlMutation;
+    }
   | {
       name: 'cancelSubscriptionMutation';
       variables: CancelSubscriptionMutationVariables;
diff --git a/packages/common/nbstore/src/__tests__/cloud-blob.spec.ts
b/packages/common/nbstore/src/__tests__/cloud-blob.spec.ts new file mode 100644 index 0000000000..7be3d06f21 --- /dev/null +++ b/packages/common/nbstore/src/__tests__/cloud-blob.spec.ts @@ -0,0 +1,162 @@ +import { + abortBlobUploadMutation, + BlobUploadMethod, + completeBlobUploadMutation, + createBlobUploadMutation, + getBlobUploadPartUrlMutation, + setBlobMutation, + workspaceBlobQuotaQuery, +} from '@affine/graphql'; +import { afterEach, expect, test, vi } from 'vitest'; + +import { CloudBlobStorage } from '../impls/cloud/blob'; + +const quotaResponse = { + workspace: { + quota: { + humanReadable: { + blobLimit: '1 MB', + }, + blobLimit: 1024 * 1024, + }, + }, +}; + +afterEach(() => { + vi.restoreAllMocks(); + vi.unstubAllGlobals(); +}); + +function createStorage() { + return new CloudBlobStorage({ + serverBaseUrl: 'https://example.com', + id: 'workspace-1', + }); +} + +test('uses graphql upload when server returns GRAPHQL method', async () => { + const storage = createStorage(); + const gqlMock = vi.fn(async ({ query }) => { + if (query === workspaceBlobQuotaQuery) { + return quotaResponse; + } + if (query === createBlobUploadMutation) { + return { + createBlobUpload: { + method: BlobUploadMethod.GRAPHQL, + blobKey: 'blob-key', + alreadyUploaded: false, + }, + }; + } + if (query === setBlobMutation) { + return { setBlob: 'blob-key' }; + } + throw new Error('Unexpected query'); + }); + + (storage.connection as any).gql = gqlMock; + + await storage.set({ + key: 'blob-key', + data: new Uint8Array([1, 2, 3]), + mime: 'text/plain', + }); + + const queries = gqlMock.mock.calls.map(call => call[0].query); + expect(queries).toContain(createBlobUploadMutation); + expect(queries).toContain(setBlobMutation); +}); + +test('falls back to graphql when presigned upload fails', async () => { + const storage = createStorage(); + const gqlMock = vi.fn(async ({ query }) => { + if (query === workspaceBlobQuotaQuery) { + return quotaResponse; + } + if (query === createBlobUploadMutation) { + return { + createBlobUpload: { + method: BlobUploadMethod.PRESIGNED, + blobKey: 'blob-key', + alreadyUploaded: false, + uploadUrl: 'https://upload.example.com/blob', + }, + }; + } + if (query === setBlobMutation) { + return { setBlob: 'blob-key' }; + } + if (query === completeBlobUploadMutation) { + return { completeBlobUpload: 'blob-key' }; + } + throw new Error('Unexpected query'); + }); + + (storage.connection as any).gql = gqlMock; + vi.stubGlobal( + 'fetch', + vi.fn(async () => new Response('', { status: 500 })) + ); + + await storage.set({ + key: 'blob-key', + data: new Uint8Array([1, 2, 3]), + mime: 'text/plain', + }); + + const queries = gqlMock.mock.calls.map(call => call[0].query); + expect(queries).toContain(setBlobMutation); + expect(queries).not.toContain(completeBlobUploadMutation); +}); + +test('falls back to graphql and aborts when multipart upload fails', async () => { + const storage = createStorage(); + const gqlMock = vi.fn(async ({ query }) => { + if (query === workspaceBlobQuotaQuery) { + return quotaResponse; + } + if (query === createBlobUploadMutation) { + return { + createBlobUpload: { + method: BlobUploadMethod.MULTIPART, + blobKey: 'blob-key', + alreadyUploaded: false, + uploadId: 'upload-1', + partSize: 2, + uploadedParts: [], + }, + }; + } + if (query === getBlobUploadPartUrlMutation) { + return { + getBlobUploadPartUrl: { + uploadUrl: 'https://upload.example.com/part', + }, + }; + } + if (query === abortBlobUploadMutation) { + return { abortBlobUpload: true }; + } + if (query === 
setBlobMutation) { + return { setBlob: 'blob-key' }; + } + throw new Error('Unexpected query'); + }); + + (storage.connection as any).gql = gqlMock; + vi.stubGlobal( + 'fetch', + vi.fn(async () => new Response('', { status: 500 })) + ); + + await storage.set({ + key: 'blob-key', + data: new Uint8Array([1, 2, 3]), + mime: 'text/plain', + }); + + const queries = gqlMock.mock.calls.map(call => call[0].query); + expect(queries).toContain(abortBlobUploadMutation); + expect(queries).toContain(setBlobMutation); +}); diff --git a/packages/common/nbstore/src/impls/cloud/blob.ts b/packages/common/nbstore/src/impls/cloud/blob.ts index c317f11161..6d9bc0d2a7 100644 --- a/packages/common/nbstore/src/impls/cloud/blob.ts +++ b/packages/common/nbstore/src/impls/cloud/blob.ts @@ -1,6 +1,11 @@ import { UserFriendlyError } from '@affine/error'; import { + abortBlobUploadMutation, + BlobUploadMethod, + completeBlobUploadMutation, + createBlobUploadMutation, deleteBlobMutation, + getBlobUploadPartUrlMutation, listBlobsQuery, releaseDeletedBlobsMutation, setBlobMutation, @@ -21,6 +26,7 @@ interface CloudBlobStorageOptions { } const SHOULD_MANUAL_REDIRECT = BUILD_CONFIG.isAndroid || BUILD_CONFIG.isIOS; +const UPLOAD_REQUEST_TIMEOUT = 0; export class CloudBlobStorage extends BlobStorageBase { static readonly identifier = 'CloudBlobStorage'; @@ -97,16 +103,69 @@ export class CloudBlobStorage extends BlobStorageBase { if (blob.data.byteLength > blobSizeLimit) { throw new OverSizeError(this.humanReadableBlobSizeLimitCache); } - await this.connection.gql({ - query: setBlobMutation, + + const init = await this.connection.gql({ + query: createBlobUploadMutation, variables: { workspaceId: this.options.id, - blob: new File([blob.data], blob.key, { type: blob.mime }), - }, - context: { - signal, + key: blob.key, + size: blob.data.byteLength, + mime: blob.mime, }, + context: { signal }, }); + + const upload = init.createBlobUpload; + if (upload.alreadyUploaded) { + return; + } + if (upload.method === BlobUploadMethod.GRAPHQL) { + await this.uploadViaGraphql(blob, signal); + return; + } + + if (upload.method === BlobUploadMethod.PRESIGNED) { + try { + await this.uploadViaPresigned( + upload.uploadUrl!, + upload.headers, + blob.data, + signal + ); + await this.completeUpload(blob.key, undefined, undefined, signal); + return; + } catch { + await this.uploadViaGraphql(blob, signal); + return; + } + } + + if (upload.method === BlobUploadMethod.MULTIPART) { + try { + const parts = await this.uploadViaMultipart( + blob.key, + upload.uploadId!, + upload.partSize!, + blob.data, + upload.uploadedParts, + signal + ); + await this.completeUpload(blob.key, upload.uploadId!, parts, signal); + return; + } catch { + if (upload.uploadId) { + await this.tryAbortMultipartUpload( + blob.key, + upload.uploadId, + signal + ); + } + await this.uploadViaGraphql(blob, signal); + return; + } + } + + await this.uploadViaGraphql(blob, signal); } catch (err) { const userFriendlyError = UserFriendlyError.fromAny(err); if (userFriendlyError.is('STORAGE_QUOTA_EXCEEDED')) { @@ -151,6 +210,159 @@ export class CloudBlobStorage extends BlobStorageBase { })); } + private async uploadViaGraphql(blob: BlobRecord, signal?: AbortSignal) { + await this.connection.gql({ + query: setBlobMutation, + variables: { + workspaceId: this.options.id, + blob: new File([blob.data], blob.key, { type: blob.mime }), + }, + context: { signal }, + timeout: UPLOAD_REQUEST_TIMEOUT, + }); + } + + private async uploadViaPresigned( + uploadUrl: string, + headers: Record | null | 
undefined, + data: Uint8Array, + signal?: AbortSignal + ) { + const res = await this.fetchWithTimeout(uploadUrl, { + method: 'PUT', + headers: headers ?? undefined, + body: data, + signal, + timeout: UPLOAD_REQUEST_TIMEOUT, + }); + if (!res.ok) { + throw new Error(`Presigned upload failed with status ${res.status}`); + } + } + + private async uploadViaMultipart( + key: string, + uploadId: string, + partSize: number, + data: Uint8Array, + uploadedParts: { partNumber: number; etag: string }[] | null | undefined, + signal?: AbortSignal + ) { + const partsMap = new Map(); + for (const part of uploadedParts ?? []) { + partsMap.set(part.partNumber, part.etag); + } + const total = data.byteLength; + const totalParts = Math.ceil(total / partSize); + + for (let partNumber = 1; partNumber <= totalParts; partNumber += 1) { + if (partsMap.has(partNumber)) { + continue; + } + const start = (partNumber - 1) * partSize; + const end = Math.min(start + partSize, total); + const chunk = data.subarray(start, end); + + const part = await this.connection.gql({ + query: getBlobUploadPartUrlMutation, + variables: { workspaceId: this.options.id, key, uploadId, partNumber }, + context: { signal }, + }); + + const res = await this.fetchWithTimeout( + part.getBlobUploadPartUrl.uploadUrl, + { + method: 'PUT', + headers: part.getBlobUploadPartUrl.headers ?? undefined, + body: chunk, + signal, + timeout: UPLOAD_REQUEST_TIMEOUT, + } + ); + if (!res.ok) { + throw new Error( + `Multipart upload failed at part ${partNumber} with status ${res.status}` + ); + } + + const etag = res.headers.get('etag'); + if (!etag) { + throw new Error(`Missing ETag for part ${partNumber}.`); + } + partsMap.set(partNumber, etag); + } + + if (partsMap.size !== totalParts) { + throw new Error('Multipart upload has missing parts.'); + } + + return [...partsMap.entries()] + .sort((left, right) => left[0] - right[0]) + .map(([partNumber, etag]) => ({ partNumber, etag })); + } + + private async completeUpload( + key: string, + uploadId: string | undefined, + parts: { partNumber: number; etag: string }[] | undefined, + signal?: AbortSignal + ) { + await this.connection.gql({ + query: completeBlobUploadMutation, + variables: { workspaceId: this.options.id, key, uploadId, parts }, + context: { signal }, + timeout: UPLOAD_REQUEST_TIMEOUT, + }); + } + + private async tryAbortMultipartUpload( + key: string, + uploadId: string, + signal?: AbortSignal + ) { + try { + await this.connection.gql({ + query: abortBlobUploadMutation, + variables: { workspaceId: this.options.id, key, uploadId }, + context: { signal }, + }); + } catch {} + } + + private async fetchWithTimeout( + input: string, + init: RequestInit & { timeout?: number } + ) { + const externalSignal = init.signal; + if (externalSignal?.aborted) { + throw externalSignal.reason; + } + + const abortController = new AbortController(); + externalSignal?.addEventListener('abort', reason => { + abortController.abort(reason); + }); + + const timeout = init.timeout ?? 15000; + const timeoutId = + timeout > 0 + ? 
setTimeout(() => { + abortController.abort(new Error('request timeout')); + }, timeout) + : undefined; + + try { + return await globalThis.fetch(input, { + ...init, + signal: abortController.signal, + }); + } finally { + if (timeoutId) { + clearTimeout(timeoutId); + } + } + } + private humanReadableBlobSizeLimitCache: string | null = null; private blobSizeLimitCache: number | null = null; private blobSizeLimitCacheTime = 0; diff --git a/packages/common/nbstore/src/impls/cloud/http.ts b/packages/common/nbstore/src/impls/cloud/http.ts index f30ec1ef8e..acdcd27c10 100644 --- a/packages/common/nbstore/src/impls/cloud/http.ts +++ b/packages/common/nbstore/src/impls/cloud/http.ts @@ -19,9 +19,12 @@ export class HttpConnection extends DummyConnection { }); const timeout = init?.timeout ?? 15000; - const timeoutId = setTimeout(() => { - abortController.abort(new Error('request timeout')); - }, timeout); + const timeoutId = + timeout > 0 + ? setTimeout(() => { + abortController.abort(new Error('request timeout')); + }, timeout) + : undefined; const res = await globalThis .fetch(new URL(input, this.serverBaseUrl), { @@ -43,7 +46,9 @@ export class HttpConnection extends DummyConnection { stacktrace: err.stack, }); }); - clearTimeout(timeoutId); + if (timeoutId) { + clearTimeout(timeoutId); + } if (!res.ok && res.status !== 404) { if (res.status === 413) { throw new UserFriendlyError({ diff --git a/packages/frontend/i18n/src/i18n-completenesses.json b/packages/frontend/i18n/src/i18n-completenesses.json index 0c7fb272ba..11cded3d2c 100644 --- a/packages/frontend/i18n/src/i18n-completenesses.json +++ b/packages/frontend/i18n/src/i18n-completenesses.json @@ -15,7 +15,7 @@ "it": 1, "ja": 99, "ko": 100, - "nb-NO": 12, + "nb-NO": 30, "pl": 99, "pt-BR": 99, "ru": 99, diff --git a/packages/frontend/i18n/src/i18n.gen.ts b/packages/frontend/i18n/src/i18n.gen.ts index a08c534ebc..5a2156b5db 100644 --- a/packages/frontend/i18n/src/i18n.gen.ts +++ b/packages/frontend/i18n/src/i18n.gen.ts @@ -8680,6 +8680,10 @@ export function useAFFiNEI18N(): { blobId: string; spaceId: string; }>): string; + /** + * `Blob is invalid.` + */ + ["error.BLOB_INVALID"](): string; /** * `Expected to publish a doc, not a Space.` */ diff --git a/packages/frontend/i18n/src/resources/en.json b/packages/frontend/i18n/src/resources/en.json index e76f64e7a7..99ee6e0b03 100644 --- a/packages/frontend/i18n/src/resources/en.json +++ b/packages/frontend/i18n/src/resources/en.json @@ -2163,6 +2163,7 @@ "error.INVALID_HISTORY_TIMESTAMP": "Invalid doc history timestamp provided.", "error.DOC_HISTORY_NOT_FOUND": "History of {{docId}} at {{timestamp}} under Space {{spaceId}}.", "error.BLOB_NOT_FOUND": "Blob {{blobId}} not found in Space {{spaceId}}.", + "error.BLOB_INVALID": "Blob is invalid.", "error.EXPECT_TO_PUBLISH_DOC": "Expected to publish a doc, not a Space.", "error.EXPECT_TO_REVOKE_PUBLIC_DOC": "Expected to revoke a public doc, not a Space.", "error.EXPECT_TO_GRANT_DOC_USER_ROLES": "Expect grant roles on doc {{docId}} under Space {{spaceId}}, not a Space.",
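A minimal end-to-end sketch of the client flow these mutations enable, for illustration only. The mutation names, argument shapes, and the create/part-url/complete/abort sequence come from this patch (the real implementation is `CloudBlobStorage.set` in `packages/common/nbstore/src/impls/cloud/blob.ts`); the `gql` helper, the `/graphql` endpoint path, and the `uploadBlob` wrapper below are assumptions for the sketch, not part of the change.

```ts
import {
  abortBlobUploadMutation,
  completeBlobUploadMutation,
  createBlobUploadMutation,
  getBlobUploadPartUrlMutation,
} from '@affine/graphql';

// Hypothetical helper: POST a raw GraphQL document to the server.
// The real client goes through HttpConnection.gql in nbstore instead.
async function gql<T>(
  baseUrl: string,
  query: string,
  variables: Record<string, unknown>
): Promise<T> {
  const res = await fetch(`${baseUrl}/graphql`, {
    method: 'POST',
    credentials: 'include',
    headers: { 'content-type': 'application/json' },
    body: JSON.stringify({ query, variables }),
  });
  const { data, errors } = await res.json();
  if (errors?.length) throw new Error(errors[0].message);
  return data as T;
}

export async function uploadBlob(
  baseUrl: string,
  workspaceId: string,
  key: string,
  data: Uint8Array,
  mime: string
) {
  // 1. Ask the server which upload flow to use for this blob.
  const { createBlobUpload: init } = await gql<any>(
    baseUrl,
    createBlobUploadMutation.query,
    { workspaceId, key, size: data.byteLength, mime }
  );
  if (init.alreadyUploaded) return;

  if (init.method === 'MULTIPART') {
    // 2a. Upload each part to its own URL and collect the returned ETags.
    const parts: { partNumber: number; etag: string }[] = [];
    const totalParts = Math.ceil(data.byteLength / init.partSize);
    for (let partNumber = 1; partNumber <= totalParts; partNumber++) {
      const { getBlobUploadPartUrl: part } = await gql<any>(
        baseUrl,
        getBlobUploadPartUrlMutation.query,
        { workspaceId, key, uploadId: init.uploadId, partNumber }
      );
      const chunk = data.subarray(
        (partNumber - 1) * init.partSize,
        partNumber * init.partSize
      );
      const res = await fetch(part.uploadUrl, {
        method: 'PUT',
        headers: part.headers ?? undefined,
        body: chunk,
      });
      const etag = res.headers.get('etag');
      if (!res.ok || !etag) {
        // Let the server drop the pending multipart upload before bailing out.
        await gql(baseUrl, abortBlobUploadMutation.query, {
          workspaceId,
          key,
          uploadId: init.uploadId,
        });
        throw new Error(`part ${partNumber} failed with status ${res.status}`);
      }
      parts.push({ partNumber, etag });
    }
    // 3a. The server stitches the parts together and verifies size/mime/checksum.
    await gql(baseUrl, completeBlobUploadMutation.query, {
      workspaceId,
      key,
      uploadId: init.uploadId,
      parts,
    });
  } else if (init.method === 'PRESIGNED') {
    // 2b. Single PUT to the presigned or proxied URL, then mark it complete.
    const res = await fetch(init.uploadUrl, {
      method: 'PUT',
      headers: init.headers ?? undefined,
      body: data,
    });
    if (!res.ok) throw new Error(`upload failed with status ${res.status}`);
    await gql(baseUrl, completeBlobUploadMutation.query, { workspaceId, key });
  }
  // GRAPHQL method: fall back to the existing setBlob multipart-form mutation,
  // as CloudBlobStorage.uploadViaGraphql does.
}
```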