diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index 0263548fc9..3c0ec967db 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -11,6 +11,7 @@ services: network_mode: service:db environment: DATABASE_URL: postgresql://affine:affine@db:5432/affine + REDIS_SERVER_HOST: redis db: image: postgres:latest @@ -21,6 +22,10 @@ services: POSTGRES_PASSWORD: affine POSTGRES_USER: affine POSTGRES_DB: affine + redis: + image: redis + ports: + - 6379:6379 volumes: postgres-data: diff --git a/.docker/dev/.env.example b/.docker/dev/.env.example new file mode 100644 index 0000000000..b6beb2deb4 --- /dev/null +++ b/.docker/dev/.env.example @@ -0,0 +1,4 @@ +DATABASE_LOCATION=./postgres +DB_PASSWORD=affine +DB_USERNAME=affine +DB_DATABASE_NAME=affine \ No newline at end of file diff --git a/.docker/dev/.gitignore b/.docker/dev/.gitignore new file mode 100644 index 0000000000..257739faba --- /dev/null +++ b/.docker/dev/.gitignore @@ -0,0 +1,3 @@ +postgres +.env +compose.yml \ No newline at end of file diff --git a/.docker/dev/compose.yml.example b/.docker/dev/compose.yml.example new file mode 100644 index 0000000000..624958d99e --- /dev/null +++ b/.docker/dev/compose.yml.example @@ -0,0 +1,28 @@ +name: affine_dev_services +services: + postgres: + env_file: + - .env + image: postgres:16 + ports: + - 5432:5432 + environment: + POSTGRES_PASSWORD: ${DB_PASSWORD} + POSTGRES_USER: ${DB_USERNAME} + POSTGRES_DB: ${DB_DATABASE_NAME} + volumes: + - ${DATABASE_LOCATION}:/var/lib/postgresql/data + + redis: + image: redis:latest + ports: + - 6379:6379 + + mailhog: + image: mailhog/mailhog:latest + ports: + - 1025:1025 + - 8025:8025 + +networks: + dev: diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 1b33441921..39d51cfa3a 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -304,6 +304,7 @@ jobs: NODE_ENV: test DISTRIBUTION: web DATABASE_URL: postgresql://affine:affine@localhost:5432/affine + REDIS_SERVER_HOST: localhost services: postgres: image: postgres @@ -316,6 +317,10 @@ jobs: --health-retries 5 ports: - 5432:5432 + redis: + image: redis + ports: + - 6379:6379 mailer: image: mailhog/mailhog ports: @@ -379,6 +384,7 @@ jobs: NODE_ENV: test DISTRIBUTION: web DATABASE_URL: postgresql://affine:affine@localhost:5432/affine + REDIS_SERVER_HOST: localhost services: postgres: image: postgres @@ -391,6 +397,10 @@ jobs: --health-retries 5 ports: - 5432:5432 + redis: + image: redis + ports: + - 6379:6379 mailer: image: mailhog/mailhog ports: @@ -461,6 +471,7 @@ jobs: DISTRIBUTION: web DATABASE_URL: postgresql://affine:affine@localhost:5432/affine IN_CI_TEST: true + REDIS_SERVER_HOST: localhost strategy: fail-fast: false matrix: @@ -480,6 +491,10 @@ jobs: --health-retries 5 ports: - 5432:5432 + redis: + image: redis + ports: + - 6379:6379 steps: - uses: actions/checkout@v4 @@ -532,6 +547,7 @@ jobs: env: DISTRIBUTION: web DATABASE_URL: postgresql://affine:affine@localhost:5432/affine + REDIS_SERVER_HOST: localhost IN_CI_TEST: true strategy: fail-fast: false @@ -566,6 +582,10 @@ jobs: --health-retries 5 ports: - 5432:5432 + redis: + image: redis + ports: + - 6379:6379 mailer: image: mailhog/mailhog ports: diff --git a/.github/workflows/copilot-test.yml b/.github/workflows/copilot-test.yml index ea3c0c70f9..6933a49a39 100644 --- a/.github/workflows/copilot-test.yml +++ b/.github/workflows/copilot-test.yml @@ -42,6 +42,7 @@ jobs: NODE_ENV: test DISTRIBUTION: web DATABASE_URL: postgresql://affine:affine@localhost:5432/affine + REDIS_SERVER_HOST: localhost services: postgres: image: postgres @@ -54,6 +55,10 @@ jobs: --health-retries 5 ports: - 5432:5432 + redis: + image: redis + ports: + - 6379:6379 mailer: image: mailhog/mailhog ports: @@ -100,6 +105,7 @@ jobs: env: DISTRIBUTION: web DATABASE_URL: postgresql://affine:affine@localhost:5432/affine + REDIS_SERVER_HOST: localhost IN_CI_TEST: true strategy: fail-fast: false @@ -120,6 +126,10 @@ jobs: --health-retries 5 ports: - 5432:5432 + redis: + image: redis + ports: + - 6379:6379 steps: - uses: actions/checkout@v4 diff --git a/docs/developing-server.md b/docs/developing-server.md index 5f01085e8c..c09064216b 100644 --- a/docs/developing-server.md +++ b/docs/developing-server.md @@ -5,81 +5,13 @@ This document explains how to start server (@affine/server) locally with Docker > This document is not guaranteed to be up-to-date. > If you find any outdated information, please feel free to open an issue or submit a PR. -## Run postgresql in docker +## Run required dev services in docker compose ``` -docker pull postgres -docker run --rm --name affine-postgres -e POSTGRES_PASSWORD=affine -p 5432:5432 -v ~/Documents/postgres:/var/lib/postgresql/data postgres -``` +cp ./.docker/dev/compose.yml.example ./.docker/dev/compose.yml +cp ./.docker/dev/.env.example ./.docker/dev/.env -### Optionally, use a dedicated volume - -``` -docker volume create affine-postgres -docker run --rm --name affine-postgres -e POSTGRES_PASSWORD=affine -p 5432:5432 -v affine-postgres:/var/lib/postgresql/data postgres -``` - -### mailhog (for local testing) - -``` -docker run --rm --name mailhog -p 1025:1025 -p 8025:8025 mailhog/mailhog -``` - -## prepare db - -``` -docker ps -docker exec -it affine-postgres psql -U postgres ## `affine-postgres` is the container name from the previous step -``` - -### in the terminal, following the example to user & table - -``` -psql (15.3 (Debian 15.3-1.pgdg120+1)) -Type "help" for help. - -postgres=# CREATE USER affine WITH PASSWORD 'affine'; -CREATE ROLE -postgres=# ALTER USER affine WITH SUPERUSER; -ALTER ROLE -postgres=# CREATE DATABASE affine; -CREATE DATABASE -postgres=# \du - List of roles - Role name | Attributes | Member of ------------+------------------------------------------------------------+----------- - affine | Superuser | {} - postgres | Superuser, Create role, Create DB, Replication, Bypass RLS | {} -``` - -### Set the following config to `packages/backend/server/.env` - -In the following setup, we assume you have postgres server running at localhost:5432 and mailhog running at localhost:1025. - -When logging in via email, you will see the mail arriving at localhost:8025 in a browser. - -``` -DATABASE_URL="postgresql://affine:affine@localhost:5432/affine" -MAILER_SENDER="noreply@toeverything.info" -MAILER_USER="auth" -MAILER_PASSWORD="auth" -MAILER_HOST="localhost" -MAILER_PORT="1025" -``` - -## Prepare prisma - -``` -yarn workspace @affine/server prisma db push -yarn workspace @affine/server data-migration run -``` - -Note, you may need to do it again if db schema changed. - -### Enable prisma studio - -``` -yarn workspace @affine/server prisma studio +docker compose -f ./.docker/dev/compose.yml up -d ``` ## Build native packages (you need to setup rust toolchain first) @@ -87,27 +19,46 @@ yarn workspace @affine/server prisma studio ``` # build native yarn workspace @affine/server-native build -yarn workspace @affine/native build ``` -## start server +## Prepare dev environment ``` -yarn workspace @affine/server dev +cd packages/backend/server + +cp .env.example .env +yarn prisma db push +yarn data-migration run ``` -when server started, it will created a default user: - -email: dev@affine.pro -name: Dev User -password: dev - -## start core (web) +## Start server ``` yarn dev ``` +when server started, it will created a default user for testing: + +- email: dev@affine.pro +- name: Dev User +- password: dev + +## Start frontend + +``` +# at project root +yarn dev +``` + ## Done Now you should be able to start developing affine with server enabled. + +## Bonus + +### Enable prisma studio (Database GUI) + +``` +# available at http://localhost:5555 +yarn prisma studio +``` diff --git a/packages/backend/server/.env.example b/packages/backend/server/.env.example index 3c19085abb..b7adcf56c9 100644 --- a/packages/backend/server/.env.example +++ b/packages/backend/server/.env.example @@ -1,4 +1,9 @@ -# AFFINE_SERVER_PORT=3010 -# AFFINE_SERVER_HOST=app.affine.pro -# AFFINE_SERVER_HTTPS=true # DATABASE_URL="postgres://affine:affine@localhost:5432/affine" +# REDIS_SERVER_HOST=localhost + +# MAILER_HOST=localhost +# MAILER_PORT=1025 +# MAILER_SENDER="noreply@toeverything.info" +# MAILER_USER="noreply@toeverything.info" +# MAILER_PASSWORD="affine" +# MAILER_SECURE=false \ No newline at end of file diff --git a/packages/backend/server/src/app.module.ts b/packages/backend/server/src/app.module.ts index 3fa3bffaba..d6764ab9c2 100644 --- a/packages/backend/server/src/app.module.ts +++ b/packages/backend/server/src/app.module.ts @@ -19,6 +19,7 @@ import { MailModule } from './base/mailer'; import { MetricsModule } from './base/metrics'; import { MutexModule } from './base/mutex'; import { PrismaModule } from './base/prisma'; +import { RedisModule } from './base/redis'; import { RuntimeModule } from './base/runtime'; import { StorageProviderModule } from './base/storage'; import { RateLimiterModule } from './base/throttler'; @@ -42,6 +43,7 @@ export const FunctionalityModules = [ ConfigModule.forRoot(), RuntimeModule, EventModule, + RedisModule, CacheModule, MutexModule, PrismaModule, diff --git a/packages/backend/server/src/app.ts b/packages/backend/server/src/app.ts index f6941f9a80..a070e4b493 100644 --- a/packages/backend/server/src/app.ts +++ b/packages/backend/server/src/app.ts @@ -1,4 +1,3 @@ -import { Type } from '@nestjs/common'; import { NestFactory } from '@nestjs/core'; import type { NestExpressApplication } from '@nestjs/platform-express'; import cookieParser from 'cookie-parser'; @@ -9,7 +8,7 @@ import { CloudThrottlerGuard, GlobalExceptionFilter, } from './base'; -import { SocketIoAdapter, SocketIoAdapterImpl } from './base/websocket'; +import { SocketIoAdapter } from './base/websocket'; import { AuthGuard } from './core/auth'; import { ENABLED_FEATURES } from './core/config/server-feature'; import { serverTimingAndCache } from './middleware/timing'; @@ -44,13 +43,6 @@ export async function createApp() { app.use(cookieParser()); if (AFFiNE.flavor.sync) { - const SocketIoAdapter = app.get>( - SocketIoAdapterImpl, - { - strict: false, - } - ); - const adapter = new SocketIoAdapter(app); app.useWebSocketAdapter(adapter); } diff --git a/packages/backend/server/src/base/cache/def.ts b/packages/backend/server/src/base/cache/def.ts deleted file mode 100644 index e220b72a4e..0000000000 --- a/packages/backend/server/src/base/cache/def.ts +++ /dev/null @@ -1,51 +0,0 @@ -export interface CacheSetOptions { - /** - * in milliseconds - */ - ttl?: number; -} - -// extends if needed -export interface Cache { - // standard operation - get(key: string): Promise; - set( - key: string, - value: T, - opts?: CacheSetOptions - ): Promise; - setnx( - key: string, - value: T, - opts?: CacheSetOptions - ): Promise; - increase(key: string, count?: number): Promise; - decrease(key: string, count?: number): Promise; - delete(key: string): Promise; - has(key: string): Promise; - ttl(key: string): Promise; - expire(key: string, ttl: number): Promise; - - // list operations - pushBack(key: string, ...values: T[]): Promise; - pushFront(key: string, ...values: T[]): Promise; - len(key: string): Promise; - list(key: string, start: number, end: number): Promise; - popFront(key: string, count?: number): Promise; - popBack(key: string, count?: number): Promise; - - // map operations - mapSet( - map: string, - key: string, - value: T, - opts: CacheSetOptions - ): Promise; - mapIncrease(map: string, key: string, count?: number): Promise; - mapDecrease(map: string, key: string, count?: number): Promise; - mapGet(map: string, key: string): Promise; - mapDelete(map: string, key: string): Promise; - mapKeys(map: string): Promise; - mapRandomKey(map: string): Promise; - mapLen(map: string): Promise; -} diff --git a/packages/backend/server/src/base/cache/instances.ts b/packages/backend/server/src/base/cache/instances.ts index da57e1fed4..288a19d6a1 100644 --- a/packages/backend/server/src/base/cache/instances.ts +++ b/packages/backend/server/src/base/cache/instances.ts @@ -1,13 +1,18 @@ import { Injectable } from '@nestjs/common'; -import { LocalCache } from './local'; +import { CacheRedis, SessionRedis } from '../redis'; +import { CacheProvider } from './provider'; @Injectable() -export class Cache extends LocalCache {} - -@Injectable() -export class SessionCache extends LocalCache { - constructor() { - super({ namespace: 'session' }); +export class Cache extends CacheProvider { + constructor(redis: CacheRedis) { + super(redis); + } +} + +@Injectable() +export class SessionCache extends CacheProvider { + constructor(redis: SessionRedis) { + super(redis); } } diff --git a/packages/backend/server/src/base/cache/local.ts b/packages/backend/server/src/base/cache/local.ts deleted file mode 100644 index ca46e17bd3..0000000000 --- a/packages/backend/server/src/base/cache/local.ts +++ /dev/null @@ -1,286 +0,0 @@ -import Keyv, { KeyvOptions } from 'keyv'; - -import type { Cache, CacheSetOptions } from './def'; - -export class LocalCache implements Cache { - private readonly kv: Keyv; - - constructor(opts: KeyvOptions = {}) { - this.kv = new Keyv(opts); - } - - // standard operation - async get(key: string): Promise { - return this.kv.get(key).catch(() => undefined); - } - - async set( - key: string, - value: T, - opts: CacheSetOptions = {} - ): Promise { - return this.kv - .set(key, value, opts.ttl) - .then(() => true) - .catch(() => false); - } - - async setnx( - key: string, - value: T, - opts?: CacheSetOptions | undefined - ): Promise { - if (!(await this.has(key))) { - return this.set(key, value, opts); - } - return false; - } - - async increase(key: string, count: number = 1): Promise { - const prev = (await this.get(key)) ?? 0; - if (typeof prev !== 'number') { - throw new Error( - `Expect a Number keyed by ${key}, but found ${typeof prev}` - ); - } - - const curr = prev + count; - return (await this.set(key, curr)) ? curr : prev; - } - - async decrease(key: string, count: number = 1): Promise { - return this.increase(key, -count); - } - - async delete(key: string): Promise { - return this.kv.delete(key).catch(() => false); - } - - async has(key: string): Promise { - return this.kv.has(key).catch(() => false); - } - - async ttl(key: string): Promise { - return this.kv - .get(key, { raw: true }) - .then(raw => (raw?.expires ? raw.expires - Date.now() : Infinity)) - .catch(() => 0); - } - - async expire(key: string, ttl: number): Promise { - const value = await this.kv.get(key); - return this.set(key, value, { ttl }); - } - - // list operations - private async getArray(key: string) { - const raw = await this.kv.get(key, { raw: true }); - if (raw && !Array.isArray(raw.value)) { - throw new Error( - `Expect an Array keyed by ${key}, but found ${raw.value}` - ); - } - - return raw; - } - - private async setArray( - key: string, - value: T[], - opts: CacheSetOptions = {} - ) { - return this.set(key, value, opts).then(() => value.length); - } - - async pushBack(key: string, ...values: T[]): Promise { - let list: any[] = []; - let ttl: number | undefined = undefined; - const raw = await this.getArray(key); - if (raw) { - if (raw.value) { - list = raw.value; - } - if (raw.expires) { - ttl = raw.expires - Date.now(); - } - } - - list = list.concat(values); - return this.setArray(key, list, { ttl }); - } - - async pushFront(key: string, ...values: T[]): Promise { - let list: any[] = []; - let ttl: number | undefined = undefined; - const raw = await this.getArray(key); - if (raw) { - if (raw.value) { - list = raw.value; - } - if (raw.expires) { - ttl = raw.expires - Date.now(); - } - } - - list = values.concat(list); - return this.setArray(key, list, { ttl }); - } - - async len(key: string): Promise { - return this.getArray(key).then(v => v?.value?.length ?? 0); - } - - /** - * list array elements with `[start, end]` - * the end indice is inclusive - */ - async list( - key: string, - start: number, - end: number - ): Promise { - const raw = await this.getArray(key); - if (raw?.value) { - start = (raw.value.length + start) % raw.value.length; - end = ((raw.value.length + end) % raw.value.length) + 1; - return raw.value.slice(start, end); - } else { - return []; - } - } - - private async trim(key: string, start: number, end: number) { - const raw = await this.getArray(key); - if (raw && raw.value) { - start = (raw.value.length + start) % raw.value.length; - // make negative end index work, and end indice is inclusive - end = ((raw.value.length + end) % raw.value.length) + 1; - const result = raw.value.splice(start, end); - - await this.set(key, raw.value, { - ttl: raw.expires ? raw.expires - Date.now() : undefined, - }); - - return result; - } - - return []; - } - - async popFront(key: string, count: number = 1) { - return this.trim(key, 0, count - 1); - } - - async popBack(key: string, count: number = 1) { - return this.trim(key, -count, count - 1); - } - - // map operations - private async getMap(map: string) { - const raw = await this.kv.get>(map, { raw: true }); - - if (raw) { - if (typeof raw.value !== 'object') { - throw new Error( - `Expect an Object keyed by ${map}, but found ${typeof raw}` - ); - } - - if (Array.isArray(raw.value)) { - throw new Error(`Expect an Object keyed by ${map}, but found an Array`); - } - } - - return raw; - } - - private async setMap( - map: string, - value: Record, - opts: CacheSetOptions = {} - ) { - return this.kv.set(map, value, opts.ttl).then(() => true); - } - - async mapGet(map: string, key: string): Promise { - const raw = await this.getMap(map); - if (raw?.value) { - return raw.value[key]; - } - - return undefined; - } - - async mapSet( - map: string, - key: string, - value: T - ): Promise { - const raw = await this.getMap(map); - const data = raw?.value ?? {}; - - data[key] = value; - - return this.setMap(map, data, { - ttl: raw?.expires ? raw.expires - Date.now() : undefined, - }); - } - - async mapDelete(map: string, key: string): Promise { - const raw = await this.getMap(map); - - if (raw?.value) { - delete raw.value[key]; - return this.setMap(map, raw.value, { - ttl: raw.expires ? raw.expires - Date.now() : undefined, - }); - } - - return false; - } - - async mapIncrease( - map: string, - key: string, - count: number = 1 - ): Promise { - const prev = (await this.mapGet(map, key)) ?? 0; - - if (typeof prev !== 'number') { - throw new Error( - `Expect a Number keyed by ${key}, but found ${typeof prev}` - ); - } - - const curr = prev + count; - - return (await this.mapSet(map, key, curr)) ? curr : prev; - } - - async mapDecrease( - map: string, - key: string, - count: number = 1 - ): Promise { - return this.mapIncrease(map, key, -count); - } - - async mapKeys(map: string): Promise { - const raw = await this.getMap(map); - if (raw?.value) { - return Object.keys(raw.value); - } - - return []; - } - - async mapRandomKey(map: string): Promise { - const keys = await this.mapKeys(map); - return keys[Math.floor(Math.random() * keys.length)]; - } - - async mapLen(map: string): Promise { - const raw = await this.getMap(map); - return raw?.value ? Object.keys(raw.value).length : 0; - } -} diff --git a/packages/backend/server/src/plugins/redis/cache.ts b/packages/backend/server/src/base/cache/provider.ts similarity index 96% rename from packages/backend/server/src/plugins/redis/cache.ts rename to packages/backend/server/src/base/cache/provider.ts index 13cc7a287d..397ab66ac3 100644 --- a/packages/backend/server/src/plugins/redis/cache.ts +++ b/packages/backend/server/src/base/cache/provider.ts @@ -1,8 +1,13 @@ -import { Redis } from 'ioredis'; +import Redis from 'ioredis'; -import type { Cache, CacheSetOptions } from '../../base/cache/def'; +export interface CacheSetOptions { + /** + * in milliseconds + */ + ttl?: number; +} -export class RedisCache implements Cache { +export class CacheProvider { constructor(private readonly redis: Redis) {} // standard operation diff --git a/packages/backend/server/src/base/index.ts b/packages/backend/server/src/base/index.ts index a450571b4d..1e5d1422a6 100644 --- a/packages/backend/server/src/base/index.ts +++ b/packages/backend/server/src/base/index.ts @@ -20,7 +20,7 @@ export * from './guard'; export { CryptoHelper, URLHelper } from './helpers'; export { MailService } from './mailer'; export { CallMetric, metrics } from './metrics'; -export { type ILocker, Lock, Locker, Mutex, RequestMutex } from './mutex'; +export { Lock, Locker, Mutex, RequestMutex } from './mutex'; export { GatewayErrorWrapper, getOptionalModuleMetadata, diff --git a/packages/backend/server/src/base/mutex/index.ts b/packages/backend/server/src/base/mutex/index.ts index ba37c0c761..45c80cc774 100644 --- a/packages/backend/server/src/base/mutex/index.ts +++ b/packages/backend/server/src/base/mutex/index.ts @@ -1,6 +1,6 @@ import { Global, Module } from '@nestjs/common'; -import { Locker } from './local-lock'; +import { Locker } from './locker'; import { Mutex, RequestMutex } from './mutex'; @Global() @@ -11,4 +11,4 @@ import { Mutex, RequestMutex } from './mutex'; export class MutexModule {} export { Locker, Mutex, RequestMutex }; -export { type Locker as ILocker, Lock } from './lock'; +export { Lock } from './lock'; diff --git a/packages/backend/server/src/base/mutex/local-lock.ts b/packages/backend/server/src/base/mutex/local-lock.ts deleted file mode 100644 index c0460f5fb7..0000000000 --- a/packages/backend/server/src/base/mutex/local-lock.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { Cache } from '../cache'; -import { Lock, Locker as ILocker } from './lock'; - -@Injectable() -export class Locker implements ILocker { - constructor(private readonly cache: Cache) {} - - async lock(owner: string, key: string): Promise { - const lockKey = `MutexLock:${key}`; - const prevOwner = await this.cache.get(lockKey); - - if (prevOwner && prevOwner !== owner) { - throw new Error(`Lock for resource [${key}] has been holder by others`); - } - - const acquired = await this.cache.set(lockKey, owner); - - if (acquired) { - return new Lock(async () => { - await this.cache.delete(lockKey); - }); - } - - throw new Error(`Failed to acquire lock for resource [${key}]`); - } -} diff --git a/packages/backend/server/src/base/mutex/lock.ts b/packages/backend/server/src/base/mutex/lock.ts index bf0a971293..e0dda38148 100644 --- a/packages/backend/server/src/base/mutex/lock.ts +++ b/packages/backend/server/src/base/mutex/lock.ts @@ -17,7 +17,3 @@ export class Lock implements AsyncDisposable { await this.release(); } } - -export interface Locker { - lock(owner: string, key: string): Promise; -} diff --git a/packages/backend/server/src/plugins/redis/mutex.ts b/packages/backend/server/src/base/mutex/locker.ts similarity index 89% rename from packages/backend/server/src/plugins/redis/mutex.ts rename to packages/backend/server/src/base/mutex/locker.ts index 651b0fd618..8f2dce4838 100644 --- a/packages/backend/server/src/plugins/redis/mutex.ts +++ b/packages/backend/server/src/base/mutex/locker.ts @@ -1,8 +1,8 @@ import { Injectable, Logger } from '@nestjs/common'; import { Command } from 'ioredis'; -import { ILocker, Lock } from '../../base'; -import { SessionRedis } from './instances'; +import { SessionRedis } from '../redis'; +import { Lock } from './lock'; // === atomic mutex lock === // acquire lock @@ -36,8 +36,9 @@ else end`; @Injectable() -export class RedisMutexLocker implements ILocker { - private readonly logger = new Logger(RedisMutexLocker.name); +export class Locker { + private readonly logger = new Logger(Locker.name); + constructor(private readonly redis: SessionRedis) {} async lock(owner: string, key: string): Promise { diff --git a/packages/backend/server/src/base/mutex/mutex.ts b/packages/backend/server/src/base/mutex/mutex.ts index 08a0c58e66..fc1e640308 100644 --- a/packages/backend/server/src/base/mutex/mutex.ts +++ b/packages/backend/server/src/base/mutex/mutex.ts @@ -6,7 +6,7 @@ import type { Request } from 'express'; import { GraphqlContext } from '../graphql'; import { retryable } from '../utils/promise'; -import { Locker } from './local-lock'; +import { Locker } from './locker'; export const MUTEX_RETRY = 5; export const MUTEX_WAIT = 100; @@ -26,7 +26,7 @@ export class Mutex { * ```typescript * { * // lock is acquired here - * await using lock = await mutex.lock('resource-key'); + * await using lock = await mutex.acquire('resource-key'); * if (lock) { * // do something * } else { @@ -38,7 +38,7 @@ export class Mutex { * @param key resource key * @returns LockGuard */ - async lock(key: string, owner: string = 'global') { + async acquire(key: string, owner: string = 'global') { try { return await retryable( () => this.locker.lock(owner, key), @@ -83,7 +83,7 @@ export class RequestMutex extends Mutex { return id; } - override lock(key: string) { - return super.lock(key, this.getId()); + override acquire(key: string) { + return super.acquire(key, this.getId()); } } diff --git a/packages/backend/server/src/plugins/redis/config.ts b/packages/backend/server/src/base/redis/config.ts similarity index 72% rename from packages/backend/server/src/plugins/redis/config.ts rename to packages/backend/server/src/base/redis/config.ts index 5a3ee800eb..84fb350046 100644 --- a/packages/backend/server/src/plugins/redis/config.ts +++ b/packages/backend/server/src/base/redis/config.ts @@ -3,9 +3,9 @@ import { RedisOptions } from 'ioredis'; import { defineStartupConfig, ModuleConfig } from '../../base/config'; declare module '../config' { - interface PluginsConfig { + interface AppConfig { redis: ModuleConfig; } } -defineStartupConfig('plugins.redis', {}); +defineStartupConfig('redis', {}); diff --git a/packages/backend/server/src/base/redis/index.ts b/packages/backend/server/src/base/redis/index.ts new file mode 100644 index 0000000000..5a813a37bc --- /dev/null +++ b/packages/backend/server/src/base/redis/index.ts @@ -0,0 +1,14 @@ +import './config'; + +import { Global, Module } from '@nestjs/common'; + +import { CacheRedis, SessionRedis, SocketIoRedis } from './instances'; + +@Global() +@Module({ + providers: [CacheRedis, SessionRedis, SocketIoRedis], + exports: [CacheRedis, SessionRedis, SocketIoRedis], +}) +export class RedisModule {} + +export { CacheRedis, SessionRedis, SocketIoRedis }; diff --git a/packages/backend/server/src/base/redis/instances.ts b/packages/backend/server/src/base/redis/instances.ts new file mode 100644 index 0000000000..7bec65f3f5 --- /dev/null +++ b/packages/backend/server/src/base/redis/instances.ts @@ -0,0 +1,35 @@ +import { Injectable, OnModuleDestroy } from '@nestjs/common'; +import { Redis as IORedis, RedisOptions } from 'ioredis'; + +import { Config } from '../../base/config'; + +class Redis extends IORedis implements OnModuleDestroy { + constructor(opts: RedisOptions) { + super(opts); + } + + onModuleDestroy() { + this.disconnect(); + } +} + +@Injectable() +export class CacheRedis extends Redis { + constructor(config: Config) { + super(config.redis); + } +} + +@Injectable() +export class SessionRedis extends Redis { + constructor(config: Config) { + super({ ...config.redis, db: (config.redis.db ?? 0) + 2 }); + } +} + +@Injectable() +export class SocketIoRedis extends Redis { + constructor(config: Config) { + super({ ...config.redis, db: (config.redis.db ?? 0) + 3 }); + } +} diff --git a/packages/backend/server/src/base/websocket/adapter.ts b/packages/backend/server/src/base/websocket/adapter.ts new file mode 100644 index 0000000000..9153b000a0 --- /dev/null +++ b/packages/backend/server/src/base/websocket/adapter.ts @@ -0,0 +1,56 @@ +import { INestApplication } from '@nestjs/common'; +import { IoAdapter } from '@nestjs/platform-socket.io'; +import { createAdapter } from '@socket.io/redis-adapter'; +import { Server } from 'socket.io'; + +import { Config } from '../config'; +import { AuthenticationRequired } from '../error'; +import { SocketIoRedis } from '../redis'; +import { WEBSOCKET_OPTIONS } from './options'; + +export class SocketIoAdapter extends IoAdapter { + constructor(private readonly app: INestApplication) { + super(app); + } + + override createIOServer(port: number, options?: any): Server { + const config = this.app.get(WEBSOCKET_OPTIONS) as Config['websocket']; + const server: Server = super.createIOServer(port, { + ...config, + ...options, + }); + + if (config.canActivate) { + server.use((socket, next) => { + // @ts-expect-error checked + config + .canActivate(socket) + .then(pass => { + if (pass) { + next(); + } else { + throw new AuthenticationRequired(); + } + }) + .catch(e => { + next(e); + }); + }); + } + + const pubClient = this.app.get(SocketIoRedis); + + pubClient.on('error', err => { + console.error(err); + }); + + const subClient = pubClient.duplicate(); + subClient.on('error', err => { + console.error(err); + }); + + server.adapter(createAdapter(pubClient, subClient)); + + return server; + } +} diff --git a/packages/backend/server/src/base/websocket/index.ts b/packages/backend/server/src/base/websocket/index.ts index cd4f629e59..c57cff8b5e 100644 --- a/packages/backend/server/src/base/websocket/index.ts +++ b/packages/backend/server/src/base/websocket/index.ts @@ -1,70 +1,14 @@ import './config'; -import { - FactoryProvider, - INestApplicationContext, - Module, - Provider, -} from '@nestjs/common'; -import { IoAdapter } from '@nestjs/platform-socket.io'; -import { Server } from 'socket.io'; +import { Module } from '@nestjs/common'; -import { Config } from '../config'; -import { AuthenticationRequired } from '../error'; - -export const SocketIoAdapterImpl = Symbol('SocketIoAdapterImpl'); - -export class SocketIoAdapter extends IoAdapter { - constructor(protected readonly app: INestApplicationContext) { - super(app); - } - - override createIOServer(port: number, options?: any): Server { - const config = this.app.get(WEBSOCKET_OPTIONS) as Config['websocket']; - const server: Server = super.createIOServer(port, { - ...config, - ...options, - }); - - if (config.canActivate) { - server.use((socket, next) => { - // @ts-expect-error checked - config - .canActivate(socket) - .then(pass => { - if (pass) { - next(); - } else { - throw new AuthenticationRequired(); - } - }) - .catch(e => { - next(e); - }); - }); - } - - return server; - } -} - -const SocketIoAdapterImplProvider: Provider = { - provide: SocketIoAdapterImpl, - useValue: SocketIoAdapter, -}; - -export const WEBSOCKET_OPTIONS = Symbol('WEBSOCKET_OPTIONS'); - -export const websocketOptionsProvider: FactoryProvider = { - provide: WEBSOCKET_OPTIONS, - useFactory: (config: Config) => { - return config.websocket; - }, - inject: [Config], -}; +import { WEBSOCKET_OPTIONS, websocketOptionsProvider } from './options'; @Module({ - providers: [SocketIoAdapterImplProvider, websocketOptionsProvider], - exports: [SocketIoAdapterImplProvider, websocketOptionsProvider], + providers: [websocketOptionsProvider], + exports: [websocketOptionsProvider], }) export class WebSocketModule {} + +export { WEBSOCKET_OPTIONS }; +export { SocketIoAdapter } from './adapter'; diff --git a/packages/backend/server/src/base/websocket/options.ts b/packages/backend/server/src/base/websocket/options.ts new file mode 100644 index 0000000000..710bb29a9c --- /dev/null +++ b/packages/backend/server/src/base/websocket/options.ts @@ -0,0 +1,13 @@ +import { FactoryProvider } from '@nestjs/common'; + +import { Config } from '../config'; + +export const WEBSOCKET_OPTIONS = Symbol('WEBSOCKET_OPTIONS'); + +export const websocketOptionsProvider: FactoryProvider = { + provide: WEBSOCKET_OPTIONS, + useFactory: (config: Config) => { + return config.websocket; + }, + inject: [Config], +}; diff --git a/packages/backend/server/src/config/affine.env.ts b/packages/backend/server/src/config/affine.env.ts index 49308b682d..05ff3dc60b 100644 --- a/packages/backend/server/src/config/affine.env.ts +++ b/packages/backend/server/src/config/affine.env.ts @@ -29,11 +29,11 @@ AFFiNE.ENV_MAP = { COPILOT_OPENAI_API_KEY: 'plugins.copilot.openai.apiKey', COPILOT_FAL_API_KEY: 'plugins.copilot.fal.apiKey', COPILOT_UNSPLASH_API_KEY: 'plugins.copilot.unsplashKey', - REDIS_SERVER_HOST: 'plugins.redis.host', - REDIS_SERVER_PORT: ['plugins.redis.port', 'int'], - REDIS_SERVER_USER: 'plugins.redis.username', - REDIS_SERVER_PASSWORD: 'plugins.redis.password', - REDIS_SERVER_DATABASE: ['plugins.redis.db', 'int'], + REDIS_SERVER_HOST: 'redis.host', + REDIS_SERVER_PORT: ['redis.port', 'int'], + REDIS_SERVER_USER: 'redis.username', + REDIS_SERVER_PASSWORD: 'redis.password', + REDIS_SERVER_DATABASE: ['redis.db', 'int'], DOC_MERGE_INTERVAL: ['doc.manager.updatePollInterval', 'int'], STRIPE_API_KEY: 'plugins.payment.stripe.keys.APIKey', STRIPE_WEBHOOK_KEY: 'plugins.payment.stripe.keys.webhookKey', diff --git a/packages/backend/server/src/config/affine.self.ts b/packages/backend/server/src/config/affine.self.ts index fbfe50b67d..1c8900f9c8 100644 --- a/packages/backend/server/src/config/affine.self.ts +++ b/packages/backend/server/src/config/affine.self.ts @@ -58,13 +58,6 @@ AFFiNE.use('copilot', { apiKey: '', }, }); -AFFiNE.use('redis', { - host: env.REDIS_SERVER_HOST, - db: 0, - port: 6379, - username: env.REDIS_SERVER_USER, - password: env.REDIS_SERVER_PASSWORD, -}); AFFiNE.use('payment', { stripe: { keys: { diff --git a/packages/backend/server/src/config/affine.ts b/packages/backend/server/src/config/affine.ts index bd5e55dffe..b693473830 100644 --- a/packages/backend/server/src/config/affine.ts +++ b/packages/backend/server/src/config/affine.ts @@ -71,14 +71,6 @@ AFFiNE.server.port = 3010; // ## Plugins settings ## // ############################################################### // -// /* Redis Plugin */ -// /* Provide caching and session storing backed by Redis. */ -// /* Useful when you deploy AFFiNE server in a cluster. */ -// AFFiNE.use('redis', { -// /* override options */ -// }); -// -// // /* Payment Plugin */ // AFFiNE.use('payment', { // stripe: { keys: {}, apiVersion: '2023-10-16' }, diff --git a/packages/backend/server/src/core/doc/adapters/userspace.ts b/packages/backend/server/src/core/doc/adapters/userspace.ts index 4b169a61a1..d909e09b26 100644 --- a/packages/backend/server/src/core/doc/adapters/userspace.ts +++ b/packages/backend/server/src/core/doc/adapters/userspace.ts @@ -175,7 +175,7 @@ export class PgUserspaceDocStorageAdapter extends DocStorageAdapter { workspaceId: string, docId: string ) { - const lock = await this.mutex.lock(`userspace:${workspaceId}:${docId}`); + const lock = await this.mutex.acquire(`userspace:${workspaceId}:${docId}`); if (!lock) { throw new Error('Too many concurrent writings'); diff --git a/packages/backend/server/src/core/doc/adapters/workspace.ts b/packages/backend/server/src/core/doc/adapters/workspace.ts index 0f9b8cfa7d..7bf9f6828e 100644 --- a/packages/backend/server/src/core/doc/adapters/workspace.ts +++ b/packages/backend/server/src/core/doc/adapters/workspace.ts @@ -488,7 +488,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter { workspaceId: string, docId: string ) { - const lock = await this.mutex.lock(`doc:update:${workspaceId}:${docId}`); + const lock = await this.mutex.acquire(`doc:update:${workspaceId}:${docId}`); if (!lock) { throw new Error('Too many concurrent writings'); diff --git a/packages/backend/server/src/core/selfhost/controller.ts b/packages/backend/server/src/core/selfhost/controller.ts index e8116a9634..369a6a6d0f 100644 --- a/packages/backend/server/src/core/selfhost/controller.ts +++ b/packages/backend/server/src/core/selfhost/controller.ts @@ -38,7 +38,7 @@ export class CustomSetupController { throw new PasswordRequired(); } - await using lock = await this.mutex.lock('createFirstAdmin'); + await using lock = await this.mutex.acquire('createFirstAdmin'); if (!lock) { throw new InternalServerError(); diff --git a/packages/backend/server/src/core/workspaces/resolvers/team.ts b/packages/backend/server/src/core/workspaces/resolvers/team.ts index a1e9cd92e3..195e7fad99 100644 --- a/packages/backend/server/src/core/workspaces/resolvers/team.ts +++ b/packages/backend/server/src/core/workspaces/resolvers/team.ts @@ -80,7 +80,7 @@ export class TeamWorkspaceResolver { // lock to prevent concurrent invite const lockFlag = `invite:${workspaceId}`; - await using lock = await this.mutex.lock(lockFlag); + await using lock = await this.mutex.acquire(lockFlag); if (!lock) { return new TooManyRequest(); } @@ -231,7 +231,7 @@ export class TeamWorkspaceResolver { try { // lock to prevent concurrent invite and grant const lockFlag = `invite:${workspaceId}`; - await using lock = await this.mutex.lock(lockFlag); + await using lock = await this.mutex.acquire(lockFlag); if (!lock) { return new TooManyRequest(); } @@ -281,7 +281,7 @@ export class TeamWorkspaceResolver { try { // lock to prevent concurrent invite and grant const lockFlag = `invite:${workspaceId}`; - await using lock = await this.mutex.lock(lockFlag); + await using lock = await this.mutex.acquire(lockFlag); if (!lock) { return new TooManyRequest(); } diff --git a/packages/backend/server/src/core/workspaces/resolvers/workspace.ts b/packages/backend/server/src/core/workspaces/resolvers/workspace.ts index 679e7513e5..02e82bd61f 100644 --- a/packages/backend/server/src/core/workspaces/resolvers/workspace.ts +++ b/packages/backend/server/src/core/workspaces/resolvers/workspace.ts @@ -399,7 +399,7 @@ export class WorkspaceResolver { try { // lock to prevent concurrent invite and grant const lockFlag = `invite:${workspaceId}`; - await using lock = await this.mutex.lock(lockFlag); + await using lock = await this.mutex.acquire(lockFlag); if (!lock) { return new TooManyRequest(); } @@ -524,7 +524,7 @@ export class WorkspaceResolver { @Args('sendAcceptMail', { nullable: true }) sendAcceptMail: boolean ) { const lockFlag = `invite:${workspaceId}`; - await using lock = await this.mutex.lock(lockFlag); + await using lock = await this.mutex.acquire(lockFlag); if (!lock) { return new TooManyRequest(); } diff --git a/packages/backend/server/src/plugins/copilot/resolver.ts b/packages/backend/server/src/plugins/copilot/resolver.ts index 9498981455..9d0429b678 100644 --- a/packages/backend/server/src/plugins/copilot/resolver.ts +++ b/packages/backend/server/src/plugins/copilot/resolver.ts @@ -359,7 +359,7 @@ export class CopilotResolver { user.id ); const lockFlag = `${COPILOT_LOCKER}:session:${user.id}:${options.workspaceId}`; - await using lock = await this.mutex.lock(lockFlag); + await using lock = await this.mutex.acquire(lockFlag); if (!lock) { return new TooManyRequest('Server is busy'); } @@ -387,7 +387,7 @@ export class CopilotResolver { user.id ); const lockFlag = `${COPILOT_LOCKER}:session:${user.id}:${options.workspaceId}`; - await using lock = await this.mutex.lock(lockFlag); + await using lock = await this.mutex.acquire(lockFlag); if (!lock) { return new TooManyRequest('Server is busy'); } @@ -418,7 +418,7 @@ export class CopilotResolver { return new NotFoundException('Session not found'); } const lockFlag = `${COPILOT_LOCKER}:session:${user.id}:${options.workspaceId}`; - await using lock = await this.mutex.lock(lockFlag); + await using lock = await this.mutex.acquire(lockFlag); if (!lock) { return new TooManyRequest('Server is busy'); } @@ -439,7 +439,7 @@ export class CopilotResolver { options: CreateChatMessageInput ) { const lockFlag = `${COPILOT_LOCKER}:message:${user?.id}:${options.sessionId}`; - await using lock = await this.mutex.lock(lockFlag); + await using lock = await this.mutex.acquire(lockFlag); if (!lock) { return new TooManyRequest('Server is busy'); } diff --git a/packages/backend/server/src/plugins/index.ts b/packages/backend/server/src/plugins/index.ts index 2c89280080..e0453a6b81 100644 --- a/packages/backend/server/src/plugins/index.ts +++ b/packages/backend/server/src/plugins/index.ts @@ -3,7 +3,6 @@ import './copilot'; import './gcloud'; import './oauth'; import './payment'; -import './redis'; import './storage'; export { diff --git a/packages/backend/server/src/plugins/payment/service.ts b/packages/backend/server/src/plugins/payment/service.ts index 9449aed284..90cceaa800 100644 --- a/packages/backend/server/src/plugins/payment/service.ts +++ b/packages/backend/server/src/plugins/payment/service.ts @@ -555,7 +555,7 @@ export class SubscriptionService implements OnApplicationBootstrap { return; } - await using lock = await this.mutex.lock('init stripe prices'); + await using lock = await this.mutex.acquire('init stripe prices'); if (!lock) { return; diff --git a/packages/backend/server/src/plugins/redis/index.ts b/packages/backend/server/src/plugins/redis/index.ts deleted file mode 100644 index 94ca078bc0..0000000000 --- a/packages/backend/server/src/plugins/redis/index.ts +++ /dev/null @@ -1,67 +0,0 @@ -import './config'; - -import { Global, Provider, Type } from '@nestjs/common'; -import { Redis } from 'ioredis'; -import { ThrottlerStorageRedisService } from 'nestjs-throttler-storage-redis'; - -import { Cache, Locker, SessionCache } from '../../base'; -import { ThrottlerStorage } from '../../base/throttler'; -import { SocketIoAdapterImpl } from '../../base/websocket'; -import { Plugin } from '../registry'; -import { RedisCache } from './cache'; -import { CacheRedis, SessionRedis, SocketIoRedis } from './instances'; -import { RedisMutexLocker } from './mutex'; -import { createSockerIoAdapterImpl } from './ws-adapter'; - -function makeProvider(token: Type, impl: Type): Provider { - return { - provide: token, - useFactory: (redis: Redis) => { - return new RedisCache(redis); - }, - inject: [impl], - }; -} - -// cache -const cacheProvider = makeProvider(Cache, CacheRedis); -const sessionCacheProvider = makeProvider(SessionCache, SessionRedis); - -// throttler -const throttlerStorageProvider: Provider = { - provide: ThrottlerStorage, - useFactory: (redis: Redis) => { - return new ThrottlerStorageRedisService(redis); - }, - inject: [SessionRedis], -}; - -// socket io -const socketIoRedisAdapterProvider: Provider = { - provide: SocketIoAdapterImpl, - useFactory: (redis: Redis) => { - return createSockerIoAdapterImpl(redis); - }, - inject: [SocketIoRedis], -}; - -// mutex -const mutexRedisAdapterProvider: Provider = { - provide: Locker, - useClass: RedisMutexLocker, -}; - -@Global() -@Plugin({ - name: 'redis', - providers: [CacheRedis, SessionRedis, SocketIoRedis], - overrides: [ - cacheProvider, - sessionCacheProvider, - socketIoRedisAdapterProvider, - throttlerStorageProvider, - mutexRedisAdapterProvider, - ], - requires: ['plugins.redis.host'], -}) -export class RedisModule {} diff --git a/packages/backend/server/src/plugins/redis/instances.ts b/packages/backend/server/src/plugins/redis/instances.ts deleted file mode 100644 index 5d1138ad4e..0000000000 --- a/packages/backend/server/src/plugins/redis/instances.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { - Injectable, - Logger, - OnModuleDestroy, - OnModuleInit, -} from '@nestjs/common'; -import { Redis as IORedis, RedisOptions } from 'ioredis'; - -import { Config } from '../../base/config'; - -class Redis extends IORedis implements OnModuleDestroy, OnModuleInit { - logger = new Logger(Redis.name); - constructor(opts: RedisOptions) { - super({ - ...opts, - lazyConnect: true, - }); - } - - async onModuleInit() { - await this.connect().catch(() => { - this.logger.error('Failed to connect to Redis server.'); - }); - } - onModuleDestroy() { - this.disconnect(); - } -} - -@Injectable() -export class CacheRedis extends Redis { - constructor(config: Config) { - super(config.plugins.redis); - } -} - -@Injectable() -export class SessionRedis extends Redis { - constructor(config: Config) { - super({ ...config.plugins.redis, db: (config.plugins.redis.db ?? 0) + 2 }); - } -} - -@Injectable() -export class SocketIoRedis extends Redis { - constructor(config: Config) { - super({ ...config.plugins.redis, db: (config.plugins.redis.db ?? 0) + 3 }); - } -} diff --git a/packages/backend/server/src/plugins/redis/ws-adapter.ts b/packages/backend/server/src/plugins/redis/ws-adapter.ts deleted file mode 100644 index 043a7b9aeb..0000000000 --- a/packages/backend/server/src/plugins/redis/ws-adapter.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { createAdapter } from '@socket.io/redis-adapter'; -import { Redis } from 'ioredis'; -import { Server, ServerOptions } from 'socket.io'; - -import { SocketIoAdapter } from '../../base/websocket'; - -export function createSockerIoAdapterImpl( - redis: Redis -): typeof SocketIoAdapter { - class RedisIoAdapter extends SocketIoAdapter { - override createIOServer(port: number, options?: ServerOptions): Server { - const pubClient = redis; - pubClient.on('error', err => { - console.error(err); - }); - const subClient = pubClient.duplicate(); - subClient.on('error', err => { - console.error(err); - }); - - const server = super.createIOServer(port, options); - server.adapter(createAdapter(pubClient, subClient)); - return server; - } - } - - return RedisIoAdapter; -}