feat(native): async recorder

This commit is contained in:
DarkSky
2026-03-22 04:01:38 +08:00
parent ffa3ff9d7f
commit 9ddd6dd871
19 changed files with 1761 additions and 641 deletions

5
Cargo.lock generated
View File

@@ -99,6 +99,7 @@ dependencies = [
"screencapturekit",
"symphonia",
"thiserror 2.0.18",
"tokio",
"uuid",
"windows 0.61.3",
"windows-core 0.61.2",
@@ -5060,9 +5061,9 @@ dependencies = [
[[package]]
name = "rustls-webpki"
version = "0.103.9"
version = "0.103.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7df23109aa6c1567d1c575b9952556388da57401e4ace1d15f79eedad0d8f53"
checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef"
dependencies = [
"ring",
"rustls-pki-types",

View File

@@ -13,16 +13,15 @@ import type { FrameworkProvider } from '@toeverything/infra';
import { getCurrentWorkspace, isAiEnabled } from './utils';
const logger = new DebugLogger('electron-renderer:recording');
const RECORDING_PROCESS_RETRY_MS = 1000;
const RECORDING_IMPORT_RETRY_MS = 1000;
const NATIVE_RECORDING_MIME_TYPE = 'audio/ogg';
type ProcessingRecordingStatus = {
type RecordingImportStatus = {
id: number;
status: 'processing';
appName?: string;
blockCreationStatus?: undefined;
filepath: string;
startTime: number;
importStatus: 'pending_import' | 'importing' | 'imported' | 'import_failed';
};
type WorkspaceHandle = NonNullable<ReturnType<typeof getCurrentWorkspace>>;
@@ -65,24 +64,10 @@ async function saveRecordingBlob(blobEngine: BlobEngine, filepath: string) {
return { blob, blobId };
}
function shouldProcessRecording(
status: unknown
): status is ProcessingRecordingStatus {
return (
!!status &&
typeof status === 'object' &&
'status' in status &&
status.status === 'processing' &&
'filepath' in status &&
typeof status.filepath === 'string' &&
!('blockCreationStatus' in status && status.blockCreationStatus)
);
}
async function createRecordingDoc(
frameworkProvider: FrameworkProvider,
workspace: WorkspaceHandle['workspace'],
status: ProcessingRecordingStatus
status: RecordingImportStatus
) {
const docsService = workspace.scope.get(DocsService);
const aiEnabled = isAiEnabled(frameworkProvider);
@@ -99,13 +84,11 @@ async function createRecordingDoc(
const docProps: DocProps = {
onStoreLoad: (doc, { noteId }) => {
void (async () => {
// it takes a while to save the blob, so we show the attachment first
const { blobId, blob } = await saveRecordingBlob(
doc.workspace.blobSync,
recordingFilepath
);
// name + timestamp(readable) + extension
const attachmentName =
(status.appName ?? 'System Audio') + ' ' + timestamp + '.opus';
@@ -163,7 +146,7 @@ async function createRecordingDoc(
}
export function setupRecordingEvents(frameworkProvider: FrameworkProvider) {
let pendingStatus: ProcessingRecordingStatus | null = null;
let importQueue: RecordingImportStatus[] = [];
let retryTimer: ReturnType<typeof setTimeout> | null = null;
let processingStatusId: number | null = null;
@@ -174,28 +157,48 @@ export function setupRecordingEvents(frameworkProvider: FrameworkProvider) {
}
};
const clearPending = (id?: number) => {
if (id === undefined || pendingStatus?.id === id) {
pendingStatus = null;
clearRetry();
}
if (id === undefined || processingStatusId === id) {
const updateQueue = (nextQueue: RecordingImportStatus[]) => {
importQueue = nextQueue;
if (
processingStatusId !== null &&
!importQueue.some(
status =>
status.id === processingStatusId &&
status.importStatus === 'importing'
)
) {
processingStatusId = null;
}
};
const updateLocalImportStatus = (
id: number,
importStatus: RecordingImportStatus['importStatus']
) => {
importQueue = importQueue.map(status =>
status.id === id ? { ...status, importStatus } : status
);
};
const getNextImportCandidate = () =>
importQueue.find(
status =>
status.importStatus === 'pending_import' ||
status.importStatus === 'import_failed'
) ?? null;
const scheduleRetry = () => {
if (!pendingStatus || retryTimer !== null) {
if (!getNextImportCandidate() || retryTimer !== null) {
return;
}
retryTimer = setTimeout(() => {
retryTimer = null;
void processPendingStatus().catch(console.error);
}, RECORDING_PROCESS_RETRY_MS);
void processNextImport().catch(console.error);
}, RECORDING_IMPORT_RETRY_MS);
};
const processPendingStatus = async () => {
const status = pendingStatus;
const processNextImport = async () => {
const status = getNextImportCandidate();
if (!status || processingStatusId === status.id) {
return;
}
@@ -216,8 +219,12 @@ export function setupRecordingEvents(frameworkProvider: FrameworkProvider) {
using currentWorkspace = getCurrentWorkspace(frameworkProvider);
if (!currentWorkspace) {
// Workspace can lag behind the post-recording status update for a short
// time; keep retrying instead of permanently failing the import.
scheduleRetry();
return;
}
const claimed = await apis?.recording.claimRecordingImport(status.id);
if (!claimed) {
scheduleRetry();
return;
}
@@ -228,47 +235,38 @@ export function setupRecordingEvents(frameworkProvider: FrameworkProvider) {
await createRecordingDoc(
frameworkProvider,
currentWorkspace.workspace,
status
claimed
);
await apis?.recording.setRecordingBlockCreationStatus(
status.id,
'success'
);
clearPending(status.id);
updateLocalImportStatus(status.id, 'imported');
await apis?.recording.completeRecordingImport(status.id);
} catch (error) {
logger.error('Failed to create recording block', error);
try {
await apis?.recording.setRecordingBlockCreationStatus(
status.id,
'failed',
error instanceof Error ? error.message : undefined
);
} finally {
clearPending(status.id);
}
logger.error('Failed to import recording artifact', error);
updateLocalImportStatus(status.id, 'import_failed');
await apis?.recording.failRecordingImport(
status.id,
error instanceof Error ? error.message : undefined
);
} finally {
if (pendingStatus?.id === status.id) {
processingStatusId = null;
scheduleRetry();
}
processingStatusId = null;
scheduleRetry();
}
};
events?.recording.onRecordingStatusChanged(status => {
if (shouldProcessRecording(status)) {
pendingStatus = status;
clearRetry();
void processPendingStatus().catch(console.error);
return;
}
if (apis?.recording) {
void apis.recording
.getRecordingImportQueue()
.then(queue => {
updateQueue(queue ?? []);
void processNextImport().catch(console.error);
})
.catch(error => {
logger.error('Failed to load recording import queue', error);
});
}
if (!status) {
clearPending();
return;
}
if (pendingStatus?.id === status.id) {
clearPending(status.id);
}
events?.recording.onRecordingImportQueueChanged(queue => {
updateQueue(queue);
clearRetry();
void processNextImport().catch(console.error);
});
}

View File

@@ -10,8 +10,16 @@ import * as styles from './styles.css';
type Status = {
id: number;
status: 'new' | 'recording' | 'processing' | 'ready';
blockCreationStatus?: 'success' | 'failed';
status:
| 'new'
| 'starting'
| 'recording'
| 'finalizing'
| 'pending_import'
| 'importing'
| 'imported'
| 'import_failed'
| 'finalize_failed';
appName?: string;
appGroupId?: number;
icon?: Buffer;
@@ -56,19 +64,17 @@ export function Recording() {
}
if (status.status === 'new') {
return t['com.affine.recording.new']();
} else if (
status.status === 'ready' &&
status.blockCreationStatus === 'success'
) {
} else if (status.status === 'imported') {
return t['com.affine.recording.success.prompt']();
} else if (
status.status === 'ready' &&
status.blockCreationStatus === 'failed'
status.status === 'import_failed' ||
status.status === 'finalize_failed'
) {
return t['com.affine.recording.failed.prompt']();
} else if (
status.status === 'starting' ||
status.status === 'recording' ||
status.status === 'processing'
status.status === 'finalizing'
) {
if (status.appName) {
return t['com.affine.recording.recording']({
@@ -77,6 +83,11 @@ export function Recording() {
} else {
return t['com.affine.recording.recording.unnamed']();
}
} else if (
status.status === 'pending_import' ||
status.status === 'importing'
) {
return t['com.affine.recording.success.prompt']();
}
return null;
}, [status, t]);
@@ -155,8 +166,10 @@ export function Recording() {
</Button>
);
} else if (
status.status === 'processing' ||
(status.status === 'ready' && !status.blockCreationStatus)
status.status === 'starting' ||
status.status === 'finalizing' ||
status.status === 'pending_import' ||
status.status === 'importing'
) {
return (
<Button
@@ -166,18 +179,15 @@ export function Recording() {
disabled
/>
);
} else if (
status.status === 'ready' &&
status.blockCreationStatus === 'success'
) {
} else if (status.status === 'imported') {
return (
<Button variant="primary" onClick={handleDismiss}>
{t['com.affine.recording.success.button']()}
</Button>
);
} else if (
status.status === 'ready' &&
status.blockCreationStatus === 'failed'
status.status === 'import_failed' ||
status.status === 'finalize_failed'
) {
return (
<>

View File

@@ -0,0 +1,191 @@
import { BehaviorSubject } from 'rxjs';
import { logger } from '../logger';
import { globalStateStorage } from '../shared-storage/storage';
import type {
RecordingArtifactInfo,
RecordingImportState,
RecordingImportStatus,
RecordingSessionStatus,
} from './types';
const RECORDING_IMPORT_REGISTRY_KEY = 'recordingImportRegistry:v1';
/**
 * Type guard narrowing an arbitrary value to one of the four
 * RecordingImportState strings.
 */
function isImportState(value: unknown): value is RecordingImportState {
  const states = ['pending_import', 'importing', 'imported', 'import_failed'];
  return typeof value === 'string' && states.includes(value);
}
/**
 * Type guard for RecordingArtifactInfo: requires a string `filepath` and
 * allows every optional metadata field to be either absent or of its
 * declared primitive type.
 */
function isArtifactInfo(value: unknown): value is RecordingArtifactInfo {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const candidate = value as Partial<RecordingArtifactInfo>;
  if (typeof candidate.filepath !== 'string') {
    return false;
  }
  // Optional numeric fields share one validation rule.
  const numericOk = (field: unknown) =>
    field === undefined || typeof field === 'number';
  return (
    numericOk(candidate.sampleRate) &&
    numericOk(candidate.numberOfChannels) &&
    numericOk(candidate.durationMs) &&
    numericOk(candidate.size) &&
    (candidate.degraded === undefined ||
      typeof candidate.degraded === 'boolean') &&
    numericOk(candidate.overflowCount)
  );
}
/**
 * Type guard for a persisted RecordingImportStatus entry.
 *
 * Builds on `isArtifactInfo` — which already guarantees a non-null object
 * with a string `filepath` — and then validates the import bookkeeping
 * fields (ids, timestamps, state, optional strings).
 */
function isRecordingImportStatus(
  value: unknown
): value is RecordingImportStatus {
  // `isArtifactInfo` only passes for non-null objects, so the previous
  // additional `typeof value !== 'object'` check was dead code; dropped.
  if (!isArtifactInfo(value)) {
    return false;
  }
  const item = value as Partial<RecordingImportStatus>;
  return (
    typeof item.id === 'number' &&
    typeof item.startTime === 'number' &&
    typeof item.createdAt === 'number' &&
    typeof item.updatedAt === 'number' &&
    isImportState(item.importStatus) &&
    (item.appName === undefined || typeof item.appName === 'string') &&
    (item.errorMessage === undefined || typeof item.errorMessage === 'string')
  );
}
/**
 * Restores the import queue from persistent storage, silently discarding
 * any entries that fail validation (e.g. written by an older schema).
 */
function loadPersistedImports(): RecordingImportStatus[] {
  const raw = globalStateStorage.get(RECORDING_IMPORT_REGISTRY_KEY);
  return Array.isArray(raw) ? raw.filter(isRecordingImportStatus) : [];
}
/**
 * Persistent queue of finalized recording artifacts awaiting import into a
 * workspace. Entries are written to `globalStateStorage` on every change so
 * they survive app restarts, and are broadcast through an rxjs
 * BehaviorSubject so subscribers see the current queue immediately.
 */
export class RecordingArtifactRegistry {
// Seeded from persisted storage so pending imports survive restarts.
private readonly imports$ = new BehaviorSubject<RecordingImportStatus[]>(
loadPersistedImports()
);
// Observable view of the queue.
get entries$() {
return this.imports$;
}
// Synchronous snapshot of the current queue.
get entries() {
return this.imports$.value;
}
// Central mutation point: every change is both emitted to subscribers and
// written back to persistent storage, so the two cannot drift apart.
private setEntries(
updater:
| RecordingImportStatus[]
| ((entries: RecordingImportStatus[]) => RecordingImportStatus[])
) {
const nextEntries =
typeof updater === 'function' ? updater(this.imports$.value) : updater;
this.imports$.next(nextEntries);
globalStateStorage.set(RECORDING_IMPORT_REGISTRY_KEY, nextEntries);
return nextEntries;
}
// Adds (or replaces) the queue entry for a finalized recording session.
// Returns the entry that was enqueued, or undefined if it vanished.
enqueueFromSession(
session: RecordingSessionStatus,
artifact: RecordingArtifactInfo
) {
const now = Date.now();
return this.setEntries(entries => {
// Drop any stale entry with the same session id before re-adding.
const next = entries.filter(entry => entry.id !== session.id);
next.push({
id: session.id,
appName: session.appGroup?.name,
startTime: session.startTime,
importStatus: 'pending_import',
createdAt: now,
updatedAt: now,
// Spread last on purpose: artifact fields (filepath, sampleRate, …)
// take precedence over anything written above.
...artifact,
});
// Keep the queue ordered by enqueue time.
next.sort((left, right) => left.createdAt - right.createdAt);
return next;
}).find(entry => entry.id === session.id);
}
// Transitions an entry from 'pending_import'/'import_failed' to
// 'importing'. Returns the claimed entry, or null when the id is unknown
// or the entry is not claimable (guards against double-import).
claim(id: number) {
let claimed: RecordingImportStatus | null = null;
this.setEntries(entries =>
entries.map(entry => {
if (entry.id !== id) {
return entry;
}
if (
entry.importStatus !== 'pending_import' &&
entry.importStatus !== 'import_failed'
) {
return entry;
}
claimed = {
...entry,
importStatus: 'importing',
// Clear any error left over from a previous failed attempt.
errorMessage: undefined,
updatedAt: Date.now(),
};
return claimed;
})
);
return claimed;
}
// Marks an entry as successfully imported.
markImported(id: number) {
return this.updateState(id, 'imported');
}
// Marks an entry as failed, recording the optional error message.
markFailed(id: number, errorMessage?: string) {
return this.updateState(id, 'import_failed', errorMessage);
}
// Drops an entry from the queue (and therefore from persistent storage).
remove(id: number) {
this.setEntries(entries => entries.filter(entry => entry.id !== id));
}
// Most recently updated entry, ties broken by higher id; undefined when
// the queue is empty.
latest() {
return [...this.entries].sort((left, right) => {
if (left.updatedAt !== right.updatedAt) {
return right.updatedAt - left.updatedAt;
}
return right.id - left.id;
})[0];
}
// Shared transition helper for markImported/markFailed. Returns the
// updated entry, or null (after logging) when no entry matches the id.
private updateState(
id: number,
importStatus: RecordingImportState,
errorMessage?: string
) {
let updated: RecordingImportStatus | null = null;
this.setEntries(entries =>
entries.map(entry => {
if (entry.id !== id) {
return entry;
}
updated = {
...entry,
importStatus,
errorMessage,
updatedAt: Date.now(),
};
return updated;
})
);
if (!updated) {
logger.error(`Recording import ${id} not found`);
}
return updated;
}
}
// Shared singleton instance; main-process recording code imports this directly.
export const recordingArtifactRegistry = new RecordingArtifactRegistry();

View File

@@ -35,8 +35,15 @@ import { globalStateStorage } from '../shared-storage/storage';
import { getMainWindow } from '../windows-manager';
import { popupManager } from '../windows-manager/popup';
import { isAppNameAllowed } from './allow-list';
import { recordingArtifactRegistry } from './artifact-registry';
import { recordingStateMachine } from './state-machine';
import type { AppGroupInfo, RecordingStatus, TappableAppInfo } from './types';
import type {
AppGroupInfo,
RecordingImportStatus,
RecordingSessionStatus,
RecordingStatus,
TappableAppInfo,
} from './types';
export const MeetingsSettingsState = {
$: globalStateStorage.watch<MeetingSettingsSchema>(MeetingSettingsKey).pipe(
@@ -74,14 +81,32 @@ type ShareableContentType = InstanceType<NativeModule['ShareableContent']>;
type ShareableContentStatic = NativeModule['ShareableContent'];
let shareableContent: ShareableContentType | null = null;
let nativeModuleOverride: NativeModule | null = null;
function getNativeModule(): NativeModule {
return require('@affine/native') as NativeModule;
return nativeModuleOverride ?? (require('@affine/native') as NativeModule);
}
async function getNativeModuleAsync(): Promise<NativeModule> {
if (nativeModuleOverride) {
return nativeModuleOverride;
}
return (await import('@affine/native')) as NativeModule;
}
export function setRecordingNativeModuleForTesting(
nativeModule: NativeModule | null
) {
nativeModuleOverride = nativeModule;
}
function cleanup() {
const nativeId = recordingStateMachine.status?.nativeId;
if (nativeId) cleanupAbandonedNativeRecording(nativeId);
if (nativeId) {
void cleanupAbandonedNativeRecording(nativeId).catch(error => {
logger.error('failed to cleanup abandoned native recording', error);
});
}
recordingStatus$.next(null);
shareableContent = null;
appStateSubscribers.forEach(subscriber => {
@@ -113,16 +138,103 @@ export const appGroups$ = new BehaviorSubject<AppGroupInfo[]>([]);
export const updateApplicationsPing$ = new Subject<number>();
// There should be only one active recording at a time; state is managed by the state machine
export const recordingStatus$ = recordingStateMachine.status$;
export const recordingSessionStatus$ = recordingStateMachine.status$;
export const recordingImportQueue$ = recordingArtifactRegistry.entries$;
export const recordingStatus$ = new BehaviorSubject<RecordingStatus | null>(
null
);
function isRecordingSettled(
function hasActiveRecordingSession(
status: RecordingSessionStatus | null | undefined
) {
return (
status?.sessionStatus === 'starting' ||
status?.sessionStatus === 'recording' ||
status?.sessionStatus === 'finalizing'
);
}
function isTerminalPopupStatus(
status: RecordingStatus | null | undefined
): status is RecordingStatus & {
status: 'ready';
blockCreationStatus: 'success' | 'failed';
status: 'imported' | 'import_failed' | 'finalize_failed';
} {
return status?.status === 'ready' && status.blockCreationStatus !== undefined;
return (
status?.status === 'imported' ||
status?.status === 'import_failed' ||
status?.status === 'finalize_failed'
);
}
function serializeSessionStatus(
status: RecordingSessionStatus | null
): RecordingStatus | null {
if (!status || status.sessionStatus === 'aborted') {
return null;
}
const artifact = status.artifact;
return {
id: status.id,
status:
status.sessionStatus === 'finalized'
? 'pending_import'
: status.sessionStatus,
appName: status.appGroup?.name,
appGroupId: status.appGroup?.processGroupId,
icon: status.appGroup?.icon,
startTime: status.startTime,
filepath: artifact?.filepath,
sampleRate: artifact?.sampleRate,
numberOfChannels: artifact?.numberOfChannels,
durationMs: artifact?.durationMs,
size: artifact?.size,
degraded: artifact?.degraded,
overflowCount: artifact?.overflowCount,
errorMessage: status.errorMessage,
};
}
function serializeImportStatus(
status: RecordingImportStatus | null | undefined
): RecordingStatus | null {
if (!status) {
return null;
}
return {
id: status.id,
status: status.importStatus,
appName: status.appName,
startTime: status.startTime,
filepath: status.filepath,
sampleRate: status.sampleRate,
numberOfChannels: status.numberOfChannels,
durationMs: status.durationMs,
size: status.size,
degraded: status.degraded,
overflowCount: status.overflowCount,
errorMessage: status.errorMessage,
};
}
function getProjectedRecordingStatus(): RecordingStatus | null {
const session = recordingStateMachine.status;
if (
session?.sessionStatus === 'new' ||
session?.sessionStatus === 'starting' ||
session?.sessionStatus === 'recording' ||
session?.sessionStatus === 'finalizing' ||
session?.sessionStatus === 'finalize_failed'
) {
return serializeSessionStatus(session);
}
return serializeImportStatus(recordingArtifactRegistry.latest());
}
function emitRecordingStatus() {
recordingStatus$.next(getProjectedRecordingStatus());
}
function createAppGroup(processGroupId: number): AppGroupInfo | undefined {
@@ -193,10 +305,10 @@ function setupNewRunningAppGroup() {
);
appGroups$.value.forEach(group => {
const recordingStatus = recordingStatus$.value;
const recordingStatus = recordingStateMachine.status;
if (
group.isRunning &&
(!recordingStatus || recordingStatus.status === 'new')
(!recordingStatus || recordingStatus.sessionStatus === 'new')
) {
newRecording(group);
}
@@ -225,15 +337,13 @@ function setupNewRunningAppGroup() {
return;
}
const recordingStatus = recordingStatus$.value;
const recordingStatus = recordingStateMachine.status;
if (currentGroup.isRunning) {
// when the app is running and there is no active recording popup
// we should show a new recording popup
if (
!recordingStatus ||
recordingStatus.status === 'new' ||
isRecordingSettled(recordingStatus)
recordingStatus.sessionStatus === 'new' ||
!hasActiveRecordingSession(recordingStatus)
) {
if (MeetingsSettingsState.value.recordingMode === 'prompt') {
newRecording(currentGroup);
@@ -251,7 +361,7 @@ function setupNewRunningAppGroup() {
// when displaying in "new" state but the app is not running any more
// we should remove the recording
if (
recordingStatus?.status === 'new' &&
recordingStatus?.sessionStatus === 'new' &&
currentGroup.bundleIdentifier ===
recordingStatus.appGroup?.bundleIdentifier
) {
@@ -261,7 +371,7 @@ function setupNewRunningAppGroup() {
// if the watched app stops while we are recording it,
// we should stop the recording
if (
recordingStatus?.status === 'recording' &&
recordingStatus?.sessionStatus === 'recording' &&
recordingStatus.appGroup?.bundleIdentifier ===
currentGroup.bundleIdentifier
) {
@@ -276,28 +386,49 @@ function setupNewRunningAppGroup() {
export async function getRecording(id: number) {
const recording = recordingStateMachine.status;
if (!recording || recording.id !== id) {
if (recording?.id === id) {
return {
id,
appGroup: recording.appGroup,
app: recording.app,
startTime: recording.startTime,
filepath: recording.artifact?.filepath,
sampleRate: recording.artifact?.sampleRate,
numberOfChannels: recording.artifact?.numberOfChannels,
};
}
const artifact = recordingArtifactRegistry.entries.find(
recordingArtifact => recordingArtifact.id === id
);
if (!artifact) {
logger.error(`Recording ${id} not found`);
return;
}
return {
id,
appGroup: recording.appGroup,
app: recording.app,
startTime: recording.startTime,
filepath: recording.filepath,
sampleRate: recording.sampleRate,
numberOfChannels: recording.numberOfChannels,
startTime: artifact.startTime,
filepath: artifact.filepath,
sampleRate: artifact.sampleRate,
numberOfChannels: artifact.numberOfChannels,
};
}
// recording popup status
// new: waiting for user confirmation
// recording: native recording is ongoing
// processing: native stop or renderer import/transcription is ongoing
// ready + blockCreationStatus: post-processing finished
// null: hide popup
function setupRecordingListeners() {
subscribers.push(
recordingSessionStatus$
.pipe(distinctUntilChanged(shallowEqual))
.subscribe(() => {
emitRecordingStatus();
}),
recordingImportQueue$
.pipe(distinctUntilChanged(shallowEqual))
.subscribe(() => {
emitRecordingStatus();
})
);
subscribers.push(
recordingStatus$
.pipe(distinctUntilChanged(shallowEqual))
@@ -310,13 +441,12 @@ function setupRecordingListeners() {
});
}
if (isRecordingSettled(status)) {
// show the popup for 10s
if (isTerminalPopupStatus(status)) {
setTimeout(
() => {
const currentStatus = recordingStatus$.value;
if (
isRecordingSettled(currentStatus) &&
isTerminalPopupStatus(currentStatus) &&
currentStatus.id === status.id
) {
popup.hide().catch(err => {
@@ -324,10 +454,12 @@ function setupRecordingListeners() {
});
}
},
status.blockCreationStatus === 'failed' ? 30_000 : 10_000
status.status === 'import_failed' ||
status.status === 'finalize_failed'
? 30_000
: 10_000
);
} else if (!status) {
// status is removed, we should hide the popup
popupManager
.get('recording')
.hide()
@@ -452,11 +584,10 @@ export function setupRecordingFeature() {
shareableContent = new ShareableContent();
setupMediaListeners();
}
// reset all states
recordingStatus$.next(null);
setupAppGroups();
setupNewRunningAppGroup();
setupRecordingListeners();
emitRecordingStatus();
return true;
} catch (error) {
logger.error('failed to setup recording feature', error);
@@ -479,10 +610,12 @@ function normalizeAppGroupInfo(
export function newRecording(
appGroup?: AppGroupInfo | number
): RecordingStatus | null {
return recordingStateMachine.dispatch({
const nextState = recordingStateMachine.dispatch({
type: 'NEW_RECORDING',
appGroup: normalizeAppGroupInfo(appGroup),
});
emitRecordingStatus();
return serializeRecordingStatus(nextState);
}
export async function startRecording(
@@ -493,9 +626,10 @@ export async function startRecording(
type: 'START_RECORDING',
appGroup: normalizeAppGroupInfo(appGroup),
});
emitRecordingStatus();
if (!state || state.status !== 'recording' || state === previousState) {
return state;
if (!state || state.sessionStatus !== 'starting' || state === previousState) {
return serializeRecordingStatus(state);
}
let nativeId: string | undefined;
@@ -503,7 +637,9 @@ export async function startRecording(
try {
fs.ensureDirSync(SAVED_RECORDINGS_DIR);
const meta = getNativeModule().startRecording({
logger.info(`recording ${state.id} starting`);
const nativeModule = await getNativeModuleAsync();
const meta = await nativeModule.startRecording({
appProcessId: state.app?.processId,
outputDir: SAVED_RECORDINGS_DIR,
format: 'opus',
@@ -521,22 +657,30 @@ export async function startRecording(
sampleRate: meta.sampleRate,
numberOfChannels: meta.channels,
});
emitRecordingStatus();
if (!nextState || nextState.nativeId !== meta.id) {
throw new Error('Failed to attach native recording metadata');
}
return nextState;
logger.info(`recording ${state.id} started`, {
nativeId: meta.id,
sampleRate: meta.sampleRate,
channels: meta.channels,
});
return serializeRecordingStatus(nextState);
} catch (error) {
if (nativeId) {
cleanupAbandonedNativeRecording(nativeId);
await cleanupAbandonedNativeRecording(nativeId);
}
logger.error('failed to start recording', error);
return setRecordingBlockCreationStatus(
state.id,
'failed',
error instanceof Error ? error.message : undefined
);
const nextState = recordingStateMachine.dispatch({
type: 'START_RECORDING_FAILED',
id: state.id,
errorMessage: error instanceof Error ? error.message : undefined,
});
emitRecordingStatus();
return serializeRecordingStatus(nextState);
}
}
@@ -552,34 +696,58 @@ export async function stopRecording(id: number) {
return;
}
const processingState = recordingStateMachine.dispatch({
const finalizingState = recordingStateMachine.dispatch({
type: 'STOP_RECORDING',
id,
});
emitRecordingStatus();
if (
!processingState ||
processingState.id !== id ||
processingState.status !== 'processing'
!finalizingState ||
finalizingState.id !== id ||
finalizingState.sessionStatus !== 'finalizing'
) {
return serializeRecordingStatus(processingState ?? recording);
return serializeRecordingStatus(finalizingState ?? recording);
}
try {
const artifact = getNativeModule().stopRecording(recording.nativeId);
logger.info(`recording ${id} finalizing`, {
nativeId: recording.nativeId,
});
const nativeModule = await getNativeModuleAsync();
const artifact = await nativeModule.stopRecording(recording.nativeId);
const filepath = await assertRecordingFilepath(artifact.filepath);
const readyStatus = recordingStateMachine.dispatch({
const finalizedStatus = recordingStateMachine.dispatch({
type: 'ATTACH_RECORDING_ARTIFACT',
id,
filepath,
sampleRate: artifact.sampleRate,
numberOfChannels: artifact.channels,
artifact: {
filepath,
sampleRate: artifact.sampleRate,
numberOfChannels: artifact.channels,
durationMs: artifact.durationMs,
size: artifact.size,
degraded: artifact.degraded,
overflowCount: artifact.overflowCount,
},
});
emitRecordingStatus();
if (!readyStatus) {
if (!finalizedStatus) {
logger.error('No recording status to save');
return;
}
const importEntry = recordingArtifactRegistry.enqueueFromSession(
finalizedStatus,
finalizedStatus.artifact ?? { filepath }
);
emitRecordingStatus();
logger.info(`recording ${id} finalized`, {
filepath,
degraded: artifact.degraded,
overflowCount: artifact.overflowCount,
importStatus: importEntry?.importStatus,
});
getMainWindow()
.then(mainWindow => {
if (mainWindow) {
@@ -590,19 +758,16 @@ export async function stopRecording(id: number) {
logger.error('failed to bring up the window', err);
});
return serializeRecordingStatus(readyStatus);
return serializeRecordingStatus(getProjectedRecordingStatus());
} catch (error: unknown) {
logger.error('Failed to stop recording', error);
const recordingStatus = await setRecordingBlockCreationStatus(
const nextState = recordingStateMachine.dispatch({
type: 'FINALIZE_RECORDING_FAILED',
id,
'failed',
error instanceof Error ? error.message : undefined
);
if (!recordingStatus) {
logger.error('No recording status to stop');
return;
}
return serializeRecordingStatus(recordingStatus);
errorMessage: error instanceof Error ? error.message : undefined,
});
emitRecordingStatus();
return serializeRecordingStatus(nextState);
}
}
@@ -618,66 +783,137 @@ export async function readRecordingFile(filepath: string) {
return fsp.readFile(normalizedPath);
}
function cleanupAbandonedNativeRecording(nativeId: string) {
async function cleanupAbandonedNativeRecording(nativeId: string) {
try {
const artifact = getNativeModule().stopRecording(nativeId);
void assertRecordingFilepath(artifact.filepath)
.then(filepath => {
fs.removeSync(filepath);
})
.catch(error => {
logger.error('failed to validate abandoned recording filepath', error);
});
const nativeModule = await getNativeModuleAsync();
await nativeModule.abortRecording(nativeId);
} catch (error) {
logger.error('failed to cleanup abandoned native recording', error);
}
}
export async function setRecordingBlockCreationStatus(
id: number,
status: 'success' | 'failed',
errorMessage?: string
) {
return recordingStateMachine.dispatch({
type: 'SET_BLOCK_CREATION_STATUS',
id,
status,
errorMessage,
export function getRecordingImportQueue() {
return recordingArtifactRegistry.entries.flatMap(entry => {
const serialized = serializeRecordingImportStatus(entry);
return serialized ? [serialized] : [];
});
}
export function claimRecordingImport(id: number) {
const status = recordingArtifactRegistry.claim(id);
if (status) {
logger.info(`recording import ${id} claimed`);
}
emitRecordingStatus();
return serializeRecordingImportStatus(status);
}
export function completeRecordingImport(id: number) {
logger.info(`recording import ${id} completed`);
const status = recordingArtifactRegistry.markImported(id);
emitRecordingStatus();
return serializeRecordingImportStatus(status);
}
export function failRecordingImport(id: number, errorMessage?: string) {
logger.error(`recording import ${id} failed`, errorMessage);
const status = recordingArtifactRegistry.markFailed(id, errorMessage);
emitRecordingStatus();
return serializeRecordingImportStatus(status);
}
export function removeRecording(id: number) {
recordingStateMachine.dispatch({ type: 'REMOVE_RECORDING', id });
recordingArtifactRegistry.remove(id);
emitRecordingStatus();
}
export interface SerializedRecordingStatus {
id: number;
status: RecordingStatus['status'];
blockCreationStatus?: RecordingStatus['blockCreationStatus'];
appName?: string;
// if there is no app group, it means the recording is for system audio
appGroupId?: number;
icon?: Buffer;
startTime: number;
filepath?: string;
sampleRate?: number;
numberOfChannels?: number;
durationMs?: number;
size?: number;
degraded?: boolean;
overflowCount?: number;
errorMessage?: string;
}
export function serializeRecordingStatus(
status: RecordingStatus
status: RecordingStatus | RecordingSessionStatus | null | undefined
): SerializedRecordingStatus | null {
const serialized =
!status || 'sessionStatus' in status
? serializeSessionStatus(status ?? null)
: status;
if (!serialized) {
return null;
}
return {
id: serialized.id,
status: serialized.status,
appName: serialized.appName,
appGroupId: serialized.appGroupId,
icon: serialized.icon,
startTime: serialized.startTime,
filepath: serialized.filepath,
sampleRate: serialized.sampleRate,
numberOfChannels: serialized.numberOfChannels,
durationMs: serialized.durationMs,
size: serialized.size,
degraded: serialized.degraded,
overflowCount: serialized.overflowCount,
errorMessage: serialized.errorMessage,
};
}
export interface SerializedRecordingImportStatus {
id: number;
appName?: string;
startTime: number;
filepath: string;
sampleRate?: number;
numberOfChannels?: number;
durationMs?: number;
size?: number;
degraded?: boolean;
overflowCount?: number;
importStatus: RecordingImportStatus['importStatus'];
errorMessage?: string;
createdAt: number;
updatedAt: number;
}
export function serializeRecordingImportStatus(
status: RecordingImportStatus | null | undefined
): SerializedRecordingImportStatus | null {
if (!status) {
return null;
}
return {
id: status.id,
status: status.status,
blockCreationStatus: status.blockCreationStatus,
appName: status.appGroup?.name,
appGroupId: status.appGroup?.processGroupId,
icon: status.appGroup?.icon,
appName: status.appName,
startTime: status.startTime,
filepath: status.filepath,
sampleRate: status.sampleRate,
numberOfChannels: status.numberOfChannels,
durationMs: status.durationMs,
size: status.size,
degraded: status.degraded,
overflowCount: status.overflowCount,
importStatus: status.importStatus,
errorMessage: status.errorMessage,
createdAt: status.createdAt,
updatedAt: status.updatedAt,
};
}

View File

@@ -11,15 +11,20 @@ import {
askForMeetingPermission,
checkMeetingPermissions,
checkRecordingAvailable,
claimRecordingImport,
completeRecordingImport,
disableRecordingFeature,
failRecordingImport,
getRecording,
getRecordingImportQueue,
readRecordingFile,
recordingImportQueue$,
recordingStatus$,
removeRecording,
SAVED_RECORDINGS_DIR,
type SerializedRecordingImportStatus,
type SerializedRecordingStatus,
serializeRecordingStatus,
setRecordingBlockCreationStatus,
setupRecordingFeature,
startRecording,
stopRecording,
@@ -45,13 +50,17 @@ export const recordingHandlers = {
readRecordingFile: async (_, filepath: string) => {
return readRecordingFile(filepath);
},
setRecordingBlockCreationStatus: async (
_,
id: number,
status: 'success' | 'failed',
errorMessage?: string
) => {
return setRecordingBlockCreationStatus(id, status, errorMessage);
  // Return the current artifact import queue so a renderer can sync on load.
  getRecordingImportQueue: async () => {
    return getRecordingImportQueue();
  },
  // Claim a queue entry for import by the calling renderer.
  claimRecordingImport: async (_, id: number) => {
    return claimRecordingImport(id);
  },
  // Mark a claimed entry as successfully imported into a doc.
  completeRecordingImport: async (_, id: number) => {
    return completeRecordingImport(id);
  },
  // Mark a claimed entry as failed; failed entries can be claimed again.
  failRecordingImport: async (_, id: number, errorMessage?: string) => {
    return failRecordingImport(id, errorMessage);
  },
removeRecording: async (_, id: number) => {
return removeRecording(id);
@@ -108,4 +117,35 @@ export const recordingEvents = {
}
};
},
  // Subscribe a renderer callback to import-queue changes. Each emission is
  // mapped to plain fields so the payload is IPC-serializable. Returns an
  // unsubscribe function.
  onRecordingImportQueueChanged: (
    fn: (queue: SerializedRecordingImportStatus[]) => void
  ) => {
    const sub = recordingImportQueue$.subscribe(queue => {
      fn(
        // Flatten each entry into a plain object (no class instances).
        queue.map(item => ({
          id: item.id,
          appName: item.appName,
          startTime: item.startTime,
          filepath: item.filepath,
          sampleRate: item.sampleRate,
          numberOfChannels: item.numberOfChannels,
          durationMs: item.durationMs,
          size: item.size,
          degraded: item.degraded,
          overflowCount: item.overflowCount,
          importStatus: item.importStatus,
          errorMessage: item.errorMessage,
          createdAt: item.createdAt,
          updatedAt: item.updatedAt,
        }))
      );
    });
    return () => {
      try {
        // Unsubscribe defensively; the subscription may already be torn down.
        sub.unsubscribe();
      } catch {
        // ignore unsubscribe error
      }
    };
  },
};

View File

@@ -2,17 +2,15 @@ import { BehaviorSubject } from 'rxjs';
import { shallowEqual } from '../../shared/utils';
import { logger } from '../logger';
import type { AppGroupInfo, RecordingStatus } from './types';
import type {
AppGroupInfo,
RecordingArtifactInfo,
RecordingSessionStatus,
} from './types';
/**
* Recording state machine events
*/
export type RecordingEvent =
| { type: 'NEW_RECORDING'; appGroup?: AppGroupInfo }
| {
type: 'START_RECORDING';
appGroup?: AppGroupInfo;
}
| { type: 'START_RECORDING'; appGroup?: AppGroupInfo }
| {
type: 'ATTACH_NATIVE_RECORDING';
id: number;
@@ -22,56 +20,33 @@ export type RecordingEvent =
sampleRate: number;
numberOfChannels: number;
}
| {
type: 'STOP_RECORDING';
id: number;
}
| { type: 'START_RECORDING_FAILED'; id: number; errorMessage?: string }
| { type: 'STOP_RECORDING'; id: number }
| {
type: 'ATTACH_RECORDING_ARTIFACT';
id: number;
filepath: string;
sampleRate?: number;
numberOfChannels?: number;
}
| {
type: 'SET_BLOCK_CREATION_STATUS';
id: number;
status: 'success' | 'failed';
errorMessage?: string;
artifact: RecordingArtifactInfo;
}
| { type: 'FINALIZE_RECORDING_FAILED'; id: number; errorMessage?: string }
| { type: 'ABORT_RECORDING'; id: number }
| { type: 'REMOVE_RECORDING'; id: number };
/**
* Recording State Machine
* Handles state transitions for the recording process
*/
export class RecordingStateMachine {
private recordingId = 0;
private readonly recordingStatus$ =
new BehaviorSubject<RecordingStatus | null>(null);
new BehaviorSubject<RecordingSessionStatus | null>(null);
/**
* Get the current recording status
*/
get status(): RecordingStatus | null {
get status(): RecordingSessionStatus | null {
return this.recordingStatus$.value;
}
/**
* Get the BehaviorSubject for recording status
*/
get status$(): BehaviorSubject<RecordingStatus | null> {
get status$(): BehaviorSubject<RecordingSessionStatus | null> {
return this.recordingStatus$;
}
/**
* Dispatch an event to the state machine
* @param event The event to dispatch
* @returns The new recording status after the event is processed
*/
dispatch(event: RecordingEvent, emit = true): RecordingStatus | null {
dispatch(event: RecordingEvent, emit = true): RecordingSessionStatus | null {
const currentStatus = this.recordingStatus$.value;
let newStatus: RecordingStatus | null = null;
let newStatus: RecordingSessionStatus | null = null;
switch (event.type) {
case 'NEW_RECORDING':
@@ -83,24 +58,30 @@ export class RecordingStateMachine {
case 'ATTACH_NATIVE_RECORDING':
newStatus = this.handleAttachNativeRecording(event);
break;
case 'START_RECORDING_FAILED':
newStatus = this.handleStartRecordingFailed(
event.id,
event.errorMessage
);
break;
case 'STOP_RECORDING':
newStatus = this.handleStopRecording(event.id);
break;
case 'ATTACH_RECORDING_ARTIFACT':
newStatus = this.handleAttachRecordingArtifact(
event.id,
event.filepath,
event.sampleRate,
event.numberOfChannels
event.artifact
);
break;
case 'SET_BLOCK_CREATION_STATUS':
newStatus = this.handleSetBlockCreationStatus(
case 'FINALIZE_RECORDING_FAILED':
newStatus = this.handleFinalizeRecordingFailed(
event.id,
event.status,
event.errorMessage
);
break;
case 'ABORT_RECORDING':
newStatus = this.handleAbortRecording(event.id);
break;
case 'REMOVE_RECORDING':
this.handleRemoveRecording(event.id);
newStatus = currentStatus?.id === event.id ? null : currentStatus;
@@ -121,86 +102,104 @@ export class RecordingStateMachine {
return newStatus;
}
/**
* Handle the NEW_RECORDING event
*/
private handleNewRecording(appGroup?: AppGroupInfo): RecordingStatus {
const recordingStatus: RecordingStatus = {
private hasActiveSession(status: RecordingSessionStatus | null | undefined) {
return (
status?.sessionStatus === 'starting' ||
status?.sessionStatus === 'recording' ||
status?.sessionStatus === 'finalizing'
);
}
private handleNewRecording(appGroup?: AppGroupInfo): RecordingSessionStatus {
return {
id: this.recordingId++,
status: 'new',
sessionStatus: 'new',
startTime: Date.now(),
app: appGroup?.apps.find(app => app.isRunning),
appGroup,
};
return recordingStatus;
}
/**
* Handle the START_RECORDING event
*/
private handleStartRecording(appGroup?: AppGroupInfo): RecordingStatus {
private handleStartRecording(
appGroup?: AppGroupInfo
): RecordingSessionStatus | null {
const currentStatus = this.recordingStatus$.value;
if (
currentStatus?.status === 'recording' ||
currentStatus?.status === 'processing'
) {
if (this.hasActiveSession(currentStatus)) {
logger.error(
'Cannot start a new recording if there is already a recording'
'Cannot start a new recording while another session is active'
);
return currentStatus;
}
if (
currentStatus?.sessionStatus === 'new' &&
appGroup &&
currentStatus?.appGroup?.processGroupId === appGroup.processGroupId &&
currentStatus.status === 'new'
currentStatus.appGroup?.processGroupId === appGroup.processGroupId
) {
return {
...currentStatus,
status: 'recording',
};
} else {
const newStatus = this.handleNewRecording(appGroup);
return {
...newStatus,
status: 'recording',
sessionStatus: 'starting',
errorMessage: undefined,
};
}
const nextStatus =
currentStatus?.sessionStatus === 'new' && !appGroup
? currentStatus
: this.handleNewRecording(appGroup);
return {
...nextStatus,
sessionStatus: 'starting',
errorMessage: undefined,
};
}
/**
* Attach native recording metadata to the current recording
*/
private handleAttachNativeRecording(
event: Extract<RecordingEvent, { type: 'ATTACH_NATIVE_RECORDING' }>
): RecordingStatus | null {
) {
const currentStatus = this.recordingStatus$.value;
if (!currentStatus || currentStatus.id !== event.id) {
logger.error(`Recording ${event.id} not found for native attachment`);
return currentStatus;
}
if (currentStatus.status !== 'recording') {
if (currentStatus.sessionStatus !== 'starting') {
logger.error(
`Cannot attach native metadata when recording is in ${currentStatus.status} state`
`Cannot attach native metadata when recording is in ${currentStatus.sessionStatus} state`
);
return currentStatus;
}
return {
...currentStatus,
sessionStatus: 'recording' as const,
nativeId: event.nativeId,
startTime: event.startTime,
filepath: event.filepath,
sampleRate: event.sampleRate,
numberOfChannels: event.numberOfChannels,
artifact: {
filepath: event.filepath,
sampleRate: event.sampleRate,
numberOfChannels: event.numberOfChannels,
},
};
}
/**
* Handle the STOP_RECORDING event
*/
private handleStopRecording(id: number): RecordingStatus | null {
private handleStartRecordingFailed(id: number, errorMessage?: string) {
const currentStatus = this.recordingStatus$.value;
if (!currentStatus || currentStatus.id !== id) {
logger.error(`Recording ${id} not found for start failure`);
return currentStatus;
}
return {
...currentStatus,
sessionStatus: 'finalize_failed' as const,
errorMessage,
};
}
private handleStopRecording(id: number) {
const currentStatus = this.recordingStatus$.value;
if (!currentStatus || currentStatus.id !== id) {
@@ -208,26 +207,24 @@ export class RecordingStateMachine {
return currentStatus;
}
if (currentStatus.status !== 'recording') {
logger.error(`Cannot stop recording in ${currentStatus.status} state`);
if (currentStatus.sessionStatus !== 'recording') {
logger.error(
`Cannot stop recording in ${currentStatus.sessionStatus} state`
);
return currentStatus;
}
return {
...currentStatus,
status: 'processing',
sessionStatus: 'finalizing' as const,
errorMessage: undefined,
};
}
/**
* Attach the encoded artifact once native stop completes
*/
private handleAttachRecordingArtifact(
id: number,
filepath: string,
sampleRate?: number,
numberOfChannels?: number
): RecordingStatus | null {
artifact: RecordingArtifactInfo
) {
const currentStatus = this.recordingStatus$.value;
if (!currentStatus || currentStatus.id !== id) {
@@ -235,66 +232,60 @@ export class RecordingStateMachine {
return currentStatus;
}
if (currentStatus.status !== 'processing') {
logger.error(`Cannot attach artifact in ${currentStatus.status} state`);
if (currentStatus.sessionStatus !== 'finalizing') {
logger.error(
`Cannot attach artifact in ${currentStatus.sessionStatus} state`
);
return currentStatus;
}
return {
...currentStatus,
filepath,
sampleRate: sampleRate ?? currentStatus.sampleRate,
numberOfChannels: numberOfChannels ?? currentStatus.numberOfChannels,
sessionStatus: 'finalized' as const,
artifact: {
...currentStatus.artifact,
...artifact,
},
};
}
/**
* Set the renderer-side block creation result
*/
private handleSetBlockCreationStatus(
id: number,
status: 'success' | 'failed',
errorMessage?: string
): RecordingStatus | null {
private handleFinalizeRecordingFailed(id: number, errorMessage?: string) {
const currentStatus = this.recordingStatus$.value;
if (!currentStatus || currentStatus.id !== id) {
logger.error(`Recording ${id} not found for block creation status`);
return currentStatus;
}
if (currentStatus.status === 'new') {
logger.error(`Cannot settle recording ${id} before it starts`);
return currentStatus;
}
if (
currentStatus.status === 'ready' &&
currentStatus.blockCreationStatus !== undefined
) {
logger.error(`Recording ${id} not found for finalize failure`);
return currentStatus;
}
if (errorMessage) {
logger.error(`Recording ${id} create block failed: ${errorMessage}`);
logger.error(`Recording ${id} finalize failed: ${errorMessage}`);
}
return {
...currentStatus,
status: 'ready',
blockCreationStatus: status,
sessionStatus: 'finalize_failed' as const,
errorMessage,
};
}
/**
* Handle the REMOVE_RECORDING event
*/
private handleRemoveRecording(id: number): void {
// Actual recording removal logic would be handled by the caller
// This just ensures the state is updated correctly
private handleAbortRecording(id: number) {
const currentStatus = this.recordingStatus$.value;
if (!currentStatus || currentStatus.id !== id) {
logger.error(`Recording ${id} not found for abort`);
return currentStatus;
}
return {
...currentStatus,
sessionStatus: 'aborted' as const,
errorMessage: undefined,
};
}
private handleRemoveRecording(id: number) {
logger.info(`Recording ${id} removed from state machine`);
}
}
// Create and export a singleton instance
export const recordingStateMachine = new RecordingStateMachine();

View File

@@ -1,35 +1,68 @@
# Recording State Transitions
The desktop recording flow now has a single linear engine state and a separate post-process result.
The desktop recording flow now uses two independent lifecycle models:
## Engine states
1. recording session state in Electron main, which tracks native capture/finalize.
2. artifact import state in Electron main, which tracks renderer-side doc import.
- `inactive`: no active recording
- `new`: app detected, waiting for user confirmation
- `recording`: native capture is running
- `processing`: native capture has stopped and the artifact is being imported
- `ready`: post-processing has finished
## Recording Session State
## Post-process result
- `inactive`: no session has been created yet.
- `new`: app detected, waiting for user confirmation.
- `starting`: native session setup is in progress.
- `recording`: native capture is running.
- `finalizing`: native stop/finalize is in progress.
- `finalized`: native finalized an artifact successfully.
- `finalize_failed`: native finalize failed.
- `aborted`: native session was discarded without producing an artifact.
`ready` recordings may carry `blockCreationStatus`:
Only `starting`, `recording`, and `finalizing` occupy the active native slot.
`finalized`, `finalize_failed`, and `aborted` no longer block the next recording.
- `success`: the recording block was created successfully
- `failed`: the artifact was saved, but block creation/import failed
## Recording Artifact Import State
## State flow
- `pending_import`: artifact is finalized and durable in main, waiting for a renderer to consume it.
- `importing`: a renderer has claimed the artifact and is importing it into a doc.
- `imported`: doc import finished successfully.
- `import_failed`: doc import failed; the artifact remains available for retry.
Artifacts are persisted in main process storage so renderer reloads or missing workspace context do not drop them.
## Session Flow
```text
inactive -> new -> starting -> recording -> finalizing -> finalized
                      |                          |
                      +-> finalize_failed        +-> finalize_failed

(any active state) --ABORT_RECORDING--> aborted
```
- `START_RECORDING` creates or reuses a pending `new` recording.
- `ATTACH_NATIVE_RECORDING` fills in native metadata while staying in `recording`.
- `STOP_RECORDING` moves the flow to `processing`.
- `ATTACH_RECORDING_ARTIFACT` attaches the finalized `.opus` artifact while staying in `processing`.
- `SET_BLOCK_CREATION_STATUS` settles the flow as `ready`.
- `START_RECORDING` creates or reuses a pending `new` recording and moves it to `starting`.
- `ATTACH_NATIVE_RECORDING` attaches native session metadata and moves the session to `recording`.
- `START_RECORDING_FAILED` moves the session to the terminal `finalize_failed` state.
- `STOP_RECORDING` moves the session to `finalizing`.
- `ATTACH_RECORDING_ARTIFACT` marks the session `finalized` with the native artifact metadata.
- `FINALIZE_RECORDING_FAILED` marks the session `finalize_failed`.
- `ABORT_RECORDING` marks the session `aborted`.
Only one recording can be active at a time. A new recording can start only after the previous one has been removed or its `ready` result has been settled.
## Import Flow
```text
pending_import -> importing -> imported
                      |
                      +-> import_failed -> importing (retry)
```
- main enqueues `pending_import` after native finalize succeeds.
- renderer claims the artifact, moving it to `importing`.
- renderer marks the artifact `imported` or `import_failed`.
- `import_failed` artifacts can be claimed again for retry.
## Popup Projection
The popup still renders a single current status, but it is now a projection:
- active session states map to `new`, `starting`, `recording`, `finalizing`, `finalize_failed`.
- otherwise the latest import entry maps to `pending_import`, `importing`, `imported`, `import_failed`.
This keeps the UI simple without collapsing the underlying source-of-truth back into a single overloaded `processing` state.

View File

@@ -18,19 +18,76 @@ export interface AppGroupInfo {
isRunning: boolean;
}
export interface RecordingStatus {
/**
 * Lifecycle states of a native recording session (Electron main).
 * 'starting', 'recording' and 'finalizing' occupy the active native slot;
 * 'finalized', 'finalize_failed' and 'aborted' are terminal and do not
 * block the next recording.
 */
export type RecordingSessionState =
  | 'new' // app detected, waiting for user confirmation
  | 'starting' // native session setup in progress
  | 'recording' // native capture running
  | 'finalizing' // native stop/finalize in progress
  | 'finalized' // native produced an artifact successfully
  | 'finalize_failed' // native start or finalize failed
  | 'aborted'; // session discarded without producing an artifact
/**
 * Lifecycle states of a finalized artifact's renderer-side doc import.
 * 'import_failed' entries remain available to be claimed again for retry.
 */
export type RecordingImportState =
  | 'pending_import' // durable in main, waiting for a renderer to claim it
  | 'importing' // a renderer claimed it and is importing into a doc
  | 'imported' // doc import finished successfully
  | 'import_failed'; // doc import failed; artifact kept for retry
/**
 * Metadata for a finalized (encoded) audio artifact on disk.
 * Optional fields are filled in when the native recorder reports them.
 */
export interface RecordingArtifactInfo {
  /** Absolute path of the encoded audio file. */
  filepath: string;
  /** Sample rate of the encoded audio, when reported. */
  sampleRate?: number;
  /** Channel count, when reported. */
  numberOfChannels?: number;
  /** Capture duration in milliseconds, when reported. */
  durationMs?: number;
  /** File size in bytes, when reported. */
  size?: number;
  // NOTE(review): assumed to flag degraded native capture — confirm semantics.
  degraded?: boolean;
  // NOTE(review): assumed to count native buffer overflows — confirm.
  overflowCount?: number;
}
/**
 * Status of the active native recording session in Electron main.
 *
 * Fix: diff residue left the legacy `status` field (and its comments) plus a
 * top-level `filepath?` here; in the new model the lifecycle lives in
 * `sessionStatus` and the file path lives inside `artifact`. Artifact-import
 * progress is tracked separately by `RecordingImportStatus`.
 */
export interface RecordingSessionStatus {
  /** Recording id; also correlates with import-queue entries. */
  id: number;
  /** Current session lifecycle state. */
  sessionStatus: RecordingSessionState;
  /** The tapped application, when one was selected. */
  app?: TappableAppInfo;
  /** The detected application group, when one was selected. */
  appGroup?: AppGroupInfo;
  /** Epoch ms when capture started; 0 means not started yet. */
  startTime: number;
  /** Native recorder session id, set once native setup completes. */
  nativeId?: string;
  /** Artifact metadata, attached once native capture provides it. */
  artifact?: RecordingArtifactInfo;
  /** Error detail for failed sessions. */
  errorMessage?: string;
}
/**
 * Main-process record of a finalized artifact moving through doc import.
 * Extends the artifact metadata with queue bookkeeping fields.
 */
export interface RecordingImportStatus extends RecordingArtifactInfo {
  /** Recording id the artifact belongs to. */
  id: number;
  /** Name of the captured application, when known. */
  appName?: string;
  /** Epoch ms when the native capture started. */
  startTime: number;
  /** Current import lifecycle state. */
  importStatus: RecordingImportState;
  /** Last import error, for failed entries. */
  errorMessage?: string;
  /** Epoch ms when this queue entry was created. */
  createdAt: number;
  /** Epoch ms when this queue entry was last updated. */
  updatedAt: number;
}
/**
 * Single projected state rendered by UI surfaces (popup/tray).
 * Active session states take precedence; otherwise the latest import
 * entry's state is shown.
 */
export type RecordingDisplayState =
  | 'new'
  | 'starting'
  | 'recording'
  | 'finalizing'
  | 'pending_import'
  | 'importing'
  | 'imported'
  | 'import_failed'
  | 'finalize_failed';
/**
 * Flattened display status consumed by UI surfaces (popup/tray).
 *
 * Fix: removed the stray `blockCreationStatus?` field left over from the
 * replaced block-creation model — the serializer (serializeRecordingStatus)
 * no longer emits it and the import lifecycle supersedes it.
 */
export interface RecordingStatus {
  /** Recording id. */
  id: number;
  /** Projected display state; see RecordingDisplayState. */
  status: RecordingDisplayState;
  /** Name of the captured application, when known. */
  appName?: string;
  /** Process group id of the captured application, when known. */
  appGroupId?: number;
  /** App icon bytes for display, when available. */
  icon?: Buffer;
  /** Epoch ms when capture started; 0 means not started yet. */
  startTime: number;
  /** Path of the finalized artifact, when one exists. */
  filepath?: string;
  /** Sample rate of the artifact, when reported. */
  sampleRate?: number;
  /** Channel count of the artifact, when reported. */
  numberOfChannels?: number;
  /** Capture duration in milliseconds, when reported. */
  durationMs?: number;
  /** Artifact size in bytes, when reported. */
  size?: number;
  // NOTE(review): assumed to flag degraded native capture — confirm semantics.
  degraded?: boolean;
  // NOTE(review): assumed to count native buffer overflows — confirm.
  overflowCount?: number;
  /** Error detail for failed states. */
  errorMessage?: string;
}

View File

@@ -160,7 +160,12 @@ class TrayState implements Disposable {
const recordingStatus = recordingStatus$.value;
if (!recordingStatus || recordingStatus.status !== 'recording') {
if (
!recordingStatus ||
(recordingStatus.status !== 'starting' &&
recordingStatus.status !== 'recording' &&
recordingStatus.status !== 'finalizing')
) {
const appMenuItems = runningAppGroups.map(appGroup => ({
label: appGroup.name,
icon: appGroup.icon || undefined,
@@ -197,8 +202,8 @@ class TrayState implements Disposable {
...appMenuItems
);
} else {
const recordingLabel = recordingStatus.appGroup?.name
? `Recording (${recordingStatus.appGroup?.name})`
const recordingLabel = recordingStatus.appName
? `Recording (${recordingStatus.appName})`
: 'Recording';
// recording is active
@@ -212,9 +217,11 @@ class TrayState implements Disposable {
label: 'Stop',
click: () => {
logger.info('User action: Stop Recording');
stopRecording(recordingStatus.id).catch(err => {
logger.error('Failed to stop recording:', err);
});
if (recordingStatus.status === 'recording') {
stopRecording(recordingStatus.id).catch(err => {
logger.error('Failed to stop recording:', err);
});
}
},
}
);

View File

@@ -2,22 +2,24 @@ import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest';
const isActiveTab = vi.fn();
const readRecordingFile = vi.fn();
const setRecordingBlockCreationStatus = vi.fn();
const claimRecordingImport = vi.fn();
const completeRecordingImport = vi.fn();
const failRecordingImport = vi.fn();
const getRecordingImportQueue = vi.fn();
const getCurrentWorkspace = vi.fn();
const isAiEnabled = vi.fn();
const transcribeRecording = vi.fn();
let onRecordingStatusChanged:
| ((
status: {
id: number;
status: 'processing';
appName?: string;
filepath?: string;
startTime: number;
blockCreationStatus?: 'success' | 'failed';
} | null
) => void)
type RecordingImportStatus = {
id: number;
appName?: string;
filepath: string;
startTime: number;
importStatus: 'pending_import' | 'importing' | 'imported' | 'import_failed';
};
let onRecordingImportQueueChanged:
| ((queue: RecordingImportStatus[]) => void)
| undefined;
vi.mock('@affine/core/modules/doc', () => ({
@@ -46,16 +48,19 @@ vi.mock('@affine/electron-api', () => ({
},
recording: {
readRecordingFile,
setRecordingBlockCreationStatus,
claimRecordingImport,
completeRecordingImport,
failRecordingImport,
getRecordingImportQueue,
},
},
events: {
recording: {
onRecordingStatusChanged: vi.fn(
(handler: typeof onRecordingStatusChanged) => {
onRecordingStatusChanged = handler;
onRecordingImportQueueChanged: vi.fn(
(handler: typeof onRecordingImportQueueChanged) => {
onRecordingImportQueueChanged = handler;
return () => {
onRecordingStatusChanged = undefined;
onRecordingImportQueueChanged = undefined;
};
}
),
@@ -162,10 +167,12 @@ describe('recording effect', () => {
vi.useFakeTimers();
vi.clearAllMocks();
vi.resetModules();
onRecordingStatusChanged = undefined;
onRecordingImportQueueChanged = undefined;
readRecordingFile.mockResolvedValue(new Uint8Array([1, 2, 3]).buffer);
setRecordingBlockCreationStatus.mockResolvedValue(undefined);
completeRecordingImport.mockResolvedValue(undefined);
failRecordingImport.mockResolvedValue(undefined);
isAiEnabled.mockReturnValue(false);
getRecordingImportQueue.mockResolvedValue([]);
});
afterEach(() => {
@@ -173,37 +180,43 @@ describe('recording effect', () => {
vi.useRealTimers();
});
test('retries processing until the active tab has a workspace', async () => {
test('retries pending imports until the active tab has a workspace', async () => {
const workspace = createWorkspaceRef();
const pendingImport = {
id: 7,
importStatus: 'pending_import' as const,
appName: 'Zoom',
filepath: '/tmp/meeting.opus',
startTime: 1000,
};
isActiveTab.mockResolvedValueOnce(false).mockResolvedValue(true);
getCurrentWorkspace
.mockReturnValueOnce(undefined)
.mockReturnValue(workspace.ref);
claimRecordingImport.mockResolvedValue({
...pendingImport,
importStatus: 'importing',
});
getRecordingImportQueue.mockResolvedValue([pendingImport]);
const { setupRecordingEvents } =
await import('../../../electron-renderer/src/app/effects/recording');
setupRecordingEvents({} as never);
onRecordingStatusChanged?.({
id: 7,
status: 'processing',
appName: 'Zoom',
filepath: '/tmp/meeting.opus',
startTime: 1000,
});
await Promise.resolve();
expect(workspace.createDoc).not.toHaveBeenCalled();
expect(setRecordingBlockCreationStatus).not.toHaveBeenCalled();
expect(claimRecordingImport).not.toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(1000);
expect(workspace.createDoc).not.toHaveBeenCalled();
expect(setRecordingBlockCreationStatus).not.toHaveBeenCalled();
expect(claimRecordingImport).not.toHaveBeenCalled();
onRecordingImportQueueChanged?.([pendingImport]);
await vi.advanceTimersByTimeAsync(1000);
expect(claimRecordingImport).toHaveBeenCalledWith(7);
expect(workspace.createDoc).toHaveBeenCalledTimes(1);
expect(workspace.openDoc).toHaveBeenCalledWith('doc-1');
expect(workspace.blobSet).toHaveBeenCalledTimes(1);
@@ -215,42 +228,51 @@ describe('recording effect', () => {
expect.objectContaining({ type: 'audio/ogg' }),
'note-1'
);
expect(setRecordingBlockCreationStatus).toHaveBeenCalledWith(7, 'success');
expect(setRecordingBlockCreationStatus).not.toHaveBeenCalledWith(
7,
'failed',
expect.anything()
);
expect(completeRecordingImport).toHaveBeenCalledWith(7);
expect(failRecordingImport).not.toHaveBeenCalled();
});
test('retries when the active-tab probe rejects', async () => {
const workspace = createWorkspaceRef();
test('marks imports as failed when the doc import throws and retries later', async () => {
const pendingImport = {
id: 9,
importStatus: 'import_failed' as const,
appName: 'Meet',
filepath: '/tmp/meeting.opus',
startTime: 1000,
};
isActiveTab
.mockRejectedValueOnce(new Error('probe failed'))
.mockResolvedValue(true);
const workspace = createWorkspaceRef();
workspace.createDoc.mockImplementationOnce(() => {
throw new Error('create doc failed');
});
isActiveTab.mockResolvedValue(true);
getCurrentWorkspace.mockReturnValue(workspace.ref);
claimRecordingImport
.mockResolvedValueOnce({
...pendingImport,
importStatus: 'importing',
})
.mockResolvedValueOnce({
...pendingImport,
importStatus: 'importing',
});
getRecordingImportQueue.mockResolvedValue([pendingImport]);
const { setupRecordingEvents } =
await import('../../../electron-renderer/src/app/effects/recording');
setupRecordingEvents({} as never);
onRecordingStatusChanged?.({
id: 9,
status: 'processing',
appName: 'Meet',
filepath: '/tmp/meeting.opus',
startTime: 1000,
});
await Promise.resolve();
expect(workspace.createDoc).not.toHaveBeenCalled();
expect(setRecordingBlockCreationStatus).not.toHaveBeenCalled();
await Promise.resolve();
await vi.advanceTimersByTimeAsync(0);
expect(failRecordingImport).toHaveBeenCalledWith(9, 'create doc failed');
expect(completeRecordingImport).not.toHaveBeenCalled();
onRecordingImportQueueChanged?.([pendingImport]);
await vi.advanceTimersByTimeAsync(1000);
expect(workspace.createDoc).toHaveBeenCalledTimes(1);
expect(setRecordingBlockCreationStatus).toHaveBeenCalledWith(9, 'success');
expect(claimRecordingImport).toHaveBeenCalledTimes(2);
});
});

View File

@@ -1,4 +1,5 @@
import { describe, expect, test, vi } from 'vitest';
import { BehaviorSubject } from 'rxjs';
import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest';
vi.mock('../../src/main/logger', () => ({
logger: {
@@ -9,14 +10,24 @@ vi.mock('../../src/main/logger', () => ({
import { RecordingStateMachine } from '../../src/main/recording/state-machine';
function createDeferred<T>() {
let resolve!: (value: T) => void;
let reject!: (reason?: unknown) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject };
}
function createAttachedRecording(stateMachine: RecordingStateMachine) {
const pending = stateMachine.dispatch({
const starting = stateMachine.dispatch({
type: 'START_RECORDING',
});
stateMachine.dispatch({
type: 'ATTACH_NATIVE_RECORDING',
id: pending!.id,
id: starting!.id,
nativeId: 'native-1',
startTime: 100,
filepath: '/tmp/recording.opus',
@@ -24,93 +35,390 @@ function createAttachedRecording(stateMachine: RecordingStateMachine) {
numberOfChannels: 2,
});
return pending!;
return starting!;
}
describe('RecordingStateMachine', () => {
test('transitions from recording to ready after artifact import and block creation', () => {
test('tracks session lifecycle through finalized without coupling import state', () => {
const stateMachine = new RecordingStateMachine();
const pending = createAttachedRecording(stateMachine);
expect(pending?.status).toBe('recording');
const starting = stateMachine.dispatch({
type: 'START_RECORDING',
});
expect(starting).toMatchObject({
sessionStatus: 'starting',
});
const processing = stateMachine.dispatch({
const recording = stateMachine.dispatch({
type: 'ATTACH_NATIVE_RECORDING',
id: starting!.id,
nativeId: 'native-1',
startTime: 100,
filepath: '/tmp/recording.opus',
sampleRate: 48000,
numberOfChannels: 2,
});
expect(recording).toMatchObject({
sessionStatus: 'recording',
artifact: {
filepath: '/tmp/recording.opus',
sampleRate: 48000,
numberOfChannels: 2,
},
});
const finalizing = stateMachine.dispatch({
type: 'STOP_RECORDING',
id: pending.id,
id: starting!.id,
});
expect(processing?.status).toBe('processing');
expect(finalizing?.sessionStatus).toBe('finalizing');
const artifactAttached = stateMachine.dispatch({
const finalized = stateMachine.dispatch({
type: 'ATTACH_RECORDING_ARTIFACT',
id: pending.id,
filepath: '/tmp/recording.opus',
sampleRate: 48000,
numberOfChannels: 2,
id: starting!.id,
artifact: {
filepath: '/tmp/recording.opus',
durationMs: 1_000,
size: 128,
degraded: true,
overflowCount: 2,
},
});
expect(artifactAttached).toMatchObject({
status: 'processing',
filepath: '/tmp/recording.opus',
});
const ready = stateMachine.dispatch({
type: 'SET_BLOCK_CREATION_STATUS',
id: pending.id,
status: 'success',
});
expect(ready).toMatchObject({
status: 'ready',
blockCreationStatus: 'success',
});
});
test('keeps native audio metadata when stop artifact omits it', () => {
const stateMachine = new RecordingStateMachine();
const pending = createAttachedRecording(stateMachine);
stateMachine.dispatch({ type: 'STOP_RECORDING', id: pending.id });
const artifactAttached = stateMachine.dispatch({
type: 'ATTACH_RECORDING_ARTIFACT',
id: pending.id,
filepath: '/tmp/recording.opus',
});
expect(artifactAttached).toMatchObject({
sampleRate: 48000,
numberOfChannels: 2,
expect(finalized).toMatchObject({
sessionStatus: 'finalized',
artifact: {
filepath: '/tmp/recording.opus',
sampleRate: 48000,
numberOfChannels: 2,
durationMs: 1_000,
size: 128,
degraded: true,
overflowCount: 2,
},
});
});
test.each([
{ status: 'success' as const, errorMessage: undefined },
{ status: 'failed' as const, errorMessage: 'native start failed' },
{
name: 'finalized sessions',
settleEvent: {
type: 'ATTACH_RECORDING_ARTIFACT' as const,
artifact: {
filepath: '/tmp/recording.opus',
},
},
expectedStatus: 'finalized',
},
{
name: 'failed finalize sessions',
settleEvent: {
type: 'FINALIZE_RECORDING_FAILED' as const,
errorMessage: 'boom',
},
expectedStatus: 'finalize_failed',
},
])(
'settles recordings into ready state with blockCreationStatus=$status',
({ status, errorMessage }) => {
'allows a new recording after $name',
({ settleEvent, expectedStatus }) => {
const stateMachine = new RecordingStateMachine();
const pending = stateMachine.dispatch({
type: 'START_RECORDING',
const pending = createAttachedRecording(stateMachine);
stateMachine.dispatch({
type: 'STOP_RECORDING',
id: pending.id,
});
expect(pending?.status).toBe('recording');
const settled = stateMachine.dispatch({
type: 'SET_BLOCK_CREATION_STATUS',
id: pending!.id,
status,
errorMessage,
});
expect(settled).toMatchObject({
status: 'ready',
blockCreationStatus: status,
id: pending.id,
...settleEvent,
});
expect(settled?.sessionStatus).toBe(expectedStatus);
const next = stateMachine.dispatch({
type: 'START_RECORDING',
});
expect(next?.id).toBeGreaterThan(pending!.id);
expect(next?.status).toBe('recording');
expect(next?.blockCreationStatus).toBeUndefined();
expect(next?.id).toBeGreaterThan(pending.id);
expect(next?.sessionStatus).toBe('starting');
}
);
});
// Integration-style tests for the native recording feature: the
// @affine/native recorder, electron, fs-extra, storage, and window managers
// are all module-mocked so the recording state machine can be driven in
// isolation and observed through recordingStatus$.
describe('recording feature', () => {
  // Stand-ins for the async native recorder entry points.
  const nativeStartRecording = vi.fn();
  const nativeStopRecording = vi.fn();
  const nativeAbortRecording = vi.fn();
  const ensureDirSync = vi.fn();
  // Pass paths through unchanged so filepath assertions stay literal.
  const resolveExistingPathInBase = vi.fn(
    async (_base: string, filepath: string) => filepath
  );
  const getMainWindow = vi.fn(async () => ({
    show: vi.fn(),
  }));
  // Backing map + per-key subjects emulating globalStateStorage get/set/watch.
  const storageState = new Map<string, unknown>();
  const watchSubjects = new Map<string, BehaviorSubject<unknown>>();
  beforeEach(() => {
    // Reset the module registry so each test's dynamic import of the feature
    // gets fresh module-level state (status subject, import queue, counters).
    vi.resetModules();
    vi.clearAllMocks();
    storageState.clear();
    watchSubjects.clear();
    // Minimal ShareableContent surface; recording calls are routed to the
    // vi.fn() stubs declared above so each test can script their behavior.
    vi.doMock('@affine/native', () => ({
      ShareableContent: class ShareableContent {
        static applications() {
          return [];
        }
        static applicationWithProcessId() {
          return null;
        }
        static isUsingMicrophone() {
          return false;
        }
        static onApplicationListChanged() {
          return { unsubscribe: vi.fn() };
        }
        static onAppStateChanged() {
          return { unsubscribe: vi.fn() };
        }
      },
      startRecording: nativeStartRecording,
      stopRecording: nativeStopRecording,
      abortRecording: nativeAbortRecording,
    }));
    // Electron shims: temp dir for output, media access always granted.
    vi.doMock('electron', () => ({
      app: {
        getPath: vi.fn(() => '/tmp'),
        on: vi.fn(),
      },
      systemPreferences: {
        getMediaAccessStatus: vi.fn(() => 'granted'),
        askForMediaAccess: vi.fn(async () => true),
      },
    }));
    vi.doMock('fs-extra', () => ({
      default: {
        ensureDirSync,
        removeSync: vi.fn(),
      },
    }));
    // Keep the real shared utils but force a non-mac/non-windows platform
    // and the pass-through path resolver.
    vi.doMock('../../src/shared/utils', async () => {
      const actual = await vi.importActual('../../src/shared/utils');
      return {
        ...actual,
        isMacOS: () => false,
        isWindows: () => false,
        resolveExistingPathInBase,
      };
    });
    // In-memory globalStateStorage: set() also notifies any watch() subject.
    vi.doMock('../../src/main/shared-storage/storage', () => ({
      globalStateStorage: {
        get: (key: string) => storageState.get(key),
        set: (key: string, value: unknown) => {
          storageState.set(key, value);
          const subject$ = watchSubjects.get(key);
          subject$?.next(value);
        },
        watch: (key: string) => {
          const subject$ =
            watchSubjects.get(key) ??
            new BehaviorSubject(storageState.get(key));
          watchSubjects.set(key, subject$);
          return subject$.asObservable();
        },
      },
    }));
    vi.doMock('../../src/main/windows-manager', () => ({
      getMainWindow,
    }));
    vi.doMock('../../src/main/windows-manager/popup', () => ({
      popupManager: {
        get: () => ({
          showing: false,
          show: vi.fn(async () => undefined),
          hide: vi.fn(async () => undefined),
        }),
      },
    }));
    // Make debounce synchronous so status updates are observable immediately.
    vi.doMock('lodash-es', () => ({
      debounce: (fn: (...args: unknown[]) => void) => fn,
    }));
  });
  afterEach(() => {
    vi.clearAllTimers();
  });
  test('slow start exposes starting state before native setup resolves', async () => {
    // Hold the native start promise open so the intermediate 'starting'
    // status can be asserted before resolution.
    const startDeferred = createDeferred<{
      id: string;
      filepath: string;
      sampleRate: number;
      channels: number;
      startedAt: number;
    }>();
    nativeStartRecording.mockReturnValue(startDeferred.promise);
    const {
      recordingStatus$,
      setRecordingNativeModuleForTesting,
      startRecording,
    } = await import('../../src/main/recording/feature');
    setRecordingNativeModuleForTesting({
      ShareableContent: class ShareableContent {},
      startRecording: nativeStartRecording,
      stopRecording: nativeStopRecording,
      abortRecording: nativeAbortRecording,
    } as never);
    const startPromise = startRecording();
    // Native setup has not resolved yet: status must already be 'starting'.
    expect(recordingStatus$.value).toMatchObject({
      status: 'starting',
    });
    startDeferred.resolve({
      id: 'native-1',
      filepath: '/tmp/0.opus',
      sampleRate: 48_000,
      channels: 2,
      startedAt: 123,
    });
    await startPromise;
    expect(recordingStatus$.value).toMatchObject({
      status: 'recording',
    });
    expect(recordingStatus$.value?.filepath).toContain('0.opus');
  });
  test('slow stop transitions through finalizing and then pending_import', async () => {
    nativeStartRecording.mockResolvedValue({
      id: 'native-1',
      filepath: '/tmp/0.opus',
      sampleRate: 48_000,
      channels: 2,
      startedAt: 123,
    });
    // Hold the native stop promise open to observe the 'finalizing' state.
    const stopDeferred = createDeferred<{
      id: string;
      filepath: string;
      sampleRate: number;
      channels: number;
      durationMs: number;
      size: number;
      degraded: boolean;
      overflowCount: number;
    }>();
    nativeStopRecording.mockReturnValue(stopDeferred.promise);
    const {
      getRecordingImportQueue,
      recordingStatus$,
      setRecordingNativeModuleForTesting,
      startRecording,
      stopRecording,
    } = await import('../../src/main/recording/feature');
    setRecordingNativeModuleForTesting({
      ShareableContent: class ShareableContent {},
      startRecording: nativeStartRecording,
      stopRecording: nativeStopRecording,
      abortRecording: nativeAbortRecording,
    } as never);
    const started = await startRecording();
    const stopPromise = stopRecording(started!.id);
    expect(recordingStatus$.value).toMatchObject({
      id: started!.id,
      status: 'finalizing',
    });
    // Resolve with a degraded artifact; the flags must survive into the
    // final status and the import queue entry.
    stopDeferred.resolve({
      id: 'native-1',
      filepath: '/tmp/0.opus',
      sampleRate: 48_000,
      channels: 2,
      durationMs: 2_000,
      size: 256,
      degraded: true,
      overflowCount: 4,
    });
    await stopPromise;
    expect(recordingStatus$.value).toMatchObject({
      id: started!.id,
      status: 'pending_import',
      degraded: true,
      overflowCount: 4,
    });
    expect(getRecordingImportQueue()).toEqual([
      expect.objectContaining({
        id: started!.id,
        importStatus: 'pending_import',
        filepath: '/tmp/0.opus',
        degraded: true,
        overflowCount: 4,
      }),
    ]);
  });
  test('stop failure releases the active slot for the next recording', async () => {
    // Two scripted native starts: the second must succeed even after the
    // first session's native stop rejects.
    nativeStartRecording
      .mockResolvedValueOnce({
        id: 'native-1',
        filepath: '/tmp/0.opus',
        sampleRate: 48_000,
        channels: 2,
        startedAt: 123,
      })
      .mockResolvedValueOnce({
        id: 'native-2',
        filepath: '/tmp/1.opus',
        sampleRate: 48_000,
        channels: 2,
        startedAt: 456,
      });
    nativeStopRecording.mockRejectedValue(new Error('native stop failed'));
    const {
      recordingStatus$,
      setRecordingNativeModuleForTesting,
      startRecording,
      stopRecording,
    } = await import('../../src/main/recording/feature');
    setRecordingNativeModuleForTesting({
      ShareableContent: class ShareableContent {},
      startRecording: nativeStartRecording,
      stopRecording: nativeStopRecording,
      abortRecording: nativeAbortRecording,
    } as never);
    const first = await startRecording();
    await stopRecording(first!.id);
    // Failure is surfaced on the status, not swallowed.
    expect(recordingStatus$.value).toMatchObject({
      id: first!.id,
      status: 'finalize_failed',
      errorMessage: 'native stop failed',
    });
    // The active slot must be free: a new recording starts with a new id.
    const second = await startRecording();
    expect(second).toMatchObject({
      id: expect.any(Number),
      status: 'recording',
    });
    expect(second!.id).toBeGreaterThan(first!.id);
  });
});

View File

@@ -132,51 +132,46 @@ describe('workspace db management', () => {
).toBe(false);
});
test('rejects unsafe ids when deleting a workspace', async () => {
const { deleteWorkspace } =
await import('@affine/electron/helper/workspace/handlers');
const outsideDir = path.join(tmpDir, 'outside-delete-target');
test.each([
{
name: 'deleting a workspace',
outsideDirName: 'outside-delete-target',
call: async () => {
const { deleteWorkspace } =
await import('@affine/electron/helper/workspace/handlers');
return deleteWorkspace(
universalId({
peer: 'local',
type: 'workspace',
id: '../../outside-delete-target',
})
);
},
},
{
name: 'deleting backup workspaces',
outsideDirName: 'outside-backup-target',
call: async () => {
const { deleteBackupWorkspace } =
await import('@affine/electron/helper/workspace/handlers');
return deleteBackupWorkspace('../../outside-backup-target');
},
},
{
name: 'recovering backup workspaces',
outsideDirName: 'outside-recover-target',
call: async () => {
const { recoverBackupWorkspace } =
await import('@affine/electron/helper/workspace/handlers');
return recoverBackupWorkspace('../../outside-recover-target');
},
},
])('rejects unsafe ids when $name', async ({ outsideDirName, call }) => {
const outsideDir = path.join(tmpDir, outsideDirName);
await fs.ensureDir(outsideDir);
await expect(
deleteWorkspace(
universalId({
peer: 'local',
type: 'workspace',
id: '../../outside-delete-target',
})
)
).rejects.toThrow('Invalid workspace id');
expect(await fs.pathExists(outsideDir)).toBe(true);
});
test('rejects unsafe ids when deleting backup workspaces', async () => {
const { deleteBackupWorkspace } =
await import('@affine/electron/helper/workspace/handlers');
const outsideDir = path.join(tmpDir, 'outside-backup-target');
await fs.ensureDir(outsideDir);
await expect(
deleteBackupWorkspace('../../outside-backup-target')
).rejects.toThrow('Invalid workspace id');
expect(await fs.pathExists(outsideDir)).toBe(true);
});
test('rejects unsafe ids when recovering backup workspaces', async () => {
const { recoverBackupWorkspace } =
await import('@affine/electron/helper/workspace/handlers');
const outsideDir = path.join(tmpDir, 'outside-recover-target');
await fs.ensureDir(outsideDir);
await expect(
recoverBackupWorkspace('../../outside-recover-target')
).rejects.toThrow('Invalid workspace id');
await expect(call()).rejects.toThrow('Invalid workspace id');
expect(await fs.pathExists(outsideDir)).toBe(true);
});
});

View File

@@ -36,6 +36,8 @@ export declare class ShareableContent {
static tapGlobalAudio(excludedProcesses: Array<ApplicationInfo> | undefined | null, audioStreamCallback: ((err: Error | null, arg: Float32Array) => void)): AudioCaptureSession
}
export declare function abortRecording(id: string): Promise<void>
export declare function decodeAudio(buf: Uint8Array, destSampleRate?: number | undefined | null, filename?: string | undefined | null, signal?: AbortSignal | undefined | null): Promise<Float32Array>
/** Decode audio file into a Float32Array */
@@ -48,6 +50,8 @@ export interface RecordingArtifact {
channels: number
durationMs: number
size: number
degraded: boolean
overflowCount: number
}
export interface RecordingSessionMeta {
@@ -68,9 +72,9 @@ export interface RecordingStartOptions {
id?: string
}
export declare function startRecording(opts: RecordingStartOptions): RecordingSessionMeta
export declare function startRecording(opts: RecordingStartOptions): Promise<RecordingSessionMeta>
export declare function stopRecording(id: string): RecordingArtifact
export declare function stopRecording(id: string): Promise<RecordingArtifact>
export interface MermaidRenderOptions {
theme?: string
fontFamily?: string

View File

@@ -577,6 +577,7 @@ module.exports.ApplicationListChangedSubscriber = nativeBinding.ApplicationListC
module.exports.ApplicationStateChangedSubscriber = nativeBinding.ApplicationStateChangedSubscriber
module.exports.AudioCaptureSession = nativeBinding.AudioCaptureSession
module.exports.ShareableContent = nativeBinding.ShareableContent
module.exports.abortRecording = nativeBinding.abortRecording
module.exports.decodeAudio = nativeBinding.decodeAudio
module.exports.decodeAudioSync = nativeBinding.decodeAudioSync
module.exports.startRecording = nativeBinding.startRecording

View File

@@ -21,6 +21,7 @@ rand = { workspace = true }
rubato = { workspace = true }
symphonia = { workspace = true, features = ["all", "opt-simd"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt", "sync"] }
[target.'cfg(target_os = "macos")'.dependencies]
block2 = { workspace = true }

View File

@@ -1,6 +1,9 @@
use std::sync::Arc;
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};
use crossbeam_channel::Sender;
use crossbeam_channel::{Sender, TrySendError};
use napi::{
bindgen_prelude::Float32Array,
threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
@@ -11,7 +14,10 @@ use napi::{
#[derive(Clone)]
pub enum AudioCallback {
Js(Arc<ThreadsafeFunction<Float32Array, ()>>),
Channel(Sender<Vec<f32>>),
Channel {
sender: Sender<Vec<f32>>,
overflow_count: Arc<AtomicU64>,
},
}
impl AudioCallback {
@@ -22,10 +28,16 @@ impl AudioCallback {
// audio thread.
let _ = func.call(Ok(samples.into()), ThreadsafeFunctionCallMode::NonBlocking);
}
Self::Channel(sender) => {
// Drop the chunk if the channel is full to avoid blocking capture.
let _ = sender.try_send(samples);
}
Self::Channel { sender, overflow_count } => match sender.try_send(samples) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
let dropped = overflow_count.fetch_add(1, Ordering::Relaxed) + 1;
if dropped == 1 || dropped.is_power_of_two() {
eprintln!("[affine_media_capture] audio queue overflow, dropped {dropped} chunks");
}
}
Err(TrySendError::Disconnected(_)) => {}
},
}
}
}

View File

@@ -2,7 +2,10 @@ use std::{
fs,
io::{BufWriter, Write},
path::PathBuf,
sync::{LazyLock, Mutex},
sync::{
Arc, LazyLock,
atomic::{AtomicU64, Ordering},
},
thread::{self, JoinHandle},
time::{SystemTime, UNIX_EPOCH},
};
@@ -13,6 +16,7 @@ use napi_derive::napi;
use ogg::writing::{PacketWriteEndInfo, PacketWriter};
use opus_codec::{Application, Channels, Encoder, FrameSize, SampleRate as OpusSampleRate};
use rubato::Resampler;
use tokio::sync::{Mutex as AsyncMutex, mpsc, oneshot};
#[cfg(any(target_os = "macos", target_os = "windows"))]
use crate::audio_callback::AudioCallback;
@@ -24,6 +28,7 @@ use crate::windows::screen_capture_kit::ShareableContent;
const ENCODE_SAMPLE_RATE: OpusSampleRate = OpusSampleRate::Hz48000;
const MAX_PACKET_SIZE: usize = 4096;
const RESAMPLER_INPUT_CHUNK: usize = 1024;
const AUDIO_CHUNK_QUEUE_CAPACITY: usize = 1024;
type RecordingResult<T> = std::result::Result<T, RecordingError>;
@@ -55,6 +60,8 @@ pub struct RecordingArtifact {
pub channels: u32,
pub duration_ms: i64,
pub size: i64,
pub degraded: bool,
pub overflow_count: u32,
}
#[derive(Debug, thiserror::Error)]
@@ -448,6 +455,8 @@ impl OggOpusWriter {
channels: self.channels.as_usize() as u32,
duration_ms,
size,
degraded: false,
overflow_count: 0,
})
}
}
@@ -507,17 +516,41 @@ impl PlatformCapture {
}
enum ControlMessage {
Stop(Sender<RecordingResult<RecordingArtifact>>),
Stop {
reply_tx: oneshot::Sender<RecordingResult<RecordingArtifact>>,
},
Abort {
reply_tx: oneshot::Sender<RecordingResult<()>>,
},
}
struct ActiveRecording {
id: String,
control_tx: Sender<ControlMessage>,
control_tx: mpsc::UnboundedSender<ControlMessage>,
controller: Option<JoinHandle<()>>,
}
static ACTIVE_RECORDING: LazyLock<Mutex<Option<ActiveRecording>>> = LazyLock::new(|| Mutex::new(None));
static START_RECORDING_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
/// Quality-degradation signals collected while a recording is in flight.
/// Currently tracks how many audio chunks the capture callback dropped
/// because the encoder queue was full.
#[derive(Default)]
struct RecordingQualityMetrics {
    overflow_count: Arc<AtomicU64>,
}

impl RecordingQualityMetrics {
    /// Hands out a clone of the shared counter so the audio callback can
    /// bump it from the capture thread.
    fn shared_counter(&self) -> Arc<AtomicU64> {
        self.overflow_count.clone()
    }

    /// Reads the dropped-chunk total, saturating at `u32::MAX` for the
    /// napi-facing artifact field.
    fn overflow_count(&self) -> u32 {
        let dropped = self.overflow_count.load(Ordering::Relaxed);
        u32::try_from(dropped).unwrap_or(u32::MAX)
    }
}
static ACTIVE_RECORDING: LazyLock<AsyncMutex<Option<ActiveRecording>>> = LazyLock::new(|| AsyncMutex::new(None));
static START_RECORDING_LOCK: LazyLock<AsyncMutex<()>> = LazyLock::new(|| AsyncMutex::new(()));
fn now_millis() -> i64 {
SystemTime::now()
@@ -568,10 +601,17 @@ fn build_excluded_refs(ids: &[u32]) -> Result<Vec<ApplicationInfo>> {
Ok(excluded)
}
fn start_capture(opts: &RecordingStartOptions, tx: Sender<Vec<f32>>) -> Result<(PlatformCapture, u32, u32)> {
fn start_capture(
opts: &RecordingStartOptions,
tx: Sender<Vec<f32>>,
overflow_count: Arc<AtomicU64>,
) -> Result<(PlatformCapture, u32, u32)> {
#[cfg(target_os = "macos")]
{
let callback = AudioCallback::Channel(tx);
let callback = AudioCallback::Channel {
sender: tx,
overflow_count,
};
let session = if let Some(app_id) = opts.app_process_id {
ShareableContent::tap_audio_with_callback(app_id, callback)?
} else {
@@ -586,7 +626,10 @@ fn start_capture(opts: &RecordingStartOptions, tx: Sender<Vec<f32>>) -> Result<(
#[cfg(target_os = "windows")]
{
let callback = AudioCallback::Channel(tx);
let callback = AudioCallback::Channel {
sender: tx,
overflow_count,
};
let session =
ShareableContent::tap_audio_with_callback(opts.app_process_id.unwrap_or(0), callback, opts.sample_rate)?;
let sample_rate = session.get_sample_rate().round() as u32;
@@ -598,6 +641,7 @@ fn start_capture(opts: &RecordingStartOptions, tx: Sender<Vec<f32>>) -> Result<(
{
let _ = opts;
let _ = tx;
let _ = overflow_count;
Err(RecordingError::UnsupportedPlatform.into())
}
}
@@ -625,13 +669,19 @@ fn spawn_recording_controller(
id: String,
filepath: PathBuf,
opts: RecordingStartOptions,
) -> (Receiver<RecordingResult<u32>>, Sender<ControlMessage>, JoinHandle<()>) {
let (started_tx, started_rx) = bounded(1);
let (control_tx, control_rx) = bounded(1);
) -> (
oneshot::Receiver<RecordingResult<u32>>,
mpsc::UnboundedSender<ControlMessage>,
JoinHandle<()>,
) {
let (started_tx, started_rx) = oneshot::channel();
let (control_tx, mut control_rx) = mpsc::unbounded_channel();
let controller = thread::spawn(move || {
let (tx, rx) = bounded::<Vec<f32>>(32);
let (mut capture, capture_rate, capture_channels) = match start_capture(&opts, tx.clone()) {
let (tx, rx) = bounded::<Vec<f32>>(AUDIO_CHUNK_QUEUE_CAPACITY);
let metrics = RecordingQualityMetrics::default();
let (mut capture, capture_rate, capture_channels) = match start_capture(&opts, tx.clone(), metrics.shared_counter())
{
Ok(capture) => capture,
Err(error) => {
let _ = started_tx.send(Err(RecordingError::Start(error.to_string())));
@@ -675,28 +725,52 @@ fn spawn_recording_controller(
return;
}
while let Ok(message) = control_rx.recv() {
if let Some(message) = control_rx.blocking_recv() {
match message {
ControlMessage::Stop(reply_tx) => {
let result = match capture.stop() {
Ok(()) => {
drop(audio_tx.take());
match worker.take() {
Some(handle) => match handle.join() {
Ok(result) => result,
Err(_) => Err(RecordingError::Join),
},
None => Err(RecordingError::Join),
}
}
ControlMessage::Stop { reply_tx } => {
let stop_result = capture.stop();
drop(audio_tx.take());
let worker_result = match worker.take() {
Some(handle) => match handle.join() {
Ok(result) => result,
Err(_) => Err(RecordingError::Join),
},
None => Err(RecordingError::Join),
};
let result = match stop_result {
Ok(()) => worker_result.map(|mut artifact| {
artifact.overflow_count = metrics.overflow_count();
artifact.degraded = artifact.overflow_count > 0;
artifact
}),
Err(error) => Err(RecordingError::Start(error.to_string())),
};
let _ = reply_tx.send(result);
}
ControlMessage::Abort { reply_tx } => {
let stop_result = capture.stop();
drop(audio_tx.take());
let worker_result = match worker.take() {
Some(handle) => match handle.join() {
Ok(result) => result,
Err(_) => Err(RecordingError::Join),
},
None => Err(RecordingError::Join),
};
let result = match stop_result {
Ok(()) => match worker_result {
Ok(artifact) => {
fs::remove_file(&artifact.filepath).ok();
Ok(())
}
Err(RecordingError::Empty) => Ok(()),
Err(error) => Err(error),
},
Err(error) => Err(RecordingError::Start(error.to_string())),
};
if worker.is_none() {
break;
}
let _ = reply_tx.send(result);
}
}
}
@@ -711,17 +785,65 @@ fn spawn_recording_controller(
(started_rx, control_tx, controller)
}
fn cleanup_recording_controller(control_tx: &Sender<ControlMessage>, controller: JoinHandle<()>) {
let (reply_tx, reply_rx) = bounded(1);
let _ = control_tx.send(ControlMessage::Stop(reply_tx));
let _ = reply_rx.recv();
let _ = controller.join();
/// Joins the controller OS thread without blocking the async runtime.
///
/// The join runs on tokio's blocking pool; both a panicked controller
/// thread and a failed `spawn_blocking` task surface as
/// `RecordingError::Join`.
async fn join_controller_handle(controller: JoinHandle<()>) -> RecordingResult<()> {
    let join_task = tokio::task::spawn_blocking(move || {
        controller.join().map_err(|_| RecordingError::Join)
    });
    join_task.await.map_err(|_| RecordingError::Join)?
}
fn take_active_recording(id: &str) -> RecordingResult<ActiveRecording> {
let mut active_recording = ACTIVE_RECORDING
.lock()
.map_err(|_| RecordingError::Start("lock poisoned".into()))?;
/// Best-effort teardown for a controller that lost the registration race:
/// asks it to abort, waits for the acknowledgement, then joins the thread.
/// Every step is allowed to fail silently — the caller is already on an
/// error path and only needs the thread gone.
async fn cleanup_recording_controller(control_tx: &mpsc::UnboundedSender<ControlMessage>, controller: JoinHandle<()>) {
    let (ack_tx, ack_rx) = oneshot::channel();
    control_tx.send(ControlMessage::Abort { reply_tx: ack_tx }).ok();
    ack_rx.await.ok();
    join_controller_handle(controller).await.ok();
}
/// Converts a controller-side `RecordingResult` into a napi `Result`.
///
/// Every `RecordingError` variant converts through the same `Into<napi::Error>`
/// impl, so no per-variant handling is needed; the previous explicit
/// `RecordingError::Start` arm rebuilt the identical error and converted it
/// exactly like the catch-all arm did.
fn map_recording_result<T>(result: RecordingResult<T>) -> Result<T> {
    result.map_err(Into::into)
}
/// Routes a control message to the active recording controller identified by
/// `id` and awaits the controller's reply on `reply_rx`.
///
/// If the transport fails in either direction (the controller exited before
/// the send, or it dropped the reply channel without answering), the active
/// session is unregistered and its thread joined so a dead controller never
/// stays registered, and `RecordingError::Join` is returned. On the normal
/// path the session is unregistered, the controller thread joined, and the
/// controller's own result surfaced to the caller.
async fn send_control_message<T>(
    id: &str,
    message: ControlMessage,
    reply_rx: oneshot::Receiver<RecordingResult<T>>,
) -> Result<T> {
    // Clone the sender inside a short scope so the async mutex guarding
    // ACTIVE_RECORDING is released before any of the awaits below.
    let control_tx = {
        let recording = ACTIVE_RECORDING.lock().await;
        let active = recording.as_ref().ok_or(RecordingError::NotFound)?;
        if active.id != id {
            return Err(RecordingError::NotFound.into());
        }
        active.control_tx.clone()
    };

    if control_tx.send(message).is_err() {
        // Controller already gone: unregister and join it, then report Join.
        if let Ok(recording) = take_active_recording(id).await {
            let _ = join_active_recording(recording).await;
        }
        return Err(RecordingError::Join.into());
    }

    let response = match reply_rx.await {
        Ok(response) => response,
        Err(_) => {
            // Reply channel dropped without an answer — same cleanup path.
            if let Ok(recording) = take_active_recording(id).await {
                let _ = join_active_recording(recording).await;
            }
            return Err(RecordingError::Join.into());
        }
    };

    // Normal path: release the active slot and join the controller thread
    // before handing the controller's result back.
    let active_recording = take_active_recording(id).await?;
    join_active_recording(active_recording).await?;
    map_recording_result(response)
}
async fn take_active_recording(id: &str) -> RecordingResult<ActiveRecording> {
let mut active_recording = ACTIVE_RECORDING.lock().await;
let recording = active_recording.take().ok_or(RecordingError::NotFound)?;
if recording.id != id {
*active_recording = Some(recording);
@@ -730,15 +852,14 @@ fn take_active_recording(id: &str) -> RecordingResult<ActiveRecording> {
Ok(recording)
}
fn join_active_recording(mut recording: ActiveRecording) -> RecordingResult<()> {
async fn join_active_recording(mut recording: ActiveRecording) -> RecordingResult<()> {
if let Some(handle) = recording.controller.take() {
handle.join().map_err(|_| RecordingError::Join)?;
join_controller_handle(handle).await?;
}
Ok(())
}
#[napi]
pub fn start_recording(opts: RecordingStartOptions) -> Result<RecordingSessionMeta> {
async fn start_recording_inner(opts: RecordingStartOptions) -> Result<RecordingSessionMeta> {
if let Some(fmt) = opts.format.as_deref()
&& !fmt.eq_ignore_ascii_case("opus")
{
@@ -748,17 +869,12 @@ pub fn start_recording(opts: RecordingStartOptions) -> Result<RecordingSessionMe
normalize_channel_count(channels)?;
}
let _start_lock = START_RECORDING_LOCK
.lock()
.map_err(|_| RecordingError::Start("lock poisoned".into()))?;
let _start_lock = START_RECORDING_LOCK.lock().await;
let output_dir = validate_output_dir(&opts.output_dir)?;
let id = sanitize_id(opts.id.clone());
{
let recording = ACTIVE_RECORDING
.lock()
.map_err(|_| RecordingError::Start("lock poisoned".into()))?;
let recording = ACTIVE_RECORDING.lock().await;
if recording.is_some() {
return Err(RecordingError::Start("recording already active".into()).into());
}
@@ -770,9 +886,17 @@ pub fn start_recording(opts: RecordingStartOptions) -> Result<RecordingSessionMe
}
let (started_rx, control_tx, controller) = spawn_recording_controller(id.clone(), filepath.clone(), opts);
let encoding_channels = started_rx
.recv()
.map_err(|_| RecordingError::Start("failed to start recording controller".into()))??;
let encoding_channels = match started_rx.await {
Ok(Ok(channels)) => channels,
Ok(Err(error)) => {
let _ = join_controller_handle(controller).await;
return Err(error.into());
}
Err(_) => {
let _ = join_controller_handle(controller).await;
return Err(RecordingError::Start("failed to start recording controller".into()).into());
}
};
let meta = RecordingSessionMeta {
id: id.clone(),
@@ -782,16 +906,10 @@ pub fn start_recording(opts: RecordingStartOptions) -> Result<RecordingSessionMe
started_at: now_millis(),
};
let mut recording = match ACTIVE_RECORDING.lock() {
Ok(recording) => recording,
Err(_) => {
cleanup_recording_controller(&control_tx, controller);
return Err(RecordingError::Start("lock poisoned".into()).into());
}
};
let mut recording = ACTIVE_RECORDING.lock().await;
if recording.is_some() {
cleanup_recording_controller(&control_tx, controller);
cleanup_recording_controller(&control_tx, controller).await;
return Err(RecordingError::Start("recording already active".into()).into());
}
@@ -805,63 +923,50 @@ pub fn start_recording(opts: RecordingStartOptions) -> Result<RecordingSessionMe
}
#[napi]
pub fn stop_recording(id: String) -> Result<RecordingArtifact> {
let control_tx = {
let recording = ACTIVE_RECORDING
.lock()
.map_err(|_| RecordingError::Start("lock poisoned".into()))?;
pub async fn start_recording(opts: RecordingStartOptions) -> Result<RecordingSessionMeta> {
start_recording_inner(opts).await
}
let active = recording.as_ref().ok_or(RecordingError::NotFound)?;
if active.id != id {
return Err(RecordingError::NotFound.into());
}
active.control_tx.clone()
};
async fn stop_recording_inner(id: String) -> Result<RecordingArtifact> {
let (reply_tx, reply_rx) = oneshot::channel();
send_control_message(&id, ControlMessage::Stop { reply_tx }, reply_rx).await
}
let (reply_tx, reply_rx) = bounded(1);
if control_tx.send(ControlMessage::Stop(reply_tx)).is_err() {
if let Ok(recording) = take_active_recording(&id) {
let _ = join_active_recording(recording);
}
return Err(RecordingError::Join.into());
}
#[napi]
pub async fn stop_recording(id: String) -> Result<RecordingArtifact> {
stop_recording_inner(id).await
}
let response = match reply_rx.recv() {
Ok(response) => response,
Err(_) => {
if let Ok(recording) = take_active_recording(&id) {
let _ = join_active_recording(recording);
}
return Err(RecordingError::Join.into());
}
};
async fn abort_recording_inner(id: String) -> Result<()> {
let (reply_tx, reply_rx) = oneshot::channel();
send_control_message(&id, ControlMessage::Abort { reply_tx }, reply_rx).await
}
let artifact = match response {
Ok(artifact) => artifact,
Err(RecordingError::Start(message)) => {
return Err(RecordingError::Start(message).into());
}
Err(error) => {
if let Ok(recording) = take_active_recording(&id) {
let _ = join_active_recording(recording);
}
return Err(error.into());
}
};
let active_recording = take_active_recording(&id)?;
join_active_recording(active_recording)?;
Ok(artifact)
#[napi]
pub async fn abort_recording(id: String) -> Result<()> {
abort_recording_inner(id).await
}
#[cfg(test)]
mod tests {
use std::{env, fs::File, path::PathBuf};
use std::{env, fs::File, path::PathBuf, thread};
use ogg::PacketReader;
use tokio::runtime::Builder;
use super::{OggOpusWriter, convert_interleaved_channels};
use super::{
ACTIVE_RECORDING, ActiveRecording, ControlMessage, OggOpusWriter, RecordingArtifact, RecordingError,
RecordingQualityMetrics, START_RECORDING_LOCK, abort_recording_inner, bounded, convert_interleaved_channels, mpsc,
stop_recording_inner,
};
use crate::audio_callback::AudioCallback;
/// Drives `future` to completion on a throwaway single-threaded tokio
/// runtime, letting synchronous `#[test]` functions exercise the async API.
fn block_on<F: std::future::Future>(future: F) -> F::Output {
    let runtime = Builder::new_current_thread()
        .build()
        .expect("create runtime");
    runtime.block_on(future)
}
fn temp_recording_path() -> PathBuf {
env::temp_dir().join(format!("affine-recording-test-{}.opus", rand::random::<u64>()))
@@ -939,4 +1044,105 @@ mod tests {
vec![3.0, 3.0, 4.0, 4.0]
);
}
#[test]
fn stop_recording_clears_active_session_after_stop_error() {
    // Serialize tests that mutate the global ACTIVE_RECORDING slot.
    let _lock = START_RECORDING_LOCK.blocking_lock();
    let id = String::from("stop-error");
    let (control_tx, mut control_rx) = mpsc::unbounded_channel();
    // Fake controller thread that answers the Stop message with an error.
    let controller = thread::spawn(move || {
        let Some(ControlMessage::Stop { reply_tx }) = control_rx.blocking_recv() else {
            panic!("expected stop");
        };
        let _ = reply_tx.send(Err(RecordingError::Start(String::from("boom"))));
    });
    // Register the fake session as the active recording.
    *ACTIVE_RECORDING.blocking_lock() = Some(ActiveRecording {
        id: id.clone(),
        control_tx,
        controller: Some(controller),
    });
    let error = match block_on(stop_recording_inner(id)) {
        Ok(_) => panic!("stop should fail"),
        Err(error) => error,
    };
    assert!(
        error.to_string().contains("start failure: boom"),
        "unexpected error: {error}"
    );
    // Even on failure the slot must be released for the next start.
    assert!(ACTIVE_RECORDING.blocking_lock().is_none());
}
#[test]
fn stop_recording_returns_artifact_and_clears_active_session() {
    // Serialize tests that mutate the global ACTIVE_RECORDING slot.
    let _lock = START_RECORDING_LOCK.blocking_lock();
    let id = String::from("stop-success");
    let (control_tx, mut control_rx) = mpsc::unbounded_channel();
    // Fake controller thread that replies to Stop with a degraded artifact,
    // so the degradation flags can be asserted end to end.
    let controller = thread::spawn(move || {
        let Some(ControlMessage::Stop { reply_tx }) = control_rx.blocking_recv() else {
            panic!("expected stop");
        };
        let _ = reply_tx.send(Ok(RecordingArtifact {
            id: String::from("stop-success"),
            filepath: String::from("/tmp/recording.opus"),
            sample_rate: 48_000,
            channels: 2,
            duration_ms: 1_000,
            size: 128,
            degraded: true,
            overflow_count: 3,
        }));
    });
    *ACTIVE_RECORDING.blocking_lock() = Some(ActiveRecording {
        id: id.clone(),
        control_tx,
        controller: Some(controller),
    });
    let artifact = block_on(stop_recording_inner(id)).expect("stop should succeed");
    assert_eq!(artifact.filepath, "/tmp/recording.opus");
    assert!(artifact.degraded);
    assert_eq!(artifact.overflow_count, 3);
    // Slot must be free after a successful stop.
    assert!(ACTIVE_RECORDING.blocking_lock().is_none());
}
#[test]
fn abort_recording_clears_active_session() {
    // Serialize tests that mutate the global ACTIVE_RECORDING slot.
    let _lock = START_RECORDING_LOCK.blocking_lock();
    let id = String::from("abort-success");
    let (control_tx, mut control_rx) = mpsc::unbounded_channel();
    // Fake controller thread that acknowledges the Abort message.
    let controller = thread::spawn(move || {
        let Some(ControlMessage::Abort { reply_tx }) = control_rx.blocking_recv() else {
            panic!("expected abort");
        };
        let _ = reply_tx.send(Ok(()));
    });
    *ACTIVE_RECORDING.blocking_lock() = Some(ActiveRecording {
        id: id.clone(),
        control_tx,
        controller: Some(controller),
    });
    block_on(abort_recording_inner(id)).expect("abort should succeed");
    // Abort must release the active slot just like a successful stop.
    assert!(ACTIVE_RECORDING.blocking_lock().is_none());
}
/// A full bounded channel must not block the capture callback: the extra
/// chunk is dropped and counted in the shared overflow metric instead.
#[test]
fn queue_overflow_marks_recording_as_degraded() {
    let metrics = RecordingQualityMetrics::default();
    // Capacity of one: the second chunk cannot be enqueued.
    let (chunk_tx, chunk_rx) = bounded(1);
    let callback = AudioCallback::Channel {
        sender: chunk_tx,
        overflow_count: metrics.shared_counter(),
    };
    callback.call(vec![0.1, 0.2]);
    callback.call(vec![0.3, 0.4]);
    // The first chunk is still delivered; only the overflow was dropped.
    let _ = chunk_rx.recv().expect("queued audio");
    assert_eq!(metrics.overflow_count(), 1);
}
}

View File

@@ -192,19 +192,26 @@ impl AudioCaptureSession {
if self.stopped.load(Ordering::SeqCst) {
return Ok(());
}
self.stopped.store(true, Ordering::SeqCst);
let mut pause_errors = Vec::new();
self
.mic_stream
.pause()
.map_err(|e| Error::new(Status::GenericFailure, format!("{e}")))?;
.map_err(|e| pause_errors.push(format!("pause mic stream: {e}")))
.ok();
self
.lb_stream
.pause()
.map_err(|e| Error::new(Status::GenericFailure, format!("{e}")))?;
self.stopped.store(true, Ordering::SeqCst);
.map_err(|e| pause_errors.push(format!("pause loopback stream: {e}")))
.ok();
if let Some(jh) = self.jh.take() {
let _ = jh.join(); // ignore poison
}
Ok(())
if pause_errors.is_empty() {
Ok(())
} else {
Err(Error::new(Status::GenericFailure, pause_errors.join("; ")))
}
}
}