From 896d2f96f550cb0ed5560847984a1732c21ff106 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sun, 17 May 2026 22:17:48 +0100 Subject: [PATCH 1/2] fix(sdk): cache realtime-stream create response per (runId, key) `chat.response.write`, `streams.writer`, and other one-shot writers each call `streams.pipe` internally, which constructs a fresh `StreamInstance` and issues `PUT /realtime/v1/streams/:runId/self/:key` on every call. Each PUT does `UPDATE "TaskRun" SET realtimeStreams = realtimeStreams || $1`, so a per-chunk writer loop in a chat-agent turn produces one row-update per chunk and bloats the array with thousands of duplicate entries. Under concurrency this monopolised the writer pool with tuple-lock contention on a single TaskRun row. Cache the `createStream` response in `StandardRealtimeStreamsManager` keyed by `${runId}:${key}`. First call PUTs as before; subsequent calls reuse the cached promise and construct a `StreamsWriterV2` straight from the cached S2 access token / basin / stream name. Net effect for hot-loop writers is one PUT per `(run, stream-key)` for the lifetime of the SDK process (S2 tokens default to a 1-day server-side TTL). Cache evicts on `createStream` failure and on `manager.reset()`. 
--- .changeset/realtime-streams-creds-cache.md | 5 +++ .../core/src/v3/realtimeStreams/manager.ts | 35 ++++++++++++++++++- .../src/v3/realtimeStreams/streamInstance.ts | 30 ++++++++++++---- 3 files changed, 63 insertions(+), 7 deletions(-) create mode 100644 .changeset/realtime-streams-creds-cache.md diff --git a/.changeset/realtime-streams-creds-cache.md b/.changeset/realtime-streams-creds-cache.md new file mode 100644 index 00000000000..b8d0deb0d51 --- /dev/null +++ b/.changeset/realtime-streams-creds-cache.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Cache the `PUT /realtime/v1/streams/:runId/self/:key` response per `(runId, key)` so repeated `streams.pipe()` / `chat.response.write` / `chat.stream.writer` calls reuse the same S2 credentials instead of issuing a fresh PUT (and the `realtimeStreams || $1` array push it triggers on `TaskRun`) for every chunk. Hot-loop writers, most notably `chat.response.write` called per chunk inside a `chat.agent` turn, now do one PUT per `(run, stream-key)` instead of one per write, eliminating the writer-pool lock contention that scaled with the customer's chunk rate. S2 v2 access tokens are scoped to the org basin with a 1-day server-side TTL so reusing them across calls within a single run is safe; the cache evicts on `createStream` failure and on `manager.reset()`. 
diff --git a/packages/core/src/v3/realtimeStreams/manager.ts b/packages/core/src/v3/realtimeStreams/manager.ts index beda3535fb4..b8e0ddc9c41 100644 --- a/packages/core/src/v3/realtimeStreams/manager.ts +++ b/packages/core/src/v3/realtimeStreams/manager.ts @@ -1,7 +1,8 @@ import { ApiClient } from "../apiClient/index.js"; import { ensureAsyncIterable, ensureReadableStream } from "../streams/asyncIterableStream.js"; +import { AnyZodFetchOptions } from "../zodfetch.js"; import { taskContext } from "../task-context-api.js"; -import { StreamInstance } from "./streamInstance.js"; +import { CreateStreamResponseLike, StreamInstance } from "./streamInstance.js"; import { RealtimeStreamInstance, RealtimeStreamOperationOptions, @@ -21,8 +22,39 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager { abortController: AbortController; }>(); + // Cache of in-flight / resolved `createStream` responses, keyed by + // `${runId}:${key}`. S2 v2 access tokens are scoped to the org basin + // (default 1-day TTL server-side) so reusing them across repeated + // `pipe()` calls for the same `(runId, key)` is safe, and avoids the + // per-call PUT that pushes `streamId` onto `TaskRun.realtimeStreams`, + // which under chat-agent-style hot-loop writers caused row-lock + // contention on the writer DB. + private createStreamCache = new Map<string, Promise<CreateStreamResponseLike>>(); + reset(): void { this.activeStreams.clear(); + this.createStreamCache.clear(); + } + + private getCachedCreateStream( + runId: string, + key: string, + requestOptions: AnyZodFetchOptions | undefined + ): Promise<CreateStreamResponseLike> { + const cacheKey = `${runId}:${key}`; + const cached = this.createStreamCache.get(cacheKey); + if (cached) return cached; + + const promise = this.apiClient.createStream(runId, "self", key, requestOptions); + this.createStreamCache.set(cacheKey, promise); + // Evict on failure so the next call retries instead of returning a + // poisoned cache entry forever. 
+ promise.catch(() => { + if (this.createStreamCache.get(cacheKey) === promise) { + this.createStreamCache.delete(cacheKey); + } + }); + return promise; } public pipe( @@ -58,6 +90,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager { requestOptions: options?.requestOptions, target: options?.target, debug: this.debug, + createStream: () => this.getCachedCreateStream(runId, key, options?.requestOptions), }); // Register this stream diff --git a/packages/core/src/v3/realtimeStreams/streamInstance.ts b/packages/core/src/v3/realtimeStreams/streamInstance.ts index 07ee0158bfb..e5cd3f84aea 100644 --- a/packages/core/src/v3/realtimeStreams/streamInstance.ts +++ b/packages/core/src/v3/realtimeStreams/streamInstance.ts @@ -5,6 +5,11 @@ import { StreamsWriterV1 } from "./streamsWriterV1.js"; import { StreamsWriterV2 } from "./streamsWriterV2.js"; import { StreamsWriter, StreamWriteResult } from "./types.js"; +export type CreateStreamResponseLike = { + version: string; + headers?: Record<string, string>; +}; + export type StreamInstanceOptions = { apiClient: ApiClient; baseUrl: string; @@ -15,6 +20,14 @@ export type StreamInstanceOptions = { requestOptions?: AnyZodFetchOptions; target?: "self" | "parent" | "root" | string; debug?: boolean; + /** + * Optional override for the create-stream call. Defaults to + * `apiClient.createStream(runId, "self", key, requestOptions)`. The + * manager passes a cached version so repeated `pipe()` calls for the + * same `(runId, key)` share a single PUT instead of hammering the + * server on every chunk. 
+ */ + createStream?: () => Promise<CreateStreamResponseLike>; }; type StreamsWriterInstance = StreamsWriterV1 | StreamsWriterV2; @@ -27,12 +40,17 @@ export class StreamInstance implements StreamsWriter { } private async initializeWriter(): Promise> { - const { version, headers } = await this.options.apiClient.createStream( - this.options.runId, - "self", - this.options.key, - this.options?.requestOptions - ); + const createStreamFn = + this.options.createStream ?? + (() => + this.options.apiClient.createStream( + this.options.runId, + "self", + this.options.key, + this.options?.requestOptions + )); + + const { version, headers } = await createStreamFn(); const parsedResponse = parseCreateStreamResponse(version, headers); From 215c502050960eb4749a8d58261a39623d0e0013 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sun, 17 May 2026 22:42:06 +0100 Subject: [PATCH 2/2] ci: bump packageManager to pnpm@10.33.2 to match release workflow The release.yml on main now pins pnpm/action-setup v5 with version 10.33.2, and v5 errors if the workflow's version doesn't match package.json's packageManager. workflow_dispatch resolves the workflow YAML from the default branch (main), so our hotfix branch's tree needs to match the version main's release workflow installs. --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index e745c3c5642..ef35cd48e84 100644 --- a/package.json +++ b/package.json @@ -66,7 +66,7 @@ "vite-tsconfig-paths": "^4.0.5", "vitest": "3.1.4" }, - "packageManager": "pnpm@10.23.0", + "packageManager": "pnpm@10.33.2", "dependencies": { "@changesets/cli": "2.26.2", "@remix-run/changelog-github": "^0.0.5",