feat: add ping for event source (#7493)

This commit is contained in:
darkskygit
2024-07-15 04:16:21 +00:00
parent 7103b2e594
commit e4b816f153

View File

@@ -14,12 +14,16 @@ import {
concatMap,
connect,
EMPTY,
finalize,
from,
interval,
map,
merge,
mergeMap,
Observable,
Subject,
switchMap,
takeUntil,
toArray,
} from 'rxjs';
@@ -41,7 +45,7 @@ import { CopilotCapability, CopilotTextProvider } from './types';
import { CopilotWorkflowService, GraphExecutorState } from './workflow';
export interface ChatEvent {
type: 'event' | 'attachment' | 'message' | 'error';
type: 'event' | 'attachment' | 'message' | 'error' | 'ping';
id?: string;
data: string | object;
}
@@ -51,6 +55,8 @@ type CheckResult = {
hasAttachment?: boolean;
};
const PING_INTERVAL = 5000;
@Controller('/api/copilot')
export class CopilotController {
private readonly logger = new Logger(CopilotController.name);
@@ -159,6 +165,19 @@ export class CopilotController {
return num;
}
private mergePingStream(
messageId: string,
source$: Observable<ChatEvent>
): Observable<ChatEvent> {
const subject$ = new Subject();
const ping$ = interval(PING_INTERVAL).pipe(
map(() => ({ type: 'ping' as const, id: messageId, data: '' })),
takeUntil(subject$)
);
return merge(source$.pipe(finalize(() => subject$.next(null))), ping$);
}
@Get('/chat/:sessionId')
async chat(
@CurrentUser() user: CurrentUser,
@@ -216,7 +235,7 @@ export class CopilotController {
const session = await this.appendSessionMessage(sessionId, messageId);
return from(
const source$ = from(
provider.generateTextStream(session.finish(params), session.model, {
...session.config.promptConfig,
signal: this.getSignal(req),
@@ -246,6 +265,8 @@ export class CopilotController {
),
catchError(mapSseError)
);
return this.mergePingStream(messageId, source$);
} catch (err) {
return mapSseError(err);
}
@@ -270,7 +291,7 @@ export class CopilotController {
});
}
return from(
const source$ = from(
this.workflow.runGraph(params, session.model, {
...session.config.promptConfig,
signal: this.getSignal(req),
@@ -316,6 +337,8 @@ export class CopilotController {
),
catchError(mapSseError)
);
return this.mergePingStream(messageId, source$);
} catch (err) {
return mapSseError(err);
}
@@ -353,7 +376,7 @@ export class CopilotController {
sessionId
);
return from(
const source$ = from(
provider.generateImagesStream(session.finish(params), session.model, {
seed: this.parseNumber(params.seed),
signal: this.getSignal(req),
@@ -389,6 +412,8 @@ export class CopilotController {
),
catchError(mapSseError)
);
return this.mergePingStream(messageId, source$);
} catch (err) {
return mapSseError(err);
}