diff --git a/.changeset/realtime-streams-creds-cache.md b/.changeset/realtime-streams-creds-cache.md new file mode 100644 index 00000000000..b8d0deb0d51 --- /dev/null +++ b/.changeset/realtime-streams-creds-cache.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Cache the `PUT /realtime/v1/streams/:runId/self/:key` response per `(runId, key)` so repeated `streams.pipe()` / `chat.response.write` / `chat.stream.writer` calls reuse the same S2 credentials instead of issuing a fresh PUT (and the `realtimeStreams || $1` array push it triggers on `TaskRun`) for every chunk. Hot-loop writers, most notably `chat.response.write` called per chunk inside a `chat.agent` turn, now do one PUT per `(run, stream-key)` instead of one per write, eliminating the writer-pool lock contention that scaled with the customer's chunk rate. S2 v2 access tokens are scoped to the org basin with a 1-day server-side TTL so reusing them across calls within a single run is safe; the cache evicts on `createStream` failure and on `manager.reset()`. 
diff --git a/package.json b/package.json index e745c3c5642..ef35cd48e84 100644 --- a/package.json +++ b/package.json @@ -66,7 +66,7 @@ "vite-tsconfig-paths": "^4.0.5", "vitest": "3.1.4" }, - "packageManager": "pnpm@10.23.0", + "packageManager": "pnpm@10.33.2", "dependencies": { "@changesets/cli": "2.26.2", "@remix-run/changelog-github": "^0.0.5", diff --git a/packages/core/src/v3/realtimeStreams/manager.ts b/packages/core/src/v3/realtimeStreams/manager.ts index beda3535fb4..b8e0ddc9c41 100644 --- a/packages/core/src/v3/realtimeStreams/manager.ts +++ b/packages/core/src/v3/realtimeStreams/manager.ts @@ -1,7 +1,8 @@ import { ApiClient } from "../apiClient/index.js"; import { ensureAsyncIterable, ensureReadableStream } from "../streams/asyncIterableStream.js"; +import { AnyZodFetchOptions } from "../zodfetch.js"; import { taskContext } from "../task-context-api.js"; -import { StreamInstance } from "./streamInstance.js"; +import { CreateStreamResponseLike, StreamInstance } from "./streamInstance.js"; import { RealtimeStreamInstance, RealtimeStreamOperationOptions, @@ -21,8 +22,39 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager { abortController: AbortController; }>(); + // Cache of in-flight / resolved `createStream` responses, keyed by + // `${runId}:${key}`. S2 v2 access tokens are scoped to the org basin + // (default 1-day TTL server-side) so reusing them across repeated + // `pipe()` calls for the same `(runId, key)` is safe, and avoids the + // per-call PUT that pushes `streamId` onto `TaskRun.realtimeStreams`, + // which under chat-agent-style hot-loop writers caused row-lock + // contention on the writer DB. 
+ private createStreamCache = new Map<string, Promise<CreateStreamResponseLike>>(); + reset(): void { this.activeStreams.clear(); + this.createStreamCache.clear(); + } + + private getCachedCreateStream( + runId: string, + key: string, + requestOptions: AnyZodFetchOptions | undefined + ): Promise<CreateStreamResponseLike> { + const cacheKey = `${runId}:${key}`; + const cached = this.createStreamCache.get(cacheKey); + if (cached) return cached; + + const promise = this.apiClient.createStream(runId, "self", key, requestOptions); + this.createStreamCache.set(cacheKey, promise); + // Evict on failure so the next call retries instead of returning a + // poisoned cache entry forever. + promise.catch(() => { + if (this.createStreamCache.get(cacheKey) === promise) { + this.createStreamCache.delete(cacheKey); + } + }); + return promise; } public pipe( @@ -58,6 +90,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager { requestOptions: options?.requestOptions, target: options?.target, debug: this.debug, + createStream: () => this.getCachedCreateStream(runId, key, options?.requestOptions), }); // Register this stream diff --git a/packages/core/src/v3/realtimeStreams/streamInstance.ts b/packages/core/src/v3/realtimeStreams/streamInstance.ts index 07ee0158bfb..e5cd3f84aea 100644 --- a/packages/core/src/v3/realtimeStreams/streamInstance.ts +++ b/packages/core/src/v3/realtimeStreams/streamInstance.ts @@ -5,6 +5,11 @@ import { StreamsWriterV1 } from "./streamsWriterV1.js"; import { StreamsWriterV2 } from "./streamsWriterV2.js"; import { StreamsWriter, StreamWriteResult } from "./types.js"; +export type CreateStreamResponseLike = { + version: string; + headers?: Record<string, string>; +}; + export type StreamInstanceOptions = { apiClient: ApiClient; baseUrl: string; @@ -15,6 +20,14 @@ export type StreamInstanceOptions = { requestOptions?: AnyZodFetchOptions; target?: "self" | "parent" | "root" | string; debug?: boolean; + /** + * Optional override for the create-stream call. 
Defaults to + * `apiClient.createStream(runId, "self", key, requestOptions)`. The + * manager passes a cached version so repeated `pipe()` calls for the + * same `(runId, key)` share a single PUT instead of hammering the + * server on every chunk. + */ + createStream?: () => Promise<CreateStreamResponseLike>; }; type StreamsWriterInstance = StreamsWriterV1 | StreamsWriterV2; @@ -27,12 +40,17 @@ export class StreamInstance implements StreamsWriter { } private async initializeWriter(): Promise> { - const { version, headers } = await this.options.apiClient.createStream( - this.options.runId, - "self", - this.options.key, - this.options?.requestOptions - ); + const createStreamFn = + this.options.createStream ?? + (() => + this.options.apiClient.createStream( + this.options.runId, + "self", + this.options.key, + this.options?.requestOptions + )); + + const { version, headers } = await createStreamFn(); const parsedResponse = parseCreateStreamResponse(version, headers);