diff --git a/apps/sim/app/api/a2a/agents/[agentId]/route.ts b/apps/sim/app/api/a2a/agents/[agentId]/route.ts new file mode 100644 index 0000000000..a2db9d2cb8 --- /dev/null +++ b/apps/sim/app/api/a2a/agents/[agentId]/route.ts @@ -0,0 +1,272 @@ +/** + * A2A Agent Card Endpoint + * + * Returns the Agent Card (discovery document) for an A2A agent. + * Also supports CRUD operations for managing agents. + */ + +import { db } from '@sim/db' +import { a2aAgent, workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { eq } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { generateAgentCard, generateSkillsFromWorkflow } from '@/lib/a2a/agent-card' +import type { AgentAuthentication, AgentCapabilities, AgentSkill } from '@/lib/a2a/types' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' + +const logger = createLogger('A2AAgentCardAPI') + +export const dynamic = 'force-dynamic' + +interface RouteParams { + agentId: string +} + +/** + * GET - Returns the Agent Card for discovery + */ +export async function GET(request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + try { + const [agent] = await db + .select({ + agent: a2aAgent, + workflow: workflow, + }) + .from(a2aAgent) + .innerJoin(workflow, eq(a2aAgent.workflowId, workflow.id)) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!agent) { + return NextResponse.json({ error: 'Agent not found' }, { status: 404 }) + } + + if (!agent.agent.isPublished) { + // Check if requester has access (for preview) + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success) { + return NextResponse.json({ error: 'Agent not published' }, { status: 404 }) + } + } + + const agentCard = generateAgentCard( + { + id: agent.agent.id, + name: agent.agent.name, + description: agent.agent.description, + version: agent.agent.version, + capabilities: agent.agent.capabilities as AgentCapabilities, + skills: agent.agent.skills as AgentSkill[], + authentication: agent.agent.authentication as AgentAuthentication, + }, + { + id: agent.workflow.id, + name: agent.workflow.name, + description: agent.workflow.description, + } + ) + + return NextResponse.json(agentCard, { + headers: { + 'Content-Type': 'application/json', + 'Cache-Control': agent.agent.isPublished ? 'public, max-age=3600' : 'private, no-cache', + }, + }) + } catch (error) { + logger.error('Error getting Agent Card:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + +/** + * PUT - Update an agent + */ +export async function PUT(request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + try { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const [existingAgent] = await db + .select() + .from(a2aAgent) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!existingAgent) { + return NextResponse.json({ error: 'Agent not found' }, { status: 404 }) + } + + const body = await request.json() + + // Update agent + const [updatedAgent] = await db + .update(a2aAgent) + .set({ + name: body.name ?? existingAgent.name, + description: body.description ?? existingAgent.description, + version: body.version ?? existingAgent.version, + capabilities: body.capabilities ?? existingAgent.capabilities, + skills: body.skills ?? existingAgent.skills, + authentication: body.authentication ?? existingAgent.authentication, + isPublished: body.isPublished ?? existingAgent.isPublished, + publishedAt: + body.isPublished && !existingAgent.isPublished ? new Date() : existingAgent.publishedAt, + updatedAt: new Date(), + }) + .where(eq(a2aAgent.id, agentId)) + .returning() + + logger.info(`Updated A2A agent: ${agentId}`) + + return NextResponse.json({ success: true, agent: updatedAgent }) + } catch (error) { + logger.error('Error updating agent:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + +/** + * DELETE - Delete an agent + */ +export async function DELETE(request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + try { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const [existingAgent] = await db + .select() + .from(a2aAgent) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!existingAgent) { + return NextResponse.json({ error: 'Agent not found' }, { status: 404 }) + } + + await db.delete(a2aAgent).where(eq(a2aAgent.id, agentId)) + + logger.info(`Deleted A2A agent: ${agentId}`) + + return NextResponse.json({ success: true }) + } catch (error) { + logger.error('Error deleting agent:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + +/** + * POST - Publish/unpublish an agent + */ +export async function POST(request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + try { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + logger.warn('A2A agent publish auth failed:', { error: auth.error, hasUserId: !!auth.userId }) + return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 }) + } + + const [existingAgent] = await db + .select() + .from(a2aAgent) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!existingAgent) { + return NextResponse.json({ error: 'Agent not found' }, { status: 404 }) + } + + const body = await request.json() + const action = body.action as 'publish' | 'unpublish' | 'refresh' + + if (action === 'publish') { + // Verify workflow is deployed + const [wf] = await db + .select({ isDeployed: workflow.isDeployed }) + .from(workflow) + .where(eq(workflow.id, existingAgent.workflowId)) + .limit(1) + + if (!wf?.isDeployed) { + return NextResponse.json( + { error: 'Workflow must be deployed before publishing agent' }, + { status: 400 } + ) + } + + await db + .update(a2aAgent) + .set({ + isPublished: true, + publishedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(a2aAgent.id, agentId)) + + logger.info(`Published A2A agent: ${agentId}`) + return NextResponse.json({ success: true, isPublished: true }) + } + + if (action === 'unpublish') { + await db + .update(a2aAgent) + .set({ + isPublished: false, + updatedAt: new Date(), + }) + .where(eq(a2aAgent.id, agentId)) + + logger.info(`Unpublished A2A agent: ${agentId}`) + return NextResponse.json({ success: true, isPublished: false }) + } + + if (action === 'refresh') { + // Refresh skills from workflow + const workflowData = await loadWorkflowFromNormalizedTables(existingAgent.workflowId) + if (!workflowData) { + return NextResponse.json({ error: 'Failed to load workflow' }, { status: 500 }) + } + + const [wf] = await db + .select({ name: workflow.name, description: workflow.description }) + .from(workflow) + .where(eq(workflow.id, existingAgent.workflowId)) + .limit(1) + + const skills = generateSkillsFromWorkflow( + existingAgent.workflowId, + wf?.name || existingAgent.name, + wf?.description, + workflowData.blocks + ) + + await db + .update(a2aAgent) + .set({ + skills, + updatedAt: new Date(), + }) + .where(eq(a2aAgent.id, agentId)) + + logger.info(`Refreshed skills for A2A agent: ${agentId}`) + return NextResponse.json({ success: true, skills }) + } + + return NextResponse.json({ error: 'Invalid action' }, { status: 400 }) + } catch (error) { + logger.error('Error with agent action:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} diff --git a/apps/sim/app/api/a2a/agents/route.ts b/apps/sim/app/api/a2a/agents/route.ts new file mode 100644 index 0000000000..c3e3a81ac4 --- /dev/null +++ b/apps/sim/app/api/a2a/agents/route.ts @@ -0,0 +1,197 @@ +/** + * A2A Agents List Endpoint + * + * List and create A2A agents for a workspace. + */ + +import { db } from '@sim/db' +import { a2aAgent, workflow, workspace } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq, sql } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { v4 as uuidv4 } from 'uuid' +import { generateSkillsFromWorkflow } from '@/lib/a2a/agent-card' +import { A2A_DEFAULT_CAPABILITIES } from '@/lib/a2a/constants' +import { sanitizeAgentName } from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils' +import { hasValidStartBlockInState } from '@/lib/workflows/triggers/trigger-utils' + +const logger = createLogger('A2AAgentsAPI') + +export const dynamic = 'force-dynamic' + +/** + * GET - List all A2A agents for a workspace + */ +export async function GET(request: NextRequest) { + try { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const { searchParams } = new URL(request.url) + const workspaceId = searchParams.get('workspaceId') + + if (!workspaceId) { + return NextResponse.json({ error: 'workspaceId is required' }, { status: 400 }) + } + + // Verify workspace access + const [ws] = await db + .select({ id: workspace.id }) + .from(workspace) + .where(eq(workspace.id, workspaceId)) + .limit(1) + + if (!ws) { + return NextResponse.json({ error: 'Workspace not found' }, { status: 404 }) + } + + // Get agents with workflow info + const agents = await db + .select({ + id: a2aAgent.id, + workspaceId: a2aAgent.workspaceId, + workflowId: a2aAgent.workflowId, + name: a2aAgent.name, + description: a2aAgent.description, + version: a2aAgent.version, + capabilities: a2aAgent.capabilities, + skills: a2aAgent.skills, + authentication: a2aAgent.authentication, + isPublished: a2aAgent.isPublished, + publishedAt: a2aAgent.publishedAt, + createdAt: a2aAgent.createdAt, + updatedAt: a2aAgent.updatedAt, + workflowName: workflow.name, + workflowDescription: workflow.description, + isDeployed: workflow.isDeployed, + taskCount: sql`( + SELECT COUNT(*)::int + FROM "a2a_task" + WHERE "a2a_task"."agent_id" = "a2a_agent"."id" + )`.as('task_count'), + }) + .from(a2aAgent) + .leftJoin(workflow, eq(a2aAgent.workflowId, workflow.id)) + .where(eq(a2aAgent.workspaceId, workspaceId)) + .orderBy(a2aAgent.createdAt) + + logger.info(`Listed ${agents.length} A2A agents for workspace ${workspaceId}`) + + return NextResponse.json({ success: true, agents }) + } catch (error) { + logger.error('Error listing agents:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + +/** + * POST - Create a new A2A agent from a workflow + */ +export async function POST(request: NextRequest) { + try { + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const body = await request.json() + const { workspaceId, workflowId, name, description, capabilities, authentication } = body + + if (!workspaceId || !workflowId) { + return NextResponse.json( + { error: 'workspaceId and workflowId are required' }, + { status: 400 } + ) + } + + // Verify workflow exists and belongs to workspace + const [wf] = await db + .select({ + id: workflow.id, + name: workflow.name, + description: workflow.description, + workspaceId: workflow.workspaceId, + isDeployed: workflow.isDeployed, + }) + .from(workflow) + .where(and(eq(workflow.id, workflowId), eq(workflow.workspaceId, workspaceId))) + .limit(1) + + if (!wf) { + return NextResponse.json( + { error: 'Workflow not found or does not belong to workspace' }, + { status: 404 } + ) + } + + // Check if agent already exists for this workflow + const [existing] = await db + .select({ id: a2aAgent.id }) + .from(a2aAgent) + .where(and(eq(a2aAgent.workspaceId, workspaceId), eq(a2aAgent.workflowId, workflowId))) + .limit(1) + + if (existing) { + return NextResponse.json( + { error: 'An agent already exists for this workflow' }, + { status: 409 } + ) + } + + // Verify workflow has a start block + const workflowData = await loadWorkflowFromNormalizedTables(workflowId) + if (!workflowData || !hasValidStartBlockInState(workflowData)) { + return NextResponse.json( + { error: 'Workflow must have a Start block to be exposed as an A2A agent' }, + { status: 400 } + ) + } + + // Generate skills from workflow + const skills = generateSkillsFromWorkflow( + workflowId, + name || wf.name, + description || wf.description, + workflowData.blocks + ) + + // Create agent + const agentId = uuidv4() + const agentName = name || sanitizeAgentName(wf.name) + + const [agent] = await db + .insert(a2aAgent) + .values({ + id: agentId, + workspaceId, + workflowId, + createdBy: auth.userId, + name: agentName, + description: description || wf.description, + version: '1.0.0', + capabilities: { + ...A2A_DEFAULT_CAPABILITIES, + ...capabilities, + }, + skills, + authentication: authentication || { + schemes: ['bearer', 'apiKey'], + }, + isPublished: false, + createdAt: new Date(), + updatedAt: new Date(), + }) + .returning() + + logger.info(`Created A2A agent ${agentId} for workflow ${workflowId}`) + + return NextResponse.json({ success: true, agent }, { status: 201 }) + } catch (error) { + logger.error('Error creating agent:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} diff --git a/apps/sim/app/api/a2a/serve/[agentId]/route.ts b/apps/sim/app/api/a2a/serve/[agentId]/route.ts new file mode 100644 index 0000000000..50cd727270 --- /dev/null +++ b/apps/sim/app/api/a2a/serve/[agentId]/route.ts @@ -0,0 +1,739 @@ +/** + * A2A Serve Endpoint - Implements A2A protocol for workflow agents + * + * Handles JSON-RPC 2.0 requests for: + * - tasks/send: Create or continue a task + * - tasks/get: Query task status + * - tasks/cancel: Cancel a running task + * - tasks/sendSubscribe: SSE streaming for real-time updates + */ + +import { db } from '@sim/db' +import { a2aAgent, a2aTask, workflow } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { eq } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { A2A_DEFAULT_TIMEOUT, A2A_METHODS } from '@/lib/a2a/constants' +import { + A2AErrorCode, + type Artifact, + type Task, + type TaskCancelParams, + type TaskMessage, + type TaskQueryParams, + type TaskSendParams, + type TaskState, +} from '@/lib/a2a/types' +import { + createAgentMessage, + createTaskStatus, + extractTextContent, + formatTaskResponse, + generateTaskId, + isTerminalState, +} from '@/lib/a2a/utils' +import { checkHybridAuth } from '@/lib/auth/hybrid' +import { SSE_HEADERS } from '@/lib/core/utils/sse' +import { getBaseUrl } from '@/lib/core/utils/urls' + +const logger = createLogger('A2AServeAPI') + +export const dynamic = 'force-dynamic' +export const runtime = 'nodejs' + +interface RouteParams { + agentId: string +} + +interface JSONRPCRequest { + jsonrpc: '2.0' + id: string | number + method: string + params?: unknown +} + +interface JSONRPCResponse { + jsonrpc: '2.0' + id: string | number | null + result?: unknown + error?: { + code: number + message: string + data?: unknown + } +} + +function createResponse(id: string | number | null, result: unknown): JSONRPCResponse { + return { jsonrpc: '2.0', id, result } +} + +function createError( + id: string | number | null, + code: number, + message: string, + data?: unknown +): JSONRPCResponse { + return { jsonrpc: '2.0', id, error: { code, message, data } } +} + +function isJSONRPCRequest(obj: unknown): obj is JSONRPCRequest { + if (!obj || typeof obj !== 'object') return false + const r = obj as Record + return r.jsonrpc === '2.0' && typeof r.method === 'string' && r.id !== undefined +} + +/** + * GET - Returns the Agent Card (discovery document) + * + * This allows clients to discover the agent's capabilities by calling GET on the serve endpoint. + */ +export async function GET(request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + try { + const [agent] = await db + .select({ + id: a2aAgent.id, + name: a2aAgent.name, + description: a2aAgent.description, + version: a2aAgent.version, + capabilities: a2aAgent.capabilities, + skills: a2aAgent.skills, + authentication: a2aAgent.authentication, + isPublished: a2aAgent.isPublished, + }) + .from(a2aAgent) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!agent) { + return NextResponse.json({ error: 'Agent not found' }, { status: 404 }) + } + + if (!agent.isPublished) { + return NextResponse.json({ error: 'Agent not published' }, { status: 404 }) + } + + const baseUrl = getBaseUrl() + + // Return full Agent Card for discovery + return NextResponse.json( + { + name: agent.name, + description: agent.description, + url: `${baseUrl}/api/a2a/serve/${agent.id}`, + version: agent.version, + documentationUrl: `${baseUrl}/docs/a2a`, + provider: { + organization: 'Sim Studio', + url: baseUrl, + }, + capabilities: agent.capabilities, + skills: agent.skills, + authentication: agent.authentication, + defaultInputModes: ['text', 'data'], + defaultOutputModes: ['text', 'data'], + }, + { + headers: { + 'Content-Type': 'application/json', + 'Cache-Control': 'public, max-age=3600', + }, + } + ) + } catch (error) { + logger.error('Error getting Agent Card:', error) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + +/** + * POST - Handle JSON-RPC requests + */ +export async function POST(request: NextRequest, { params }: { params: Promise }) { + const { agentId } = await params + + try { + // Verify agent exists and is published + const [agent] = await db + .select({ + id: a2aAgent.id, + name: a2aAgent.name, + workflowId: a2aAgent.workflowId, + workspaceId: a2aAgent.workspaceId, + isPublished: a2aAgent.isPublished, + capabilities: a2aAgent.capabilities, + }) + .from(a2aAgent) + .where(eq(a2aAgent.id, agentId)) + .limit(1) + + if (!agent) { + return NextResponse.json( + createError(null, A2AErrorCode.AGENT_UNAVAILABLE, 'Agent not found'), + { status: 404 } + ) + } + + if (!agent.isPublished) { + return NextResponse.json( + createError(null, A2AErrorCode.AGENT_UNAVAILABLE, 'Agent not published'), + { status: 404 } + ) + } + + // Auth check + const auth = await checkHybridAuth(request, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json( + createError(null, A2AErrorCode.AUTHENTICATION_REQUIRED, 'Unauthorized'), + { status: 401 } + ) + } + + // Verify workflow is deployed + const [wf] = await db + .select({ isDeployed: workflow.isDeployed }) + .from(workflow) + .where(eq(workflow.id, agent.workflowId)) + .limit(1) + + if (!wf?.isDeployed) { + return NextResponse.json( + createError(null, A2AErrorCode.AGENT_UNAVAILABLE, 'Workflow is not deployed'), + { status: 400 } + ) + } + + // Parse JSON-RPC request + const body = await request.json() + + if (!isJSONRPCRequest(body)) { + return NextResponse.json( + createError(null, A2AErrorCode.INVALID_REQUEST, 'Invalid JSON-RPC request'), + { status: 400 } + ) + } + + const { id, method, params: rpcParams } = body + const apiKey = + request.headers.get('X-API-Key') || + request.headers.get('Authorization')?.replace('Bearer ', '') + + logger.info(`A2A request: ${method} for agent ${agentId}`) + + switch (method) { + case A2A_METHODS.TASKS_SEND: + return handleTaskSend(id, agent, rpcParams as TaskSendParams, apiKey) + + case A2A_METHODS.TASKS_GET: + return handleTaskGet(id, rpcParams as TaskQueryParams) + + case A2A_METHODS.TASKS_CANCEL: + return handleTaskCancel(id, rpcParams as TaskCancelParams) + + case A2A_METHODS.TASKS_SEND_SUBSCRIBE: + return handleTaskSendSubscribe(request, id, agent, rpcParams as TaskSendParams, apiKey) + + default: + return NextResponse.json( + createError(id, A2AErrorCode.METHOD_NOT_FOUND, `Method not found: ${method}`), + { status: 404 } + ) + } + } catch (error) { + logger.error('Error handling A2A request:', error) + return NextResponse.json(createError(null, A2AErrorCode.INTERNAL_ERROR, 'Internal error'), { + status: 500, + }) + } +} + +/** + * Handle tasks/send - Create or continue a task + */ +async function handleTaskSend( + id: string | number, + agent: { + id: string + name: string + workflowId: string + workspaceId: string + }, + params: TaskSendParams, + apiKey?: string | null +): Promise { + if (!params?.message) { + return NextResponse.json(createError(id, A2AErrorCode.INVALID_PARAMS, 'Message is required'), { + status: 400, + }) + } + + const taskId = params.id || generateTaskId() + const contextId = params.contextId + + // Check if task exists (continuation) + let existingTask: typeof a2aTask.$inferSelect | null = null + if (params.id) { + const [found] = await db.select().from(a2aTask).where(eq(a2aTask.id, params.id)).limit(1) + existingTask = found || null + + if (!existingTask) { + return NextResponse.json(createError(id, A2AErrorCode.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + if (isTerminalState(existingTask.status as TaskState)) { + return NextResponse.json( + createError(id, A2AErrorCode.TASK_ALREADY_COMPLETE, 'Task already in terminal state'), + { status: 400 } + ) + } + } + + // Get existing history or start fresh + const history: TaskMessage[] = existingTask?.messages + ? (existingTask.messages as TaskMessage[]) + : [] + + // Add the new user message + history.push(params.message) + + // Create or update task + if (existingTask) { + await db + .update(a2aTask) + .set({ + status: 'working', + messages: history, + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + } else { + await db.insert(a2aTask).values({ + id: taskId, + agentId: agent.id, + sessionId: contextId || null, + status: 'working', + messages: history, + metadata: params.metadata || {}, + createdAt: new Date(), + updatedAt: new Date(), + }) + } + + // Execute the workflow + const executeUrl = `${getBaseUrl()}/api/workflows/${agent.workflowId}/execute` + const headers: Record = { 'Content-Type': 'application/json' } + if (apiKey) headers['X-API-Key'] = apiKey + + logger.info(`Executing workflow ${agent.workflowId} for A2A task ${taskId}`) + + try { + // Extract text content from the TaskMessage for easier workflow consumption + const messageText = extractTextContent(params.message) + + const response = await fetch(executeUrl, { + method: 'POST', + headers, + body: JSON.stringify({ + input: messageText, + triggerType: 'api', + }), + signal: AbortSignal.timeout(A2A_DEFAULT_TIMEOUT), + }) + + const executeResult = await response.json() + + // Determine final state + const finalState: TaskState = response.ok ? 'completed' : 'failed' + + // Create agent response message + const agentContent = + executeResult.output?.content || + (typeof executeResult.output === 'object' + ? JSON.stringify(executeResult.output) + : String(executeResult.output || executeResult.error || 'Task completed')) + + const agentMessage = createAgentMessage(agentContent) + history.push(agentMessage) + + // Extract artifacts if present + const artifacts = executeResult.output?.artifacts || [] + + // Update task with result + await db + .update(a2aTask) + .set({ + status: finalState, + messages: history, + artifacts, + executionId: executeResult.metadata?.executionId, + completedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + + const task: Task = { + id: taskId, + contextId: contextId || undefined, + status: createTaskStatus(finalState), + history, + artifacts, + metadata: params.metadata, + kind: 'task', + } + + return NextResponse.json(createResponse(id, task)) + } catch (error) { + logger.error(`Error executing workflow for task ${taskId}:`, error) + + // Mark task as failed + const errorMessage = error instanceof Error ? error.message : 'Workflow execution failed' + + await db + .update(a2aTask) + .set({ + status: 'failed', + updatedAt: new Date(), + completedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + + return NextResponse.json(createError(id, A2AErrorCode.INTERNAL_ERROR, errorMessage), { + status: 500, + }) + } +} + +/** + * Handle tasks/get - Query task status + */ +async function handleTaskGet(id: string | number, params: TaskQueryParams): Promise { + if (!params?.id) { + return NextResponse.json(createError(id, A2AErrorCode.INVALID_PARAMS, 'Task ID is required'), { + status: 400, + }) + } + + // Validate historyLength if provided + const historyLength = + params.historyLength !== undefined && params.historyLength >= 0 + ? params.historyLength + : undefined + + const [task] = await db.select().from(a2aTask).where(eq(a2aTask.id, params.id)).limit(1) + + if (!task) { + return NextResponse.json(createError(id, A2AErrorCode.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + const result = formatTaskResponse( + { + id: task.id, + contextId: task.sessionId || undefined, + status: createTaskStatus(task.status as TaskState), + history: task.messages as TaskMessage[], + artifacts: (task.artifacts as Artifact[]) || [], + metadata: (task.metadata as Record) || {}, + kind: 'task', + }, + historyLength + ) + + return NextResponse.json(createResponse(id, result)) +} + +/** + * Handle tasks/cancel - Cancel a running task + */ +async function handleTaskCancel( + id: string | number, + params: TaskCancelParams +): Promise { + if (!params?.id) { + return NextResponse.json(createError(id, A2AErrorCode.INVALID_PARAMS, 'Task ID is required'), { + status: 400, + }) + } + + const [task] = await db.select().from(a2aTask).where(eq(a2aTask.id, params.id)).limit(1) + + if (!task) { + return NextResponse.json(createError(id, A2AErrorCode.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + if (isTerminalState(task.status as TaskState)) { + return NextResponse.json( + createError(id, A2AErrorCode.TASK_ALREADY_COMPLETE, 'Task already in terminal state'), + { status: 400 } + ) + } + + await db + .update(a2aTask) + .set({ + status: 'canceled', + updatedAt: new Date(), + completedAt: new Date(), + }) + .where(eq(a2aTask.id, params.id)) + + const result: Task = { + id: task.id, + contextId: task.sessionId || undefined, + status: createTaskStatus('canceled'), + history: task.messages as TaskMessage[], + artifacts: (task.artifacts as Artifact[]) || [], + kind: 'task', + } + + return NextResponse.json(createResponse(id, result)) +} + +/** + * Handle tasks/sendSubscribe - SSE streaming + */ +async function handleTaskSendSubscribe( + request: NextRequest, + id: string | number, + agent: { + id: string + name: string + workflowId: string + workspaceId: string + }, + params: TaskSendParams, + apiKey?: string | null +): Promise { + if (!params?.message) { + return NextResponse.json(createError(id, A2AErrorCode.INVALID_PARAMS, 'Message is required'), { + status: 400, + }) + } + + const contextId = params.contextId + + // Get existing task or prepare for new one + let history: TaskMessage[] = [] + let existingTask: typeof a2aTask.$inferSelect | null = null + + if (params.id) { + const [found] = await db.select().from(a2aTask).where(eq(a2aTask.id, params.id)).limit(1) + existingTask = found || null + + if (!existingTask) { + return NextResponse.json(createError(id, A2AErrorCode.TASK_NOT_FOUND, 'Task not found'), { + status: 404, + }) + } + + if (isTerminalState(existingTask.status as TaskState)) { + return NextResponse.json( + createError(id, A2AErrorCode.TASK_ALREADY_COMPLETE, 'Task already in terminal state'), + { status: 400 } + ) + } + + history = existingTask.messages as TaskMessage[] + } + + const taskId = params.id || generateTaskId() + history.push(params.message) + + // Create or update task record + if (existingTask) { + await db + .update(a2aTask) + .set({ + status: 'working', + messages: history, + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + } else { + await db.insert(a2aTask).values({ + id: taskId, + agentId: agent.id, + sessionId: contextId || null, + status: 'working', + messages: history, + metadata: params.metadata || {}, + createdAt: new Date(), + updatedAt: new Date(), + }) + } + + // Create SSE stream + const encoder = new TextEncoder() + + const stream = new ReadableStream({ + async start(controller) { + const sendEvent = (event: string, data: unknown) => { + try { + controller.enqueue(encoder.encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`)) + } catch (error) { + logger.error('Error sending SSE event:', error) + } + } + + // Send initial status + sendEvent('task:status', { + id: taskId, + status: { state: 'working', timestamp: new Date().toISOString() }, + }) + + try { + // Execute workflow with streaming + const executeUrl = `${getBaseUrl()}/api/workflows/${agent.workflowId}/execute` + const headers: Record = { + 'Content-Type': 'application/json', + 'X-Stream-Response': 'true', + } + if (apiKey) headers['X-API-Key'] = apiKey + + // Extract text content from the TaskMessage for easier workflow consumption + const messageText = extractTextContent(params.message) + + const response = await fetch(executeUrl, { + method: 'POST', + headers, + body: JSON.stringify({ + input: messageText, + triggerType: 'api', + stream: true, + }), + signal: AbortSignal.timeout(A2A_DEFAULT_TIMEOUT), + }) + + if (!response.ok) { + let errorMessage = 'Workflow execution failed' + try { + const errorResult = await response.json() + errorMessage = errorResult.error || errorMessage + } catch { + // Response may not be JSON + } + throw new Error(errorMessage) + } + + // Check content type to determine response handling + const contentType = response.headers.get('content-type') || '' + const isStreamingResponse = + contentType.includes('text/event-stream') || contentType.includes('text/plain') + + if (response.body && isStreamingResponse) { + // Handle streaming response - forward chunks + const reader = response.body.getReader() + const decoder = new TextDecoder() + let fullContent = '' + + while (true) { + const { done, value } = await reader.read() + if (done) break + + const chunk = decoder.decode(value, { stream: true }) + fullContent += chunk + + // Forward chunk as message event + sendEvent('task:message', { + id: taskId, + chunk, + }) + } + + // Create final agent message + const agentMessage = createAgentMessage(fullContent || 'Task completed') + history.push(agentMessage) + + // Update task + await db + .update(a2aTask) + .set({ + status: 'completed', + messages: history, + completedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + + sendEvent('task:status', { + id: taskId, + status: { state: 'completed', timestamp: new Date().toISOString() }, + final: true, + }) + } else { + // Handle JSON response (non-streaming workflow) + const result = await response.json() + + const content = + result.output?.content || + (typeof result.output === 'object' + ? JSON.stringify(result.output) + : String(result.output || 'Task completed')) + + // Send the complete content as a single message + sendEvent('task:message', { + id: taskId, + chunk: content, + }) + + const agentMessage = createAgentMessage(content) + history.push(agentMessage) + + const artifacts = (result.output?.artifacts as Artifact[]) || [] + + // Update task with result + await db + .update(a2aTask) + .set({ + status: 'completed', + messages: history, + artifacts, + executionId: result.metadata?.executionId, + completedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + + sendEvent('task:status', { + id: taskId, + status: { state: 'completed', timestamp: new Date().toISOString() }, + final: true, + }) + } + } catch (error) { + logger.error(`Streaming error for task ${taskId}:`, error) + + await db + .update(a2aTask) + .set({ + status: 'failed', + completedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(a2aTask.id, taskId)) + + sendEvent('error', { + code: A2AErrorCode.INTERNAL_ERROR, + message: error instanceof Error ? error.message : 'Streaming failed', + }) + } finally { + sendEvent('task:done', { id: taskId }) + controller.close() + } + }, + }) + + return new NextResponse(stream, { + headers: { + ...SSE_HEADERS, + 'X-Task-Id': taskId, + }, + }) +} diff --git a/apps/sim/blocks/blocks/a2a.ts b/apps/sim/blocks/blocks/a2a.ts new file mode 100644 index 0000000000..6f8109e87b --- /dev/null +++ b/apps/sim/blocks/blocks/a2a.ts @@ -0,0 +1,237 @@ +/** + * A2A Block (v0.2.6) + * + * Enables interaction with external A2A-compatible agents. + * Supports sending messages, querying tasks, cancelling tasks, and discovering agents. + */ + +import { A2AIcon } from '@/components/icons' +import type { BlockConfig } from '@/blocks/types' +import type { ToolResponse } from '@/tools/types' + +export interface A2AResponse extends ToolResponse { + output: { + /** Response content from the agent (send_task) */ + content?: string + /** Task ID */ + taskId?: string + /** Context ID for conversation continuity */ + contextId?: string + /** Task state */ + state?: string + /** Structured output artifacts */ + artifacts?: Array<{ + name?: string + description?: string + parts: Array<{ type: string; text?: string; data?: unknown }> + }> + /** Full message history */ + history?: Array<{ + role: 'user' | 'agent' + parts: Array<{ type: string; text?: string }> + }> + /** Whether cancellation was successful (cancel_task) */ + cancelled?: boolean + /** Agent name (get_agent_card) */ + name?: string + /** Agent description (get_agent_card) */ + description?: string + /** Agent URL (get_agent_card) */ + url?: string + /** Agent version (get_agent_card) */ + version?: string + /** Agent capabilities (get_agent_card) */ + capabilities?: Record + /** Agent skills (get_agent_card) */ + skills?: Array<{ id: string; name: string; description?: string }> + /** Agent authentication schemes (get_agent_card) */ + authentication?: { schemes: string[] } + } +} + +export const A2ABlock: BlockConfig = { + type: 'a2a', + name: 'A2A', + description: 'Interact with external A2A-compatible agents', + longDescription: + 'Use the A2A (Agent-to-Agent) protocol to interact with external AI agents. ' + + 'Send messages, query task status, cancel tasks, or discover agent capabilities. ' + + 'Compatible with any A2A-compliant agent including LangGraph, Google ADK, and other Sim Studio workflows.', + docsLink: 'https://docs.sim.ai/blocks/a2a', + category: 'tools', + bgColor: '#4151B5', + icon: A2AIcon, + subBlocks: [ + { + id: 'operation', + title: 'Operation', + type: 'dropdown', + options: [ + { label: 'Send Message', id: 'send_task' }, + { label: 'Get Task', id: 'get_task' }, + { label: 'Cancel Task', id: 'cancel_task' }, + { label: 'Get Agent Card', id: 'get_agent_card' }, + ], + defaultValue: 'send_task', + }, + { + id: 'agentUrl', + title: 'Agent URL', + type: 'short-input', + placeholder: 'https://api.example.com/a2a/serve/agent-id', + required: true, + description: 'The A2A endpoint URL', + }, + { + id: 'message', + title: 'Message', + type: 'long-input', + placeholder: 'Enter your message to the agent...', + description: 'The message to send to the agent', + condition: { field: 'operation', value: 'send_task' }, + required: { field: 'operation', value: 'send_task' }, + }, + { + id: 'taskId', + title: 'Task ID', + type: 'short-input', + placeholder: 'Task ID', + description: 'Task ID to query, cancel, or continue', + condition: { field: 'operation', value: ['send_task', 'get_task', 'cancel_task'] }, + required: { field: 'operation', value: ['get_task', 'cancel_task'] }, + }, + { + id: 'contextId', + title: 'Context ID', + type: 'short-input', + placeholder: 'Optional - for multi-turn conversations', + description: 'Context ID for conversation continuity across tasks', + condition: { field: 'operation', value: 'send_task' }, + }, + { + id: 'historyLength', + title: 'History Length', + type: 'short-input', + placeholder: 'Number of messages to include', + description: 'Number of history messages to include in the response', + condition: { field: 'operation', value: 'get_task' }, + }, + { + id: 'apiKey', + title: 'API Key', + type: 'short-input', + password: true, + placeholder: 'API key for the remote agent', + description: 'Authentication key for the A2A agent', + }, + ], + tools: { + access: ['a2a_send_task', 'a2a_get_task', 'a2a_cancel_task', 'a2a_get_agent_card'], + config: { + tool: (params: Record) => { + const operation = params.operation as string + switch (operation) { + case 'get_task': + return 'a2a_get_task' + case 'cancel_task': + return 'a2a_cancel_task' + case 'get_agent_card': + return 'a2a_get_agent_card' + default: + return 'a2a_send_task' + } + }, + }, + }, + inputs: { + operation: { + type: 'string', + description: 'A2A operation to perform', + }, + agentUrl: { + type: 'string', + description: 'A2A endpoint URL', + }, + message: { + type: 'string', + description: 'Message to send to the agent', + }, + taskId: { + type: 'string', + description: 'Task ID to query, cancel, or continue', + }, + contextId: { + type: 'string', + description: 'Context ID for conversation continuity', + }, + historyLength: { + type: 'number', + description: 'Number of history messages to include', + }, + apiKey: { + type: 'string', + description: 'API key for authentication', + }, + }, + outputs: { + // Send task outputs + content: { + type: 'string', + description: 'The text response from the agent', + }, + taskId: { + type: 'string', + description: 'Task ID for follow-up interactions', + }, + contextId: { + type: 'string', + description: 'Context ID for conversation continuity', + }, + state: { + type: 'string', + description: 'Task state (completed, failed, etc.)', + }, + artifacts: { + type: 'array', + description: 'Structured output artifacts from the agent', + }, + history: { + type: 'array', + description: 'Full message history of the conversation', + }, + // Cancel task output + cancelled: { + type: 'boolean', + description: 'Whether the task was successfully cancelled', + }, + // Get agent card outputs + name: { + type: 'string', + description: 'Agent name', + }, + description: { + type: 'string', + description: 'Agent description', + }, + url: { + type: 'string', + description: 'Agent endpoint URL', + }, + version: { + type: 'string', + description: 'Agent version', + }, + capabilities: { + type: 'json', + description: 'Agent capabilities (streaming, pushNotifications, etc.)', + }, + skills: { + type: 'array', + description: 'Skills the agent can perform', + }, + authentication: { + type: 'json', + description: 'Supported authentication schemes', + }, + }, +} diff --git a/apps/sim/blocks/registry.ts b/apps/sim/blocks/registry.ts index 8a4d75121f..d41f1b1bd7 100644 --- a/apps/sim/blocks/registry.ts +++ b/apps/sim/blocks/registry.ts @@ -1,3 +1,4 @@ +import { A2ABlock } from '@/blocks/blocks/a2a' import { AgentBlock } from '@/blocks/blocks/agent' import { AhrefsBlock } from '@/blocks/blocks/ahrefs' import { AirtableBlock } from '@/blocks/blocks/airtable' @@ -147,6 +148,7 @@ import { SQSBlock } from './blocks/sqs' // Registry of all available blocks, alphabetically sorted export const registry: Record = { + a2a: A2ABlock, agent: AgentBlock, ahrefs: AhrefsBlock, airtable: AirtableBlock, diff --git a/apps/sim/components/icons.tsx b/apps/sim/components/icons.tsx index de0ab92021..6295969a0b 100644 --- a/apps/sim/components/icons.tsx +++ b/apps/sim/components/icons.tsx @@ -4061,6 +4061,31 @@ export function McpIcon(props: SVGProps) { ) } +export function A2AIcon(props: SVGProps) { + return ( + + + + + + + + + + ) +} + export function WordpressIcon(props: SVGProps) { return ( diff --git a/apps/sim/hooks/queries/a2a-agents.ts b/apps/sim/hooks/queries/a2a-agents.ts new file mode 100644 index 0000000000..b84695b222 --- /dev/null +++ b/apps/sim/hooks/queries/a2a-agents.ts @@ -0,0 +1,278 @@ +/** + * A2A Agents React Query Hooks + * + * Hooks for managing A2A agents in the UI. + */ + +import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query' +import type { AgentAuthentication, AgentCapabilities, AgentSkill } from '@/lib/a2a/types' + +/** + * A2A Agent as returned from the API + */ +export interface A2AAgent { + id: string + workspaceId: string + workflowId: string + name: string + description?: string + version: string + capabilities: AgentCapabilities + skills: AgentSkill[] + authentication: AgentAuthentication + isPublished: boolean + publishedAt?: string + createdAt: string + updatedAt: string + workflowName?: string + workflowDescription?: string + isDeployed?: boolean + taskCount?: number +} + +/** + * Query keys for A2A agents + */ +export const a2aAgentKeys = { + all: ['a2a-agents'] as const, + list: (workspaceId: string) => [...a2aAgentKeys.all, 'list', workspaceId] as const, + detail: (agentId: string) => [...a2aAgentKeys.all, 'detail', agentId] as const, +} + +/** + * Fetch A2A agents for a workspace + */ +async function fetchA2AAgents(workspaceId: string): Promise { + const response = await fetch(`/api/a2a/agents?workspaceId=${workspaceId}`) + if (!response.ok) { + throw new Error('Failed to fetch A2A agents') + } + const data = await response.json() + return data.agents +} + +/** + * Hook to list A2A agents for a workspace + */ +export function useA2AAgents(workspaceId: string) { + return useQuery({ + queryKey: a2aAgentKeys.list(workspaceId), + queryFn: () => fetchA2AAgents(workspaceId), + enabled: Boolean(workspaceId), + staleTime: 60 * 1000, // 1 minute + }) +} + +/** + * Agent Card as returned from the agent detail endpoint + */ +export interface A2AAgentCard { + name: string + description?: string + url: string + version: string + documentationUrl?: string + provider?: { + organization: string + url?: string + } + capabilities: AgentCapabilities + skills: AgentSkill[] + authentication?: AgentAuthentication + defaultInputModes?: string[] + defaultOutputModes?: string[] +} + +/** + * Fetch a single A2A agent card (discovery document) + */ +async function fetchA2AAgentCard(agentId: string): Promise { + const response = await fetch(`/api/a2a/agents/${agentId}`) + if (!response.ok) { + throw new Error('Failed to fetch A2A agent') + } + return response.json() +} + +/** + * Hook to get a single A2A agent card (discovery document) + */ +export function useA2AAgentCard(agentId: string) { + return useQuery({ + queryKey: a2aAgentKeys.detail(agentId), + queryFn: () => fetchA2AAgentCard(agentId), + enabled: Boolean(agentId), + }) +} + +/** + * Create A2A agent params + */ +export interface CreateA2AAgentParams { + workspaceId: string + workflowId: string + name?: string + description?: string + capabilities?: AgentCapabilities + authentication?: AgentAuthentication +} + +/** + * Create a new A2A agent + */ +async function createA2AAgent(params: CreateA2AAgentParams): Promise { + const response = await fetch('/api/a2a/agents', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(params), + }) + if (!response.ok) { + const error = await response.json() + throw new Error(error.error || 'Failed to create A2A agent') + } + const data = await response.json() + return data.agent +} + +/** + * Hook to create an A2A agent + */ +export function useCreateA2AAgent() { + const queryClient = useQueryClient() + + return useMutation({ + mutationFn: createA2AAgent, + onSuccess: (data) => { + queryClient.invalidateQueries({ + queryKey: a2aAgentKeys.list(data.workspaceId), + }) + }, + }) +} + +/** + * Update A2A agent params + */ +export interface UpdateA2AAgentParams { + agentId: string + name?: string + description?: string + version?: string + capabilities?: AgentCapabilities + skills?: AgentSkill[] + authentication?: AgentAuthentication + isPublished?: boolean +} + +/** + * Update an A2A agent + */ +async function updateA2AAgent(params: UpdateA2AAgentParams): Promise { + const { agentId, ...body } = params + const response = await fetch(`/api/a2a/agents/${agentId}`, { + method: 'PUT', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body), + }) + if (!response.ok) { + const error = await response.json() + throw new Error(error.error || 'Failed to update A2A agent') + } + const data = await response.json() + return data.agent +} + +/** + * Hook to update an A2A agent + */ +export function useUpdateA2AAgent() { + const queryClient = useQueryClient() + + return useMutation({ + mutationFn: updateA2AAgent, + onSuccess: (data) => { + queryClient.invalidateQueries({ + queryKey: a2aAgentKeys.detail(data.id), + }) + queryClient.invalidateQueries({ + queryKey: a2aAgentKeys.list(data.workspaceId), + }) + }, + }) +} + +/** + * Delete an A2A agent + */ +async function deleteA2AAgent(params: { agentId: string; workspaceId: string }): Promise { + const response = await fetch(`/api/a2a/agents/${params.agentId}`, { + method: 'DELETE', + }) + if (!response.ok) { + const error = await response.json() + throw new Error(error.error || 'Failed to delete A2A agent') + } +} + +/** + * Hook to delete an A2A agent + */ +export function useDeleteA2AAgent() { + const queryClient = useQueryClient() + + return useMutation({ + mutationFn: deleteA2AAgent, + onSuccess: (_, variables) => { + queryClient.invalidateQueries({ + queryKey: a2aAgentKeys.list(variables.workspaceId), + }) + }, + }) +} + +/** + * Publish/unpublish agent params + */ +export interface PublishA2AAgentParams { + agentId: string + workspaceId: string + action: 'publish' | 'unpublish' | 'refresh' +} + +/** + * Publish or unpublish an A2A agent + */ +async function publishA2AAgent(params: PublishA2AAgentParams): Promise<{ + isPublished?: boolean + skills?: AgentSkill[] +}> { + const response = await fetch(`/api/a2a/agents/${params.agentId}`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ action: params.action }), + }) + if (!response.ok) { + const error = await response.json() + throw new Error(error.error || 'Failed to update A2A agent') + } + return response.json() +} + +/** + * Hook to publish/unpublish an A2A agent + */ +export function usePublishA2AAgent() { + const queryClient = useQueryClient() + + return useMutation({ + mutationFn: publishA2AAgent, + onSuccess: (_, variables) => { + queryClient.invalidateQueries({ + queryKey: a2aAgentKeys.detail(variables.agentId), + }) + queryClient.invalidateQueries({ + queryKey: a2aAgentKeys.list(variables.workspaceId), + }) + }, + }) +} diff --git a/apps/sim/lib/a2a/agent-card.ts b/apps/sim/lib/a2a/agent-card.ts new file mode 100644 index 0000000000..4dccc86fe7 --- /dev/null +++ b/apps/sim/lib/a2a/agent-card.ts @@ -0,0 +1,185 @@ +/** + * A2A Agent Card Generation + * + * Generates Agent Cards from workflow metadata and configuration. + */ + +import { getBaseUrl } from '@/lib/core/utils/urls' +import { + extractInputFormatFromBlocks, + generateToolInputSchema, +} from '@/lib/mcp/workflow-tool-schema' +import type { InputFormatField } from '@/lib/workflows/types' +import { + A2A_DEFAULT_CAPABILITIES, + A2A_DEFAULT_INPUT_MODES, + A2A_DEFAULT_OUTPUT_MODES, +} from './constants' +import type { + AgentAuthentication, + AgentCapabilities, + AgentCard, + AgentSkill, + JSONSchema, +} from './types' +import { buildA2AEndpointUrl, sanitizeAgentName } from './utils' + +interface WorkflowData { + id: string + name: string + description?: string | null +} + +interface AgentData { + id: string + name: string + description?: string | null + version: string + capabilities?: AgentCapabilities + skills?: AgentSkill[] + authentication?: AgentAuthentication +} + +/** + * Generate an Agent Card from agent and workflow data + */ +export function generateAgentCard(agent: AgentData, workflow: WorkflowData): AgentCard { + const baseUrl = getBaseUrl() + + return { + name: agent.name, + description: agent.description || workflow.description || undefined, + url: buildA2AEndpointUrl(baseUrl, agent.id), + version: agent.version || '1.0.0', + documentationUrl: `${baseUrl}/docs/a2a`, + provider: { + organization: 'Sim Studio', + url: baseUrl, + }, + capabilities: { + ...A2A_DEFAULT_CAPABILITIES, + ...agent.capabilities, + }, + skills: agent.skills || [ + { + id: 'execute', + name: `Execute ${workflow.name}`, + description: workflow.description || `Execute the ${workflow.name} workflow`, + }, + ], + authentication: agent.authentication || { + schemes: ['bearer', 'apiKey'], + }, + defaultInputModes: [...A2A_DEFAULT_INPUT_MODES], + defaultOutputModes: [...A2A_DEFAULT_OUTPUT_MODES], + } +} + +/** + * Generate skills from workflow input format + */ +export function generateSkillsFromWorkflow( + workflowId: string, + workflowName: string, + workflowDescription: string | undefined | null, + blocks: Record +): AgentSkill[] { + const inputFormat = extractInputFormatFromBlocks(blocks) + + const skill: AgentSkill = { + id: 'execute', + name: `Execute ${workflowName}`, + description: workflowDescription || `Execute the ${workflowName} workflow`, + tags: ['workflow', 'automation'], + } + + if (inputFormat && inputFormat.length > 0) { + skill.inputSchema = convertInputFormatToJSONSchema(inputFormat) + } + + // Add default output schema + skill.outputSchema = { + type: 'object', + properties: { + content: { + type: 'string', + description: 'The main text output from the workflow', + }, + data: { + type: 'object', + description: 'Structured data output from the workflow', + }, + }, + } + + return [skill] +} + +/** + * Convert InputFormatField array to JSON Schema + */ +export function convertInputFormatToJSONSchema(inputFormat: InputFormatField[]): JSONSchema { + const mcpSchema = generateToolInputSchema(inputFormat) + + return { + type: 'object', + properties: mcpSchema.properties as Record, + required: mcpSchema.required, + } +} + +/** + * Generate a default agent name from workflow name + */ +export function generateDefaultAgentName(workflowName: string): string { + return sanitizeAgentName(workflowName) +} + +/** + * Validate agent card structure + */ +export function validateAgentCard(card: unknown): card is AgentCard { + if (!card || typeof card !== 'object') return false + + const c = card as Record + + // Required fields + if (typeof c.name !== 'string' || !c.name) return false + if (typeof c.url !== 'string' || !c.url) return false + if (typeof c.version !== 'string' || !c.version) return false + + // Capabilities must be an object + if (c.capabilities && typeof c.capabilities !== 'object') return false + + // Skills must be an array + if (!Array.isArray(c.skills)) return false + + return true +} + +/** + * Merge agent card with updates (partial update support) + */ +export function mergeAgentCard(existing: AgentCard, updates: Partial): AgentCard { + return { + ...existing, + ...updates, + capabilities: { + ...existing.capabilities, + ...updates.capabilities, + }, + skills: updates.skills || existing.skills, + authentication: updates.authentication || existing.authentication, + } +} + +/** + * Create agent card URL paths + */ +export function getAgentCardPaths(agentId: string) { + const baseUrl = getBaseUrl() + return { + card: `${baseUrl}/api/a2a/agents/${agentId}`, + serve: `${baseUrl}/api/a2a/serve/${agentId}`, + } +} diff --git a/apps/sim/lib/a2a/constants.ts b/apps/sim/lib/a2a/constants.ts new file mode 100644 index 0000000000..6475b15826 --- /dev/null +++ b/apps/sim/lib/a2a/constants.ts @@ -0,0 +1,69 @@ +/** + * A2A Protocol Constants (v0.2.6) + */ + +/** A2A Protocol version */ +export const A2A_PROTOCOL_VERSION = '0.2.6' + +/** Default timeout for A2A requests (5 minutes) */ +export const A2A_DEFAULT_TIMEOUT = 300000 + +/** Maximum message history length */ +export const A2A_MAX_HISTORY_LENGTH = 100 + +/** Supported authentication schemes */ +export const A2A_AUTH_SCHEMES = ['bearer', 'apiKey', 'oauth2', 'none'] as const + +/** Task state values (v0.2.6) */ +export const A2A_TASK_STATE = { + SUBMITTED: 'submitted', + WORKING: 'working', + INPUT_REQUIRED: 'input-required', + COMPLETED: 'completed', + FAILED: 'failed', + CANCELED: 'canceled', + REJECTED: 'rejected', + AUTH_REQUIRED: 'auth-required', + UNKNOWN: 'unknown', +} as const + +/** Valid task state transitions */ +export const A2A_VALID_TRANSITIONS: Record = { + submitted: ['working', 'failed', 'canceled', 'rejected'], + working: ['completed', 'failed', 'canceled', 'input-required'], + 'input-required': ['working', 'failed', 'canceled'], + 'auth-required': ['working', 'failed', 'canceled'], + completed: [], + failed: [], + canceled: [], + rejected: [], + unknown: [], +} + +/** JSON-RPC methods supported by A2A */ +export const A2A_METHODS = { + TASKS_SEND: 'tasks/send', + TASKS_GET: 'tasks/get', + TASKS_CANCEL: 'tasks/cancel', + TASKS_SEND_SUBSCRIBE: 'tasks/sendSubscribe', +} as const + +/** Well-known path for agent card discovery */ +export const A2A_WELL_KNOWN_PATH = '/.well-known/agent.json' + +/** Default capabilities for new agents */ +export const A2A_DEFAULT_CAPABILITIES = { + streaming: true, + pushNotifications: false, + stateTransitionHistory: true, +} as const + +/** Default input/output modes */ +export const A2A_DEFAULT_INPUT_MODES = ['text', 'data'] as const +export const A2A_DEFAULT_OUTPUT_MODES = ['text', 'data'] as const + +/** Cache settings */ +export const A2A_CACHE = { + AGENT_CARD_TTL: 3600, // 1 hour + TASK_TTL: 86400, // 24 hours +} as const diff --git a/apps/sim/lib/a2a/index.ts b/apps/sim/lib/a2a/index.ts new file mode 100644 index 0000000000..fa179cb6d5 --- /dev/null +++ b/apps/sim/lib/a2a/index.ts @@ -0,0 +1,12 @@ +/** + * A2A (Agent-to-Agent) Protocol Implementation + * + * This module provides A2A protocol support for Sim Studio, + * enabling workflows to be exposed as A2A agents and allowing + * workflows to call external A2A agents. + */ + +export * from './agent-card' +export * from './constants' +export * from './types' +export * from './utils' diff --git a/apps/sim/lib/a2a/types.ts b/apps/sim/lib/a2a/types.ts new file mode 100644 index 0000000000..f3633ff1d0 --- /dev/null +++ b/apps/sim/lib/a2a/types.ts @@ -0,0 +1,368 @@ +/** + * A2A (Agent-to-Agent) Protocol Types + * + * Implements the A2A protocol specification for agent interoperability. + * @see https://a2a-protocol.org/specification + */ + +/** + * JSON Schema type for input/output definitions + */ +export interface JSONSchema { + type: string + properties?: Record + required?: string[] + items?: JSONSchema + description?: string + enum?: string[] + default?: unknown + additionalProperties?: boolean | JSONSchema +} + +export interface JSONSchemaProperty { + type: string + description?: string + enum?: string[] + default?: unknown + items?: JSONSchema +} + +/** + * Agent Card - Discovery document for A2A agents + * Describes an agent's capabilities, skills, and how to interact with it + */ +export interface AgentCard { + /** Human-readable name of the agent */ + name: string + /** Description of what the agent does */ + description?: string + /** Base URL for the agent's A2A endpoint */ + url: string + /** Version of the agent implementation */ + version: string + /** URL to agent documentation */ + documentationUrl?: string + /** Provider information */ + provider?: AgentProvider + /** Agent capabilities */ + capabilities: AgentCapabilities + /** Skills the agent can perform */ + skills: AgentSkill[] + /** Authentication configuration */ + authentication?: AgentAuthentication + /** Default input modes accepted */ + defaultInputModes?: InputMode[] + /** Default output modes produced */ + defaultOutputModes?: OutputMode[] +} + +export interface AgentProvider { + organization: string + url?: string +} + +export interface AgentCapabilities { + /** Whether the agent supports streaming responses */ + streaming?: boolean + /** Whether the agent supports push notifications */ + pushNotifications?: boolean + /** Whether the agent tracks state transition history */ + stateTransitionHistory?: boolean +} + +export interface AgentSkill { + /** Unique identifier for the skill */ + id: string + /** Human-readable name */ + name: string + /** Description of what the skill does */ + description?: string + /** Tags for categorization */ + tags?: string[] + /** JSON Schema for input parameters */ + inputSchema?: JSONSchema + /** JSON Schema for output */ + outputSchema?: JSONSchema + /** Example interactions */ + examples?: SkillExample[] +} + +export interface SkillExample { + input: TaskMessage + output: TaskMessage[] +} + +export interface AgentAuthentication { + /** Supported authentication schemes */ + schemes: AuthScheme[] + /** Credentials hint or reference */ + credentials?: string +} + +export type AuthScheme = 'bearer' | 'apiKey' | 'oauth2' | 'none' +export type InputMode = 'text' | 'file' | 'data' +export type OutputMode = 'text' | 'file' | 'data' + +/** + * Task - Core unit of work in A2A protocol (v0.2.6) + */ +export interface Task { + /** Unique task identifier */ + id: string + /** Server-generated context ID for contextual alignment across interactions */ + contextId?: string + /** Current task status */ + status: TaskStatusObject + /** Message history */ + history?: TaskMessage[] + /** Structured output artifacts */ + artifacts?: Artifact[] + /** Additional metadata */ + metadata?: Record + /** Event kind - always "task" */ + kind?: 'task' +} + +/** + * Task state lifecycle (v0.2.6) + */ +export type TaskState = + | 'submitted' + | 'working' + | 'input-required' + | 'completed' + | 'failed' + | 'canceled' + | 'rejected' + | 'auth-required' + | 'unknown' + +/** + * Task status object (v0.2.6) + * Represents the current state and associated context of a Task + */ +export interface TaskStatusObject { + /** The current lifecycle state of the task */ + state: TaskState + /** Additional status updates for the client */ + message?: TaskMessage + /** ISO 8601 datetime string indicating when the status was recorded */ + timestamp?: string +} + +/** + * Legacy TaskStatus type for backward compatibility + * @deprecated Use TaskState instead + */ +export type TaskStatus = TaskState + +/** + * Task message - A single message in a task conversation + */ +export interface TaskMessage { + /** Message role */ + role: 'user' | 'agent' + /** Message content parts */ + parts: MessagePart[] + /** Additional metadata */ + metadata?: Record +} + +/** + * Message part types + */ +export type MessagePart = TextPart | FilePart | DataPart + +export interface TextPart { + type: 'text' + text: string +} + +export interface FilePart { + type: 'file' + file: FileContent +} + +export interface FileContent { + name?: string + mimeType?: string + /** Base64 encoded content */ + bytes?: string + /** URI reference to file */ + uri?: string +} + +export interface DataPart { + type: 'data' + data: Record +} + +/** + * Artifact - Structured output from an agent + */ +export interface Artifact { + /** Artifact name */ + name?: string + /** Description of the artifact */ + description?: string + /** Content parts */ + parts: MessagePart[] + /** Index for ordering */ + index: number + /** Whether to append to existing artifact */ + append?: boolean + /** Whether this is the last chunk (for streaming) */ + lastChunk?: boolean +} + +/** + * JSON-RPC Request Parameters (v0.2.6) + */ +export interface TaskSendParams { + /** Task ID (optional for new tasks) */ + id?: string + /** Context ID for contextual alignment across interactions */ + contextId?: string + /** Message to send */ + message: TaskMessage + /** Accepted output modes */ + acceptedOutputModes?: OutputMode[] + /** Push notification configuration */ + pushNotificationConfig?: PushNotificationConfig + /** Additional metadata */ + metadata?: Record +} + +export interface TaskQueryParams { + /** Task ID to query */ + id: string + /** Number of history messages to include */ + historyLength?: number +} + +export interface TaskCancelParams { + /** Task ID to cancel */ + id: string +} + +export interface PushNotificationConfig { + /** Webhook URL for notifications */ + url: string + /** Authentication token */ + token?: string + /** Authentication configuration */ + authentication?: { + schemes: string[] + credentials?: string + } +} + +/** + * Task status update event (for streaming) + */ +export interface TaskStatusUpdate { + /** Task ID */ + id: string + /** Updated status */ + status: TaskStatusObject + /** Final result (if completed) */ + final?: boolean +} + +/** + * Task artifact update event (for streaming) + */ +export interface TaskArtifactUpdate { + /** Task ID */ + id: string + /** Artifact being updated */ + artifact: Artifact +} + +/** + * A2A Error codes (aligned with JSON-RPC) + */ +export const A2AErrorCode = { + // Standard JSON-RPC errors + PARSE_ERROR: -32700, + INVALID_REQUEST: -32600, + METHOD_NOT_FOUND: -32601, + INVALID_PARAMS: -32602, + INTERNAL_ERROR: -32603, + + // A2A-specific errors + TASK_NOT_FOUND: -32001, + TASK_ALREADY_COMPLETE: -32002, + AGENT_UNAVAILABLE: -32003, + SKILL_NOT_FOUND: -32004, + AUTHENTICATION_REQUIRED: -32005, + RATE_LIMITED: -32006, +} as const + +export type A2AErrorCodeType = (typeof A2AErrorCode)[keyof typeof A2AErrorCode] + +/** + * A2A Error class + */ +export class A2AError extends Error { + constructor( + message: string, + public code: A2AErrorCodeType = A2AErrorCode.INTERNAL_ERROR, + public data?: unknown + ) { + super(message) + this.name = 'A2AError' + } +} + +/** + * A2A API Response wrapper + */ +export interface A2AApiResponse { + success: boolean + data?: T + error?: string +} + +/** + * Database model types + */ +export interface A2AAgentConfig { + id: string + workspaceId: string + workflowId: string + name: string + description?: string + version: string + capabilities: AgentCapabilities + skills: AgentSkill[] + authentication: AgentAuthentication + isPublished: boolean + publishedAt?: Date + createdAt: Date + updatedAt: Date +} + +export interface A2ATaskRecord { + id: string + agentId: string + contextId?: string + status: TaskState + history: TaskMessage[] + artifacts?: Artifact[] + executionId?: string + metadata?: Record + createdAt: Date + updatedAt: Date + completedAt?: Date +} + +/** + * SSE Event types for streaming + */ +export type A2AStreamEvent = + | { type: 'task:status'; data: TaskStatusUpdate } + | { type: 'task:artifact'; data: TaskArtifactUpdate } + | { type: 'task:message'; data: { id: string; message: TaskMessage } } + | { type: 'task:done'; data: { id: string } } + | { type: 'error'; data: { code: number; message: string } } diff --git a/apps/sim/lib/a2a/utils.ts b/apps/sim/lib/a2a/utils.ts new file mode 100644 index 0000000000..bc2f9bec79 --- /dev/null +++ b/apps/sim/lib/a2a/utils.ts @@ -0,0 +1,188 @@ +/** + * A2A Protocol Utilities + */ + +import { v4 as uuidv4 } from 'uuid' +import { A2A_VALID_TRANSITIONS } from './constants' +import type { MessagePart, Task, TaskMessage, TaskState, TaskStatusObject, TextPart } from './types' + +/** + * Generate a unique task ID + */ +export function generateTaskId(): string { + return uuidv4() +} + +/** + * Generate a unique context ID + */ +export function generateContextId(): string { + return `ctx_${uuidv4()}` +} + +/** + * Check if a task status transition is valid + */ +export function isValidStatusTransition(from: TaskState, to: TaskState): boolean { + const validTransitions = A2A_VALID_TRANSITIONS[from] + return validTransitions?.includes(to) ?? false +} + +/** + * Check if a task is in a terminal state + */ +export function isTerminalState(state: TaskState): boolean { + return state === 'completed' || state === 'failed' || state === 'canceled' || state === 'rejected' +} + +/** + * Create a TaskStatusObject from a state + */ +export function createTaskStatus(state: TaskState, message?: TaskMessage): TaskStatusObject { + return { + state, + message, + timestamp: new Date().toISOString(), + } +} + +/** + * Create a text message part + */ +export function createTextPart(text: string): TextPart { + return { type: 'text', text } +} + +/** + * Create a user message + */ +export function createUserMessage(content: string | MessagePart[]): TaskMessage { + const parts = typeof content === 'string' ? [createTextPart(content)] : content + return { role: 'user', parts } +} + +/** + * Create an agent message + */ +export function createAgentMessage(content: string | MessagePart[]): TaskMessage { + const parts = typeof content === 'string' ? [createTextPart(content)] : content + return { role: 'agent', parts } +} + +/** + * Extract text content from a message + */ +export function extractTextContent(message: TaskMessage): string { + return message.parts + .filter((part): part is TextPart => part.type === 'text') + .map((part) => part.text) + .join('\n') +} + +/** + * Extract text content from multiple messages + */ +export function extractConversationText(messages: TaskMessage[]): string { + return messages.map((m) => `${m.role}: ${extractTextContent(m)}`).join('\n\n') +} + +/** + * Create an A2A tool ID from agent ID and skill ID + */ +export function createA2AToolId(agentId: string, skillId: string): string { + return `a2a:${agentId}:${skillId}` +} + +/** + * Parse an A2A tool ID into components + */ +export function parseA2AToolId(toolId: string): { agentId: string; skillId: string } | null { + const parts = toolId.split(':') + if (parts.length !== 3 || parts[0] !== 'a2a') { + return null + } + return { agentId: parts[1], skillId: parts[2] } +} + +/** + * Sanitize agent name for use as identifier + */ +export function sanitizeAgentName(name: string): string { + return name + .toLowerCase() + .replace(/[^a-z0-9]+/g, '-') + .replace(/^-+|-+$/g, '') + .substring(0, 64) +} + +/** + * Validate task structure + */ +export function validateTask(task: unknown): task is Task { + if (!task || typeof task !== 'object') return false + const t = task as Record + + if (typeof t.id !== 'string') return false + if (!t.status || typeof t.status !== 'object') return false + const status = t.status as Record + if (typeof status.state !== 'string') return false + + return true +} + +/** + * Create a minimal task object + */ +export function createTask(params: { + id?: string + contextId?: string + state?: TaskState + history?: TaskMessage[] + metadata?: Record +}): Task { + return { + id: params.id || generateTaskId(), + contextId: params.contextId, + status: createTaskStatus(params.state || 'submitted'), + history: params.history || [], + artifacts: [], + metadata: params.metadata, + kind: 'task', + } +} + +/** + * Format task for API response (remove internal fields) + */ +export function formatTaskResponse(task: Task, historyLength?: number): Task { + let history = task.history || [] + if (historyLength !== undefined && historyLength >= 0) { + history = history.slice(-historyLength) + } + + return { + id: task.id, + contextId: task.contextId, + status: task.status, + history, + artifacts: task.artifacts, + metadata: task.metadata, + kind: 'task', + } +} + +/** + * Build A2A endpoint URL + */ +export function buildA2AEndpointUrl(baseUrl: string, agentId: string): string { + const base = baseUrl.replace(/\/$/, '') + return `${base}/api/a2a/serve/${agentId}` +} + +/** + * Build Agent Card URL + */ +export function buildAgentCardUrl(baseUrl: string, agentId: string): string { + const base = baseUrl.replace(/\/$/, '') + return `${base}/api/a2a/agents/${agentId}` +} diff --git a/apps/sim/lib/auth/hybrid.ts b/apps/sim/lib/auth/hybrid.ts index 90559f4ed2..d7b2f8bf96 100644 --- a/apps/sim/lib/auth/hybrid.ts +++ b/apps/sim/lib/auth/hybrid.ts @@ -113,8 +113,9 @@ export async function checkHybridAuth( } } - // 3. Try API key auth - const apiKeyHeader = request.headers.get('x-api-key') + // 3. Try API key auth (check both X-API-Key and Authorization: Bearer as fallback) + const bearerToken = authHeader?.startsWith('Bearer ') ? authHeader.slice(7) : null + const apiKeyHeader = request.headers.get('x-api-key') || bearerToken if (apiKeyHeader) { const result = await authenticateApiKeyFromHeader(apiKeyHeader) if (result.success) { diff --git a/apps/sim/tools/a2a/cancel_task.ts b/apps/sim/tools/a2a/cancel_task.ts new file mode 100644 index 0000000000..90cbab4c03 --- /dev/null +++ b/apps/sim/tools/a2a/cancel_task.ts @@ -0,0 +1,107 @@ +/** + * A2A Cancel Task Tool + * + * Cancel a running A2A task. + */ + +import { createLogger } from '@sim/logger' +import { A2A_METHODS } from '@/lib/a2a/constants' +import type { Task } from '@/lib/a2a/types' +import type { ToolConfig } from '@/tools/types' +import type { A2ACancelTaskParams, A2ACancelTaskResponse } from './types' + +const logger = createLogger('A2ACancelTaskTool') + +export const a2aCancelTaskTool: ToolConfig = { + id: 'a2a_cancel_task', + name: 'A2A Cancel Task', + description: 'Cancel a running A2A task.', + version: '1.0.0', + + params: { + agentUrl: { + type: 'string', + required: true, + description: 'The A2A agent endpoint URL', + }, + taskId: { + type: 'string', + required: true, + description: 'Task ID to cancel', + }, + apiKey: { + type: 'string', + description: 'API key for authentication', + }, + }, + + request: { + url: (params: A2ACancelTaskParams) => params.agentUrl, + method: 'POST', + headers: (params: A2ACancelTaskParams) => { + const headers: Record = { + 'Content-Type': 'application/json', + } + if (params.apiKey) { + headers.Authorization = `Bearer ${params.apiKey}` + } + return headers + }, + body: (params: A2ACancelTaskParams) => ({ + jsonrpc: '2.0', + id: Date.now().toString(), + method: A2A_METHODS.TASKS_CANCEL, + params: { + id: params.taskId, + }, + }), + }, + + transformResponse: async (response: Response) => { + try { + const result = await response.json() + + if (result.error) { + return { + success: false, + output: { + cancelled: false, + state: 'failed', + }, + error: result.error.message || 'A2A request failed', + } + } + + const task = result.result as Task + + return { + success: true, + output: { + cancelled: true, + state: task.status.state, + }, + } + } catch (error) { + logger.error('Error parsing A2A response:', error) + return { + success: false, + output: { + cancelled: false, + state: 'failed', + }, + error: error instanceof Error ? error.message : 'Failed to parse response', + } + } + }, + + outputs: { + cancelled: { + type: 'boolean', + description: 'Whether cancellation was successful', + }, + state: { + type: 'string', + description: 'Task state after cancellation', + }, + }, +} diff --git a/apps/sim/tools/a2a/get_agent_card.ts b/apps/sim/tools/a2a/get_agent_card.ts new file mode 100644 index 0000000000..4245daa666 --- /dev/null +++ b/apps/sim/tools/a2a/get_agent_card.ts @@ -0,0 +1,128 @@ +/** + * A2A Get Agent Card Tool + * + * Fetch the Agent Card (discovery document) for an A2A agent. + */ + +import { createLogger } from '@sim/logger' +import type { AgentCard } from '@/lib/a2a/types' +import type { ToolConfig } from '@/tools/types' +import type { A2AGetAgentCardParams, A2AGetAgentCardResponse } from './types' + +const logger = createLogger('A2AGetAgentCardTool') + +export const a2aGetAgentCardTool: ToolConfig = { + id: 'a2a_get_agent_card', + name: 'A2A Get Agent Card', + description: 'Fetch the Agent Card (discovery document) for an A2A agent.', + version: '1.0.0', + + params: { + agentUrl: { + type: 'string', + required: true, + description: 'The A2A agent endpoint URL', + }, + apiKey: { + type: 'string', + description: 'API key for authentication (if required)', + }, + }, + + request: { + url: (params: A2AGetAgentCardParams) => params.agentUrl, + method: 'GET', + headers: (params: A2AGetAgentCardParams) => { + const headers: Record = { + Accept: 'application/json', + } + if (params.apiKey) { + headers.Authorization = `Bearer ${params.apiKey}` + } + return headers + }, + }, + + transformResponse: async (response: Response) => { + try { + if (!response.ok) { + return { + success: false, + output: { + name: '', + url: '', + version: '', + }, + error: `Failed to fetch agent card: ${response.status} ${response.statusText}`, + } + } + + const agentCard = (await response.json()) as AgentCard + + return { + success: true, + output: { + name: agentCard.name, + description: agentCard.description, + url: agentCard.url, + version: agentCard.version, + capabilities: agentCard.capabilities, + skills: agentCard.skills, + authentication: agentCard.authentication, + defaultInputModes: agentCard.defaultInputModes, + defaultOutputModes: agentCard.defaultOutputModes, + }, + } + } catch (error) { + logger.error('Error parsing Agent Card response:', error) + return { + success: false, + output: { + name: '', + url: '', + version: '', + }, + error: error instanceof Error ? error.message : 'Failed to parse Agent Card', + } + } + }, + + outputs: { + name: { + type: 'string', + description: 'Agent name', + }, + description: { + type: 'string', + description: 'Agent description', + }, + url: { + type: 'string', + description: 'Agent endpoint URL', + }, + version: { + type: 'string', + description: 'Agent version', + }, + capabilities: { + type: 'object', + description: 'Agent capabilities (streaming, pushNotifications, etc.)', + }, + skills: { + type: 'array', + description: 'Skills the agent can perform', + }, + authentication: { + type: 'object', + description: 'Supported authentication schemes', + }, + defaultInputModes: { + type: 'array', + description: 'Default input modes (text, file, data)', + }, + defaultOutputModes: { + type: 'array', + description: 'Default output modes (text, file, data)', + }, + }, +} diff --git a/apps/sim/tools/a2a/get_task.ts b/apps/sim/tools/a2a/get_task.ts new file mode 100644 index 0000000000..96296a82a3 --- /dev/null +++ b/apps/sim/tools/a2a/get_task.ts @@ -0,0 +1,127 @@ +/** + * A2A Get Task Tool + * + * Query the status of an existing A2A task. + */ + +import { createLogger } from '@sim/logger' +import { A2A_METHODS } from '@/lib/a2a/constants' +import type { Task } from '@/lib/a2a/types' +import type { ToolConfig } from '@/tools/types' +import type { A2AGetTaskParams, A2AGetTaskResponse } from './types' + +const logger = createLogger('A2AGetTaskTool') + +export const a2aGetTaskTool: ToolConfig = { + id: 'a2a_get_task', + name: 'A2A Get Task', + description: 'Query the status of an existing A2A task.', + version: '1.0.0', + + params: { + agentUrl: { + type: 'string', + required: true, + description: 'The A2A agent endpoint URL', + }, + taskId: { + type: 'string', + required: true, + description: 'Task ID to query', + }, + apiKey: { + type: 'string', + description: 'API key for authentication', + }, + historyLength: { + type: 'number', + description: 'Number of history messages to include', + }, + }, + + request: { + url: (params: A2AGetTaskParams) => params.agentUrl, + method: 'POST', + headers: (params: A2AGetTaskParams) => { + const headers: Record = { + 'Content-Type': 'application/json', + } + if (params.apiKey) { + headers.Authorization = `Bearer ${params.apiKey}` + } + return headers + }, + body: (params: A2AGetTaskParams) => ({ + jsonrpc: '2.0', + id: Date.now().toString(), + method: A2A_METHODS.TASKS_GET, + params: { + id: params.taskId, + historyLength: params.historyLength, + }, + }), + }, + + transformResponse: async (response: Response) => { + try { + const result = await response.json() + + if (result.error) { + return { + success: false, + output: { + taskId: '', + state: 'failed', + }, + error: result.error.message || 'A2A request failed', + } + } + + const task = result.result as Task + + return { + success: true, + output: { + taskId: task.id, + contextId: task.contextId, + state: task.status.state, + artifacts: task.artifacts, + history: task.history, + }, + } + } catch (error) { + logger.error('Error parsing A2A response:', error) + return { + success: false, + output: { + taskId: '', + state: 'failed', + }, + error: error instanceof Error ? error.message : 'Failed to parse response', + } + } + }, + + outputs: { + taskId: { + type: 'string', + description: 'Task ID', + }, + contextId: { + type: 'string', + description: 'Context ID', + }, + state: { + type: 'string', + description: 'Task state', + }, + artifacts: { + type: 'array', + description: 'Output artifacts', + }, + history: { + type: 'array', + description: 'Message history', + }, + }, +} diff --git a/apps/sim/tools/a2a/index.ts b/apps/sim/tools/a2a/index.ts new file mode 100644 index 0000000000..bac731559e --- /dev/null +++ b/apps/sim/tools/a2a/index.ts @@ -0,0 +1,11 @@ +/** + * A2A Tools + * + * Tools for interacting with external A2A-compatible agents. + */ + +export * from './cancel_task' +export * from './get_agent_card' +export * from './get_task' +export * from './send_task' +export * from './types' diff --git a/apps/sim/tools/a2a/send_task.ts b/apps/sim/tools/a2a/send_task.ts new file mode 100644 index 0000000000..a0a47bea78 --- /dev/null +++ b/apps/sim/tools/a2a/send_task.ts @@ -0,0 +1,152 @@ +/** + * A2A Send Task Tool + * + * Send a task to an external A2A-compatible agent. + */ + +import { createLogger } from '@sim/logger' +import { A2A_METHODS } from '@/lib/a2a/constants' +import type { Task, TaskMessage } from '@/lib/a2a/types' +import { extractTextContent } from '@/lib/a2a/utils' +import type { ToolConfig } from '@/tools/types' +import type { A2ASendTaskParams, A2ASendTaskResponse } from './types' + +const logger = createLogger('A2ASendTaskTool') + +export const a2aSendTaskTool: ToolConfig = { + id: 'a2a_send_task', + name: 'A2A Send Task', + description: 'Send a message to an external A2A-compatible agent.', + version: '1.0.0', + + params: { + agentUrl: { + type: 'string', + required: true, + description: 'The A2A agent endpoint URL', + }, + message: { + type: 'string', + required: true, + description: 'Message to send to the agent', + }, + taskId: { + type: 'string', + description: 'Task ID for continuing an existing conversation', + }, + contextId: { + type: 'string', + description: 'Context ID for conversation continuity', + }, + apiKey: { + type: 'string', + description: 'API key for authentication', + }, + }, + + request: { + url: (params: A2ASendTaskParams) => params.agentUrl, + method: 'POST', + headers: (params: A2ASendTaskParams) => { + const headers: Record = { + 'Content-Type': 'application/json', + } + if (params.apiKey) { + headers.Authorization = `Bearer ${params.apiKey}` + } + return headers + }, + body: (params: A2ASendTaskParams) => { + const userMessage: TaskMessage = { + role: 'user', + parts: [{ type: 'text', text: params.message }], + } + + return { + jsonrpc: '2.0', + id: Date.now().toString(), + method: A2A_METHODS.TASKS_SEND, + params: { + id: params.taskId, + contextId: params.contextId, + message: userMessage, + }, + } + }, + }, + + transformResponse: async (response: Response) => { + try { + const result = await response.json() + + if (result.error) { + return { + success: false, + output: { + content: result.error.message || 'A2A request failed', + taskId: '', + state: 'failed', + }, + error: result.error.message || 'A2A request failed', + } + } + + const task = result.result as Task + + // Extract content from the last agent message + const lastAgentMessage = task.history?.filter((m) => m.role === 'agent').pop() + + const content = lastAgentMessage ? extractTextContent(lastAgentMessage) : '' + + return { + success: true, + output: { + content, + taskId: task.id, + contextId: task.contextId, + state: task.status.state, + artifacts: task.artifacts, + history: task.history, + }, + } + } catch (error) { + logger.error('Error parsing A2A response:', error) + return { + success: false, + output: { + content: error instanceof Error ? error.message : 'Failed to parse response', + taskId: '', + state: 'failed', + }, + error: error instanceof Error ? error.message : 'Failed to parse response', + } + } + }, + + outputs: { + content: { + type: 'string', + description: 'The text response from the agent', + }, + taskId: { + type: 'string', + description: 'Task ID for follow-up interactions', + }, + contextId: { + type: 'string', + description: 'Context ID for conversation continuity', + }, + state: { + type: 'string', + description: 'Task state', + }, + artifacts: { + type: 'array', + description: 'Structured output artifacts', + }, + history: { + type: 'array', + description: 'Full message history', + }, + }, +} diff --git a/apps/sim/tools/a2a/types.ts b/apps/sim/tools/a2a/types.ts new file mode 100644 index 0000000000..703d9b6c73 --- /dev/null +++ b/apps/sim/tools/a2a/types.ts @@ -0,0 +1,119 @@ +/** + * A2A Tool Types (v0.2.6) + */ + +import type { + AgentAuthentication, + AgentCapabilities, + AgentSkill, + Artifact, + InputMode, + OutputMode, + TaskMessage, + TaskState, +} from '@/lib/a2a/types' +import type { ToolResponse } from '@/tools/types' + +export interface A2AGetAgentCardParams { + /** A2A agent endpoint URL */ + agentUrl: string + /** API key for authentication (if required) */ + apiKey?: string +} + +export interface A2AGetAgentCardResponse extends ToolResponse { + output: { + /** Agent name */ + name: string + /** Agent description */ + description?: string + /** Agent endpoint URL */ + url: string + /** Agent version */ + version: string + /** Agent capabilities */ + capabilities?: AgentCapabilities + /** Skills the agent can perform */ + skills?: AgentSkill[] + /** Supported authentication schemes */ + authentication?: AgentAuthentication + /** Default input modes */ + defaultInputModes?: InputMode[] + /** Default output modes */ + defaultOutputModes?: OutputMode[] + } +} + +export interface A2ASendTaskParams { + /** A2A agent endpoint URL */ + agentUrl: string + /** Message to send */ + message: string + /** Task ID (for continuing a task) */ + taskId?: string + /** Context ID (for multi-turn conversations) */ + contextId?: string + /** API key for authentication */ + apiKey?: string +} + +export interface A2ASendTaskResponse extends ToolResponse { + output: { + /** Response content text */ + content: string + /** Task ID */ + taskId: string + /** Context ID */ + contextId?: string + /** Task state */ + state: TaskState + /** Output artifacts */ + artifacts?: Artifact[] + /** Message history */ + history?: TaskMessage[] + } +} + +export interface A2AGetTaskParams { + /** A2A agent endpoint URL */ + agentUrl: string + /** Task ID to query */ + taskId: string + /** API key for authentication */ + apiKey?: string + /** Number of history messages to include */ + historyLength?: number +} + +export interface A2AGetTaskResponse extends ToolResponse { + output: { + /** Task ID */ + taskId: string + /** Context ID */ + contextId?: string + /** Task state */ + state: TaskState + /** Output artifacts */ + artifacts?: Artifact[] + /** Message history */ + history?: TaskMessage[] + } +} + +export interface A2ACancelTaskParams { + /** A2A agent endpoint URL */ + agentUrl: string + /** Task ID to cancel */ + taskId: string + /** API key for authentication */ + apiKey?: string +} + +export interface A2ACancelTaskResponse extends ToolResponse { + output: { + /** Whether cancellation was successful */ + cancelled: boolean + /** Task state after cancellation */ + state: TaskState + } +} diff --git a/apps/sim/tools/registry.ts b/apps/sim/tools/registry.ts index 157325374b..f0ee9c0d04 100644 --- a/apps/sim/tools/registry.ts +++ b/apps/sim/tools/registry.ts @@ -1,3 +1,9 @@ +import { + a2aCancelTaskTool, + a2aGetAgentCardTool, + a2aGetTaskTool, + a2aSendTaskTool, +} from '@/tools/a2a' import { ahrefsBacklinksStatsTool, ahrefsBacklinksTool, @@ -1415,6 +1421,10 @@ import { sqsSendTool } from './sqs' // Registry of all available tools export const tools: Record = { + a2a_cancel_task: a2aCancelTaskTool, + a2a_get_agent_card: a2aGetAgentCardTool, + a2a_get_task: a2aGetTaskTool, + a2a_send_task: a2aSendTaskTool, arxiv_search: arxivSearchTool, arxiv_get_paper: arxivGetPaperTool, arxiv_get_author_papers: arxivGetAuthorPapersTool, diff --git a/packages/db/schema.ts b/packages/db/schema.ts index c03d4e65b3..eb8f1bb1ab 100644 --- a/packages/db/schema.ts +++ b/packages/db/schema.ts @@ -1748,6 +1748,115 @@ export const workflowMcpTool = pgTable( }) ) +/** + * A2A Task State Enum (v0.2.6) + */ +export const a2aTaskStatusEnum = pgEnum('a2a_task_status', [ + 'submitted', + 'working', + 'input-required', + 'completed', + 'failed', + 'canceled', + 'rejected', + 'auth-required', + 'unknown', +]) + +/** + * A2A Agents - Workflows exposed as A2A-compatible agents + * These agents can be called by external A2A clients + */ +export const a2aAgent = pgTable( + 'a2a_agent', + { + id: text('id').primaryKey(), + workspaceId: text('workspace_id') + .notNull() + .references(() => workspace.id, { onDelete: 'cascade' }), + workflowId: text('workflow_id') + .notNull() + .references(() => workflow.id, { onDelete: 'cascade' }), + createdBy: text('created_by') + .notNull() + .references(() => user.id, { onDelete: 'cascade' }), + + /** Agent name (used in Agent Card) */ + name: text('name').notNull(), + /** Agent description */ + description: text('description'), + /** Agent version */ + version: text('version').notNull().default('1.0.0'), + + /** Agent capabilities (streaming, pushNotifications, etc.) */ + capabilities: jsonb('capabilities').notNull().default('{}'), + /** Agent skills derived from workflow */ + skills: jsonb('skills').notNull().default('[]'), + /** Authentication configuration */ + authentication: jsonb('authentication').notNull().default('{}'), + + /** Whether the agent is published and discoverable */ + isPublished: boolean('is_published').notNull().default(false), + /** When the agent was published */ + publishedAt: timestamp('published_at'), + + createdAt: timestamp('created_at').notNull().defaultNow(), + updatedAt: timestamp('updated_at').notNull().defaultNow(), + }, + (table) => ({ + workspaceIdIdx: index('a2a_agent_workspace_id_idx').on(table.workspaceId), + workflowIdIdx: index('a2a_agent_workflow_id_idx').on(table.workflowId), + createdByIdx: index('a2a_agent_created_by_idx').on(table.createdBy), + workspaceWorkflowUnique: uniqueIndex('a2a_agent_workspace_workflow_unique').on( + table.workspaceId, + table.workflowId + ), + }) +) + +/** + * A2A Tasks - Tracks task state for A2A agent interactions (v0.2.6) + * Each task represents a conversation/interaction with an agent + */ +export const a2aTask = pgTable( + 'a2a_task', + { + id: text('id').primaryKey(), + agentId: text('agent_id') + .notNull() + .references(() => a2aAgent.id, { onDelete: 'cascade' }), + + /** Context ID for multi-turn conversations (maps to API contextId) */ + sessionId: text('session_id'), + + /** Task state */ + status: a2aTaskStatusEnum('status').notNull().default('submitted'), + + /** Message history (maps to API history, array of TaskMessage) */ + messages: jsonb('messages').notNull().default('[]'), + + /** Structured output artifacts */ + artifacts: jsonb('artifacts').default('[]'), + + /** Link to workflow execution */ + executionId: text('execution_id'), + + /** Additional metadata */ + metadata: jsonb('metadata').default('{}'), + + createdAt: timestamp('created_at').notNull().defaultNow(), + updatedAt: timestamp('updated_at').notNull().defaultNow(), + completedAt: timestamp('completed_at'), + }, + (table) => ({ + agentIdIdx: index('a2a_task_agent_id_idx').on(table.agentId), + sessionIdIdx: index('a2a_task_session_id_idx').on(table.sessionId), + statusIdx: index('a2a_task_status_idx').on(table.status), + executionIdIdx: index('a2a_task_execution_id_idx').on(table.executionId), + createdAtIdx: index('a2a_task_created_at_idx').on(table.createdAt), + }) +) + export const usageLogCategoryEnum = pgEnum('usage_log_category', ['model', 'fixed']) export const usageLogSourceEnum = pgEnum('usage_log_source', ['workflow', 'wand', 'copilot'])