From b7ab49a26313fa278ce8129815fb75e840f20f58 Mon Sep 17 00:00:00 2001 From: darkskygit Date: Tue, 18 Mar 2025 10:24:59 +0000 Subject: [PATCH] chore(server): improve stream read (#10960) --- .../backend/server/src/base/utils/index.ts | 1 + .../backend/server/src/base/utils/stream.ts | 62 +++++++++++++++++++ .../src/core/workspaces/resolvers/blob.ts | 30 +-------- .../src/plugins/copilot/context/utils.ts | 27 +------- .../server/src/plugins/copilot/storage.ts | 25 +------- 5 files changed, 70 insertions(+), 75 deletions(-) create mode 100644 packages/backend/server/src/base/utils/stream.ts diff --git a/packages/backend/server/src/base/utils/index.ts b/packages/backend/server/src/base/utils/index.ts index 09cc714ed4..f1129b7357 100644 --- a/packages/backend/server/src/base/utils/index.ts +++ b/packages/backend/server/src/base/utils/index.ts @@ -1,4 +1,5 @@ export * from './promise'; export * from './request'; +export * from './stream'; export * from './types'; export * from './unit'; diff --git a/packages/backend/server/src/base/utils/stream.ts b/packages/backend/server/src/base/utils/stream.ts new file mode 100644 index 0000000000..e7b71d47e6 --- /dev/null +++ b/packages/backend/server/src/base/utils/stream.ts @@ -0,0 +1,62 @@ +import { Readable } from 'node:stream'; + +import { BlobQuotaExceeded, StorageQuotaExceeded } from '../error'; + +export type CheckExceededResult = + | { + storageQuotaExceeded: boolean; + blobQuotaExceeded: boolean; + } + | undefined; + +export async function readBuffer( + readable: Readable, + checkExceeded: (recvSize: number) => CheckExceededResult +): Promise { + return new Promise((resolve, reject) => { + const chunks: Uint8Array[] = []; + let totalSize = 0; + let result: CheckExceededResult; + + readable.on('data', chunk => { + totalSize += chunk.length; + + // check size after receive each chunk to avoid unnecessary memory usage + result = checkExceeded(totalSize); + if (result?.blobQuotaExceeded) { + reject(new BlobQuotaExceeded()); + } else if (result?.storageQuotaExceeded) { + reject(new StorageQuotaExceeded()); + } + + if (checkExceeded(totalSize)) { + reject(new BlobQuotaExceeded()); + readable.destroy(new BlobQuotaExceeded()); + return; + } + chunks.push(chunk); + }); + + readable.on('error', reject); + readable.on('end', () => { + const buffer = Buffer.concat(chunks, totalSize); + + if (checkExceeded(buffer.length)) { + reject(new BlobQuotaExceeded()); + } else { + resolve(buffer); + } + }); + }); +} + +export async function readBufferWithLimit( + readable: Readable, + limit: number +): Promise { + return readBuffer(readable, size => + size > limit + ? { blobQuotaExceeded: true, storageQuotaExceeded: false } + : undefined + ); +} diff --git a/packages/backend/server/src/core/workspaces/resolvers/blob.ts b/packages/backend/server/src/core/workspaces/resolvers/blob.ts index a188096b58..9c51d1eeb7 100644 --- a/packages/backend/server/src/core/workspaces/resolvers/blob.ts +++ b/packages/backend/server/src/core/workspaces/resolvers/blob.ts @@ -16,6 +16,7 @@ import type { FileUpload } from '../../../base'; import { BlobQuotaExceeded, CloudThrottlerGuard, + readBuffer, StorageQuotaExceeded, } from '../../../base'; import { CurrentUser } from '../../auth'; @@ -102,35 +103,8 @@ export class WorkspaceBlobResolver { } else if (result?.storageQuotaExceeded) { throw new StorageQuotaExceeded(); } - const buffer = await new Promise((resolve, reject) => { - const stream = blob.createReadStream(); - const chunks: Uint8Array[] = []; - stream.on('data', chunk => { - chunks.push(chunk); - // check size after receive each chunk to avoid unnecessary memory usage - const bufferSize = chunks.reduce((acc, cur) => acc + cur.length, 0); - result = checkExceeded(bufferSize); - if (result?.blobQuotaExceeded) { - reject(new BlobQuotaExceeded()); - } else if (result?.storageQuotaExceeded) { - reject(new StorageQuotaExceeded()); - } - }); - stream.on('error', reject); - stream.on('end', () => { - const buffer = Buffer.concat(chunks); - - result = checkExceeded(buffer.length); - if (result?.blobQuotaExceeded) { - reject(new BlobQuotaExceeded()); - } else if (result?.storageQuotaExceeded) { - reject(new StorageQuotaExceeded()); - } else { - resolve(buffer); - } - }); - }); + const buffer = await readBuffer(blob.createReadStream(), checkExceeded); await this.storage.put(workspaceId, blob.filename, buffer); return blob.filename; diff --git a/packages/backend/server/src/plugins/copilot/context/utils.ts b/packages/backend/server/src/plugins/copilot/context/utils.ts index 2bee6e37d5..d4068a23b5 100644 --- a/packages/backend/server/src/plugins/copilot/context/utils.ts +++ b/packages/backend/server/src/plugins/copilot/context/utils.ts @@ -2,7 +2,7 @@ import { Readable } from 'node:stream'; import { PrismaClient } from '@prisma/client'; -import { BlobQuotaExceeded } from '../../../base'; +import { readBufferWithLimit } from '../../../base'; import { MAX_EMBEDDABLE_SIZE } from './types'; export class GqlSignal implements AsyncDisposable { @@ -31,27 +31,6 @@ export async function checkEmbeddingAvailable( export function readStream( readable: Readable, maxSize = MAX_EMBEDDABLE_SIZE -): Promise> { - return new Promise>((resolve, reject) => { - const chunks: Uint8Array[] = []; - let totalSize = 0; - - readable.on('data', chunk => { - totalSize += chunk.length; - if (totalSize > maxSize) { - reject(new BlobQuotaExceeded()); - readable.destroy(new BlobQuotaExceeded()); - return; - } - chunks.push(chunk); - }); - - readable.on('end', () => { - resolve(Buffer.concat(chunks, totalSize)); - }); - - readable.on('error', err => { - reject(err); - }); - }); +): Promise { + return readBufferWithLimit(readable, maxSize); } diff --git a/packages/backend/server/src/plugins/copilot/storage.ts b/packages/backend/server/src/plugins/copilot/storage.ts index c004b911c7..0e6830dedc 100644 --- a/packages/backend/server/src/plugins/copilot/storage.ts +++ b/packages/backend/server/src/plugins/copilot/storage.ts @@ -8,6 +8,7 @@ import { CallMetric, Config, type FileUpload, + readBuffer, type StorageProvider, StorageProviderFactory, URLHelper, @@ -63,29 +64,7 @@ export class CopilotStorage { throw new BlobQuotaExceeded(); } - const buffer = await new Promise((resolve, reject) => { - const stream = blob.createReadStream(); - const chunks: Uint8Array[] = []; - stream.on('data', chunk => { - chunks.push(chunk); - - // check size after receive each chunk to avoid unnecessary memory usage - const bufferSize = chunks.reduce((acc, cur) => acc + cur.length, 0); - if (checkExceeded(bufferSize)) { - reject(new BlobQuotaExceeded()); - } - }); - stream.on('error', reject); - stream.on('end', () => { - const buffer = Buffer.concat(chunks); - - if (checkExceeded(buffer.length)) { - reject(new BlobQuotaExceeded()); - } else { - resolve(buffer); - } - }); - }); + const buffer = await readBuffer(blob.createReadStream(), checkExceeded); return { buffer,