diff --git a/capabilities/web-security/tests/test_xss_verifier.py b/capabilities/web-security/tests/test_xss_verifier.py
new file mode 100644
index 0000000..d1a5386
--- /dev/null
+++ b/capabilities/web-security/tests/test_xss_verifier.py
@@ -0,0 +1,348 @@
+"""Tests for XssVerifier — programmatic XSS verification via agent-browser canary."""
+
+from __future__ import annotations
+
+import json
+import sys
+from pathlib import Path
+from unittest.mock import AsyncMock, patch
+
+import pytest
+
+pytestmark = pytest.mark.asyncio
+
+_REPO_ROOT = Path(__file__).resolve()
+while _REPO_ROOT != _REPO_ROOT.parent:
+ if (_REPO_ROOT / "capabilities" / "web-security" / "tools").is_dir():
+ break
+ _REPO_ROOT = _REPO_ROOT.parent
+sys.path.insert(0, str(_REPO_ROOT / "capabilities" / "web-security" / "tools"))
+
+from xss_verifier import XssVerifier
+
+
+@pytest.fixture
+def verifier() -> XssVerifier:
+ return XssVerifier()
+
+
+def _mock_eval(return_value: str) -> AsyncMock:
+ return AsyncMock(return_value=return_value)
+
+
+class TestToolDiscovery:
+ def test_tools_discovered(self, verifier: XssVerifier) -> None:
+ tools = verifier.get_tools()
+ names = {t.name for t in tools}
+ assert names == {"xss_inject_canary", "xss_verify", "xss_reset"}
+
+ def test_tools_have_catch(self, verifier: XssVerifier) -> None:
+ for tool in verifier.get_tools():
+ assert tool.catch is True
+
+ def test_inject_has_description(self, verifier: XssVerifier) -> None:
+ tools = {t.name: t for t in verifier.get_tools()}
+ assert "BEFORE" in tools["xss_inject_canary"].description
+
+ def test_verify_has_description(self, verifier: XssVerifier) -> None:
+ tools = {t.name: t for t in verifier.get_tools()}
+ assert "CONFIRMED" in tools["xss_verify"].description
+
+
+class TestInjectCanary:
+ @patch("xss_verifier._eval_js")
+ async def test_inject_arms_canary(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ mock_eval.return_value = "armed"
+ result = await verifier.inject_canary()
+ assert "armed" in result.lower()
+ assert verifier._nonce is not None
+
+ @patch("xss_verifier._eval_js")
+ async def test_inject_generates_unique_nonce(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+ first_nonce = verifier._nonce
+ await verifier.inject_canary()
+ assert verifier._nonce != first_nonce
+
+ @patch("xss_verifier._eval_js")
+ async def test_inject_passes_global_args(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary(global_args=["--session-name", "app"])
+ assert verifier._global_args == ["--session-name", "app"]
+
+
+class TestVerify:
+ async def test_verify_without_inject_raises(self, verifier: XssVerifier) -> None:
+ with pytest.raises(RuntimeError, match="No canary injected"):
+ await verifier.verify(
+ xss_context="reflected",
+ payload_used="",
+ )
+
+ @patch("xss_verifier._eval_js")
+ async def test_verify_unparseable_response_raises(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+
+ mock_eval.return_value = "not-json-at-all"
+ with pytest.raises(RuntimeError, match="Could not parse canary state"):
+ await verifier.verify(
+ xss_context="reflected",
+ payload_used="",
+ )
+
+ @patch("xss_verifier._eval_js")
+ async def test_confirmed_on_alert(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ # Arm canary
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+ nonce = verifier._nonce
+
+ # Verify with alert signal
+ mock_eval.return_value = json.dumps(
+ {
+ "armed": True,
+ "nonce": nonce,
+ "alerts": ["1"],
+ "confirms": [],
+ "prompts": [],
+ "scriptExecutions": [],
+ }
+ )
+ result = await verifier.verify(
+ xss_context="reflected",
+ payload_used="",
+ )
+ assert result.startswith("CONFIRMED")
+ assert "alert() called 1x" in result
+
+ @patch("xss_verifier._eval_js")
+ async def test_confirmed_on_confirm_dialog(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+ nonce = verifier._nonce
+
+ mock_eval.return_value = json.dumps(
+ {
+ "armed": True,
+ "nonce": nonce,
+ "alerts": [],
+ "confirms": ["xss"],
+ "prompts": [],
+ "scriptExecutions": [],
+ }
+ )
+ result = await verifier.verify(
+ xss_context="dom",
+ payload_used="
",
+ )
+ assert result.startswith("CONFIRMED")
+
+ @patch("xss_verifier._eval_js")
+ async def test_partial_on_script_injection_only(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+ nonce = verifier._nonce
+
+ mock_eval.return_value = json.dumps(
+ {
+ "armed": True,
+ "nonce": nonce,
+ "alerts": [],
+ "confirms": [],
+ "prompts": [],
+ "scriptExecutions": [
+ {"src": None, "inline": "fetch('https://evil.com')"}
+ ],
+ }
+ )
+ result = await verifier.verify(
+ xss_context="stored",
+ payload_used="",
+ )
+ assert result.startswith("PARTIAL")
+ assert "",
+ )
+ assert result.startswith("NOT_DETECTED")
+ assert "HTML-encoded" in result
+
+ @patch("xss_verifier._eval_js")
+ async def test_canary_lost_on_navigation(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+
+ mock_eval.return_value = json.dumps({"armed": False})
+ result = await verifier.verify(
+ xss_context="reflected",
+ payload_used="",
+ )
+ assert "CANARY_LOST" in result
+
+ @patch("xss_verifier._eval_js")
+ async def test_nonce_mismatch(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+
+ mock_eval.return_value = json.dumps(
+ {
+ "armed": True,
+ "nonce": "wrong_nonce",
+ "alerts": [],
+ "confirms": [],
+ "prompts": [],
+ "scriptExecutions": [],
+ }
+ )
+ result = await verifier.verify(
+ xss_context="reflected",
+ payload_used="",
+ )
+ assert "NONCE_MISMATCH" in result
+
+
+class TestReset:
+ @patch("xss_verifier._eval_js")
+ async def test_reset_clears_state(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary(global_args=["--session-name", "test"])
+ assert verifier._nonce is not None
+ assert verifier._global_args is not None
+
+ result = await verifier.reset()
+ assert "reset" in result.lower()
+ assert verifier._nonce is None
+ assert verifier._global_args is None
+
+
+class TestMultipleCycles:
+ @patch("xss_verifier._eval_js")
+ async def test_inject_verify_reset_cycle(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ # Cycle 1: inject, verify (confirmed), reset
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+ nonce1 = verifier._nonce
+
+ mock_eval.return_value = json.dumps(
+ {
+ "armed": True,
+ "nonce": nonce1,
+ "alerts": ["1"],
+ "confirms": [],
+ "prompts": [],
+ "scriptExecutions": [],
+ }
+ )
+ result = await verifier.verify(
+ xss_context="reflected", payload_used=""
+ )
+ assert result.startswith("CONFIRMED")
+
+ await verifier.reset()
+
+ # Cycle 2: inject, verify (not detected)
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+ nonce2 = verifier._nonce
+ assert nonce2 != nonce1
+
+ mock_eval.return_value = json.dumps(
+ {
+ "armed": True,
+ "nonce": nonce2,
+ "alerts": [],
+ "confirms": [],
+ "prompts": [],
+ "scriptExecutions": [],
+ }
+ )
+ result = await verifier.verify(
+ xss_context="dom", payload_used="
"
+ )
+ assert result.startswith("NOT_DETECTED")
+
+
+class TestErrorPropagation:
+ @patch("xss_verifier._eval_js")
+ async def test_inject_propagates_eval_error(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ mock_eval.side_effect = RuntimeError("agent-browser is not available")
+ with pytest.raises(RuntimeError, match="agent-browser is not available"):
+ await verifier.inject_canary()
+
+ @patch("xss_verifier._eval_js")
+ async def test_verify_propagates_eval_timeout(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ mock_eval.return_value = "armed"
+ await verifier.inject_canary()
+
+ mock_eval.side_effect = RuntimeError("timed out after 60s")
+ with pytest.raises(RuntimeError, match="timed out"):
+ await verifier.verify(
+ xss_context="reflected",
+ payload_used="",
+ )
+
+
+class TestHandleToolCall:
+ @patch("xss_verifier._eval_js")
+ async def test_inject_via_handle_tool_call(
+ self, mock_eval: AsyncMock, verifier: XssVerifier
+ ) -> None:
+ from dreadnode.agents.tools import FunctionCall, ToolCall
+
+ mock_eval.return_value = "armed"
+ tools = {t.name: t for t in verifier.get_tools()}
+ tc = ToolCall(
+ id="call_inject",
+ function=FunctionCall(name="xss_inject_canary", arguments="{}"),
+ )
+ message, stop = await tools["xss_inject_canary"].handle_tool_call(tc)
+ assert stop is False
+ assert "armed" in message.content.lower()
diff --git a/capabilities/web-security/tools/xss_verifier.py b/capabilities/web-security/tools/xss_verifier.py
new file mode 100644
index 0000000..defd7b4
--- /dev/null
+++ b/capabilities/web-security/tools/xss_verifier.py
@@ -0,0 +1,310 @@
+"""Programmatic XSS verification via agent-browser.
+
+Observes whether arbitrary JavaScript executes in the browser — payload
+agnostic, no hardcoded strings or patterns. Works by injecting a canary
+script that replaces window.alert/confirm/prompt with logging wrappers
+and attaches a MutationObserver to detect ' "
+ "or '
'.",
+ ],
+ global_args: Annotated[
+ list[str] | None,
+ "agent-browser global args. Must match the session used in xss_inject_canary.",
+ ] = None,
+ ) -> str:
+ """Check whether your XSS payload triggered JavaScript execution.
+
+ Reads the canary state from the browser and checks for two
+ signals: (1) dialog function calls logged by the overridden
+ alert/confirm/prompt, (2)