From b7d2777bac810739b61e96061c1f62fc0b723f4a Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Tue, 19 May 2026 11:04:12 -0700 Subject: [PATCH 1/3] improvement(cleanup): cleanup refs along in logs cleanup job --- apps/sim/background/cleanup-logs.test.ts | 104 +++++++++++++++ apps/sim/background/cleanup-logs.ts | 148 ++++++++++++++++++--- apps/sim/lib/billing/cleanup-dispatcher.ts | 117 ++++++++-------- apps/sim/lib/billing/core/plan.ts | 13 +- 4 files changed, 304 insertions(+), 78 deletions(-) create mode 100644 apps/sim/background/cleanup-logs.test.ts diff --git a/apps/sim/background/cleanup-logs.test.ts b/apps/sim/background/cleanup-logs.test.ts new file mode 100644 index 00000000000..a6e3e74c735 --- /dev/null +++ b/apps/sim/background/cleanup-logs.test.ts @@ -0,0 +1,104 @@ +/** + * @vitest-environment node + */ +import { describe, expect, it, vi } from 'vitest' +import { collectExecutionLargeValueKeys } from '@/background/cleanup-logs' + +vi.mock('@trigger.dev/sdk', () => ({ + task: vi.fn((definition) => definition), +})) + +describe('collectExecutionLargeValueKeys', () => { + it('collects large-value keys owned by the purged execution', () => { + const ownedRef = { + __simLargeValueRef: true, + version: 1, + id: 'lv_OWNEDREF0001', + kind: 'object', + size: 128, + key: 'execution/workspace-1/workflow-1/execution-1/large-value-lv_OWNEDREF0001.json', + executionId: 'execution-1', + } + const ownedManifest = { + __simLargeArrayManifest: true, + version: 2, + kind: 'array', + totalCount: 1, + chunkCount: 1, + byteSize: 128, + chunks: [ + { + ref: { + __simLargeValueRef: true, + version: 1, + id: 'lv_CHUNKREF0001', + kind: 'array', + size: 128, + key: 'execution/workspace-1/workflow-1/execution-1/large-value-lv_CHUNKREF0001.json', + executionId: 'execution-1', + }, + count: 1, + byteSize: 128, + }, + ], + preview: [], + } + const inheritedRef = { + __simLargeValueRef: true, + version: 1, + id: 'lv_INHERITED001', + kind: 'object', + size: 128, + key: 'execution/workspace-1/workflow-1/source-execution/large-value-lv_INHERITED001.json', + executionId: 'source-execution', + } + + const keys = collectExecutionLargeValueKeys( + { + output: { + ownedRef, + ownedManifest, + inheritedRef, + }, + }, + 'execution-1' + ) + + expect(keys.sort()).toEqual([ + 'execution/workspace-1/workflow-1/execution-1/large-value-lv_CHUNKREF0001.json', + 'execution/workspace-1/workflow-1/execution-1/large-value-lv_OWNEDREF0001.json', + ]) + }) + + it('deduplicates repeated refs and ignores keyless cache-only refs', () => { + const ref = { + __simLargeValueRef: true, + version: 1, + id: 'lv_REPEATREF001', + kind: 'string', + size: 128, + key: 'execution/workspace-1/workflow-1/execution-1/large-value-lv_REPEATREF001.json', + executionId: 'execution-1', + } + + const keys = collectExecutionLargeValueKeys( + { + first: ref, + second: ref, + keyless: { + __simLargeValueRef: true, + version: 1, + id: 'lv_KEYLESSREF01', + kind: 'string', + size: 128, + executionId: 'execution-1', + }, + }, + 'execution-1' + ) + + expect(keys).toEqual([ + 'execution/workspace-1/workflow-1/execution-1/large-value-lv_REPEATREF001.json', + ]) + }) +}) diff --git a/apps/sim/background/cleanup-logs.ts b/apps/sim/background/cleanup-logs.ts index 41fb5189357..b69d4002610 100644 --- a/apps/sim/background/cleanup-logs.ts +++ b/apps/sim/background/cleanup-logs.ts @@ -1,14 +1,15 @@ import { db } from '@sim/db' -import { jobExecutionLogs, workflowExecutionLogs } from '@sim/db/schema' +import { jobExecutionLogs, pausedExecutions, workflowExecutionLogs } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { task } from '@trigger.dev/sdk' -import { and, inArray, lt } from 'drizzle-orm' +import { and, eq, inArray, isNull, lt, notInArray, or } from 'drizzle-orm' import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher' import { batchDeleteByWorkspaceAndTimestamp, chunkedBatchDelete, type TableCleanupResult, } from '@/lib/cleanup/batch-delete' +import { isLargeValueRef } from '@/lib/execution/payloads/large-value-ref' import { snapshotService } from '@/lib/logs/execution/snapshot/service' import { isUsingCloudStorage, StorageService } from '@/lib/uploads' import { deleteFileMetadata } from '@/lib/uploads/server/metadata' @@ -19,6 +20,60 @@ interface FileDeleteStats { filesTotal: number filesDeleted: number filesDeleteFailed: number + largeValuesTotal: number + largeValuesDeleted: number + largeValuesDeleteFailed: number +} + +const RESUMABLE_PAUSED_STATUSES = ['paused', 'partially_resumed', 'cancelling'] + +export function collectExecutionLargeValueKeys(value: unknown, executionId: string): string[] { + const keys = new Set() + collectExecutionLargeValueKeysInto(value, executionId, new WeakSet(), keys) + return Array.from(keys) +} + +function getExecutionIdFromStorageKey(key: string): string | undefined { + const parts = key.split('/') + if (parts[0] !== 'execution' || parts.length < 5) { + return undefined + } + return parts[3] +} + +function collectExecutionLargeValueKeysInto( + value: unknown, + executionId: string, + seen: WeakSet, + keys: Set +): void { + if (!value || typeof value !== 'object') { + return + } + + if (seen.has(value)) { + return + } + + if (isLargeValueRef(value)) { + if (value.key && getExecutionIdFromStorageKey(value.key) === executionId) { + keys.add(value.key) + } + return + } + + seen.add(value) + + if (Array.isArray(value)) { + for (const item of value) { + collectExecutionLargeValueKeysInto(item, executionId, seen, keys) + } + return + } + + for (const entryValue of Object.values(value)) { + collectExecutionLargeValueKeysInto(entryValue, executionId, seen, keys) + } } async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Promise { @@ -41,12 +96,39 @@ async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Pro ) } +async function deleteLargeValueStorageKeys(keys: string[], stats: FileDeleteStats): Promise { + if (!isUsingCloudStorage() || keys.length === 0) return + + const uniqueKeys = Array.from(new Set(keys)) + stats.largeValuesTotal += uniqueKeys.length + + await Promise.all( + uniqueKeys.map(async (key) => { + try { + await StorageService.deleteFile({ key, context: 'execution' }) + await deleteFileMetadata(key) + stats.largeValuesDeleted++ + } catch (error) { + stats.largeValuesDeleteFailed++ + logger.error(`Failed to delete large execution value ${key}:`, { error }) + } + }) + ) +} + async function cleanupWorkflowExecutionLogs( workspaceIds: string[], retentionDate: Date, label: string ): Promise { - const fileStats: FileDeleteStats = { filesTotal: 0, filesDeleted: 0, filesDeleteFailed: 0 } + const fileStats: FileDeleteStats = { + filesTotal: 0, + filesDeleted: 0, + filesDeleteFailed: 0, + largeValuesTotal: 0, + largeValuesDeleted: 0, + largeValuesDeleteFailed: 0, + } const dbStats = await chunkedBatchDelete({ tableDef: workflowExecutionLogs, @@ -54,23 +136,59 @@ async function cleanupWorkflowExecutionLogs( tableName: `${label}/workflow_execution_logs`, selectChunk: (chunkIds, limit) => db - .select({ id: workflowExecutionLogs.id, files: workflowExecutionLogs.files }) + .select({ + id: workflowExecutionLogs.id, + executionId: workflowExecutionLogs.executionId, + executionData: workflowExecutionLogs.executionData, + files: workflowExecutionLogs.files, + }) .from(workflowExecutionLogs) + .leftJoin( + pausedExecutions, + eq(pausedExecutions.executionId, workflowExecutionLogs.executionId) + ) .where( and( inArray(workflowExecutionLogs.workspaceId, chunkIds), - lt(workflowExecutionLogs.startedAt, retentionDate) + lt(workflowExecutionLogs.startedAt, retentionDate), + or( + isNull(pausedExecutions.status), + notInArray(pausedExecutions.status, RESUMABLE_PAUSED_STATUSES) + ) ) ) .limit(limit), onBatch: async (rows) => { - for (const row of rows) await deleteExecutionFiles(row.files, fileStats) + for (const row of rows) { + await deleteExecutionFiles(row.files, fileStats) + await deleteLargeValueStorageKeys( + collectExecutionLargeValueKeys(row.executionData, row.executionId), + fileStats + ) + } }, }) return { ...dbStats, ...fileStats } } +async function cleanupFreePlanOrphanedSnapshots( + payload: CleanupJobPayload, + retentionHours: number +): Promise { + if (payload.plan !== 'free') { + return + } + + try { + const retentionDays = Math.floor(retentionHours / 24) + const snapshotsCleaned = await snapshotService.cleanupOrphanedSnapshots(retentionDays + 1) + logger.info(`Cleaned up ${snapshotsCleaned} orphaned snapshots`) + } catch (snapshotError) { + logger.error('Error cleaning up orphaned snapshots:', { snapshotError }) + } +} + export async function runCleanupLogs(payload: CleanupJobPayload): Promise { const startTime = Date.now() @@ -82,12 +200,14 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise const { workspaceIds, retentionHours, label } = scope + const retentionDate = new Date(Date.now() - retentionHours * 60 * 60 * 1000) + if (workspaceIds.length === 0) { logger.info(`[${label}] No workspaces to process`) + await cleanupFreePlanOrphanedSnapshots(payload, retentionHours) return } - const retentionDate = new Date(Date.now() - retentionHours * 60 * 60 * 1000) logger.info( `[${label}] Cleaning ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}` ) @@ -96,6 +216,9 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise logger.info( `[${label}] workflow_execution_logs files: ${workflowResults.filesDeleted}/${workflowResults.filesTotal} deleted, ${workflowResults.filesDeleteFailed} failed` ) + logger.info( + `[${label}] workflow_execution_logs large values: ${workflowResults.largeValuesDeleted}/${workflowResults.largeValuesTotal} deleted, ${workflowResults.largeValuesDeleteFailed} failed` + ) await batchDeleteByWorkspaceAndTimestamp({ tableDef: jobExecutionLogs, @@ -106,16 +229,7 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise tableName: `${label}/job_execution_logs`, }) - // Snapshot cleanup runs only on the free job to avoid running it N times for N enterprise workspaces. - if (payload.plan === 'free') { - try { - const retentionDays = Math.floor(retentionHours / 24) - const snapshotsCleaned = await snapshotService.cleanupOrphanedSnapshots(retentionDays + 1) - logger.info(`Cleaned up ${snapshotsCleaned} orphaned snapshots`) - } catch (snapshotError) { - logger.error('Error cleaning up orphaned snapshots:', { snapshotError }) - } - } + await cleanupFreePlanOrphanedSnapshots(payload, retentionHours) const timeElapsed = (Date.now() - startTime) / 1000 logger.info(`[${label}] Job completed in ${timeElapsed.toFixed(2)}s`) diff --git a/apps/sim/lib/billing/cleanup-dispatcher.ts b/apps/sim/lib/billing/cleanup-dispatcher.ts index 2ee91159b70..3b33d4c5f04 100644 --- a/apps/sim/lib/billing/cleanup-dispatcher.ts +++ b/apps/sim/lib/billing/cleanup-dispatcher.ts @@ -1,10 +1,10 @@ import { db } from '@sim/db' -import { organization, subscription, workspace } from '@sim/db/schema' +import { organization, workspace } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { tasks } from '@trigger.dev/sdk' -import { and, eq, inArray, isNotNull, isNull, sql } from 'drizzle-orm' -import { type PlanCategory, sqlIsPaid, sqlIsPro, sqlIsTeam } from '@/lib/billing/plan-helpers' -import { ENTITLED_SUBSCRIPTION_STATUSES } from '@/lib/billing/subscriptions/utils' +import { eq, isNull } from 'drizzle-orm' +import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' +import { getPlanType, type PlanCategory } from '@/lib/billing/plan-helpers' import { getJobQueue } from '@/lib/core/async-jobs' import { shouldExecuteInline } from '@/lib/core/async-jobs/config' import type { EnqueueOptions } from '@/lib/core/async-jobs/types' @@ -38,6 +38,12 @@ interface CleanupJobConfig { defaults: Record } +interface WorkspaceCleanupScopeRow { + id: string + billedAccountUserId: string + organizationSettings: OrganizationRetentionSettings | null +} + const DAY = 24 /** @@ -61,44 +67,50 @@ export const CLEANUP_CONFIG = { }, } as const satisfies Record -/** - * Bulk-lookup workspace IDs for a non-enterprise plan category. Enterprise is - * per-workspace (routed through the owning organization's retention config). - */ -async function resolveWorkspaceIdsForPlan(plan: NonEnterprisePlan): Promise { - if (plan === 'free') { - const rows = await db - .select({ id: workspace.id }) - .from(workspace) - .leftJoin( - subscription, - and( - eq(subscription.referenceId, workspace.billedAccountUserId), - inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES), - sqlIsPaid(subscription.plan) - ) - ) - .where(and(isNull(subscription.id), isNull(workspace.archivedAt))) - - return rows.map((r) => r.id) - } - - const planPredicate = plan === 'pro' ? sqlIsPro(subscription.plan) : sqlIsTeam(subscription.plan) +async function listActiveWorkspaceCleanupScopeRows(): Promise { const rows = await db - .select({ id: workspace.id }) + .select({ + id: workspace.id, + billedAccountUserId: workspace.billedAccountUserId, + organizationSettings: organization.dataRetentionSettings, + }) .from(workspace) - .innerJoin( - subscription, - and( - eq(subscription.referenceId, workspace.billedAccountUserId), - inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES), - planPredicate! - ) - ) + .leftJoin(organization, eq(organization.id, workspace.organizationId)) .where(isNull(workspace.archivedAt)) - .groupBy(workspace.id) - return rows.map((r) => r.id) + return rows.map((row) => ({ + ...row, + organizationSettings: + (row.organizationSettings as OrganizationRetentionSettings | null) ?? null, + })) +} + +async function resolvePlanTypesByBilledUserId( + rows: WorkspaceCleanupScopeRow[] +): Promise> { + const billedUserIds = Array.from(new Set(rows.map((row) => row.billedAccountUserId))) + const entries = await Promise.all( + billedUserIds.map(async (userId) => { + const subscription = await getHighestPrioritySubscription(userId, { onError: 'throw' }) + return [userId, getPlanType(subscription?.plan)] as const + }) + ) + + return new Map(entries) +} + +/** + * Bulk-lookup workspace IDs for a non-enterprise plan category using the same + * effective-plan lookup used by execution, limits, and workspace policy. + * Enterprise is per-workspace (routed through the owning organization's + * retention config). + */ +async function resolveWorkspaceIdsForPlan(plan: NonEnterprisePlan): Promise { + const rows = await listActiveWorkspaceCleanupScopeRows() + const planByBilledUserId = await resolvePlanTypesByBilledUserId(rows) + return rows + .filter((row) => planByBilledUserId.get(row.billedAccountUserId) === plan) + .map((row) => row.id) } export interface ResolvedCleanupScope { @@ -185,28 +197,13 @@ export async function dispatchCleanupJobs( jobIds.push(jobId) } - // Enterprise: workspaces whose owning org is on an active enterprise sub and - // has a non-NULL value for this job's retention key. groupBy dedupes in case - // multiple entitled subscription rows exist for the same org. - const enterpriseRows = await db - .select({ id: workspace.id }) - .from(workspace) - .innerJoin(organization, eq(organization.id, workspace.organizationId)) - .innerJoin( - subscription, - and( - eq(subscription.referenceId, organization.id), - inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES), - eq(subscription.plan, 'enterprise') - ) - ) - .where( - and( - isNull(workspace.archivedAt), - isNotNull(sql`${organization.dataRetentionSettings}->>${config.key}`) - ) - ) - .groupBy(workspace.id) + const activeWorkspaceRows = await listActiveWorkspaceCleanupScopeRows() + const planByBilledUserId = await resolvePlanTypesByBilledUserId(activeWorkspaceRows) + const enterpriseRows = activeWorkspaceRows.filter( + (row) => + planByBilledUserId.get(row.billedAccountUserId) === 'enterprise' && + row.organizationSettings?.[config.key] != null + ) const enterpriseCount = enterpriseRows.length diff --git a/apps/sim/lib/billing/core/plan.ts b/apps/sim/lib/billing/core/plan.ts index cb8fea848db..d517acce4d0 100644 --- a/apps/sim/lib/billing/core/plan.ts +++ b/apps/sim/lib/billing/core/plan.ts @@ -13,6 +13,10 @@ const logger = createLogger('PlanLookup') export type HighestPrioritySubscription = Awaited> +interface GetHighestPrioritySubscriptionOptions { + onError?: 'return-null' | 'throw' +} + /** * Get the highest priority paid subscription for a user. * @@ -27,7 +31,11 @@ export type HighestPrioritySubscription = Awaited Date: Tue, 19 May 2026 11:31:00 -0700 Subject: [PATCH 2/3] address comments --- apps/sim/lib/billing/cleanup-dispatcher.ts | 69 +++++++++++++++++++--- apps/sim/lib/billing/core/billing.ts | 13 +++- 2 files changed, 72 insertions(+), 10 deletions(-) diff --git a/apps/sim/lib/billing/cleanup-dispatcher.ts b/apps/sim/lib/billing/cleanup-dispatcher.ts index 3b33d4c5f04..2841d7f28ef 100644 --- a/apps/sim/lib/billing/cleanup-dispatcher.ts +++ b/apps/sim/lib/billing/cleanup-dispatcher.ts @@ -1,14 +1,17 @@ import { db } from '@sim/db' +import type { WorkspaceMode } from '@sim/db/schema' import { organization, workspace } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { tasks } from '@trigger.dev/sdk' import { eq, isNull } from 'drizzle-orm' +import { getOrganizationSubscription } from '@/lib/billing/core/billing' import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' import { getPlanType, type PlanCategory } from '@/lib/billing/plan-helpers' import { getJobQueue } from '@/lib/core/async-jobs' import { shouldExecuteInline } from '@/lib/core/async-jobs/config' import type { EnqueueOptions } from '@/lib/core/async-jobs/types' import { isTriggerAvailable } from '@/lib/knowledge/documents/service' +import { isOrganizationWorkspace } from '@/lib/workspaces/policy' const logger = createLogger('RetentionDispatcher') @@ -41,11 +44,15 @@ interface CleanupJobConfig { interface WorkspaceCleanupScopeRow { id: string billedAccountUserId: string + organizationId: string | null + workspaceMode: WorkspaceMode organizationSettings: OrganizationRetentionSettings | null } const DAY = 24 +type PlanResolutionEntry = readonly [string, PlanCategory] + /** * Single source of truth for cleanup retention: which key each job type reads * from `organization.dataRetentionSettings`, and the default retention (in @@ -72,6 +79,8 @@ async function listActiveWorkspaceCleanupScopeRows(): Promise row.billedAccountUserId))) const entries = await Promise.all( billedUserIds.map(async (userId) => { - const subscription = await getHighestPrioritySubscription(userId, { onError: 'throw' }) - return [userId, getPlanType(subscription?.plan)] as const + try { + const subscription = await getHighestPrioritySubscription(userId, { onError: 'throw' }) + return [userId, getPlanType(subscription?.plan)] as const + } catch (error) { + logger.error('Skipping cleanup for billed user after plan lookup failed', { + userId, + error, + }) + return null + } + }) + ) + + return new Map(entries.filter((entry): entry is PlanResolutionEntry => entry !== null)) +} + +async function resolvePlanTypesByWorkspaceId( + rows: WorkspaceCleanupScopeRow[] +): Promise> { + const userScopedRows = rows.filter((row) => !isOrganizationWorkspace(row)) + const userPlanByBilledUserId = await resolvePlanTypesByBilledUserId(userScopedRows) + const entries = await Promise.all( + rows.map(async (row) => { + const organizationId = isOrganizationWorkspace(row) ? row.organizationId : null + if (organizationId) { + try { + const subscription = await getOrganizationSubscription(organizationId, { + onError: 'throw', + }) + return [row.id, getPlanType(subscription?.plan)] as const + } catch (error) { + logger.error('Skipping cleanup for organization workspace after plan lookup failed', { + workspaceId: row.id, + organizationId, + error, + }) + return null + } + } + + const plan = userPlanByBilledUserId.get(row.billedAccountUserId) + if (plan === undefined) { + return null + } + + return [row.id, plan] as const }) ) - return new Map(entries) + return new Map(entries.filter((entry): entry is PlanResolutionEntry => entry !== null)) } /** @@ -107,10 +160,8 @@ async function resolvePlanTypesByBilledUserId( */ async function resolveWorkspaceIdsForPlan(plan: NonEnterprisePlan): Promise { const rows = await listActiveWorkspaceCleanupScopeRows() - const planByBilledUserId = await resolvePlanTypesByBilledUserId(rows) - return rows - .filter((row) => planByBilledUserId.get(row.billedAccountUserId) === plan) - .map((row) => row.id) + const planByWorkspaceId = await resolvePlanTypesByWorkspaceId(rows) + return rows.filter((row) => planByWorkspaceId.get(row.id) === plan).map((row) => row.id) } export interface ResolvedCleanupScope { @@ -198,10 +249,10 @@ export async function dispatchCleanupJobs( } const activeWorkspaceRows = await listActiveWorkspaceCleanupScopeRows() - const planByBilledUserId = await resolvePlanTypesByBilledUserId(activeWorkspaceRows) + const planByWorkspaceId = await resolvePlanTypesByWorkspaceId(activeWorkspaceRows) const enterpriseRows = activeWorkspaceRows.filter( (row) => - planByBilledUserId.get(row.billedAccountUserId) === 'enterprise' && + planByWorkspaceId.get(row.id) === 'enterprise' && row.organizationSettings?.[config.key] != null ) diff --git a/apps/sim/lib/billing/core/billing.ts b/apps/sim/lib/billing/core/billing.ts index e56261a62a9..78c359ed569 100644 --- a/apps/sim/lib/billing/core/billing.ts +++ b/apps/sim/lib/billing/core/billing.ts @@ -28,6 +28,10 @@ import { createLogger } from '@sim/logger' const logger = createLogger('Billing') +interface GetOrganizationSubscriptionOptions { + onError?: 'return-null' | 'throw' +} + /** * Get the organization's subscription row when its status is one of * `ENTITLED_SUBSCRIPTION_STATUSES` (includes `past_due`). Use this @@ -37,7 +41,11 @@ const logger = createLogger('Billing') * (from `core/subscription.ts`), which excludes `past_due`. * Returns `null` when there is no entitled sub. */ -export async function getOrganizationSubscription(organizationId: string) { +export async function getOrganizationSubscription( + organizationId: string, + options: GetOrganizationSubscriptionOptions = {} +) { + const { onError = 'return-null' } = options try { const orgSubs = await db .select() @@ -54,6 +62,9 @@ export async function getOrganizationSubscription(organizationId: string) { return orgSubs.length > 0 ? orgSubs[0] : null } catch (error) { logger.error('Error getting organization subscription', { error, organizationId }) + if (onError === 'throw') { + throw error + } return null } } From b9ffe66aab718d9023ecddb959b5923fb54d7817 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Tue, 19 May 2026 12:02:33 -0700 Subject: [PATCH 3/3] cleanup code --- apps/sim/background/cleanup-logs.test.ts | 104 --------------------- apps/sim/background/cleanup-logs.ts | 80 ++++++---------- apps/sim/lib/billing/cleanup-dispatcher.ts | 50 ++++++++-- apps/sim/lib/billing/core/plan.ts | 60 +++++++++--- apps/sim/lib/billing/core/subscription.ts | 7 +- 5 files changed, 123 insertions(+), 178 deletions(-) delete mode 100644 apps/sim/background/cleanup-logs.test.ts diff --git a/apps/sim/background/cleanup-logs.test.ts b/apps/sim/background/cleanup-logs.test.ts deleted file mode 100644 index a6e3e74c735..00000000000 --- a/apps/sim/background/cleanup-logs.test.ts +++ /dev/null @@ -1,104 +0,0 @@ -/** - * @vitest-environment node - */ -import { describe, expect, it, vi } from 'vitest' -import { collectExecutionLargeValueKeys } from '@/background/cleanup-logs' - -vi.mock('@trigger.dev/sdk', () => ({ - task: vi.fn((definition) => definition), -})) - -describe('collectExecutionLargeValueKeys', () => { - it('collects large-value keys owned by the purged execution', () => { - const ownedRef = { - __simLargeValueRef: true, - version: 1, - id: 'lv_OWNEDREF0001', - kind: 'object', - size: 128, - key: 'execution/workspace-1/workflow-1/execution-1/large-value-lv_OWNEDREF0001.json', - executionId: 'execution-1', - } - const ownedManifest = { - __simLargeArrayManifest: true, - version: 2, - kind: 'array', - totalCount: 1, - chunkCount: 1, - byteSize: 128, - chunks: [ - { - ref: { - __simLargeValueRef: true, - version: 1, - id: 'lv_CHUNKREF0001', - kind: 'array', - size: 128, - key: 'execution/workspace-1/workflow-1/execution-1/large-value-lv_CHUNKREF0001.json', - executionId: 'execution-1', - }, - count: 1, - byteSize: 128, - }, - ], - preview: [], - } - const inheritedRef = { - __simLargeValueRef: true, - version: 1, - id: 'lv_INHERITED001', - kind: 'object', - size: 128, - key: 'execution/workspace-1/workflow-1/source-execution/large-value-lv_INHERITED001.json', - executionId: 'source-execution', - } - - const keys = collectExecutionLargeValueKeys( - { - output: { - ownedRef, - ownedManifest, - inheritedRef, - }, - }, - 'execution-1' - ) - - expect(keys.sort()).toEqual([ - 'execution/workspace-1/workflow-1/execution-1/large-value-lv_CHUNKREF0001.json', - 'execution/workspace-1/workflow-1/execution-1/large-value-lv_OWNEDREF0001.json', - ]) - }) - - it('deduplicates repeated refs and ignores keyless cache-only refs', () => { - const ref = { - __simLargeValueRef: true, - version: 1, - id: 'lv_REPEATREF001', - kind: 'string', - size: 128, - key: 'execution/workspace-1/workflow-1/execution-1/large-value-lv_REPEATREF001.json', - executionId: 'execution-1', - } - - const keys = collectExecutionLargeValueKeys( - { - first: ref, - second: ref, - keyless: { - __simLargeValueRef: true, - version: 1, - id: 'lv_KEYLESSREF01', - kind: 'string', - size: 128, - executionId: 'execution-1', - }, - }, - 'execution-1' - ) - - expect(keys).toEqual([ - 'execution/workspace-1/workflow-1/execution-1/large-value-lv_REPEATREF001.json', - ]) - }) -}) diff --git a/apps/sim/background/cleanup-logs.ts b/apps/sim/background/cleanup-logs.ts index b69d4002610..1f09fa16ecc 100644 --- a/apps/sim/background/cleanup-logs.ts +++ b/apps/sim/background/cleanup-logs.ts @@ -2,14 +2,14 @@ import { db } from '@sim/db' import { jobExecutionLogs, pausedExecutions, workflowExecutionLogs } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { task } from '@trigger.dev/sdk' -import { and, eq, inArray, isNull, lt, notInArray, or } from 'drizzle-orm' +import { and, eq, inArray, isNull, lt, notInArray, or, sql } from 'drizzle-orm' import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher' import { batchDeleteByWorkspaceAndTimestamp, chunkedBatchDelete, type TableCleanupResult, } from '@/lib/cleanup/batch-delete' -import { isLargeValueRef } from '@/lib/execution/payloads/large-value-ref' +import { collectLargeValueKeys } from '@/lib/execution/payloads/large-execution-value' import { snapshotService } from '@/lib/logs/execution/snapshot/service' import { isUsingCloudStorage, StorageService } from '@/lib/uploads' import { deleteFileMetadata } from '@/lib/uploads/server/metadata' @@ -27,53 +27,31 @@ interface FileDeleteStats { const RESUMABLE_PAUSED_STATUSES = ['paused', 'partially_resumed', 'cancelling'] -export function collectExecutionLargeValueKeys(value: unknown, executionId: string): string[] { - const keys = new Set() - collectExecutionLargeValueKeysInto(value, executionId, new WeakSet(), keys) - return Array.from(keys) -} - -function getExecutionIdFromStorageKey(key: string): string | undefined { - const parts = key.split('/') - if (parts[0] !== 'execution' || parts.length < 5) { - return undefined - } - return parts[3] -} - -function collectExecutionLargeValueKeysInto( - value: unknown, - executionId: string, - seen: WeakSet, - keys: Set -): void { - if (!value || typeof value !== 'object') { - return - } - - if (seen.has(value)) { - return - } - - if (isLargeValueRef(value)) { - if (value.key && getExecutionIdFromStorageKey(value.key) === executionId) { - keys.add(value.key) - } - return - } - - seen.add(value) +async function filterLargeValueKeysWithoutRetainedReferences( + keys: string[], + deletedLogIds: string[] +): Promise { + if (keys.length === 0 || deletedLogIds.length === 0) return [] + + const unreferencedKeys: string[] = [] + for (const key of Array.from(new Set(keys))) { + const [referencingLog] = await db + .select({ id: workflowExecutionLogs.id }) + .from(workflowExecutionLogs) + .where( + and( + notInArray(workflowExecutionLogs.id, deletedLogIds), + sql`position(${key} in ${workflowExecutionLogs.executionData}::text) > 0` + ) + ) + .limit(1) - if (Array.isArray(value)) { - for (const item of value) { - collectExecutionLargeValueKeysInto(item, executionId, seen, keys) + if (!referencingLog) { + unreferencedKeys.push(key) } - return } - for (const entryValue of Object.values(value)) { - collectExecutionLargeValueKeysInto(entryValue, executionId, seen, keys) - } + return unreferencedKeys } async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Promise { @@ -159,13 +137,17 @@ async function cleanupWorkflowExecutionLogs( ) .limit(limit), onBatch: async (rows) => { + const deletedLogIds = rows.map((row) => row.id) + const largeValueKeys = rows.flatMap((row) => collectLargeValueKeys(row.executionData)) + const unreferencedLargeValueKeys = await filterLargeValueKeysWithoutRetainedReferences( + largeValueKeys, + deletedLogIds + ) + for (const row of rows) { await deleteExecutionFiles(row.files, fileStats) - await deleteLargeValueStorageKeys( - collectExecutionLargeValueKeys(row.executionData, row.executionId), - fileStats - ) } + await deleteLargeValueStorageKeys(unreferencedLargeValueKeys, fileStats) }, }) diff --git a/apps/sim/lib/billing/cleanup-dispatcher.ts b/apps/sim/lib/billing/cleanup-dispatcher.ts index 2841d7f28ef..3c1c5cd81d8 100644 --- a/apps/sim/lib/billing/cleanup-dispatcher.ts +++ b/apps/sim/lib/billing/cleanup-dispatcher.ts @@ -5,13 +5,13 @@ import { createLogger } from '@sim/logger' import { tasks } from '@trigger.dev/sdk' import { eq, isNull } from 'drizzle-orm' import { getOrganizationSubscription } from '@/lib/billing/core/billing' -import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' +import { getHighestPriorityPersonalSubscription } from '@/lib/billing/core/subscription' import { getPlanType, type PlanCategory } from '@/lib/billing/plan-helpers' import { getJobQueue } from '@/lib/core/async-jobs' import { shouldExecuteInline } from '@/lib/core/async-jobs/config' import type { EnqueueOptions } from '@/lib/core/async-jobs/types' import { isTriggerAvailable } from '@/lib/knowledge/documents/service' -import { isOrganizationWorkspace } from '@/lib/workspaces/policy' +import { isOrganizationWorkspace, WORKSPACE_MODE } from '@/lib/workspaces/policy' const logger = createLogger('RetentionDispatcher') @@ -94,14 +94,16 @@ async function listActiveWorkspaceCleanupScopeRows(): Promise> { const billedUserIds = Array.from(new Set(rows.map((row) => row.billedAccountUserId))) const entries = await Promise.all( billedUserIds.map(async (userId) => { try { - const subscription = await getHighestPrioritySubscription(userId, { onError: 'throw' }) + const subscription = await getHighestPriorityPersonalSubscription(userId, { + onError: 'throw', + }) return [userId, getPlanType(subscription?.plan)] as const } catch (error) { logger.error('Skipping cleanup for billed user after plan lookup failed', { @@ -119,16 +121,32 @@ async function resolvePlanTypesByBilledUserId( async function resolvePlanTypesByWorkspaceId( rows: WorkspaceCleanupScopeRow[] ): Promise> { - const userScopedRows = rows.filter((row) => !isOrganizationWorkspace(row)) - const userPlanByBilledUserId = await resolvePlanTypesByBilledUserId(userScopedRows) + const userScopedRows = rows.filter((row) => row.workspaceMode !== WORKSPACE_MODE.ORGANIZATION) + const userPlanByBilledUserId = await resolvePersonalPlanTypesByBilledUserId(userScopedRows) const entries = await Promise.all( rows.map(async (row) => { - const organizationId = isOrganizationWorkspace(row) ? row.organizationId : null - if (organizationId) { + if (row.workspaceMode === WORKSPACE_MODE.ORGANIZATION) { + const organizationId = isOrganizationWorkspace(row) ? row.organizationId : null + if (!organizationId) { + logger.error('Skipping cleanup for malformed organization workspace', { + workspaceId: row.id, + organizationId: row.organizationId, + }) + return null + } + try { const subscription = await getOrganizationSubscription(organizationId, { onError: 'throw', }) + if (!subscription) { + logger.warn('Skipping cleanup for organization workspace without an org subscription', { + workspaceId: row.id, + organizationId, + }) + return null + } + return [row.id, getPlanType(subscription?.plan)] as const } catch (error) { logger.error('Skipping cleanup for organization workspace after plan lookup failed', { @@ -190,12 +208,26 @@ export async function resolveCleanupScope( } const [row] = await db - .select({ settings: organization.dataRetentionSettings }) + .select({ + id: workspace.id, + organizationId: workspace.organizationId, + workspaceMode: workspace.workspaceMode, + billedAccountUserId: workspace.billedAccountUserId, + settings: organization.dataRetentionSettings, + }) .from(workspace) .innerJoin(organization, eq(organization.id, workspace.organizationId)) .where(eq(workspace.id, payload.workspaceId)) .limit(1) + if (!row || !isOrganizationWorkspace(row)) return null + + const organizationId = row.organizationId + if (!organizationId) return null + + const subscription = await getOrganizationSubscription(organizationId, { onError: 'throw' }) + if (getPlanType(subscription?.plan) !== 'enterprise') return null + const hours = row?.settings?.[config.key] if (hours == null) return null diff --git a/apps/sim/lib/billing/core/plan.ts b/apps/sim/lib/billing/core/plan.ts index d517acce4d0..8bbd516ee10 100644 --- a/apps/sim/lib/billing/core/plan.ts +++ b/apps/sim/lib/billing/core/plan.ts @@ -17,6 +17,48 @@ interface GetHighestPrioritySubscriptionOptions { onError?: 'return-null' | 'throw' } +function pickHighestPrioritySubscription( + subscriptions: TSubscription[], + predicates: Array<(subscription: TSubscription) => boolean> +): TSubscription | null { + for (const predicate of predicates) { + const match = subscriptions.find(predicate) + if (match) return match + } + + return null +} + +export async function getHighestPriorityPersonalSubscription( + userId: string, + options: GetHighestPrioritySubscriptionOptions = {} +) { + const { onError = 'return-null' } = options + try { + const personalSubs = await db + .select() + .from(subscription) + .where( + and( + eq(subscription.referenceId, userId), + inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES) + ) + ) + + return pickHighestPrioritySubscription(personalSubs, [ + checkEnterprisePlan, + checkTeamPlan, + checkProPlan, + ]) + } catch (error) { + logger.error('Error getting highest priority personal subscription', { error, userId }) + if (onError === 'throw') { + throw error + } + return null + } +} + /** * Get the highest priority paid subscription for a user. * @@ -79,20 +121,10 @@ export async function getHighestPrioritySubscription( if (personalSubs.length === 0 && orgSubs.length === 0) return null - // Within each tier, prefer org-scoped over personally-scoped. - const pickAtTier = (predicate: (sub: (typeof personalSubs)[number]) => boolean) => - orgSubs.find(predicate) ?? personalSubs.find(predicate) - - const enterpriseSub = pickAtTier(checkEnterprisePlan) - if (enterpriseSub) return enterpriseSub - - const teamSub = pickAtTier(checkTeamPlan) - if (teamSub) return teamSub - - const proSub = pickAtTier(checkProPlan) - if (proSub) return proSub - - return null + return pickHighestPrioritySubscription( + [...orgSubs, ...personalSubs], + [checkEnterprisePlan, checkTeamPlan, checkProPlan] + ) } catch (error) { logger.error('Error getting highest priority subscription', { error, userId }) if (onError === 'throw') { diff --git a/apps/sim/lib/billing/core/subscription.ts b/apps/sim/lib/billing/core/subscription.ts index 75dfd706291..550fdcd3400 100644 --- a/apps/sim/lib/billing/core/subscription.ts +++ b/apps/sim/lib/billing/core/subscription.ts @@ -3,7 +3,10 @@ import { member, organization, subscription, user } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq, inArray, sql } from 'drizzle-orm' import { getEffectiveBillingStatus, isOrganizationBillingBlocked } from '@/lib/billing/core/access' -import { getHighestPrioritySubscription } from '@/lib/billing/core/plan' +import { + getHighestPriorityPersonalSubscription, + getHighestPrioritySubscription, +} from '@/lib/billing/core/plan' import { getPlanTierCredits, isPro as isPlanPro, @@ -29,7 +32,7 @@ import { getBaseUrl } from '@/lib/core/utils/urls' const logger = createLogger('SubscriptionCore') -export { getHighestPrioritySubscription } +export { getHighestPriorityPersonalSubscription, getHighestPrioritySubscription } export interface SubscriptionMetadata { billingInterval?: 'month' | 'year'