diff --git a/.changeset/consistent-stream-targets.md b/.changeset/consistent-stream-targets.md new file mode 100644 index 0000000000..5fbb578030 --- /dev/null +++ b/.changeset/consistent-stream-targets.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/sdk": patch +--- + +Aligned the SDK's `getRunIdForOptions` logic with the Core package to handle semantic targets (`root`, `parent`) in root tasks. diff --git a/packages/trigger-sdk/src/v3/streams.test.ts b/packages/trigger-sdk/src/v3/streams.test.ts new file mode 100644 index 0000000000..67ee00fd26 --- /dev/null +++ b/packages/trigger-sdk/src/v3/streams.test.ts @@ -0,0 +1,64 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { streams } from "./streams.js"; +import { taskContext, realtimeStreams } from "@trigger.dev/core/v3"; + +vi.mock("@trigger.dev/core/v3", async (importOriginal) => { + const original = await importOriginal(); + return { + ...original, + taskContext: { + ctx: { + run: { + id: "run_123", + // parentTaskRunId and rootTaskRunId are undefined for root tasks + }, + }, + }, + realtimeStreams: { + pipe: vi.fn().mockReturnValue({ + wait: () => Promise.resolve(), + stream: new ReadableStream(), + }), + }, + }; +}); + +describe("streams.pipe consistency", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("should not throw and should use self runId when target is 'root' in a root task", async () => { + const mockStream = new ReadableStream(); + + // This should not throw anymore + const { waitUntilComplete } = streams.pipe("test-key", mockStream, { + target: "root", + }); + + expect(realtimeStreams.pipe).toHaveBeenCalledWith( + "test-key", + mockStream, + expect.objectContaining({ + target: "run_123", + }) + ); + }); + + it("should not throw and should use self runId when target is 'parent' in a root task", async () => { + const mockStream = new ReadableStream(); + + // This should not throw anymore + const { waitUntilComplete } = streams.pipe("test-key", mockStream, { + target: "parent", + }); + + expect(realtimeStreams.pipe).toHaveBeenCalledWith( + "test-key", + mockStream, + expect.objectContaining({ + target: "run_123", + }) + ); + }); +}); diff --git a/packages/trigger-sdk/src/v3/streams.ts b/packages/trigger-sdk/src/v3/streams.ts index 78b70f299c..3bdde70bea 100644 --- a/packages/trigger-sdk/src/v3/streams.ts +++ b/packages/trigger-sdk/src/v3/streams.ts @@ -665,11 +665,11 @@ export const streams = { function getRunIdForOptions(options?: RealtimeStreamOperationOptions): string | undefined { if (options?.target) { if (options.target === "parent") { - return taskContext.ctx?.run?.parentTaskRunId; + return taskContext.ctx?.run?.parentTaskRunId ?? taskContext.ctx?.run?.id; } if (options.target === "root") { - return taskContext.ctx?.run?.rootTaskRunId; + return taskContext.ctx?.run?.rootTaskRunId ?? taskContext.ctx?.run?.id; } if (options.target === "self") {