From f7d85267c2018e0d72d6cae421bb6c9658c34bd3 Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Fri, 15 May 2026 19:36:20 -0400 Subject: [PATCH] chore: add utils CRAP score check --- .github/scripts/check_crap_threshold.py | 51 +++++++ .github/workflows/ci.yml | 8 + posthog/test/test_utils.py | 42 ++++++ posthog/utils.py | 187 +++++++++++++----------- 4 files changed, 202 insertions(+), 86 deletions(-) create mode 100644 .github/scripts/check_crap_threshold.py diff --git a/.github/scripts/check_crap_threshold.py b/.github/scripts/check_crap_threshold.py new file mode 100644 index 00000000..0fac2e79 --- /dev/null +++ b/.github/scripts/check_crap_threshold.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +"""Fail when pytest-crap reports a CRAP score at or above a threshold.""" + +from __future__ import annotations + +import argparse +import sys +from pathlib import Path + +from coverage import CoverageData +from pytest_crap.calculator import calculate_crap + + +def covered_lines_for_file(coverage_file: Path, source_file: Path) -> set[int]: + data = CoverageData(basename=str(coverage_file)) + data.read() + + source_file = source_file.resolve() + for measured_file in data.measured_files(): + if Path(measured_file).resolve() == source_file: + return set(data.lines(measured_file) or []) + + raise SystemExit(f"No coverage data found for {source_file}") + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("source_file", type=Path) + parser.add_argument("--coverage-file", type=Path, default=Path(".coverage")) + parser.add_argument("--max-crap", type=float, required=True) + args = parser.parse_args() + + covered_lines = covered_lines_for_file(args.coverage_file, args.source_file) + scores = calculate_crap(str(args.source_file), covered_lines) + offenders = [score for score in scores if score.crap >= args.max_crap] + + if not offenders: + print(f"All CRAP scores are below {args.max_crap:g} for {args.source_file}") + return 0 + + print(f"CRAP scores must be below {args.max_crap:g} for {args.source_file}") + for score in sorted(offenders, key=lambda item: item.crap, reverse=True): + print( + f"{score.name}: CRAP={score.crap:.2f}, " + f"CC={score.cc}, coverage={score.coverage_percent:.1f}%" + ) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d8906fa1..72330d6b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -126,6 +126,14 @@ jobs: with: enable-cache: true + - name: Check utils CRAP score + if: steps.changes.outputs.changed == 'true' + shell: bash + run: | + set -euo pipefail + UV_PROJECT_ENVIRONMENT=$pythonLocation uv run --extra test --with pytest-crap --with pytest-cov pytest posthog/test/test_utils.py posthog/test/test_size_limited_dict.py --timeout=30 --cov=posthog.utils --crap --crap-threshold=10 --crap-top-n=40 -q + UV_PROJECT_ENVIRONMENT=$pythonLocation uv run --extra test --with pytest-crap --with pytest-cov python .github/scripts/check_crap_threshold.py posthog/utils.py --max-crap 10 + - name: Restore mutmut cache id: mutmut-cache if: steps.changes.outputs.changed == 'true' diff --git a/posthog/test/test_utils.py b/posthog/test/test_utils.py index 5bb4958c..1def3754 100644 --- a/posthog/test/test_utils.py +++ b/posthog/test/test_utils.py @@ -296,6 +296,10 @@ def test_get_os_info_branches(self): ): assert utils.get_os_info() == {"$os": "Windows", "$os_version": ""} + with mock.patch.object(utils.platform, "win32_ver", create=True): + delattr(utils.platform, "win32_ver") + assert utils._get_windows_os_info() == ("Windows", "", "") + with ( mock.patch.object(utils.sys, "platform", "darwin"), mock.patch.object( @@ -304,6 +308,15 @@ def test_get_os_info_branches(self): ): assert utils.get_os_info() == {"$os": "Mac OS X", "$os_version": "14.4"} + with mock.patch.object( + utils.platform, "mac_ver", return_value=("", ("", "", ""), "") + ): + assert utils._get_macos_info() == ("Mac OS X", "", "") + + with mock.patch.object(utils.platform, "mac_ver", create=True): + delattr(utils.platform, "mac_ver") + assert utils._get_macos_info() == ("Mac OS X", "", "") + with ( mock.patch.object(utils.sys, "platform", "linux"), mock.patch.object(utils.distro, "info", return_value={"version": "24.04"}), @@ -315,6 +328,12 @@ def test_get_os_info_branches(self): "$os_distro": "Ubuntu", } + with ( + mock.patch.object(utils.distro, "info", return_value={"version": ""}), + mock.patch.object(utils.distro, "name", return_value=""), + ): + assert utils._get_linux_os_info() == ("Linux", "", "") + with ( mock.patch.object(utils.sys, "platform", "freebsd13"), mock.patch.object(utils.platform, "release", return_value="13.2"), @@ -327,6 +346,10 @@ def test_get_os_info_branches(self): ): assert utils.get_os_info() == {"$os": "sunos", "$os_version": "5.11"} + with mock.patch.object(utils.platform, "release", create=True): + delattr(utils.platform, "release") + assert utils._platform_release() == "" + def test_system_context(self): with ( mock.patch.object( @@ -495,6 +518,15 @@ def test_cache_misses_when_user_exists_without_flag(self): assert self.cache.get_cached_flag("user123", "missing-flag", 1) is None + def test_remove_missing_user_does_not_raise(self): + self.cache.cache["existing-user"] = {} + self.cache.access_times["existing-user"] = 123 + + self.cache._remove_user("missing-user") + + assert self.cache.cache == {"existing-user": {}} + assert self.cache.access_times == {"existing-user": 123} + def test_stale_cache_misses(self): assert self.cache.get_stale_cached_flag("missing-user", "test-flag") is None @@ -681,6 +713,16 @@ def test_redis_errors_fall_back_to_miss(self): failing_cache.invalidate_version(1) failing_cache.clear() + def test_redis_cache_key_helpers(self): + key = self.cache._get_cache_key("user123", "missing") + invalid_key = self.cache._get_cache_key("user123", "invalid") + self.redis.store[invalid_key] = "not json" + + assert self.cache._redis_key_to_string(key) == key + assert self.cache._key_has_version(key, 1) is False + assert self.cache._key_has_version(invalid_key, 1) is False + assert invalid_key not in self.redis.store + def test_invalidate_version(self): old_key = self.cache._get_cache_key("user123", "old") new_key = self.cache._get_cache_key("user123", "new") diff --git a/posthog/utils.py b/posthog/utils.py index 40626926..a2539277 100644 --- a/posthog/utils.py +++ b/posthog/utils.py @@ -59,17 +59,8 @@ def clean(item): return item if isinstance(item, (set, list, tuple)): return _clean_list(item) - # Pydantic model - try: - # v2+ - if hasattr(item, "model_dump") and callable(item.model_dump): - item = item.model_dump() - # v1 - elif hasattr(item, "dict") and callable(item.dict): - item = item.dict() - except TypeError as e: - log.debug(f"Could not serialize Pydantic-like model: {e}") - pass + + item = _clean_pydantic_model(item) if isinstance(item, dict): return _clean_dict(item) if is_dataclass(item) and not isinstance(item, type): @@ -77,6 +68,19 @@ def clean(item): return _coerce_unicode(item) +def _clean_pydantic_model(item): + try: + model_dump = getattr(item, "model_dump", None) + if callable(model_dump): + return model_dump() + dict_method = getattr(item, "dict", None) + if callable(dict_method): + return dict_method() + except TypeError as e: + log.debug(f"Could not serialize Pydantic-like model: {e}") + return item + + def _clean_list(list_): return [clean(item) for item in list_] @@ -237,27 +241,28 @@ def set_cached_flag( self.access_times[distinct_id] = current_time def invalidate_version(self, old_version): - users_to_remove = [] + users_to_remove = [ + distinct_id + for distinct_id, user_flags in self.cache.items() + if self._remove_flags_with_version(user_flags, old_version) + ] - for distinct_id, user_flags in self.cache.items(): - flags_to_remove = [] - for flag_key, entry in user_flags.items(): - if entry.flag_definition_version == old_version: - flags_to_remove.append(flag_key) - - # Remove invalidated flags - for flag_key in flags_to_remove: - del user_flags[flag_key] - - # Remove user entirely if no flags remain - if not user_flags: - users_to_remove.append(distinct_id) - - # Clean up empty users for distinct_id in users_to_remove: - del self.cache[distinct_id] - if distinct_id in self.access_times: - del self.access_times[distinct_id] + self._remove_user(distinct_id) + + def _remove_flags_with_version(self, user_flags, old_version): + flags_to_remove = [ + flag_key + for flag_key, entry in user_flags.items() + if entry.flag_definition_version == old_version + ] + for flag_key in flags_to_remove: + del user_flags[flag_key] + return not user_flags + + def _remove_user(self, distinct_id): + self.cache.pop(distinct_id, None) + self.access_times.pop(distinct_id, None) def _evict_lru(self): if not self.access_times: @@ -378,29 +383,12 @@ def set_cached_flag( def invalidate_version(self, old_version): try: - # For Redis, we use a simple approach: scan for keys with old version - # and delete them. This could be expensive with many keys, but it's - # necessary for correctness. - cursor = 0 pattern = f"{self.key_prefix}*" while True: cursor, keys = self.redis.scan(cursor, match=pattern, count=100) - - for key in keys: - if key.decode() == self.version_key: - continue - - try: - data = self.redis.get(key) - if data: - entry_dict = json.loads(data) - if entry_dict.get("flag_version") == old_version: - self.redis.delete(key) - except (json.JSONDecodeError, KeyError): - # If we can't parse the entry, delete it to be safe - self.redis.delete(key) + self._delete_keys_with_version(keys, old_version) if cursor == 0: break # pragma: no mutate @@ -409,6 +397,32 @@ def invalidate_version(self, old_version): # Redis error - silently fail pass + def _delete_keys_with_version(self, keys, old_version): + for key in keys: + if self._is_version_key(key): + continue + if self._key_has_version(key, old_version): + self.redis.delete(key) + + def _is_version_key(self, key): + return self._redis_key_to_string(key) == self.version_key + + def _redis_key_to_string(self, key): + if isinstance(key, bytes): + return key.decode() + return key + + def _key_has_version(self, key, old_version): + try: + data = self.redis.get(key) + if not data: + return False + return json.loads(data).get("flag_version") == old_version + except (json.JSONDecodeError, KeyError): + # If we can't parse the entry, delete it to be safe + self.redis.delete(key) + return False + def clear(self): try: # Delete all keys matching our pattern @@ -472,49 +486,50 @@ def str_iequals(value, comparand): return str(value).casefold() == str(comparand).casefold() +def _platform_release(): + release = getattr(platform, "release", None) + if callable(release): + return release() + return "" + + +def _get_windows_os_info(): + win32_ver = getattr(platform, "win32_ver", None) + if callable(win32_ver): + return "Windows", win32_ver()[0] or "", "" + return "Windows", "", "" + + +def _get_macos_info(): + mac_ver = getattr(platform, "mac_ver", None) + if callable(mac_ver): + return "Mac OS X", mac_ver()[0] or "", "" + return "Mac OS X", "", "" + + +def _get_linux_os_info(): + linux_info = distro.info() + return "Linux", linux_info["version"] or "", distro.name() or "" + + +def _get_platform_os_info(platform_name): + if platform_name.startswith("win"): + return _get_windows_os_info() + if platform_name == "darwin": + return _get_macos_info() + if platform_name.startswith("linux"): + return _get_linux_os_info() + if platform_name.startswith("freebsd"): + return "FreeBSD", _platform_release(), "" + return platform_name, _platform_release(), "" + + def get_os_info(): """ Returns standardized OS name, version and distro (in case of Linux) information. Similar to how user agent parsing works in JS. """ - os_name = "" # pragma: no mutate - os_version = "" - os_distro = "" # pragma: no mutate - - platform_name = sys.platform - - if platform_name.startswith("win"): - os_name = "Windows" - if hasattr(platform, "win32_ver"): - win_version = platform.win32_ver()[0] - if win_version: - os_version = win_version - - elif platform_name == "darwin": - os_name = "Mac OS X" - if hasattr(platform, "mac_ver"): - mac_version = platform.mac_ver()[0] - if mac_version: - os_version = mac_version - - elif platform_name.startswith("linux"): - os_name = "Linux" - linux_info = distro.info() - if linux_info["version"]: - os_version = linux_info["version"] - linux_distro = distro.name() - if linux_distro: - os_distro = linux_distro - - elif platform_name.startswith("freebsd"): - os_name = "FreeBSD" - if hasattr(platform, "release"): - os_version = platform.release() - - else: - os_name = platform_name - if hasattr(platform, "release"): - os_version = platform.release() + os_name, os_version, os_distro = _get_platform_os_info(sys.platform) info = { "$os": os_name,