diff --git a/packages/backend/server/src/base/nestjs/exception.ts b/packages/backend/server/src/base/nestjs/exception.ts index 3b1cf06fff..fa7675c73d 100644 --- a/packages/backend/server/src/base/nestjs/exception.ts +++ b/packages/backend/server/src/base/nestjs/exception.ts @@ -134,9 +134,9 @@ export const GatewayErrorWrapper = (event: string): MethodDecorator => { }; }; -export function mapSseError(originalError: any) { +export function mapSseError(originalError: any, info: object) { const error = mapAnyError(originalError); - error.log('Sse'); + error.log('Sse', info); metrics.sse.counter('error').add(1, { status: error.status }); return of({ type: 'error' as const, diff --git a/packages/backend/server/src/plugins/copilot/controller.ts b/packages/backend/server/src/plugins/copilot/controller.ts index 5ac2735ee8..afec7bf2ff 100644 --- a/packages/backend/server/src/plugins/copilot/controller.ts +++ b/packages/backend/server/src/plugins/copilot/controller.ts @@ -245,16 +245,20 @@ export class CopilotController implements BeforeApplicationShutdown { @Param('sessionId') sessionId: string, @Query() params: Record ): Promise> { - const { messageId } = this.prepareParams(params); + const info: any = { sessionId, params, throwInStream: false }; - const provider = await this.chooseTextProvider( - user.id, - sessionId, - messageId - ); - - const session = await this.appendSessionMessage(sessionId, messageId); try { + const { messageId } = this.prepareParams(params); + + const provider = await this.chooseTextProvider( + user.id, + sessionId, + messageId + ); + + const session = await this.appendSessionMessage(sessionId, messageId); + info.model = session.model; + metrics.ai.counter('chat_stream_calls').add(1, { model: session.model }); this.ongoingStreamCount$.next(this.ongoingStreamCount$.value + 1); const source$ = from( @@ -286,10 +290,9 @@ export class CopilotController implements BeforeApplicationShutdown { ) ), catchError(e => { - metrics.ai - .counter('chat_stream_errors') - .add(1, { model: session.model }); - return mapSseError(e); + metrics.ai.counter('chat_stream_errors').add(1); + info.throwInStream = true; + return mapSseError(e, info); }), finalize(() => { this.ongoingStreamCount$.next(this.ongoingStreamCount$.value - 1); @@ -298,8 +301,8 @@ export class CopilotController implements BeforeApplicationShutdown { return this.mergePingStream(messageId, source$); } catch (err) { - metrics.ai.counter('chat_stream_errors').add(1, { model: session.model }); - return mapSseError(err); + metrics.ai.counter('chat_stream_errors').add(1, info); + return mapSseError(err, info); } } @@ -311,10 +314,13 @@ export class CopilotController implements BeforeApplicationShutdown { @Param('sessionId') sessionId: string, @Query() params: Record ): Promise> { - const { messageId } = this.prepareParams(params); - - const session = await this.appendSessionMessage(sessionId, messageId); + const info: any = { sessionId, params, throwInStream: false }; try { + const { messageId } = this.prepareParams(params); + + const session = await this.appendSessionMessage(sessionId, messageId); + info.model = session.model; + metrics.ai.counter('workflow_calls').add(1, { model: session.model }); const latestMessage = session.stashMessages.findLast( m => m.role === 'user' @@ -383,10 +389,9 @@ export class CopilotController implements BeforeApplicationShutdown { ) ), catchError(e => { - metrics.ai - .counter('workflow_errors') - .add(1, { model: session.model }); - return mapSseError(e); + metrics.ai.counter('workflow_errors').add(1, info); + info.throwInStream = true; + return mapSseError(e, info); }), finalize(() => this.ongoingStreamCount$.next(this.ongoingStreamCount$.value - 1) @@ -395,8 +400,8 @@ export class CopilotController implements BeforeApplicationShutdown { return this.mergePingStream(messageId, source$); } catch (err) { - metrics.ai.counter('workflow_errors').add(1, { model: session.model }); - return mapSseError(err); + metrics.ai.counter('workflow_errors').add(1, info); + return mapSseError(err, info); } } @@ -408,25 +413,28 @@ export class CopilotController implements BeforeApplicationShutdown { @Param('sessionId') sessionId: string, @Query() params: Record ): Promise> { - const { messageId } = this.prepareParams(params); - - const { model, hasAttachment } = await this.checkRequest( - user.id, - sessionId, - messageId - ); - const provider = await this.provider.getProviderByCapability( - hasAttachment - ? CopilotCapability.ImageToImage - : CopilotCapability.TextToImage, - model - ); - if (!provider) { - throw new NoCopilotProviderAvailable(); - } - - const session = await this.appendSessionMessage(sessionId, messageId); + const info: any = { sessionId, params, throwInStream: false }; try { + const { messageId } = this.prepareParams(params); + + const { model, hasAttachment } = await this.checkRequest( + user.id, + sessionId, + messageId + ); + const provider = await this.provider.getProviderByCapability( + hasAttachment + ? CopilotCapability.ImageToImage + : CopilotCapability.TextToImage, + model + ); + if (!provider) { + throw new NoCopilotProviderAvailable(); + } + + const session = await this.appendSessionMessage(sessionId, messageId); + info.model = session.model; + metrics.ai .counter('images_stream_calls') .add(1, { model: session.model }); @@ -472,10 +480,9 @@ export class CopilotController implements BeforeApplicationShutdown { ) ), catchError(e => { - metrics.ai - .counter('images_stream_errors') - .add(1, { model: session.model }); - return mapSseError(e); + metrics.ai.counter('images_stream_errors').add(1, info); + info.throwInStream = true; + return mapSseError(e, info); }), finalize(() => this.ongoingStreamCount$.next(this.ongoingStreamCount$.value - 1) @@ -484,10 +491,8 @@ export class CopilotController implements BeforeApplicationShutdown { return this.mergePingStream(messageId, source$); } catch (err) { - metrics.ai - .counter('images_stream_errors') - .add(1, { model: session.model }); - return mapSseError(err); + metrics.ai.counter('images_stream_errors').add(1, info); + return mapSseError(err, info); } }