Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/realtime-streams-creds-cache.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Cache the `PUT /realtime/v1/streams/:runId/self/:key` response per `(runId, key)` so repeated `streams.pipe()` / `chat.response.write` / `chat.stream.writer` calls reuse the same S2 credentials instead of issuing a fresh PUT (and the `realtimeStreams || $1` array push it triggers on `TaskRun`) for every chunk. Hot-loop writers, most notably `chat.response.write` called per chunk inside a `chat.agent` turn, now do one PUT per `(run, stream-key)` instead of one per write, eliminating the writer-pool lock contention that scaled with the customer's chunk rate. S2 v2 access tokens are scoped to the org basin with a 1-day server-side TTL so reusing them across calls within a single run is safe; the cache evicts on `createStream` failure and on `manager.reset()`.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
"vite-tsconfig-paths": "^4.0.5",
"vitest": "3.1.4"
},
"packageManager": "pnpm@10.23.0",
"packageManager": "pnpm@10.33.2",
"dependencies": {
"@changesets/cli": "2.26.2",
"@remix-run/changelog-github": "^0.0.5",
Expand Down
35 changes: 34 additions & 1 deletion packages/core/src/v3/realtimeStreams/manager.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { ApiClient } from "../apiClient/index.js";
import { ensureAsyncIterable, ensureReadableStream } from "../streams/asyncIterableStream.js";
import { AnyZodFetchOptions } from "../zodfetch.js";
import { taskContext } from "../task-context-api.js";
import { StreamInstance } from "./streamInstance.js";
import { CreateStreamResponseLike, StreamInstance } from "./streamInstance.js";
import {
RealtimeStreamInstance,
RealtimeStreamOperationOptions,
Expand All @@ -21,8 +22,39 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
abortController: AbortController;
}>();

// Cache of in-flight / resolved `createStream` responses, keyed by
// `${runId}:${key}`. S2 v2 access tokens are scoped to the org basin
// (default 1-day TTL server-side) so reusing them across repeated
// `pipe()` calls for the same `(runId, key)` is safe, and avoids the
// per-call PUT that pushes `streamId` onto `TaskRun.realtimeStreams`,
// which under chat-agent-style hot-loop writers caused row-lock
// contention on the writer DB.
private createStreamCache = new Map<string, Promise<CreateStreamResponseLike>>();

reset(): void {
this.activeStreams.clear();
this.createStreamCache.clear();
}

private getCachedCreateStream(
runId: string,
key: string,
requestOptions: AnyZodFetchOptions | undefined
): Promise<CreateStreamResponseLike> {
const cacheKey = `${runId}:${key}`;
const cached = this.createStreamCache.get(cacheKey);
if (cached) return cached;

const promise = this.apiClient.createStream(runId, "self", key, requestOptions);
this.createStreamCache.set(cacheKey, promise);
// Evict on failure so the next call retries instead of returning a
// poisoned cache entry forever.
promise.catch(() => {
if (this.createStreamCache.get(cacheKey) === promise) {
this.createStreamCache.delete(cacheKey);
}
});
return promise;
}

public pipe<T>(
Expand Down Expand Up @@ -58,6 +90,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
requestOptions: options?.requestOptions,
target: options?.target,
debug: this.debug,
createStream: () => this.getCachedCreateStream(runId, key, options?.requestOptions),
});

// Register this stream
Expand Down
30 changes: 24 additions & 6 deletions packages/core/src/v3/realtimeStreams/streamInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import { StreamsWriterV1 } from "./streamsWriterV1.js";
import { StreamsWriterV2 } from "./streamsWriterV2.js";
import { StreamsWriter, StreamWriteResult } from "./types.js";

export type CreateStreamResponseLike = {
version: string;
headers?: Record<string, string>;
};

export type StreamInstanceOptions<T> = {
apiClient: ApiClient;
baseUrl: string;
Expand All @@ -15,6 +20,14 @@ export type StreamInstanceOptions<T> = {
requestOptions?: AnyZodFetchOptions;
target?: "self" | "parent" | "root" | string;
debug?: boolean;
/**
* Optional override for the create-stream call. Defaults to
* `apiClient.createStream(runId, "self", key, requestOptions)`. The
* manager passes a cached version so repeated `pipe()` calls for the
* same `(runId, key)` share a single PUT instead of hammering the
* server on every chunk.
*/
createStream?: () => Promise<CreateStreamResponseLike>;
};

type StreamsWriterInstance<T> = StreamsWriterV1<T> | StreamsWriterV2<T>;
Expand All @@ -27,12 +40,17 @@ export class StreamInstance<T> implements StreamsWriter {
}

private async initializeWriter(): Promise<StreamsWriterInstance<T>> {
const { version, headers } = await this.options.apiClient.createStream(
this.options.runId,
"self",
this.options.key,
this.options?.requestOptions
);
const createStreamFn =
this.options.createStream ??
(() =>
this.options.apiClient.createStream(
this.options.runId,
"self",
this.options.key,
this.options?.requestOptions
));

const { version, headers } = await createStreamFn();

const parsedResponse = parseCreateStreamResponse(version, headers);

Expand Down
Loading