From 94223b9139ddc637196945850f30e81b9c0ea84e Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Sun, 17 May 2026 14:31:27 -0700 Subject: [PATCH 1/3] improvement(mothership): abort path race preventing persistence --- .../app/api/copilot/chat/stop/route.test.ts | 85 ++++++++-- apps/sim/app/api/copilot/chat/stop/route.ts | 84 +++------ .../lib/copilot/chat/terminal-state.test.ts | 128 ++++++++++---- apps/sim/lib/copilot/chat/terminal-state.ts | 160 +++++++++++++----- 4 files changed, 307 insertions(+), 150 deletions(-) diff --git a/apps/sim/app/api/copilot/chat/stop/route.test.ts b/apps/sim/app/api/copilot/chat/stop/route.test.ts index a87f35a2987..916ee64ec97 100644 --- a/apps/sim/app/api/copilot/chat/stop/route.test.ts +++ b/apps/sim/app/api/copilot/chat/stop/route.test.ts @@ -10,24 +10,49 @@ const { mockFrom, mockWhereSelect, mockLimit, + mockForUpdate, mockUpdate, mockSet, mockWhereUpdate, mockReturning, mockPublishStatusChanged, mockSql, -} = vi.hoisted(() => ({ - mockSelect: vi.fn(), - mockFrom: vi.fn(), - mockWhereSelect: vi.fn(), - mockLimit: vi.fn(), - mockUpdate: vi.fn(), - mockSet: vi.fn(), - mockWhereUpdate: vi.fn(), - mockReturning: vi.fn(), - mockPublishStatusChanged: vi.fn(), - mockSql: vi.fn((strings: TemplateStringsArray, ...values: unknown[]) => ({ strings, values })), -})) + mockTransaction, +} = vi.hoisted(() => { + const mockSelect = vi.fn() + const mockFrom = vi.fn() + const mockWhereSelect = vi.fn() + const mockLimit = vi.fn() + const mockForUpdate = vi.fn() + const mockUpdate = vi.fn() + const mockSet = vi.fn() + const mockWhereUpdate = vi.fn() + const mockReturning = vi.fn() + const mockPublishStatusChanged = vi.fn() + const mockSql = vi.fn((strings: TemplateStringsArray, ...values: unknown[]) => ({ + strings, + values, + })) + const mockTransaction = vi.fn( + (callback: (tx: { select: typeof mockSelect; update: typeof mockUpdate }) => unknown) => + callback({ select: mockSelect, update: mockUpdate }) + ) + + return { + mockSelect, + mockFrom, + mockWhereSelect, + mockLimit, + mockForUpdate, + mockUpdate, + mockSet, + mockWhereUpdate, + mockReturning, + mockPublishStatusChanged, + mockSql, + mockTransaction, + } +}) vi.mock('@sim/db/schema', () => ({ copilotChats: { @@ -41,8 +66,7 @@ vi.mock('@sim/db/schema', () => ({ vi.mock('@sim/db', () => ({ db: { - select: mockSelect, - update: mockUpdate, + transaction: mockTransaction, }, })) @@ -78,9 +102,11 @@ describe('copilot chat stop route', () => { { workspaceId: 'ws-1', messages: [{ id: 'stream-1', role: 'user', content: 'hello' }], + conversationId: 'stream-1', }, ]) - mockWhereSelect.mockReturnValue({ limit: mockLimit }) + mockForUpdate.mockReturnValue({ limit: mockLimit }) + mockWhereSelect.mockReturnValue({ for: mockForUpdate }) mockFrom.mockReturnValue({ where: mockWhereSelect }) mockSelect.mockReturnValue({ from: mockFrom }) @@ -153,4 +179,33 @@ describe('copilot chat stop route', () => { streamId: 'stream-1', }) }) + + it('appends a stopped assistant message if the stream marker was already cleared', async () => { + mockLimit.mockResolvedValueOnce([ + { + workspaceId: 'ws-1', + messages: [{ id: 'stream-1', role: 'user', content: 'hello' }], + conversationId: null, + }, + ]) + + const response = await POST( + createRequest({ + chatId: 'chat-1', + streamId: 'stream-1', + content: 'partial', + }) + ) + + expect(response.status).toBe(200) + expect(await response.json()).toEqual({ success: true }) + + const setArg = mockSet.mock.calls[0]?.[0] + expect(setArg.messages).toBeTruthy() + const appendedPayload = JSON.parse(setArg.messages.values[1] as string) + expect(appendedPayload[0]).toMatchObject({ + role: 'assistant', + content: 'partial', + }) + }) }) diff --git a/apps/sim/app/api/copilot/chat/stop/route.ts b/apps/sim/app/api/copilot/chat/stop/route.ts index 36d3b8ae43d..eac6baa3b60 100644 --- a/apps/sim/app/api/copilot/chat/stop/route.ts +++ b/apps/sim/app/api/copilot/chat/stop/route.ts @@ -1,13 +1,11 @@ -import { db } from '@sim/db' -import { copilotChats } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { generateId } from '@sim/utils/id' -import { and, eq, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { copilotChatStopContract } from '@/lib/api/contracts/copilot' import { parseRequest } from '@/lib/api/server' import { getSession } from '@/lib/auth' import { normalizeMessage, type PersistedMessage } from '@/lib/copilot/chat/persisted-message' +import { finalizeAssistantTurn } from '@/lib/copilot/chat/terminal-state' import { CopilotStopOutcome } from '@/lib/copilot/generated/trace-attribute-values-v1' import { TraceAttr } from '@/lib/copilot/generated/trace-attributes-v1' import { TraceSpan } from '@/lib/copilot/generated/trace-spans-v1' @@ -44,40 +42,6 @@ export const POST = withRouteHandler((req: NextRequest) => ...(requestId ? { [TraceAttr.RequestId]: requestId } : {}), }) - const [row] = await db - .select({ - workspaceId: copilotChats.workspaceId, - messages: copilotChats.messages, - }) - .from(copilotChats) - .where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, session.user.id))) - .limit(1) - - if (!row) { - span.setAttribute(TraceAttr.CopilotStopOutcome, CopilotStopOutcome.ChatNotFound) - return NextResponse.json({ success: true }) - } - - const messages: Record[] = Array.isArray(row.messages) ? row.messages : [] - const userIdx = messages.findIndex((message) => message.id === streamId) - const alreadyHasResponse = - userIdx >= 0 && - userIdx + 1 < messages.length && - (messages[userIdx + 1] as Record)?.role === 'assistant' - const canAppendAssistant = - userIdx >= 0 && userIdx === messages.length - 1 && !alreadyHasResponse - - const updateWhere = and( - eq(copilotChats.id, chatId), - eq(copilotChats.userId, session.user.id), - eq(copilotChats.conversationId, streamId) - ) - - const setClause: Record = { - conversationId: null, - updatedAt: new Date(), - } - const hasContent = content.trim().length > 0 const hasBlocks = Array.isArray(contentBlocks) && contentBlocks.length > 0 const synthesizedStoppedBlocks = hasBlocks @@ -85,30 +49,26 @@ export const POST = withRouteHandler((req: NextRequest) => : hasContent ? [{ type: 'text', channel: 'assistant', content }, { type: 'stopped' }] : [{ type: 'stopped' }] - if (canAppendAssistant) { - const normalized = normalizeMessage({ - id: generateId(), - role: 'assistant', - content, - timestamp: new Date().toISOString(), - contentBlocks: synthesizedStoppedBlocks, - // Persist so the UI copy-request-id button survives refetch. - ...(requestId ? { requestId } : {}), - }) - const assistantMessage: PersistedMessage = normalized - setClause.messages = sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb` - } - span.setAttribute(TraceAttr.CopilotStopAppendedAssistant, canAppendAssistant) - - const [updated] = await db - .update(copilotChats) - .set(setClause) - .where(updateWhere) - .returning({ workspaceId: copilotChats.workspaceId }) + const assistantMessage: PersistedMessage = normalizeMessage({ + id: generateId(), + role: 'assistant', + content, + timestamp: new Date().toISOString(), + contentBlocks: synthesizedStoppedBlocks, + ...(requestId ? { requestId } : {}), + }) + const result = await finalizeAssistantTurn({ + chatId, + userId: session.user.id, + userMessageId: streamId, + assistantMessage, + streamMarkerPolicy: 'active-or-cleared', + }) + span.setAttribute(TraceAttr.CopilotStopAppendedAssistant, result.appendedAssistant) - if (updated?.workspaceId) { + if (result.updated && result.workspaceId) { taskPubSub?.publishStatusChanged({ - workspaceId: updated.workspaceId, + workspaceId: result.workspaceId, chatId, type: 'completed', streamId, @@ -117,7 +77,11 @@ export const POST = withRouteHandler((req: NextRequest) => span.setAttribute( TraceAttr.CopilotStopOutcome, - updated ? CopilotStopOutcome.Persisted : CopilotStopOutcome.NoMatchingRow + result.found + ? result.updated + ? CopilotStopOutcome.Persisted + : CopilotStopOutcome.NoMatchingRow + : CopilotStopOutcome.ChatNotFound ) return NextResponse.json({ success: true }) } catch (error) { diff --git a/apps/sim/lib/copilot/chat/terminal-state.test.ts b/apps/sim/lib/copilot/chat/terminal-state.test.ts index a9705573360..c313156f3b5 100644 --- a/apps/sim/lib/copilot/chat/terminal-state.test.ts +++ b/apps/sim/lib/copilot/chat/terminal-state.test.ts @@ -3,36 +3,51 @@ */ import { copilotChats } from '@sim/db/schema' -import { and, eq } from 'drizzle-orm' +import { eq } from 'drizzle-orm' import { beforeEach, describe, expect, it, vi } from 'vitest' -const { selectLimit, selectWhere, selectFrom, select, updateWhere, updateSet, update } = vi.hoisted( - () => { - const selectLimit = vi.fn() - const selectWhere = vi.fn(() => ({ limit: selectLimit })) - const selectFrom = vi.fn(() => ({ where: selectWhere })) - const select = vi.fn(() => ({ from: selectFrom })) - - const updateWhere = vi.fn() - const updateSet = vi.fn(() => ({ where: updateWhere })) - const update = vi.fn(() => ({ set: updateSet })) - - return { - selectLimit, - selectWhere, - selectFrom, - select, - updateWhere, - updateSet, - update, - } +const { + selectForUpdate, + selectLimit, + selectWhere, + selectFrom, + select, + updateWhere, + updateSet, + update, + transaction, +} = vi.hoisted(() => { + const selectLimit = vi.fn() + const selectForUpdate = vi.fn(() => ({ limit: selectLimit })) + const selectWhere = vi.fn(() => ({ for: selectForUpdate })) + const selectFrom = vi.fn(() => ({ where: selectWhere })) + const select = vi.fn(() => ({ from: selectFrom })) + + const updateWhere = vi.fn() + const updateSet = vi.fn(() => ({ where: updateWhere })) + const update = vi.fn(() => ({ set: updateSet })) + + const transaction = vi.fn( + (callback: (tx: { select: typeof select; update: typeof update }) => unknown) => + callback({ select, update }) + ) + + return { + selectForUpdate, + selectLimit, + selectWhere, + selectFrom, + select, + updateWhere, + updateSet, + update, + transaction, } -) +}) vi.mock('@sim/db', () => ({ db: { - select, - update, + transaction, }, })) @@ -48,6 +63,8 @@ describe('finalizeAssistantTurn', () => { selectLimit.mockResolvedValue([ { messages: [{ id: 'user-1', role: 'user', content: 'hello' }], + conversationId: 'user-1', + workspaceId: 'ws-1', }, ]) @@ -69,9 +86,7 @@ describe('finalizeAssistantTurn', () => { messages: expect.anything(), }) ) - expect(updateWhere).toHaveBeenCalledWith( - and(eq(copilotChats.id, 'chat-1'), eq(copilotChats.conversationId, 'user-1')) - ) + expect(updateWhere).toHaveBeenCalledWith(eq(copilotChats.id, 'chat-1')) }) it('only clears the active stream marker when a response is already persisted', async () => { @@ -81,6 +96,8 @@ describe('finalizeAssistantTurn', () => { { id: 'user-1', role: 'user', content: 'hello' }, { id: 'assistant-1', role: 'assistant', content: 'partial' }, ], + conversationId: 'user-1', + workspaceId: 'ws-1', }, ]) @@ -108,8 +125,61 @@ describe('finalizeAssistantTurn', () => { }) ) expect(Object.hasOwn(updateArg, 'messages')).toBe(false) - expect(updateWhere).toHaveBeenCalledWith( - and(eq(copilotChats.id, 'chat-1'), eq(copilotChats.conversationId, 'user-1')) + expect(updateWhere).toHaveBeenCalledWith(eq(copilotChats.id, 'chat-1')) + }) + + it('appends a stopped assistant when the stream marker was already cleared', async () => { + selectLimit.mockResolvedValue([ + { + messages: [{ id: 'user-1', role: 'user', content: 'hello' }], + conversationId: null, + workspaceId: 'ws-1', + }, + ]) + + const result = await finalizeAssistantTurn({ + chatId: 'chat-1', + userMessageId: 'user-1', + streamMarkerPolicy: 'active-or-cleared', + assistantMessage: { + id: 'assistant-1', + role: 'assistant', + content: 'partial', + timestamp: '2024-01-01T00:00:00.000Z', + }, + }) + + expect(result.appendedAssistant).toBe(true) + expect(updateSet).toHaveBeenCalledWith( + expect.objectContaining({ + updatedAt: expect.any(Date), + conversationId: null, + messages: expect.anything(), + }) ) }) + + it('does not append on a cleared marker unless the policy allows it', async () => { + selectLimit.mockResolvedValue([ + { + messages: [{ id: 'user-1', role: 'user', content: 'hello' }], + conversationId: null, + workspaceId: 'ws-1', + }, + ]) + + const result = await finalizeAssistantTurn({ + chatId: 'chat-1', + userMessageId: 'user-1', + assistantMessage: { + id: 'assistant-1', + role: 'assistant', + content: 'partial', + timestamp: '2024-01-01T00:00:00.000Z', + }, + }) + + expect(result.updated).toBe(false) + expect(updateSet).not.toHaveBeenCalled() + }) }) diff --git a/apps/sim/lib/copilot/chat/terminal-state.ts b/apps/sim/lib/copilot/chat/terminal-state.ts index f0f43cb6bb0..d9cada50168 100644 --- a/apps/sim/lib/copilot/chat/terminal-state.ts +++ b/apps/sim/lib/copilot/chat/terminal-state.ts @@ -7,10 +7,22 @@ import { TraceAttr } from '@/lib/copilot/generated/trace-attributes-v1' import { TraceSpan } from '@/lib/copilot/generated/trace-spans-v1' import { withCopilotSpan } from '@/lib/copilot/request/otel' +type StreamMarkerPolicy = 'active-only' | 'active-or-cleared' + interface FinalizeAssistantTurnParams { chatId: string userMessageId: string + userId?: string assistantMessage?: PersistedMessage + streamMarkerPolicy?: StreamMarkerPolicy +} + +export interface FinalizeAssistantTurnResult { + found: boolean + updated: boolean + appendedAssistant: boolean + workspaceId?: string | null + outcome: (typeof CopilotChatFinalizeOutcome)[keyof typeof CopilotChatFinalizeOutcome] } /** @@ -21,8 +33,10 @@ interface FinalizeAssistantTurnParams { export async function finalizeAssistantTurn({ chatId, userMessageId, + userId, assistantMessage, -}: FinalizeAssistantTurnParams): Promise { + streamMarkerPolicy = 'active-only', +}: FinalizeAssistantTurnParams): Promise { return withCopilotSpan( TraceSpan.CopilotChatFinalizeAssistantTurn, { @@ -33,55 +47,109 @@ export async function finalizeAssistantTurn({ [TraceAttr.ChatHasAssistantMessage]: !!assistantMessage, }, async (span) => { - const [row] = await db - .select({ messages: copilotChats.messages }) - .from(copilotChats) - .where(eq(copilotChats.id, chatId)) - .limit(1) + const result = await db.transaction(async (tx) => { + const where = userId + ? and(eq(copilotChats.id, chatId), eq(copilotChats.userId, userId)) + : eq(copilotChats.id, chatId) + const [row] = await tx + .select({ + messages: copilotChats.messages, + conversationId: copilotChats.conversationId, + workspaceId: copilotChats.workspaceId, + }) + .from(copilotChats) + .where(where) + .for('update') + .limit(1) - const messages: Record[] = Array.isArray(row?.messages) ? row.messages : [] - span.setAttribute(TraceAttr.ChatExistingMessageCount, messages.length) - const userIdx = messages.findIndex((message) => message.id === userMessageId) - const alreadyHasResponse = - userIdx >= 0 && - userIdx + 1 < messages.length && - (messages[userIdx + 1] as Record)?.role === 'assistant' - const canAppendAssistant = - userIdx >= 0 && userIdx === messages.length - 1 && !alreadyHasResponse - const updateWhere = and( - eq(copilotChats.id, chatId), - eq(copilotChats.conversationId, userMessageId) - ) + const messages: Record[] = Array.isArray(row?.messages) ? row.messages : [] + span.setAttribute(TraceAttr.ChatExistingMessageCount, messages.length) - const baseUpdate = { - conversationId: null, - updatedAt: new Date(), - } + if (!row) { + return { + found: false, + updated: false, + appendedAssistant: false, + workspaceId: null, + outcome: CopilotChatFinalizeOutcome.StaleUserMessage, + } + } - if (assistantMessage && canAppendAssistant) { - await db - .update(copilotChats) - .set({ - ...baseUpdate, - messages: sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`, - }) - .where(updateWhere) - span.setAttribute( - TraceAttr.ChatFinalizeOutcome, - CopilotChatFinalizeOutcome.AppendedAssistant - ) - return - } + const markerMatches = row.conversationId === userMessageId + const markerAlreadyCleared = row.conversationId === null + const ownsTurn = + markerMatches || (streamMarkerPolicy === 'active-or-cleared' && markerAlreadyCleared) + if (!ownsTurn) { + return { + found: true, + updated: false, + appendedAssistant: false, + workspaceId: row.workspaceId, + outcome: CopilotChatFinalizeOutcome.StaleUserMessage, + } + } + + const userIdx = messages.findIndex((message) => message.id === userMessageId) + const alreadyHasResponse = + userIdx >= 0 && + userIdx + 1 < messages.length && + (messages[userIdx + 1] as Record)?.role === 'assistant' + const canAppendAssistant = + userIdx >= 0 && userIdx === messages.length - 1 && !alreadyHasResponse + + const updateWhere = userId + ? and(eq(copilotChats.id, chatId), eq(copilotChats.userId, userId)) + : eq(copilotChats.id, chatId) + const baseUpdate = { + conversationId: null, + updatedAt: new Date(), + } + + if (assistantMessage && canAppendAssistant) { + await tx + .update(copilotChats) + .set({ + ...baseUpdate, + messages: sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`, + }) + .where(updateWhere) + return { + found: true, + updated: true, + appendedAssistant: true, + workspaceId: row.workspaceId, + outcome: CopilotChatFinalizeOutcome.AppendedAssistant, + } + } + + if (markerMatches) { + await tx.update(copilotChats).set(baseUpdate).where(updateWhere) + return { + found: true, + updated: true, + appendedAssistant: false, + workspaceId: row.workspaceId, + outcome: assistantMessage + ? alreadyHasResponse + ? CopilotChatFinalizeOutcome.AssistantAlreadyPersisted + : CopilotChatFinalizeOutcome.StaleUserMessage + : CopilotChatFinalizeOutcome.ClearedStreamMarkerOnly, + } + } + + return { + found: true, + updated: false, + appendedAssistant: false, + workspaceId: row.workspaceId, + outcome: alreadyHasResponse + ? CopilotChatFinalizeOutcome.AssistantAlreadyPersisted + : CopilotChatFinalizeOutcome.StaleUserMessage, + } + }) - await db.update(copilotChats).set(baseUpdate).where(updateWhere) - span.setAttribute( - TraceAttr.ChatFinalizeOutcome, - assistantMessage - ? alreadyHasResponse - ? 'assistant_already_persisted' - : 'stale_user_message' - : 'cleared_stream_marker_only' - ) + span.setAttribute(TraceAttr.ChatFinalizeOutcome, result.outcome) + return result } ) } From 10d7cda243d96f6cdd5656384315161e783bbf3b Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Sun, 17 May 2026 14:56:41 -0700 Subject: [PATCH 2/3] address comments --- .../app/api/copilot/chat/stop/route.test.ts | 38 ++++ apps/sim/app/api/copilot/chat/stop/route.ts | 47 +++-- .../sim/lib/copilot/chat/persisted-message.ts | 39 ++++ apps/sim/lib/copilot/chat/post.test.ts | 102 ++++++++++- apps/sim/lib/copilot/chat/post.ts | 32 +++- .../lib/copilot/chat/terminal-state.test.ts | 29 +++ .../lib/copilot/request/lifecycle/run.test.ts | 168 ++++++++++++++++++ apps/sim/lib/copilot/request/lifecycle/run.ts | 47 +++-- 8 files changed, 464 insertions(+), 38 deletions(-) create mode 100644 apps/sim/lib/copilot/request/lifecycle/run.test.ts diff --git a/apps/sim/app/api/copilot/chat/stop/route.test.ts b/apps/sim/app/api/copilot/chat/stop/route.test.ts index 916ee64ec97..bab5465507d 100644 --- a/apps/sim/app/api/copilot/chat/stop/route.test.ts +++ b/apps/sim/app/api/copilot/chat/stop/route.test.ts @@ -207,5 +207,43 @@ describe('copilot chat stop route', () => { role: 'assistant', content: 'partial', }) + + expect(mockPublishStatusChanged).toHaveBeenCalledWith({ + workspaceId: 'ws-1', + chatId: 'chat-1', + type: 'completed', + streamId: 'stream-1', + }) + }) + + it('republishes completed status when the assistant was already persisted', async () => { + mockLimit.mockResolvedValueOnce([ + { + workspaceId: 'ws-1', + messages: [ + { id: 'stream-1', role: 'user', content: 'hello' }, + { id: 'assistant-1', role: 'assistant', content: 'partial' }, + ], + conversationId: null, + }, + ]) + + const response = await POST( + createRequest({ + chatId: 'chat-1', + streamId: 'stream-1', + content: 'partial', + }) + ) + + expect(response.status).toBe(200) + expect(await response.json()).toEqual({ success: true }) + expect(mockUpdate).not.toHaveBeenCalled() + expect(mockPublishStatusChanged).toHaveBeenCalledWith({ + workspaceId: 'ws-1', + chatId: 'chat-1', + type: 'completed', + streamId: 'stream-1', + }) }) }) diff --git a/apps/sim/app/api/copilot/chat/stop/route.ts b/apps/sim/app/api/copilot/chat/stop/route.ts index eac6baa3b60..d17b9621a98 100644 --- a/apps/sim/app/api/copilot/chat/stop/route.ts +++ b/apps/sim/app/api/copilot/chat/stop/route.ts @@ -4,9 +4,16 @@ import { type NextRequest, NextResponse } from 'next/server' import { copilotChatStopContract } from '@/lib/api/contracts/copilot' import { parseRequest } from '@/lib/api/server' import { getSession } from '@/lib/auth' -import { normalizeMessage, type PersistedMessage } from '@/lib/copilot/chat/persisted-message' +import { + normalizeMessage, + type PersistedMessage, + withStoppedContentBlock, +} from '@/lib/copilot/chat/persisted-message' import { finalizeAssistantTurn } from '@/lib/copilot/chat/terminal-state' -import { CopilotStopOutcome } from '@/lib/copilot/generated/trace-attribute-values-v1' +import { + CopilotChatFinalizeOutcome, + CopilotStopOutcome, +} from '@/lib/copilot/generated/trace-attribute-values-v1' import { TraceAttr } from '@/lib/copilot/generated/trace-attributes-v1' import { TraceSpan } from '@/lib/copilot/generated/trace-spans-v1' import { withIncomingGoSpan } from '@/lib/copilot/request/otel' @@ -49,14 +56,16 @@ export const POST = withRouteHandler((req: NextRequest) => : hasContent ? [{ type: 'text', channel: 'assistant', content }, { type: 'stopped' }] : [{ type: 'stopped' }] - const assistantMessage: PersistedMessage = normalizeMessage({ - id: generateId(), - role: 'assistant', - content, - timestamp: new Date().toISOString(), - contentBlocks: synthesizedStoppedBlocks, - ...(requestId ? { requestId } : {}), - }) + const assistantMessage: PersistedMessage = withStoppedContentBlock( + normalizeMessage({ + id: generateId(), + role: 'assistant', + content, + timestamp: new Date().toISOString(), + contentBlocks: synthesizedStoppedBlocks, + ...(requestId ? { requestId } : {}), + }) + ) const result = await finalizeAssistantTurn({ chatId, userId: session.user.id, @@ -65,8 +74,15 @@ export const POST = withRouteHandler((req: NextRequest) => streamMarkerPolicy: 'active-or-cleared', }) span.setAttribute(TraceAttr.CopilotStopAppendedAssistant, result.appendedAssistant) + const stopOutcome = !result.found + ? CopilotStopOutcome.ChatNotFound + : result.updated || result.outcome === CopilotChatFinalizeOutcome.AssistantAlreadyPersisted + ? CopilotStopOutcome.Persisted + : CopilotStopOutcome.NoMatchingRow + const shouldPublishCompleted = + result.updated || result.outcome === CopilotChatFinalizeOutcome.AssistantAlreadyPersisted - if (result.updated && result.workspaceId) { + if (shouldPublishCompleted && result.workspaceId) { taskPubSub?.publishStatusChanged({ workspaceId: result.workspaceId, chatId, @@ -75,14 +91,7 @@ export const POST = withRouteHandler((req: NextRequest) => }) } - span.setAttribute( - TraceAttr.CopilotStopOutcome, - result.found - ? result.updated - ? CopilotStopOutcome.Persisted - : CopilotStopOutcome.NoMatchingRow - : CopilotStopOutcome.ChatNotFound - ) + span.setAttribute(TraceAttr.CopilotStopOutcome, stopOutcome) return NextResponse.json({ success: true }) } catch (error) { logger.error('Error stopping chat stream:', error) diff --git a/apps/sim/lib/copilot/chat/persisted-message.ts b/apps/sim/lib/copilot/chat/persisted-message.ts index 4629b591192..efde8aa104f 100644 --- a/apps/sim/lib/copilot/chat/persisted-message.ts +++ b/apps/sim/lib/copilot/chat/persisted-message.ts @@ -224,6 +224,45 @@ export function buildPersistedAssistantMessage( return message } +export function withStoppedContentBlock(message: PersistedMessage): PersistedMessage { + const contentBlocks = message.contentBlocks ?? [] + const hasAssistantText = contentBlocks.some( + (block) => + block.type === MothershipStreamV1EventType.text && + block.channel !== MothershipStreamV1TextChannel.thinking && + block.content?.trim() + ) + if ( + contentBlocks.some( + (block) => + block.type === MothershipStreamV1EventType.complete && + block.status === MothershipStreamV1CompletionStatus.cancelled + ) + ) { + return message + } + + return normalizeMessage({ + ...message, + contentBlocks: [ + ...(hasAssistantText || !message.content.trim() + ? [] + : [ + { + type: MothershipStreamV1EventType.text, + channel: MothershipStreamV1TextChannel.assistant, + content: message.content, + }, + ]), + ...contentBlocks, + { + type: MothershipStreamV1EventType.complete, + status: MothershipStreamV1CompletionStatus.cancelled, + }, + ], + }) +} + export interface UserMessageParams { id: string content: string diff --git a/apps/sim/lib/copilot/chat/post.test.ts b/apps/sim/lib/copilot/chat/post.test.ts index 1a6dee8a4b1..8b937704ac6 100644 --- a/apps/sim/lib/copilot/chat/post.test.ts +++ b/apps/sim/lib/copilot/chat/post.test.ts @@ -27,6 +27,8 @@ const { getPendingChatStreamId, releasePendingChatStream, resolveOrCreateChat, + finalizeAssistantTurn, + mockPublishStatusChanged, } = vi.hoisted(() => ({ getEffectiveDecryptedEnv: vi.fn(), generateWorkspaceContext: vi.fn(), @@ -38,6 +40,8 @@ const { getPendingChatStreamId: vi.fn(), releasePendingChatStream: vi.fn(), resolveOrCreateChat: vi.fn(), + finalizeAssistantTurn: vi.fn(), + mockPublishStatusChanged: vi.fn(), })) const getSession = authMockFns.mockGetSession @@ -78,9 +82,13 @@ vi.mock('@/lib/copilot/chat/lifecycle', () => ({ resolveOrCreateChat, })) +vi.mock('@/lib/copilot/chat/terminal-state', () => ({ + finalizeAssistantTurn, +})) + vi.mock('@/lib/copilot/tasks', () => ({ taskPubSub: { - publishStatusChanged: vi.fn(), + publishStatusChanged: mockPublishStatusChanged, }, })) @@ -137,6 +145,13 @@ describe('handleUnifiedChatPost', () => { conversationHistory: [], isNew: true, }) + finalizeAssistantTurn.mockResolvedValue({ + found: true, + updated: true, + appendedAssistant: true, + workspaceId: 'ws-1', + outcome: 'appended_assistant', + }) }) it('routes workflow-attached chat requests through the copilot backend path', async () => { @@ -176,6 +191,7 @@ describe('handleUnifiedChatPost', () => { body: JSON.stringify({ message: 'Hello', workspaceId: 'ws-1', + createNewChat: true, }), }) ) @@ -205,6 +221,90 @@ describe('handleUnifiedChatPost', () => { ) }) + it('persists cancelled partial responses from the server lifecycle', async () => { + await handleUnifiedChatPost( + new NextRequest('http://localhost/api/copilot/chat', { + method: 'POST', + body: JSON.stringify({ + message: 'Hello', + workspaceId: 'ws-1', + createNewChat: true, + }), + }) + ) + + const streamArgs = createSSEStream.mock.calls[0]?.[0] + const onComplete = streamArgs?.orchestrateOptions?.onComplete + expect(onComplete).toBeTypeOf('function') + + await onComplete({ + success: false, + cancelled: true, + content: 'partial answer', + contentBlocks: [], + toolCalls: [], + chatId: 'chat-1', + requestId: 'request-1', + }) + + expect(finalizeAssistantTurn).toHaveBeenCalledWith( + expect.objectContaining({ + chatId: 'chat-1', + userMessageId: expect.any(String), + streamMarkerPolicy: 'active-or-cleared', + assistantMessage: expect.objectContaining({ + role: 'assistant', + content: 'partial answer', + contentBlocks: expect.arrayContaining([ + expect.objectContaining({ type: 'complete', status: 'cancelled' }), + ]), + }), + }) + ) + }) + + it('republishes completed status when cancelled lifecycle persistence already ran', async () => { + await handleUnifiedChatPost( + new NextRequest('http://localhost/api/copilot/chat', { + method: 'POST', + body: JSON.stringify({ + message: 'Hello', + workspaceId: 'ws-1', + createNewChat: true, + }), + }) + ) + + const streamArgs = createSSEStream.mock.calls[0]?.[0] + const onComplete = streamArgs?.orchestrateOptions?.onComplete + expect(onComplete).toBeTypeOf('function') + + finalizeAssistantTurn.mockResolvedValueOnce({ + found: true, + updated: false, + appendedAssistant: false, + workspaceId: 'ws-1', + outcome: 'assistant_already_persisted', + }) + + await onComplete({ + success: false, + cancelled: true, + content: 'partial answer', + contentBlocks: [], + toolCalls: [], + chatId: 'chat-1', + requestId: 'request-1', + }) + + expect(mockPublishStatusChanged).toHaveBeenCalledWith({ + workspaceId: 'ws-1', + chatId: 'chat-1', + type: 'completed', + streamId: streamArgs?.streamId, + }) + }) + it('rejects requests that have neither workflow nor workspace attachment', async () => { const response = await handleUnifiedChatPost( new NextRequest('http://localhost/api/copilot/chat', { diff --git a/apps/sim/lib/copilot/chat/post.ts b/apps/sim/lib/copilot/chat/post.ts index 9eaa77b43d3..a7ba4573879 100644 --- a/apps/sim/lib/copilot/chat/post.ts +++ b/apps/sim/lib/copilot/chat/post.ts @@ -14,6 +14,7 @@ import { buildCopilotRequestPayload } from '@/lib/copilot/chat/payload' import { buildPersistedAssistantMessage, buildPersistedUserMessage, + withStoppedContentBlock, } from '@/lib/copilot/chat/persisted-message' import { processContextsServer, @@ -23,6 +24,7 @@ import { finalizeAssistantTurn } from '@/lib/copilot/chat/terminal-state' import { generateWorkspaceContext } from '@/lib/copilot/chat/workspace-context' import { COPILOT_REQUEST_MODES } from '@/lib/copilot/constants' import { + CopilotChatFinalizeOutcome, CopilotChatPersistOutcome, CopilotTransport, } from '@/lib/copilot/generated/trace-attribute-values-v1' @@ -425,13 +427,31 @@ function buildOnComplete(params: { if (!chatId) return - // On cancel, /chat/stop is the sole DB writer — it persists - // partial content AND clears conversationId in one UPDATE. If we - // finalize here first the filter misses and content vanishes. - // Real errors still finalize so the stream marker clears. - if (result.cancelled) return - try { + if (result.cancelled) { + const finalization = await finalizeAssistantTurn({ + chatId, + userMessageId, + assistantMessage: withStoppedContentBlock( + buildPersistedAssistantMessage(result, requestId) + ), + streamMarkerPolicy: 'active-or-cleared', + }) + const shouldPublishCompletion = + finalization.updated || + finalization.outcome === CopilotChatFinalizeOutcome.AssistantAlreadyPersisted + + if (notifyWorkspaceStatus && workspaceId && shouldPublishCompletion) { + taskPubSub?.publishStatusChanged({ + workspaceId, + chatId, + type: 'completed', + streamId: userMessageId, + }) + } + return + } + await finalizeAssistantTurn({ chatId, userMessageId, diff --git a/apps/sim/lib/copilot/chat/terminal-state.test.ts b/apps/sim/lib/copilot/chat/terminal-state.test.ts index c313156f3b5..cf4a230bf31 100644 --- a/apps/sim/lib/copilot/chat/terminal-state.test.ts +++ b/apps/sim/lib/copilot/chat/terminal-state.test.ts @@ -182,4 +182,33 @@ describe('finalizeAssistantTurn', () => { expect(result.updated).toBe(false) expect(updateSet).not.toHaveBeenCalled() }) + + it('reports already persisted when a cleared marker races with a duplicate stop', async () => { + selectLimit.mockResolvedValue([ + { + messages: [ + { id: 'user-1', role: 'user', content: 'hello' }, + { id: 'assistant-1', role: 'assistant', content: 'partial' }, + ], + conversationId: null, + workspaceId: 'ws-1', + }, + ]) + + const result = await finalizeAssistantTurn({ + chatId: 'chat-1', + userMessageId: 'user-1', + streamMarkerPolicy: 'active-or-cleared', + assistantMessage: { + id: 'assistant-2', + role: 'assistant', + content: 'partial', + timestamp: '2024-01-01T00:00:00.000Z', + }, + }) + + expect(result.updated).toBe(false) + expect(result.outcome).toBe('assistant_already_persisted') + expect(updateSet).not.toHaveBeenCalled() + }) }) diff --git a/apps/sim/lib/copilot/request/lifecycle/run.test.ts b/apps/sim/lib/copilot/request/lifecycle/run.test.ts new file mode 100644 index 00000000000..ded80fdb39d --- /dev/null +++ b/apps/sim/lib/copilot/request/lifecycle/run.test.ts @@ -0,0 +1,168 @@ +/** + * @vitest-environment node + */ + +import { beforeEach, describe, expect, it, vi } from 'vitest' +import type { ExecutionContext, StreamingContext } from '@/lib/copilot/request/types' + +const { + mockCreateRunSegment, + mockGetEffectiveDecryptedEnv, + mockGetMothershipBaseURL, + mockGetMothershipSourceEnvHeaders, + mockPrepareExecutionContext, + mockRunStreamLoop, + mockUpdateRunStatus, +} = vi.hoisted(() => ({ + mockCreateRunSegment: vi.fn(), + mockGetEffectiveDecryptedEnv: vi.fn(), + mockGetMothershipBaseURL: vi.fn(), + mockGetMothershipSourceEnvHeaders: vi.fn(), + mockPrepareExecutionContext: vi.fn(), + mockRunStreamLoop: vi.fn(), + mockUpdateRunStatus: vi.fn(), +})) + +vi.mock('@/lib/copilot/async-runs/repository', () => ({ + createRunSegment: mockCreateRunSegment, + updateRunStatus: mockUpdateRunStatus, +})) + +vi.mock('@/lib/copilot/request/go/stream', () => { + class CopilotBackendError extends Error { + status?: number + + constructor(message: string, options?: { status?: number }) { + super(message) + this.name = 'CopilotBackendError' + this.status = options?.status + } + } + + class BillingLimitError extends Error { + userId: string + + constructor(userId: string) { + super('Usage limit reached') + this.name = 'BillingLimitError' + this.userId = userId + } + } + + return { + BillingLimitError, + CopilotBackendError, + runStreamLoop: mockRunStreamLoop, + } +}) + +vi.mock('@/lib/copilot/server/agent-url', () => ({ + getMothershipBaseURL: mockGetMothershipBaseURL, + getMothershipSourceEnvHeaders: mockGetMothershipSourceEnvHeaders, +})) + +vi.mock('@/lib/core/config/env', () => ({ + env: { + COPILOT_API_KEY: undefined, + }, + getEnv: vi.fn((key: string) => (key === 'NEXT_PUBLIC_APP_URL' ? 'http://localhost:3000' : '')), + isTruthy: vi.fn((value: string | undefined) => value === 'true'), +})) + +vi.mock('@/lib/environment/utils', () => ({ + getEffectiveDecryptedEnv: mockGetEffectiveDecryptedEnv, +})) + +vi.mock('@/lib/copilot/tools/handlers/context', () => ({ + prepareExecutionContext: mockPrepareExecutionContext, +})) + +vi.mock('@/lib/copilot/request/tools/billing', () => ({ + handleBillingLimitResponse: vi.fn(), +})) + +vi.mock('@/lib/copilot/request/tools/executor', () => ({ + executeToolAndReport: vi.fn(), +})) + +import { runCopilotLifecycle } from '@/lib/copilot/request/lifecycle/run' + +describe('runCopilotLifecycle', () => { + beforeEach(() => { + vi.clearAllMocks() + mockGetMothershipBaseURL.mockResolvedValue('http://mothership.test') + mockGetMothershipSourceEnvHeaders.mockReturnValue({}) + }) + + it('runs cancelled completion persistence when a stream throws after abort', async () => { + const abortController = new AbortController() + abortController.abort('stop') + const onComplete = vi.fn() + const onError = vi.fn() + const executionContext: ExecutionContext = { + userId: 'user-1', + workflowId: '', + workspaceId: 'ws-1', + chatId: 'chat-1', + decryptedEnvVars: {}, + } + + mockRunStreamLoop.mockImplementationOnce( + async ( + _fetchUrl: string, + _fetchOptions: RequestInit, + context: StreamingContext + ): Promise => { + context.accumulatedContent = 'partial answer' + context.contentBlocks.push({ + type: 'text', + content: 'partial answer', + timestamp: 1, + }) + throw new Error('publisher closed after stop') + } + ) + + const result = await runCopilotLifecycle( + { message: 'hello', messageId: 'stream-1' }, + { + userId: 'user-1', + workspaceId: 'ws-1', + chatId: 'chat-1', + executionId: 'exec-1', + runId: 'run-1', + abortSignal: abortController.signal, + executionContext, + onComplete, + onError, + } + ) + + expect(onError).not.toHaveBeenCalled() + expect(onComplete).toHaveBeenCalledWith( + expect.objectContaining({ + success: false, + cancelled: true, + content: 'partial answer', + chatId: 'chat-1', + requestId: undefined, + error: 'publisher closed after stop', + contentBlocks: [ + expect.objectContaining({ + type: 'text', + content: 'partial answer', + }), + ], + }) + ) + expect(result).toEqual( + expect.objectContaining({ + success: false, + cancelled: true, + content: 'partial answer', + chatId: 'chat-1', + error: 'publisher closed after stop', + }) + ) + }) +}) diff --git a/apps/sim/lib/copilot/request/lifecycle/run.ts b/apps/sim/lib/copilot/request/lifecycle/run.ts index 141184b49b2..c98ae698b6f 100644 --- a/apps/sim/lib/copilot/request/lifecycle/run.ts +++ b/apps/sim/lib/copilot/request/lifecycle/run.ts @@ -122,6 +122,7 @@ export async function runCopilotLifecycle( messageId: payloadMsgId, ...(lifecycleOptions.trace ? { trace: lifecycleOptions.trace } : {}), }) + let onCompleteStarted = false try { await runCheckpointLoop(requestPayload, context, execContext, lifecycleOptions, goRoute) @@ -129,9 +130,10 @@ export async function runCopilotLifecycle( const result: OrchestratorResult = { success: context.errors.length === 0 && !context.wasAborted, // `cancelled` is an explicit discriminator so callers can tell - // "user hit Stop" (don't clear the chat row; /chat/stop owns it) - // from "backend errored" (do clear the row so the chat isn't - // stuck with a non-null `conversationId`). An error that also + // "user hit Stop" (persist partial assistant content through the + // cancelled completion path) from "backend errored" (do clear the + // row so the chat isn't stuck with a non-null `conversationId`). + // An error that also // happens to fire the abort signal still counts as an error // path, but practically that doesn't happen in the success // branch here — if there are errors we never reach a @@ -146,33 +148,54 @@ export async function runCopilotLifecycle( usage: context.usage, cost: context.cost, } - await lifecycleOptions.onComplete?.(result) + if (lifecycleOptions.onComplete) { + onCompleteStarted = true + await lifecycleOptions.onComplete(result) + } return result } catch (error) { - const err = error instanceof Error ? error : new Error('Copilot orchestration failed') + const err = toError(error) logger.error('Copilot orchestration failed', { error: err.message }) // If the abort signal fired, this throw is a consequence of the // cancel (publisher.publish fails once the client disconnects, a // downstream Go read throws on ctx cancel, etc.) — NOT a real // backend error. Don't invoke `onError`, because on the cancel - // path `/api/copilot/chat/stop` is the single DB writer and - // `onError` would race with it via `finalizeAssistantTurn`, - // clearing `conversationId` before stop's UPDATE can match (see - // `buildOnComplete` in chat/post.ts for the full rationale). + // path `onComplete(cancelled)` persists partial content with an + // idempotent row-locked finalizer. `onError` would race with it via + // `finalizeAssistantTurn`, clearing `conversationId` before the + // partial content can be appended. // Return `cancelled: true` so upstream classification stays // consistent with the success-path cancel result. const wasCancelled = lifecycleOptions.abortSignal?.aborted ?? false if (!wasCancelled) { await lifecycleOptions.onError?.(err) + } else if (!onCompleteStarted && lifecycleOptions.onComplete) { + await lifecycleOptions.onComplete({ + success: false, + cancelled: true, + content: context.accumulatedContent, + contentBlocks: context.contentBlocks, + toolCalls: buildToolCallSummaries(context), + chatId: context.chatId, + requestId: context.requestId, + error: err.message, + errors: context.errors.length ? context.errors : undefined, + usage: context.usage, + cost: context.cost, + }) } return { success: false, cancelled: wasCancelled, - content: '', - contentBlocks: [], - toolCalls: [], + content: wasCancelled ? context.accumulatedContent : '', + contentBlocks: wasCancelled ? context.contentBlocks : [], + toolCalls: wasCancelled ? buildToolCallSummaries(context) : [], chatId: context.chatId, + requestId: context.requestId, error: err.message, + errors: context.errors.length ? context.errors : undefined, + usage: context.usage, + cost: context.cost, } } } From 05830df582d64ba20c6699f280963e159a9fab47 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Sun, 17 May 2026 15:23:06 -0700 Subject: [PATCH 3/3] address bugbot comment --- apps/sim/app/api/copilot/chat/stop/route.ts | 8 +-- .../lib/copilot/request/lifecycle/run.test.ts | 57 +++++++++++++++++++ apps/sim/lib/copilot/request/lifecycle/run.ts | 32 +++++------ 3 files changed, 75 insertions(+), 22 deletions(-) diff --git a/apps/sim/app/api/copilot/chat/stop/route.ts b/apps/sim/app/api/copilot/chat/stop/route.ts index d17b9621a98..05d7303d94c 100644 --- a/apps/sim/app/api/copilot/chat/stop/route.ts +++ b/apps/sim/app/api/copilot/chat/stop/route.ts @@ -51,18 +51,18 @@ export const POST = withRouteHandler((req: NextRequest) => const hasContent = content.trim().length > 0 const hasBlocks = Array.isArray(contentBlocks) && contentBlocks.length > 0 - const synthesizedStoppedBlocks = hasBlocks + const assistantBlocks = hasBlocks ? contentBlocks : hasContent - ? [{ type: 'text', channel: 'assistant', content }, { type: 'stopped' }] - : [{ type: 'stopped' }] + ? [{ type: 'text', channel: 'assistant', content }] + : [] const assistantMessage: PersistedMessage = withStoppedContentBlock( normalizeMessage({ id: generateId(), role: 'assistant', content, timestamp: new Date().toISOString(), - contentBlocks: synthesizedStoppedBlocks, + contentBlocks: assistantBlocks, ...(requestId ? { requestId } : {}), }) ) diff --git a/apps/sim/lib/copilot/request/lifecycle/run.test.ts b/apps/sim/lib/copilot/request/lifecycle/run.test.ts index ded80fdb39d..31b9c4dad58 100644 --- a/apps/sim/lib/copilot/request/lifecycle/run.test.ts +++ b/apps/sim/lib/copilot/request/lifecycle/run.test.ts @@ -165,4 +165,61 @@ describe('runCopilotLifecycle', () => { }) ) }) + + it('returns the cancelled result when cancelled completion persistence fails', async () => { + const abortController = new AbortController() + abortController.abort('stop') + const onComplete = vi.fn().mockRejectedValue(new Error('db unavailable')) + const onError = vi.fn() + const executionContext: ExecutionContext = { + userId: 'user-1', + workflowId: '', + workspaceId: 'ws-1', + chatId: 'chat-1', + decryptedEnvVars: {}, + } + + mockRunStreamLoop.mockImplementationOnce( + async ( + _fetchUrl: string, + _fetchOptions: RequestInit, + context: StreamingContext + ): Promise => { + context.accumulatedContent = 'partial answer' + throw new Error('publisher closed after stop') + } + ) + + const result = await runCopilotLifecycle( + { message: 'hello', messageId: 'stream-1' }, + { + userId: 'user-1', + workspaceId: 'ws-1', + chatId: 'chat-1', + executionId: 'exec-1', + runId: 'run-1', + abortSignal: abortController.signal, + executionContext, + onComplete, + onError, + } + ) + + expect(onError).not.toHaveBeenCalled() + expect(onComplete).toHaveBeenCalledWith( + expect.objectContaining({ + success: false, + cancelled: true, + content: 'partial answer', + }) + ) + expect(result).toEqual( + expect.objectContaining({ + success: false, + cancelled: true, + content: 'partial answer', + error: 'publisher closed after stop', + }) + ) + }) }) diff --git a/apps/sim/lib/copilot/request/lifecycle/run.ts b/apps/sim/lib/copilot/request/lifecycle/run.ts index c98ae698b6f..e06341f43ef 100644 --- a/apps/sim/lib/copilot/request/lifecycle/run.ts +++ b/apps/sim/lib/copilot/request/lifecycle/run.ts @@ -167,24 +167,7 @@ export async function runCopilotLifecycle( // Return `cancelled: true` so upstream classification stays // consistent with the success-path cancel result. const wasCancelled = lifecycleOptions.abortSignal?.aborted ?? false - if (!wasCancelled) { - await lifecycleOptions.onError?.(err) - } else if (!onCompleteStarted && lifecycleOptions.onComplete) { - await lifecycleOptions.onComplete({ - success: false, - cancelled: true, - content: context.accumulatedContent, - contentBlocks: context.contentBlocks, - toolCalls: buildToolCallSummaries(context), - chatId: context.chatId, - requestId: context.requestId, - error: err.message, - errors: context.errors.length ? context.errors : undefined, - usage: context.usage, - cost: context.cost, - }) - } - return { + const result: OrchestratorResult = { success: false, cancelled: wasCancelled, content: wasCancelled ? context.accumulatedContent : '', @@ -197,6 +180,19 @@ export async function runCopilotLifecycle( usage: context.usage, cost: context.cost, } + + if (!wasCancelled) { + await lifecycleOptions.onError?.(err) + } else if (!onCompleteStarted && lifecycleOptions.onComplete) { + try { + await lifecycleOptions.onComplete(result) + } catch (completeError) { + logger.error('Cancelled copilot completion callback failed', { + error: toError(completeError).message, + }) + } + } + return result } }