feat: better error handle for sse endpoint (#10155)

fix CLOUD-123
This commit is contained in:
darkskygit
2025-02-13 10:10:13 +00:00
parent a7de6edfef
commit 899b1d60e0
2 changed files with 56 additions and 51 deletions

View File

@@ -134,9 +134,9 @@ export const GatewayErrorWrapper = (event: string): MethodDecorator => {
}; };
}; };
export function mapSseError(originalError: any) { export function mapSseError(originalError: any, info: object) {
const error = mapAnyError(originalError); const error = mapAnyError(originalError);
error.log('Sse'); error.log('Sse', info);
metrics.sse.counter('error').add(1, { status: error.status }); metrics.sse.counter('error').add(1, { status: error.status });
return of({ return of({
type: 'error' as const, type: 'error' as const,

View File

@@ -245,16 +245,20 @@ export class CopilotController implements BeforeApplicationShutdown {
@Param('sessionId') sessionId: string, @Param('sessionId') sessionId: string,
@Query() params: Record<string, string> @Query() params: Record<string, string>
): Promise<Observable<ChatEvent>> { ): Promise<Observable<ChatEvent>> {
const { messageId } = this.prepareParams(params); const info: any = { sessionId, params, throwInStream: false };
const provider = await this.chooseTextProvider(
user.id,
sessionId,
messageId
);
const session = await this.appendSessionMessage(sessionId, messageId);
try { try {
const { messageId } = this.prepareParams(params);
const provider = await this.chooseTextProvider(
user.id,
sessionId,
messageId
);
const session = await this.appendSessionMessage(sessionId, messageId);
info.model = session.model;
metrics.ai.counter('chat_stream_calls').add(1, { model: session.model }); metrics.ai.counter('chat_stream_calls').add(1, { model: session.model });
this.ongoingStreamCount$.next(this.ongoingStreamCount$.value + 1); this.ongoingStreamCount$.next(this.ongoingStreamCount$.value + 1);
const source$ = from( const source$ = from(
@@ -286,10 +290,9 @@ export class CopilotController implements BeforeApplicationShutdown {
) )
), ),
catchError(e => { catchError(e => {
metrics.ai metrics.ai.counter('chat_stream_errors').add(1);
.counter('chat_stream_errors') info.throwInStream = true;
.add(1, { model: session.model }); return mapSseError(e, info);
return mapSseError(e);
}), }),
finalize(() => { finalize(() => {
this.ongoingStreamCount$.next(this.ongoingStreamCount$.value - 1); this.ongoingStreamCount$.next(this.ongoingStreamCount$.value - 1);
@@ -298,8 +301,8 @@ export class CopilotController implements BeforeApplicationShutdown {
return this.mergePingStream(messageId, source$); return this.mergePingStream(messageId, source$);
} catch (err) { } catch (err) {
metrics.ai.counter('chat_stream_errors').add(1, { model: session.model }); metrics.ai.counter('chat_stream_errors').add(1, info);
return mapSseError(err); return mapSseError(err, info);
} }
} }
@@ -311,10 +314,13 @@ export class CopilotController implements BeforeApplicationShutdown {
@Param('sessionId') sessionId: string, @Param('sessionId') sessionId: string,
@Query() params: Record<string, string> @Query() params: Record<string, string>
): Promise<Observable<ChatEvent>> { ): Promise<Observable<ChatEvent>> {
const { messageId } = this.prepareParams(params); const info: any = { sessionId, params, throwInStream: false };
const session = await this.appendSessionMessage(sessionId, messageId);
try { try {
const { messageId } = this.prepareParams(params);
const session = await this.appendSessionMessage(sessionId, messageId);
info.model = session.model;
metrics.ai.counter('workflow_calls').add(1, { model: session.model }); metrics.ai.counter('workflow_calls').add(1, { model: session.model });
const latestMessage = session.stashMessages.findLast( const latestMessage = session.stashMessages.findLast(
m => m.role === 'user' m => m.role === 'user'
@@ -383,10 +389,9 @@ export class CopilotController implements BeforeApplicationShutdown {
) )
), ),
catchError(e => { catchError(e => {
metrics.ai metrics.ai.counter('workflow_errors').add(1, info);
.counter('workflow_errors') info.throwInStream = true;
.add(1, { model: session.model }); return mapSseError(e, info);
return mapSseError(e);
}), }),
finalize(() => finalize(() =>
this.ongoingStreamCount$.next(this.ongoingStreamCount$.value - 1) this.ongoingStreamCount$.next(this.ongoingStreamCount$.value - 1)
@@ -395,8 +400,8 @@ export class CopilotController implements BeforeApplicationShutdown {
return this.mergePingStream(messageId, source$); return this.mergePingStream(messageId, source$);
} catch (err) { } catch (err) {
metrics.ai.counter('workflow_errors').add(1, { model: session.model }); metrics.ai.counter('workflow_errors').add(1, info);
return mapSseError(err); return mapSseError(err, info);
} }
} }
@@ -408,25 +413,28 @@ export class CopilotController implements BeforeApplicationShutdown {
@Param('sessionId') sessionId: string, @Param('sessionId') sessionId: string,
@Query() params: Record<string, string> @Query() params: Record<string, string>
): Promise<Observable<ChatEvent>> { ): Promise<Observable<ChatEvent>> {
const { messageId } = this.prepareParams(params); const info: any = { sessionId, params, throwInStream: false };
const { model, hasAttachment } = await this.checkRequest(
user.id,
sessionId,
messageId
);
const provider = await this.provider.getProviderByCapability(
hasAttachment
? CopilotCapability.ImageToImage
: CopilotCapability.TextToImage,
model
);
if (!provider) {
throw new NoCopilotProviderAvailable();
}
const session = await this.appendSessionMessage(sessionId, messageId);
try { try {
const { messageId } = this.prepareParams(params);
const { model, hasAttachment } = await this.checkRequest(
user.id,
sessionId,
messageId
);
const provider = await this.provider.getProviderByCapability(
hasAttachment
? CopilotCapability.ImageToImage
: CopilotCapability.TextToImage,
model
);
if (!provider) {
throw new NoCopilotProviderAvailable();
}
const session = await this.appendSessionMessage(sessionId, messageId);
info.model = session.model;
metrics.ai metrics.ai
.counter('images_stream_calls') .counter('images_stream_calls')
.add(1, { model: session.model }); .add(1, { model: session.model });
@@ -472,10 +480,9 @@ export class CopilotController implements BeforeApplicationShutdown {
) )
), ),
catchError(e => { catchError(e => {
metrics.ai metrics.ai.counter('images_stream_errors').add(1, info);
.counter('images_stream_errors') info.throwInStream = true;
.add(1, { model: session.model }); return mapSseError(e, info);
return mapSseError(e);
}), }),
finalize(() => finalize(() =>
this.ongoingStreamCount$.next(this.ongoingStreamCount$.value - 1) this.ongoingStreamCount$.next(this.ongoingStreamCount$.value - 1)
@@ -484,10 +491,8 @@ export class CopilotController implements BeforeApplicationShutdown {
return this.mergePingStream(messageId, source$); return this.mergePingStream(messageId, source$);
} catch (err) { } catch (err) {
metrics.ai metrics.ai.counter('images_stream_errors').add(1, info);
.counter('images_stream_errors') return mapSseError(err, info);
.add(1, { model: session.model });
return mapSseError(err);
} }
} }