From 1eefd712dd5024defa94db2bca3e6b57d05cefa6 Mon Sep 17 00:00:00 2001
From: liuyi
Date: Tue, 2 Jan 2024 07:21:01 +0000
Subject: [PATCH] feat(server): new storage provider (#5410)

---
 .../src/modules/storage/__tests__/fs.spec.ts  | 113 ++++++++
 .../src/modules/storage/providers/fs.ts       | 256 ++++++++++++++++++
 .../src/modules/storage/providers/index.ts    |  34 +++
 .../src/modules/storage/providers/provider.ts |  39 +++
 .../src/modules/storage/providers/r2.ts       |  14 +
 .../src/modules/storage/providers/s3.ts       | 159 +++++++++++
 .../src/modules/storage/providers/utils.ts    |  49 ++++
 .../src/modules/storage/wrappers/avatar.ts    |  30 ++
 .../src/modules/storage/wrappers/blob.ts      |  45 +++
 .../src/modules/storage/wrappers/index.ts     |   2 +
 10 files changed, 741 insertions(+)
 create mode 100644 packages/backend/server/src/modules/storage/__tests__/fs.spec.ts
 create mode 100644 packages/backend/server/src/modules/storage/providers/fs.ts
 create mode 100644 packages/backend/server/src/modules/storage/providers/index.ts
 create mode 100644 packages/backend/server/src/modules/storage/providers/provider.ts
 create mode 100644 packages/backend/server/src/modules/storage/providers/r2.ts
 create mode 100644 packages/backend/server/src/modules/storage/providers/s3.ts
 create mode 100644 packages/backend/server/src/modules/storage/providers/utils.ts
 create mode 100644 packages/backend/server/src/modules/storage/wrappers/avatar.ts
 create mode 100644 packages/backend/server/src/modules/storage/wrappers/blob.ts
 create mode 100644 packages/backend/server/src/modules/storage/wrappers/index.ts

diff --git a/packages/backend/server/src/modules/storage/__tests__/fs.spec.ts b/packages/backend/server/src/modules/storage/__tests__/fs.spec.ts
new file mode 100644
index 0000000000..242c7d5664
--- /dev/null
+++ b/packages/backend/server/src/modules/storage/__tests__/fs.spec.ts
@@ -0,0 +1,113 @@
+import { promises as fs } from 'node:fs';
+import { join } from 'node:path';
+
+import test from 'ava';
+import { getStreamAsBuffer } from 'get-stream';
+
+import { ListObjectsMetadata } from '../providers';
+import { FsStorageProvider } from '../providers/fs';
+
+const config = {
+  path: join(process.cwd(), 'node_modules', '.cache/affine-test-storage'),
+};
+
+function createProvider() {
+  return new FsStorageProvider(
+    config,
+    'test' + Math.random().toString(16).substring(2, 8)
+  );
+}
+
+function keys(list: ListObjectsMetadata[]) {
+  return list.map(i => i.key);
+}
+
+async function randomPut(
+  provider: FsStorageProvider,
+  prefix = ''
+): Promise<string> {
+  const key = prefix + 'test-key-' + Math.random().toString(16).substring(2, 8);
+  const body = Buffer.from(key);
+  await provider.put(key, body);
+  return key;
+}
+
+test.after.always(async () => {
+  await fs.rm(config.path, { recursive: true, force: true });
+});
+
+test('put & get', async t => {
+  const provider = createProvider();
+  const key = 'testKey';
+  const body = Buffer.from('testBody');
+  await provider.put(key, body);
+
+  const result = await provider.get(key);
+
+  t.deepEqual(await getStreamAsBuffer(result.body!), body);
+  t.is(result.metadata?.contentLength, body.length);
+});
+
+test('list - one level', async t => {
+  const provider = createProvider();
+  const list = await Promise.all(
+    Array.from({ length: 100 }).map(() => randomPut(provider))
+  );
+  list.sort();
+  // listing order is not guaranteed, compare sorted keys
+  const result = await provider.list();
+  t.deepEqual(keys(result).sort(), list);
+
+  const result2 = await provider.list('test-key');
+  t.deepEqual(keys(result2).sort(), list);
+
+  const result3 = await provider.list('testKey');
+  t.is(result3.length, 0);
+});
+
+test('list recursively', async t => {
+  const provider = createProvider();
+
+  await Promise.all([
+    Promise.all(Array.from({ length: 10 }).map(() => randomPut(provider))),
+    Promise.all(
+      Array.from({ length: 10 }).map(() => randomPut(provider, 'a/'))
+    ),
+    Promise.all(
+      Array.from({ length: 10 }).map(() => randomPut(provider, 'a/b/'))
+    ),
+    Promise.all(
+      Array.from({ length: 10 }).map(() => randomPut(provider, 'a/b/t/'))
+    ),
+  ]);
+
+  const r1 = await provider.list();
+  t.is(r1.length, 40);
+
+  // contains all `a/xxx` and `a/b/xxx` and `a/b/t/xxx`
+  const r2 = await provider.list('a');
+  t.is(r2.length, 30);
+
+  // contains `a/b/xxx` and `a/b/t/xxx`
+  const r3 = await provider.list('a/b');
+  const r4 = await provider.list('a/b/');
+  t.is(r3.length, 20);
+  t.deepEqual(r3, r4);
+
+  // prefix is not ended with '/', it's open to all files and sub dirs
+  // contains all `a/b/t/xxx` and `a/b/t{xxxx}`
+  const r5 = await provider.list('a/b/t');
+
+  t.is(r5.length, 20);
+});
+
+test('delete', async t => {
+  const provider = createProvider();
+  const key = 'testKey';
+  const body = Buffer.from('testBody');
+  await provider.put(key, body);
+
+  await provider.delete(key);
+
+  await t.throwsAsync(() => fs.access(join(config.path, provider.bucket, key)));
+});
diff --git a/packages/backend/server/src/modules/storage/providers/fs.ts b/packages/backend/server/src/modules/storage/providers/fs.ts
new file mode 100644
index 0000000000..6b4b70d3a3
--- /dev/null
+++ b/packages/backend/server/src/modules/storage/providers/fs.ts
@@ -0,0 +1,256 @@
+import {
+  accessSync,
+  constants,
+  createReadStream,
+  Dirent,
+  mkdirSync,
+  readdirSync,
+  readFileSync,
+  rmSync,
+  statSync,
+  writeFileSync,
+} from 'node:fs';
+import { join, parse, resolve } from 'node:path';
+
+import { Logger } from '@nestjs/common';
+import { Readable } from 'node:stream';
+
+import { FsStorageConfig } from '../../../config/storage';
+import {
+  BlobInputType,
+  GetObjectMetadata,
+  ListObjectsMetadata,
+  PutObjectMetadata,
+  StorageProvider,
+} from './provider';
+import { autoMetadata, toBuffer } from './utils';
+
+function escapeKey(key: string): string {
+  // avoid '../' and './' in key
+  return key.replace(/\.?\.[/\\]/g, '%');
+}
+
+export class FsStorageProvider implements StorageProvider {
+  private readonly path: string;
+  private readonly logger: Logger;
+
+  constructor(
+    config: FsStorageConfig,
+    public readonly bucket: string
+  ) {
+    this.path = resolve(config.path, bucket);
+    this.ensureAvailability();
+
+    this.logger = new Logger(`${FsStorageProvider.name}:${bucket}`);
+  }
+
+  async put(
+    key: string,
+    body: BlobInputType,
+    metadata: PutObjectMetadata = {}
+  ): Promise<void> {
+    key = escapeKey(key);
+    const blob = await toBuffer(body);
+
+    // write object
+    this.writeObject(key, blob);
+    // write metadata
+    await this.writeMetadata(key, blob, metadata);
+    this.logger.verbose(`Object \`${key}\` put`);
+  }
+
+  async get(key: string): Promise<{
+    body?: Readable;
+    metadata?: GetObjectMetadata;
+  }> {
+    key = escapeKey(key);
+
+    try {
+      const metadata = this.readMetadata(key);
+      const stream = this.readObject(this.join(key));
+      this.logger.verbose(`Read object \`${key}\``);
+      return {
+        body: stream,
+        metadata,
+      };
+    } catch (e) {
+      this.logger.error(`Failed to read object \`${key}\``, e);
+      return {};
+    }
+  }
+
+  async list(prefix?: string): Promise<ListObjectsMetadata[]> {
+    // prefix cases:
+    // - `undefined`: list all objects
+    // - `a/b`: list objects under dir `a` with prefix `b`, `b` might be a dir under `a` as well.
+    // - `a/b/` list objects under dir `a/b`
+
+    // read dir recursively and filter out '.metadata.json' files
+    let dir = this.path;
+    if (prefix) {
+      prefix = escapeKey(prefix);
+      const parts = prefix.split(/[/\\]/);
+      // for prefix `a/b/c`, move `a/b` to dir and `c` to key prefix
+      if (parts.length > 1) {
+        dir = join(dir, ...parts.slice(0, -1));
+        prefix = parts[parts.length - 1];
+      }
+    }
+
+    const results: ListObjectsMetadata[] = [];
+    async function getFiles(dir: string, prefix?: string): Promise<void> {
+      try {
+        const entries: Dirent[] = readdirSync(dir, { withFileTypes: true });
+
+        for (const entry of entries) {
+          const res = join(dir, entry.name);
+
+          if (entry.isDirectory()) {
+            if (!prefix || entry.name.startsWith(prefix)) {
+              await getFiles(res);
+            }
+          } else if (
+            (!prefix || entry.name.startsWith(prefix)) &&
+            !entry.name.endsWith('.metadata.json')
+          ) {
+            const stat = statSync(res);
+            results.push({
+              key: res,
+              lastModified: stat.mtime,
+              size: stat.size,
+            });
+          }
+        }
+      } catch (e) {
+        // failed to read dir, stop recursion
+      }
+    }
+
+    await getFiles(dir, prefix);
+
+    // trim path with `this.path` prefix
+    results.forEach(r => (r.key = r.key.slice(this.path.length + 1)));
+
+    return results;
+  }
+
+  delete(key: string): Promise<void> {
+    key = escapeKey(key);
+
+    try {
+      rmSync(this.join(key), { force: true });
+      rmSync(this.join(`${key}.metadata.json`), { force: true });
+    } catch (e) {
+      throw new Error(`Failed to delete object \`${key}\``, {
+        cause: e,
+      });
+    }
+
+    this.logger.verbose(`Object \`${key}\` deleted`);
+
+    return Promise.resolve();
+  }
+
+  ensureAvailability() {
+    // check stats
+    const stats = statSync(this.path, {
+      throwIfNoEntry: false,
+    });
+
+    // not existing, create it
+    if (!stats) {
+      try {
+        mkdirSync(this.path, { recursive: true });
+      } catch (e) {
+        throw new Error(
+          `Failed to create target directory for fs storage provider: ${this.path}`,
+          {
+            cause: e,
+          }
+        );
+      }
+    } else if (stats.isDirectory()) {
+      // the target directory has already existed, check if it is readable & writable
+      try {
+        accessSync(this.path, constants.W_OK | constants.R_OK);
+      } catch (e) {
+        throw new Error(
+          `The target directory for fs storage provider has already existed, but it is not readable & writable: ${this.path}`,
+          {
+            cause: e,
+          }
+        );
+      }
+    } else if (stats.isFile()) {
+      throw new Error(
+        `The target directory for fs storage provider is a file: ${this.path}`
+      );
+    }
+  }
+
+  private join(...paths: string[]) {
+    return join(this.path, ...paths);
+  }
+
+  private readObject(file: string): Readable | undefined {
+    const state = statSync(file, { throwIfNoEntry: false });
+
+    if (state?.isFile()) {
+      return createReadStream(file);
+    }
+
+    return undefined;
+  }
+
+  private writeObject(key: string, blob: Buffer) {
+    const path = this.join(key);
+    mkdirSync(parse(path).dir, { recursive: true });
+    writeFileSync(path, blob);
+  }
+
+  private async writeMetadata(
+    key: string,
+    blob: Buffer,
+    raw: PutObjectMetadata
+  ) {
+    try {
+      const metadata = await autoMetadata(blob, raw);
+
+      if (raw.checksumCRC32 && metadata.checksumCRC32 !== raw.checksumCRC32) {
+        throw new Error(
+          'The checksum of the uploaded file is not matched with the one you provide, the file may be corrupted and the uploading will not be processed.'
+        );
+      }
+
+      writeFileSync(
+        this.join(`${key}.metadata.json`),
+        JSON.stringify({
+          ...metadata,
+          lastModified: Date.now(),
+        })
+      );
+    } catch (e) {
+      this.logger.warn(`Failed to write metadata of object \`${key}\``, e);
+    }
+  }
+
+  private readMetadata(key: string): GetObjectMetadata | undefined {
+    try {
+      const raw = JSON.parse(
+        readFileSync(this.join(`${key}.metadata.json`), {
+          encoding: 'utf-8',
+        })
+      );
+
+      return {
+        ...raw,
+        lastModified: new Date(raw.lastModified),
+        expires: raw.expires ? new Date(raw.expires) : undefined,
+      };
+    } catch (e) {
+      this.logger.warn(`Failed to read metadata of object \`${key}\``, e);
+
+      return;
+    }
+  }
+}
diff --git a/packages/backend/server/src/modules/storage/providers/index.ts b/packages/backend/server/src/modules/storage/providers/index.ts
new file mode 100644
index 0000000000..bfcc51cda2
--- /dev/null
+++ b/packages/backend/server/src/modules/storage/providers/index.ts
@@ -0,0 +1,34 @@
+import { AFFiNEStorageConfig, Storages } from '../../../config/storage';
+import { FsStorageProvider } from './fs';
+import type { StorageProvider } from './provider';
+import { R2StorageProvider } from './r2';
+import { S3StorageProvider } from './s3';
+
+export function createStorageProvider(
+  config: AFFiNEStorageConfig,
+  storage: Storages
+): StorageProvider {
+  const storageConfig = config.storages[storage];
+  const providerConfig = config.providers[storageConfig.provider] as any;
+  if (!providerConfig) {
+    throw new Error(
+      `Failed to create ${storageConfig.provider} storage, configuration not correctly set`
+    );
+  }
+
+  if (storageConfig.provider === 's3') {
+    return new S3StorageProvider(providerConfig, storageConfig.bucket);
+  }
+
+  if (storageConfig.provider === 'r2') {
+    return new R2StorageProvider(providerConfig, storageConfig.bucket);
+  }
+
+  if (storageConfig.provider === 'fs') {
+    return new FsStorageProvider(providerConfig, storageConfig.bucket);
+  }
+
+  throw new Error(`Unknown storage provider type: ${storageConfig.provider}`);
+}
+
+export type * from './provider';
diff --git a/packages/backend/server/src/modules/storage/providers/provider.ts b/packages/backend/server/src/modules/storage/providers/provider.ts
new file mode 100644
index 0000000000..f6663843d6
--- /dev/null
+++ b/packages/backend/server/src/modules/storage/providers/provider.ts
@@ -0,0 +1,39 @@
+import type { Readable } from 'node:stream';
+
+export interface GetObjectMetadata {
+  /**
+   * @default 'application/octet-stream'
+   */
+  contentType: string;
+  contentLength: number;
+  lastModified: Date;
+  checksumCRC32: string;
+}
+
+export interface PutObjectMetadata {
+  contentType?: string;
+  contentLength?: number;
+  checksumCRC32?: string;
+}
+
+export interface ListObjectsMetadata {
+  key: string;
+  lastModified: Date;
+  size: number;
+}
+
+export type BlobInputType = Buffer | Readable | string;
+export type BlobOutputType = Readable;
+
+export interface StorageProvider {
+  put(
+    key: string,
+    body: BlobInputType,
+    metadata?: PutObjectMetadata
+  ): Promise<void>;
+  get(
+    key: string
+  ): Promise<{ body?: BlobOutputType; metadata?: GetObjectMetadata }>;
+  list(prefix?: string): Promise<ListObjectsMetadata[]>;
+  delete(key: string): Promise<void>;
+}
diff --git a/packages/backend/server/src/modules/storage/providers/r2.ts b/packages/backend/server/src/modules/storage/providers/r2.ts
new file mode 100644
index 0000000000..670a59ba20
--- /dev/null
+++ b/packages/backend/server/src/modules/storage/providers/r2.ts
@@ -0,0 +1,14 @@
+import { R2StorageConfig } from '../../../config/storage';
+import { S3StorageProvider } from './s3';
+
+export class R2StorageProvider extends S3StorageProvider {
+  constructor(config: R2StorageConfig, bucket: string) {
+    super(
+      {
+        ...config,
+        endpoint: `https://${config.accountId}.r2.cloudflarestorage.com`,
+      },
+      bucket
+    );
+  }
+}
diff --git a/packages/backend/server/src/modules/storage/providers/s3.ts b/packages/backend/server/src/modules/storage/providers/s3.ts
new file mode 100644
index 0000000000..4597ad0127
--- /dev/null
+++ b/packages/backend/server/src/modules/storage/providers/s3.ts
@@ -0,0 +1,159 @@
+/* eslint-disable @typescript-eslint/no-non-null-assertion */
+import { Readable } from 'node:stream';
+
+import {
+  DeleteObjectCommand,
+  GetObjectCommand,
+  ListObjectsV2Command,
+  PutObjectCommand,
+  S3Client,
+} from '@aws-sdk/client-s3';
+import { Logger } from '@nestjs/common';
+
+import { S3StorageConfig } from '../../../config/storage';
+import {
+  BlobInputType,
+  GetObjectMetadata,
+  ListObjectsMetadata,
+  PutObjectMetadata,
+  StorageProvider,
+} from './provider';
+import { autoMetadata, toBuffer } from './utils';
+
+export class S3StorageProvider implements StorageProvider {
+  logger: Logger;
+  client: S3Client;
+  constructor(
+    config: S3StorageConfig,
+    public readonly bucket: string
+  ) {
+    this.client = new S3Client(config);
+    this.logger = new Logger(`${S3StorageProvider.name}:${bucket}`);
+  }
+
+  async put(
+    key: string,
+    body: BlobInputType,
+    metadata: PutObjectMetadata = {}
+  ): Promise<void> {
+    const blob = await toBuffer(body);
+
+    metadata = await autoMetadata(blob, metadata);
+
+    try {
+      await this.client.send(
+        new PutObjectCommand({
+          Bucket: this.bucket,
+          Key: key,
+          // `body` may be a Readable already drained by `toBuffer`, send the buffer
+          Body: blob,
+
+          // metadata
+          ContentType: metadata.contentType,
+          ContentLength: metadata.contentLength,
+          ChecksumCRC32: metadata.checksumCRC32,
+        })
+      );
+
+      this.logger.verbose(`Object \`${key}\` put`);
+    } catch (e) {
+      throw new Error(`Failed to put object \`${key}\``, {
+        cause: e,
+      });
+    }
+  }
+
+  async get(key: string): Promise<{
+    body?: Readable;
+    metadata?: GetObjectMetadata;
+  }> {
+    try {
+      const obj = await this.client.send(
+        new GetObjectCommand({
+          Bucket: this.bucket,
+          Key: key,
+        })
+      );
+
+      if (!obj.Body) {
+        this.logger.verbose(`Object \`${key}\` not found`);
+        return {};
+      }
+
+      this.logger.verbose(`Read object \`${key}\``);
+      return {
+        // @ts-expect-errors ignore browser response type `Blob`
+        body: obj.Body,
+        metadata: {
+          // always set when putting object
+          contentType: obj.ContentType!,
+          contentLength: obj.ContentLength!,
+          checksumCRC32: obj.ChecksumCRC32!,
+          lastModified: obj.LastModified!,
+        },
+      };
+    } catch (e) {
+      throw new Error(`Failed to read object \`${key}\``, {
+        cause: e,
+      });
+    }
+  }
+
+  async list(prefix?: string): Promise<ListObjectsMetadata[]> {
+    // continuationToken should be `string | undefined`,
+    // but TypeScript will fail on type infer in the code below.
+    // Seems to be a bug in TypeScript
+    let continuationToken: any = undefined;
+    let hasMore = true;
+    let result: ListObjectsMetadata[] = [];
+
+    try {
+      while (hasMore) {
+        const listResult = await this.client.send(
+          new ListObjectsV2Command({
+            Bucket: this.bucket,
+            Prefix: prefix,
+            ContinuationToken: continuationToken,
+          })
+        );
+
+        if (listResult.Contents?.length) {
+          result = result.concat(
+            listResult.Contents.map(r => ({
+              key: r.Key!,
+              lastModified: r.LastModified!,
+              size: r.Size!,
+            }))
+          );
+        }
+
+        // has more items not listed
+        hasMore = !!listResult.IsTruncated;
+        continuationToken = listResult.NextContinuationToken;
+      }
+
+      this.logger.verbose(
+        `List ${result.length} objects with prefix \`${prefix}\``
+      );
+      return result;
+    } catch (e) {
+      throw new Error(`Failed to list objects with prefix \`${prefix}\``, {
+        cause: e,
+      });
+    }
+  }
+
+  async delete(key: string): Promise<void> {
+    try {
+      await this.client.send(
+        new DeleteObjectCommand({
+          Bucket: this.bucket,
+          Key: key,
+        })
+      );
+    } catch (e) {
+      throw new Error(`Failed to delete object \`${key}\``, {
+        cause: e,
+      });
+    }
+  }
+}
diff --git a/packages/backend/server/src/modules/storage/providers/utils.ts b/packages/backend/server/src/modules/storage/providers/utils.ts
new file mode 100644
index 0000000000..c1b3355a3f
--- /dev/null
+++ b/packages/backend/server/src/modules/storage/providers/utils.ts
@@ -0,0 +1,49 @@
+import { Readable } from 'node:stream';
+
+import { crc32 } from '@node-rs/crc32';
+import { fileTypeFromBuffer } from 'file-type';
+import { getStreamAsBuffer } from 'get-stream';
+
+import { BlobInputType, PutObjectMetadata } from './provider';
+
+export async function toBuffer(input: BlobInputType): Promise<Buffer> {
+  return input instanceof Readable
+    ? await getStreamAsBuffer(input)
+    : input instanceof Buffer
+      ? input
+      : Buffer.from(input);
+}
+
+export async function autoMetadata(
+  blob: Buffer,
+  raw: PutObjectMetadata
+): Promise<PutObjectMetadata> {
+  const metadata = {
+    ...raw,
+  };
+  try {
+    // length
+    if (!metadata.contentLength) {
+      metadata.contentLength = blob.length;
+    }
+
+    // checksum
+    if (!metadata.checksumCRC32) {
+      metadata.checksumCRC32 = crc32(blob).toString(16);
+    }
+
+    // mime type
+    if (!metadata.contentType) {
+      try {
+        const typeResult = await fileTypeFromBuffer(blob);
+        metadata.contentType = typeResult?.mime ?? 'application/octet-stream';
+      } catch {
+        // ignore
+      }
+    }
+
+    return metadata;
+  } catch (e) {
+    return metadata;
+  }
+}
diff --git a/packages/backend/server/src/modules/storage/wrappers/avatar.ts b/packages/backend/server/src/modules/storage/wrappers/avatar.ts
new file mode 100644
index 0000000000..edc6e0a8c5
--- /dev/null
+++ b/packages/backend/server/src/modules/storage/wrappers/avatar.ts
@@ -0,0 +1,30 @@
+import { Injectable } from '@nestjs/common';
+
+import { Config } from '../../../config';
+import {
+  BlobInputType,
+  createStorageProvider,
+  PutObjectMetadata,
+  StorageProvider,
+} from '../providers';
+
+@Injectable()
+export class AvatarStorage {
+  public readonly provider: StorageProvider;
+
+  constructor({ storage }: Config) {
+    this.provider = createStorageProvider(storage, 'avatar');
+  }
+
+  put(key: string, blob: BlobInputType, metadata?: PutObjectMetadata) {
+    return this.provider.put(key, blob, metadata);
+  }
+
+  get(key: string) {
+    return this.provider.get(key);
+  }
+
+  async delete(key: string) {
+    return this.provider.delete(key);
+  }
+}
diff --git a/packages/backend/server/src/modules/storage/wrappers/blob.ts b/packages/backend/server/src/modules/storage/wrappers/blob.ts
new file mode 100644
index 0000000000..1d24526fe0
--- /dev/null
+++ b/packages/backend/server/src/modules/storage/wrappers/blob.ts
@@ -0,0 +1,45 @@
+import { Injectable } from '@nestjs/common';
+
+import { Config } from '../../../config';
+import {
+  BlobInputType,
+  createStorageProvider,
+  StorageProvider,
+} from '../providers';
+
+@Injectable()
+export class WorkspaceBlobStorage {
+  public readonly provider: StorageProvider;
+  constructor({ storage }: Config) {
+    this.provider = createStorageProvider(storage, 'blob');
+  }
+
+  put(workspaceId: string, key: string, blob: BlobInputType) {
+    return this.provider.put(`${workspaceId}/${key}`, blob);
+  }
+
+  get(workspaceId: string, key: string) {
+    return this.provider.get(`${workspaceId}/${key}`);
+  }
+
+  async list(workspaceId: string) {
+    const blobs = await this.provider.list(workspaceId + '/');
+
+    blobs.forEach(item => {
+      // trim workspace prefix
+      item.key = item.key.slice(workspaceId.length + 1);
+    });
+
+    return blobs;
+  }
+
+  async delete(workspaceId: string, key: string) {
+    return this.provider.delete(`${workspaceId}/${key}`);
+  }
+
+  async totalSize(workspaceId: string) {
+    const blobs = await this.list(workspaceId);
+    // how could we ignore the ones get soft-deleted?
+    return blobs.reduce((acc, item) => acc + item.size, 0);
+  }
+}
diff --git a/packages/backend/server/src/modules/storage/wrappers/index.ts b/packages/backend/server/src/modules/storage/wrappers/index.ts
new file mode 100644
index 0000000000..250e9a8cc3
--- /dev/null
+++ b/packages/backend/server/src/modules/storage/wrappers/index.ts
@@ -0,0 +1,2 @@
+export { AvatarStorage } from './avatar';
+export { WorkspaceBlobStorage } from './blob';