diff --git a/Cargo.lock b/Cargo.lock index 677697ee44..0396cbb6d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -99,6 +99,7 @@ dependencies = [ "screencapturekit", "symphonia", "thiserror 2.0.18", + "tokio", "uuid", "windows 0.61.3", "windows-core 0.61.2", @@ -5060,9 +5061,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.9" +version = "0.103.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7df23109aa6c1567d1c575b9952556388da57401e4ace1d15f79eedad0d8f53" +checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef" dependencies = [ "ring", "rustls-pki-types", diff --git a/packages/frontend/apps/electron-renderer/src/app/effects/recording.ts b/packages/frontend/apps/electron-renderer/src/app/effects/recording.ts index 554b5b58d9..52d92b6815 100644 --- a/packages/frontend/apps/electron-renderer/src/app/effects/recording.ts +++ b/packages/frontend/apps/electron-renderer/src/app/effects/recording.ts @@ -13,16 +13,15 @@ import type { FrameworkProvider } from '@toeverything/infra'; import { getCurrentWorkspace, isAiEnabled } from './utils'; const logger = new DebugLogger('electron-renderer:recording'); -const RECORDING_PROCESS_RETRY_MS = 1000; +const RECORDING_IMPORT_RETRY_MS = 1000; const NATIVE_RECORDING_MIME_TYPE = 'audio/ogg'; -type ProcessingRecordingStatus = { +type RecordingImportStatus = { id: number; - status: 'processing'; appName?: string; - blockCreationStatus?: undefined; filepath: string; startTime: number; + importStatus: 'pending_import' | 'importing' | 'imported' | 'import_failed'; }; type WorkspaceHandle = NonNullable>; @@ -65,24 +64,10 @@ async function saveRecordingBlob(blobEngine: BlobEngine, filepath: string) { return { blob, blobId }; } -function shouldProcessRecording( - status: unknown -): status is ProcessingRecordingStatus { - return ( - !!status && - typeof status === 'object' && - 'status' in status && - status.status === 'processing' && - 'filepath' in status && - typeof status.filepath === 'string' && - !('blockCreationStatus' in status && status.blockCreationStatus) - ); -} - async function createRecordingDoc( frameworkProvider: FrameworkProvider, workspace: WorkspaceHandle['workspace'], - status: ProcessingRecordingStatus + status: RecordingImportStatus ) { const docsService = workspace.scope.get(DocsService); const aiEnabled = isAiEnabled(frameworkProvider); @@ -99,13 +84,11 @@ async function createRecordingDoc( const docProps: DocProps = { onStoreLoad: (doc, { noteId }) => { void (async () => { - // it takes a while to save the blob, so we show the attachment first const { blobId, blob } = await saveRecordingBlob( doc.workspace.blobSync, recordingFilepath ); - // name + timestamp(readable) + extension const attachmentName = (status.appName ?? 'System Audio') + ' ' + timestamp + '.opus'; @@ -163,7 +146,7 @@ async function createRecordingDoc( } export function setupRecordingEvents(frameworkProvider: FrameworkProvider) { - let pendingStatus: ProcessingRecordingStatus | null = null; + let importQueue: RecordingImportStatus[] = []; let retryTimer: ReturnType | null = null; let processingStatusId: number | null = null; @@ -174,28 +157,48 @@ export function setupRecordingEvents(frameworkProvider: FrameworkProvider) { } }; - const clearPending = (id?: number) => { - if (id === undefined || pendingStatus?.id === id) { - pendingStatus = null; - clearRetry(); - } - if (id === undefined || processingStatusId === id) { + const updateQueue = (nextQueue: RecordingImportStatus[]) => { + importQueue = nextQueue; + if ( + processingStatusId !== null && + !importQueue.some( + status => + status.id === processingStatusId && + status.importStatus === 'importing' + ) + ) { processingStatusId = null; } }; + const updateLocalImportStatus = ( + id: number, + importStatus: RecordingImportStatus['importStatus'] + ) => { + importQueue = importQueue.map(status => + status.id === id ? { ...status, importStatus } : status + ); + }; + + const getNextImportCandidate = () => + importQueue.find( + status => + status.importStatus === 'pending_import' || + status.importStatus === 'import_failed' + ) ?? null; + const scheduleRetry = () => { - if (!pendingStatus || retryTimer !== null) { + if (!getNextImportCandidate() || retryTimer !== null) { return; } retryTimer = setTimeout(() => { retryTimer = null; - void processPendingStatus().catch(console.error); - }, RECORDING_PROCESS_RETRY_MS); + void processNextImport().catch(console.error); + }, RECORDING_IMPORT_RETRY_MS); }; - const processPendingStatus = async () => { - const status = pendingStatus; + const processNextImport = async () => { + const status = getNextImportCandidate(); if (!status || processingStatusId === status.id) { return; } @@ -216,8 +219,12 @@ export function setupRecordingEvents(frameworkProvider: FrameworkProvider) { using currentWorkspace = getCurrentWorkspace(frameworkProvider); if (!currentWorkspace) { - // Workspace can lag behind the post-recording status update for a short - // time; keep retrying instead of permanently failing the import. + scheduleRetry(); + return; + } + + const claimed = await apis?.recording.claimRecordingImport(status.id); + if (!claimed) { scheduleRetry(); return; } @@ -228,47 +235,38 @@ export function setupRecordingEvents(frameworkProvider: FrameworkProvider) { await createRecordingDoc( frameworkProvider, currentWorkspace.workspace, - status + claimed ); - await apis?.recording.setRecordingBlockCreationStatus( - status.id, - 'success' - ); - clearPending(status.id); + updateLocalImportStatus(status.id, 'imported'); + await apis?.recording.completeRecordingImport(status.id); } catch (error) { - logger.error('Failed to create recording block', error); - try { - await apis?.recording.setRecordingBlockCreationStatus( - status.id, - 'failed', - error instanceof Error ? error.message : undefined - ); - } finally { - clearPending(status.id); - } + logger.error('Failed to import recording artifact', error); + updateLocalImportStatus(status.id, 'import_failed'); + await apis?.recording.failRecordingImport( + status.id, + error instanceof Error ? error.message : undefined + ); } finally { - if (pendingStatus?.id === status.id) { - processingStatusId = null; - scheduleRetry(); - } + processingStatusId = null; + scheduleRetry(); } }; - events?.recording.onRecordingStatusChanged(status => { - if (shouldProcessRecording(status)) { - pendingStatus = status; - clearRetry(); - void processPendingStatus().catch(console.error); - return; - } + if (apis?.recording) { + void apis.recording + .getRecordingImportQueue() + .then(queue => { + updateQueue(queue ?? []); + void processNextImport().catch(console.error); + }) + .catch(error => { + logger.error('Failed to load recording import queue', error); + }); + } - if (!status) { - clearPending(); - return; - } - - if (pendingStatus?.id === status.id) { - clearPending(status.id); - } + events?.recording.onRecordingImportQueueChanged(queue => { + updateQueue(queue); + clearRetry(); + void processNextImport().catch(console.error); }); } diff --git a/packages/frontend/apps/electron-renderer/src/popup/recording/index.tsx b/packages/frontend/apps/electron-renderer/src/popup/recording/index.tsx index 207cfb661c..b71de7ab33 100644 --- a/packages/frontend/apps/electron-renderer/src/popup/recording/index.tsx +++ b/packages/frontend/apps/electron-renderer/src/popup/recording/index.tsx @@ -10,8 +10,16 @@ import * as styles from './styles.css'; type Status = { id: number; - status: 'new' | 'recording' | 'processing' | 'ready'; - blockCreationStatus?: 'success' | 'failed'; + status: + | 'new' + | 'starting' + | 'recording' + | 'finalizing' + | 'pending_import' + | 'importing' + | 'imported' + | 'import_failed' + | 'finalize_failed'; appName?: string; appGroupId?: number; icon?: Buffer; @@ -56,19 +64,17 @@ export function Recording() { } if (status.status === 'new') { return t['com.affine.recording.new'](); - } else if ( - status.status === 'ready' && - status.blockCreationStatus === 'success' - ) { + } else if (status.status === 'imported') { return t['com.affine.recording.success.prompt'](); } else if ( - status.status === 'ready' && - status.blockCreationStatus === 'failed' + status.status === 'import_failed' || + status.status === 'finalize_failed' ) { return t['com.affine.recording.failed.prompt'](); } else if ( + status.status === 'starting' || status.status === 'recording' || - status.status === 'processing' + status.status === 'finalizing' ) { if (status.appName) { return t['com.affine.recording.recording']({ @@ -77,6 +83,11 @@ export function Recording() { } else { return t['com.affine.recording.recording.unnamed'](); } + } else if ( + status.status === 'pending_import' || + status.status === 'importing' + ) { + return t['com.affine.recording.success.prompt'](); } return null; }, [status, t]); @@ -155,8 +166,10 @@ export function Recording() { ); } else if ( - status.status === 'processing' || - (status.status === 'ready' && !status.blockCreationStatus) + status.status === 'starting' || + status.status === 'finalizing' || + status.status === 'pending_import' || + status.status === 'importing' ) { return ( ); } else if ( - status.status === 'ready' && - status.blockCreationStatus === 'failed' + status.status === 'import_failed' || + status.status === 'finalize_failed' ) { return ( <> diff --git a/packages/frontend/apps/electron/src/main/recording/artifact-registry.ts b/packages/frontend/apps/electron/src/main/recording/artifact-registry.ts new file mode 100644 index 0000000000..f4689967c8 --- /dev/null +++ b/packages/frontend/apps/electron/src/main/recording/artifact-registry.ts @@ -0,0 +1,191 @@ +import { BehaviorSubject } from 'rxjs'; + +import { logger } from '../logger'; +import { globalStateStorage } from '../shared-storage/storage'; +import type { + RecordingArtifactInfo, + RecordingImportState, + RecordingImportStatus, + RecordingSessionStatus, +} from './types'; + +const RECORDING_IMPORT_REGISTRY_KEY = 'recordingImportRegistry:v1'; + +function isImportState(value: unknown): value is RecordingImportState { + return ( + value === 'pending_import' || + value === 'importing' || + value === 'imported' || + value === 'import_failed' + ); +} + +function isArtifactInfo(value: unknown): value is RecordingArtifactInfo { + if (!value || typeof value !== 'object') { + return false; + } + + const artifact = value as Partial; + return ( + typeof artifact.filepath === 'string' && + (artifact.sampleRate === undefined || + typeof artifact.sampleRate === 'number') && + (artifact.numberOfChannels === undefined || + typeof artifact.numberOfChannels === 'number') && + (artifact.durationMs === undefined || + typeof artifact.durationMs === 'number') && + (artifact.size === undefined || typeof artifact.size === 'number') && + (artifact.degraded === undefined || + typeof artifact.degraded === 'boolean') && + (artifact.overflowCount === undefined || + typeof artifact.overflowCount === 'number') + ); +} + +function isRecordingImportStatus( + value: unknown +): value is RecordingImportStatus { + if (!isArtifactInfo(value) || typeof value !== 'object') { + return false; + } + + const item = value as Partial; + return ( + typeof item.id === 'number' && + typeof item.startTime === 'number' && + typeof item.createdAt === 'number' && + typeof item.updatedAt === 'number' && + isImportState(item.importStatus) && + (item.appName === undefined || typeof item.appName === 'string') && + (item.errorMessage === undefined || typeof item.errorMessage === 'string') + ); +} + +function loadPersistedImports() { + const persisted = globalStateStorage.get(RECORDING_IMPORT_REGISTRY_KEY); + if (!Array.isArray(persisted)) { + return [] as RecordingImportStatus[]; + } + return persisted.filter(isRecordingImportStatus); +} + +export class RecordingArtifactRegistry { + private readonly imports$ = new BehaviorSubject( + loadPersistedImports() + ); + + get entries$() { + return this.imports$; + } + + get entries() { + return this.imports$.value; + } + + private setEntries( + updater: + | RecordingImportStatus[] + | ((entries: RecordingImportStatus[]) => RecordingImportStatus[]) + ) { + const nextEntries = + typeof updater === 'function' ? updater(this.imports$.value) : updater; + this.imports$.next(nextEntries); + globalStateStorage.set(RECORDING_IMPORT_REGISTRY_KEY, nextEntries); + return nextEntries; + } + + enqueueFromSession( + session: RecordingSessionStatus, + artifact: RecordingArtifactInfo + ) { + const now = Date.now(); + return this.setEntries(entries => { + const next = entries.filter(entry => entry.id !== session.id); + next.push({ + id: session.id, + appName: session.appGroup?.name, + startTime: session.startTime, + importStatus: 'pending_import', + createdAt: now, + updatedAt: now, + ...artifact, + }); + next.sort((left, right) => left.createdAt - right.createdAt); + return next; + }).find(entry => entry.id === session.id); + } + + claim(id: number) { + let claimed: RecordingImportStatus | null = null; + this.setEntries(entries => + entries.map(entry => { + if (entry.id !== id) { + return entry; + } + if ( + entry.importStatus !== 'pending_import' && + entry.importStatus !== 'import_failed' + ) { + return entry; + } + claimed = { + ...entry, + importStatus: 'importing', + errorMessage: undefined, + updatedAt: Date.now(), + }; + return claimed; + }) + ); + return claimed; + } + + markImported(id: number) { + return this.updateState(id, 'imported'); + } + + markFailed(id: number, errorMessage?: string) { + return this.updateState(id, 'import_failed', errorMessage); + } + + remove(id: number) { + this.setEntries(entries => entries.filter(entry => entry.id !== id)); + } + + latest() { + return [...this.entries].sort((left, right) => { + if (left.updatedAt !== right.updatedAt) { + return right.updatedAt - left.updatedAt; + } + return right.id - left.id; + })[0]; + } + + private updateState( + id: number, + importStatus: RecordingImportState, + errorMessage?: string + ) { + let updated: RecordingImportStatus | null = null; + this.setEntries(entries => + entries.map(entry => { + if (entry.id !== id) { + return entry; + } + updated = { + ...entry, + importStatus, + errorMessage, + updatedAt: Date.now(), + }; + return updated; + }) + ); + if (!updated) { + logger.error(`Recording import ${id} not found`); + } + return updated; + } +} + +export const recordingArtifactRegistry = new RecordingArtifactRegistry(); diff --git a/packages/frontend/apps/electron/src/main/recording/feature.ts b/packages/frontend/apps/electron/src/main/recording/feature.ts index a5544dde76..3f1f675bc9 100644 --- a/packages/frontend/apps/electron/src/main/recording/feature.ts +++ b/packages/frontend/apps/electron/src/main/recording/feature.ts @@ -35,8 +35,15 @@ import { globalStateStorage } from '../shared-storage/storage'; import { getMainWindow } from '../windows-manager'; import { popupManager } from '../windows-manager/popup'; import { isAppNameAllowed } from './allow-list'; +import { recordingArtifactRegistry } from './artifact-registry'; import { recordingStateMachine } from './state-machine'; -import type { AppGroupInfo, RecordingStatus, TappableAppInfo } from './types'; +import type { + AppGroupInfo, + RecordingImportStatus, + RecordingSessionStatus, + RecordingStatus, + TappableAppInfo, +} from './types'; export const MeetingsSettingsState = { $: globalStateStorage.watch(MeetingSettingsKey).pipe( @@ -74,14 +81,32 @@ type ShareableContentType = InstanceType; type ShareableContentStatic = NativeModule['ShareableContent']; let shareableContent: ShareableContentType | null = null; +let nativeModuleOverride: NativeModule | null = null; function getNativeModule(): NativeModule { - return require('@affine/native') as NativeModule; + return nativeModuleOverride ?? (require('@affine/native') as NativeModule); +} + +async function getNativeModuleAsync(): Promise { + if (nativeModuleOverride) { + return nativeModuleOverride; + } + return (await import('@affine/native')) as NativeModule; +} + +export function setRecordingNativeModuleForTesting( + nativeModule: NativeModule | null +) { + nativeModuleOverride = nativeModule; } function cleanup() { const nativeId = recordingStateMachine.status?.nativeId; - if (nativeId) cleanupAbandonedNativeRecording(nativeId); + if (nativeId) { + void cleanupAbandonedNativeRecording(nativeId).catch(error => { + logger.error('failed to cleanup abandoned native recording', error); + }); + } recordingStatus$.next(null); shareableContent = null; appStateSubscribers.forEach(subscriber => { @@ -113,16 +138,103 @@ export const appGroups$ = new BehaviorSubject([]); export const updateApplicationsPing$ = new Subject(); -// There should be only one active recording at a time; state is managed by the state machine -export const recordingStatus$ = recordingStateMachine.status$; +export const recordingSessionStatus$ = recordingStateMachine.status$; +export const recordingImportQueue$ = recordingArtifactRegistry.entries$; +export const recordingStatus$ = new BehaviorSubject( + null +); -function isRecordingSettled( +function hasActiveRecordingSession( + status: RecordingSessionStatus | null | undefined +) { + return ( + status?.sessionStatus === 'starting' || + status?.sessionStatus === 'recording' || + status?.sessionStatus === 'finalizing' + ); +} + +function isTerminalPopupStatus( status: RecordingStatus | null | undefined ): status is RecordingStatus & { - status: 'ready'; - blockCreationStatus: 'success' | 'failed'; + status: 'imported' | 'import_failed' | 'finalize_failed'; } { - return status?.status === 'ready' && status.blockCreationStatus !== undefined; + return ( + status?.status === 'imported' || + status?.status === 'import_failed' || + status?.status === 'finalize_failed' + ); +} + +function serializeSessionStatus( + status: RecordingSessionStatus | null +): RecordingStatus | null { + if (!status || status.sessionStatus === 'aborted') { + return null; + } + + const artifact = status.artifact; + return { + id: status.id, + status: + status.sessionStatus === 'finalized' + ? 'pending_import' + : status.sessionStatus, + appName: status.appGroup?.name, + appGroupId: status.appGroup?.processGroupId, + icon: status.appGroup?.icon, + startTime: status.startTime, + filepath: artifact?.filepath, + sampleRate: artifact?.sampleRate, + numberOfChannels: artifact?.numberOfChannels, + durationMs: artifact?.durationMs, + size: artifact?.size, + degraded: artifact?.degraded, + overflowCount: artifact?.overflowCount, + errorMessage: status.errorMessage, + }; +} + +function serializeImportStatus( + status: RecordingImportStatus | null | undefined +): RecordingStatus | null { + if (!status) { + return null; + } + + return { + id: status.id, + status: status.importStatus, + appName: status.appName, + startTime: status.startTime, + filepath: status.filepath, + sampleRate: status.sampleRate, + numberOfChannels: status.numberOfChannels, + durationMs: status.durationMs, + size: status.size, + degraded: status.degraded, + overflowCount: status.overflowCount, + errorMessage: status.errorMessage, + }; +} + +function getProjectedRecordingStatus(): RecordingStatus | null { + const session = recordingStateMachine.status; + if ( + session?.sessionStatus === 'new' || + session?.sessionStatus === 'starting' || + session?.sessionStatus === 'recording' || + session?.sessionStatus === 'finalizing' || + session?.sessionStatus === 'finalize_failed' + ) { + return serializeSessionStatus(session); + } + + return serializeImportStatus(recordingArtifactRegistry.latest()); +} + +function emitRecordingStatus() { + recordingStatus$.next(getProjectedRecordingStatus()); } function createAppGroup(processGroupId: number): AppGroupInfo | undefined { @@ -193,10 +305,10 @@ function setupNewRunningAppGroup() { ); appGroups$.value.forEach(group => { - const recordingStatus = recordingStatus$.value; + const recordingStatus = recordingStateMachine.status; if ( group.isRunning && - (!recordingStatus || recordingStatus.status === 'new') + (!recordingStatus || recordingStatus.sessionStatus === 'new') ) { newRecording(group); } @@ -225,15 +337,13 @@ function setupNewRunningAppGroup() { return; } - const recordingStatus = recordingStatus$.value; + const recordingStatus = recordingStateMachine.status; if (currentGroup.isRunning) { - // when the app is running and there is no active recording popup - // we should show a new recording popup if ( !recordingStatus || - recordingStatus.status === 'new' || - isRecordingSettled(recordingStatus) + recordingStatus.sessionStatus === 'new' || + !hasActiveRecordingSession(recordingStatus) ) { if (MeetingsSettingsState.value.recordingMode === 'prompt') { newRecording(currentGroup); @@ -251,7 +361,7 @@ function setupNewRunningAppGroup() { // when displaying in "new" state but the app is not running any more // we should remove the recording if ( - recordingStatus?.status === 'new' && + recordingStatus?.sessionStatus === 'new' && currentGroup.bundleIdentifier === recordingStatus.appGroup?.bundleIdentifier ) { @@ -261,7 +371,7 @@ function setupNewRunningAppGroup() { // if the watched app stops while we are recording it, // we should stop the recording if ( - recordingStatus?.status === 'recording' && + recordingStatus?.sessionStatus === 'recording' && recordingStatus.appGroup?.bundleIdentifier === currentGroup.bundleIdentifier ) { @@ -276,28 +386,49 @@ function setupNewRunningAppGroup() { export async function getRecording(id: number) { const recording = recordingStateMachine.status; - if (!recording || recording.id !== id) { + if (recording?.id === id) { + return { + id, + appGroup: recording.appGroup, + app: recording.app, + startTime: recording.startTime, + filepath: recording.artifact?.filepath, + sampleRate: recording.artifact?.sampleRate, + numberOfChannels: recording.artifact?.numberOfChannels, + }; + } + + const artifact = recordingArtifactRegistry.entries.find( + recordingArtifact => recordingArtifact.id === id + ); + if (!artifact) { logger.error(`Recording ${id} not found`); return; } + return { id, - appGroup: recording.appGroup, - app: recording.app, - startTime: recording.startTime, - filepath: recording.filepath, - sampleRate: recording.sampleRate, - numberOfChannels: recording.numberOfChannels, + startTime: artifact.startTime, + filepath: artifact.filepath, + sampleRate: artifact.sampleRate, + numberOfChannels: artifact.numberOfChannels, }; } -// recording popup status -// new: waiting for user confirmation -// recording: native recording is ongoing -// processing: native stop or renderer import/transcription is ongoing -// ready + blockCreationStatus: post-processing finished -// null: hide popup function setupRecordingListeners() { + subscribers.push( + recordingSessionStatus$ + .pipe(distinctUntilChanged(shallowEqual)) + .subscribe(() => { + emitRecordingStatus(); + }), + recordingImportQueue$ + .pipe(distinctUntilChanged(shallowEqual)) + .subscribe(() => { + emitRecordingStatus(); + }) + ); + subscribers.push( recordingStatus$ .pipe(distinctUntilChanged(shallowEqual)) @@ -310,13 +441,12 @@ function setupRecordingListeners() { }); } - if (isRecordingSettled(status)) { - // show the popup for 10s + if (isTerminalPopupStatus(status)) { setTimeout( () => { const currentStatus = recordingStatus$.value; if ( - isRecordingSettled(currentStatus) && + isTerminalPopupStatus(currentStatus) && currentStatus.id === status.id ) { popup.hide().catch(err => { @@ -324,10 +454,12 @@ function setupRecordingListeners() { }); } }, - status.blockCreationStatus === 'failed' ? 30_000 : 10_000 + status.status === 'import_failed' || + status.status === 'finalize_failed' + ? 30_000 + : 10_000 ); } else if (!status) { - // status is removed, we should hide the popup popupManager .get('recording') .hide() @@ -452,11 +584,10 @@ export function setupRecordingFeature() { shareableContent = new ShareableContent(); setupMediaListeners(); } - // reset all states - recordingStatus$.next(null); setupAppGroups(); setupNewRunningAppGroup(); setupRecordingListeners(); + emitRecordingStatus(); return true; } catch (error) { logger.error('failed to setup recording feature', error); @@ -479,10 +610,12 @@ function normalizeAppGroupInfo( export function newRecording( appGroup?: AppGroupInfo | number ): RecordingStatus | null { - return recordingStateMachine.dispatch({ + const nextState = recordingStateMachine.dispatch({ type: 'NEW_RECORDING', appGroup: normalizeAppGroupInfo(appGroup), }); + emitRecordingStatus(); + return serializeRecordingStatus(nextState); } export async function startRecording( @@ -493,9 +626,10 @@ export async function startRecording( type: 'START_RECORDING', appGroup: normalizeAppGroupInfo(appGroup), }); + emitRecordingStatus(); - if (!state || state.status !== 'recording' || state === previousState) { - return state; + if (!state || state.sessionStatus !== 'starting' || state === previousState) { + return serializeRecordingStatus(state); } let nativeId: string | undefined; @@ -503,7 +637,9 @@ export async function startRecording( try { fs.ensureDirSync(SAVED_RECORDINGS_DIR); - const meta = getNativeModule().startRecording({ + logger.info(`recording ${state.id} starting`); + const nativeModule = await getNativeModuleAsync(); + const meta = await nativeModule.startRecording({ appProcessId: state.app?.processId, outputDir: SAVED_RECORDINGS_DIR, format: 'opus', @@ -521,22 +657,30 @@ export async function startRecording( sampleRate: meta.sampleRate, numberOfChannels: meta.channels, }); + emitRecordingStatus(); if (!nextState || nextState.nativeId !== meta.id) { throw new Error('Failed to attach native recording metadata'); } - return nextState; + logger.info(`recording ${state.id} started`, { + nativeId: meta.id, + sampleRate: meta.sampleRate, + channels: meta.channels, + }); + return serializeRecordingStatus(nextState); } catch (error) { if (nativeId) { - cleanupAbandonedNativeRecording(nativeId); + await cleanupAbandonedNativeRecording(nativeId); } logger.error('failed to start recording', error); - return setRecordingBlockCreationStatus( - state.id, - 'failed', - error instanceof Error ? error.message : undefined - ); + const nextState = recordingStateMachine.dispatch({ + type: 'START_RECORDING_FAILED', + id: state.id, + errorMessage: error instanceof Error ? error.message : undefined, + }); + emitRecordingStatus(); + return serializeRecordingStatus(nextState); } } @@ -552,34 +696,58 @@ export async function stopRecording(id: number) { return; } - const processingState = recordingStateMachine.dispatch({ + const finalizingState = recordingStateMachine.dispatch({ type: 'STOP_RECORDING', id, }); + emitRecordingStatus(); if ( - !processingState || - processingState.id !== id || - processingState.status !== 'processing' + !finalizingState || + finalizingState.id !== id || + finalizingState.sessionStatus !== 'finalizing' ) { - return serializeRecordingStatus(processingState ?? recording); + return serializeRecordingStatus(finalizingState ?? recording); } try { - const artifact = getNativeModule().stopRecording(recording.nativeId); + logger.info(`recording ${id} finalizing`, { + nativeId: recording.nativeId, + }); + const nativeModule = await getNativeModuleAsync(); + const artifact = await nativeModule.stopRecording(recording.nativeId); const filepath = await assertRecordingFilepath(artifact.filepath); - const readyStatus = recordingStateMachine.dispatch({ + const finalizedStatus = recordingStateMachine.dispatch({ type: 'ATTACH_RECORDING_ARTIFACT', id, - filepath, - sampleRate: artifact.sampleRate, - numberOfChannels: artifact.channels, + artifact: { + filepath, + sampleRate: artifact.sampleRate, + numberOfChannels: artifact.channels, + durationMs: artifact.durationMs, + size: artifact.size, + degraded: artifact.degraded, + overflowCount: artifact.overflowCount, + }, }); + emitRecordingStatus(); - if (!readyStatus) { + if (!finalizedStatus) { logger.error('No recording status to save'); return; } + const importEntry = recordingArtifactRegistry.enqueueFromSession( + finalizedStatus, + finalizedStatus.artifact ?? { filepath } + ); + emitRecordingStatus(); + logger.info(`recording ${id} finalized`, { + filepath, + degraded: artifact.degraded, + overflowCount: artifact.overflowCount, + importStatus: importEntry?.importStatus, + }); + getMainWindow() .then(mainWindow => { if (mainWindow) { @@ -590,19 +758,16 @@ export async function stopRecording(id: number) { logger.error('failed to bring up the window', err); }); - return serializeRecordingStatus(readyStatus); + return serializeRecordingStatus(getProjectedRecordingStatus()); } catch (error: unknown) { logger.error('Failed to stop recording', error); - const recordingStatus = await setRecordingBlockCreationStatus( + const nextState = recordingStateMachine.dispatch({ + type: 'FINALIZE_RECORDING_FAILED', id, - 'failed', - error instanceof Error ? error.message : undefined - ); - if (!recordingStatus) { - logger.error('No recording status to stop'); - return; - } - return serializeRecordingStatus(recordingStatus); + errorMessage: error instanceof Error ? error.message : undefined, + }); + emitRecordingStatus(); + return serializeRecordingStatus(nextState); } } @@ -618,66 +783,137 @@ export async function readRecordingFile(filepath: string) { return fsp.readFile(normalizedPath); } -function cleanupAbandonedNativeRecording(nativeId: string) { +async function cleanupAbandonedNativeRecording(nativeId: string) { try { - const artifact = getNativeModule().stopRecording(nativeId); - void assertRecordingFilepath(artifact.filepath) - .then(filepath => { - fs.removeSync(filepath); - }) - .catch(error => { - logger.error('failed to validate abandoned recording filepath', error); - }); + const nativeModule = await getNativeModuleAsync(); + await nativeModule.abortRecording(nativeId); } catch (error) { logger.error('failed to cleanup abandoned native recording', error); } } -export async function setRecordingBlockCreationStatus( - id: number, - status: 'success' | 'failed', - errorMessage?: string -) { - return recordingStateMachine.dispatch({ - type: 'SET_BLOCK_CREATION_STATUS', - id, - status, - errorMessage, +export function getRecordingImportQueue() { + return recordingArtifactRegistry.entries.flatMap(entry => { + const serialized = serializeRecordingImportStatus(entry); + return serialized ? [serialized] : []; }); } +export function claimRecordingImport(id: number) { + const status = recordingArtifactRegistry.claim(id); + if (status) { + logger.info(`recording import ${id} claimed`); + } + emitRecordingStatus(); + return serializeRecordingImportStatus(status); +} + +export function completeRecordingImport(id: number) { + logger.info(`recording import ${id} completed`); + const status = recordingArtifactRegistry.markImported(id); + emitRecordingStatus(); + return serializeRecordingImportStatus(status); +} + +export function failRecordingImport(id: number, errorMessage?: string) { + logger.error(`recording import ${id} failed`, errorMessage); + const status = recordingArtifactRegistry.markFailed(id, errorMessage); + emitRecordingStatus(); + return serializeRecordingImportStatus(status); +} + export function removeRecording(id: number) { recordingStateMachine.dispatch({ type: 'REMOVE_RECORDING', id }); + recordingArtifactRegistry.remove(id); + emitRecordingStatus(); } export interface SerializedRecordingStatus { id: number; status: RecordingStatus['status']; - blockCreationStatus?: RecordingStatus['blockCreationStatus']; appName?: string; - // if there is no app group, it means the recording is for system audio appGroupId?: number; icon?: Buffer; startTime: number; filepath?: string; sampleRate?: number; numberOfChannels?: number; + durationMs?: number; + size?: number; + degraded?: boolean; + overflowCount?: number; + errorMessage?: string; } export function serializeRecordingStatus( - status: RecordingStatus + status: RecordingStatus | RecordingSessionStatus | null | undefined ): SerializedRecordingStatus | null { + const serialized = + !status || 'sessionStatus' in status + ? serializeSessionStatus(status ?? null) + : status; + + if (!serialized) { + return null; + } + + return { + id: serialized.id, + status: serialized.status, + appName: serialized.appName, + appGroupId: serialized.appGroupId, + icon: serialized.icon, + startTime: serialized.startTime, + filepath: serialized.filepath, + sampleRate: serialized.sampleRate, + numberOfChannels: serialized.numberOfChannels, + durationMs: serialized.durationMs, + size: serialized.size, + degraded: serialized.degraded, + overflowCount: serialized.overflowCount, + errorMessage: serialized.errorMessage, + }; +} + +export interface SerializedRecordingImportStatus { + id: number; + appName?: string; + startTime: number; + filepath: string; + sampleRate?: number; + numberOfChannels?: number; + durationMs?: number; + size?: number; + degraded?: boolean; + overflowCount?: number; + importStatus: RecordingImportStatus['importStatus']; + errorMessage?: string; + createdAt: number; + updatedAt: number; +} + +export function serializeRecordingImportStatus( + status: RecordingImportStatus | null | undefined +): SerializedRecordingImportStatus | null { + if (!status) { + return null; + } + return { id: status.id, - status: status.status, - blockCreationStatus: status.blockCreationStatus, - appName: status.appGroup?.name, - appGroupId: status.appGroup?.processGroupId, - icon: status.appGroup?.icon, + appName: status.appName, startTime: status.startTime, filepath: status.filepath, sampleRate: status.sampleRate, numberOfChannels: status.numberOfChannels, + durationMs: status.durationMs, + size: status.size, + degraded: status.degraded, + overflowCount: status.overflowCount, + importStatus: status.importStatus, + errorMessage: status.errorMessage, + createdAt: status.createdAt, + updatedAt: status.updatedAt, }; } diff --git a/packages/frontend/apps/electron/src/main/recording/index.ts b/packages/frontend/apps/electron/src/main/recording/index.ts index 4942e6b232..5ec6d198d2 100644 --- a/packages/frontend/apps/electron/src/main/recording/index.ts +++ b/packages/frontend/apps/electron/src/main/recording/index.ts @@ -11,15 +11,20 @@ import { askForMeetingPermission, checkMeetingPermissions, checkRecordingAvailable, + claimRecordingImport, + completeRecordingImport, disableRecordingFeature, + failRecordingImport, getRecording, + getRecordingImportQueue, readRecordingFile, + recordingImportQueue$, recordingStatus$, removeRecording, SAVED_RECORDINGS_DIR, + type SerializedRecordingImportStatus, type SerializedRecordingStatus, serializeRecordingStatus, - setRecordingBlockCreationStatus, setupRecordingFeature, startRecording, stopRecording, @@ -45,13 +50,17 @@ export const recordingHandlers = { readRecordingFile: async (_, filepath: string) => { return readRecordingFile(filepath); }, - setRecordingBlockCreationStatus: async ( - _, - id: number, - status: 'success' | 'failed', - errorMessage?: string - ) => { - return setRecordingBlockCreationStatus(id, status, errorMessage); + getRecordingImportQueue: async () => { + return getRecordingImportQueue(); + }, + claimRecordingImport: async (_, id: number) => { + return claimRecordingImport(id); + }, + completeRecordingImport: async (_, id: number) => { + return completeRecordingImport(id); + }, + failRecordingImport: async (_, id: number, errorMessage?: string) => { + return failRecordingImport(id, errorMessage); }, removeRecording: async (_, id: number) => { return removeRecording(id); @@ -108,4 +117,35 @@ export const recordingEvents = { } }; }, + onRecordingImportQueueChanged: ( + fn: (queue: SerializedRecordingImportStatus[]) => void + ) => { + const sub = recordingImportQueue$.subscribe(queue => { + fn( + queue.map(item => ({ + id: item.id, + appName: item.appName, + startTime: item.startTime, + filepath: item.filepath, + sampleRate: item.sampleRate, + numberOfChannels: item.numberOfChannels, + durationMs: item.durationMs, + size: item.size, + degraded: item.degraded, + overflowCount: item.overflowCount, + importStatus: item.importStatus, + errorMessage: item.errorMessage, + createdAt: item.createdAt, + updatedAt: item.updatedAt, + })) + ); + }); + return () => { + try { + sub.unsubscribe(); + } catch { + // ignore unsubscribe error + } + }; + }, }; diff --git a/packages/frontend/apps/electron/src/main/recording/state-machine.ts b/packages/frontend/apps/electron/src/main/recording/state-machine.ts index d821fe7bc8..afbcbfe1a0 100644 --- a/packages/frontend/apps/electron/src/main/recording/state-machine.ts +++ b/packages/frontend/apps/electron/src/main/recording/state-machine.ts @@ -2,17 +2,15 @@ import { BehaviorSubject } from 'rxjs'; import { shallowEqual } from '../../shared/utils'; import { logger } from '../logger'; -import type { AppGroupInfo, RecordingStatus } from './types'; +import type { + AppGroupInfo, + RecordingArtifactInfo, + RecordingSessionStatus, +} from './types'; -/** - * Recording state machine events - */ export type RecordingEvent = | { type: 'NEW_RECORDING'; appGroup?: AppGroupInfo } - | { - type: 'START_RECORDING'; - appGroup?: AppGroupInfo; - } + | { type: 'START_RECORDING'; appGroup?: AppGroupInfo } | { type: 'ATTACH_NATIVE_RECORDING'; id: number; @@ -22,56 +20,33 @@ export type RecordingEvent = sampleRate: number; numberOfChannels: number; } - | { - type: 'STOP_RECORDING'; - id: number; - } + | { type: 'START_RECORDING_FAILED'; id: number; errorMessage?: string } + | { type: 'STOP_RECORDING'; id: number } | { type: 'ATTACH_RECORDING_ARTIFACT'; id: number; - filepath: string; - sampleRate?: number; - numberOfChannels?: number; - } - | { - type: 'SET_BLOCK_CREATION_STATUS'; - id: number; - status: 'success' | 'failed'; - errorMessage?: string; + artifact: RecordingArtifactInfo; } + | { type: 'FINALIZE_RECORDING_FAILED'; id: number; errorMessage?: string } + | { type: 'ABORT_RECORDING'; id: number } | { type: 'REMOVE_RECORDING'; id: number }; -/** - * Recording State Machine - * Handles state transitions for the recording process - */ export class RecordingStateMachine { private recordingId = 0; private readonly recordingStatus$ = - new BehaviorSubject(null); + new BehaviorSubject(null); - /** - * Get the current recording status - */ - get status(): RecordingStatus | null { + get status(): RecordingSessionStatus | null { return this.recordingStatus$.value; } - /** - * Get the BehaviorSubject for recording status - */ - get status$(): BehaviorSubject { + get status$(): BehaviorSubject { return this.recordingStatus$; } - /** - * Dispatch an event to the state machine - * @param event The event to dispatch - * @returns The new recording status after the event is processed - */ - dispatch(event: RecordingEvent, emit = true): RecordingStatus | null { + dispatch(event: RecordingEvent, emit = true): RecordingSessionStatus | null { const currentStatus = this.recordingStatus$.value; - let newStatus: RecordingStatus | null = null; + let newStatus: RecordingSessionStatus | null = null; switch (event.type) { case 'NEW_RECORDING': @@ -83,24 +58,30 @@ export class RecordingStateMachine { case 'ATTACH_NATIVE_RECORDING': newStatus = this.handleAttachNativeRecording(event); break; + case 'START_RECORDING_FAILED': + newStatus = this.handleStartRecordingFailed( + event.id, + event.errorMessage + ); + break; case 'STOP_RECORDING': newStatus = this.handleStopRecording(event.id); break; case 'ATTACH_RECORDING_ARTIFACT': newStatus = this.handleAttachRecordingArtifact( event.id, - event.filepath, - event.sampleRate, - event.numberOfChannels + event.artifact ); break; - case 'SET_BLOCK_CREATION_STATUS': - newStatus = this.handleSetBlockCreationStatus( + case 'FINALIZE_RECORDING_FAILED': + newStatus = this.handleFinalizeRecordingFailed( event.id, - event.status, event.errorMessage ); break; + case 'ABORT_RECORDING': + newStatus = this.handleAbortRecording(event.id); + break; case 'REMOVE_RECORDING': this.handleRemoveRecording(event.id); newStatus = currentStatus?.id === event.id ? null : currentStatus; @@ -121,86 +102,104 @@ export class RecordingStateMachine { return newStatus; } - /** - * Handle the NEW_RECORDING event - */ - private handleNewRecording(appGroup?: AppGroupInfo): RecordingStatus { - const recordingStatus: RecordingStatus = { + private hasActiveSession(status: RecordingSessionStatus | null | undefined) { + return ( + status?.sessionStatus === 'starting' || + status?.sessionStatus === 'recording' || + status?.sessionStatus === 'finalizing' + ); + } + + private handleNewRecording(appGroup?: AppGroupInfo): RecordingSessionStatus { + return { id: this.recordingId++, - status: 'new', + sessionStatus: 'new', startTime: Date.now(), app: appGroup?.apps.find(app => app.isRunning), appGroup, }; - return recordingStatus; } - /** - * Handle the START_RECORDING event - */ - private handleStartRecording(appGroup?: AppGroupInfo): RecordingStatus { + private handleStartRecording( + appGroup?: AppGroupInfo + ): RecordingSessionStatus | null { const currentStatus = this.recordingStatus$.value; - if ( - currentStatus?.status === 'recording' || - currentStatus?.status === 'processing' - ) { + if (this.hasActiveSession(currentStatus)) { logger.error( - 'Cannot start a new recording if there is already a recording' + 'Cannot start a new recording while another session is active' ); return currentStatus; } if ( + currentStatus?.sessionStatus === 'new' && appGroup && - currentStatus?.appGroup?.processGroupId === appGroup.processGroupId && - currentStatus.status === 'new' + currentStatus.appGroup?.processGroupId === appGroup.processGroupId ) { return { ...currentStatus, - status: 'recording', - }; - } else { - const newStatus = this.handleNewRecording(appGroup); - return { - ...newStatus, - status: 'recording', + sessionStatus: 'starting', + errorMessage: undefined, }; } + + const nextStatus = + currentStatus?.sessionStatus === 'new' && !appGroup + ? currentStatus + : this.handleNewRecording(appGroup); + + return { + ...nextStatus, + sessionStatus: 'starting', + errorMessage: undefined, + }; } - /** - * Attach native recording metadata to the current recording - */ private handleAttachNativeRecording( event: Extract - ): RecordingStatus | null { + ) { const currentStatus = this.recordingStatus$.value; if (!currentStatus || currentStatus.id !== event.id) { logger.error(`Recording ${event.id} not found for native attachment`); return currentStatus; } - if (currentStatus.status !== 'recording') { + if (currentStatus.sessionStatus !== 'starting') { logger.error( - `Cannot attach native metadata when recording is in ${currentStatus.status} state` + `Cannot attach native metadata when recording is in ${currentStatus.sessionStatus} state` ); return currentStatus; } return { ...currentStatus, + sessionStatus: 'recording' as const, nativeId: event.nativeId, startTime: event.startTime, - filepath: event.filepath, - sampleRate: event.sampleRate, - numberOfChannels: event.numberOfChannels, + artifact: { + filepath: event.filepath, + sampleRate: event.sampleRate, + numberOfChannels: event.numberOfChannels, + }, }; } - /** - * Handle the STOP_RECORDING event - */ - private handleStopRecording(id: number): RecordingStatus | null { + private handleStartRecordingFailed(id: number, errorMessage?: string) { + const currentStatus = this.recordingStatus$.value; + + if (!currentStatus || currentStatus.id !== id) { + logger.error(`Recording ${id} not found for start failure`); + return currentStatus; + } + + return { + ...currentStatus, + sessionStatus: 'finalize_failed' as const, + errorMessage, + }; + } + + private handleStopRecording(id: number) { const currentStatus = this.recordingStatus$.value; if (!currentStatus || currentStatus.id !== id) { @@ -208,26 +207,24 @@ export class RecordingStateMachine { return currentStatus; } - if (currentStatus.status !== 'recording') { - logger.error(`Cannot stop recording in ${currentStatus.status} state`); + if (currentStatus.sessionStatus !== 'recording') { + logger.error( + `Cannot stop recording in ${currentStatus.sessionStatus} state` + ); return currentStatus; } return { ...currentStatus, - status: 'processing', + sessionStatus: 'finalizing' as const, + errorMessage: undefined, }; } - /** - * Attach the encoded artifact once native stop completes - */ private handleAttachRecordingArtifact( id: number, - filepath: string, - sampleRate?: number, - numberOfChannels?: number - ): RecordingStatus | null { + artifact: RecordingArtifactInfo + ) { const currentStatus = this.recordingStatus$.value; if (!currentStatus || currentStatus.id !== id) { @@ -235,66 +232,60 @@ export class RecordingStateMachine { return currentStatus; } - if (currentStatus.status !== 'processing') { - logger.error(`Cannot attach artifact in ${currentStatus.status} state`); + if (currentStatus.sessionStatus !== 'finalizing') { + logger.error( + `Cannot attach artifact in ${currentStatus.sessionStatus} state` + ); return currentStatus; } return { ...currentStatus, - filepath, - sampleRate: sampleRate ?? currentStatus.sampleRate, - numberOfChannels: numberOfChannels ?? currentStatus.numberOfChannels, + sessionStatus: 'finalized' as const, + artifact: { + ...currentStatus.artifact, + ...artifact, + }, }; } - /** - * Set the renderer-side block creation result - */ - private handleSetBlockCreationStatus( - id: number, - status: 'success' | 'failed', - errorMessage?: string - ): RecordingStatus | null { + private handleFinalizeRecordingFailed(id: number, errorMessage?: string) { const currentStatus = this.recordingStatus$.value; if (!currentStatus || currentStatus.id !== id) { - logger.error(`Recording ${id} not found for block creation status`); - return currentStatus; - } - - if (currentStatus.status === 'new') { - logger.error(`Cannot settle recording ${id} before it starts`); - return currentStatus; - } - - if ( - currentStatus.status === 'ready' && - currentStatus.blockCreationStatus !== undefined - ) { + logger.error(`Recording ${id} not found for finalize failure`); return currentStatus; } if (errorMessage) { - logger.error(`Recording ${id} create block failed: ${errorMessage}`); + logger.error(`Recording ${id} finalize failed: ${errorMessage}`); } return { ...currentStatus, - status: 'ready', - blockCreationStatus: status, + sessionStatus: 'finalize_failed' as const, + errorMessage, }; } - /** - * Handle the REMOVE_RECORDING event - */ - private handleRemoveRecording(id: number): void { - // Actual recording removal logic would be handled by the caller - // This just ensures the state is updated correctly + private handleAbortRecording(id: number) { + const currentStatus = this.recordingStatus$.value; + + if (!currentStatus || currentStatus.id !== id) { + logger.error(`Recording ${id} not found for abort`); + return currentStatus; + } + + return { + ...currentStatus, + sessionStatus: 'aborted' as const, + errorMessage: undefined, + }; + } + + private handleRemoveRecording(id: number) { logger.info(`Recording ${id} removed from state machine`); } } -// Create and export a singleton instance export const recordingStateMachine = new RecordingStateMachine(); diff --git a/packages/frontend/apps/electron/src/main/recording/state-transitions.md b/packages/frontend/apps/electron/src/main/recording/state-transitions.md index 10e4fda998..63250ecf0b 100644 --- a/packages/frontend/apps/electron/src/main/recording/state-transitions.md +++ b/packages/frontend/apps/electron/src/main/recording/state-transitions.md @@ -1,35 +1,68 @@ # Recording State Transitions -The desktop recording flow now has a single linear engine state and a separate post-process result. +The desktop recording flow now uses two independent lifecycle models: -## Engine states +1. recording session state in Electron main, which tracks native capture/finalize. +2. artifact import state in Electron main, which tracks renderer-side doc import. -- `inactive`: no active recording -- `new`: app detected, waiting for user confirmation -- `recording`: native capture is running -- `processing`: native capture has stopped and the artifact is being imported -- `ready`: post-processing has finished +## Recording Session State -## Post-process result +- `inactive`: no session has been created yet. +- `new`: app detected, waiting for user confirmation. +- `starting`: native session setup is in progress. +- `recording`: native capture is running. +- `finalizing`: native stop/finalize is in progress. +- `finalized`: native finalized an artifact successfully. +- `finalize_failed`: native finalize failed. +- `aborted`: native session was discarded without producing an artifact. -`ready` recordings may carry `blockCreationStatus`: +Only `starting`, `recording`, and `finalizing` occupy the active native slot. +`finalized`, `finalize_failed`, and `aborted` no longer block the next recording. -- `success`: the recording block was created successfully -- `failed`: the artifact was saved, but block creation/import failed +## Recording Artifact Import State -## State flow +- `pending_import`: artifact is finalized and durable in main, waiting for a renderer to consume it. +- `importing`: a renderer has claimed the artifact and is importing it into a doc. +- `imported`: doc import finished successfully. +- `import_failed`: doc import failed; the artifact remains available for retry. + +Artifacts are persisted in main process storage so renderer reloads or missing workspace context do not drop them. + +## Session Flow ```text -inactive -> new -> recording -> processing -> ready - ^ | - | | - +------ start ---------+ +inactive -> new -> starting -> recording -> finalizing -> finalized + \ \ + \ -> finalize_failed + -> finalize_failed ``` -- `START_RECORDING` creates or reuses a pending `new` recording. -- `ATTACH_NATIVE_RECORDING` fills in native metadata while staying in `recording`. -- `STOP_RECORDING` moves the flow to `processing`. -- `ATTACH_RECORDING_ARTIFACT` attaches the finalized `.opus` artifact while staying in `processing`. -- `SET_BLOCK_CREATION_STATUS` settles the flow as `ready`. +- `START_RECORDING` creates or reuses a pending `new` recording and moves it to `starting`. +- `ATTACH_NATIVE_RECORDING` attaches native session metadata and moves the session to `recording`. +- `START_RECORDING_FAILED` keeps the session terminal with `finalize_failed`. +- `STOP_RECORDING` moves the session to `finalizing`. +- `ATTACH_RECORDING_ARTIFACT` marks the session `finalized` with the native artifact metadata. +- `FINALIZE_RECORDING_FAILED` marks the session `finalize_failed`. +- `ABORT_RECORDING` marks the session `aborted`. -Only one recording can be active at a time. A new recording can start only after the previous one has been removed or its `ready` result has been settled. +## Import Flow + +```text +pending_import -> importing -> imported + \ + -> import_failed -> importing +``` + +- main enqueues `pending_import` after native finalize succeeds. +- renderer claims the artifact, moving it to `importing`. +- renderer marks the artifact `imported` or `import_failed`. +- `import_failed` artifacts can be claimed again for retry. + +## Popup Projection + +The popup still renders a single current status, but it is now a projection: + +- active session states map to `new`, `starting`, `recording`, `finalizing`, `finalize_failed`. +- otherwise the latest import entry maps to `pending_import`, `importing`, `imported`, `import_failed`. + +This keeps the UI simple without collapsing the underlying source-of-truth back into a single overloaded `processing` state. diff --git a/packages/frontend/apps/electron/src/main/recording/types.ts b/packages/frontend/apps/electron/src/main/recording/types.ts index a3b4b30801..4ea3c956e2 100644 --- a/packages/frontend/apps/electron/src/main/recording/types.ts +++ b/packages/frontend/apps/electron/src/main/recording/types.ts @@ -18,19 +18,76 @@ export interface AppGroupInfo { isRunning: boolean; } -export interface RecordingStatus { +export type RecordingSessionState = + | 'new' + | 'starting' + | 'recording' + | 'finalizing' + | 'finalized' + | 'finalize_failed' + | 'aborted'; + +export type RecordingImportState = + | 'pending_import' + | 'importing' + | 'imported' + | 'import_failed'; + +export interface RecordingArtifactInfo { + filepath: string; + sampleRate?: number; + numberOfChannels?: number; + durationMs?: number; + size?: number; + degraded?: boolean; + overflowCount?: number; +} + +export interface RecordingSessionStatus { id: number; // corresponds to the recording id - // an app group is detected and waiting for user confirmation - // recording: the native recorder is running - // processing: recording has stopped and the artifact is being prepared/imported - // ready: the post-processing result has been settled - status: 'new' | 'recording' | 'processing' | 'ready'; + sessionStatus: RecordingSessionState; app?: TappableAppInfo; appGroup?: AppGroupInfo; startTime: number; // 0 means not started yet - filepath?: string; // encoded file path nativeId?: string; + artifact?: RecordingArtifactInfo; + errorMessage?: string; +} + +export interface RecordingImportStatus extends RecordingArtifactInfo { + id: number; + appName?: string; + startTime: number; + importStatus: RecordingImportState; + errorMessage?: string; + createdAt: number; + updatedAt: number; +} + +export type RecordingDisplayState = + | 'new' + | 'starting' + | 'recording' + | 'finalizing' + | 'pending_import' + | 'importing' + | 'imported' + | 'import_failed' + | 'finalize_failed'; + +export interface RecordingStatus { + id: number; + status: RecordingDisplayState; + appName?: string; + appGroupId?: number; + icon?: Buffer; + startTime: number; + filepath?: string; sampleRate?: number; numberOfChannels?: number; - blockCreationStatus?: 'success' | 'failed'; + durationMs?: number; + size?: number; + degraded?: boolean; + overflowCount?: number; + errorMessage?: string; } diff --git a/packages/frontend/apps/electron/src/main/tray/index.ts b/packages/frontend/apps/electron/src/main/tray/index.ts index ffceb64bdc..eef2caa703 100644 --- a/packages/frontend/apps/electron/src/main/tray/index.ts +++ b/packages/frontend/apps/electron/src/main/tray/index.ts @@ -160,7 +160,12 @@ class TrayState implements Disposable { const recordingStatus = recordingStatus$.value; - if (!recordingStatus || recordingStatus.status !== 'recording') { + if ( + !recordingStatus || + (recordingStatus.status !== 'starting' && + recordingStatus.status !== 'recording' && + recordingStatus.status !== 'finalizing') + ) { const appMenuItems = runningAppGroups.map(appGroup => ({ label: appGroup.name, icon: appGroup.icon || undefined, @@ -197,8 +202,8 @@ class TrayState implements Disposable { ...appMenuItems ); } else { - const recordingLabel = recordingStatus.appGroup?.name - ? `Recording (${recordingStatus.appGroup?.name})` + const recordingLabel = recordingStatus.appName + ? `Recording (${recordingStatus.appName})` : 'Recording'; // recording is active @@ -212,9 +217,11 @@ class TrayState implements Disposable { label: 'Stop', click: () => { logger.info('User action: Stop Recording'); - stopRecording(recordingStatus.id).catch(err => { - logger.error('Failed to stop recording:', err); - }); + if (recordingStatus.status === 'recording') { + stopRecording(recordingStatus.id).catch(err => { + logger.error('Failed to stop recording:', err); + }); + } }, } ); diff --git a/packages/frontend/apps/electron/test/main/recording-effect.spec.ts b/packages/frontend/apps/electron/test/main/recording-effect.spec.ts index e503d1b0c4..1395c3220a 100644 --- a/packages/frontend/apps/electron/test/main/recording-effect.spec.ts +++ b/packages/frontend/apps/electron/test/main/recording-effect.spec.ts @@ -2,22 +2,24 @@ import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest'; const isActiveTab = vi.fn(); const readRecordingFile = vi.fn(); -const setRecordingBlockCreationStatus = vi.fn(); +const claimRecordingImport = vi.fn(); +const completeRecordingImport = vi.fn(); +const failRecordingImport = vi.fn(); +const getRecordingImportQueue = vi.fn(); const getCurrentWorkspace = vi.fn(); const isAiEnabled = vi.fn(); const transcribeRecording = vi.fn(); -let onRecordingStatusChanged: - | (( - status: { - id: number; - status: 'processing'; - appName?: string; - filepath?: string; - startTime: number; - blockCreationStatus?: 'success' | 'failed'; - } | null - ) => void) +type RecordingImportStatus = { + id: number; + appName?: string; + filepath: string; + startTime: number; + importStatus: 'pending_import' | 'importing' | 'imported' | 'import_failed'; +}; + +let onRecordingImportQueueChanged: + | ((queue: RecordingImportStatus[]) => void) | undefined; vi.mock('@affine/core/modules/doc', () => ({ @@ -46,16 +48,19 @@ vi.mock('@affine/electron-api', () => ({ }, recording: { readRecordingFile, - setRecordingBlockCreationStatus, + claimRecordingImport, + completeRecordingImport, + failRecordingImport, + getRecordingImportQueue, }, }, events: { recording: { - onRecordingStatusChanged: vi.fn( - (handler: typeof onRecordingStatusChanged) => { - onRecordingStatusChanged = handler; + onRecordingImportQueueChanged: vi.fn( + (handler: typeof onRecordingImportQueueChanged) => { + onRecordingImportQueueChanged = handler; return () => { - onRecordingStatusChanged = undefined; + onRecordingImportQueueChanged = undefined; }; } ), @@ -162,10 +167,12 @@ describe('recording effect', () => { vi.useFakeTimers(); vi.clearAllMocks(); vi.resetModules(); - onRecordingStatusChanged = undefined; + onRecordingImportQueueChanged = undefined; readRecordingFile.mockResolvedValue(new Uint8Array([1, 2, 3]).buffer); - setRecordingBlockCreationStatus.mockResolvedValue(undefined); + completeRecordingImport.mockResolvedValue(undefined); + failRecordingImport.mockResolvedValue(undefined); isAiEnabled.mockReturnValue(false); + getRecordingImportQueue.mockResolvedValue([]); }); afterEach(() => { @@ -173,37 +180,43 @@ describe('recording effect', () => { vi.useRealTimers(); }); - test('retries processing until the active tab has a workspace', async () => { + test('retries pending imports until the active tab has a workspace', async () => { const workspace = createWorkspaceRef(); + const pendingImport = { + id: 7, + importStatus: 'pending_import' as const, + appName: 'Zoom', + filepath: '/tmp/meeting.opus', + startTime: 1000, + }; isActiveTab.mockResolvedValueOnce(false).mockResolvedValue(true); getCurrentWorkspace .mockReturnValueOnce(undefined) .mockReturnValue(workspace.ref); + claimRecordingImport.mockResolvedValue({ + ...pendingImport, + importStatus: 'importing', + }); + getRecordingImportQueue.mockResolvedValue([pendingImport]); const { setupRecordingEvents } = await import('../../../electron-renderer/src/app/effects/recording'); setupRecordingEvents({} as never); - - onRecordingStatusChanged?.({ - id: 7, - status: 'processing', - appName: 'Zoom', - filepath: '/tmp/meeting.opus', - startTime: 1000, - }); - await Promise.resolve(); + expect(workspace.createDoc).not.toHaveBeenCalled(); - expect(setRecordingBlockCreationStatus).not.toHaveBeenCalled(); + expect(claimRecordingImport).not.toHaveBeenCalled(); await vi.advanceTimersByTimeAsync(1000); expect(workspace.createDoc).not.toHaveBeenCalled(); - expect(setRecordingBlockCreationStatus).not.toHaveBeenCalled(); + expect(claimRecordingImport).not.toHaveBeenCalled(); + onRecordingImportQueueChanged?.([pendingImport]); await vi.advanceTimersByTimeAsync(1000); + expect(claimRecordingImport).toHaveBeenCalledWith(7); expect(workspace.createDoc).toHaveBeenCalledTimes(1); expect(workspace.openDoc).toHaveBeenCalledWith('doc-1'); expect(workspace.blobSet).toHaveBeenCalledTimes(1); @@ -215,42 +228,51 @@ describe('recording effect', () => { expect.objectContaining({ type: 'audio/ogg' }), 'note-1' ); - expect(setRecordingBlockCreationStatus).toHaveBeenCalledWith(7, 'success'); - expect(setRecordingBlockCreationStatus).not.toHaveBeenCalledWith( - 7, - 'failed', - expect.anything() - ); + expect(completeRecordingImport).toHaveBeenCalledWith(7); + expect(failRecordingImport).not.toHaveBeenCalled(); }); - test('retries when the active-tab probe rejects', async () => { - const workspace = createWorkspaceRef(); + test('marks imports as failed when the doc import throws and retries later', async () => { + const pendingImport = { + id: 9, + importStatus: 'import_failed' as const, + appName: 'Meet', + filepath: '/tmp/meeting.opus', + startTime: 1000, + }; - isActiveTab - .mockRejectedValueOnce(new Error('probe failed')) - .mockResolvedValue(true); + const workspace = createWorkspaceRef(); + workspace.createDoc.mockImplementationOnce(() => { + throw new Error('create doc failed'); + }); + + isActiveTab.mockResolvedValue(true); getCurrentWorkspace.mockReturnValue(workspace.ref); + claimRecordingImport + .mockResolvedValueOnce({ + ...pendingImport, + importStatus: 'importing', + }) + .mockResolvedValueOnce({ + ...pendingImport, + importStatus: 'importing', + }); + getRecordingImportQueue.mockResolvedValue([pendingImport]); const { setupRecordingEvents } = await import('../../../electron-renderer/src/app/effects/recording'); setupRecordingEvents({} as never); - - onRecordingStatusChanged?.({ - id: 9, - status: 'processing', - appName: 'Meet', - filepath: '/tmp/meeting.opus', - startTime: 1000, - }); - await Promise.resolve(); - expect(workspace.createDoc).not.toHaveBeenCalled(); - expect(setRecordingBlockCreationStatus).not.toHaveBeenCalled(); + await Promise.resolve(); + await vi.advanceTimersByTimeAsync(0); + expect(failRecordingImport).toHaveBeenCalledWith(9, 'create doc failed'); + expect(completeRecordingImport).not.toHaveBeenCalled(); + + onRecordingImportQueueChanged?.([pendingImport]); await vi.advanceTimersByTimeAsync(1000); - expect(workspace.createDoc).toHaveBeenCalledTimes(1); - expect(setRecordingBlockCreationStatus).toHaveBeenCalledWith(9, 'success'); + expect(claimRecordingImport).toHaveBeenCalledTimes(2); }); }); diff --git a/packages/frontend/apps/electron/test/main/recording-state.spec.ts b/packages/frontend/apps/electron/test/main/recording-state.spec.ts index 52a659b3ea..707bad2446 100644 --- a/packages/frontend/apps/electron/test/main/recording-state.spec.ts +++ b/packages/frontend/apps/electron/test/main/recording-state.spec.ts @@ -1,4 +1,5 @@ -import { describe, expect, test, vi } from 'vitest'; +import { BehaviorSubject } from 'rxjs'; +import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest'; vi.mock('../../src/main/logger', () => ({ logger: { @@ -9,14 +10,24 @@ vi.mock('../../src/main/logger', () => ({ import { RecordingStateMachine } from '../../src/main/recording/state-machine'; +function createDeferred() { + let resolve!: (value: T) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + function createAttachedRecording(stateMachine: RecordingStateMachine) { - const pending = stateMachine.dispatch({ + const starting = stateMachine.dispatch({ type: 'START_RECORDING', }); stateMachine.dispatch({ type: 'ATTACH_NATIVE_RECORDING', - id: pending!.id, + id: starting!.id, nativeId: 'native-1', startTime: 100, filepath: '/tmp/recording.opus', @@ -24,93 +35,390 @@ function createAttachedRecording(stateMachine: RecordingStateMachine) { numberOfChannels: 2, }); - return pending!; + return starting!; } describe('RecordingStateMachine', () => { - test('transitions from recording to ready after artifact import and block creation', () => { + test('tracks session lifecycle through finalized without coupling import state', () => { const stateMachine = new RecordingStateMachine(); - const pending = createAttachedRecording(stateMachine); - expect(pending?.status).toBe('recording'); + const starting = stateMachine.dispatch({ + type: 'START_RECORDING', + }); + expect(starting).toMatchObject({ + sessionStatus: 'starting', + }); - const processing = stateMachine.dispatch({ + const recording = stateMachine.dispatch({ + type: 'ATTACH_NATIVE_RECORDING', + id: starting!.id, + nativeId: 'native-1', + startTime: 100, + filepath: '/tmp/recording.opus', + sampleRate: 48000, + numberOfChannels: 2, + }); + expect(recording).toMatchObject({ + sessionStatus: 'recording', + artifact: { + filepath: '/tmp/recording.opus', + sampleRate: 48000, + numberOfChannels: 2, + }, + }); + + const finalizing = stateMachine.dispatch({ type: 'STOP_RECORDING', - id: pending.id, + id: starting!.id, }); - expect(processing?.status).toBe('processing'); + expect(finalizing?.sessionStatus).toBe('finalizing'); - const artifactAttached = stateMachine.dispatch({ + const finalized = stateMachine.dispatch({ type: 'ATTACH_RECORDING_ARTIFACT', - id: pending.id, - filepath: '/tmp/recording.opus', - sampleRate: 48000, - numberOfChannels: 2, + id: starting!.id, + artifact: { + filepath: '/tmp/recording.opus', + durationMs: 1_000, + size: 128, + degraded: true, + overflowCount: 2, + }, }); - expect(artifactAttached).toMatchObject({ - status: 'processing', - filepath: '/tmp/recording.opus', - }); - - const ready = stateMachine.dispatch({ - type: 'SET_BLOCK_CREATION_STATUS', - id: pending.id, - status: 'success', - }); - expect(ready).toMatchObject({ - status: 'ready', - blockCreationStatus: 'success', - }); - }); - - test('keeps native audio metadata when stop artifact omits it', () => { - const stateMachine = new RecordingStateMachine(); - - const pending = createAttachedRecording(stateMachine); - stateMachine.dispatch({ type: 'STOP_RECORDING', id: pending.id }); - - const artifactAttached = stateMachine.dispatch({ - type: 'ATTACH_RECORDING_ARTIFACT', - id: pending.id, - filepath: '/tmp/recording.opus', - }); - - expect(artifactAttached).toMatchObject({ - sampleRate: 48000, - numberOfChannels: 2, + expect(finalized).toMatchObject({ + sessionStatus: 'finalized', + artifact: { + filepath: '/tmp/recording.opus', + sampleRate: 48000, + numberOfChannels: 2, + durationMs: 1_000, + size: 128, + degraded: true, + overflowCount: 2, + }, }); }); test.each([ - { status: 'success' as const, errorMessage: undefined }, - { status: 'failed' as const, errorMessage: 'native start failed' }, + { + name: 'finalized sessions', + settleEvent: { + type: 'ATTACH_RECORDING_ARTIFACT' as const, + artifact: { + filepath: '/tmp/recording.opus', + }, + }, + expectedStatus: 'finalized', + }, + { + name: 'failed finalize sessions', + settleEvent: { + type: 'FINALIZE_RECORDING_FAILED' as const, + errorMessage: 'boom', + }, + expectedStatus: 'finalize_failed', + }, ])( - 'settles recordings into ready state with blockCreationStatus=$status', - ({ status, errorMessage }) => { + 'allows a new recording after $name', + ({ settleEvent, expectedStatus }) => { const stateMachine = new RecordingStateMachine(); - const pending = stateMachine.dispatch({ - type: 'START_RECORDING', + const pending = createAttachedRecording(stateMachine); + stateMachine.dispatch({ + type: 'STOP_RECORDING', + id: pending.id, }); - expect(pending?.status).toBe('recording'); const settled = stateMachine.dispatch({ - type: 'SET_BLOCK_CREATION_STATUS', - id: pending!.id, - status, - errorMessage, - }); - expect(settled).toMatchObject({ - status: 'ready', - blockCreationStatus: status, + id: pending.id, + ...settleEvent, }); + expect(settled?.sessionStatus).toBe(expectedStatus); const next = stateMachine.dispatch({ type: 'START_RECORDING', }); - expect(next?.id).toBeGreaterThan(pending!.id); - expect(next?.status).toBe('recording'); - expect(next?.blockCreationStatus).toBeUndefined(); + expect(next?.id).toBeGreaterThan(pending.id); + expect(next?.sessionStatus).toBe('starting'); } ); }); + +describe('recording feature', () => { + const nativeStartRecording = vi.fn(); + const nativeStopRecording = vi.fn(); + const nativeAbortRecording = vi.fn(); + const ensureDirSync = vi.fn(); + const resolveExistingPathInBase = vi.fn( + async (_base: string, filepath: string) => filepath + ); + const getMainWindow = vi.fn(async () => ({ + show: vi.fn(), + })); + + const storageState = new Map(); + const watchSubjects = new Map>(); + + beforeEach(() => { + vi.resetModules(); + vi.clearAllMocks(); + storageState.clear(); + watchSubjects.clear(); + + vi.doMock('@affine/native', () => ({ + ShareableContent: class ShareableContent { + static applications() { + return []; + } + + static applicationWithProcessId() { + return null; + } + + static isUsingMicrophone() { + return false; + } + + static onApplicationListChanged() { + return { unsubscribe: vi.fn() }; + } + + static onAppStateChanged() { + return { unsubscribe: vi.fn() }; + } + }, + startRecording: nativeStartRecording, + stopRecording: nativeStopRecording, + abortRecording: nativeAbortRecording, + })); + + vi.doMock('electron', () => ({ + app: { + getPath: vi.fn(() => '/tmp'), + on: vi.fn(), + }, + systemPreferences: { + getMediaAccessStatus: vi.fn(() => 'granted'), + askForMediaAccess: vi.fn(async () => true), + }, + })); + + vi.doMock('fs-extra', () => ({ + default: { + ensureDirSync, + removeSync: vi.fn(), + }, + })); + + vi.doMock('../../src/shared/utils', async () => { + const actual = await vi.importActual('../../src/shared/utils'); + return { + ...actual, + isMacOS: () => false, + isWindows: () => false, + resolveExistingPathInBase, + }; + }); + + vi.doMock('../../src/main/shared-storage/storage', () => ({ + globalStateStorage: { + get: (key: string) => storageState.get(key), + set: (key: string, value: unknown) => { + storageState.set(key, value); + const subject$ = watchSubjects.get(key); + subject$?.next(value); + }, + watch: (key: string) => { + const subject$ = + watchSubjects.get(key) ?? + new BehaviorSubject(storageState.get(key)); + watchSubjects.set(key, subject$); + return subject$.asObservable(); + }, + }, + })); + + vi.doMock('../../src/main/windows-manager', () => ({ + getMainWindow, + })); + + vi.doMock('../../src/main/windows-manager/popup', () => ({ + popupManager: { + get: () => ({ + showing: false, + show: vi.fn(async () => undefined), + hide: vi.fn(async () => undefined), + }), + }, + })); + + vi.doMock('lodash-es', () => ({ + debounce: (fn: (...args: unknown[]) => void) => fn, + })); + }); + + afterEach(() => { + vi.clearAllTimers(); + }); + + test('slow start exposes starting state before native setup resolves', async () => { + const startDeferred = createDeferred<{ + id: string; + filepath: string; + sampleRate: number; + channels: number; + startedAt: number; + }>(); + nativeStartRecording.mockReturnValue(startDeferred.promise); + + const { + recordingStatus$, + setRecordingNativeModuleForTesting, + startRecording, + } = await import('../../src/main/recording/feature'); + setRecordingNativeModuleForTesting({ + ShareableContent: class ShareableContent {}, + startRecording: nativeStartRecording, + stopRecording: nativeStopRecording, + abortRecording: nativeAbortRecording, + } as never); + + const startPromise = startRecording(); + expect(recordingStatus$.value).toMatchObject({ + status: 'starting', + }); + + startDeferred.resolve({ + id: 'native-1', + filepath: '/tmp/0.opus', + sampleRate: 48_000, + channels: 2, + startedAt: 123, + }); + + await startPromise; + expect(recordingStatus$.value).toMatchObject({ + status: 'recording', + }); + expect(recordingStatus$.value?.filepath).toContain('0.opus'); + }); + + test('slow stop transitions through finalizing and then pending_import', async () => { + nativeStartRecording.mockResolvedValue({ + id: 'native-1', + filepath: '/tmp/0.opus', + sampleRate: 48_000, + channels: 2, + startedAt: 123, + }); + + const stopDeferred = createDeferred<{ + id: string; + filepath: string; + sampleRate: number; + channels: number; + durationMs: number; + size: number; + degraded: boolean; + overflowCount: number; + }>(); + nativeStopRecording.mockReturnValue(stopDeferred.promise); + + const { + getRecordingImportQueue, + recordingStatus$, + setRecordingNativeModuleForTesting, + startRecording, + stopRecording, + } = await import('../../src/main/recording/feature'); + setRecordingNativeModuleForTesting({ + ShareableContent: class ShareableContent {}, + startRecording: nativeStartRecording, + stopRecording: nativeStopRecording, + abortRecording: nativeAbortRecording, + } as never); + + const started = await startRecording(); + const stopPromise = stopRecording(started!.id); + expect(recordingStatus$.value).toMatchObject({ + id: started!.id, + status: 'finalizing', + }); + + stopDeferred.resolve({ + id: 'native-1', + filepath: '/tmp/0.opus', + sampleRate: 48_000, + channels: 2, + durationMs: 2_000, + size: 256, + degraded: true, + overflowCount: 4, + }); + + await stopPromise; + + expect(recordingStatus$.value).toMatchObject({ + id: started!.id, + status: 'pending_import', + degraded: true, + overflowCount: 4, + }); + expect(getRecordingImportQueue()).toEqual([ + expect.objectContaining({ + id: started!.id, + importStatus: 'pending_import', + filepath: '/tmp/0.opus', + degraded: true, + overflowCount: 4, + }), + ]); + }); + + test('stop failure releases the active slot for the next recording', async () => { + nativeStartRecording + .mockResolvedValueOnce({ + id: 'native-1', + filepath: '/tmp/0.opus', + sampleRate: 48_000, + channels: 2, + startedAt: 123, + }) + .mockResolvedValueOnce({ + id: 'native-2', + filepath: '/tmp/1.opus', + sampleRate: 48_000, + channels: 2, + startedAt: 456, + }); + nativeStopRecording.mockRejectedValue(new Error('native stop failed')); + + const { + recordingStatus$, + setRecordingNativeModuleForTesting, + startRecording, + stopRecording, + } = await import('../../src/main/recording/feature'); + setRecordingNativeModuleForTesting({ + ShareableContent: class ShareableContent {}, + startRecording: nativeStartRecording, + stopRecording: nativeStopRecording, + abortRecording: nativeAbortRecording, + } as never); + + const first = await startRecording(); + await stopRecording(first!.id); + + expect(recordingStatus$.value).toMatchObject({ + id: first!.id, + status: 'finalize_failed', + errorMessage: 'native stop failed', + }); + + const second = await startRecording(); + expect(second).toMatchObject({ + id: expect.any(Number), + status: 'recording', + }); + expect(second!.id).toBeGreaterThan(first!.id); + }); +}); diff --git a/packages/frontend/apps/electron/test/workspace/handlers.spec.ts b/packages/frontend/apps/electron/test/workspace/handlers.spec.ts index db826bba25..87e66b89af 100644 --- a/packages/frontend/apps/electron/test/workspace/handlers.spec.ts +++ b/packages/frontend/apps/electron/test/workspace/handlers.spec.ts @@ -132,51 +132,46 @@ describe('workspace db management', () => { ).toBe(false); }); - test('rejects unsafe ids when deleting a workspace', async () => { - const { deleteWorkspace } = - await import('@affine/electron/helper/workspace/handlers'); - const outsideDir = path.join(tmpDir, 'outside-delete-target'); + test.each([ + { + name: 'deleting a workspace', + outsideDirName: 'outside-delete-target', + call: async () => { + const { deleteWorkspace } = + await import('@affine/electron/helper/workspace/handlers'); + return deleteWorkspace( + universalId({ + peer: 'local', + type: 'workspace', + id: '../../outside-delete-target', + }) + ); + }, + }, + { + name: 'deleting backup workspaces', + outsideDirName: 'outside-backup-target', + call: async () => { + const { deleteBackupWorkspace } = + await import('@affine/electron/helper/workspace/handlers'); + return deleteBackupWorkspace('../../outside-backup-target'); + }, + }, + { + name: 'recovering backup workspaces', + outsideDirName: 'outside-recover-target', + call: async () => { + const { recoverBackupWorkspace } = + await import('@affine/electron/helper/workspace/handlers'); + return recoverBackupWorkspace('../../outside-recover-target'); + }, + }, + ])('rejects unsafe ids when $name', async ({ outsideDirName, call }) => { + const outsideDir = path.join(tmpDir, outsideDirName); await fs.ensureDir(outsideDir); - await expect( - deleteWorkspace( - universalId({ - peer: 'local', - type: 'workspace', - id: '../../outside-delete-target', - }) - ) - ).rejects.toThrow('Invalid workspace id'); - - expect(await fs.pathExists(outsideDir)).toBe(true); - }); - - test('rejects unsafe ids when deleting backup workspaces', async () => { - const { deleteBackupWorkspace } = - await import('@affine/electron/helper/workspace/handlers'); - const outsideDir = path.join(tmpDir, 'outside-backup-target'); - - await fs.ensureDir(outsideDir); - - await expect( - deleteBackupWorkspace('../../outside-backup-target') - ).rejects.toThrow('Invalid workspace id'); - - expect(await fs.pathExists(outsideDir)).toBe(true); - }); - - test('rejects unsafe ids when recovering backup workspaces', async () => { - const { recoverBackupWorkspace } = - await import('@affine/electron/helper/workspace/handlers'); - const outsideDir = path.join(tmpDir, 'outside-recover-target'); - - await fs.ensureDir(outsideDir); - - await expect( - recoverBackupWorkspace('../../outside-recover-target') - ).rejects.toThrow('Invalid workspace id'); - + await expect(call()).rejects.toThrow('Invalid workspace id'); expect(await fs.pathExists(outsideDir)).toBe(true); }); }); diff --git a/packages/frontend/native/index.d.ts b/packages/frontend/native/index.d.ts index e0d650ef84..8e14011a35 100644 --- a/packages/frontend/native/index.d.ts +++ b/packages/frontend/native/index.d.ts @@ -36,6 +36,8 @@ export declare class ShareableContent { static tapGlobalAudio(excludedProcesses: Array | undefined | null, audioStreamCallback: ((err: Error | null, arg: Float32Array) => void)): AudioCaptureSession } +export declare function abortRecording(id: string): Promise + export declare function decodeAudio(buf: Uint8Array, destSampleRate?: number | undefined | null, filename?: string | undefined | null, signal?: AbortSignal | undefined | null): Promise /** Decode audio file into a Float32Array */ @@ -48,6 +50,8 @@ export interface RecordingArtifact { channels: number durationMs: number size: number + degraded: boolean + overflowCount: number } export interface RecordingSessionMeta { @@ -68,9 +72,9 @@ export interface RecordingStartOptions { id?: string } -export declare function startRecording(opts: RecordingStartOptions): RecordingSessionMeta +export declare function startRecording(opts: RecordingStartOptions): Promise -export declare function stopRecording(id: string): RecordingArtifact +export declare function stopRecording(id: string): Promise export interface MermaidRenderOptions { theme?: string fontFamily?: string diff --git a/packages/frontend/native/index.js b/packages/frontend/native/index.js index fa4aabbcea..4d61bc9c0a 100644 --- a/packages/frontend/native/index.js +++ b/packages/frontend/native/index.js @@ -577,6 +577,7 @@ module.exports.ApplicationListChangedSubscriber = nativeBinding.ApplicationListC module.exports.ApplicationStateChangedSubscriber = nativeBinding.ApplicationStateChangedSubscriber module.exports.AudioCaptureSession = nativeBinding.AudioCaptureSession module.exports.ShareableContent = nativeBinding.ShareableContent +module.exports.abortRecording = nativeBinding.abortRecording module.exports.decodeAudio = nativeBinding.decodeAudio module.exports.decodeAudioSync = nativeBinding.decodeAudioSync module.exports.startRecording = nativeBinding.startRecording diff --git a/packages/frontend/native/media_capture/Cargo.toml b/packages/frontend/native/media_capture/Cargo.toml index 97d0db1674..2c7b990646 100644 --- a/packages/frontend/native/media_capture/Cargo.toml +++ b/packages/frontend/native/media_capture/Cargo.toml @@ -21,6 +21,7 @@ rand = { workspace = true } rubato = { workspace = true } symphonia = { workspace = true, features = ["all", "opt-simd"] } thiserror = { workspace = true } +tokio = { workspace = true, features = ["rt", "sync"] } [target.'cfg(target_os = "macos")'.dependencies] block2 = { workspace = true } diff --git a/packages/frontend/native/media_capture/src/audio_callback.rs b/packages/frontend/native/media_capture/src/audio_callback.rs index 11e9238c24..ba9c1e60e6 100644 --- a/packages/frontend/native/media_capture/src/audio_callback.rs +++ b/packages/frontend/native/media_capture/src/audio_callback.rs @@ -1,6 +1,9 @@ -use std::sync::Arc; +use std::sync::{ + Arc, + atomic::{AtomicU64, Ordering}, +}; -use crossbeam_channel::Sender; +use crossbeam_channel::{Sender, TrySendError}; use napi::{ bindgen_prelude::Float32Array, threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode}, @@ -11,7 +14,10 @@ use napi::{ #[derive(Clone)] pub enum AudioCallback { Js(Arc>), - Channel(Sender>), + Channel { + sender: Sender>, + overflow_count: Arc, + }, } impl AudioCallback { @@ -22,10 +28,16 @@ impl AudioCallback { // audio thread. let _ = func.call(Ok(samples.into()), ThreadsafeFunctionCallMode::NonBlocking); } - Self::Channel(sender) => { - // Drop the chunk if the channel is full to avoid blocking capture. - let _ = sender.try_send(samples); - } + Self::Channel { sender, overflow_count } => match sender.try_send(samples) { + Ok(()) => {} + Err(TrySendError::Full(_)) => { + let dropped = overflow_count.fetch_add(1, Ordering::Relaxed) + 1; + if dropped == 1 || dropped.is_power_of_two() { + eprintln!("[affine_media_capture] audio queue overflow, dropped {dropped} chunks"); + } + } + Err(TrySendError::Disconnected(_)) => {} + }, } } } diff --git a/packages/frontend/native/media_capture/src/recording.rs b/packages/frontend/native/media_capture/src/recording.rs index f85c06d572..c6ec31d552 100644 --- a/packages/frontend/native/media_capture/src/recording.rs +++ b/packages/frontend/native/media_capture/src/recording.rs @@ -2,7 +2,10 @@ use std::{ fs, io::{BufWriter, Write}, path::PathBuf, - sync::{LazyLock, Mutex}, + sync::{ + Arc, LazyLock, + atomic::{AtomicU64, Ordering}, + }, thread::{self, JoinHandle}, time::{SystemTime, UNIX_EPOCH}, }; @@ -13,6 +16,7 @@ use napi_derive::napi; use ogg::writing::{PacketWriteEndInfo, PacketWriter}; use opus_codec::{Application, Channels, Encoder, FrameSize, SampleRate as OpusSampleRate}; use rubato::Resampler; +use tokio::sync::{Mutex as AsyncMutex, mpsc, oneshot}; #[cfg(any(target_os = "macos", target_os = "windows"))] use crate::audio_callback::AudioCallback; @@ -24,6 +28,7 @@ use crate::windows::screen_capture_kit::ShareableContent; const ENCODE_SAMPLE_RATE: OpusSampleRate = OpusSampleRate::Hz48000; const MAX_PACKET_SIZE: usize = 4096; const RESAMPLER_INPUT_CHUNK: usize = 1024; +const AUDIO_CHUNK_QUEUE_CAPACITY: usize = 1024; type RecordingResult = std::result::Result; @@ -55,6 +60,8 @@ pub struct RecordingArtifact { pub channels: u32, pub duration_ms: i64, pub size: i64, + pub degraded: bool, + pub overflow_count: u32, } #[derive(Debug, thiserror::Error)] @@ -448,6 +455,8 @@ impl OggOpusWriter { channels: self.channels.as_usize() as u32, duration_ms, size, + degraded: false, + overflow_count: 0, }) } } @@ -507,17 +516,41 @@ impl PlatformCapture { } enum ControlMessage { - Stop(Sender>), + Stop { + reply_tx: oneshot::Sender>, + }, + Abort { + reply_tx: oneshot::Sender>, + }, } struct ActiveRecording { id: String, - control_tx: Sender, + control_tx: mpsc::UnboundedSender, controller: Option>, } -static ACTIVE_RECORDING: LazyLock>> = LazyLock::new(|| Mutex::new(None)); -static START_RECORDING_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); +#[derive(Default)] +struct RecordingQualityMetrics { + overflow_count: Arc, +} + +impl RecordingQualityMetrics { + fn shared_counter(&self) -> Arc { + Arc::clone(&self.overflow_count) + } + + fn overflow_count(&self) -> u32 { + self + .overflow_count + .load(Ordering::Relaxed) + .try_into() + .unwrap_or(u32::MAX) + } +} + +static ACTIVE_RECORDING: LazyLock>> = LazyLock::new(|| AsyncMutex::new(None)); +static START_RECORDING_LOCK: LazyLock> = LazyLock::new(|| AsyncMutex::new(())); fn now_millis() -> i64 { SystemTime::now() @@ -568,10 +601,17 @@ fn build_excluded_refs(ids: &[u32]) -> Result> { Ok(excluded) } -fn start_capture(opts: &RecordingStartOptions, tx: Sender>) -> Result<(PlatformCapture, u32, u32)> { +fn start_capture( + opts: &RecordingStartOptions, + tx: Sender>, + overflow_count: Arc, +) -> Result<(PlatformCapture, u32, u32)> { #[cfg(target_os = "macos")] { - let callback = AudioCallback::Channel(tx); + let callback = AudioCallback::Channel { + sender: tx, + overflow_count, + }; let session = if let Some(app_id) = opts.app_process_id { ShareableContent::tap_audio_with_callback(app_id, callback)? } else { @@ -586,7 +626,10 @@ fn start_capture(opts: &RecordingStartOptions, tx: Sender>) -> Result<( #[cfg(target_os = "windows")] { - let callback = AudioCallback::Channel(tx); + let callback = AudioCallback::Channel { + sender: tx, + overflow_count, + }; let session = ShareableContent::tap_audio_with_callback(opts.app_process_id.unwrap_or(0), callback, opts.sample_rate)?; let sample_rate = session.get_sample_rate().round() as u32; @@ -598,6 +641,7 @@ fn start_capture(opts: &RecordingStartOptions, tx: Sender>) -> Result<( { let _ = opts; let _ = tx; + let _ = overflow_count; Err(RecordingError::UnsupportedPlatform.into()) } } @@ -625,13 +669,19 @@ fn spawn_recording_controller( id: String, filepath: PathBuf, opts: RecordingStartOptions, -) -> (Receiver>, Sender, JoinHandle<()>) { - let (started_tx, started_rx) = bounded(1); - let (control_tx, control_rx) = bounded(1); +) -> ( + oneshot::Receiver>, + mpsc::UnboundedSender, + JoinHandle<()>, +) { + let (started_tx, started_rx) = oneshot::channel(); + let (control_tx, mut control_rx) = mpsc::unbounded_channel(); let controller = thread::spawn(move || { - let (tx, rx) = bounded::>(32); - let (mut capture, capture_rate, capture_channels) = match start_capture(&opts, tx.clone()) { + let (tx, rx) = bounded::>(AUDIO_CHUNK_QUEUE_CAPACITY); + let metrics = RecordingQualityMetrics::default(); + let (mut capture, capture_rate, capture_channels) = match start_capture(&opts, tx.clone(), metrics.shared_counter()) + { Ok(capture) => capture, Err(error) => { let _ = started_tx.send(Err(RecordingError::Start(error.to_string()))); @@ -675,28 +725,52 @@ fn spawn_recording_controller( return; } - while let Ok(message) = control_rx.recv() { + if let Some(message) = control_rx.blocking_recv() { match message { - ControlMessage::Stop(reply_tx) => { - let result = match capture.stop() { - Ok(()) => { - drop(audio_tx.take()); - match worker.take() { - Some(handle) => match handle.join() { - Ok(result) => result, - Err(_) => Err(RecordingError::Join), - }, - None => Err(RecordingError::Join), - } - } + ControlMessage::Stop { reply_tx } => { + let stop_result = capture.stop(); + drop(audio_tx.take()); + let worker_result = match worker.take() { + Some(handle) => match handle.join() { + Ok(result) => result, + Err(_) => Err(RecordingError::Join), + }, + None => Err(RecordingError::Join), + }; + let result = match stop_result { + Ok(()) => worker_result.map(|mut artifact| { + artifact.overflow_count = metrics.overflow_count(); + artifact.degraded = artifact.overflow_count > 0; + artifact + }), Err(error) => Err(RecordingError::Start(error.to_string())), }; let _ = reply_tx.send(result); + } + ControlMessage::Abort { reply_tx } => { + let stop_result = capture.stop(); + drop(audio_tx.take()); + let worker_result = match worker.take() { + Some(handle) => match handle.join() { + Ok(result) => result, + Err(_) => Err(RecordingError::Join), + }, + None => Err(RecordingError::Join), + }; + let result = match stop_result { + Ok(()) => match worker_result { + Ok(artifact) => { + fs::remove_file(&artifact.filepath).ok(); + Ok(()) + } + Err(RecordingError::Empty) => Ok(()), + Err(error) => Err(error), + }, + Err(error) => Err(RecordingError::Start(error.to_string())), + }; - if worker.is_none() { - break; - } + let _ = reply_tx.send(result); } } } @@ -711,17 +785,65 @@ fn spawn_recording_controller( (started_rx, control_tx, controller) } -fn cleanup_recording_controller(control_tx: &Sender, controller: JoinHandle<()>) { - let (reply_tx, reply_rx) = bounded(1); - let _ = control_tx.send(ControlMessage::Stop(reply_tx)); - let _ = reply_rx.recv(); - let _ = controller.join(); +async fn join_controller_handle(controller: JoinHandle<()>) -> RecordingResult<()> { + tokio::task::spawn_blocking(move || controller.join().map_err(|_| RecordingError::Join)) + .await + .map_err(|_| RecordingError::Join)? } -fn take_active_recording(id: &str) -> RecordingResult { - let mut active_recording = ACTIVE_RECORDING - .lock() - .map_err(|_| RecordingError::Start("lock poisoned".into()))?; +async fn cleanup_recording_controller(control_tx: &mpsc::UnboundedSender, controller: JoinHandle<()>) { + let (reply_tx, reply_rx) = oneshot::channel(); + let _ = control_tx.send(ControlMessage::Abort { reply_tx }); + let _ = reply_rx.await; + let _ = join_controller_handle(controller).await; +} + +fn map_recording_result(result: RecordingResult) -> Result { + match result { + Ok(value) => Ok(value), + Err(RecordingError::Start(message)) => Err(RecordingError::Start(message).into()), + Err(error) => Err(error.into()), + } +} + +async fn send_control_message( + id: &str, + message: ControlMessage, + reply_rx: oneshot::Receiver>, +) -> Result { + let control_tx = { + let recording = ACTIVE_RECORDING.lock().await; + let active = recording.as_ref().ok_or(RecordingError::NotFound)?; + if active.id != id { + return Err(RecordingError::NotFound.into()); + } + active.control_tx.clone() + }; + + if control_tx.send(message).is_err() { + if let Ok(recording) = take_active_recording(id).await { + let _ = join_active_recording(recording).await; + } + return Err(RecordingError::Join.into()); + } + + let response = match reply_rx.await { + Ok(response) => response, + Err(_) => { + if let Ok(recording) = take_active_recording(id).await { + let _ = join_active_recording(recording).await; + } + return Err(RecordingError::Join.into()); + } + }; + + let active_recording = take_active_recording(id).await?; + join_active_recording(active_recording).await?; + map_recording_result(response) +} + +async fn take_active_recording(id: &str) -> RecordingResult { + let mut active_recording = ACTIVE_RECORDING.lock().await; let recording = active_recording.take().ok_or(RecordingError::NotFound)?; if recording.id != id { *active_recording = Some(recording); @@ -730,15 +852,14 @@ fn take_active_recording(id: &str) -> RecordingResult { Ok(recording) } -fn join_active_recording(mut recording: ActiveRecording) -> RecordingResult<()> { +async fn join_active_recording(mut recording: ActiveRecording) -> RecordingResult<()> { if let Some(handle) = recording.controller.take() { - handle.join().map_err(|_| RecordingError::Join)?; + join_controller_handle(handle).await?; } Ok(()) } -#[napi] -pub fn start_recording(opts: RecordingStartOptions) -> Result { +async fn start_recording_inner(opts: RecordingStartOptions) -> Result { if let Some(fmt) = opts.format.as_deref() && !fmt.eq_ignore_ascii_case("opus") { @@ -748,17 +869,12 @@ pub fn start_recording(opts: RecordingStartOptions) -> Result Result channels, + Ok(Err(error)) => { + let _ = join_controller_handle(controller).await; + return Err(error.into()); + } + Err(_) => { + let _ = join_controller_handle(controller).await; + return Err(RecordingError::Start("failed to start recording controller".into()).into()); + } + }; let meta = RecordingSessionMeta { id: id.clone(), @@ -782,16 +906,10 @@ pub fn start_recording(opts: RecordingStartOptions) -> Result recording, - Err(_) => { - cleanup_recording_controller(&control_tx, controller); - return Err(RecordingError::Start("lock poisoned".into()).into()); - } - }; + let mut recording = ACTIVE_RECORDING.lock().await; if recording.is_some() { - cleanup_recording_controller(&control_tx, controller); + cleanup_recording_controller(&control_tx, controller).await; return Err(RecordingError::Start("recording already active".into()).into()); } @@ -805,63 +923,50 @@ pub fn start_recording(opts: RecordingStartOptions) -> Result Result { - let control_tx = { - let recording = ACTIVE_RECORDING - .lock() - .map_err(|_| RecordingError::Start("lock poisoned".into()))?; +pub async fn start_recording(opts: RecordingStartOptions) -> Result { + start_recording_inner(opts).await +} - let active = recording.as_ref().ok_or(RecordingError::NotFound)?; - if active.id != id { - return Err(RecordingError::NotFound.into()); - } - active.control_tx.clone() - }; +async fn stop_recording_inner(id: String) -> Result { + let (reply_tx, reply_rx) = oneshot::channel(); + send_control_message(&id, ControlMessage::Stop { reply_tx }, reply_rx).await +} - let (reply_tx, reply_rx) = bounded(1); - if control_tx.send(ControlMessage::Stop(reply_tx)).is_err() { - if let Ok(recording) = take_active_recording(&id) { - let _ = join_active_recording(recording); - } - return Err(RecordingError::Join.into()); - } +#[napi] +pub async fn stop_recording(id: String) -> Result { + stop_recording_inner(id).await +} - let response = match reply_rx.recv() { - Ok(response) => response, - Err(_) => { - if let Ok(recording) = take_active_recording(&id) { - let _ = join_active_recording(recording); - } - return Err(RecordingError::Join.into()); - } - }; +async fn abort_recording_inner(id: String) -> Result<()> { + let (reply_tx, reply_rx) = oneshot::channel(); + send_control_message(&id, ControlMessage::Abort { reply_tx }, reply_rx).await +} - let artifact = match response { - Ok(artifact) => artifact, - Err(RecordingError::Start(message)) => { - return Err(RecordingError::Start(message).into()); - } - Err(error) => { - if let Ok(recording) = take_active_recording(&id) { - let _ = join_active_recording(recording); - } - return Err(error.into()); - } - }; - - let active_recording = take_active_recording(&id)?; - join_active_recording(active_recording)?; - - Ok(artifact) +#[napi] +pub async fn abort_recording(id: String) -> Result<()> { + abort_recording_inner(id).await } #[cfg(test)] mod tests { - use std::{env, fs::File, path::PathBuf}; + use std::{env, fs::File, path::PathBuf, thread}; use ogg::PacketReader; + use tokio::runtime::Builder; - use super::{OggOpusWriter, convert_interleaved_channels}; + use super::{ + ACTIVE_RECORDING, ActiveRecording, ControlMessage, OggOpusWriter, RecordingArtifact, RecordingError, + RecordingQualityMetrics, START_RECORDING_LOCK, abort_recording_inner, bounded, convert_interleaved_channels, mpsc, + stop_recording_inner, + }; + use crate::audio_callback::AudioCallback; + + fn block_on(future: F) -> F::Output { + Builder::new_current_thread() + .build() + .expect("create runtime") + .block_on(future) + } fn temp_recording_path() -> PathBuf { env::temp_dir().join(format!("affine-recording-test-{}.opus", rand::random::())) @@ -939,4 +1044,105 @@ mod tests { vec![3.0, 3.0, 4.0, 4.0] ); } + + #[test] + fn stop_recording_clears_active_session_after_stop_error() { + let _lock = START_RECORDING_LOCK.blocking_lock(); + let id = String::from("stop-error"); + let (control_tx, mut control_rx) = mpsc::unbounded_channel(); + let controller = thread::spawn(move || { + let Some(ControlMessage::Stop { reply_tx }) = control_rx.blocking_recv() else { + panic!("expected stop"); + }; + let _ = reply_tx.send(Err(RecordingError::Start(String::from("boom")))); + }); + + *ACTIVE_RECORDING.blocking_lock() = Some(ActiveRecording { + id: id.clone(), + control_tx, + controller: Some(controller), + }); + + let error = match block_on(stop_recording_inner(id)) { + Ok(_) => panic!("stop should fail"), + Err(error) => error, + }; + assert!( + error.to_string().contains("start failure: boom"), + "unexpected error: {error}" + ); + assert!(ACTIVE_RECORDING.blocking_lock().is_none()); + } + + #[test] + fn stop_recording_returns_artifact_and_clears_active_session() { + let _lock = START_RECORDING_LOCK.blocking_lock(); + let id = String::from("stop-success"); + let (control_tx, mut control_rx) = mpsc::unbounded_channel(); + let controller = thread::spawn(move || { + let Some(ControlMessage::Stop { reply_tx }) = control_rx.blocking_recv() else { + panic!("expected stop"); + }; + let _ = reply_tx.send(Ok(RecordingArtifact { + id: String::from("stop-success"), + filepath: String::from("/tmp/recording.opus"), + sample_rate: 48_000, + channels: 2, + duration_ms: 1_000, + size: 128, + degraded: true, + overflow_count: 3, + })); + }); + + *ACTIVE_RECORDING.blocking_lock() = Some(ActiveRecording { + id: id.clone(), + control_tx, + controller: Some(controller), + }); + + let artifact = block_on(stop_recording_inner(id)).expect("stop should succeed"); + assert_eq!(artifact.filepath, "/tmp/recording.opus"); + assert!(artifact.degraded); + assert_eq!(artifact.overflow_count, 3); + assert!(ACTIVE_RECORDING.blocking_lock().is_none()); + } + + #[test] + fn abort_recording_clears_active_session() { + let _lock = START_RECORDING_LOCK.blocking_lock(); + let id = String::from("abort-success"); + let (control_tx, mut control_rx) = mpsc::unbounded_channel(); + let controller = thread::spawn(move || { + let Some(ControlMessage::Abort { reply_tx }) = control_rx.blocking_recv() else { + panic!("expected abort"); + }; + let _ = reply_tx.send(Ok(())); + }); + + *ACTIVE_RECORDING.blocking_lock() = Some(ActiveRecording { + id: id.clone(), + control_tx, + controller: Some(controller), + }); + + block_on(abort_recording_inner(id)).expect("abort should succeed"); + assert!(ACTIVE_RECORDING.blocking_lock().is_none()); + } + + #[test] + fn queue_overflow_marks_recording_as_degraded() { + let metrics = RecordingQualityMetrics::default(); + let (sender, receiver) = bounded(1); + let callback = AudioCallback::Channel { + sender, + overflow_count: metrics.shared_counter(), + }; + + callback.call(vec![0.1, 0.2]); + callback.call(vec![0.3, 0.4]); + + let _ = receiver.recv().expect("queued audio"); + assert_eq!(metrics.overflow_count(), 1); + } } diff --git a/packages/frontend/native/media_capture/src/windows/audio_capture.rs b/packages/frontend/native/media_capture/src/windows/audio_capture.rs index 3fe694ad90..6281dcb2d6 100644 --- a/packages/frontend/native/media_capture/src/windows/audio_capture.rs +++ b/packages/frontend/native/media_capture/src/windows/audio_capture.rs @@ -192,19 +192,26 @@ impl AudioCaptureSession { if self.stopped.load(Ordering::SeqCst) { return Ok(()); } + self.stopped.store(true, Ordering::SeqCst); + let mut pause_errors = Vec::new(); self .mic_stream .pause() - .map_err(|e| Error::new(Status::GenericFailure, format!("{e}")))?; + .map_err(|e| pause_errors.push(format!("pause mic stream: {e}"))) + .ok(); self .lb_stream .pause() - .map_err(|e| Error::new(Status::GenericFailure, format!("{e}")))?; - self.stopped.store(true, Ordering::SeqCst); + .map_err(|e| pause_errors.push(format!("pause loopback stream: {e}"))) + .ok(); if let Some(jh) = self.jh.take() { let _ = jh.join(); // ignore poison } - Ok(()) + if pause_errors.is_empty() { + Ok(()) + } else { + Err(Error::new(Status::GenericFailure, pause_errors.join("; "))) + } } }