feat(server): new storage provider (#5410)

This commit is contained in:
liuyi
2024-01-02 07:21:01 +00:00
parent abcca8b09e
commit 1eefd712dd
10 changed files with 741 additions and 0 deletions

View File

@@ -0,0 +1,113 @@
import { promises as fs } from 'node:fs';
import { join } from 'node:path';
import test from 'ava';
import { getStreamAsBuffer } from 'get-stream';
import { ListObjectsMetadata } from '../providers';
import { FsStorageProvider } from '../providers/fs';
// Root directory for all fs-provider test buckets. Kept inside
// node_modules/.cache so it is git-ignored and cheap to wipe after the run.
const config = {
  path: join(process.cwd(), 'node_modules', '.cache/affine-test-storage'),
};
// Build a provider over a fresh, randomly-named bucket so tests never
// observe each other's objects.
function createProvider() {
  const bucket = 'test' + Math.random().toString(16).substring(2, 8);
  return new FsStorageProvider(config, bucket);
}
// Project a listing down to just its object keys.
function keys(list: ListObjectsMetadata[]) {
  return list.map(entry => entry.key);
}
// Write a small object under a random key (optionally namespaced by
// `prefix`) and return the key once the write has completed.
async function randomPut(
  provider: FsStorageProvider,
  prefix = ''
): Promise<string> {
  const key = prefix + 'test-key-' + Math.random().toString(16).substring(2, 8);
  const body = Buffer.from(key);
  // Await the write: without this the promise floats and callers may list
  // or stat the bucket before the object (and its metadata) hit disk.
  await provider.put(key, body);
  return key;
}
// Wipe the shared cache directory once every test has finished.
// Return the promise so ava actually waits for the removal, and pass
// `force: true` so a run that never created the directory does not reject.
test.after.always(() =>
  fs.rm(config.path, { recursive: true, force: true })
);
// Round-trip: an object written with `put` comes back byte-identical via
// `get`, and the stored metadata reflects the body's length.
test('put & get', async t => {
  const provider = createProvider();
  const key = 'testKey';
  const body = Buffer.from('testBody');
  await provider.put(key, body);
  const result = await provider.get(key);
  t.deepEqual(await getStreamAsBuffer(result.body!), body);
  t.is(result.metadata?.contentLength, body.length);
});
// Flat-bucket listing: no prefix returns every key, a matching prefix
// returns all of them, and a non-matching prefix returns nothing.
test('list - one level', async t => {
  const provider = createProvider();
  const list = await Promise.all(
    Array.from({ length: 100 }).map(() => randomPut(provider))
  );
  list.sort();
  // NOTE(review): deepEqual on arrays is order-sensitive; this assumes the
  // provider enumerates entries in sorted order — confirm that holds on
  // every supported platform.
  const result = await provider.list();
  t.deepEqual(keys(result), list);
  // 'test-key' is a prefix of every generated key.
  const result2 = await provider.list('test-key');
  t.deepEqual(keys(result2), list);
  // Prefix matching is case-sensitive, so 'testKey' matches nothing.
  const result3 = await provider.list('testKey');
  t.is(result3.length, 0);
});
// Prefix semantics: a trailing '/' anchors the prefix to a directory, while
// a bare prefix also matches sibling names that merely start with it.
test('list recursively', async t => {
  const provider = createProvider();
  await Promise.all([
    Promise.all(Array.from({ length: 10 }).map(() => randomPut(provider))),
    Promise.all(
      Array.from({ length: 10 }).map(() => randomPut(provider, 'a/'))
    ),
    Promise.all(
      Array.from({ length: 10 }).map(() => randomPut(provider, 'a/b/'))
    ),
    Promise.all(
      Array.from({ length: 10 }).map(() => randomPut(provider, 'a/b/t/'))
    ),
  ]);
  // no prefix: every object at every depth
  const r1 = await provider.list();
  t.is(r1.length, 40);
  // contains all `a/xxx`, `a/b/xxx` and `a/b/t/xxx`
  const r2 = await provider.list('a');
  t.is(r2.length, 30);
  // contains `a/b/xxx` and `a/b/t/xxx`
  const r3 = await provider.list('a/b');
  const r4 = await provider.list('a/b/');
  t.is(r3.length, 20);
  t.deepEqual(r3, r4);
  // prefix is not ended with '/', it's open to all files and sub dirs:
  // matches all of `a/b/t/xxx` plus `a/b/test-key-xxx` (both start with 't')
  const r5 = await provider.list('a/b/t');
  t.is(r5.length, 20);
});
// Deleting removes the object file from disk, so fs.access then rejects.
// `test.only` removed: it was leftover debugging and silently skipped every
// other test in this file.
test('delete', async t => {
  const provider = createProvider();
  const key = 'testKey';
  const body = Buffer.from('testBody');
  await provider.put(key, body);
  await provider.delete(key);
  await t.throwsAsync(() => fs.access(join(config.path, provider.bucket, key)));
});

View File

@@ -0,0 +1,256 @@
import {
accessSync,
constants,
createReadStream,
Dirent,
mkdirSync,
readdirSync,
readFileSync,
rmSync,
statSync,
writeFileSync,
} from 'node:fs';
import { join, parse, resolve } from 'node:path';
import { Logger } from '@nestjs/common';
import { Readable } from 'stream';
import { FsStorageConfig } from '../../../config/storage';
import {
BlobInputType,
GetObjectMetadata,
ListObjectsMetadata,
PutObjectMetadata,
StorageProvider,
} from './provider';
import { autoMetadata, toBuffer } from './utils';
// Neutralize path-traversal segments in an object key: any '../' or './'
// (with either separator) collapses to '%', so a key can never climb out
// of the bucket directory.
function escapeKey(key: string): string {
  const traversalSegment = /\.?\.[/\\]/g;
  return key.replace(traversalSegment, '%');
}
/**
 * `StorageProvider` implementation that keeps objects as plain files on the
 * local file system. An object `key` lives at `<config.path>/<bucket>/<key>`
 * with a sibling `<key>.metadata.json` file holding its metadata.
 */
export class FsStorageProvider implements StorageProvider {
  // Absolute root directory for this bucket (config.path joined with bucket).
  private readonly path: string;
  private readonly logger: Logger;

  constructor(
    config: FsStorageConfig,
    public readonly bucket: string
  ) {
    this.path = resolve(config.path, bucket);
    // Fail fast when the root directory cannot be created or is not R/W.
    this.ensureAvailability();
    this.logger = new Logger(`${FsStorageProvider.name}:${bucket}`);
  }

  /**
   * Store `body` under `key` and persist derived metadata next to it.
   * Traversal sequences in `key` are neutralized by `escapeKey` first.
   */
  async put(
    key: string,
    body: BlobInputType,
    metadata: PutObjectMetadata = {}
  ): Promise<void> {
    key = escapeKey(key);
    const blob = await toBuffer(body);
    // write object
    this.writeObject(key, blob);
    // write metadata
    await this.writeMetadata(key, blob, metadata);
    this.logger.verbose(`Object \`${key}\` put`);
  }

  /**
   * Read an object back as a stream together with its stored metadata.
   * Any failure is logged and reported as an empty result (`{}`) rather
   * than thrown; `body` is also undefined when the file does not exist.
   */
  async get(key: string): Promise<{
    body?: Readable;
    metadata?: GetObjectMetadata;
  }> {
    key = escapeKey(key);
    try {
      const metadata = this.readMetadata(key);
      const stream = this.readObject(this.join(key));
      this.logger.verbose(`Read object \`${key}\``);
      return {
        body: stream,
        metadata,
      };
    } catch (e) {
      this.logger.error(`Failed to read object \`${key}\``, e);
      return {};
    }
  }

  /**
   * List objects, S3-style: keys are returned relative to the bucket root
   * and `.metadata.json` companions are filtered out.
   */
  async list(prefix?: string): Promise<ListObjectsMetadata[]> {
    // prefix cases:
    // - `undefined`: list all objects
    // - `a/b`: list objects under dir `a` with prefix `b`, `b` might be a dir under `a` as well.
    // - `a/b/` list objects under dir `a/b`
    // read dir recursively and filter out '.metadata.json' files
    let dir = this.path;
    if (prefix) {
      prefix = escapeKey(prefix);
      const parts = prefix.split(/[/\\]/);
      // for prefix `a/b/c`, move `a/b` to dir and `c` to key prefix
      if (parts.length > 1) {
        dir = join(dir, ...parts.slice(0, -1));
        prefix = parts[parts.length - 1];
      }
    }
    const results: ListObjectsMetadata[] = [];
    // Depth-first walk of `dir`. The prefix only filters the first level:
    // once a directory name matches, everything beneath it is included
    // (recursion passes no prefix). Unreadable directories are skipped.
    async function getFiles(dir: string, prefix?: string): Promise<void> {
      try {
        const entries: Dirent[] = readdirSync(dir, { withFileTypes: true });
        for (const entry of entries) {
          const res = join(dir, entry.name);
          if (entry.isDirectory()) {
            if (!prefix || entry.name.startsWith(prefix)) {
              await getFiles(res);
            }
          } else if (
            (!prefix || entry.name.startsWith(prefix)) &&
            !entry.name.endsWith('.metadata.json')
          ) {
            const stat = statSync(res);
            results.push({
              key: res,
              lastModified: stat.mtime,
              size: stat.size,
            });
          }
        }
      } catch (e) {
        // failed to read dir, stop recursion
      }
    }
    await getFiles(dir, prefix);
    // trim path with `this.path` prefix so keys are bucket-relative
    // NOTE(review): result order follows directory enumeration order —
    // confirm callers do not rely on a specific ordering across platforms.
    results.forEach(r => (r.key = r.key.slice(this.path.length + 1)));
    return results;
  }

  /**
   * Remove the object file and its metadata file. `force: true` makes a
   * missing key a no-op; only unexpected fs failures are rethrown (with
   * the original error attached as `cause`).
   */
  delete(key: string): Promise<void> {
    key = escapeKey(key);
    try {
      rmSync(this.join(key), { force: true });
      rmSync(this.join(`${key}.metadata.json`), { force: true });
    } catch (e) {
      throw new Error(`Failed to delete object \`${key}\``, {
        cause: e,
      });
    }
    this.logger.verbose(`Object \`${key}\` deleted`);
    return Promise.resolve();
  }

  /**
   * Ensure the bucket root exists, is a directory, and is readable and
   * writable; throw a descriptive error otherwise. Called from the
   * constructor so a misconfigured provider fails at startup.
   */
  ensureAvailability() {
    // check stats
    const stats = statSync(this.path, {
      throwIfNoEntry: false,
    });
    // not existing, create it
    if (!stats) {
      try {
        mkdirSync(this.path, { recursive: true });
      } catch (e) {
        throw new Error(
          `Failed to create target directory for fs storage provider: ${this.path}`,
          {
            cause: e,
          }
        );
      }
    } else if (stats.isDirectory()) {
      // the target directory has already existed, check if it is readable & writable
      try {
        accessSync(this.path, constants.W_OK | constants.R_OK);
      } catch (e) {
        throw new Error(
          `The target directory for fs storage provider has already existed, but it is not readable & writable: ${this.path}`,
          {
            cause: e,
          }
        );
      }
    } else if (stats.isFile()) {
      throw new Error(
        `The target directory for fs storage provider is a file: ${this.path}`
      );
    }
  }

  // Resolve a key (or key-derived path) inside the bucket root.
  private join(...paths: string[]) {
    return join(this.path, ...paths);
  }

  // Open a read stream for `file`, or return undefined when it is not a
  // regular file (missing object, or a directory).
  private readObject(file: string): Readable | undefined {
    const state = statSync(file, { throwIfNoEntry: false });
    if (state?.isFile()) {
      return createReadStream(file);
    }
    return undefined;
  }

  // Write the object bytes, creating any intermediate directories implied
  // by '/' separators in the key.
  private writeObject(key: string, blob: Buffer) {
    const path = this.join(key);
    mkdirSync(parse(path).dir, { recursive: true });
    writeFileSync(path, blob);
  }

  /**
   * Derive and persist `<key>.metadata.json`.
   * NOTE(review): the checksum-mismatch throw below is caught by this
   * method's own catch and only logged, so a corrupted upload still
   * succeeds without metadata — confirm that is intended.
   */
  private async writeMetadata(
    key: string,
    blob: Buffer,
    raw: PutObjectMetadata
  ) {
    try {
      const metadata = await autoMetadata(blob, raw);
      if (raw.checksumCRC32 && metadata.checksumCRC32 !== raw.checksumCRC32) {
        throw new Error(
          'The checksum of the uploaded file is not matched with the one you provide, the file may be corrupted and the uploading will not be processed.'
        );
      }
      writeFileSync(
        this.join(`${key}.metadata.json`),
        JSON.stringify({
          ...metadata,
          lastModified: Date.now(),
        })
      );
    } catch (e) {
      this.logger.warn(`Failed to write metadata of object \`${key}\``, e);
    }
  }

  // Load `<key>.metadata.json`, reviving date fields; returns undefined
  // (after a warning) when the file is missing or unparsable.
  private readMetadata(key: string): GetObjectMetadata | undefined {
    try {
      const raw = JSON.parse(
        readFileSync(this.join(`${key}.metadata.json`), {
          encoding: 'utf-8',
        })
      );
      return {
        ...raw,
        lastModified: new Date(raw.lastModified),
        expires: raw.expires ? new Date(raw.expires) : undefined,
      };
    } catch (e) {
      this.logger.warn(`Failed to read metadata of object \`${key}\``, e);
      return;
    }
  }
}

View File

@@ -0,0 +1,34 @@
import { AFFiNEStorageConfig, Storages } from '../../../config/storage';
import { FsStorageProvider } from './fs';
import type { StorageProvider } from './provider';
import { R2StorageProvider } from './r2';
import { S3StorageProvider } from './s3';
/**
 * Instantiate the storage provider configured for the given storage slot.
 * Throws when the provider's configuration section is missing or when the
 * configured provider type is unknown.
 */
export function createStorageProvider(
  config: AFFiNEStorageConfig,
  storage: Storages
): StorageProvider {
  const storageConfig = config.storages[storage];
  const providerConfig = config.providers[storageConfig.provider] as any;

  if (!providerConfig) {
    throw new Error(
      `Failed to create ${storageConfig.provider} storage, configuration not correctly set`
    );
  }

  switch (storageConfig.provider) {
    case 's3':
      return new S3StorageProvider(providerConfig, storageConfig.bucket);
    case 'r2':
      return new R2StorageProvider(providerConfig, storageConfig.bucket);
    case 'fs':
      return new FsStorageProvider(providerConfig, storageConfig.bucket);
    default:
      throw new Error(
        `Unknown storage provider type: ${storageConfig.provider}`
      );
  }
}
export type * from './provider';

View File

@@ -0,0 +1,39 @@
import type { Readable } from 'node:stream';
/** Metadata returned alongside an object fetched from storage. */
export interface GetObjectMetadata {
  /**
   * @default 'application/octet-stream'
   */
  contentType: string;
  // Body size in bytes.
  contentLength: number;
  lastModified: Date;
  // CRC32 checksum of the body.
  checksumCRC32: string;
}

/**
 * Optional metadata supplied on upload; fields left unset are derived from
 * the body by the provider.
 */
export interface PutObjectMetadata {
  contentType?: string;
  contentLength?: number;
  checksumCRC32?: string;
}

/** One entry in a `list()` result. */
export interface ListObjectsMetadata {
  // Object key, relative to the bucket root.
  key: string;
  lastModified: Date;
  // Object size in bytes.
  size: number;
}

/** Accepted shapes for an object body on upload. */
export type BlobInputType = Buffer | Readable | string;
/** Object bodies are always streamed back on download. */
export type BlobOutputType = Readable;

/**
 * Minimal object-storage contract implemented by the fs/S3/R2 providers.
 */
export interface StorageProvider {
  /** Store `body` under `key`; missing metadata fields may be derived. */
  put(
    key: string,
    body: BlobInputType,
    metadata?: PutObjectMetadata
  ): Promise<void>;
  /** Fetch body and metadata; both may be absent when the object is missing. */
  get(
    key: string
  ): Promise<{ body?: BlobOutputType; metadata?: GetObjectMetadata }>;
  /** Enumerate objects whose keys start with `prefix` (all when omitted). */
  list(prefix?: string): Promise<ListObjectsMetadata[]>;
  /** Remove the object under `key`. */
  delete(key: string): Promise<void>;
}

View File

@@ -0,0 +1,14 @@
import { R2StorageConfig } from '../../../config/storage';
import { S3StorageProvider } from './s3';
/**
 * Cloudflare R2 provider. R2 speaks the S3 API, so this is the S3 provider
 * pointed at the account-specific R2 endpoint.
 */
export class R2StorageProvider extends S3StorageProvider {
  constructor(config: R2StorageConfig, bucket: string) {
    // The only R2-specific piece is the endpoint derived from the account id.
    const endpoint = `https://${config.accountId}.r2.cloudflarestorage.com`;
    super({ ...config, endpoint }, bucket);
  }
}

View File

@@ -0,0 +1,159 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import { Readable } from 'node:stream';
import {
DeleteObjectCommand,
GetObjectCommand,
ListObjectsV2Command,
PutObjectCommand,
S3Client,
} from '@aws-sdk/client-s3';
import { Logger } from '@nestjs/common';
import { S3StorageConfig } from '../../../config/storage';
import {
BlobInputType,
GetObjectMetadata,
ListObjectsMetadata,
PutObjectMetadata,
StorageProvider,
} from './provider';
import { autoMetadata, toBuffer } from './utils';
/**
 * `StorageProvider` implementation backed by any S3-compatible object store.
 * All transport failures are rethrown as `Error` with the original error
 * attached as `cause`.
 */
export class S3StorageProvider implements StorageProvider {
  logger: Logger;
  client: S3Client;

  constructor(
    config: S3StorageConfig,
    public readonly bucket: string
  ) {
    this.client = new S3Client(config);
    this.logger = new Logger(`${S3StorageProvider.name}:${bucket}`);
  }

  /**
   * Upload an object, deriving content-length/type/checksum metadata for any
   * field the caller did not provide.
   */
  async put(
    key: string,
    body: BlobInputType,
    metadata: PutObjectMetadata = {}
  ): Promise<void> {
    const blob = await toBuffer(body);
    metadata = await autoMetadata(blob, metadata);

    try {
      await this.client.send(
        new PutObjectCommand({
          Bucket: this.bucket,
          Key: key,
          // Send the buffered copy, not `body`: when `body` is a Readable it
          // has already been fully consumed by `toBuffer` above, so passing
          // it through would upload an exhausted stream.
          Body: blob,
          // metadata
          ContentType: metadata.contentType,
          ContentLength: metadata.contentLength,
          // NOTE(review): `autoMetadata` produces a hex CRC32 while the S3
          // API expects a base64-encoded checksum — confirm this value is
          // accepted by the target store.
          ChecksumCRC32: metadata.checksumCRC32,
        })
      );
      this.logger.verbose(`Object \`${key}\` put`);
    } catch (e) {
      throw new Error(`Failed to put object \`${key}\``, {
        cause: e,
      });
    }
  }

  /**
   * Fetch an object and its metadata. Returns `{}` when the response has no
   * body; transport errors are wrapped and rethrown.
   */
  async get(key: string): Promise<{
    body?: Readable;
    metadata?: GetObjectMetadata;
  }> {
    try {
      const obj = await this.client.send(
        new GetObjectCommand({
          Bucket: this.bucket,
          Key: key,
        })
      );

      if (!obj.Body) {
        this.logger.verbose(`Object \`${key}\` not found`);
        return {};
      }

      this.logger.verbose(`Read object \`${key}\``);
      return {
        // @ts-expect-errors ignore browser response type `Blob`
        body: obj.Body,
        metadata: {
          // always set when putting object
          contentType: obj.ContentType!,
          contentLength: obj.ContentLength!,
          checksumCRC32: obj.ChecksumCRC32!,
          lastModified: obj.LastModified!,
        },
      };
    } catch (e) {
      throw new Error(`Failed to read object \`${key}\``, {
        cause: e,
      });
    }
  }

  /**
   * List every object under `prefix`, following the paginated
   * ListObjectsV2 protocol until `IsTruncated` is false.
   */
  async list(prefix?: string): Promise<ListObjectsMetadata[]> {
    let continuationToken: string | undefined = undefined;
    let hasMore = true;
    let result: ListObjectsMetadata[] = [];

    try {
      while (hasMore) {
        const listResult = await this.client.send(
          new ListObjectsV2Command({
            Bucket: this.bucket,
            Prefix: prefix,
            ContinuationToken: continuationToken,
          })
        );

        if (listResult.Contents?.length) {
          result = result.concat(
            listResult.Contents.map(r => ({
              key: r.Key!,
              lastModified: r.LastModified!,
              size: r.Size!,
            }))
          );
        }

        // has more items not listed
        hasMore = !!listResult.IsTruncated;
        continuationToken = listResult.NextContinuationToken;
      }

      this.logger.verbose(
        `List ${result.length} objects with prefix \`${prefix}\``
      );
      return result;
    } catch (e) {
      throw new Error(`Failed to list objects with prefix \`${prefix}\``, {
        cause: e,
      });
    }
  }

  /**
   * Delete an object. Succeeds silently for missing keys (S3 DeleteObject
   * semantics); transport errors are wrapped and rethrown.
   */
  async delete(key: string): Promise<void> {
    try {
      await this.client.send(
        new DeleteObjectCommand({
          Bucket: this.bucket,
          Key: key,
        })
      );
      // log for parity with the fs provider
      this.logger.verbose(`Object \`${key}\` deleted`);
    } catch (e) {
      throw new Error(`Failed to delete object \`${key}\``, {
        cause: e,
      });
    }
  }
}

View File

@@ -0,0 +1,49 @@
import { Readable } from 'node:stream';
import { crc32 } from '@node-rs/crc32';
import { fileTypeFromBuffer } from 'file-type';
import { getStreamAsBuffer } from 'get-stream';
import { BlobInputType, PutObjectMetadata } from './provider';
/**
 * Normalize any accepted blob input into a single in-memory Buffer.
 * Streams are fully drained; buffers pass through untouched; strings are
 * encoded with the default (utf-8) encoding.
 */
export async function toBuffer(input: BlobInputType): Promise<Buffer> {
  if (input instanceof Readable) {
    return getStreamAsBuffer(input);
  }
  if (input instanceof Buffer) {
    return input;
  }
  return Buffer.from(input);
}
/**
 * Fill in any metadata fields the caller left unset, deriving them from the
 * blob itself: byte length, CRC32 checksum (hex), and sniffed MIME type
 * (falling back to 'application/octet-stream'). Derivation is best-effort —
 * on any failure the metadata collected so far is returned as-is.
 *
 * NOTE(review): the checksum is hex-encoded; the S3 `ChecksumCRC32` field
 * expects base64 — confirm which consumers rely on this format.
 */
export async function autoMetadata(
  blob: Buffer,
  raw: PutObjectMetadata
): Promise<PutObjectMetadata> {
  const metadata = { ...raw };

  try {
    // length
    metadata.contentLength ||= blob.length;

    // checksum
    metadata.checksumCRC32 ||= crc32(blob).toString(16);

    // mime type — sniffing may fail on its own; keep the field unset then
    if (!metadata.contentType) {
      try {
        const detected = await fileTypeFromBuffer(blob);
        metadata.contentType = detected?.mime ?? 'application/octet-stream';
      } catch {
        // ignore
      }
    }
  } catch {
    // fall through and return whatever was filled in so far
  }

  return metadata;
}

View File

@@ -0,0 +1,30 @@
import { Injectable } from '@nestjs/common';
import { Config } from '../../../config';
import {
BlobInputType,
createStorageProvider,
PutObjectMetadata,
StorageProvider,
} from '../providers';
/**
 * Thin service wrapper exposing the provider configured for the 'avatar'
 * storage slot. Keys are used verbatim — no workspace prefixing here.
 */
@Injectable()
export class AvatarStorage {
  public readonly provider: StorageProvider;

  constructor(config: Config) {
    this.provider = createStorageProvider(config.storage, 'avatar');
  }

  /** Store an avatar blob under `key`. */
  put(key: string, blob: BlobInputType, metadata?: PutObjectMetadata) {
    const { provider } = this;
    return provider.put(key, blob, metadata);
  }

  /** Fetch the avatar blob and metadata for `key`. */
  get(key: string) {
    const { provider } = this;
    return provider.get(key);
  }

  /** Remove the avatar stored under `key`. */
  async delete(key: string) {
    const { provider } = this;
    return provider.delete(key);
  }
}

View File

@@ -0,0 +1,45 @@
import { Injectable } from '@nestjs/common';
import { Config } from '../../../config';
import {
BlobInputType,
createStorageProvider,
StorageProvider,
} from '../providers';
/**
 * Service wrapper over the 'blob' storage slot that namespaces every object
 * key under its workspace id (`<workspaceId>/<key>`).
 */
@Injectable()
export class WorkspaceBlobStorage {
  public readonly provider: StorageProvider;

  constructor({ storage }: Config) {
    this.provider = createStorageProvider(storage, 'blob');
  }

  // Build the workspace-scoped storage key.
  private blobKey(workspaceId: string, key: string) {
    return `${workspaceId}/${key}`;
  }

  /** Store a blob under the workspace's namespace. */
  put(workspaceId: string, key: string, blob: BlobInputType) {
    return this.provider.put(this.blobKey(workspaceId, key), blob);
  }

  /** Fetch a blob from the workspace's namespace. */
  get(workspaceId: string, key: string) {
    return this.provider.get(this.blobKey(workspaceId, key));
  }

  /** List all blobs of a workspace with workspace-relative keys. */
  async list(workspaceId: string) {
    const prefix = `${workspaceId}/`;
    const blobs = await this.provider.list(prefix);
    // Callers expect keys relative to the workspace, so strip the prefix
    // in place before returning.
    for (const blob of blobs) {
      blob.key = blob.key.slice(prefix.length);
    }
    return blobs;
  }

  /** Delete a single blob from the workspace's namespace. */
  async delete(workspaceId: string, key: string) {
    return this.provider.delete(this.blobKey(workspaceId, key));
  }

  /**
   * Sum of all blob sizes in the workspace.
   * NOTE(review): soft-deleted blobs are still counted here — confirm
   * whether they should be excluded from quota accounting.
   */
  async totalSize(workspaceId: string) {
    const blobs = await this.list(workspaceId);
    let total = 0;
    for (const blob of blobs) {
      total += blob.size;
    }
    return total;
  }
}

View File

@@ -0,0 +1,2 @@
// Public surface of the wrappers module: one storage service per use case.
export { AvatarStorage } from './avatar';
export { WorkspaceBlobStorage } from './blob';