diff --git a/apps/sim/lib/core/config/redis.ts b/apps/sim/lib/core/config/redis.ts index aea707daa00..04f976b44ae 100644 --- a/apps/sim/lib/core/config/redis.ts +++ b/apps/sim/lib/core/config/redis.ts @@ -1,7 +1,7 @@ import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' import { randomFloat } from '@sim/utils/random' -import Redis from 'ioredis' +import Redis, { type RedisOptions } from 'ioredis' import { env } from '@/lib/core/config/env' const logger = createLogger('Redis') @@ -16,7 +16,7 @@ const redisUrl = env.REDIS_URL * * For DNS hosts: no override needed, default verification works. */ -function resolveTlsOptions(url: string | undefined): { servername: string } | undefined { +function resolveRedisTlsOptions(url: string | undefined): { servername: string } | undefined { if (!url) return undefined let parsed: URL try { @@ -37,6 +37,23 @@ function resolveTlsOptions(url: string | undefined): { servername: string } | un return { servername: env.REDIS_TLS_SERVERNAME } } +/** + * Shared connection defaults — keepAlive, connectTimeout, enableOfflineQueue, + * and TLS SNI when REDIS_URL targets an IP. Every Redis client we open should + * spread this; callers add their own retry / timeout policy on top. + */ +export function getRedisConnectionDefaults( + url: string | undefined +): Pick { + const tls = resolveRedisTlsOptions(url) + return { + keepAlive: 1000, + connectTimeout: 10000, + enableOfflineQueue: true, + ...(tls ? { tls } : {}), + } +} + let globalRedisClient: Redis | null = null let pingFailures = 0 let pingInterval: NodeJS.Timeout | null = null @@ -117,18 +134,15 @@ export function getRedisClient(): Redis | null { if (globalRedisClient) return globalRedisClient // Outside the try/catch so config errors aren't silently swallowed. - const tls = resolveTlsOptions(redisUrl) + const defaults = getRedisConnectionDefaults(redisUrl) try { logger.info('Initializing Redis client') globalRedisClient = new Redis(redisUrl, { - keepAlive: 1000, - connectTimeout: 10000, + ...defaults, commandTimeout: 5000, maxRetriesPerRequest: 5, - enableOfflineQueue: true, - ...(tls ? { tls } : {}), retryStrategy: (times) => { if (times > 10) { diff --git a/apps/sim/lib/events/pubsub.ts b/apps/sim/lib/events/pubsub.ts index b299eafc055..f866d8d1459 100644 --- a/apps/sim/lib/events/pubsub.ts +++ b/apps/sim/lib/events/pubsub.ts @@ -9,6 +9,7 @@ import { EventEmitter } from 'events' import { createLogger } from '@sim/logger' import Redis, { type RedisOptions } from 'ioredis' import { env } from '@/lib/core/config/env' +import { getRedisConnectionDefaults } from '@/lib/core/config/redis' const logger = createLogger('PubSub') @@ -31,13 +32,12 @@ class RedisPubSubChannel implements PubSubChannel { constructor( redisUrl: string, + connectionDefaults: ReturnType, private config: PubSubChannelConfig ) { const commonOpts = { - keepAlive: 1000, - connectTimeout: 10000, + ...connectionDefaults, maxRetriesPerRequest: null, - enableOfflineQueue: true, retryStrategy: (times: number) => { if (times > 10) return 30000 return Math.min(times * 500, 5000) @@ -139,16 +139,18 @@ class LocalPubSubChannel implements PubSubChannel { export function createPubSubChannel(config: PubSubChannelConfig): PubSubChannel { const redisUrl = env.REDIS_URL - - if (redisUrl) { - try { - logger.info(`${config.label}: Using Redis`) - return new RedisPubSubChannel(redisUrl, config) - } catch (err) { - logger.error(`Failed to create Redis ${config.label}, falling back to local:`, err) - return new LocalPubSubChannel(config) - } + if (!redisUrl) return new LocalPubSubChannel(config) + + // Resolve config-derived defaults outside the try so a missing + // REDIS_TLS_SERVERNAME (config error) surfaces instead of silently degrading + // to the in-process EventEmitter — that would break cross-replica pub/sub. + const connectionDefaults = getRedisConnectionDefaults(redisUrl) + + try { + logger.info(`${config.label}: Using Redis`) + return new RedisPubSubChannel(redisUrl, connectionDefaults, config) + } catch (err) { + logger.error(`Failed to create Redis ${config.label}, falling back to local:`, err) + return new LocalPubSubChannel(config) } - - return new LocalPubSubChannel(config) }