fix(server): avoid get object content when syncing (#9402)

This commit is contained in:
liuyi
2024-12-28 00:25:15 +08:00
committed by GitHub
parent 6ebefbbf2b
commit 378db1054b
5 changed files with 75 additions and 37 deletions

View File

@@ -24,6 +24,10 @@ export interface WorkspaceEvents {
workspaceId: Workspace['id'];
key: string;
}>;
sync: Payload<{
workspaceId: Workspace['id'];
key: string;
}>;
};
}

View File

@@ -61,6 +61,15 @@ export class FsStorageProvider implements StorageProvider {
this.logger.verbose(`Object \`${key}\` put`);
}
async head(key: string) {
const metadata = this.readMetadata(key);
if (!metadata) {
this.logger.verbose(`Object \`${key}\` not found`);
return undefined;
}
return metadata;
}
async get(key: string): Promise<{
body?: Readable;
metadata?: GetObjectMetadata;

View File

@@ -34,6 +34,7 @@ export interface StorageProvider {
body: BlobInputType,
metadata?: PutObjectMetadata
): Promise<void>;
head(key: string): Promise<GetObjectMetadata | undefined>;
get(
key: string
): Promise<{ body?: BlobOutputType; metadata?: GetObjectMetadata }>;

View File

@@ -32,7 +32,7 @@ export class WorkspaceBlobStorage {
const meta: PutObjectMetadata = autoMetadata(blob);
await this.provider.put(`${workspaceId}/${key}`, blob, meta);
this.trySyncBlobMeta(workspaceId, key, {
await this.upsert(workspaceId, key, {
contentType: meta.contentType ?? 'application/octet-stream',
contentLength: blob.length,
lastModified: new Date(),
@@ -119,53 +119,48 @@ export class WorkspaceBlobStorage {
private trySyncBlobsMeta(workspaceId: string, blobs: ListObjectsMetadata[]) {
for (const blob of blobs) {
this.trySyncBlobMeta(workspaceId, blob.key);
this.event.emit('workspace.blob.sync', {
workspaceId,
key: blob.key,
});
}
}
private trySyncBlobMeta(
private async upsert(
workspaceId: string,
key: string,
meta?: GetObjectMetadata
meta: GetObjectMetadata
) {
setImmediate(() => {
this.syncBlobMeta(workspaceId, key, meta).catch(() => {
/* never throw */
});
await this.db.blob.upsert({
where: {
workspaceId_key: {
workspaceId,
key,
},
},
update: {
mime: meta.contentType,
size: meta.contentLength,
},
create: {
workspaceId,
key,
mime: meta.contentType,
size: meta.contentLength,
},
});
}
private async syncBlobMeta(
workspaceId: string,
key: string,
meta?: GetObjectMetadata
) {
@OnEvent('workspace.blob.sync')
async syncBlobMeta({
workspaceId,
key,
}: EventPayload<'workspace.blob.sync'>) {
try {
if (!meta) {
const blob = await this.get(workspaceId, key);
meta = blob.metadata;
blob.body?.destroy();
}
const meta = await this.provider.head(`${workspaceId}/${key}`);
if (meta) {
await this.db.blob.upsert({
where: {
workspaceId_key: {
workspaceId,
key,
},
},
update: {
mime: meta.contentType,
size: meta.contentLength,
},
create: {
workspaceId,
key,
mime: meta.contentType,
size: meta.contentLength,
},
});
await this.upsert(workspaceId, key, meta);
} else {
await this.db.blob.deleteMany({
where: {

View File

@@ -1,9 +1,10 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
/* oxlint-disable @typescript-eslint/no-non-null-assertion */
import { Readable } from 'node:stream';
import {
DeleteObjectCommand,
GetObjectCommand,
HeadObjectCommand,
ListObjectsV2Command,
NoSuchKey,
PutObjectCommand,
@@ -75,6 +76,34 @@ export class S3StorageProvider implements StorageProvider {
}
}
async head(key: string) {
try {
const obj = await this.client.send(
new HeadObjectCommand({
Bucket: this.bucket,
Key: key,
})
);
return {
contentType: obj.ContentType!,
contentLength: obj.ContentLength!,
lastModified: obj.LastModified!,
checksumCRC32: obj.ChecksumCRC32,
};
} catch (e) {
// 404
if (e instanceof NoSuchKey) {
this.logger.verbose(`Object \`${key}\` not found`);
return undefined;
} else {
throw new Error(`Failed to head object \`${key}\``, {
cause: e,
});
}
}
}
async get(key: string): Promise<{
body?: Readable;
metadata?: GetObjectMetadata;