From 4305ca94cdfac3114de20a354eb7e7f447687e6f Mon Sep 17 00:00:00 2001 From: bharathkumar39293 Date: Tue, 13 Jan 2026 09:25:03 +0530 Subject: [PATCH 1/4] fix(sdk): fallback to self for root/parent stream targets in root tasks --- packages/trigger-sdk/src/v3/streams.test.ts | 64 +++++++++++++++++++++ packages/trigger-sdk/src/v3/streams.ts | 4 +- 2 files changed, 66 insertions(+), 2 deletions(-) create mode 100644 packages/trigger-sdk/src/v3/streams.test.ts diff --git a/packages/trigger-sdk/src/v3/streams.test.ts b/packages/trigger-sdk/src/v3/streams.test.ts new file mode 100644 index 0000000000..67ee00fd26 --- /dev/null +++ b/packages/trigger-sdk/src/v3/streams.test.ts @@ -0,0 +1,64 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { streams } from "./streams.js"; +import { taskContext, realtimeStreams } from "@trigger.dev/core/v3"; + +vi.mock("@trigger.dev/core/v3", async (importOriginal) => { + const original = await importOriginal(); + return { + ...original, + taskContext: { + ctx: { + run: { + id: "run_123", + // parentTaskRunId and rootTaskRunId are undefined for root tasks + }, + }, + }, + realtimeStreams: { + pipe: vi.fn().mockReturnValue({ + wait: () => Promise.resolve(), + stream: new ReadableStream(), + }), + }, + }; +}); + +describe("streams.pipe consistency", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("should not throw and should use self runId when target is 'root' in a root task", async () => { + const mockStream = new ReadableStream(); + + // This should not throw anymore + const { waitUntilComplete } = streams.pipe("test-key", mockStream, { + target: "root", + }); + + expect(realtimeStreams.pipe).toHaveBeenCalledWith( + "test-key", + mockStream, + expect.objectContaining({ + target: "run_123", + }) + ); + }); + + it("should not throw and should use self runId when target is 'parent' in a root task", async () => { + const mockStream = new ReadableStream(); + + // This should not throw anymore + const { waitUntilComplete } = streams.pipe("test-key", mockStream, { + target: "parent", + }); + + expect(realtimeStreams.pipe).toHaveBeenCalledWith( + "test-key", + mockStream, + expect.objectContaining({ + target: "run_123", + }) + ); + }); +}); diff --git a/packages/trigger-sdk/src/v3/streams.ts b/packages/trigger-sdk/src/v3/streams.ts index 78b70f299c..3bdde70bea 100644 --- a/packages/trigger-sdk/src/v3/streams.ts +++ b/packages/trigger-sdk/src/v3/streams.ts @@ -665,11 +665,11 @@ export const streams = { function getRunIdForOptions(options?: RealtimeStreamOperationOptions): string | undefined { if (options?.target) { if (options.target === "parent") { - return taskContext.ctx?.run?.parentTaskRunId; + return taskContext.ctx?.run?.parentTaskRunId ?? taskContext.ctx?.run?.id; } if (options.target === "root") { - return taskContext.ctx?.run?.rootTaskRunId; + return taskContext.ctx?.run?.rootTaskRunId ?? taskContext.ctx?.run?.id; } if (options.target === "self") { From e9fe93deb57fae64181f716ddc43a35495bc53f1 Mon Sep 17 00:00:00 2001 From: bharathkumar39293 Date: Tue, 13 Jan 2026 09:29:22 +0530 Subject: [PATCH 2/4] fix(sdk): fallback to self for root/parent stream targets in root tasks --- .changeset/consistent-stream-targets.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/consistent-stream-targets.md diff --git a/.changeset/consistent-stream-targets.md b/.changeset/consistent-stream-targets.md new file mode 100644 index 0000000000..5fbb578030 --- /dev/null +++ b/.changeset/consistent-stream-targets.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/sdk": patch +--- + +Aligned the SDK's `getRunIdForOptions` logic with the Core package to handle semantic targets (`root`, `parent`) in root tasks. From e229d615ca5c749fd9708b65fb0b9f86d0a5a50a Mon Sep 17 00:00:00 2001 From: bharathkumar39293 Date: Tue, 13 Jan 2026 10:29:31 +0530 Subject: [PATCH 3/4] Apply throttle changes --- apps/webapp/app/utils/throttle.ts | 32 +++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/apps/webapp/app/utils/throttle.ts b/apps/webapp/app/utils/throttle.ts index a6c1a77a32..24f904fb29 100644 --- a/apps/webapp/app/utils/throttle.ts +++ b/apps/webapp/app/utils/throttle.ts @@ -1,20 +1,28 @@ -//From: https://kettanaito.com/blog/debounce-vs-throttle - -/** A very simple throttle. Will execute the function at the end of each period and discard any other calls during that period. */ +/** A throttle that fires the first call immediately and ensures the last call during the duration is also fired. */ export function throttle( func: (...args: any[]) => void, durationMs: number ): (...args: any[]) => void { - let isPrimedToFire = false; - - return (...args: any[]) => { - if (!isPrimedToFire) { - isPrimedToFire = true; + let timeoutId: NodeJS.Timeout | null = null; + let nextArgs: any[] | null = null; - setTimeout(() => { - func(...args); - isPrimedToFire = false; - }, durationMs); + const wrapped = (...args: any[]) => { + if (timeoutId) { + nextArgs = args; + return; } + + func(...args); + + timeoutId = setTimeout(() => { + timeoutId = null; + if (nextArgs) { + const argsToUse = nextArgs; + nextArgs = null; + wrapped(...argsToUse); + } + }, durationMs); }; + + return wrapped; } From 50737c28b7b00b7d0c57765d73ebdeb126a48a20 Mon Sep 17 00:00:00 2001 From: bharathkumar39293 Date: Tue, 13 Jan 2026 13:28:40 +0530 Subject: [PATCH 4/4] revert(webapp): remove unrelated throttle change from SDK PR --- apps/webapp/app/utils/throttle.ts | 32 ++++++++++++------------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/apps/webapp/app/utils/throttle.ts b/apps/webapp/app/utils/throttle.ts index 24f904fb29..a6c1a77a32 100644 --- a/apps/webapp/app/utils/throttle.ts +++ b/apps/webapp/app/utils/throttle.ts @@ -1,28 +1,20 @@ -/** A throttle that fires the first call immediately and ensures the last call during the duration is also fired. */ +//From: https://kettanaito.com/blog/debounce-vs-throttle + +/** A very simple throttle. Will execute the function at the end of each period and discard any other calls during that period. */ export function throttle( func: (...args: any[]) => void, durationMs: number ): (...args: any[]) => void { - let timeoutId: NodeJS.Timeout | null = null; - let nextArgs: any[] | null = null; - - const wrapped = (...args: any[]) => { - if (timeoutId) { - nextArgs = args; - return; - } + let isPrimedToFire = false; - func(...args); + return (...args: any[]) => { + if (!isPrimedToFire) { + isPrimedToFire = true; - timeoutId = setTimeout(() => { - timeoutId = null; - if (nextArgs) { - const argsToUse = nextArgs; - nextArgs = null; - wrapped(...argsToUse); - } - }, durationMs); + setTimeout(() => { + func(...args); + isPrimedToFire = false; + }, durationMs); + } }; - - return wrapped; }