diff --git a/.changeset/smart-coins-hammer.md b/.changeset/smart-coins-hammer.md new file mode 100644 index 0000000000..bea810c6ca --- /dev/null +++ b/.changeset/smart-coins-hammer.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +fix: Realtime streams: prevent enqueuing into closed ReadableStream diff --git a/packages/core/src/v3/apiClient/stream.ts b/packages/core/src/v3/apiClient/stream.ts index 77487954cf..d73d7d4a3f 100644 --- a/packages/core/src/v3/apiClient/stream.ts +++ b/packages/core/src/v3/apiClient/stream.ts @@ -89,8 +89,10 @@ class ReadableShapeStream = Row> { readonly #changeStream: AsyncIterableStream; #error: FetchError | false = false; #unsubscribe?: () => void; + #isStreamClosed: boolean = false; stop() { + this.#isStreamClosed = true; this.#unsubscribe?.(); } @@ -101,52 +103,69 @@ class ReadableShapeStream = Row> { const source = new ReadableStream[]>({ start: (controller) => { this.#unsubscribe = this.#stream.subscribe( - (messages) => controller.enqueue(messages), + (messages) => { + if (!this.#isStreamClosed) { + controller.enqueue(messages); + } + }, this.#handleError.bind(this) ); }, + cancel: () => { + this.#isStreamClosed = true; + this.#unsubscribe?.(); + } }); // Create the transformed stream that processes messages and emits complete rows this.#changeStream = createAsyncIterableStream(source, { transform: (messages, controller) => { - const updatedKeys = new Set(); - - for (const message of messages) { - if (isChangeMessage(message)) { - const key = message.key; - switch (message.headers.operation) { - case "insert": { - // New row entirely - this.#currentState.set(key, message.value); - updatedKeys.add(key); - break; + if (this.#isStreamClosed) { + return; + } + + try { + const updatedKeys = new Set(); + + for (const message of messages) { + if (isChangeMessage(message)) { + const key = message.key; + switch (message.headers.operation) { + case "insert": { + this.#currentState.set(key, message.value); + updatedKeys.add(key); + break; + } + case "update": { + const existingRow = this.#currentState.get(key); + const updatedRow = existingRow + ? { ...existingRow, ...message.value } + : message.value; + this.#currentState.set(key, updatedRow); + updatedKeys.add(key); + break; + } } - case "update": { - // Merge updates into existing row if any, otherwise treat as new - const existingRow = this.#currentState.get(key); - const updatedRow = existingRow - ? { ...existingRow, ...message.value } - : message.value; - this.#currentState.set(key, updatedRow); - updatedKeys.add(key); - break; + } else if (isControlMessage(message)) { + if (message.headers.control === "must-refetch") { + this.#currentState.clear(); + this.#error = false; } } - } else if (isControlMessage(message)) { - if (message.headers.control === "must-refetch") { - this.#currentState.clear(); - this.#error = false; - } } - } - // Now enqueue only one updated row per key, after all messages have been processed. - for (const key of updatedKeys) { - const finalRow = this.#currentState.get(key); - if (finalRow) { - controller.enqueue(finalRow); + // Now enqueue only one updated row per key, after all messages have been processed. + if (!this.#isStreamClosed) { + for (const key of updatedKeys) { + const finalRow = this.#currentState.get(key); + if (finalRow) { + controller.enqueue(finalRow); + } + } } + } catch (error) { + console.error("Error processing stream messages:", error); + this.#handleError(error as Error); } }, }); @@ -192,6 +211,8 @@ class ReadableShapeStream = Row> { if (e instanceof FetchError) { this.#error = e; } + this.#isStreamClosed = true; + this.#unsubscribe?.(); } }