diff --git a/packages/frontend/core/src/components/blocksuite/block-suite-editor/ai/event-source.ts b/packages/frontend/core/src/components/blocksuite/block-suite-editor/ai/event-source.ts new file mode 100644 index 0000000000..27e1b8511c --- /dev/null +++ b/packages/frontend/core/src/components/blocksuite/block-suite-editor/ai/event-source.ts @@ -0,0 +1,86 @@ +import { GeneralNetworkError } from '@blocksuite/blocks'; + +export function delay(ms: number) { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +export type AffineTextEvent = { + type: 'attachment' | 'message'; + data: string; +}; + +type AffineTextStream = AsyncIterable; + +type toTextStreamOptions = { + timeout?: number; +}; + +export function toTextStream( + eventSource: EventSource, + { timeout }: toTextStreamOptions = {} +): AffineTextStream { + return { + [Symbol.asyncIterator]: async function* () { + const messageQueue: AffineTextEvent[] = []; + let resolveMessagePromise: () => void; + let rejectMessagePromise: (err: Error) => void; + + function resetMessagePromise() { + if (resolveMessagePromise) { + resolveMessagePromise(); + } + return new Promise((resolve, reject) => { + resolveMessagePromise = resolve; + rejectMessagePromise = reject; + }); + } + let messagePromise = resetMessagePromise(); + + function messageListener(event: MessageEvent) { + messageQueue.push({ + type: event.type as 'attachment' | 'message', + data: event.data as string, + }); + messagePromise = resetMessagePromise(); + } + + eventSource.addEventListener('message', messageListener); + eventSource.addEventListener('attachment', messageListener); + + eventSource.addEventListener('error', event => { + const errorMessage = (event as unknown as { data: string }).data; + // if there is data in Error event, it means the server sent an error message + // otherwise, the stream is finished successfully + if (event.type === 'error' && errorMessage) { + rejectMessagePromise(new GeneralNetworkError(errorMessage)); + } else { + resolveMessagePromise(); + } + eventSource.close(); + }); + + try { + while (eventSource.readyState !== EventSource.CLOSED) { + if (messageQueue.length === 0) { + // Wait for the next message or timeout + await (timeout + ? Promise.race([ + messagePromise, + delay(timeout).then(() => { + throw new Error('Timeout'); + }), + ]) + : messagePromise); + } else if (messageQueue.length > 0) { + const top = messageQueue.shift(); + if (top) { + yield top; + } + } + } + } finally { + eventSource.close(); + } + }, + }; +} diff --git a/packages/frontend/core/src/components/blocksuite/block-suite-editor/ai/request.ts b/packages/frontend/core/src/components/blocksuite/block-suite-editor/ai/request.ts index af41b8b9b3..fa50d6f29d 100644 --- a/packages/frontend/core/src/components/blocksuite/block-suite-editor/ai/request.ts +++ b/packages/frontend/core/src/components/blocksuite/block-suite-editor/ai/request.ts @@ -1,7 +1,7 @@ -import { toTextStream } from '@blocksuite/presets'; import { partition } from 'lodash-es'; import { CopilotClient } from './copilot-client'; +import { delay, toTextStream } from './event-source'; import type { PromptKey } from './prompt'; const TIMEOUT = 50000; @@ -116,21 +116,22 @@ export function textToText({ params, sessionId, }); - const eventSource = client.chatTextStream({ sessionId: message.sessionId, messageId: message.messageId, }); - yield* toTextStream(eventSource, { timeout }); + for await (const event of toTextStream(eventSource, { timeout })) { + if (event.type === 'message') { + yield event.data; + } + } }, }; } else { return Promise.race([ timeout - ? new Promise((_res, rej) => { - setTimeout(() => { - rej(new Error('Timeout')); - }, timeout); + ? delay(timeout).then(() => { + throw new Error('Timeout'); }) : null, createSessionMessage({ @@ -141,8 +142,8 @@ export function textToText({ attachments, params, sessionId, - }).then(async message => { - return await client.chatText({ + }).then(message => { + return client.chatText({ sessionId: message.sessionId, messageId: message.messageId, }); @@ -175,7 +176,11 @@ export function toImage({ }); const eventSource = client.imagesStream(messageId, sessionId); - yield* toTextStream(eventSource, { timeout, type: 'attachment' }); + for await (const event of toTextStream(eventSource, { timeout })) { + if (event.type === 'attachment') { + yield event.data; + } + } }, }; }