Skip to content

feat(sse): per-client live subscription filter#86

Open
aakhter wants to merge 1 commit into
Ark0N:masterfrom
aakhter:feat/per-client-sse-filter
Open

feat(sse): per-client live subscription filter#86
aakhter wants to merge 1 commit into
Ark0N:masterfrom
aakhter:feat/per-client-sse-filter

Conversation

@aakhter
Copy link
Copy Markdown
Contributor

@aakhter aakhter commented May 15, 2026

Summary

Lets a connected client narrow its SSE stream to a single session without forcing an EventSource reconnect. With many open sessions (N tabs in the UI, all generating output), this cuts terminal-event SSE traffic by roughly — we only send the actively-rendered session's bytes instead of all of them.

The existing ?sessions= query filter only works at connect time; narrowing or widening it requires tearing down the EventSource and losing in-flight messages. That was acceptable when filters were set once at page load, but the UI now flips active sessions on every tab switch.

How it works

  • Client generates a stable per-page UUID (_clientId) once at CodemanApp construction and includes it on the SSE URL:
    GET /api/events?clientId=<uuid>&sessions=<active-id>
    
  • Server records a clientId → reply mapping in addition to the existing reply → sessionFilter map.
  • New endpoint:
    POST /api/events/subscribe { clientId, sessions: string[] | null }
    
    updates the in-memory filter for the matching reply. Returns 204 on success, 404 if the client isn't known yet (a benign race on the first selectSession after reconnect — the next reconnect carries the filter via the URL anyway).
  • On every selectSession the client fires a fire-and-forget POST. No reconnect, no re-init, no replay buffer needed.

⚠ Behavioural change to broadcast()

The per-event session filter is removed from broadcast(). Previously that path filtered lifecycle/metadata events (session:created, session:updated, ralph:*, hook:*) by extracting a sessionId from the payload. With per-client narrow filters, that meant a client subscribed to session A would never see session:created for B and the sidebar would silently de-sync.

New contract:

  • Lifecycle/metadata events (low-volume, UI-correctness critical) broadcast to all clients regardless of filter.
  • Terminal events (high-volume, the actual reason for filtering) apply the filter in flushSessionTerminalBatch (already there; unchanged).

extractSessionId() was only used by the old broadcast() filter and has been removed (typecheck confirms no other callers).

Files

  • src/web/sse-stream-manager.ts (+34/-29): add sseClientsById, optional clientId arg to addClient/removeClient cleanup, new updateClientFilter(), and the broadcast() change above.
  • src/web/server.ts (+22/-3): parse clientId on /api/events, pass to addClient, register POST /api/events/subscribe handler.
  • src/web/public/app.js (+41/-1): generate _clientId, build the EventSource URL with both clientId + active session, add _updateSseSubscription(), call it on selectSession.

Test plan

  • Open 3 sessions, each running a script that prints continuously. DevTools → Network → EventStream: confirm only the active tab's bytes arrive (terminal events for the others are filtered out at the server).
  • Switch tabs rapidly: no EventSource disconnect/reconnect events; the URL bar of the EventStream stays the same.
  • Verify session:created for a brand-new session arrives in all open tabs (lifecycle events are NOT filtered).
  • Verify session:updated (rename, color change) reaches all tabs even if a different session is currently focused.
  • Hard-refresh a tab while a _updateSseSubscription request is mid-flight: server returns 404 for the now-stale clientId; reconnect carries the new clientId via URL; behaviour normalizes.
  • Backwards-compat: a client that doesn't supply clientId still works (gets the connect-time filter only, like today).

🤖 Generated with Claude Code

Lets a connected client narrow its SSE stream to a single session
without forcing an EventSource reconnect. With many open sessions
(N tabs in the UI, all generating output), this cuts terminal-event
SSE traffic by roughly Nx — we only send the actively-rendered
session's bytes instead of all of them.

The existing ?sessions= query filter only worked at connect time;
narrowing or widening it required tearing down the EventSource and
losing in-flight messages. That was acceptable when filters were set
once at page load, but the UI now flips active sessions on every
tab switch.

How it works
============

- Client generates a stable per-page UUID (`_clientId`) once at
  CodemanApp construction and includes it on the SSE URL:
    GET /api/events?clientId=<uuid>&sessions=<active-id>
- Server records a `clientId -> reply` mapping in addition to the
  existing `reply -> sessionFilter` map.
- New endpoint:
    POST /api/events/subscribe { clientId, sessions: string[] | null }
  updates the in-memory filter for the matching reply. 204 on success,
  404 if the client isn't known yet (race on first selectSession after
  reconnect — the next reconnect carries the filter via the URL).
- On every selectSession the client fires a fire-and-forget POST. No
  reconnect, no re-init, no replay buffer needed.

Behavioural change to broadcast()
=================================

The per-event session filter is removed from `broadcast()`. Previously
that path filtered lifecycle/metadata events (`session:created`,
`session:updated`, `ralph:*`, `hook:*`) by extracting a `sessionId` from
the payload. With per-client narrow filters, that meant a client
subscribed to session A would never see session:created for B and the
sidebar would silently de-sync.

The new contract:
- **Lifecycle/metadata events** (low-volume, UI-correctness critical)
  broadcast to all clients regardless of filter.
- **Terminal events** (high-volume, the actual reason for filtering)
  apply the filter in `flushSessionTerminalBatch` (already there;
  unchanged).

`extractSessionId()` was only used by the old broadcast() filter and
has been removed.

Files
=====

- src/web/sse-stream-manager.ts (+34/-29): add `sseClientsById`,
  optional `clientId` arg to addClient/removeClient cleanup, new
  `updateClientFilter()`, and the broadcast() change above.
- src/web/server.ts (+22/-3): parse `clientId` on /api/events, pass
  to `addClient`, register POST /api/events/subscribe handler.
- src/web/public/app.js (+41/-1): generate `_clientId`, build the
  EventSource URL with both clientId + active session, add
  `_updateSseSubscription()`, call it on selectSession.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant