Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ export function Chat() {
/**
* Processes streaming response from workflow execution
* Reads the stream chunk by chunk and updates the message content in real-time
* When the final event arrives, extracts any additional selected outputs (model, tokens, toolCalls)
* @param stream - ReadableStream containing the workflow execution response
* @param responseMessageId - ID of the message to update with streamed content
*/
Expand Down Expand Up @@ -529,6 +530,35 @@ export function Chat() {
return
}

if (
selectedOutputs.length > 0 &&
'logs' in result &&
Array.isArray(result.logs) &&
activeWorkflowId
) {
const additionalOutputs: string[] = []

for (const outputId of selectedOutputs) {
const blockId = extractBlockIdFromOutputId(outputId)
const path = extractPathFromOutputId(outputId, blockId)

if (path === 'content') continue

const outputValue = extractOutputFromLogs(result.logs as BlockLog[], outputId)
if (outputValue !== undefined) {
const formattedValue =
typeof outputValue === 'string' ? outputValue : JSON.stringify(outputValue)
if (formattedValue) {
additionalOutputs.push(`**${path}:** ${formattedValue}`)
}
}
}

if (additionalOutputs.length > 0) {
appendMessageContent(responseMessageId, `\n\n${additionalOutputs.join('\n\n')}`)
}
}

finalizeMessageStream(responseMessageId)
} else if (contentChunk) {
accumulatedContent += contentChunk
Expand All @@ -552,7 +582,7 @@ export function Chat() {
focusInput(100)
}
},
[appendMessageContent, finalizeMessageStream, focusInput]
[appendMessageContent, finalizeMessageStream, focusInput, selectedOutputs, activeWorkflowId]
)

/**
Expand All @@ -564,7 +594,6 @@ export function Chat() {
if (!result || !activeWorkflowId) return
if (typeof result !== 'object') return

// Handle streaming response
if ('stream' in result && result.stream instanceof ReadableStream) {
const responseMessageId = crypto.randomUUID()
addMessage({
Expand All @@ -578,7 +607,6 @@ export function Chat() {
return
}

// Handle success with logs
if ('success' in result && result.success && 'logs' in result && Array.isArray(result.logs)) {
selectedOutputs
.map((outputId) => extractOutputFromLogs(result.logs as BlockLog[], outputId))
Expand All @@ -596,7 +624,6 @@ export function Chat() {
return
}

// Handle error response
if ('success' in result && !result.success) {
const errorMessage =
'error' in result && typeof result.error === 'string'
Expand All @@ -622,7 +649,6 @@ export function Chat() {

const sentMessage = chatMessage.trim()

// Update prompt history (only if new unique message)
if (sentMessage && promptHistory[promptHistory.length - 1] !== sentMessage) {
setPromptHistory((prev) => [...prev, sentMessage])
}
Expand All @@ -631,10 +657,8 @@ export function Chat() {
const conversationId = getConversationId(activeWorkflowId)

try {
// Process file attachments
const attachmentsWithData = await processFileAttachments(chatFiles)

// Add user message
const messageContent =
sentMessage || (chatFiles.length > 0 ? `Uploaded ${chatFiles.length} file(s)` : '')
addMessage({
Expand All @@ -644,7 +668,6 @@ export function Chat() {
attachments: attachmentsWithData,
})

// Prepare workflow input
const workflowInput: {
input: string
conversationId: string
Expand All @@ -667,13 +690,11 @@ export function Chat() {
}
}

// Clear input and files
setChatMessage('')
clearFiles()
clearErrors()
focusInput(10)

// Execute workflow
const result = await handleRunWorkflow(workflowInput)
handleWorkflowResponse(result)
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ export function useWorkflowExecution() {

const activeBlocksSet = new Set<string>()
const streamedContent = new Map<string, string>()
const accumulatedBlockLogs: BlockLog[] = []

// Execute the workflow
try {
Expand Down Expand Up @@ -933,14 +934,30 @@ export function useWorkflowExecution() {

// Edges already tracked in onBlockStarted, no need to track again

const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()

// Accumulate block log for the execution result
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
input: data.input || {},
output: data.output,
success: true,
durationMs: data.durationMs,
startedAt,
endedAt,
})

// Add to console
addConsole({
input: data.input || {},
output: data.output,
success: true,
durationMs: data.durationMs,
startedAt: new Date(Date.now() - data.durationMs).toISOString(),
endedAt: new Date().toISOString(),
startedAt,
endedAt,
workflowId: activeWorkflowId,
blockId: data.blockId,
executionId: executionId || uuidv4(),
Expand All @@ -967,15 +984,33 @@ export function useWorkflowExecution() {

// Track failed block execution in run path
setBlockRunStatus(data.blockId, 'error')

const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()

// Accumulate block error log for the execution result
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
input: data.input || {},
output: {},
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt,
endedAt,
})

// Add error to console
addConsole({
input: data.input || {},
output: {},
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt: new Date(Date.now() - data.durationMs).toISOString(),
endedAt: new Date().toISOString(),
startedAt,
endedAt,
workflowId: activeWorkflowId,
blockId: data.blockId,
executionId: executionId || uuidv4(),
Expand Down Expand Up @@ -1029,7 +1064,7 @@ export function useWorkflowExecution() {
startTime: data.startTime,
endTime: data.endTime,
},
logs: [],
logs: accumulatedBlockLogs,
}
},

Expand All @@ -1041,7 +1076,7 @@ export function useWorkflowExecution() {
metadata: {
duration: data.duration,
},
logs: [],
logs: accumulatedBlockLogs,
}

// Only add workflow-level error if no blocks have executed yet
Expand Down