diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 5b97bdc..8f642b2 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -114,7 +114,7 @@ jobs: # Pytest fixtures (tests/conftest.py) build a temp workspaceStorage and # exercise Flask routes via app.test_client(). Only listed files — not # `pytest tests/` — to avoid re-collecting unittest.TestCase classes above. - run: python -m pytest tests/test_api_endpoints.py tests/test_pdf_export.py -v --tb=short + run: python -m pytest tests/test_api_endpoints.py tests/test_pdf_export.py tests/test_search_helpers.py -v --tb=short # ── PyInstaller desktop build (Windows only, once per workflow) ──────── # Closes #44. Builds the onedir bundle and smoke-tests --help so the diff --git a/api/search.py b/api/search.py index ae35b09..3ad9c89 100644 --- a/api/search.py +++ b/api/search.py @@ -3,70 +3,23 @@ GET /api/search?q=...&type=all|chat|composer """ -import json import logging -import os -import re -import sqlite3 -from contextlib import closing -from datetime import datetime -from urllib.parse import unquote as _url_unquote from flask import Blueprint, current_app, jsonify, request -from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules -from utils.workspace_path import resolve_workspace_path, get_cli_chats_path -from utils.path_helpers import to_epoch_ms, warn_workspace_json_read -from utils.text_extract import extract_text_from_bubble -from utils.cli_chat_reader import list_cli_projects, traverse_blobs, messages_to_bubbles -from models import Bubble, Composer, ParseWarningCollector, SchemaError +from models import ParseWarningCollector +from services.search import ( + rank_results, + search_cli_sessions, + search_global_storage, + search_legacy_workspaces, +) +from utils.workspace_path import get_cli_chats_path, resolve_workspace_path bp = Blueprint("search", __name__) _logger = logging.getLogger(__name__) -def _json_dump_safe(value) -> str: - 
"""Best-effort JSON string conversion for exclusion matching.""" - try: - return json.dumps(value, ensure_ascii=False, sort_keys=True) - except Exception: - return str(value) if value is not None else "" - - -def _workspace_display_name_from_folder(folder: str | None, fallback: str | None = None) -> str: - """Extract a human-readable workspace name from workspace folder path.""" - if folder: - raw = str(folder).strip() - cleaned = re.sub(r"^file://", "", raw).replace("\\", "/") - parts = cleaned.split("/") - leaf = parts[-1] if parts else "" - if leaf: - return _url_unquote(leaf) - return fallback or "Other chats" - - -def _build_exclusion_searchable( - *, - project_name: str | None, - chat_title: str | None, - model_names: list[str] | None = None, - content_parts: list[str] | None = None, - metadata_parts: list[str] | None = None, -) -> str: - """Build broad searchable text so exclusion rules cover visible output.""" - combined: list[str] = [] - if content_parts: - combined.extend(p for p in content_parts if p) - if metadata_parts: - combined.extend(p for p in metadata_parts if p) - return build_searchable_text( - project_name=project_name, - chat_title=chat_title, - model_names=model_names, - chat_content_snippet="\n\n".join(combined) if combined else None, - ) - - @bp.route("/api/search") def search(): try: @@ -78,442 +31,22 @@ def search(): return jsonify({"error": "No search query provided"}), 400 workspace_path = resolve_workspace_path() - results = [] parse_warnings = ParseWarningCollector() query_lower = query.lower() - global_db_path = os.path.normpath(os.path.join(workspace_path, "..", "globalStorage", "state.vscdb")) - - # --------------------------------------------------------------- - # Search global cursorDiskKV (new Cursor format — primary source) - # --------------------------------------------------------------- - if os.path.isfile(global_db_path): - # try/finally guarantees .close() on every exit path including - # exception (issue #17). 
Equivalent to wrapping the body in - # `with closing(sqlite3.connect(...))`, without the 160-line - # indent shift over the search logic that follows. - conn = None - try: - conn = sqlite3.connect(f"file:{global_db_path}?mode=ro", uri=True) - conn.row_factory = sqlite3.Row - - # Build workspace name map for display - workspace_entries = [] - ws_id_to_name = {} - try: - for name in os.listdir(workspace_path): - full = os.path.join(workspace_path, name) - wj = os.path.join(full, "workspace.json") - if os.path.isdir(full) and os.path.isfile(wj): - workspace_entries.append({"name": name, "workspaceJsonPath": wj}) - try: - with open(wj, "r", encoding="utf-8") as f: - wd = json.load(f) - first_folder = wd.get("folder") or (wd.get("folders", [{}])[0] or {}).get("path") - if first_folder: - parts = first_folder.replace("\\", "/").split("/") - fn = parts[-1] if parts else None - if fn: - ws_id_to_name[name] = _url_unquote(fn) - except Exception as e: - warn_workspace_json_read(_logger, name, e) - except Exception as e: - _logger.warning( - "Failed to list workspace entries under %s: %s", - workspace_path, - e, - ) - - # Build composer → workspace mapping - composer_id_to_ws = {} - for entry in workspace_entries: - db_path = os.path.join(workspace_path, entry["name"], "state.vscdb") - if not os.path.isfile(db_path): - continue - try: - # closing() guarantees .close() on scope exit (issue #17). 
- with closing(sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)) as wconn: - row = wconn.execute( - "SELECT value FROM ItemTable WHERE [key] = 'composer.composerData'" - ).fetchone() - if row and row[0]: - data = json.loads(row[0]) - all_composers = data.get("allComposers") - if isinstance(all_composers, list): - for c in all_composers: - cid = c.get("composerId") if isinstance(c, dict) else None - if cid: - composer_id_to_ws[cid] = entry["name"] - except Exception as e: - _logger.warning( - "Failed to load composer mapping from workspace %s: %s", - entry["name"], - e, - ) - - # Load bubble text for searching - bubble_map = {} - for row in conn.execute("SELECT key, value FROM cursorDiskKV WHERE key LIKE 'bubbleId:%'"): - parts = row["key"].split(":") - if len(parts) >= 3: - bid = parts[2] - try: - bubble = Bubble.from_dict(json.loads(row["value"]), bubble_id=bid) - text = extract_text_from_bubble(bubble) - bubble_map[bid] = {"text": text, "raw": bubble.raw} - except SchemaError as e: - # Drift logged so the operator can see why a chat dropped - # out of search results; bad row still skipped so search - # keeps returning results from the well-formed ones. 
- _logger.warning( - "Schema drift in bubble %s: %s (%s)", - bid, - e, - type(e).__name__, - ) - parse_warnings.record_bubble_skipped() - except (json.JSONDecodeError, TypeError, ValueError) as e: - _logger.warning( - "Failed to decode Bubble from bubbleId:%s: %s", - bid, - e, - ) - parse_warnings.record_bubble_skipped() - - # Search through composerData - composer_rows = conn.execute( - "SELECT key, value FROM cursorDiskKV WHERE key LIKE 'composerData:%' AND LENGTH(value) > 10" - ).fetchall() - - for row in composer_rows: - composer_id = row["key"].split(":")[1] - try: - composer = Composer.from_dict(json.loads(row["value"]), composer_id=composer_id) - except SchemaError as e: - _logger.warning( - "Schema drift in composer %s: %s (%s)", - composer_id, - e, - type(e).__name__, - ) - parse_warnings.record_composer_skipped() - continue - except (json.JSONDecodeError, TypeError, ValueError) as e: - _logger.warning( - "Failed to decode Composer from composerData:%s: %s", - composer_id, - e, - ) - parse_warnings.record_composer_skipped() - continue - try: - cd = composer.raw - headers = composer.full_conversation_headers_only - if not headers: - continue - - title = composer.name or "" - ws_id = composer_id_to_ws.get(composer_id, "global") - ws_name = ws_id_to_name.get(ws_id) - project_name = ws_name or ("Other chats" if ws_id == "global" else ws_id) - - model_config = composer.model_config - model_name = model_config.get("modelName") - model_names = [model_name] if model_name and model_name != "default" else None - - bubble_texts = [] - bubble_meta = [] - for header in headers: - bid = header.get("bubbleId") - bubble_entry = bubble_map.get(bid) - if not bubble_entry: - continue - text = bubble_entry.get("text") or "" - if text: - bubble_texts.append(text) - raw_bubble = bubble_entry.get("raw") - if raw_bubble: - bubble_meta.append(_json_dump_safe(raw_bubble)) - - exclusion_text = _build_exclusion_searchable( - project_name=project_name, - chat_title=title, - 
model_names=model_names, - content_parts=bubble_texts, - metadata_parts=[ - _json_dump_safe(model_config), - _json_dump_safe(cd.get("conversationSummary")), - _json_dump_safe(cd.get("usage")), - _json_dump_safe(cd.get("requestMetadata")), - _json_dump_safe(cd), - "\n".join(bubble_meta), - ], - ) - if is_excluded_by_rules(rules, exclusion_text): - continue - - # Check if any bubble text matches - has_match = False - matching_text = "" - # Check title - if title and query_lower in title.lower(): - has_match = True - matching_text = title - - # Check bubble texts - if not has_match: - for text in bubble_texts: - if text and query_lower in text.lower(): - has_match = True - # Extract a snippet around the match - idx = text.lower().find(query_lower) - start = max(0, idx - 80) - end = min(len(text), idx + len(query) + 120) - matching_text = ("..." if start > 0 else "") + text[start:end] + ("..." if end < len(text) else "") - break - - if has_match: - if not title: - # Derive title from first bubble - for text in bubble_texts: - if text: - first_lines = [ln for ln in text.split("\n") if ln.strip()] - if first_lines: - title = first_lines[0][:100] - break - if not title: - title = f"Conversation {composer_id[:8]}" - - results.append({ - "workspaceId": ws_id, - "workspaceFolder": ws_name, - "chatId": composer_id, - "chatTitle": title, - "timestamp": to_epoch_ms(composer.last_updated_at) or to_epoch_ms(composer.created_at) or int(datetime.now().timestamp() * 1000), - "matchingText": matching_text, - "type": "composer", - }) - except Exception as e: - _logger.warning( - "Failed to process Composer from composerData:%s during search: %s", - composer_id, - e, - ) - parse_warnings.record_composer_processing_failure() - - except Exception: - _logger.exception("Error searching global storage") - finally: - if conn is not None: - conn.close() - - # --------------------------------------------------------------- - # Search per-workspace ItemTable (legacy format — fallback) - # 
--------------------------------------------------------------- - try: - for name in os.listdir(workspace_path): - full = os.path.join(workspace_path, name) - if not os.path.isdir(full): - continue - db_path = os.path.join(full, "state.vscdb") - wj_path = os.path.join(full, "workspace.json") - if not os.path.isfile(db_path): - continue - - workspace_folder = None - try: - with open(wj_path, "r", encoding="utf-8") as f: - wd = json.load(f) - workspace_folder = wd.get("folder") - except Exception as e: - warn_workspace_json_read(_logger, name, e) - workspace_name = _workspace_display_name_from_folder(workspace_folder, fallback=name) - - # try/finally guarantees .close() on every exit path (issue #17). - conn = None - try: - conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) - - # Search chat logs - if search_type in ("all", "chat"): - chat_row = conn.execute( - "SELECT value FROM ItemTable WHERE [key] = 'workbench.panel.aichat.view.aichat.chatdata'" - ).fetchone() - if chat_row and chat_row[0]: - data = json.loads(chat_row[0]) - for tab in (data.get("tabs") or []): - ct = tab.get("chatTitle") or "" - tab_model_names = None - tab_meta = tab.get("metadata") - if isinstance(tab_meta, dict): - models_used = tab_meta.get("modelsUsed") - if isinstance(models_used, list): - tab_model_names = [str(m) for m in models_used if m] - elif tab_meta.get("model"): - tab_model_names = [str(tab_meta.get("model"))] - - tab_bubble_texts = [] - for bubble in (tab.get("bubbles") or []): - text = bubble.get("text") or "" - if text: - tab_bubble_texts.append(text) - - exclusion_text = _build_exclusion_searchable( - project_name=workspace_name, - chat_title=ct, - model_names=tab_model_names, - content_parts=tab_bubble_texts, - metadata_parts=[ - _json_dump_safe(tab), - _json_dump_safe(workspace_folder), - ], - ) - if is_excluded_by_rules(rules, exclusion_text): - continue - - has_match = False - matching_text = "" - - if ct.lower().find(query_lower) != -1: - has_match = True - 
matching_text = ct - - for bubble in (tab.get("bubbles") or []): - text = bubble.get("text") or "" - if text.lower().find(query_lower) != -1: - has_match = True - idx = text.lower().find(query_lower) - start = max(0, idx - 80) - end = min(len(text), idx + len(query) + 120) - matching_text = ("..." if start > 0 else "") + text[start:end] + ("..." if end < len(text) else "") - break - - if has_match: - results.append({ - "workspaceId": name, - "workspaceFolder": workspace_folder, - "chatId": tab.get("tabId"), - "chatTitle": ct or f"Chat {(tab.get('tabId') or '')[:8]}", - "timestamp": tab.get("lastSendTime") or datetime.now().isoformat(), - "matchingText": matching_text, - "type": "chat", - }) - - except Exception as e: - _logger.warning( - "Failed to search legacy workspace %s: %s", - name, - e, - ) - finally: - if conn is not None: - conn.close() - except Exception as e: - _logger.warning( - "Failed to iterate legacy workspaces under %s: %s", - workspace_path, - e, - ) - - # --------------------------------------------------------------- - # Search Cursor CLI sessions (only for type=all) - # --------------------------------------------------------------- + results = [] + results.extend( + search_global_storage(workspace_path, query, query_lower, rules, parse_warnings) + ) + results.extend( + search_legacy_workspaces(workspace_path, query, query_lower, search_type, rules) + ) if search_type == "all": - try: - cli_projects = list_cli_projects(get_cli_chats_path()) - for cp in cli_projects: - ws_name = cp["workspace_name"] or cp["project_id"][:12] - for session in cp["sessions"]: - meta = session.get("meta", {}) - session_id = session["session_id"] - created_ms: int = meta.get("createdAt") or int(datetime.now().timestamp() * 1000) - session_name = meta.get("name") or f"Session {session_id[:8]}" - - try: - messages = traverse_blobs(session["db_path"]) - except Exception as e: - _logger.warning( - "Failed to traverse CLI session blobs for %s: %s", - session_id, - e, - ) 
- continue - - bubbles = messages_to_bubbles(messages, created_ms) - if not bubbles: - continue - - # Derive title - title = session_name - if not title or title.startswith("New Agent"): - for b in bubbles: - if b["type"] == "user" and b.get("text"): - first_lines = [ln for ln in b["text"].split("\n") if ln.strip()] - if first_lines: - title = first_lines[0][:100] - break - - bubble_texts = [b["text"] for b in bubbles if b.get("text")] - tool_payloads = [ - tc.get("input") or tc.get("summary") or "" - for b in bubbles - for tc in (b.get("metadata") or {}).get("toolCalls") or [] - ] - exclusion_text = _build_exclusion_searchable( - project_name=ws_name, - chat_title=title, - content_parts=bubble_texts + tool_payloads, - ) - if is_excluded_by_rules(rules, exclusion_text): - continue - - has_match = False - matching_text = "" - - if title and query_lower in title.lower(): - has_match = True - matching_text = title - - if not has_match: - for text in bubble_texts: - if text and query_lower in text.lower(): - has_match = True - idx = text.lower().find(query_lower) - start = max(0, idx - 80) - end = min(len(text), idx + len(query) + 120) - matching_text = ( - ("..." if start > 0 else "") - + text[start:end] - + ("..." 
if end < len(text) else "") - ) - break - - if has_match: - results.append({ - "workspaceId": f"cli:{cp['project_id']}", - "workspaceFolder": cp.get("workspace_path"), - "chatId": session_id, - "chatTitle": title, - "timestamp": created_ms, - "matchingText": matching_text, - "type": "cli_agent", - "source": "cli", - }) - except Exception: - _logger.exception("Error searching CLI sessions") - - # Sort by timestamp descending - def _ts(r): - t = r.get("timestamp", 0) - if isinstance(t, str): - try: - return datetime.fromisoformat(t.replace("Z", "+00:00")).timestamp() - except Exception: - return 0 - return t - results.sort(key=_ts, reverse=True) + results.extend( + search_cli_sessions(get_cli_chats_path(), query, query_lower, rules) + ) - payload: dict = {"results": results} + payload: dict = {"results": rank_results(results)} return jsonify(parse_warnings.attach_to(payload)) except Exception: diff --git a/models/parse_warnings.py b/models/parse_warnings.py index 15386bd..bcfe802 100644 --- a/models/parse_warnings.py +++ b/models/parse_warnings.py @@ -1,6 +1,6 @@ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field @dataclass @@ -10,6 +10,7 @@ class ParseWarningCollector: composers_skipped: int = 0 bubbles_skipped: int = 0 composers_processing_failed: int = 0 + source_failures: list[dict] = field(default_factory=list) def record_composer_skipped(self, count: int = 1) -> None: if count > 0: @@ -24,12 +25,27 @@ def record_composer_processing_failure(self, count: int = 1) -> None: if count > 0: self.composers_processing_failed += count + def record_source_failure(self, exc: BaseException, source: str) -> None: + """Record a whole-source failure (e.g. the global storage DB is unreadable). + + Distinct from per-item parse skips: signals that an entire data source + could not be searched so the API can warn callers that results may be + incomplete. 
+ + The raw exception is intentionally not stored — it is logged server-side + by the caller (``_logger.exception``) before this method is invoked. + Only the source identifier is retained so ``to_api_list`` can produce a + safe client message without leaking file paths or Python internals. + """ + self.source_failures.append({"source": source}) + @property def has_warnings(self) -> bool: return ( self.composers_skipped > 0 or self.bubbles_skipped > 0 or self.composers_processing_failed > 0 + or bool(self.source_failures) ) def to_api_list(self) -> list[dict]: @@ -65,6 +81,12 @@ def to_api_list(self) -> list[dict]: f"{n} {noun} could not be fully assembled after parsing" ), }) + for sf in self.source_failures: + warnings.append({ + "type": "source_failure", + "source": sf["source"], + "detail": f"Search source '{sf['source']}' could not be queried; results may be incomplete", + }) return warnings def attach_to(self, payload: dict) -> dict: diff --git a/services/search.py b/services/search.py new file mode 100644 index 0000000..b3209a7 --- /dev/null +++ b/services/search.py @@ -0,0 +1,578 @@ +"""Search helpers: three independent data-source readers for /api/search. + +Each public function targets exactly one data source, accepts explicit inputs +with no Flask request-context dependency, and returns a plain list of result +dicts. The route handler in ``api/search.py`` calls all three and merges. 
+ +Data sources +------------ +* :func:`search_global_storage` — composerData rows in global ``cursorDiskKV`` +* :func:`search_legacy_workspaces` — per-workspace ItemTable (legacy chat format) +* :func:`search_cli_sessions` — JSONL files from Cursor CLI agent sessions + +Aggregation +----------- +* :func:`rank_results` — sort merged results by timestamp descending +""" + +from __future__ import annotations + +import json +import logging +import os +import sqlite3 +from contextlib import closing +from datetime import datetime +from pathlib import Path + +__all__ = [ + "rank_results", + "search_cli_sessions", + "search_global_storage", + "search_legacy_workspaces", +] +from models import Bubble, Composer, ParseWarningCollector, SchemaError +from services.workspace_db import ( + build_composer_id_to_workspace_id, + collect_workspace_entries, + open_global_db, +) +from utils.cli_chat_reader import list_cli_projects, messages_to_bubbles, traverse_blobs +from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules +from utils.path_helpers import ( + get_workspace_display_name, + to_epoch_ms, + warn_workspace_json_read, +) +from utils.text_extract import extract_text_from_bubble + +_logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Private helpers — pure functions / small utilities +# --------------------------------------------------------------------------- + + +def _json_dump_safe(value) -> str: + """Best-effort JSON serialisation for exclusion-rule matching.""" + try: + return json.dumps(value, ensure_ascii=False, sort_keys=True) + except Exception: + return str(value) if value is not None else "" + + +def _build_exclusion_searchable( + *, + project_name: str | None, + chat_title: str | None, + model_names: list[str] | None = None, + content_parts: list[str] | None = None, + metadata_parts: list[str] | None = None, +) -> str: + """Compose broad searchable text so exclusion rules 
cover all visible fields.""" + combined: list[str] = [] + if content_parts: + combined.extend(p for p in content_parts if p) + if metadata_parts: + combined.extend(p for p in metadata_parts if p) + return build_searchable_text( + project_name=project_name, + chat_title=chat_title, + model_names=model_names, + chat_content_snippet="\n\n".join(combined) if combined else None, + ) + + +def _extract_snippet(text: str, query: str, query_lower: str) -> str: + """Return a context window around the first match of *query* in *text*. + + Returns an empty string if there is no match. + """ + idx = text.lower().find(query_lower) + if idx == -1: + return "" + start = max(0, idx - 80) + end = min(len(text), idx + len(query) + 120) + return ( + ("..." if start > 0 else "") + + text[start:end] + + ("..." if end < len(text) else "") + ) + + +def _find_match( + title: str, + bubble_texts: list[str], + query_lower: str, + query: str, +) -> tuple[bool, str]: + """Check whether a conversation matches the search query. + + Returns ``(has_match, matching_text)`` where *matching_text* is either the + full title (on a title hit) or a snippet around the first bubble match. + """ + if title and query_lower in title.lower(): + return True, title + for text in bubble_texts: + if text and query_lower in text.lower(): + return True, _extract_snippet(text, query, query_lower) + return False, "" + + +# --------------------------------------------------------------------------- +# Private data builders +# --------------------------------------------------------------------------- + + +def _build_ws_id_to_name( + workspace_entries: list[dict], +) -> dict[str, str]: + """Map workspace folder IDs to human-readable display names. + + Reads each workspace's ``workspace.json`` via + :func:`~utils.path_helpers.get_workspace_display_name`. Entries whose + JSON cannot be read are silently skipped (warning logged). 
+ """ + mapping: dict[str, str] = {} + for entry in workspace_entries: + try: + with open(entry["workspaceJsonPath"], "r", encoding="utf-8") as fh: + wd = json.load(fh) + name = get_workspace_display_name(wd) + if name: + mapping[entry["name"]] = name + except Exception as exc: + warn_workspace_json_read(_logger, entry["name"], exc) + return mapping + + +def _build_search_bubble_map( + global_db, + parse_warnings: ParseWarningCollector, +) -> dict[str, dict]: + """Load ``bubbleId:*`` rows from an open global DB connection. + + Returns ``{bubble_id: {"text": str, "raw": dict}}``. Rows that fail + schema validation or JSON decoding are skipped; the skip is recorded in + *parse_warnings*. + """ + bubble_map: dict[str, dict] = {} + for row in global_db.execute( + "SELECT key, value FROM cursorDiskKV WHERE key LIKE 'bubbleId:%'" + ): + parts = row["key"].split(":") + if len(parts) < 3: + continue + bid = parts[2] + try: + bubble = Bubble.from_dict(json.loads(row["value"]), bubble_id=bid) + bubble_map[bid] = {"text": extract_text_from_bubble(bubble), "raw": bubble.raw} + except SchemaError as exc: + _logger.warning( + "Schema drift in bubble %s: %s (%s)", bid, exc, type(exc).__name__ + ) + parse_warnings.record_bubble_skipped() + except (json.JSONDecodeError, TypeError, ValueError) as exc: + _logger.warning("Failed to decode Bubble from bubbleId:%s: %s", bid, exc) + parse_warnings.record_bubble_skipped() + return bubble_map + + +# --------------------------------------------------------------------------- +# Public: per-source search functions +# --------------------------------------------------------------------------- + + +def search_global_storage( + workspace_path: str, + query: str, + query_lower: str, + rules: list, + parse_warnings: ParseWarningCollector, +) -> list[dict]: + """Search composer conversations stored in the global ``cursorDiskKV`` table. + + This is the primary data source for current Cursor versions. 
+ + Args: + workspace_path: Cursor workspaceStorage root directory. + query: Raw search string (used for snippet extraction). + query_lower: ``query.lower()`` (pre-computed by caller). + rules: Parsed exclusion rules from app config. + parse_warnings: Collector that accumulates parse/schema failures. + + Returns: + List of search result dicts with keys ``workspaceId``, ``workspaceFolder``, + ``chatId``, ``chatTitle``, ``timestamp``, ``matchingText``, ``type``. + """ + results: list[dict] = [] + try: + workspace_entries = collect_workspace_entries(workspace_path) + ws_id_to_name = _build_ws_id_to_name(workspace_entries) + composer_id_to_ws = build_composer_id_to_workspace_id( + workspace_path, workspace_entries + ) + + with open_global_db(workspace_path) as (conn, _db_path): + if conn is None: + return results + bubble_map = _build_search_bubble_map(conn, parse_warnings) + composer_rows = conn.execute( + "SELECT key, value FROM cursorDiskKV" + " WHERE key LIKE 'composerData:%' AND LENGTH(value) > 10" + ).fetchall() + + for row in composer_rows: + composer_id = row["key"].split(":")[1] + try: + composer = Composer.from_dict( + json.loads(row["value"]), composer_id=composer_id + ) + except SchemaError as exc: + _logger.warning( + "Schema drift in composer %s: %s (%s)", + composer_id, + exc, + type(exc).__name__, + ) + parse_warnings.record_composer_skipped() + continue + except (json.JSONDecodeError, TypeError, ValueError) as exc: + _logger.warning( + "Failed to decode Composer from composerData:%s: %s", + composer_id, + exc, + ) + parse_warnings.record_composer_skipped() + continue + + try: + headers = composer.full_conversation_headers_only + if not headers: + continue + + title = composer.name or "" + ws_id = composer_id_to_ws.get(composer_id, "global") + ws_name = ws_id_to_name.get(ws_id) + project_name = ws_name or ("Other chats" if ws_id == "global" else ws_id) + + cd = composer.raw + model_config = composer.model_config + model_name = 
model_config.get("modelName") + model_names = ( + [model_name] if model_name and model_name != "default" else None + ) + + bubble_texts: list[str] = [] + bubble_meta: list[str] = [] + for header in headers: + bid = header.get("bubbleId") + if not bid: + continue + entry = bubble_map.get(bid) + if not entry: + continue + text = entry.get("text") or "" + if text: + bubble_texts.append(text) + raw_bubble = entry.get("raw") + if raw_bubble: + bubble_meta.append(_json_dump_safe(raw_bubble)) + + exclusion_text = _build_exclusion_searchable( + project_name=project_name, + chat_title=title, + model_names=model_names, + content_parts=bubble_texts, + metadata_parts=[ + _json_dump_safe(model_config), + _json_dump_safe(cd.get("conversationSummary")), + _json_dump_safe(cd.get("usage")), + _json_dump_safe(cd.get("requestMetadata")), + _json_dump_safe(cd), + "\n".join(bubble_meta), + ], + ) + if is_excluded_by_rules(rules, exclusion_text): + continue + + has_match, matching_text = _find_match( + title, bubble_texts, query_lower, query + ) + if not has_match: + continue + + if not title: + for text in bubble_texts: + if text: + first_lines = [ln for ln in text.split("\n") if ln.strip()] + if first_lines: + title = first_lines[0][:100] + break + if not title: + title = f"Conversation {composer_id[:8]}" + + results.append({ + "workspaceId": ws_id, + "workspaceFolder": ws_name, + "chatId": composer_id, + "chatTitle": title, + "timestamp": ( + to_epoch_ms(composer.last_updated_at) + or to_epoch_ms(composer.created_at) + or int(datetime.now().timestamp() * 1000) + ), + "matchingText": matching_text, + "type": "composer", + }) + except Exception as exc: + _logger.warning( + "Failed to process Composer from composerData:%s during search: %s", + composer_id, + exc, + ) + parse_warnings.record_composer_processing_failure() + + except Exception as exc: + _logger.exception("Error searching global storage") + parse_warnings.record_source_failure(exc, source="global_storage") + + return 
results + + +def search_legacy_workspaces( + workspace_path: str, + query: str, + query_lower: str, + search_type: str, + rules: list, +) -> list[dict]: + """Search legacy per-workspace ItemTable chat data. + + Iterates per-workspace ``state.vscdb`` files looking for the + ``workbench.panel.aichat.view.aichat.chatdata`` key (present in older + Cursor versions before global storage migration). + + Args: + workspace_path: Cursor workspaceStorage root directory. + query: Raw search string (used for snippet extraction). + query_lower: ``query.lower()`` (pre-computed by caller). + search_type: ``"all"`` or ``"chat"`` — other values return immediately. + rules: Parsed exclusion rules from app config. + + Returns: + List of search result dicts with ``type`` set to ``"chat"``. + """ + results: list[dict] = [] + if search_type not in ("all", "chat"): + return results + + try: + for name in os.listdir(workspace_path): + full = os.path.join(workspace_path, name) + if not os.path.isdir(full): + continue + db_path = os.path.join(full, "state.vscdb") + wj_path = os.path.join(full, "workspace.json") + if not os.path.isfile(db_path): + continue + + workspace_folder: str | None = None + workspace_name = name + try: + with open(wj_path, "r", encoding="utf-8") as fh: + wd = json.load(fh) + workspace_folder = wd.get("folder") + workspace_name = get_workspace_display_name(wd, fallback=name) + except Exception as exc: + warn_workspace_json_read(_logger, name, exc) + + db_uri = Path(db_path).resolve().as_uri() + "?mode=ro" + try: + with closing(sqlite3.connect(db_uri, uri=True)) as conn: + chat_row = conn.execute( + "SELECT value FROM ItemTable" + " WHERE [key] = 'workbench.panel.aichat.view.aichat.chatdata'" + ).fetchone() + + if not (chat_row and chat_row[0]): + continue + + data = json.loads(chat_row[0]) + for tab in (data.get("tabs") or []): + ct = tab.get("chatTitle") or "" + + tab_model_names: list[str] | None = None + tab_meta = tab.get("metadata") + if isinstance(tab_meta, dict): 
+ models_used = tab_meta.get("modelsUsed") + if isinstance(models_used, list): + tab_model_names = [str(m) for m in models_used if m] + elif tab_meta.get("model"): + tab_model_names = [str(tab_meta.get("model"))] + + tab_bubble_texts = [ + bubble.get("text") or "" + for bubble in (tab.get("bubbles") or []) + if bubble.get("text") + ] + exclusion_text = _build_exclusion_searchable( + project_name=workspace_name, + chat_title=ct, + model_names=tab_model_names, + content_parts=tab_bubble_texts, + metadata_parts=[ + _json_dump_safe(tab), + _json_dump_safe(workspace_folder), + ], + ) + if is_excluded_by_rules(rules, exclusion_text): + continue + + has_match, matching_text = _find_match( + ct, tab_bubble_texts, query_lower, query + ) + if not has_match: + continue + + results.append({ + "workspaceId": name, + "workspaceFolder": workspace_folder, + "chatId": tab.get("tabId"), + "chatTitle": ct or f"Chat {(tab.get('tabId') or '')[:8]}", + "timestamp": tab.get("lastSendTime") or 0, + "matchingText": matching_text, + "type": "chat", + }) + + except Exception as exc: + _logger.warning("Failed to search legacy workspace %s: %s", name, exc) + + except Exception as exc: + _logger.warning( + "Failed to iterate legacy workspaces under %s: %s", workspace_path, exc + ) + + return results + + +def search_cli_sessions( + cli_chats_path: str, + query: str, + query_lower: str, + rules: list, +) -> list[dict]: + """Search Cursor CLI agent sessions stored as JSONL + blob files. + + Reads from ``~/.cursor/chats/`` (or the path returned by + :func:`~utils.workspace_path.get_cli_chats_path`). + + Args: + cli_chats_path: Path to the Cursor CLI chats directory. + query: Raw search string (used for snippet extraction). + query_lower: ``query.lower()`` (pre-computed by caller). + rules: Parsed exclusion rules from app config. + + Returns: + List of search result dicts with ``type`` set to ``"cli_agent"`` and + ``source`` set to ``"cli"``. 
+ """ + results: list[dict] = [] + try: + cli_projects = list_cli_projects(cli_chats_path) + for cp in cli_projects: + ws_name = cp["workspace_name"] or cp["project_id"][:12] + for session in cp["sessions"]: + meta = session.get("meta", {}) + session_id = session["session_id"] + created_ms: int = ( + meta.get("createdAt") or int(datetime.now().timestamp() * 1000) + ) + session_name: str = meta.get("name") or f"Session {session_id[:8]}" + + try: + messages = traverse_blobs(session["db_path"]) + except Exception as exc: + _logger.warning( + "Failed to traverse CLI session blobs for %s: %s", + session_id, + exc, + ) + continue + + bubbles = messages_to_bubbles(messages, created_ms) + if not bubbles: + continue + + title = session_name + if not title or title.startswith("New Agent"): + for b in bubbles: + if b["type"] == "user" and b.get("text"): + first_lines = [ + ln for ln in b["text"].split("\n") if ln.strip() + ] + if first_lines: + title = first_lines[0][:100] + break + + bubble_texts = [b["text"] for b in bubbles if b.get("text")] + tool_payloads = [ + tc.get("input") or tc.get("summary") or "" + for b in bubbles + for tc in (b.get("metadata") or {}).get("toolCalls") or [] + ] + exclusion_text = _build_exclusion_searchable( + project_name=ws_name, + chat_title=title, + content_parts=bubble_texts + tool_payloads, + ) + if is_excluded_by_rules(rules, exclusion_text): + continue + + has_match, matching_text = _find_match( + title, bubble_texts, query_lower, query + ) + if not has_match: + continue + + results.append({ + "workspaceId": f"cli:{cp['project_id']}", + "workspaceFolder": cp.get("workspace_path"), + "chatId": session_id, + "chatTitle": title, + "timestamp": created_ms, + "matchingText": matching_text, + "type": "cli_agent", + "source": "cli", + }) + except Exception: + _logger.exception("Error searching CLI sessions") + + return results + + +# --------------------------------------------------------------------------- +# Aggregation +# 
def rank_results(results: list[dict]) -> list[dict]:
    """Return *results* sorted by ``timestamp``, newest first.

    All three source types use epoch-millisecond integers, except
    ``search_legacy_workspaces`` which may emit ISO 8601 strings for the
    ``lastSendTime`` field. ISO strings are converted to epoch-ms so
    cross-source comparisons are made in the same unit.

    A result whose timestamp is missing, falsy, unparseable, NaN, or of an
    unexpected type is ranked as ``0`` (oldest) instead of raising, so one
    malformed entry cannot fail the whole search. Entries with equal keys
    keep their input order.

    Args:
        results: Search result dicts; each may carry a ``timestamp`` key.

    Returns:
        A new list; *results* itself is not modified.
    """
    def _epoch_ms(result: dict) -> float:
        raw = result.get("timestamp", 0)
        if isinstance(raw, str):
            try:
                # "Z" is rewritten because fromisoformat() only accepts it
                # from Python 3.11 on.
                parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
                # .timestamp() -> epoch-seconds; x1000 -> epoch-ms to match ints
                return parsed.timestamp() * 1000
            except (ValueError, OverflowError, OSError):
                return 0.0
        if not isinstance(raw, (int, float)):
            # None, dict, list, ...: nothing sensible to compare on.
            return 0.0
        try:
            value = float(raw)
        except OverflowError:
            return 0.0
        # NaN compares false with everything and would scramble the sort.
        return 0.0 if value != value else value

    return sorted(results, key=_epoch_ms, reverse=True)
class TestExtractSnippet:
    """Checks on the snippet text ``_extract_snippet`` builds around a hit."""

    def test_match_at_start_no_leading_ellipsis(self):
        result = _extract_snippet("hello world foo", "hello", "hello")
        assert result.startswith("hello")
        assert not result.startswith("...")

    def test_match_in_middle_adds_ellipsis(self):
        filler = "x" * 200
        result = _extract_snippet(f"{filler}needle{filler}", "needle", "needle")
        assert "needle" in result
        assert result.startswith("...")
        assert result.endswith("...")

    def test_no_match_returns_empty_string(self):
        result = _extract_snippet("no match here", "xyz", "xyz")
        assert result == ""

    def test_case_insensitive_query_lower(self):
        # Raw query keeps its casing; the lowered form is passed separately.
        result = _extract_snippet("The Query appears here", "Query", "query")
        assert "Query" in result

    def test_snippet_length_is_bounded(self):
        haystack = "".join(("a" * 1000, "target", "b" * 1000))
        result = _extract_snippet(haystack, "target", "target")
        # Context window: 80 before + len("target") + 120 after = ~206 chars + ellipses
        assert len(result) < 300
class TestFindMatch:
    """Tests for ``_find_match``: title-first matching with bubble fallback."""

    def test_title_match_returns_full_title(self):
        has_match, text = _find_match("hello query world", [], "query", "query")
        assert has_match
        assert text == "hello query world"

    def test_bubble_match_returns_snippet(self):
        has_match, text = _find_match(
            "",
            ["padding " * 20 + "needle" + " padding" * 20],
            "needle",
            "needle",
        )
        assert has_match
        assert "needle" in text

    def test_no_match_returns_false_and_empty(self):
        has_match, text = _find_match("nothing here", ["also nothing"], "xyz", "xyz")
        assert not has_match
        assert text == ""

    def test_title_checked_before_bubbles(self):
        # Both title and bubble contain the term; title should win.
        has_match, text = _find_match(
            "The query is in the title",
            ["The query is also in bubbles"],
            "query",
            "query",
        )
        assert has_match
        assert text == "The query is in the title"

    def test_case_insensitive_title_match(self):
        has_match, _ = _find_match("HELLO WORLD", [], "hello", "hello")
        assert has_match

    def test_empty_title_and_empty_bubbles_no_match(self):
        has_match, text = _find_match("", [], "q", "q")
        assert not has_match
        assert text == ""


# ---------------------------------------------------------------------------
# rank_results
# ---------------------------------------------------------------------------


class TestRankResults:
    """Tests for ``rank_results``: newest-first ordering across timestamp forms."""

    def test_sorted_by_timestamp_descending(self):
        results = [
            {"timestamp": 1000},
            {"timestamp": 3000},
            {"timestamp": 2000},
        ]
        ranked = rank_results(results)
        assert [r["timestamp"] for r in ranked] == [3000, 2000, 1000]

    def test_iso_string_timestamps_sort_correctly(self):
        results = [
            {"timestamp": "2024-01-01T00:00:00Z"},
            {"timestamp": "2025-01-01T00:00:00Z"},
            {"timestamp": "2023-01-01T00:00:00Z"},
        ]
        ranked = rank_results(results)
        assert ranked[0]["timestamp"] == "2025-01-01T00:00:00Z"
        assert ranked[-1]["timestamp"] == "2023-01-01T00:00:00Z"

    def test_empty_list_returns_empty(self):
        assert rank_results([]) == []

    def test_missing_timestamp_treated_as_zero(self):
        results = [{"timestamp": 500}, {}, {"timestamp": 100}]
        ranked = rank_results(results)
        assert ranked[0]["timestamp"] == 500
        # Missing timestamp entry sorts last
        assert "timestamp" not in ranked[-1]

    def test_mixed_epoch_ms_and_iso_string_sort_by_recency(self):
        # composer/CLI results use integer epoch-ms (~1.715e12);
        # legacy chat results may carry an ISO string from lastSendTime.
        # A chat from 2025-01 must rank above a composer from 2024-05 when
        # both are in the same result set.
        results = [
            {"timestamp": 1_715_000_000_000, "type": "composer"},  # 2024-05
            {"timestamp": "2025-01-01T00:00:00Z", "type": "chat"},  # 2025-01
        ]
        ranked = rank_results(results)
        assert ranked[0]["type"] == "chat", (
            "2025-01 chat must outrank 2024-05 composer; "
            f"got order: {[r['type'] for r in ranked]}"
        )


# ---------------------------------------------------------------------------
# Fixtures — minimal SQLite databases for integration-style unit tests
# ---------------------------------------------------------------------------


@pytest.fixture
def tmp_workspace_root():
    """Yield paths to a temporary storage tree, removed after the test.

    Three sibling directories are created under one temporary root:
    ``workspaceStorage`` (``ws_root``), ``globalStorage`` (``global_root``)
    and ``cli_chats`` (``cli_root``). The root itself is exposed as ``tmp``.
    """
    with tempfile.TemporaryDirectory() as tmp:
        ws_root = os.path.join(tmp, "workspaceStorage")
        global_root = os.path.join(tmp, "globalStorage")
        cli_root = os.path.join(tmp, "cli_chats")
        os.makedirs(ws_root, exist_ok=True)
        os.makedirs(global_root, exist_ok=True)
        os.makedirs(cli_root, exist_ok=True)
        yield {
            "ws_root": ws_root,
            "global_root": global_root,
            "cli_root": cli_root,
            "tmp": tmp,
        }


def _make_global_db(global_root: str, composer_id: str, bubble_text: str) -> None:
    """Seed globalStorage/state.vscdb with one composer + one bubble.

    Creates the ``cursorDiskKV`` table and inserts a
    ``bubbleId:<composer_id>:bub-1`` row holding a user bubble with
    *bubble_text*, plus a ``composerData:<composer_id>`` row that references
    that bubble.
    """
    db_path = os.path.join(global_root, "state.vscdb")
    # closing() releases the file handle so the temp directory can be removed.
    with contextlib.closing(sqlite3.connect(db_path)) as conn:
        conn.execute("CREATE TABLE cursorDiskKV ([key] TEXT PRIMARY KEY, value TEXT)")
        conn.execute(
            "INSERT INTO cursorDiskKV ([key], value) VALUES (?, ?)",
            (
                f"bubbleId:{composer_id}:bub-1",
                json.dumps({"type": "user", "text": bubble_text}),
            ),
        )
        conn.execute(
            "INSERT INTO cursorDiskKV ([key], value) VALUES (?, ?)",
            (
                f"composerData:{composer_id}",
                json.dumps({
                    "name": "Test conversation",
                    "createdAt": 1_715_000_000_000,
                    "lastUpdatedAt": 1_715_001_000_000,
                    "fullConversationHeadersOnly": [{"bubbleId": "bub-1"}],
                    "modelConfig": {"modelName": "gpt-4o"},
                }),
            ),
        )
        conn.commit()


def _make_workspace_db(
    ws_root: str,
    workspace_id: str,
    composer_id: str,
    folder: str,
    legacy_chat_text: str | None = None,
) -> None:
    """Seed a per-workspace state.vscdb + workspace.json.

    ``workspace.json`` records *folder*. The ``ItemTable`` always receives a
    ``composer.composerData`` row listing *composer_id*. When
    *legacy_chat_text* is not ``None``, a legacy
    ``workbench.panel.aichat.view.aichat.chatdata`` row is added containing
    one tab with a single user bubble holding that text.
    """
    ws_dir = os.path.join(ws_root, workspace_id)
    os.makedirs(ws_dir, exist_ok=True)
    with open(os.path.join(ws_dir, "workspace.json"), "w", encoding="utf-8") as fh:
        json.dump({"folder": folder}, fh)
    db_path = os.path.join(ws_dir, "state.vscdb")
    with contextlib.closing(sqlite3.connect(db_path)) as conn:
        conn.execute("CREATE TABLE ItemTable ([key] TEXT PRIMARY KEY, value TEXT)")
        conn.execute(
            "INSERT INTO ItemTable ([key], value) VALUES (?, ?)",
            (
                "composer.composerData",
                json.dumps({"allComposers": [{"composerId": composer_id}]}),
            ),
        )
        if legacy_chat_text is not None:
            # lastSendTime is an ISO 8601 string here, unlike the integer
            # epoch-ms timestamps seeded by _make_global_db.
            legacy_data = {
                "tabs": [{
                    "tabId": "tab-legacy-1",
                    "chatTitle": "Legacy chat",
                    "lastSendTime": "2026-01-01T00:00:00Z",
                    "bubbles": [{"type": "user", "text": legacy_chat_text}],
                }]
            }
            conn.execute(
                "INSERT INTO ItemTable ([key], value) VALUES (?, ?)",
                (
                    "workbench.panel.aichat.view.aichat.chatdata",
                    json.dumps(legacy_data),
                ),
            )
        conn.commit()
class TestSearchGlobalStorage:
    """Tests for ``search_global_storage`` against a seeded ``cursorDiskKV`` DB."""

    def test_returns_matching_composer(self, tmp_workspace_root):
        dirs = tmp_workspace_root
        _make_global_db(dirs["global_root"], "cmp-gs-1", "unique-search-term-gs")
        _make_workspace_db(dirs["ws_root"], "ws-gs-1", "cmp-gs-1", "/projects/myapp")

        results = search_global_storage(
            workspace_path=dirs["ws_root"],
            query="unique-search-term-gs",
            query_lower="unique-search-term-gs",
            rules=[],
            parse_warnings=ParseWarningCollector(),
        )

        assert len(results) >= 1
        assert any(r["chatId"] == "cmp-gs-1" for r in results)

    def test_no_match_returns_empty_list(self, tmp_workspace_root):
        dirs = tmp_workspace_root
        _make_global_db(dirs["global_root"], "cmp-gs-2", "some other content")
        _make_workspace_db(dirs["ws_root"], "ws-gs-2", "cmp-gs-2", "/projects/other")

        results = search_global_storage(
            workspace_path=dirs["ws_root"],
            query="xyzzy-no-match-ever",
            query_lower="xyzzy-no-match-ever",
            rules=[],
            parse_warnings=ParseWarningCollector(),
        )

        assert results == []

    def test_result_has_required_keys(self, tmp_workspace_root):
        dirs = tmp_workspace_root
        _make_global_db(dirs["global_root"], "cmp-gs-3", "search-key-check")
        _make_workspace_db(dirs["ws_root"], "ws-gs-3", "cmp-gs-3", "/projects/keys")

        results = search_global_storage(
            workspace_path=dirs["ws_root"],
            query="search-key-check",
            query_lower="search-key-check",
            rules=[],
            parse_warnings=ParseWarningCollector(),
        )

        assert results
        r = results[0]
        for key in ("workspaceId", "chatId", "chatTitle", "timestamp", "matchingText", "type"):
            assert key in r, f"missing key: {key}"
        assert r["type"] == "composer"
        assert isinstance(r["timestamp"], int)

    def test_missing_global_db_returns_empty(self, tmp_workspace_root):
        dirs = tmp_workspace_root
        # No global DB created — directory exists but state.vscdb absent.
        results = search_global_storage(
            workspace_path=dirs["ws_root"],
            query="anything",
            query_lower="anything",
            rules=[],
            parse_warnings=ParseWarningCollector(),
        )
        assert results == []

    def test_workspace_display_name_resolved(self, tmp_workspace_root):
        dirs = tmp_workspace_root
        _make_global_db(dirs["global_root"], "cmp-gs-4", "name-check-term")
        _make_workspace_db(
            dirs["ws_root"], "ws-gs-4", "cmp-gs-4", "file:///home/user/projects/myrepo"
        )

        results = search_global_storage(
            workspace_path=dirs["ws_root"],
            query="name-check-term",
            query_lower="name-check-term",
            rules=[],
            parse_warnings=ParseWarningCollector(),
        )

        assert results
        # Workspace folder name is resolved to the basename of the folder path.
        assert results[0]["workspaceFolder"] == "myrepo"


# ---------------------------------------------------------------------------
# search_legacy_workspaces
# ---------------------------------------------------------------------------


class TestSearchLegacyWorkspaces:
    """Tests for ``search_legacy_workspaces`` against seeded ``ItemTable`` DBs."""

    def test_returns_matching_legacy_tab(self, tmp_workspace_root):
        dirs = tmp_workspace_root
        _make_workspace_db(
            dirs["ws_root"],
            "ws-leg-1",
            "cmp-leg-1",
            "/projects/legacyapp",
            legacy_chat_text="legacy-unique-search-text",
        )

        results = search_legacy_workspaces(
            workspace_path=dirs["ws_root"],
            query="legacy-unique-search-text",
            query_lower="legacy-unique-search-text",
            search_type="all",
            rules=[],
        )

        assert len(results) >= 1
        assert any(r.get("type") == "chat" for r in results)

    def test_no_match_returns_empty(self, tmp_workspace_root):
        dirs = tmp_workspace_root
        _make_workspace_db(
            dirs["ws_root"],
            "ws-leg-2",
            "cmp-leg-2",
            "/projects/other",
            legacy_chat_text="something else entirely",
        )

        results = search_legacy_workspaces(
            workspace_path=dirs["ws_root"],
            query="xyzzy-absolutely-no-match",
            query_lower="xyzzy-absolutely-no-match",
            search_type="all",
            rules=[],
        )

        assert results == []

    def test_search_type_composer_returns_empty(self, tmp_workspace_root):
        dirs = tmp_workspace_root
        _make_workspace_db(
            dirs["ws_root"],
            "ws-leg-3",
            "cmp-leg-3",
            "/projects/skip",
            legacy_chat_text="type-guard-term",
        )

        results = search_legacy_workspaces(
            workspace_path=dirs["ws_root"],
            query="type-guard-term",
            query_lower="type-guard-term",
            search_type="composer",
            rules=[],
        )

        # Legacy workspaces only hold chat (type="chat"); composer search skips them.
        assert results == []

    def test_result_has_required_keys(self, tmp_workspace_root):
        dirs = tmp_workspace_root
        _make_workspace_db(
            dirs["ws_root"],
            "ws-leg-4",
            "cmp-leg-4",
            "/projects/keycheck",
            legacy_chat_text="key-check-legacy",
        )

        results = search_legacy_workspaces(
            workspace_path=dirs["ws_root"],
            query="key-check-legacy",
            query_lower="key-check-legacy",
            search_type="chat",
            rules=[],
        )

        assert results
        r = results[0]
        for key in ("workspaceId", "chatId", "chatTitle", "timestamp", "matchingText", "type"):
            assert key in r, f"missing key: {key}"
        assert r["type"] == "chat"

    def test_workspace_without_legacy_data_skipped(self, tmp_workspace_root):
        dirs = tmp_workspace_root
        # Workspace DB exists but has no chatdata key (modern workspaces).
        _make_workspace_db(
            dirs["ws_root"],
            "ws-leg-5",
            "cmp-leg-5",
            "/projects/modern",
            legacy_chat_text=None,  # no legacy chatdata row
        )

        results = search_legacy_workspaces(
            workspace_path=dirs["ws_root"],
            query="anything",
            query_lower="anything",
            search_type="all",
            rules=[],
        )

        assert results == []


# ---------------------------------------------------------------------------
# CLI session fixture helper
# ---------------------------------------------------------------------------


def _make_store_db(path: str, meta: dict, json_blobs: dict[str, dict]) -> None:
    """Create a minimal ``store.db`` with *meta* and one or more JSON blobs.

    The meta value is hex-encoded JSON, matching the real Cursor CLI format
    (see ``utils/cli_chat_reader._read_meta`` and ``traverse_blobs``).
    Blob IDs are arbitrary strings; no chain/binary blobs are needed for a
    single-message session since ``traverse_blobs`` collects the root blob
    directly when it is a JSON blob.

    Args:
        path: Filesystem path of the SQLite file to create.
        meta: Session metadata, stored under meta key ``'0'``.
        json_blobs: Mapping of blob id to message dict; each message is
            stored as UTF-8 encoded JSON bytes.
    """
    with contextlib.closing(sqlite3.connect(path)) as conn:
        conn.execute("CREATE TABLE meta (key TEXT PRIMARY KEY, value TEXT)")
        conn.execute("CREATE TABLE blobs (id TEXT PRIMARY KEY, data BLOB)")
        conn.execute(
            "INSERT INTO meta VALUES ('0', ?)",
            (json.dumps(meta).encode("utf-8").hex(),),
        )
        for blob_id, msg in json_blobs.items():
            conn.execute(
                "INSERT INTO blobs VALUES (?, ?)",
                (blob_id, json.dumps(msg).encode("utf-8")),
            )
        conn.commit()


# ---------------------------------------------------------------------------
# search_cli_sessions
# ---------------------------------------------------------------------------


class TestSearchCliSessions:
    """Tests for ``search_cli_sessions`` against empty and seeded CLI stores."""

    def test_empty_cli_dir_returns_empty(self, tmp_workspace_root):
        dirs = tmp_workspace_root
        # cli_root is empty — no projects, no sessions.
        results = search_cli_sessions(
            cli_chats_path=dirs["cli_root"],
            query="anything",
            query_lower="anything",
            rules=[],
        )
        assert results == []

    def test_nonexistent_cli_dir_returns_empty(self):
        results = search_cli_sessions(
            cli_chats_path="/nonexistent/path/that/does/not/exist",
            query="anything",
            query_lower="anything",
            rules=[],
        )
        assert results == []

    def test_seeded_session_found_by_content_match(self, tmp_workspace_root):
        """Seed a real store.db session and verify search_cli_sessions finds it.

        Directory layout mirrors the real Cursor CLI storage:
            cli_root/{project_id}/{session_id}/store.db

        The store.db contains:
          - ``meta`` row: hex-encoded JSON with ``latestRootBlobId`` pointing
            to the single user-message blob.
          - ``blobs`` row: JSON bytes ``{"role": "user", "content": "<text>"}``
            where ``<text>`` contains the unique term we search for.
        """
        dirs = tmp_workspace_root
        cli_root = dirs["cli_root"]
        project_id = "proj-cli-test"
        session_id = "sess-cli-test"
        blob_id = "blob-msg-0001"
        search_term = "cli-session-unique-sentinel-xyz"

        session_dir = os.path.join(cli_root, project_id, session_id)
        os.makedirs(session_dir, exist_ok=True)

        _make_store_db(
            path=os.path.join(session_dir, "store.db"),
            meta={
                "latestRootBlobId": blob_id,
                "name": "CLI search test session",
                "createdAt": 1_715_100_000_000,
            },
            json_blobs={
                blob_id: {"role": "user", "content": f"Please help me with {search_term}"},
            },
        )

        results = search_cli_sessions(
            cli_chats_path=cli_root,
            query=search_term,
            query_lower=search_term,
            rules=[],
        )

        assert len(results) >= 1
        hit = next((r for r in results if r["chatId"] == session_id), None)
        assert hit is not None, f"session {session_id!r} not in results: {results}"
        assert hit["type"] == "cli_agent"
        assert hit["source"] == "cli"
        assert search_term in hit["matchingText"]

    def test_seeded_session_not_returned_when_query_misses(self, tmp_workspace_root):
        """Same store.db fixture; a non-matching query must return empty."""
        dirs = tmp_workspace_root
        cli_root = dirs["cli_root"]
        project_id = "proj-cli-miss"
        session_id = "sess-cli-miss"
        blob_id = "blob-msg-miss"

        session_dir = os.path.join(cli_root, project_id, session_id)
        os.makedirs(session_dir, exist_ok=True)

        _make_store_db(
            path=os.path.join(session_dir, "store.db"),
            meta={"latestRootBlobId": blob_id, "name": "Miss session", "createdAt": 0},
            json_blobs={
                blob_id: {"role": "user", "content": "completely unrelated content"},
            },
        )

        results = search_cli_sessions(
            cli_chats_path=cli_root,
            query="xyzzy-no-match-cli",
            query_lower="xyzzy-no-match-cli",
            rules=[],
        )

        assert results == []