diff --git a/apps/sim/background/cleanup-logs.ts b/apps/sim/background/cleanup-logs.ts index 41fb5189357..1f09fa16ecc 100644 --- a/apps/sim/background/cleanup-logs.ts +++ b/apps/sim/background/cleanup-logs.ts @@ -1,14 +1,15 @@ import { db } from '@sim/db' -import { jobExecutionLogs, workflowExecutionLogs } from '@sim/db/schema' +import { jobExecutionLogs, pausedExecutions, workflowExecutionLogs } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { task } from '@trigger.dev/sdk' -import { and, inArray, lt } from 'drizzle-orm' +import { and, eq, inArray, isNull, lt, notInArray, or, sql } from 'drizzle-orm' import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher' import { batchDeleteByWorkspaceAndTimestamp, chunkedBatchDelete, type TableCleanupResult, } from '@/lib/cleanup/batch-delete' +import { collectLargeValueKeys } from '@/lib/execution/payloads/large-execution-value' import { snapshotService } from '@/lib/logs/execution/snapshot/service' import { isUsingCloudStorage, StorageService } from '@/lib/uploads' import { deleteFileMetadata } from '@/lib/uploads/server/metadata' @@ -19,6 +20,38 @@ interface FileDeleteStats { filesTotal: number filesDeleted: number filesDeleteFailed: number + largeValuesTotal: number + largeValuesDeleted: number + largeValuesDeleteFailed: number +} + +const RESUMABLE_PAUSED_STATUSES = ['paused', 'partially_resumed', 'cancelling'] + +async function filterLargeValueKeysWithoutRetainedReferences( + keys: string[], + deletedLogIds: string[] +): Promise { + if (keys.length === 0 || deletedLogIds.length === 0) return [] + + const unreferencedKeys: string[] = [] + for (const key of Array.from(new Set(keys))) { + const [referencingLog] = await db + .select({ id: workflowExecutionLogs.id }) + .from(workflowExecutionLogs) + .where( + and( + notInArray(workflowExecutionLogs.id, deletedLogIds), + sql`position(${key} in ${workflowExecutionLogs.executionData}::text) > 0` + ) + ) + .limit(1) + + if (!referencingLog) { + unreferencedKeys.push(key) + } + } + + return unreferencedKeys } async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Promise { @@ -41,12 +74,39 @@ async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Pro ) } +async function deleteLargeValueStorageKeys(keys: string[], stats: FileDeleteStats): Promise { + if (!isUsingCloudStorage() || keys.length === 0) return + + const uniqueKeys = Array.from(new Set(keys)) + stats.largeValuesTotal += uniqueKeys.length + + await Promise.all( + uniqueKeys.map(async (key) => { + try { + await StorageService.deleteFile({ key, context: 'execution' }) + await deleteFileMetadata(key) + stats.largeValuesDeleted++ + } catch (error) { + stats.largeValuesDeleteFailed++ + logger.error(`Failed to delete large execution value ${key}:`, { error }) + } + }) + ) +} + async function cleanupWorkflowExecutionLogs( workspaceIds: string[], retentionDate: Date, label: string ): Promise { - const fileStats: FileDeleteStats = { filesTotal: 0, filesDeleted: 0, filesDeleteFailed: 0 } + const fileStats: FileDeleteStats = { + filesTotal: 0, + filesDeleted: 0, + filesDeleteFailed: 0, + largeValuesTotal: 0, + largeValuesDeleted: 0, + largeValuesDeleteFailed: 0, + } const dbStats = await chunkedBatchDelete({ tableDef: workflowExecutionLogs, @@ -54,23 +114,63 @@ async function cleanupWorkflowExecutionLogs( tableName: `${label}/workflow_execution_logs`, selectChunk: (chunkIds, limit) => db - .select({ id: workflowExecutionLogs.id, files: workflowExecutionLogs.files }) + .select({ + id: workflowExecutionLogs.id, + executionId: workflowExecutionLogs.executionId, + executionData: workflowExecutionLogs.executionData, + files: workflowExecutionLogs.files, + }) .from(workflowExecutionLogs) + .leftJoin( + pausedExecutions, + eq(pausedExecutions.executionId, workflowExecutionLogs.executionId) + ) .where( and( inArray(workflowExecutionLogs.workspaceId, chunkIds), - lt(workflowExecutionLogs.startedAt, retentionDate) + lt(workflowExecutionLogs.startedAt, retentionDate), + or( + isNull(pausedExecutions.status), + notInArray(pausedExecutions.status, RESUMABLE_PAUSED_STATUSES) + ) ) ) .limit(limit), onBatch: async (rows) => { - for (const row of rows) await deleteExecutionFiles(row.files, fileStats) + const deletedLogIds = rows.map((row) => row.id) + const largeValueKeys = rows.flatMap((row) => collectLargeValueKeys(row.executionData)) + const unreferencedLargeValueKeys = await filterLargeValueKeysWithoutRetainedReferences( + largeValueKeys, + deletedLogIds + ) + + for (const row of rows) { + await deleteExecutionFiles(row.files, fileStats) + } + await deleteLargeValueStorageKeys(unreferencedLargeValueKeys, fileStats) }, }) return { ...dbStats, ...fileStats } } +async function cleanupFreePlanOrphanedSnapshots( + payload: CleanupJobPayload, + retentionHours: number +): Promise { + if (payload.plan !== 'free') { + return + } + + try { + const retentionDays = Math.floor(retentionHours / 24) + const snapshotsCleaned = await snapshotService.cleanupOrphanedSnapshots(retentionDays + 1) + logger.info(`Cleaned up ${snapshotsCleaned} orphaned snapshots`) + } catch (snapshotError) { + logger.error('Error cleaning up orphaned snapshots:', { snapshotError }) + } +} + export async function runCleanupLogs(payload: CleanupJobPayload): Promise { const startTime = Date.now() @@ -82,12 +182,14 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise const { workspaceIds, retentionHours, label } = scope + const retentionDate = new Date(Date.now() - retentionHours * 60 * 60 * 1000) + if (workspaceIds.length === 0) { logger.info(`[${label}] No workspaces to process`) + await cleanupFreePlanOrphanedSnapshots(payload, retentionHours) return } - const retentionDate = new Date(Date.now() - retentionHours * 60 * 60 * 1000) logger.info( `[${label}] Cleaning ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}` ) @@ -96,6 +198,9 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise logger.info( `[${label}] workflow_execution_logs files: ${workflowResults.filesDeleted}/${workflowResults.filesTotal} deleted, ${workflowResults.filesDeleteFailed} failed` ) + logger.info( + `[${label}] workflow_execution_logs large values: ${workflowResults.largeValuesDeleted}/${workflowResults.largeValuesTotal} deleted, ${workflowResults.largeValuesDeleteFailed} failed` + ) await batchDeleteByWorkspaceAndTimestamp({ tableDef: jobExecutionLogs, @@ -106,16 +211,7 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise tableName: `${label}/job_execution_logs`, }) - // Snapshot cleanup runs only on the free job to avoid running it N times for N enterprise workspaces. - if (payload.plan === 'free') { - try { - const retentionDays = Math.floor(retentionHours / 24) - const snapshotsCleaned = await snapshotService.cleanupOrphanedSnapshots(retentionDays + 1) - logger.info(`Cleaned up ${snapshotsCleaned} orphaned snapshots`) - } catch (snapshotError) { - logger.error('Error cleaning up orphaned snapshots:', { snapshotError }) - } - } + await cleanupFreePlanOrphanedSnapshots(payload, retentionHours) const timeElapsed = (Date.now() - startTime) / 1000 logger.info(`[${label}] Job completed in ${timeElapsed.toFixed(2)}s`) diff --git a/apps/sim/lib/billing/cleanup-dispatcher.ts b/apps/sim/lib/billing/cleanup-dispatcher.ts index 2ee91159b70..3c1c5cd81d8 100644 --- a/apps/sim/lib/billing/cleanup-dispatcher.ts +++ b/apps/sim/lib/billing/cleanup-dispatcher.ts @@ -1,14 +1,17 @@ import { db } from '@sim/db' -import { organization, subscription, workspace } from '@sim/db/schema' +import type { WorkspaceMode } from '@sim/db/schema' +import { organization, workspace } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { tasks } from '@trigger.dev/sdk' -import { and, eq, inArray, isNotNull, isNull, sql } from 'drizzle-orm' -import { type PlanCategory, sqlIsPaid, sqlIsPro, sqlIsTeam } from '@/lib/billing/plan-helpers' -import { ENTITLED_SUBSCRIPTION_STATUSES } from '@/lib/billing/subscriptions/utils' +import { eq, isNull } from 'drizzle-orm' +import { getOrganizationSubscription } from '@/lib/billing/core/billing' +import { getHighestPriorityPersonalSubscription } from '@/lib/billing/core/subscription' +import { getPlanType, type PlanCategory } from '@/lib/billing/plan-helpers' import { getJobQueue } from '@/lib/core/async-jobs' import { shouldExecuteInline } from '@/lib/core/async-jobs/config' import type { EnqueueOptions } from '@/lib/core/async-jobs/types' import { isTriggerAvailable } from '@/lib/knowledge/documents/service' +import { isOrganizationWorkspace, WORKSPACE_MODE } from '@/lib/workspaces/policy' const logger = createLogger('RetentionDispatcher') @@ -38,8 +41,18 @@ interface CleanupJobConfig { defaults: Record } +interface WorkspaceCleanupScopeRow { + id: string + billedAccountUserId: string + organizationId: string | null + workspaceMode: WorkspaceMode + organizationSettings: OrganizationRetentionSettings | null +} + const DAY = 24 +type PlanResolutionEntry = readonly [string, PlanCategory] + /** * Single source of truth for cleanup retention: which key each job type reads * from `organization.dataRetentionSettings`, and the default retention (in @@ -61,44 +74,112 @@ export const CLEANUP_CONFIG = { }, } as const satisfies Record -/** - * Bulk-lookup workspace IDs for a non-enterprise plan category. Enterprise is - * per-workspace (routed through the owning organization's retention config). - */ -async function resolveWorkspaceIdsForPlan(plan: NonEnterprisePlan): Promise { - if (plan === 'free') { - const rows = await db - .select({ id: workspace.id }) - .from(workspace) - .leftJoin( - subscription, - and( - eq(subscription.referenceId, workspace.billedAccountUserId), - inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES), - sqlIsPaid(subscription.plan) - ) - ) - .where(and(isNull(subscription.id), isNull(workspace.archivedAt))) - - return rows.map((r) => r.id) - } - - const planPredicate = plan === 'pro' ? sqlIsPro(subscription.plan) : sqlIsTeam(subscription.plan) +async function listActiveWorkspaceCleanupScopeRows(): Promise { const rows = await db - .select({ id: workspace.id }) + .select({ + id: workspace.id, + billedAccountUserId: workspace.billedAccountUserId, + organizationId: workspace.organizationId, + workspaceMode: workspace.workspaceMode, + organizationSettings: organization.dataRetentionSettings, + }) .from(workspace) - .innerJoin( - subscription, - and( - eq(subscription.referenceId, workspace.billedAccountUserId), - inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES), - planPredicate! - ) - ) + .leftJoin(organization, eq(organization.id, workspace.organizationId)) .where(isNull(workspace.archivedAt)) - .groupBy(workspace.id) - return rows.map((r) => r.id) + return rows.map((row) => ({ + ...row, + organizationSettings: + (row.organizationSettings as OrganizationRetentionSettings | null) ?? null, + })) +} + +async function resolvePersonalPlanTypesByBilledUserId( + rows: WorkspaceCleanupScopeRow[] +): Promise> { + const billedUserIds = Array.from(new Set(rows.map((row) => row.billedAccountUserId))) + const entries = await Promise.all( + billedUserIds.map(async (userId) => { + try { + const subscription = await getHighestPriorityPersonalSubscription(userId, { + onError: 'throw', + }) + return [userId, getPlanType(subscription?.plan)] as const + } catch (error) { + logger.error('Skipping cleanup for billed user after plan lookup failed', { + userId, + error, + }) + return null + } + }) + ) + + return new Map(entries.filter((entry): entry is PlanResolutionEntry => entry !== null)) +} + +async function resolvePlanTypesByWorkspaceId( + rows: WorkspaceCleanupScopeRow[] +): Promise> { + const userScopedRows = rows.filter((row) => row.workspaceMode !== WORKSPACE_MODE.ORGANIZATION) + const userPlanByBilledUserId = await resolvePersonalPlanTypesByBilledUserId(userScopedRows) + const entries = await Promise.all( + rows.map(async (row) => { + if (row.workspaceMode === WORKSPACE_MODE.ORGANIZATION) { + const organizationId = isOrganizationWorkspace(row) ? row.organizationId : null + if (!organizationId) { + logger.error('Skipping cleanup for malformed organization workspace', { + workspaceId: row.id, + organizationId: row.organizationId, + }) + return null + } + + try { + const subscription = await getOrganizationSubscription(organizationId, { + onError: 'throw', + }) + if (!subscription) { + logger.warn('Skipping cleanup for organization workspace without an org subscription', { + workspaceId: row.id, + organizationId, + }) + return null + } + + return [row.id, getPlanType(subscription?.plan)] as const + } catch (error) { + logger.error('Skipping cleanup for organization workspace after plan lookup failed', { + workspaceId: row.id, + organizationId, + error, + }) + return null + } + } + + const plan = userPlanByBilledUserId.get(row.billedAccountUserId) + if (plan === undefined) { + return null + } + + return [row.id, plan] as const + }) + ) + + return new Map(entries.filter((entry): entry is PlanResolutionEntry => entry !== null)) +} + +/** + * Bulk-lookup workspace IDs for a non-enterprise plan category using the same + * effective-plan lookup used by execution, limits, and workspace policy. + * Enterprise is per-workspace (routed through the owning organization's + * retention config). + */ +async function resolveWorkspaceIdsForPlan(plan: NonEnterprisePlan): Promise { + const rows = await listActiveWorkspaceCleanupScopeRows() + const planByWorkspaceId = await resolvePlanTypesByWorkspaceId(rows) + return rows.filter((row) => planByWorkspaceId.get(row.id) === plan).map((row) => row.id) } export interface ResolvedCleanupScope { @@ -127,12 +208,26 @@ export async function resolveCleanupScope( } const [row] = await db - .select({ settings: organization.dataRetentionSettings }) + .select({ + id: workspace.id, + organizationId: workspace.organizationId, + workspaceMode: workspace.workspaceMode, + billedAccountUserId: workspace.billedAccountUserId, + settings: organization.dataRetentionSettings, + }) .from(workspace) .innerJoin(organization, eq(organization.id, workspace.organizationId)) .where(eq(workspace.id, payload.workspaceId)) .limit(1) + if (!row || !isOrganizationWorkspace(row)) return null + + const organizationId = row.organizationId + if (!organizationId) return null + + const subscription = await getOrganizationSubscription(organizationId, { onError: 'throw' }) + if (getPlanType(subscription?.plan) !== 'enterprise') return null + const hours = row?.settings?.[config.key] if (hours == null) return null @@ -185,28 +280,13 @@ export async function dispatchCleanupJobs( jobIds.push(jobId) } - // Enterprise: workspaces whose owning org is on an active enterprise sub and - // has a non-NULL value for this job's retention key. groupBy dedupes in case - // multiple entitled subscription rows exist for the same org. - const enterpriseRows = await db - .select({ id: workspace.id }) - .from(workspace) - .innerJoin(organization, eq(organization.id, workspace.organizationId)) - .innerJoin( - subscription, - and( - eq(subscription.referenceId, organization.id), - inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES), - eq(subscription.plan, 'enterprise') - ) - ) - .where( - and( - isNull(workspace.archivedAt), - isNotNull(sql`${organization.dataRetentionSettings}->>${config.key}`) - ) - ) - .groupBy(workspace.id) + const activeWorkspaceRows = await listActiveWorkspaceCleanupScopeRows() + const planByWorkspaceId = await resolvePlanTypesByWorkspaceId(activeWorkspaceRows) + const enterpriseRows = activeWorkspaceRows.filter( + (row) => + planByWorkspaceId.get(row.id) === 'enterprise' && + row.organizationSettings?.[config.key] != null + ) const enterpriseCount = enterpriseRows.length diff --git a/apps/sim/lib/billing/core/billing.ts b/apps/sim/lib/billing/core/billing.ts index e56261a62a9..78c359ed569 100644 --- a/apps/sim/lib/billing/core/billing.ts +++ b/apps/sim/lib/billing/core/billing.ts @@ -28,6 +28,10 @@ import { createLogger } from '@sim/logger' const logger = createLogger('Billing') +interface GetOrganizationSubscriptionOptions { + onError?: 'return-null' | 'throw' +} + /** * Get the organization's subscription row when its status is one of * `ENTITLED_SUBSCRIPTION_STATUSES` (includes `past_due`). Use this @@ -37,7 +41,11 @@ const logger = createLogger('Billing') * (from `core/subscription.ts`), which excludes `past_due`. * Returns `null` when there is no entitled sub. */ -export async function getOrganizationSubscription(organizationId: string) { +export async function getOrganizationSubscription( + organizationId: string, + options: GetOrganizationSubscriptionOptions = {} +) { + const { onError = 'return-null' } = options try { const orgSubs = await db .select() @@ -54,6 +62,9 @@ export async function getOrganizationSubscription(organizationId: string) { return orgSubs.length > 0 ? orgSubs[0] : null } catch (error) { logger.error('Error getting organization subscription', { error, organizationId }) + if (onError === 'throw') { + throw error + } return null } } diff --git a/apps/sim/lib/billing/core/plan.ts b/apps/sim/lib/billing/core/plan.ts index cb8fea848db..8bbd516ee10 100644 --- a/apps/sim/lib/billing/core/plan.ts +++ b/apps/sim/lib/billing/core/plan.ts @@ -13,6 +13,52 @@ const logger = createLogger('PlanLookup') export type HighestPrioritySubscription = Awaited> +interface GetHighestPrioritySubscriptionOptions { + onError?: 'return-null' | 'throw' +} + +function pickHighestPrioritySubscription( + subscriptions: TSubscription[], + predicates: Array<(subscription: TSubscription) => boolean> +): TSubscription | null { + for (const predicate of predicates) { + const match = subscriptions.find(predicate) + if (match) return match + } + + return null +} + +export async function getHighestPriorityPersonalSubscription( + userId: string, + options: GetHighestPrioritySubscriptionOptions = {} +) { + const { onError = 'return-null' } = options + try { + const personalSubs = await db + .select() + .from(subscription) + .where( + and( + eq(subscription.referenceId, userId), + inArray(subscription.status, ENTITLED_SUBSCRIPTION_STATUSES) + ) + ) + + return pickHighestPrioritySubscription(personalSubs, [ + checkEnterprisePlan, + checkTeamPlan, + checkProPlan, + ]) + } catch (error) { + logger.error('Error getting highest priority personal subscription', { error, userId }) + if (onError === 'throw') { + throw error + } + return null + } +} + /** * Get the highest priority paid subscription for a user. * @@ -27,7 +73,11 @@ export type HighestPrioritySubscription = Awaited boolean) => - orgSubs.find(predicate) ?? personalSubs.find(predicate) - - const enterpriseSub = pickAtTier(checkEnterprisePlan) - if (enterpriseSub) return enterpriseSub - - const teamSub = pickAtTier(checkTeamPlan) - if (teamSub) return teamSub - - const proSub = pickAtTier(checkProPlan) - if (proSub) return proSub - - return null + return pickHighestPrioritySubscription( + [...orgSubs, ...personalSubs], + [checkEnterprisePlan, checkTeamPlan, checkProPlan] + ) } catch (error) { logger.error('Error getting highest priority subscription', { error, userId }) + if (onError === 'throw') { + throw error + } return null } } diff --git a/apps/sim/lib/billing/core/subscription.ts b/apps/sim/lib/billing/core/subscription.ts index 75dfd706291..550fdcd3400 100644 --- a/apps/sim/lib/billing/core/subscription.ts +++ b/apps/sim/lib/billing/core/subscription.ts @@ -3,7 +3,10 @@ import { member, organization, subscription, user } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq, inArray, sql } from 'drizzle-orm' import { getEffectiveBillingStatus, isOrganizationBillingBlocked } from '@/lib/billing/core/access' -import { getHighestPrioritySubscription } from '@/lib/billing/core/plan' +import { + getHighestPriorityPersonalSubscription, + getHighestPrioritySubscription, +} from '@/lib/billing/core/plan' import { getPlanTierCredits, isPro as isPlanPro, @@ -29,7 +32,7 @@ import { getBaseUrl } from '@/lib/core/utils/urls' const logger = createLogger('SubscriptionCore') -export { getHighestPrioritySubscription } +export { getHighestPriorityPersonalSubscription, getHighestPrioritySubscription } export interface SubscriptionMetadata { billingInterval?: 'month' | 'year'