chore(server): improve stream read (#10960)

This commit is contained in:
darkskygit
2025-03-18 10:24:59 +00:00
parent 0cb06668cd
commit b7ab49a263
5 changed files with 70 additions and 75 deletions

View File

@@ -1,4 +1,5 @@
export * from './promise';
export * from './request';
export * from './stream';
export * from './types';
export * from './unit';

View File

@@ -0,0 +1,62 @@
import { Readable } from 'node:stream';
import { BlobQuotaExceeded, StorageQuotaExceeded } from '../error';
// Verdict of a quota probe: which quota (if any) the bytes received so far
// have exceeded, or `undefined` when the caller has no verdict / nothing is
// exceeded. Note a defined result with both flags `false` also means "ok".
export type CheckExceededResult =
| {
storageQuotaExceeded: boolean;
blobQuotaExceeded: boolean;
}
| undefined;
/**
 * Drain a `Readable` stream into a single `Buffer`, enforcing a quota.
 *
 * `checkExceeded` is invoked after every received chunk with the cumulative
 * byte count; when it reports `blobQuotaExceeded` or `storageQuotaExceeded`
 * the returned promise rejects with the matching error and the stream is
 * destroyed so no further data is buffered.
 *
 * @param readable      stream to read to completion
 * @param checkExceeded quota probe; return `undefined` (or a result with
 *                      both flags `false`) when within quota
 * @returns the concatenated stream contents
 * @throws BlobQuotaExceeded | StorageQuotaExceeded when the quota probe
 *         reports the corresponding flag; any stream error is re-thrown
 */
export async function readBuffer(
  readable: Readable,
  checkExceeded: (recvSize: number) => CheckExceededResult
): Promise<Buffer> {
  return new Promise<Buffer>((resolve, reject) => {
    const chunks: Uint8Array[] = [];
    let totalSize = 0;
    readable.on('data', chunk => {
      totalSize += chunk.length;
      // check size after each chunk to avoid unnecessary memory usage
      const result = checkExceeded(totalSize);
      if (result?.blobQuotaExceeded) {
        const error = new BlobQuotaExceeded();
        reject(error);
        // stop the stream, otherwise 'data' keeps firing and memory grows
        readable.destroy(error);
        return;
      }
      if (result?.storageQuotaExceeded) {
        const error = new StorageQuotaExceeded();
        reject(error);
        readable.destroy(error);
        return;
      }
      chunks.push(chunk);
    });
    readable.on('error', reject);
    readable.on('end', () => {
      // the final total was already validated on the last 'data' event,
      // so no re-check is needed here
      resolve(Buffer.concat(chunks, totalSize));
    });
  });
}
/**
 * Convenience wrapper around `readBuffer` that rejects with
 * `BlobQuotaExceeded` once more than `limit` bytes have been received.
 *
 * @param readable stream to read to completion
 * @param limit    maximum number of bytes allowed (inclusive)
 * @returns the concatenated stream contents
 */
export async function readBufferWithLimit(
  readable: Readable,
  limit: number
): Promise<Buffer> {
  // Treat any read beyond `limit` bytes as a blob-quota violation.
  const overLimit = (recvSize: number) =>
    recvSize <= limit
      ? undefined
      : { blobQuotaExceeded: true, storageQuotaExceeded: false };
  return readBuffer(readable, overLimit);
}

View File

@@ -16,6 +16,7 @@ import type { FileUpload } from '../../../base';
import {
BlobQuotaExceeded,
CloudThrottlerGuard,
readBuffer,
StorageQuotaExceeded,
} from '../../../base';
import { CurrentUser } from '../../auth';
@@ -102,35 +103,8 @@ export class WorkspaceBlobResolver {
} else if (result?.storageQuotaExceeded) {
throw new StorageQuotaExceeded();
}
const buffer = await new Promise<Buffer>((resolve, reject) => {
const stream = blob.createReadStream();
const chunks: Uint8Array[] = [];
stream.on('data', chunk => {
chunks.push(chunk);
// check size after receive each chunk to avoid unnecessary memory usage
const bufferSize = chunks.reduce((acc, cur) => acc + cur.length, 0);
result = checkExceeded(bufferSize);
if (result?.blobQuotaExceeded) {
reject(new BlobQuotaExceeded());
} else if (result?.storageQuotaExceeded) {
reject(new StorageQuotaExceeded());
}
});
stream.on('error', reject);
stream.on('end', () => {
const buffer = Buffer.concat(chunks);
result = checkExceeded(buffer.length);
if (result?.blobQuotaExceeded) {
reject(new BlobQuotaExceeded());
} else if (result?.storageQuotaExceeded) {
reject(new StorageQuotaExceeded());
} else {
resolve(buffer);
}
});
});
const buffer = await readBuffer(blob.createReadStream(), checkExceeded);
await this.storage.put(workspaceId, blob.filename, buffer);
return blob.filename;

View File

@@ -2,7 +2,7 @@ import { Readable } from 'node:stream';
import { PrismaClient } from '@prisma/client';
import { BlobQuotaExceeded } from '../../../base';
import { readBufferWithLimit } from '../../../base';
import { MAX_EMBEDDABLE_SIZE } from './types';
export class GqlSignal implements AsyncDisposable {
@@ -31,27 +31,6 @@ export async function checkEmbeddingAvailable(
export function readStream(
readable: Readable,
maxSize = MAX_EMBEDDABLE_SIZE
): Promise<Buffer<ArrayBuffer>> {
return new Promise<Buffer<ArrayBuffer>>((resolve, reject) => {
const chunks: Uint8Array[] = [];
let totalSize = 0;
readable.on('data', chunk => {
totalSize += chunk.length;
if (totalSize > maxSize) {
reject(new BlobQuotaExceeded());
readable.destroy(new BlobQuotaExceeded());
return;
}
chunks.push(chunk);
});
readable.on('end', () => {
resolve(Buffer.concat(chunks, totalSize));
});
readable.on('error', err => {
reject(err);
});
});
): Promise<Buffer> {
return readBufferWithLimit(readable, maxSize);
}

View File

@@ -8,6 +8,7 @@ import {
CallMetric,
Config,
type FileUpload,
readBuffer,
type StorageProvider,
StorageProviderFactory,
URLHelper,
@@ -63,29 +64,7 @@ export class CopilotStorage {
throw new BlobQuotaExceeded();
}
const buffer = await new Promise<Buffer>((resolve, reject) => {
const stream = blob.createReadStream();
const chunks: Uint8Array[] = [];
stream.on('data', chunk => {
chunks.push(chunk);
// check size after receive each chunk to avoid unnecessary memory usage
const bufferSize = chunks.reduce((acc, cur) => acc + cur.length, 0);
if (checkExceeded(bufferSize)) {
reject(new BlobQuotaExceeded());
}
});
stream.on('error', reject);
stream.on('end', () => {
const buffer = Buffer.concat(chunks);
if (checkExceeded(buffer.length)) {
reject(new BlobQuotaExceeded());
} else {
resolve(buffer);
}
});
});
const buffer = await readBuffer(blob.createReadStream(), checkExceeded);
return {
buffer,