POC: KEDA ScaledJob for queue jobs (payload stays in Redis) #41

Open
abnegate wants to merge 5 commits into main from feat/queue-keda-poc

abnegate commented Jul 1, 2026

Member

Alternative POC to the env-var KubernetesJob broker (#35), built to compare the two ways of running queue jobs as Kubernetes Jobs.

Approach

The K8s-native pattern: the payload stays in the Redis queue, and KEDA scales worker Jobs off the queue depth.

  • Producers enqueue with the ordinary Utopia\Queue\Broker\Redis broker — no custom broker, no Kubernetes involvement at enqueue time.
  • A KEDA ScaledJob watches the queue's Redis list length and spawns one-shot worker Jobs (up to maxReplicaCount).
  • Each worker (tests/Queue/servers/Keda/worker.php) drains the queue with the same Redis broker (receive → handle → commit) and exits.

Notably this needs zero library code — just the existing Redis broker + a drain worker + the ScaledJob manifest.
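
The receive → handle → commit drain loop can be sketched as follows. This is a minimal illustration in Python with an in-memory queue standing in for the Redis list; the `drain` function and its parameters are hypothetical and are not the code in worker.php.

```python
from collections import deque
from typing import Callable


def drain(queue: deque, handle: Callable[[dict], None]) -> int:
    """Process messages until the queue is empty, then return so the process can exit."""
    processed = 0
    while queue:
        message = queue.popleft()  # receive: take the next message off the queue
        handle(message)            # handle: run the job logic
        processed += 1             # commit: a real broker would acknowledge here
    return processed


if __name__ == "__main__":
    pending = deque({"id": i} for i in range(3))
    seen = []
    count = drain(pending, seen.append)
    print(count, len(pending))  # prints "3 0"
```

The point of returning instead of blocking is that the container exits once the queue is empty, which lets the Kubernetes Job reach completion.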

Proven end-to-end

tests/keda-e2e.sh stands up kind + KEDA (helm) + Redis + the ScaledJob, loads the worker image, and runs KedaTest, which enqueues messages and asserts KEDA spawns Jobs that drain the queue. Verified on kind (OK (1 test, 4 assertions); observed 8 queue-worker-* Jobs draining the queue). bin/monorepo check queue passes (pint + phpstan + rector).

env-var KubernetesJob (#35) vs KEDA ScaledJob (this PR)

| | env-var KubernetesJob | KEDA ScaledJob |
| --- | --- | --- |
| Payload | inlined in Job env var: etcd (~1.5 MB)/ARG_MAX limits, visible in pod spec, duplicated per Pod | stays in Redis; never on a K8s object |
| Custom broker | yes | none; reuses the Redis broker |
| Producer needs cluster access | yes (creates Jobs) | no (just Redis) |
| Scaling | one Job per message | KEDA scales N Jobs off depth; workers batch-drain |
| Extra dependency | appwrite-labs/php-k8s in the app | KEDA operator in the cluster |
| Secrets/PII in payload | exposed via pod spec | fine (stays in Redis) |
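
As a rough illustration of the "Scaling" row, the number of Jobs created per polling cycle can be modeled as below. This is a simplified sketch that assumes behavior along the lines of KEDA's default ScaledJob scaling strategy; it is not KEDA's code. Only `max_replica_count` corresponds to a name used in this PR (maxReplicaCount), and the other parameter names are hypothetical.

```python
import math


def jobs_to_create(queue_length: int, target_per_job: int,
                   max_replica_count: int, running_jobs: int) -> int:
    """Estimate how many new Jobs to start for the current queue depth."""
    if queue_length <= 0:
        return 0
    # Desired parallelism: one Job per `target_per_job` queued messages, capped.
    desired = min(max_replica_count, math.ceil(queue_length / target_per_job))
    # Jobs already running count against the desired total.
    return max(0, desired - running_jobs)


if __name__ == "__main__":
    print(jobs_to_create(queue_length=12, target_per_job=5,
                         max_replica_count=8, running_jobs=1))  # prints "2"
```

Under this model a deep queue is capped at `max_replica_count` Jobs, and each Job drains many messages rather than exactly one.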

Recommendation: KEDA is the more correct/standard path (keeps the queue a queue, no payload on etcd, no custom broker); its cost is requiring the KEDA operator in the cluster. See tests/Queue/servers/Keda/README.md.

🤖 Generated with Claude Code

Alternative to the env-var KubernetesJob broker: instead of inlining the payload
in a per-message Job, producers enqueue to the ordinary Redis broker and a KEDA
ScaledJob scales one-shot worker Jobs off the queue depth. Each worker drains
the queue with the same Redis broker and exits — no custom broker, no payload on
any Kubernetes object.

Adds a drain worker, Dockerfile, redis + ScaledJob manifests, a kubectl-driven
e2e (KedaTest) proving KEDA spawns Jobs that drain the queue, and keda-e2e.sh
(kind + KEDA + redis). Verified end-to-end on kind. See servers/Keda/README.md
for the comparison.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings July 1, 2026 04:05

Copilot AI left a comment

Pull request overview

Adds a proof-of-concept for running utopia-php/queue jobs as Kubernetes Jobs via KEDA ScaledJob, keeping job payloads in the existing Redis queue (no Kubernetes involvement at enqueue time, no payload stored on K8s objects).

Changes:

  • Introduces a KEDA worker image + one-shot drain worker script and a ScaledJob manifest for scaling off Redis list depth.
  • Adds an end-to-end PHPUnit test (KedaTest) plus a kind + KEDA + Redis harness script to validate the flow.
  • Documents the approach and trade-offs vs the env-var KubernetesJob approach.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| packages/queue/tests/Queue/servers/Keda/worker.php | One-shot worker that drains Redis queue messages using the existing Redis broker. |
| packages/queue/tests/Queue/servers/Keda/README.md | Documentation for running the KEDA ScaledJob POC and trade-offs. |
| packages/queue/tests/Queue/servers/Keda/k8s.yaml | Kind-friendly namespace + Redis + KEDA ScaledJob manifest for scaling jobs from Redis depth. |
| packages/queue/tests/Queue/servers/Keda/Dockerfile | Builds the worker container image (PHP + redis extension + project code). |
| packages/queue/tests/Queue/E2E/Adapter/KedaTest.php | E2E test that enqueues directly into Redis and asserts KEDA drains the queue via Jobs. |
| packages/queue/tests/keda-e2e.sh | Harness that provisions kind + KEDA, loads the worker image, applies manifests, and runs the test. |
| packages/queue/phpunit.xml | Registers KedaTest in the e2e testsuite (skips unless KEDA_E2E=true). |


Comment thread packages/queue/tests/Queue/servers/Keda/worker.php Outdated
Comment thread packages/queue/tests/Queue/servers/Keda/k8s.yaml Outdated
Comment thread packages/queue/tests/keda-e2e.sh Outdated
greptile-apps Bot commented Jul 1, 2026

Contributor

Greptile Summary

This POC introduces a KEDA ScaledJob-based queue worker: a new KubernetesJob adapter that drains a Redis queue synchronously and exits, letting Kubernetes Jobs complete naturally, with no payload ever touching a Kubernetes object.

  • KubernetesJob adapter (src/Queue/Adapter/KubernetesJob.php): overrides consume() to break on a timeout (drain-and-exit) rather than continue (long-poll), and wires onWorkerStart/onWorkerStop callbacks for compatibility with Server. The adapter is covered by unit tests against InMemoryConnection.
  • KEDA infrastructure (tests/Queue/servers/Keda/): k8s.yaml defines the namespace, Redis Deployment/Service, and ScaledJob manifest; worker.php is the container entry-point; keda-lib.sh provisions kind + KEDA with SHA256-verified binary downloads; both a standalone keda-e2e.sh and integration into the existing e2e.sh are provided.
  • KedaTest: end-to-end test that enqueues three payloads and asserts KEDA spawns at least one worker Job that drains both the main queue and the failed queue to zero.

Confidence Score: 5/5

Safe to merge; all changes are additive test infrastructure and a new adapter with no impact on existing adapters or production paths.

All changes are purely additive — a new adapter class, new test files, and new shell scripts. The existing Redis broker, Swoole/Workerman adapters, and the rest of the queue library are untouched. The two observations raised are both robustness nits in new code with no effect on the existing test suite or production deployments.

No files require special attention; both findings are in new files only.

Important Files Changed

| Filename | Overview |
| --- | --- |
| packages/queue/src/Queue/Adapter/KubernetesJob.php | New run-to-completion adapter that drains the queue synchronously and returns; logic is correct but start() lacks a try/finally to guarantee onWorkerStop callbacks (consumer close) run on exception. |
| packages/queue/tests/Queue/E2E/Adapter/KedaTest.php | E2e test correctly guards with KEDA_E2E and now asserts failedLength(), but the failed-queue assertion may race against in-flight rejections that haven't completed yet when queueLength() first hits 0. |
| packages/queue/tests/Queue/E2E/Adapter/KubernetesJobAdapterTest.php | Solid unit coverage against InMemoryConnection: drains queue, handles empty queue, and verifies failed-message rejection; correctly placed in the unit testsuite. |
| packages/queue/tests/keda-lib.sh | Shared harness for kind+KEDA setup; downloads kind, kubectl, and helm at pinned versions with SHA256 verification before execution; correctly tracks cluster ownership to avoid deleting pre-existing clusters. |
| packages/queue/tests/Queue/servers/Keda/k8s.yaml | Namespace, Redis Deployment/Service, and KEDA ScaledJob manifest; backoffLimit:0 and crash-recovery limitations are documented inline and in the README. |
| packages/queue/tests/e2e.sh | Adds conditional keda_up so KedaTest runs in CI and gracefully skips when Docker is unavailable locally; keda_down is wired into the cleanup trap. |
| packages/queue/tests/Queue/servers/Keda/worker.php | Minimal KEDA worker entry-point; uses KubernetesJob adapter with env-var-configured Redis host/queue, errors written to stderr. |
| packages/queue/tests/keda-e2e.sh | Standalone KEDA e2e runner that provisions the cluster, runs KedaTest, and tears down; straightforward and correct. |

Reviews (5): Last reviewed commit: "test(queue): make KEDA setup best-effort..."

Comment thread packages/queue/tests/Queue/E2E/Adapter/KedaTest.php
Comment thread packages/queue/tests/Queue/servers/Keda/k8s.yaml
Comment thread packages/queue/tests/Queue/E2E/Adapter/KedaTest.php
Factor the kind + KEDA + Redis + ScaledJob provisioning into tests/keda-lib.sh
(keda_up/keda_down) and have both keda-e2e.sh and the package's e2e.sh use it,
so `bin/monorepo test queue` (and thus CI) stands up KEDA and runs KedaTest
against a real cluster instead of skipping it.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Comment thread packages/queue/tests/keda-lib.sh
- keda-lib.sh: pin + SHA-256-verify Helm (same as kind/kubectl) instead of
  curl|bash of an unpinned installer; only tear down a kind cluster this run
  created, never a pre-existing one.
- KedaTest: also assert the .failed.* list is empty — draining the main queue
  alone doesn't prove success since receive() pops before handling; note that
  enqueue() mirrors Redis::enqueue()'s envelope.
- k8s.yaml: correct the scaling comment (KEDA scales N Jobs, workers batch-drain)
  and document the backoffLimit/processing-orphan crash-recovery caveat.
- README: crash-recovery section (processing-list reaper via Publisher::retry).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
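
The pin-and-verify step from the commit above follows a common pattern: compare the SHA-256 digest of a downloaded artifact against a pinned value before using it. A minimal Python sketch of that check, for illustration only (keda-lib.sh itself is a shell script, and `verify_sha256` is a hypothetical helper):

```python
import hashlib
import hmac


def verify_sha256(data: bytes, expected_hex: str) -> None:
    """Raise ValueError unless the SHA-256 digest of `data` matches the pinned value."""
    actual = hashlib.sha256(data).hexdigest()
    # compare_digest avoids leaking where the two strings first differ.
    if not hmac.compare_digest(actual, expected_hex.strip().lower()):
        raise ValueError(f"checksum mismatch: expected {expected_hex}, got {actual}")


if __name__ == "__main__":
    artifact = b"example artifact bytes"
    pinned = hashlib.sha256(artifact).hexdigest()  # in practice, a constant committed to the repo
    verify_sha256(artifact, pinned)                # passes silently
    try:
        verify_sha256(artifact + b"tampered", pinned)
    except ValueError as error:
        print("rejected:", error)
```

The artifact is only executed or installed after the check passes, which is what distinguishes this from piping an unpinned installer straight into a shell.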
abnegate commented Jul 1, 2026

Member Author

Addressed in 4781c8f:

greptile-apps

  • P1 — assertion passes even if all messages are rejected (KedaTest): receive() pops from the main list before handling, so a drained main queue alone isn't proof. Now also asserts LLEN utopia-queue.failed.keda == 0 (OK (1 test, 5 assertions) on kind).
  • P1/security — Helm via unpinned curl | bash (keda-lib.sh): Helm is now downloaded at a pinned version (v3.16.4) and SHA-256-verified before use, matching the kind/kubectl pattern.
  • P2 — backoffLimit: 0 can orphan messages (k8s.yaml): documented — a pod killed after receive() claims a message but before commit/reject strands it in the processing list, invisible to KEDA; backoffLimit can't recover it (a new pod only drains the main queue). Production needs a periodic Publisher::retry() reaper. Noted in k8s.yaml and servers/Keda/README.md.
  • P2 — enqueue hard-codes the wire format (KedaTest): added a comment noting it mirrors Redis::enqueue()'s envelope (the in-cluster Redis isn't exposed to the host, so we push via kubectl).

Copilot

  • k8s.yaml comment — corrected to reflect that KEDA scales N Jobs off queue depth and workers batch-drain (not one Job per message).
  • keda-e2e.sh cleanup deleting a pre-existing cluster: keda-lib.sh now tracks whether it created the cluster and only deletes it in that case.
  • worker.php "string interpolation is a parse error" — false positive: "{$message->getPid()}" is valid PHP (curly-brace syntax supports method calls). php -l is clean and the worker runs in the e2e (it drains the queue). Left as-is.
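
The processing-list orphan described in the backoffLimit item, and the kind of periodic reaper it calls for, can be modeled in a few lines. This is an in-memory Python sketch under stated assumptions: `StubQueue` and `requeue_stale` are hypothetical names, and the sketch does not reproduce what Publisher::retry() actually does.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class StubQueue:
    """In-memory stand-in for a main list plus a processing list."""
    main: List[str] = field(default_factory=list)
    processing: Dict[str, float] = field(default_factory=dict)  # message -> claim time

    def receive(self, now: float) -> Optional[str]:
        """Claim a message: it leaves the main list and enters processing."""
        if not self.main:
            return None
        message = self.main.pop(0)
        self.processing[message] = now
        return message

    def commit(self, message: str) -> None:
        """Acknowledge a message that was handled successfully."""
        self.processing.pop(message, None)


def requeue_stale(queue: StubQueue, now: float, max_age: float) -> int:
    """Move messages claimed longer than `max_age` ago back to the main list."""
    stale = [m for m, claimed in queue.processing.items() if now - claimed > max_age]
    for message in stale:
        del queue.processing[message]
        queue.main.append(message)
    return len(stale)


if __name__ == "__main__":
    q = StubQueue(main=["a", "b"])
    q.receive(now=0.0)               # worker claims "a", then is killed before commit
    q.commit(q.receive(now=0.0))     # "b" is handled normally
    print(len(q.main))               # prints "0": queue depth hides the stranded "a"
    print(requeue_stale(q, now=100.0, max_age=30.0))  # prints "1"
    print(q.main)                    # prints "['a']"
```

Because the stranded message is absent from the main list, a depth-based scaler has nothing to react to until the reaper puts it back.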

The KEDA approach still needs one bit of library code: a consumer that drains
the queue and exits (so a Job completes) instead of blocking like the Swoole/
Workerman adapters. Extract that out of the test worker into
Utopia\Queue\Adapter\KubernetesJob, cover it with a bare-host unit test
(KubernetesJobAdapterTest), and have the KEDA worker run it via Server. Producers
are unchanged — they still enqueue with any Publisher (e.g. the Redis broker).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Comment thread packages/queue/tests/e2e.sh
keda_up ran unconditionally under set -e, so a provisioning failure (no
Docker, kind download failure) aborted the whole e2e suite before phpunit,
skipping the Swoole/Workerman/pool tests. Gate it: required in CI (hard fail
so KEDA regressions surface), best-effort locally where KedaTest self-skips.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
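
The gating described in the commit above amounts to "fail hard when required, otherwise log and continue". A small Python sketch of that control flow, for illustration only: e2e.sh is a shell script, `provision_or_skip` is a hypothetical helper, and how the real script detects CI is not shown in this conversation.

```python
from typing import Callable


def provision_or_skip(provision: Callable[[], None], required: bool) -> bool:
    """Run provisioning; re-raise failures when required, otherwise skip and continue."""
    try:
        provision()
        return True
    except Exception as error:
        if required:
            raise  # in CI: surface KEDA regressions as a hard failure
        print(f"KEDA setup skipped: {error}")
        return False  # locally: the rest of the suite still runs


if __name__ == "__main__":
    def no_docker() -> None:
        raise RuntimeError("docker is not available")

    keda_ready = provision_or_skip(no_docker, required=False)
    print(keda_ready)  # prints "False"
```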