From bfda16bcac889b77d37f9c2e94e209b58558317d Mon Sep 17 00:00:00 2001 From: bharathkumar39293 Date: Tue, 13 Jan 2026 10:29:31 +0530 Subject: [PATCH 01/11] Apply throttle changes --- apps/webapp/app/utils/throttle.ts | 32 +++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/apps/webapp/app/utils/throttle.ts b/apps/webapp/app/utils/throttle.ts index a6c1a77a32..24f904fb29 100644 --- a/apps/webapp/app/utils/throttle.ts +++ b/apps/webapp/app/utils/throttle.ts @@ -1,20 +1,28 @@ -//From: https://kettanaito.com/blog/debounce-vs-throttle - -/** A very simple throttle. Will execute the function at the end of each period and discard any other calls during that period. */ +/** A throttle that fires the first call immediately and ensures the last call during the duration is also fired. */ export function throttle( func: (...args: any[]) => void, durationMs: number ): (...args: any[]) => void { - let isPrimedToFire = false; - - return (...args: any[]) => { - if (!isPrimedToFire) { - isPrimedToFire = true; + let timeoutId: NodeJS.Timeout | null = null; + let nextArgs: any[] | null = null; - setTimeout(() => { - func(...args); - isPrimedToFire = false; - }, durationMs); + const wrapped = (...args: any[]) => { + if (timeoutId) { + nextArgs = args; + return; } + + func(...args); + + timeoutId = setTimeout(() => { + timeoutId = null; + if (nextArgs) { + const argsToUse = nextArgs; + nextArgs = null; + wrapped(...argsToUse); + } + }, durationMs); }; + + return wrapped; } From e8d84fa14b9e36b66dfae04d1e5c89e96070f30e Mon Sep 17 00:00:00 2001 From: bharathkumar39293 Date: Tue, 13 Jan 2026 11:34:26 +0530 Subject: [PATCH 02/11] fix(webapp): reconcile trace with run lifecycle to handle clickhouse lag --- .../app/presenters/v3/RunPresenter.server.ts | 131 +++++++++++----- apps/webapp/test/RunPresenter.test.ts | 141 ++++++++++++++++++ 2 files changed, 237 insertions(+), 35 deletions(-) create mode 100644 apps/webapp/test/RunPresenter.test.ts diff --git a/apps/webapp/app/presenters/v3/RunPresenter.server.ts b/apps/webapp/app/presenters/v3/RunPresenter.server.ts index 437d6b6458..e18a740b0f 100644 --- a/apps/webapp/app/presenters/v3/RunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunPresenter.server.ts @@ -6,7 +6,7 @@ import { getUsername } from "~/utils/username"; import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server"; import { SpanSummary } from "~/v3/eventRepository/eventRepository.types"; import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; -import { isFinalRunStatus } from "~/v3/taskStatus"; +import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus"; import { env } from "~/env.server"; type Result = Awaited>; @@ -211,49 +211,47 @@ export class RunPresenter { let totalDuration = tree?.data.duration ?? 0; const events = tree ? flattenTree(tree).map((n) => { - const offset = millisecondsToNanoseconds( - n.data.startTime.getTime() - treeRootStartTimeMs - ); - //only let non-debug events extend the total duration - if (!n.data.isDebug) { - totalDuration = Math.max(totalDuration, offset + n.data.duration); - } - return { - ...n, - data: { - ...n.data, - timelineEvents: createTimelineSpanEventsFromSpanEvents( - n.data.events, - user?.admin ?? false, - treeRootStartTimeMs - ), - //set partial nodes to null duration - duration: n.data.isPartial ? null : n.data.duration, - offset, - isRoot: n.id === traceSummary.rootSpan.id, - }, - }; - }) + const offset = millisecondsToNanoseconds( + n.data.startTime.getTime() - treeRootStartTimeMs + ); + //only let non-debug events extend the total duration + if (!n.data.isDebug) { + totalDuration = Math.max(totalDuration, offset + n.data.duration); + } + return { + ...n, + data: { + ...n.data, + timelineEvents: createTimelineSpanEventsFromSpanEvents( + n.data.events, + user?.admin ?? false, + treeRootStartTimeMs + ), + //set partial nodes to null duration + duration: n.data.isPartial ? null : n.data.duration, + offset, + isRoot: n.id === traceSummary.rootSpan.id, + }, + }; + }) : []; //total duration should be a minimum of 1ms totalDuration = Math.max(totalDuration, millisecondsToNanoseconds(1)); - let rootSpanStatus: "executing" | "completed" | "failed" = "executing"; - if (events[0]) { - if (events[0].data.isError) { - rootSpanStatus = "failed"; - } else if (!events[0].data.isPartial) { - rootSpanStatus = "completed"; - } - } + const reconciled = reconcileTraceWithRunLifecycle( + runData, + traceSummary.rootSpan.id, + events, + totalDuration + ); return { run: runData, trace: { - rootSpanStatus, - events: events, - duration: totalDuration, + rootSpanStatus: reconciled.rootSpanStatus, + events: reconciled.events, + duration: reconciled.totalDuration, rootStartedAt: tree?.data.startTime, startedAt: run.startedAt, queuedDuration: run.startedAt @@ -265,3 +263,66 @@ export class RunPresenter { }; } } + +// NOTE: Clickhouse trace ingestion is eventually consistent. +// When a run is marked finished in Postgres, we reconcile the +// root span to reflect completion even if telemetry is still partial. +// This is a deliberate UI-layer tradeoff to prevent stale or "stuck" +// run states in the dashboard. +export function reconcileTraceWithRunLifecycle( + runData: { + isFinished: boolean; + status: Run["status"]; + createdAt: Date; + completedAt: Date | null; + rootTaskRun: { createdAt: Date } | null; + }, + rootSpanId: string, + events: RunEvent[], + totalDuration: number +): { + events: RunEvent[]; + totalDuration: number; + rootSpanStatus: "executing" | "completed" | "failed"; +} { + const currentStatus: "executing" | "completed" | "failed" = events[0] + ? events[0].data.isError + ? "failed" + : !events[0].data.isPartial + ? "completed" + : "executing" + : "executing"; + + if (!runData.isFinished) { + return { events, totalDuration, rootSpanStatus: currentStatus }; + } + + const postgresRunDuration = runData.completedAt + ? millisecondsToNanoseconds( + runData.completedAt.getTime() - (runData.rootTaskRun?.createdAt ?? runData.createdAt).getTime() + ) + : 0; + + const updatedTotalDuration = Math.max(totalDuration, postgresRunDuration); + + const updatedEvents = events.map((e) => { + if (e.id === rootSpanId && e.data.isPartial) { + return { + ...e, + data: { + ...e.data, + isPartial: false, + duration: Math.max(e.data.duration ?? 0, postgresRunDuration), + isError: isFailedRunStatus(runData.status), + }, + }; + } + return e; + }); + + return { + events: updatedEvents, + totalDuration: updatedTotalDuration, + rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed", + }; +} diff --git a/apps/webapp/test/RunPresenter.test.ts b/apps/webapp/test/RunPresenter.test.ts new file mode 100644 index 0000000000..35fe59e7bd --- /dev/null +++ b/apps/webapp/test/RunPresenter.test.ts @@ -0,0 +1,141 @@ +import { vi, describe, it, expect } from "vitest"; + +vi.mock("../app/env.server", () => ({ + env: { + MAXIMUM_LIVE_RELOADING_EVENTS: 1000, + }, +})); + +vi.mock("../app/db.server", () => ({ + prisma: {}, + $replica: {}, + $transaction: vi.fn(), +})); + +vi.mock("../app/v3/eventRepository/index.server", () => ({ + resolveEventRepositoryForStore: vi.fn(), +})); + +vi.mock("../app/v3/taskEventStore.server", () => ({ + getTaskEventStoreTableForRun: vi.fn(), +})); + +vi.mock("../app/utils/username", () => ({ + getUsername: vi.fn(), +})); + +import { reconcileTraceWithRunLifecycle } from "../app/presenters/v3/RunPresenter.server"; +import { millisecondsToNanoseconds } from "@trigger.dev/core/v3"; + +describe("reconcileTraceWithRunLifecycle", () => { + const rootSpanId = "root-span-id"; + const createdAt = new Date("2024-01-01T00:00:00Z"); + const completedAt = new Date("2024-01-01T00:00:05Z"); + + const runData: any = { + isFinished: true, + status: "COMPLETED_SUCCESSFULLY", + createdAt, + completedAt, + rootTaskRun: null, + }; + + const initialEvents = [ + { + id: rootSpanId, + data: { + isPartial: true, + duration: millisecondsToNanoseconds(1000), // 1s, less than the 5s run duration + isError: false, + }, + }, + { + id: "child-span-id", + data: { + isPartial: false, + duration: millisecondsToNanoseconds(500), + isError: false, + }, + }, + ]; + + it("should reconcile a finished run with lagging partial telemetry", () => { + const totalDuration = millisecondsToNanoseconds(1000); + const result = reconcileTraceWithRunLifecycle(runData, rootSpanId, initialEvents as any, totalDuration); + + expect(result.rootSpanStatus).toBe("completed"); + + const rootEvent = result.events.find((e: any) => e.id === rootSpanId); + expect(rootEvent?.data.isPartial).toBe(false); + // 5s duration = 5000ms + expect(rootEvent?.data.duration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(5000)); + expect(result.totalDuration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(5000)); + }); + + it("should not override duration if Clickhouse already has a longer finished duration", () => { + const longDuration = millisecondsToNanoseconds(10000); + const finishedEvents = [ + { + id: rootSpanId, + data: { + isPartial: false, + duration: longDuration, + isError: false, + }, + }, + ]; + + const result = reconcileTraceWithRunLifecycle(runData, rootSpanId, finishedEvents as any, longDuration); + + const rootEvent = result.events.find((e: any) => e.id === rootSpanId); + expect(rootEvent?.data.duration).toBe(longDuration); + expect(rootEvent?.data.isPartial).toBe(false); + expect(result.totalDuration).toBe(longDuration); + }); + + it("should handle unfinished runs without modification", () => { + const unfinishedRun = { ...runData, isFinished: false, completedAt: null }; + const totalDuration = millisecondsToNanoseconds(1000); + const result = reconcileTraceWithRunLifecycle(unfinishedRun, rootSpanId, initialEvents as any, totalDuration); + + expect(result.rootSpanStatus).toBe("executing"); + + const rootEvent = result.events.find((e: any) => e.id === rootSpanId); + expect(rootEvent?.data.isPartial).toBe(true); + expect(rootEvent?.data.duration).toBe(millisecondsToNanoseconds(1000)); + }); + + it("should reconcile failed runs correctly", () => { + const failedRun = { ...runData, status: "COMPLETED_WITH_ERRORS" }; + const result = reconcileTraceWithRunLifecycle(failedRun, rootSpanId, initialEvents as any, millisecondsToNanoseconds(1000)); + + expect(result.rootSpanStatus).toBe("failed"); + const rootEvent = result.events.find((e: any) => e.id === rootSpanId); + expect(rootEvent?.data.isError).toBe(true); + expect(rootEvent?.data.isPartial).toBe(false); + }); + + it("should use rootTaskRun createdAt if available for duration calculation", () => { + const rootTaskCreatedAt = new Date("2023-12-31T23:59:50Z"); // 10s before run.createdAt + const runDataWithRoot: any = { + ...runData, + rootTaskRun: { createdAt: rootTaskCreatedAt }, + }; + + const result = reconcileTraceWithRunLifecycle(runDataWithRoot, rootSpanId, initialEvents as any, millisecondsToNanoseconds(1000)); + + // Duration should be from 23:59:50 to 00:00:05 = 15s + const rootEvent = result.events.find((e: any) => e.id === rootSpanId); + expect(rootEvent?.data.duration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(15000)); + expect(result.totalDuration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(15000)); + }); + + it("should handle missing root span gracefully", () => { + const result = reconcileTraceWithRunLifecycle(runData, "non-existent-id", initialEvents as any, millisecondsToNanoseconds(1000)); + + expect(result.rootSpanStatus).toBe("completed"); + expect(result.events).toEqual(initialEvents); + // totalDuration should still be updated to postgres duration even if root span is missing from events list + expect(result.totalDuration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(5000)); + }); +}); From b591e284c0b1ff67e37260c129664f1c23cd8c38 Mon Sep 17 00:00:00 2001 From: bharathkumar39293 Date: Tue, 13 Jan 2026 11:53:01 +0530 Subject: [PATCH 03/11] chore(webapp): revert unrelated throttle changes --- apps/webapp/app/utils/throttle.ts | 32 ++++++++++++------------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/apps/webapp/app/utils/throttle.ts b/apps/webapp/app/utils/throttle.ts index 24f904fb29..a6c1a77a32 100644 --- a/apps/webapp/app/utils/throttle.ts +++ b/apps/webapp/app/utils/throttle.ts @@ -1,28 +1,20 @@ -/** A throttle that fires the first call immediately and ensures the last call during the duration is also fired. */ +//From: https://kettanaito.com/blog/debounce-vs-throttle + +/** A very simple throttle. Will execute the function at the end of each period and discard any other calls during that period. */ export function throttle( func: (...args: any[]) => void, durationMs: number ): (...args: any[]) => void { - let timeoutId: NodeJS.Timeout | null = null; - let nextArgs: any[] | null = null; - - const wrapped = (...args: any[]) => { - if (timeoutId) { - nextArgs = args; - return; - } + let isPrimedToFire = false; - func(...args); + return (...args: any[]) => { + if (!isPrimedToFire) { + isPrimedToFire = true; - timeoutId = setTimeout(() => { - timeoutId = null; - if (nextArgs) { - const argsToUse = nextArgs; - nextArgs = null; - wrapped(...argsToUse); - } - }, durationMs); + setTimeout(() => { + func(...args); + isPrimedToFire = false; + }, durationMs); + } }; - - return wrapped; } From 425c3b4e6c497cbdb9a063cff0bd816a5dabce98 Mon Sep 17 00:00:00 2001 From: bharathkumar39293 Date: Tue, 13 Jan 2026 12:04:27 +0530 Subject: [PATCH 04/11] style(webapp): clean up imports and logic in RunPresenter --- apps/webapp/app/presenters/v3/RunPresenter.server.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/presenters/v3/RunPresenter.server.ts b/apps/webapp/app/presenters/v3/RunPresenter.server.ts index e18a740b0f..2b19a722e0 100644 --- a/apps/webapp/app/presenters/v3/RunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunPresenter.server.ts @@ -299,7 +299,8 @@ export function reconcileTraceWithRunLifecycle( const postgresRunDuration = runData.completedAt ? millisecondsToNanoseconds( - runData.completedAt.getTime() - (runData.rootTaskRun?.createdAt ?? runData.createdAt).getTime() + runData.completedAt.getTime() - + (runData.rootTaskRun?.createdAt ?? runData.createdAt).getTime() ) : 0; From 4b19a1f62587a811998108c1f7f8ec449f4abf96 Mon Sep 17 00:00:00 2001 From: bharathkumar39293 Date: Tue, 13 Jan 2026 12:15:20 +0530 Subject: [PATCH 05/11] fix(webapp): remove root span assumption and fix duplication in RunPresenter --- apps/webapp/app/presenters/v3/RunPresenter.server.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/apps/webapp/app/presenters/v3/RunPresenter.server.ts b/apps/webapp/app/presenters/v3/RunPresenter.server.ts index 2b19a722e0..0c2b33fb66 100644 --- a/apps/webapp/app/presenters/v3/RunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunPresenter.server.ts @@ -285,10 +285,11 @@ export function reconcileTraceWithRunLifecycle( totalDuration: number; rootSpanStatus: "executing" | "completed" | "failed"; } { - const currentStatus: "executing" | "completed" | "failed" = events[0] - ? events[0].data.isError + const rootEvent = events.find((e) => e.id === rootSpanId); + const currentStatus: "executing" | "completed" | "failed" = rootEvent + ? rootEvent.data.isError ? "failed" - : !events[0].data.isPartial + : !rootEvent.data.isPartial ? "completed" : "executing" : "executing"; From e030cc2d458fa1753b0abb08a13f74871b234d49 Mon Sep 17 00:00:00 2001 From: bharathkumar39293 Date: Tue, 13 Jan 2026 13:44:53 +0530 Subject: [PATCH 06/11] fix(webapp): optimize reconciliation to O(1) and add trailing-edge throttle --- .../app/presenters/v3/RunPresenter.server.ts | 106 ++++++++++++------ apps/webapp/app/utils/throttle.ts | 32 ++++-- 2 files changed, 90 insertions(+), 48 deletions(-) diff --git a/apps/webapp/app/presenters/v3/RunPresenter.server.ts b/apps/webapp/app/presenters/v3/RunPresenter.server.ts index 0c2b33fb66..e5d68c6d8c 100644 --- a/apps/webapp/app/presenters/v3/RunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunPresenter.server.ts @@ -208,16 +208,41 @@ export class RunPresenter { //we need the start offset for each item, and the total duration of the entire tree const treeRootStartTimeMs = tree ? tree?.data.startTime.getTime() : 0; + + const postgresRunDuration = + runData.isFinished && run.completedAt + ? millisecondsToNanoseconds( + run.completedAt.getTime() - + (run.rootTaskRun?.createdAt ?? run.createdAt).getTime() + ) + : 0; + let totalDuration = tree?.data.duration ?? 0; const events = tree - ? flattenTree(tree).map((n) => { - const offset = millisecondsToNanoseconds( - n.data.startTime.getTime() - treeRootStartTimeMs - ); + ? flattenTree(tree).map((n, index) => { + const isRoot = index === 0; + const offset = millisecondsToNanoseconds(n.data.startTime.getTime() - treeRootStartTimeMs); + + let nIsPartial = n.data.isPartial; + let nDuration = n.data.duration; + let nIsError = n.data.isError; + + // NOTE: Clickhouse trace ingestion is eventually consistent. + // When a run is marked finished in Postgres, we reconcile the + // root span to reflect completion even if telemetry is still partial. + // This is a deliberate UI-layer tradeoff to prevent stale or "stuck" + // run states in the dashboard. + if (isRoot && runData.isFinished && nIsPartial) { + nIsPartial = false; + nDuration = Math.max(nDuration ?? 0, postgresRunDuration); + nIsError = isFailedRunStatus(runData.status); + } + //only let non-debug events extend the total duration if (!n.data.isDebug) { - totalDuration = Math.max(totalDuration, offset + n.data.duration); + totalDuration = Math.max(totalDuration, offset + (nIsPartial ? 0 : nDuration)); } + return { ...n, data: { @@ -228,23 +253,24 @@ export class RunPresenter { treeRootStartTimeMs ), //set partial nodes to null duration - duration: n.data.isPartial ? null : n.data.duration, + duration: nIsPartial ? null : nDuration, + isPartial: nIsPartial, + isError: nIsError, offset, - isRoot: n.id === traceSummary.rootSpan.id, + isRoot, }, }; }) : []; + if (runData.isFinished) { + totalDuration = Math.max(totalDuration, postgresRunDuration); + } + //total duration should be a minimum of 1ms totalDuration = Math.max(totalDuration, millisecondsToNanoseconds(1)); - const reconciled = reconcileTraceWithRunLifecycle( - runData, - traceSummary.rootSpan.id, - events, - totalDuration - ); + const reconciled = reconcileTraceWithRunLifecycle(runData, traceSummary.rootSpan.id, events, totalDuration); return { run: runData, @@ -285,14 +311,17 @@ export function reconcileTraceWithRunLifecycle( totalDuration: number; rootSpanStatus: "executing" | "completed" | "failed"; } { - const rootEvent = events.find((e) => e.id === rootSpanId); - const currentStatus: "executing" | "completed" | "failed" = rootEvent - ? rootEvent.data.isError - ? "failed" - : !rootEvent.data.isPartial - ? "completed" - : "executing" - : "executing"; + const rootEvent = events[0]; + const isActualRoot = rootEvent?.id === rootSpanId; + + const currentStatus: "executing" | "completed" | "failed" = + isActualRoot && rootEvent + ? rootEvent.data.isError + ? "failed" + : !rootEvent.data.isPartial + ? "completed" + : "executing" + : "executing"; if (!runData.isFinished) { return { events, totalDuration, rootSpanStatus: currentStatus }; @@ -307,23 +336,28 @@ export function reconcileTraceWithRunLifecycle( const updatedTotalDuration = Math.max(totalDuration, postgresRunDuration); - const updatedEvents = events.map((e) => { - if (e.id === rootSpanId && e.data.isPartial) { - return { - ...e, - data: { - ...e.data, - isPartial: false, - duration: Math.max(e.data.duration ?? 0, postgresRunDuration), - isError: isFailedRunStatus(runData.status), - }, - }; - } - return e; - }); + // We only need to potentially update the root event (the first one) if it matches our ID + if (isActualRoot && rootEvent && rootEvent.data.isPartial) { + const updatedEvents = [...events]; + updatedEvents[0] = { + ...rootEvent, + data: { + ...rootEvent.data, + isPartial: false, + duration: Math.max(rootEvent.data.duration ?? 0, postgresRunDuration), + isError: isFailedRunStatus(runData.status), + }, + }; + + return { + events: updatedEvents, + totalDuration: updatedTotalDuration, + rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed", + }; + } return { - events: updatedEvents, + events, totalDuration: updatedTotalDuration, rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed", }; diff --git a/apps/webapp/app/utils/throttle.ts b/apps/webapp/app/utils/throttle.ts index a6c1a77a32..24f904fb29 100644 --- a/apps/webapp/app/utils/throttle.ts +++ b/apps/webapp/app/utils/throttle.ts @@ -1,20 +1,28 @@ -//From: https://kettanaito.com/blog/debounce-vs-throttle - -/** A very simple throttle. Will execute the function at the end of each period and discard any other calls during that period. */ +/** A throttle that fires the first call immediately and ensures the last call during the duration is also fired. */ export function throttle( func: (...args: any[]) => void, durationMs: number ): (...args: any[]) => void { - let isPrimedToFire = false; - - return (...args: any[]) => { - if (!isPrimedToFire) { - isPrimedToFire = true; + let timeoutId: NodeJS.Timeout | null = null; + let nextArgs: any[] | null = null; - setTimeout(() => { - func(...args); - isPrimedToFire = false; - }, durationMs); + const wrapped = (...args: any[]) => { + if (timeoutId) { + nextArgs = args; + return; } + + func(...args); + + timeoutId = setTimeout(() => { + timeoutId = null; + if (nextArgs) { + const argsToUse = nextArgs; + nextArgs = null; + wrapped(...argsToUse); + } + }, durationMs); }; + + return wrapped; } From 4c3f0e874b6977e1b5dfdfe4665ba3e03a428591 Mon Sep 17 00:00:00 2001 From: bharathkumar39293 Date: Tue, 13 Jan 2026 13:50:18 +0530 Subject: [PATCH 07/11] fix(webapp): add missing createdAt to runData for accurate duration reconciliation --- apps/webapp/app/presenters/v3/RunPresenter.server.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/webapp/app/presenters/v3/RunPresenter.server.ts b/apps/webapp/app/presenters/v3/RunPresenter.server.ts index e5d68c6d8c..dd737af4c7 100644 --- a/apps/webapp/app/presenters/v3/RunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunPresenter.server.ts @@ -124,6 +124,7 @@ export class RunPresenter { isFinished: isFinalRunStatus(run.status), startedAt: run.startedAt, completedAt: run.completedAt, + createdAt: run.createdAt, logsDeletedAt: showDeletedLogs ? null : run.logsDeletedAt, rootTaskRun: run.rootTaskRun, parentTaskRun: run.parentTaskRun, From 4dfadb9527cdd58acf9a60754d62bf887eab9b71 Mon Sep 17 00:00:00 2001 From: bharathkumar39293 Date: Tue, 13 Jan 2026 16:00:34 +0530 Subject: [PATCH 08/11] refactor(webapp): move reconciliation logic to separate file for better testability --- .../app/presenters/v3/RunPresenter.server.ts | 107 ++++++------------ .../presenters/v3/reconcileTrace.server.ts | 89 +++++++++++++++ 2 files changed, 125 insertions(+), 71 deletions(-) create mode 100644 apps/webapp/app/presenters/v3/reconcileTrace.server.ts diff --git a/apps/webapp/app/presenters/v3/RunPresenter.server.ts b/apps/webapp/app/presenters/v3/RunPresenter.server.ts index dd737af4c7..0c2b33fb66 100644 --- a/apps/webapp/app/presenters/v3/RunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunPresenter.server.ts @@ -124,7 +124,6 @@ export class RunPresenter { isFinished: isFinalRunStatus(run.status), startedAt: run.startedAt, completedAt: run.completedAt, - createdAt: run.createdAt, logsDeletedAt: showDeletedLogs ? null : run.logsDeletedAt, rootTaskRun: run.rootTaskRun, parentTaskRun: run.parentTaskRun, @@ -209,41 +208,16 @@ export class RunPresenter { //we need the start offset for each item, and the total duration of the entire tree const treeRootStartTimeMs = tree ? tree?.data.startTime.getTime() : 0; - - const postgresRunDuration = - runData.isFinished && run.completedAt - ? millisecondsToNanoseconds( - run.completedAt.getTime() - - (run.rootTaskRun?.createdAt ?? run.createdAt).getTime() - ) - : 0; - let totalDuration = tree?.data.duration ?? 0; const events = tree - ? flattenTree(tree).map((n, index) => { - const isRoot = index === 0; - const offset = millisecondsToNanoseconds(n.data.startTime.getTime() - treeRootStartTimeMs); - - let nIsPartial = n.data.isPartial; - let nDuration = n.data.duration; - let nIsError = n.data.isError; - - // NOTE: Clickhouse trace ingestion is eventually consistent. - // When a run is marked finished in Postgres, we reconcile the - // root span to reflect completion even if telemetry is still partial. - // This is a deliberate UI-layer tradeoff to prevent stale or "stuck" - // run states in the dashboard. - if (isRoot && runData.isFinished && nIsPartial) { - nIsPartial = false; - nDuration = Math.max(nDuration ?? 0, postgresRunDuration); - nIsError = isFailedRunStatus(runData.status); - } - + ? flattenTree(tree).map((n) => { + const offset = millisecondsToNanoseconds( + n.data.startTime.getTime() - treeRootStartTimeMs + ); //only let non-debug events extend the total duration if (!n.data.isDebug) { - totalDuration = Math.max(totalDuration, offset + (nIsPartial ? 0 : nDuration)); + totalDuration = Math.max(totalDuration, offset + n.data.duration); } - return { ...n, data: { @@ -254,24 +228,23 @@ export class RunPresenter { treeRootStartTimeMs ), //set partial nodes to null duration - duration: nIsPartial ? null : nDuration, - isPartial: nIsPartial, - isError: nIsError, + duration: n.data.isPartial ? null : n.data.duration, offset, - isRoot, + isRoot: n.id === traceSummary.rootSpan.id, }, }; }) : []; - if (runData.isFinished) { - totalDuration = Math.max(totalDuration, postgresRunDuration); - } - //total duration should be a minimum of 1ms totalDuration = Math.max(totalDuration, millisecondsToNanoseconds(1)); - const reconciled = reconcileTraceWithRunLifecycle(runData, traceSummary.rootSpan.id, events, totalDuration); + const reconciled = reconcileTraceWithRunLifecycle( + runData, + traceSummary.rootSpan.id, + events, + totalDuration + ); return { run: runData, @@ -312,17 +285,14 @@ export function reconcileTraceWithRunLifecycle( totalDuration: number; rootSpanStatus: "executing" | "completed" | "failed"; } { - const rootEvent = events[0]; - const isActualRoot = rootEvent?.id === rootSpanId; - - const currentStatus: "executing" | "completed" | "failed" = - isActualRoot && rootEvent - ? rootEvent.data.isError - ? "failed" - : !rootEvent.data.isPartial - ? "completed" - : "executing" - : "executing"; + const rootEvent = events.find((e) => e.id === rootSpanId); + const currentStatus: "executing" | "completed" | "failed" = rootEvent + ? rootEvent.data.isError + ? "failed" + : !rootEvent.data.isPartial + ? "completed" + : "executing" + : "executing"; if (!runData.isFinished) { return { events, totalDuration, rootSpanStatus: currentStatus }; @@ -337,28 +307,23 @@ export function reconcileTraceWithRunLifecycle( const updatedTotalDuration = Math.max(totalDuration, postgresRunDuration); - // We only need to potentially update the root event (the first one) if it matches our ID - if (isActualRoot && rootEvent && rootEvent.data.isPartial) { - const updatedEvents = [...events]; - updatedEvents[0] = { - ...rootEvent, - data: { - ...rootEvent.data, - isPartial: false, - duration: Math.max(rootEvent.data.duration ?? 0, postgresRunDuration), - isError: isFailedRunStatus(runData.status), - }, - }; - - return { - events: updatedEvents, - totalDuration: updatedTotalDuration, - rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed", - }; - } + const updatedEvents = events.map((e) => { + if (e.id === rootSpanId && e.data.isPartial) { + return { + ...e, + data: { + ...e.data, + isPartial: false, + duration: Math.max(e.data.duration ?? 0, postgresRunDuration), + isError: isFailedRunStatus(runData.status), + }, + }; + } + return e; + }); return { - events, + events: updatedEvents, totalDuration: updatedTotalDuration, rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed", }; diff --git a/apps/webapp/app/presenters/v3/reconcileTrace.server.ts b/apps/webapp/app/presenters/v3/reconcileTrace.server.ts new file mode 100644 index 0000000000..dc2e2ba565 --- /dev/null +++ b/apps/webapp/app/presenters/v3/reconcileTrace.server.ts @@ -0,0 +1,89 @@ +import { millisecondsToNanoseconds } from "@trigger.dev/core/v3"; +import { isFailedRunStatus } from "~/v3/taskStatus"; +import type { TaskRunStatus } from "@trigger.dev/database"; + +export type ReconcileRunData = { + isFinished: boolean; + status: TaskRunStatus; + createdAt: Date; + completedAt: Date | null; + rootTaskRun: { createdAt: Date } | null; +}; + +export type ReconcileEvent = { + id: string; + data: { + isPartial: boolean; + isError: boolean; + duration?: number | null; + }; +}; + +export type ReconcileResult = { + events: any[]; + totalDuration: number; + rootSpanStatus: "executing" | "completed" | "failed"; +}; + +// NOTE: Clickhouse trace ingestion is eventually consistent. +// When a run is marked finished in Postgres, we reconcile the +// root span to reflect completion even if telemetry is still partial. +// This is a deliberate UI-layer tradeoff to prevent stale or "stuck" +// run states in the dashboard. +export function reconcileTraceWithRunLifecycle( + runData: ReconcileRunData, + rootSpanId: string, + events: any[], + totalDuration: number +): ReconcileResult { + const rootEvent = events[0]; + const isActualRoot = rootEvent?.id === rootSpanId; + + const currentStatus: "executing" | "completed" | "failed" = + isActualRoot && rootEvent + ? rootEvent.data.isError + ? "failed" + : !rootEvent.data.isPartial + ? "completed" + : "executing" + : "executing"; + + if (!runData.isFinished) { + return { events, totalDuration, rootSpanStatus: currentStatus }; + } + + const postgresRunDuration = runData.completedAt + ? millisecondsToNanoseconds( + runData.completedAt.getTime() - + (runData.rootTaskRun?.createdAt ?? runData.createdAt).getTime() + ) + : 0; + + const updatedTotalDuration = Math.max(totalDuration, postgresRunDuration); + + // We only need to potentially update the root event (the first one) if it matches our ID + if (isActualRoot && rootEvent && rootEvent.data.isPartial) { + const updatedEvents = [...events]; + updatedEvents[0] = { + ...rootEvent, + data: { + ...rootEvent.data, + isPartial: false, + duration: Math.max(rootEvent.data.duration ?? 0, postgresRunDuration), + isError: isFailedRunStatus(runData.status), + }, + }; + + return { + events: updatedEvents, + totalDuration: updatedTotalDuration, + rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed", + }; + } + + return { + events, + totalDuration: updatedTotalDuration, + rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed", + }; +} From 80c5baf5f81cb098cc5e02bdfeecd26971cf7323 Mon Sep 17 00:00:00 2001 From: bharathkumar39293 Date: Tue, 13 Jan 2026 16:15:27 +0530 Subject: [PATCH 09/11] refactor(webapp): complete modularization of reconciliation logic --- .../app/presenters/v3/RunPresenter.server.ts | 98 +++++-------------- apps/webapp/test/RunPresenter.test.ts | 2 +- 2 files changed, 28 insertions(+), 72 deletions(-) diff --git a/apps/webapp/app/presenters/v3/RunPresenter.server.ts b/apps/webapp/app/presenters/v3/RunPresenter.server.ts index 0c2b33fb66..03d5dd4d55 100644 --- a/apps/webapp/app/presenters/v3/RunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunPresenter.server.ts @@ -8,6 +8,7 @@ import { SpanSummary } from "~/v3/eventRepository/eventRepository.types"; import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus"; import { env } from "~/env.server"; +import { reconcileTraceWithRunLifecycle } from "./reconcileTrace.server"; type Result = Awaited>; export type Run = Result["run"]; @@ -124,6 +125,7 @@ export class RunPresenter { isFinished: isFinalRunStatus(run.status), startedAt: run.startedAt, completedAt: run.completedAt, + createdAt: run.createdAt, logsDeletedAt: showDeletedLogs ? null : run.logsDeletedAt, rootTaskRun: run.rootTaskRun, parentTaskRun: run.parentTaskRun, @@ -210,14 +212,30 @@ export class RunPresenter { const treeRootStartTimeMs = tree ? tree?.data.startTime.getTime() : 0; let totalDuration = tree?.data.duration ?? 0; const events = tree - ? flattenTree(tree).map((n) => { - const offset = millisecondsToNanoseconds( - n.data.startTime.getTime() - treeRootStartTimeMs - ); + ? flattenTree(tree).map((n, index) => { + const isRoot = index === 0; + const offset = millisecondsToNanoseconds(n.data.startTime.getTime() - treeRootStartTimeMs); + + let nIsPartial = n.data.isPartial; + let nDuration = n.data.duration; + let nIsError = n.data.isError; + + // NOTE: Clickhouse trace ingestion is eventually consistent. + // When a run is marked finished in Postgres, we reconcile the + // root span to reflect completion even if telemetry is still partial. + // This is a deliberate UI-layer tradeoff to prevent stale or "stuck" + // run states in the dashboard. + if (isRoot && runData.isFinished && nIsPartial) { + nIsPartial = false; + nDuration = Math.max(nDuration ?? 0, postgresRunDuration); + nIsError = isFailedRunStatus(runData.status); + } + //only let non-debug events extend the total duration if (!n.data.isDebug) { - totalDuration = Math.max(totalDuration, offset + n.data.duration); + totalDuration = Math.max(totalDuration, offset + (nIsPartial ? 0 : nDuration)); } + return { ...n, data: { @@ -228,9 +246,11 @@ export class RunPresenter { treeRootStartTimeMs ), //set partial nodes to null duration - duration: n.data.isPartial ? null : n.data.duration, + duration: nIsPartial ? null : nDuration, + isPartial: nIsPartial, + isError: nIsError, offset, - isRoot: n.id === traceSummary.rootSpan.id, + isRoot, }, }; }) @@ -264,67 +284,3 @@ export class RunPresenter { } } -// NOTE: Clickhouse trace ingestion is eventually consistent. -// When a run is marked finished in Postgres, we reconcile the -// root span to reflect completion even if telemetry is still partial. -// This is a deliberate UI-layer tradeoff to prevent stale or "stuck" -// run states in the dashboard. -export function reconcileTraceWithRunLifecycle( - runData: { - isFinished: boolean; - status: Run["status"]; - createdAt: Date; - completedAt: Date | null; - rootTaskRun: { createdAt: Date } | null; - }, - rootSpanId: string, - events: RunEvent[], - totalDuration: number -): { - events: RunEvent[]; - totalDuration: number; - rootSpanStatus: "executing" | "completed" | "failed"; -} { - const rootEvent = events.find((e) => e.id === rootSpanId); - const currentStatus: "executing" | "completed" | "failed" = rootEvent - ? rootEvent.data.isError - ? "failed" - : !rootEvent.data.isPartial - ? "completed" - : "executing" - : "executing"; - - if (!runData.isFinished) { - return { events, totalDuration, rootSpanStatus: currentStatus }; - } - - const postgresRunDuration = runData.completedAt - ? millisecondsToNanoseconds( - runData.completedAt.getTime() - - (runData.rootTaskRun?.createdAt ?? runData.createdAt).getTime() - ) - : 0; - - const updatedTotalDuration = Math.max(totalDuration, postgresRunDuration); - - const updatedEvents = events.map((e) => { - if (e.id === rootSpanId && e.data.isPartial) { - return { - ...e, - data: { - ...e.data, - isPartial: false, - duration: Math.max(e.data.duration ?? 0, postgresRunDuration), - isError: isFailedRunStatus(runData.status), - }, - }; - } - return e; - }); - - return { - events: updatedEvents, - totalDuration: updatedTotalDuration, - rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed", - }; -} diff --git a/apps/webapp/test/RunPresenter.test.ts b/apps/webapp/test/RunPresenter.test.ts index 35fe59e7bd..15d459487a 100644 --- a/apps/webapp/test/RunPresenter.test.ts +++ b/apps/webapp/test/RunPresenter.test.ts @@ -24,7 +24,7 @@ vi.mock("../app/utils/username", () => ({ getUsername: vi.fn(), })); -import { reconcileTraceWithRunLifecycle } from "../app/presenters/v3/RunPresenter.server"; +import { reconcileTraceWithRunLifecycle } from "../app/presenters/v3/reconcileTrace.server"; import { millisecondsToNanoseconds } from "@trigger.dev/core/v3"; describe("reconcileTraceWithRunLifecycle", () => { From 17dbcd0528239c65fe02a2b32040339d41655607 Mon Sep 17 00:00:00 2001 From: bharathkumar39293 Date: Tue, 13 Jan 2026 16:32:16 +0530 Subject: [PATCH 10/11] refactor(webapp): modularize reconciliation logic and move tests to conventional location --- .../app/presenters/v3/RunPresenter.server.ts | 10 ------- .../v3/reconcileTrace.server.test.ts} | 29 ++----------------- apps/webapp/vitest.config.ts | 2 +- 3 files changed, 3 insertions(+), 38 deletions(-) rename apps/webapp/{test/RunPresenter.test.ts => app/presenters/v3/reconcileTrace.server.test.ts} (88%) diff --git a/apps/webapp/app/presenters/v3/RunPresenter.server.ts b/apps/webapp/app/presenters/v3/RunPresenter.server.ts index 03d5dd4d55..2e2bb84291 100644 --- a/apps/webapp/app/presenters/v3/RunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunPresenter.server.ts @@ -220,16 +220,6 @@ export class RunPresenter { let nDuration = n.data.duration; let nIsError = n.data.isError; - // NOTE: Clickhouse trace ingestion is eventually consistent. - // When a run is marked finished in Postgres, we reconcile the - // root span to reflect completion even if telemetry is still partial. - // This is a deliberate UI-layer tradeoff to prevent stale or "stuck" - // run states in the dashboard. - if (isRoot && runData.isFinished && nIsPartial) { - nIsPartial = false; - nDuration = Math.max(nDuration ?? 0, postgresRunDuration); - nIsError = isFailedRunStatus(runData.status); - } //only let non-debug events extend the total duration if (!n.data.isDebug) { diff --git a/apps/webapp/test/RunPresenter.test.ts b/apps/webapp/app/presenters/v3/reconcileTrace.server.test.ts similarity index 88% rename from apps/webapp/test/RunPresenter.test.ts rename to apps/webapp/app/presenters/v3/reconcileTrace.server.test.ts index 15d459487a..9f5eca11e7 100644 --- a/apps/webapp/test/RunPresenter.test.ts +++ b/apps/webapp/app/presenters/v3/reconcileTrace.server.test.ts @@ -1,30 +1,5 @@ -import { vi, describe, it, expect } from "vitest"; - -vi.mock("../app/env.server", () => ({ - env: { - MAXIMUM_LIVE_RELOADING_EVENTS: 1000, - }, -})); - -vi.mock("../app/db.server", () => ({ - prisma: {}, - $replica: {}, - $transaction: vi.fn(), -})); - -vi.mock("../app/v3/eventRepository/index.server", () => ({ - resolveEventRepositoryForStore: vi.fn(), -})); - -vi.mock("../app/v3/taskEventStore.server", () => ({ - getTaskEventStoreTableForRun: vi.fn(), -})); - -vi.mock("../app/utils/username", () => ({ - getUsername: vi.fn(), -})); - -import { reconcileTraceWithRunLifecycle } from "../app/presenters/v3/reconcileTrace.server"; +import { describe, it, expect } from "vitest"; +import { reconcileTraceWithRunLifecycle } from "./reconcileTrace.server"; import { millisecondsToNanoseconds } from "@trigger.dev/core/v3"; describe("reconcileTraceWithRunLifecycle", () => { diff --git a/apps/webapp/vitest.config.ts b/apps/webapp/vitest.config.ts index 0c08af40ea..ae86814e97 100644 --- a/apps/webapp/vitest.config.ts +++ b/apps/webapp/vitest.config.ts @@ -3,7 +3,7 @@ import tsconfigPaths from "vite-tsconfig-paths"; export default defineConfig({ test: { - include: ["test/**/*.test.ts"], + include: ["test/**/*.test.ts", "app/**/*.test.ts"], globals: true, pool: "forks", }, From 960b23f844a66693240879c6287eff088b34e785 Mon Sep 17 00:00:00 2001 From: bharathkumar39293 Date: Tue, 13 Jan 2026 16:38:00 +0530 Subject: [PATCH 11/11] refactor(webapp): remove unused import in RunPresenter.server.ts --- apps/webapp/app/presenters/v3/RunPresenter.server.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/webapp/app/presenters/v3/RunPresenter.server.ts b/apps/webapp/app/presenters/v3/RunPresenter.server.ts index 2e2bb84291..19d7f74c53 100644 --- a/apps/webapp/app/presenters/v3/RunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunPresenter.server.ts @@ -6,7 +6,7 @@ import { getUsername } from "~/utils/username"; import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server"; import { SpanSummary } from "~/v3/eventRepository/eventRepository.types"; import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; -import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus"; +import { isFinalRunStatus } from "~/v3/taskStatus"; import { env } from "~/env.server"; import { reconcileTraceWithRunLifecycle } from "./reconcileTrace.server";