diff --git a/apps/webapp/app/presenters/v3/RunPresenter.server.ts b/apps/webapp/app/presenters/v3/RunPresenter.server.ts index 437d6b6458..19d7f74c53 100644 --- a/apps/webapp/app/presenters/v3/RunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunPresenter.server.ts @@ -8,6 +8,7 @@ import { SpanSummary } from "~/v3/eventRepository/eventRepository.types"; import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; import { isFinalRunStatus } from "~/v3/taskStatus"; import { env } from "~/env.server"; +import { reconcileTraceWithRunLifecycle } from "./reconcileTrace.server"; type Result = Awaited>; export type Run = Result["run"]; @@ -124,6 +125,7 @@ export class RunPresenter { isFinished: isFinalRunStatus(run.status), startedAt: run.startedAt, completedAt: run.completedAt, + createdAt: run.createdAt, logsDeletedAt: showDeletedLogs ? null : run.logsDeletedAt, rootTaskRun: run.rootTaskRun, parentTaskRun: run.parentTaskRun, @@ -210,50 +212,56 @@ export class RunPresenter { const treeRootStartTimeMs = tree ? tree?.data.startTime.getTime() : 0; let totalDuration = tree?.data.duration ?? 0; const events = tree - ? flattenTree(tree).map((n) => { - const offset = millisecondsToNanoseconds( - n.data.startTime.getTime() - treeRootStartTimeMs - ); - //only let non-debug events extend the total duration - if (!n.data.isDebug) { - totalDuration = Math.max(totalDuration, offset + n.data.duration); - } - return { - ...n, - data: { - ...n.data, - timelineEvents: createTimelineSpanEventsFromSpanEvents( - n.data.events, - user?.admin ?? false, - treeRootStartTimeMs - ), - //set partial nodes to null duration - duration: n.data.isPartial ? null : n.data.duration, - offset, - isRoot: n.id === traceSummary.rootSpan.id, - }, - }; - }) + ? flattenTree(tree).map((n, index) => { + const isRoot = index === 0; + const offset = millisecondsToNanoseconds(n.data.startTime.getTime() - treeRootStartTimeMs); + + let nIsPartial = n.data.isPartial; + let nDuration = n.data.duration; + let nIsError = n.data.isError; + + + //only let non-debug events extend the total duration + if (!n.data.isDebug) { + totalDuration = Math.max(totalDuration, offset + (nIsPartial ? 0 : nDuration)); + } + + return { + ...n, + data: { + ...n.data, + timelineEvents: createTimelineSpanEventsFromSpanEvents( + n.data.events, + user?.admin ?? false, + treeRootStartTimeMs + ), + //set partial nodes to null duration + duration: nIsPartial ? null : nDuration, + isPartial: nIsPartial, + isError: nIsError, + offset, + isRoot, + }, + }; + }) : []; //total duration should be a minimum of 1ms totalDuration = Math.max(totalDuration, millisecondsToNanoseconds(1)); - let rootSpanStatus: "executing" | "completed" | "failed" = "executing"; - if (events[0]) { - if (events[0].data.isError) { - rootSpanStatus = "failed"; - } else if (!events[0].data.isPartial) { - rootSpanStatus = "completed"; - } - } + const reconciled = reconcileTraceWithRunLifecycle( + runData, + traceSummary.rootSpan.id, + events, + totalDuration + ); return { run: runData, trace: { - rootSpanStatus, - events: events, - duration: totalDuration, + rootSpanStatus: reconciled.rootSpanStatus, + events: reconciled.events, + duration: reconciled.totalDuration, rootStartedAt: tree?.data.startTime, startedAt: run.startedAt, queuedDuration: run.startedAt @@ -265,3 +273,4 @@ export class RunPresenter { }; } } + diff --git a/apps/webapp/app/presenters/v3/reconcileTrace.server.test.ts b/apps/webapp/app/presenters/v3/reconcileTrace.server.test.ts new file mode 100644 index 0000000000..9f5eca11e7 --- /dev/null +++ b/apps/webapp/app/presenters/v3/reconcileTrace.server.test.ts @@ -0,0 +1,116 @@ +import { describe, it, expect } from "vitest"; +import { reconcileTraceWithRunLifecycle } from "./reconcileTrace.server"; +import { millisecondsToNanoseconds } from "@trigger.dev/core/v3"; + +describe("reconcileTraceWithRunLifecycle", () => { + const rootSpanId = "root-span-id"; + const createdAt = new Date("2024-01-01T00:00:00Z"); + const completedAt = new Date("2024-01-01T00:00:05Z"); + + const runData: any = { + isFinished: true, + status: "COMPLETED_SUCCESSFULLY", + createdAt, + completedAt, + rootTaskRun: null, + }; + + const initialEvents = [ + { + id: rootSpanId, + data: { + isPartial: true, + duration: millisecondsToNanoseconds(1000), // 1s, less than the 5s run duration + isError: false, + }, + }, + { + id: "child-span-id", + data: { + isPartial: false, + duration: millisecondsToNanoseconds(500), + isError: false, + }, + }, + ]; + + it("should reconcile a finished run with lagging partial telemetry", () => { + const totalDuration = millisecondsToNanoseconds(1000); + const result = reconcileTraceWithRunLifecycle(runData, rootSpanId, initialEvents as any, totalDuration); + + expect(result.rootSpanStatus).toBe("completed"); + + const rootEvent = result.events.find((e: any) => e.id === rootSpanId); + expect(rootEvent?.data.isPartial).toBe(false); + // 5s duration = 5000ms + expect(rootEvent?.data.duration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(5000)); + expect(result.totalDuration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(5000)); + }); + + it("should not override duration if Clickhouse already has a longer finished duration", () => { + const longDuration = millisecondsToNanoseconds(10000); + const finishedEvents = [ + { + id: rootSpanId, + data: { + isPartial: false, + duration: longDuration, + isError: false, + }, + }, + ]; + + const result = reconcileTraceWithRunLifecycle(runData, rootSpanId, finishedEvents as any, longDuration); + + const rootEvent = result.events.find((e: any) => e.id === rootSpanId); + expect(rootEvent?.data.duration).toBe(longDuration); + expect(rootEvent?.data.isPartial).toBe(false); + expect(result.totalDuration).toBe(longDuration); + }); + + it("should handle unfinished runs without modification", () => { + const unfinishedRun = { ...runData, isFinished: false, completedAt: null }; + const totalDuration = millisecondsToNanoseconds(1000); + const result = reconcileTraceWithRunLifecycle(unfinishedRun, rootSpanId, initialEvents as any, totalDuration); + + expect(result.rootSpanStatus).toBe("executing"); + + const rootEvent = result.events.find((e: any) => e.id === rootSpanId); + expect(rootEvent?.data.isPartial).toBe(true); + expect(rootEvent?.data.duration).toBe(millisecondsToNanoseconds(1000)); + }); + + it("should reconcile failed runs correctly", () => { + const failedRun = { ...runData, status: "COMPLETED_WITH_ERRORS" }; + const result = reconcileTraceWithRunLifecycle(failedRun, rootSpanId, initialEvents as any, millisecondsToNanoseconds(1000)); + + expect(result.rootSpanStatus).toBe("failed"); + const rootEvent = result.events.find((e: any) => e.id === rootSpanId); + expect(rootEvent?.data.isError).toBe(true); + expect(rootEvent?.data.isPartial).toBe(false); + }); + + it("should use rootTaskRun createdAt if available for duration calculation", () => { + const rootTaskCreatedAt = new Date("2023-12-31T23:59:50Z"); // 10s before run.createdAt + const runDataWithRoot: any = { + ...runData, + rootTaskRun: { createdAt: rootTaskCreatedAt }, + }; + + const result = reconcileTraceWithRunLifecycle(runDataWithRoot, rootSpanId, initialEvents as any, millisecondsToNanoseconds(1000)); + + // Duration should be from 23:59:50 to 00:00:05 = 15s + const rootEvent = result.events.find((e: any) => e.id === rootSpanId); + expect(rootEvent?.data.duration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(15000)); + expect(result.totalDuration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(15000)); + }); + + it("should handle missing root span gracefully", () => { + const result = reconcileTraceWithRunLifecycle(runData, "non-existent-id", initialEvents as any, millisecondsToNanoseconds(1000)); + + expect(result.rootSpanStatus).toBe("completed"); + expect(result.events).toEqual(initialEvents); + // totalDuration should still be updated to postgres duration even if root span is missing from events list + expect(result.totalDuration).toBeGreaterThanOrEqual(millisecondsToNanoseconds(5000)); + }); +}); diff --git a/apps/webapp/app/presenters/v3/reconcileTrace.server.ts b/apps/webapp/app/presenters/v3/reconcileTrace.server.ts new file mode 100644 index 0000000000..dc2e2ba565 --- /dev/null +++ b/apps/webapp/app/presenters/v3/reconcileTrace.server.ts @@ -0,0 +1,89 @@ +import { millisecondsToNanoseconds } from "@trigger.dev/core/v3"; +import { isFailedRunStatus } from "~/v3/taskStatus"; +import type { TaskRunStatus } from "@trigger.dev/database"; + +export type ReconcileRunData = { + isFinished: boolean; + status: TaskRunStatus; + createdAt: Date; + completedAt: Date | null; + rootTaskRun: { createdAt: Date } | null; +}; + +export type ReconcileEvent = { + id: string; + data: { + isPartial: boolean; + isError: boolean; + duration?: number | null; + }; +}; + +export type ReconcileResult = { + events: any[]; + totalDuration: number; + rootSpanStatus: "executing" | "completed" | "failed"; +}; + +// NOTE: Clickhouse trace ingestion is eventually consistent. +// When a run is marked finished in Postgres, we reconcile the +// root span to reflect completion even if telemetry is still partial. +// This is a deliberate UI-layer tradeoff to prevent stale or "stuck" +// run states in the dashboard. +export function reconcileTraceWithRunLifecycle( + runData: ReconcileRunData, + rootSpanId: string, + events: any[], + totalDuration: number +): ReconcileResult { + const rootEvent = events[0]; + const isActualRoot = rootEvent?.id === rootSpanId; + + const currentStatus: "executing" | "completed" | "failed" = + isActualRoot && rootEvent + ? rootEvent.data.isError + ? "failed" + : !rootEvent.data.isPartial + ? "completed" + : "executing" + : "executing"; + + if (!runData.isFinished) { + return { events, totalDuration, rootSpanStatus: currentStatus }; + } + + const postgresRunDuration = runData.completedAt + ? millisecondsToNanoseconds( + runData.completedAt.getTime() - + (runData.rootTaskRun?.createdAt ?? runData.createdAt).getTime() + ) + : 0; + + const updatedTotalDuration = Math.max(totalDuration, postgresRunDuration); + + // We only need to potentially update the root event (the first one) if it matches our ID + if (isActualRoot && rootEvent && rootEvent.data.isPartial) { + const updatedEvents = [...events]; + updatedEvents[0] = { + ...rootEvent, + data: { + ...rootEvent.data, + isPartial: false, + duration: Math.max(rootEvent.data.duration ?? 0, postgresRunDuration), + isError: isFailedRunStatus(runData.status), + }, + }; + + return { + events: updatedEvents, + totalDuration: updatedTotalDuration, + rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed", + }; + } + + return { + events, + totalDuration: updatedTotalDuration, + rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed", + }; +} diff --git a/apps/webapp/app/utils/throttle.ts b/apps/webapp/app/utils/throttle.ts index a6c1a77a32..24f904fb29 100644 --- a/apps/webapp/app/utils/throttle.ts +++ b/apps/webapp/app/utils/throttle.ts @@ -1,20 +1,28 @@ -//From: https://kettanaito.com/blog/debounce-vs-throttle - -/** A very simple throttle. Will execute the function at the end of each period and discard any other calls during that period. */ +/** A throttle that fires the first call immediately and ensures the last call during the duration is also fired. */ export function throttle( func: (...args: any[]) => void, durationMs: number ): (...args: any[]) => void { - let isPrimedToFire = false; - - return (...args: any[]) => { - if (!isPrimedToFire) { - isPrimedToFire = true; + let timeoutId: NodeJS.Timeout | null = null; + let nextArgs: any[] | null = null; - setTimeout(() => { - func(...args); - isPrimedToFire = false; - }, durationMs); + const wrapped = (...args: any[]) => { + if (timeoutId) { + nextArgs = args; + return; } + + func(...args); + + timeoutId = setTimeout(() => { + timeoutId = null; + if (nextArgs) { + const argsToUse = nextArgs; + nextArgs = null; + wrapped(...argsToUse); + } + }, durationMs); }; + + return wrapped; } diff --git a/apps/webapp/vitest.config.ts b/apps/webapp/vitest.config.ts index 0c08af40ea..ae86814e97 100644 --- a/apps/webapp/vitest.config.ts +++ b/apps/webapp/vitest.config.ts @@ -3,7 +3,7 @@ import tsconfigPaths from "vite-tsconfig-paths"; export default defineConfig({ test: { - include: ["test/**/*.test.ts"], + include: ["test/**/*.test.ts", "app/**/*.test.ts"], globals: true, pool: "forks", },