-
Notifications
You must be signed in to change notification settings - Fork 67
chore: add utils CRAP score check #577
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: chore/add-utils-mutation-tests
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| #!/usr/bin/env python3 | ||
| """Fail when pytest-crap reports a CRAP score at or above a threshold.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import argparse | ||
| import sys | ||
| from pathlib import Path | ||
|
|
||
| from coverage import CoverageData | ||
| from pytest_crap.calculator import calculate_crap | ||
|
|
||
|
|
||
| def covered_lines_for_file(coverage_file: Path, source_file: Path) -> set[int]: | ||
| data = CoverageData(basename=str(coverage_file)) | ||
| data.read() | ||
|
|
||
| source_file = source_file.resolve() | ||
| for measured_file in data.measured_files(): | ||
| if Path(measured_file).resolve() == source_file: | ||
| return set(data.lines(measured_file) or []) | ||
|
|
||
| raise SystemExit(f"No coverage data found for {source_file}") | ||
|
|
||
|
|
||
| def main() -> int: | ||
| parser = argparse.ArgumentParser() | ||
| parser.add_argument("source_file", type=Path) | ||
| parser.add_argument("--coverage-file", type=Path, default=Path(".coverage")) | ||
| parser.add_argument("--max-crap", type=float, required=True) | ||
| args = parser.parse_args() | ||
|
|
||
| covered_lines = covered_lines_for_file(args.coverage_file, args.source_file) | ||
| scores = calculate_crap(str(args.source_file), covered_lines) | ||
| offenders = [score for score in scores if score.crap >= args.max_crap] | ||
|
|
||
| if not offenders: | ||
| print(f"All CRAP scores are below {args.max_crap:g} for {args.source_file}") | ||
| return 0 | ||
|
|
||
| print(f"CRAP scores must be below {args.max_crap:g} for {args.source_file}") | ||
| for score in sorted(offenders, key=lambda item: item.crap, reverse=True): | ||
| print( | ||
| f"{score.name}: CRAP={score.crap:.2f}, " | ||
| f"CC={score.cc}, coverage={score.coverage_percent:.1f}%" | ||
| ) | ||
| return 1 | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| sys.exit(main()) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -296,6 +296,10 @@ def test_get_os_info_branches(self): | |
| ): | ||
| assert utils.get_os_info() == {"$os": "Windows", "$os_version": ""} | ||
|
|
||
| with mock.patch.object(utils.platform, "win32_ver", create=True): | ||
| delattr(utils.platform, "win32_ver") | ||
| assert utils._get_windows_os_info() == ("Windows", "", "") | ||
|
|
||
| with ( | ||
| mock.patch.object(utils.sys, "platform", "darwin"), | ||
| mock.patch.object( | ||
|
|
@@ -304,6 +308,15 @@ def test_get_os_info_branches(self): | |
| ): | ||
| assert utils.get_os_info() == {"$os": "Mac OS X", "$os_version": "14.4"} | ||
|
|
||
| with mock.patch.object( | ||
| utils.platform, "mac_ver", return_value=("", ("", "", ""), "") | ||
| ): | ||
| assert utils._get_macos_info() == ("Mac OS X", "", "") | ||
|
|
||
| with mock.patch.object(utils.platform, "mac_ver", create=True): | ||
| delattr(utils.platform, "mac_ver") | ||
| assert utils._get_macos_info() == ("Mac OS X", "", "") | ||
|
|
||
| with ( | ||
| mock.patch.object(utils.sys, "platform", "linux"), | ||
| mock.patch.object(utils.distro, "info", return_value={"version": "24.04"}), | ||
|
|
@@ -315,6 +328,12 @@ def test_get_os_info_branches(self): | |
| "$os_distro": "Ubuntu", | ||
| } | ||
|
|
||
| with ( | ||
| mock.patch.object(utils.distro, "info", return_value={"version": ""}), | ||
| mock.patch.object(utils.distro, "name", return_value=""), | ||
| ): | ||
| assert utils._get_linux_os_info() == ("Linux", "", "") | ||
|
|
||
| with ( | ||
| mock.patch.object(utils.sys, "platform", "freebsd13"), | ||
| mock.patch.object(utils.platform, "release", return_value="13.2"), | ||
|
|
@@ -327,6 +346,10 @@ def test_get_os_info_branches(self): | |
| ): | ||
| assert utils.get_os_info() == {"$os": "sunos", "$os_version": "5.11"} | ||
|
|
||
| with mock.patch.object(utils.platform, "release", create=True): | ||
| delattr(utils.platform, "release") | ||
| assert utils._platform_release() == "" | ||
|
|
||
| def test_system_context(self): | ||
| with ( | ||
| mock.patch.object( | ||
|
|
@@ -495,6 +518,15 @@ def test_cache_misses_when_user_exists_without_flag(self): | |
|
|
||
| assert self.cache.get_cached_flag("user123", "missing-flag", 1) is None | ||
|
|
||
| def test_remove_missing_user_does_not_raise(self): | ||
| self.cache.cache["existing-user"] = {} | ||
| self.cache.access_times["existing-user"] = 123 | ||
|
|
||
| self.cache._remove_user("missing-user") | ||
|
|
||
| assert self.cache.cache == {"existing-user": {}} | ||
| assert self.cache.access_times == {"existing-user": 123} | ||
|
|
||
| def test_stale_cache_misses(self): | ||
| assert self.cache.get_stale_cached_flag("missing-user", "test-flag") is None | ||
|
|
||
|
|
@@ -681,6 +713,16 @@ def test_redis_errors_fall_back_to_miss(self): | |
| failing_cache.invalidate_version(1) | ||
| failing_cache.clear() | ||
|
|
||
| def test_redis_cache_key_helpers(self): | ||
| key = self.cache._get_cache_key("user123", "missing") | ||
| invalid_key = self.cache._get_cache_key("user123", "invalid") | ||
| self.redis.store[invalid_key] = "not json" | ||
|
|
||
| assert self.cache._redis_key_to_string(key) == key | ||
| assert self.cache._key_has_version(key, 1) is False | ||
|
Comment on lines
713
to
+722
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The test exercises Context Used: Do not attempt to comment on incorrect alphabetica... (source) Prompt To Fix With AIThis is a comment left during a code review.
Path: posthog/test/test_utils.py
Line: 713-722
Comment:
**`test_redis_cache_key_helpers` bundles unrelated scenarios into one test**
The test exercises `_redis_key_to_string`, `_key_has_version` (no data), `_key_has_version` (corrupt JSON + implicit deletion), and the resulting store mutation — four distinct behaviours in a single method. Consistent with the team's preference for parameterised/focused tests, splitting these into individual tests (or a `@pytest.mark.parametrize` table) would make failure messages immediately actionable without having to mentally trace which assertion fired.
**Context Used:** Do not attempt to comment on incorrect alphabetica... ([source](https://app.greptile.com/review/custom-context?memory=instruction-0))
How can I resolve this? If you propose a fix, please make it concise.Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time! |
||
| assert self.cache._key_has_version(invalid_key, 1) is False | ||
| assert invalid_key not in self.redis.store | ||
|
|
||
| def test_invalidate_version(self): | ||
| old_key = self.cache._get_cache_key("user123", "old") | ||
| new_key = self.cache._get_cache_key("user123", "new") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,24 +59,28 @@ def clean(item): | |
| return item | ||
| if isinstance(item, (set, list, tuple)): | ||
| return _clean_list(item) | ||
| # Pydantic model | ||
| try: | ||
| # v2+ | ||
| if hasattr(item, "model_dump") and callable(item.model_dump): | ||
| item = item.model_dump() | ||
| # v1 | ||
| elif hasattr(item, "dict") and callable(item.dict): | ||
| item = item.dict() | ||
| except TypeError as e: | ||
| log.debug(f"Could not serialize Pydantic-like model: {e}") | ||
| pass | ||
|
|
||
| item = _clean_pydantic_model(item) | ||
| if isinstance(item, dict): | ||
| return _clean_dict(item) | ||
| if is_dataclass(item) and not isinstance(item, type): | ||
| return _clean_dataclass(item) | ||
| return _coerce_unicode(item) | ||
|
|
||
|
|
||
| def _clean_pydantic_model(item): | ||
| try: | ||
| model_dump = getattr(item, "model_dump", None) | ||
| if callable(model_dump): | ||
| return model_dump() | ||
| dict_method = getattr(item, "dict", None) | ||
| if callable(dict_method): | ||
| return dict_method() | ||
| except TypeError as e: | ||
| log.debug(f"Could not serialize Pydantic-like model: {e}") | ||
| return item | ||
|
|
||
|
|
||
| def _clean_list(list_): | ||
| return [clean(item) for item in list_] | ||
|
|
||
|
|
@@ -237,27 +241,28 @@ def set_cached_flag( | |
| self.access_times[distinct_id] = current_time | ||
|
|
||
| def invalidate_version(self, old_version): | ||
| users_to_remove = [] | ||
| users_to_remove = [ | ||
| distinct_id | ||
| for distinct_id, user_flags in self.cache.items() | ||
| if self._remove_flags_with_version(user_flags, old_version) | ||
| ] | ||
|
|
||
| for distinct_id, user_flags in self.cache.items(): | ||
| flags_to_remove = [] | ||
| for flag_key, entry in user_flags.items(): | ||
| if entry.flag_definition_version == old_version: | ||
| flags_to_remove.append(flag_key) | ||
|
|
||
| # Remove invalidated flags | ||
| for flag_key in flags_to_remove: | ||
| del user_flags[flag_key] | ||
|
|
||
| # Remove user entirely if no flags remain | ||
| if not user_flags: | ||
| users_to_remove.append(distinct_id) | ||
|
|
||
| # Clean up empty users | ||
| for distinct_id in users_to_remove: | ||
| del self.cache[distinct_id] | ||
| if distinct_id in self.access_times: | ||
| del self.access_times[distinct_id] | ||
| self._remove_user(distinct_id) | ||
|
|
||
| def _remove_flags_with_version(self, user_flags, old_version): | ||
| flags_to_remove = [ | ||
| flag_key | ||
| for flag_key, entry in user_flags.items() | ||
| if entry.flag_definition_version == old_version | ||
| ] | ||
| for flag_key in flags_to_remove: | ||
| del user_flags[flag_key] | ||
| return not user_flags | ||
|
|
||
| def _remove_user(self, distinct_id): | ||
| self.cache.pop(distinct_id, None) | ||
| self.access_times.pop(distinct_id, None) | ||
|
|
||
| def _evict_lru(self): | ||
| if not self.access_times: | ||
|
|
@@ -378,29 +383,12 @@ def set_cached_flag( | |
|
|
||
| def invalidate_version(self, old_version): | ||
| try: | ||
| # For Redis, we use a simple approach: scan for keys with old version | ||
| # and delete them. This could be expensive with many keys, but it's | ||
| # necessary for correctness. | ||
|
|
||
| cursor = 0 | ||
| pattern = f"{self.key_prefix}*" | ||
|
|
||
| while True: | ||
| cursor, keys = self.redis.scan(cursor, match=pattern, count=100) | ||
|
|
||
| for key in keys: | ||
| if key.decode() == self.version_key: | ||
| continue | ||
|
|
||
| try: | ||
| data = self.redis.get(key) | ||
| if data: | ||
| entry_dict = json.loads(data) | ||
| if entry_dict.get("flag_version") == old_version: | ||
| self.redis.delete(key) | ||
| except (json.JSONDecodeError, KeyError): | ||
| # If we can't parse the entry, delete it to be safe | ||
| self.redis.delete(key) | ||
| self._delete_keys_with_version(keys, old_version) | ||
|
|
||
| if cursor == 0: | ||
| break # pragma: no mutate | ||
|
|
@@ -409,6 +397,32 @@ def invalidate_version(self, old_version): | |
| # Redis error - silently fail | ||
| pass | ||
|
|
||
| def _delete_keys_with_version(self, keys, old_version): | ||
| for key in keys: | ||
| if self._is_version_key(key): | ||
| continue | ||
| if self._key_has_version(key, old_version): | ||
| self.redis.delete(key) | ||
|
|
||
| def _is_version_key(self, key): | ||
| return self._redis_key_to_string(key) == self.version_key | ||
|
|
||
| def _redis_key_to_string(self, key): | ||
| if isinstance(key, bytes): | ||
| return key.decode() | ||
| return key | ||
|
|
||
| def _key_has_version(self, key, old_version): | ||
| try: | ||
| data = self.redis.get(key) | ||
| if not data: | ||
| return False | ||
| return json.loads(data).get("flag_version") == old_version | ||
| except (json.JSONDecodeError, KeyError): | ||
| # If we can't parse the entry, delete it to be safe | ||
| self.redis.delete(key) | ||
| return False | ||
|
Comment on lines
+415
to
+424
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Prompt To Fix With AIThis is a comment left during a code review.
Path: posthog/utils.py
Line: 415-424
Comment:
**Hidden deletion side-effect in a predicate method**
`_key_has_version` reads as a pure boolean predicate, but it silently deletes the Redis key when JSON parsing fails. The caller `_delete_keys_with_version` already handles deletion for the `True` return path, so a reader expects `False` to mean "no action taken." The covert delete on the error path will fire even if a caller simply wanted to check the version without intending to clean up. Consider separating the concern — either rename to something like `_version_matches_or_delete_corrupt` to make the side-effect visible, or move the `self.redis.delete(key)` call back into `_delete_keys_with_version` after catching the exception there.
How can I resolve this? If you propose a fix, please make it concise. |
||
|
|
||
| def clear(self): | ||
| try: | ||
| # Delete all keys matching our pattern | ||
|
|
@@ -472,49 +486,50 @@ def str_iequals(value, comparand): | |
| return str(value).casefold() == str(comparand).casefold() | ||
|
|
||
|
|
||
| def _platform_release(): | ||
| release = getattr(platform, "release", None) | ||
| if callable(release): | ||
| return release() | ||
| return "" | ||
|
|
||
|
|
||
| def _get_windows_os_info(): | ||
| win32_ver = getattr(platform, "win32_ver", None) | ||
| if callable(win32_ver): | ||
| return "Windows", win32_ver()[0] or "", "" | ||
| return "Windows", "", "" | ||
|
|
||
|
|
||
| def _get_macos_info(): | ||
| mac_ver = getattr(platform, "mac_ver", None) | ||
| if callable(mac_ver): | ||
| return "Mac OS X", mac_ver()[0] or "", "" | ||
| return "Mac OS X", "", "" | ||
|
|
||
|
|
||
| def _get_linux_os_info(): | ||
| linux_info = distro.info() | ||
| return "Linux", linux_info["version"] or "", distro.name() or "" | ||
|
|
||
|
|
||
| def _get_platform_os_info(platform_name): | ||
| if platform_name.startswith("win"): | ||
| return _get_windows_os_info() | ||
| if platform_name == "darwin": | ||
| return _get_macos_info() | ||
| if platform_name.startswith("linux"): | ||
| return _get_linux_os_info() | ||
| if platform_name.startswith("freebsd"): | ||
| return "FreeBSD", _platform_release(), "" | ||
| return platform_name, _platform_release(), "" | ||
|
|
||
|
|
||
| def get_os_info(): | ||
| """ | ||
| Returns standardized OS name, version and distro (in case of Linux) information. | ||
| Similar to how user agent parsing works in JS. | ||
| """ | ||
| os_name = "" # pragma: no mutate | ||
| os_version = "" | ||
| os_distro = "" # pragma: no mutate | ||
|
|
||
| platform_name = sys.platform | ||
|
|
||
| if platform_name.startswith("win"): | ||
| os_name = "Windows" | ||
| if hasattr(platform, "win32_ver"): | ||
| win_version = platform.win32_ver()[0] | ||
| if win_version: | ||
| os_version = win_version | ||
|
|
||
| elif platform_name == "darwin": | ||
| os_name = "Mac OS X" | ||
| if hasattr(platform, "mac_ver"): | ||
| mac_version = platform.mac_ver()[0] | ||
| if mac_version: | ||
| os_version = mac_version | ||
|
|
||
| elif platform_name.startswith("linux"): | ||
| os_name = "Linux" | ||
| linux_info = distro.info() | ||
| if linux_info["version"]: | ||
| os_version = linux_info["version"] | ||
| linux_distro = distro.name() | ||
| if linux_distro: | ||
| os_distro = linux_distro | ||
|
|
||
| elif platform_name.startswith("freebsd"): | ||
| os_name = "FreeBSD" | ||
| if hasattr(platform, "release"): | ||
| os_version = platform.release() | ||
|
|
||
| else: | ||
| os_name = platform_name | ||
| if hasattr(platform, "release"): | ||
| os_version = platform.release() | ||
| os_name, os_version, os_distro = _get_platform_os_info(sys.platform) | ||
|
|
||
| info = { | ||
| "$os": os_name, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The team prefers parameterised tests. The six new
withblocks (missingwin32_ver, emptymac_ver, absentmac_ver, empty Linux distro, missingrelease) are appended inline insidetest_get_os_inforather than expressed as@pytest.mark.parametrizecases. Inlining them increases the method's size and makes it harder to identify which scenario failed on a CI failure. Each private helper (_get_windows_os_info,_get_macos_info,_get_linux_os_info,_platform_release) could have its own parameterised test covering both the happy path and the fallback path.Context Used: Do not attempt to comment on incorrect alphabetica... (source)
Prompt To Fix With AI
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!