diff --git a/packages/backend/server/src/plugins/copilot/controller.ts b/packages/backend/server/src/plugins/copilot/controller.ts index 7697ca24f1..38d4dfe992 100644 --- a/packages/backend/server/src/plugins/copilot/controller.ts +++ b/packages/backend/server/src/plugins/copilot/controller.ts @@ -299,6 +299,13 @@ export class CopilotController implements BeforeApplicationShutdown { this.ongoingStreamCount$.next(this.ongoingStreamCount$.value + 1); const { signal, onConnectionClosed } = getSignal(req); + let endBeforePromiseResolve = false; + onConnectionClosed(isAborted => { + if (isAborted) { + endBeforePromiseResolve = true; + } + }); + const { messageId, reasoning, webSearch } = ChatQuerySchema.parse(query); const source$ = from( @@ -322,21 +329,21 @@ export class CopilotController implements BeforeApplicationShutdown { shared$.pipe( reduce((acc, chunk) => acc + chunk, ''), tap(buffer => { - onConnectionClosed(isAborted => { - session.push({ - role: 'assistant', - content: isAborted ? '> Request aborted' : buffer, - createdAt: new Date(), - }); - void session - .save() - .catch(err => - this.logger.error( - 'Failed to save session in sse stream', - err - ) - ); + session.push({ + role: 'assistant', + content: endBeforePromiseResolve + ? '> Request aborted' + : buffer, + createdAt: new Date(), }); + void session + .save() + .catch(err => + this.logger.error( + 'Failed to save session in sse stream', + err + ) + ); }), ignoreElements() ) @@ -384,6 +391,13 @@ export class CopilotController implements BeforeApplicationShutdown { this.ongoingStreamCount$.next(this.ongoingStreamCount$.value + 1); const { signal, onConnectionClosed } = getSignal(req); + let endBeforePromiseResolve = false; + onConnectionClosed(isAborted => { + if (isAborted) { + endBeforePromiseResolve = true; + } + }); + const { messageId, reasoning, webSearch } = ChatQuerySchema.parse(query); const source$ = from( @@ -407,25 +421,25 @@ export class CopilotController implements BeforeApplicationShutdown { shared$.pipe( reduce((acc, chunk) => acc.concat([chunk]), [] as StreamObject[]), tap(result => { - onConnectionClosed(isAborted => { - const parser = new StreamObjectParser(); - const streamObjects = parser.mergeTextDelta(result); - const content = parser.mergeContent(streamObjects); - session.push({ - role: 'assistant', - content: isAborted ? '> Request aborted' : content, - streamObjects: isAborted ? null : streamObjects, - createdAt: new Date(), - }); - void session - .save() - .catch(err => - this.logger.error( - 'Failed to save session in sse stream', - err - ) - ); + const parser = new StreamObjectParser(); + const streamObjects = parser.mergeTextDelta(result); + const content = parser.mergeContent(streamObjects); + session.push({ + role: 'assistant', + content: endBeforePromiseResolve + ? '> Request aborted' + : content, + streamObjects: endBeforePromiseResolve ? null : streamObjects, + createdAt: new Date(), }); + void session + .save() + .catch(err => + this.logger.error( + 'Failed to save session in sse stream', + err + ) + ); }), ignoreElements() ) @@ -477,6 +491,13 @@ export class CopilotController implements BeforeApplicationShutdown { this.ongoingStreamCount$.next(this.ongoingStreamCount$.value + 1); const { signal, onConnectionClosed } = getSignal(req); + let endBeforePromiseResolve = false; + onConnectionClosed(isAborted => { + if (isAborted) { + endBeforePromiseResolve = true; + } + }); + const source$ = from( this.workflow.runGraph(params, session.model, { ...session.config.promptConfig, @@ -526,21 +547,21 @@ export class CopilotController implements BeforeApplicationShutdown { return acc; }, ''), tap(content => { - onConnectionClosed(isAborted => { - session.push({ - role: 'assistant', - content: isAborted ? '> Request aborted' : content, - createdAt: new Date(), - }); - void session - .save() - .catch(err => - this.logger.error( - 'Failed to save session in sse stream', - err - ) - ); + session.push({ + role: 'assistant', + content: endBeforePromiseResolve + ? '> Request aborted' + : content, + createdAt: new Date(), }); + void session + .save() + .catch(err => + this.logger.error( + 'Failed to save session in sse stream', + err + ) + ); }), ignoreElements() ) @@ -604,6 +625,13 @@ export class CopilotController implements BeforeApplicationShutdown { this.ongoingStreamCount$.next(this.ongoingStreamCount$.value + 1); const { signal, onConnectionClosed } = getSignal(req); + let endBeforePromiseResolve = false; + onConnectionClosed(isAborted => { + if (isAborted) { + endBeforePromiseResolve = true; + } + }); + const source$ = from( provider.streamImages( { @@ -639,22 +667,20 @@ export class CopilotController implements BeforeApplicationShutdown { shared$.pipe( reduce((acc, chunk) => acc.concat([chunk]), [] as string[]), tap(attachments => { - onConnectionClosed(isAborted => { - session.push({ - role: 'assistant', - content: isAborted ? '> Request aborted' : '', - attachments: isAborted ? [] : attachments, - createdAt: new Date(), - }); - void session - .save() - .catch(err => - this.logger.error( - 'Failed to save session in sse stream', - err - ) - ); + session.push({ + role: 'assistant', + content: endBeforePromiseResolve ? '> Request aborted' : '', + attachments: endBeforePromiseResolve ? [] : attachments, + createdAt: new Date(), }); + void session + .save() + .catch(err => + this.logger.error( + 'Failed to save session in sse stream', + err + ) + ); }), ignoreElements() )