Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions .github/scripts/check_crap_threshold.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/usr/bin/env python3
"""Fail when pytest-crap reports a CRAP score at or above a threshold."""

from __future__ import annotations

import argparse
import sys
from pathlib import Path

from coverage import CoverageData
from pytest_crap.calculator import calculate_crap


def covered_lines_for_file(coverage_file: Path, source_file: Path) -> set[int]:
data = CoverageData(basename=str(coverage_file))
data.read()

source_file = source_file.resolve()
for measured_file in data.measured_files():
if Path(measured_file).resolve() == source_file:
return set(data.lines(measured_file) or [])

raise SystemExit(f"No coverage data found for {source_file}")


def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("source_file", type=Path)
parser.add_argument("--coverage-file", type=Path, default=Path(".coverage"))
parser.add_argument("--max-crap", type=float, required=True)
args = parser.parse_args()

covered_lines = covered_lines_for_file(args.coverage_file, args.source_file)
scores = calculate_crap(str(args.source_file), covered_lines)
offenders = [score for score in scores if score.crap >= args.max_crap]

if not offenders:
print(f"All CRAP scores are below {args.max_crap:g} for {args.source_file}")
return 0

print(f"CRAP scores must be below {args.max_crap:g} for {args.source_file}")
for score in sorted(offenders, key=lambda item: item.crap, reverse=True):
print(
f"{score.name}: CRAP={score.crap:.2f}, "
f"CC={score.cc}, coverage={score.coverage_percent:.1f}%"
)
return 1


if __name__ == "__main__":
sys.exit(main())
8 changes: 8 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ jobs:
with:
enable-cache: true

- name: Check utils CRAP score
if: steps.changes.outputs.changed == 'true'
shell: bash
run: |
set -euo pipefail
UV_PROJECT_ENVIRONMENT=$pythonLocation uv run --extra test --with pytest-crap --with pytest-cov pytest posthog/test/test_utils.py posthog/test/test_size_limited_dict.py --timeout=30 --cov=posthog.utils --crap --crap-threshold=10 --crap-top-n=40 -q
UV_PROJECT_ENVIRONMENT=$pythonLocation uv run --extra test --with pytest-crap --with pytest-cov python .github/scripts/check_crap_threshold.py posthog/utils.py --max-crap 10

- name: Restore mutmut cache
id: mutmut-cache
if: steps.changes.outputs.changed == 'true'
Expand Down
42 changes: 42 additions & 0 deletions posthog/test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@ def test_get_os_info_branches(self):
):
assert utils.get_os_info() == {"$os": "Windows", "$os_version": ""}

with mock.patch.object(utils.platform, "win32_ver", create=True):
delattr(utils.platform, "win32_ver")
assert utils._get_windows_os_info() == ("Windows", "", "")

with (
mock.patch.object(utils.sys, "platform", "darwin"),
mock.patch.object(
Expand All @@ -304,6 +308,15 @@ def test_get_os_info_branches(self):
):
assert utils.get_os_info() == {"$os": "Mac OS X", "$os_version": "14.4"}

with mock.patch.object(
utils.platform, "mac_ver", return_value=("", ("", "", ""), "")
):
assert utils._get_macos_info() == ("Mac OS X", "", "")

with mock.patch.object(utils.platform, "mac_ver", create=True):
delattr(utils.platform, "mac_ver")
assert utils._get_macos_info() == ("Mac OS X", "", "")

with (
mock.patch.object(utils.sys, "platform", "linux"),
mock.patch.object(utils.distro, "info", return_value={"version": "24.04"}),
Expand All @@ -315,6 +328,12 @@ def test_get_os_info_branches(self):
"$os_distro": "Ubuntu",
}

with (
mock.patch.object(utils.distro, "info", return_value={"version": ""}),
mock.patch.object(utils.distro, "name", return_value=""),
):
assert utils._get_linux_os_info() == ("Linux", "", "")

with (
mock.patch.object(utils.sys, "platform", "freebsd13"),
mock.patch.object(utils.platform, "release", return_value="13.2"),
Expand All @@ -327,6 +346,10 @@ def test_get_os_info_branches(self):
):
assert utils.get_os_info() == {"$os": "sunos", "$os_version": "5.11"}

with mock.patch.object(utils.platform, "release", create=True):
delattr(utils.platform, "release")
assert utils._platform_release() == ""
Comment on lines 299 to +351
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 New edge-case assertions embedded in the existing test rather than parameterised

The team prefers parameterised tests. The six new with blocks (missing win32_ver, empty mac_ver, absent mac_ver, empty Linux distro, missing release) are appended inline inside test_get_os_info rather than expressed as @pytest.mark.parametrize cases. Inlining them increases the method's size and makes it harder to identify which scenario failed on a CI failure. Each private helper (_get_windows_os_info, _get_macos_info, _get_linux_os_info, _platform_release) could have its own parameterised test covering both the happy path and the fallback path.

Context Used: Do not attempt to comment on incorrect alphabetica... (source)

Prompt To Fix With AI
This is a comment left during a code review.
Path: posthog/test/test_utils.py
Line: 299-351

Comment:
**New edge-case assertions embedded in the existing test rather than parameterised**

The team prefers parameterised tests. The six new `with` blocks (missing `win32_ver`, empty `mac_ver`, absent `mac_ver`, empty Linux distro, missing `release`) are appended inline inside `test_get_os_info` rather than expressed as `@pytest.mark.parametrize` cases. Inlining them increases the method's size and makes it harder to identify which scenario failed on a CI failure. Each private helper (`_get_windows_os_info`, `_get_macos_info`, `_get_linux_os_info`, `_platform_release`) could have its own parameterised test covering both the happy path and the fallback path.

**Context Used:** Do not attempt to comment on incorrect alphabetica... ([source](https://app.greptile.com/review/custom-context?memory=instruction-0))

How can I resolve this? If you propose a fix, please make it concise.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!


def test_system_context(self):
with (
mock.patch.object(
Expand Down Expand Up @@ -495,6 +518,15 @@ def test_cache_misses_when_user_exists_without_flag(self):

assert self.cache.get_cached_flag("user123", "missing-flag", 1) is None

def test_remove_missing_user_does_not_raise(self):
self.cache.cache["existing-user"] = {}
self.cache.access_times["existing-user"] = 123

self.cache._remove_user("missing-user")

assert self.cache.cache == {"existing-user": {}}
assert self.cache.access_times == {"existing-user": 123}

def test_stale_cache_misses(self):
assert self.cache.get_stale_cached_flag("missing-user", "test-flag") is None

Expand Down Expand Up @@ -681,6 +713,16 @@ def test_redis_errors_fall_back_to_miss(self):
failing_cache.invalidate_version(1)
failing_cache.clear()

def test_redis_cache_key_helpers(self):
key = self.cache._get_cache_key("user123", "missing")
invalid_key = self.cache._get_cache_key("user123", "invalid")
self.redis.store[invalid_key] = "not json"

assert self.cache._redis_key_to_string(key) == key
assert self.cache._key_has_version(key, 1) is False
Comment on lines 713 to +722
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 test_redis_cache_key_helpers bundles unrelated scenarios into one test

The test exercises _redis_key_to_string, _key_has_version (no data), _key_has_version (corrupt JSON + implicit deletion), and the resulting store mutation — four distinct behaviours in a single method. Consistent with the team's preference for parameterised/focused tests, splitting these into individual tests (or a @pytest.mark.parametrize table) would make failure messages immediately actionable without having to mentally trace which assertion fired.

Context Used: Do not attempt to comment on incorrect alphabetica... (source)

Prompt To Fix With AI
This is a comment left during a code review.
Path: posthog/test/test_utils.py
Line: 713-722

Comment:
**`test_redis_cache_key_helpers` bundles unrelated scenarios into one test**

The test exercises `_redis_key_to_string`, `_key_has_version` (no data), `_key_has_version` (corrupt JSON + implicit deletion), and the resulting store mutation — four distinct behaviours in a single method. Consistent with the team's preference for parameterised/focused tests, splitting these into individual tests (or a `@pytest.mark.parametrize` table) would make failure messages immediately actionable without having to mentally trace which assertion fired.

**Context Used:** Do not attempt to comment on incorrect alphabetica... ([source](https://app.greptile.com/review/custom-context?memory=instruction-0))

How can I resolve this? If you propose a fix, please make it concise.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

assert self.cache._key_has_version(invalid_key, 1) is False
assert invalid_key not in self.redis.store

def test_invalidate_version(self):
old_key = self.cache._get_cache_key("user123", "old")
new_key = self.cache._get_cache_key("user123", "new")
Expand Down
187 changes: 101 additions & 86 deletions posthog/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,28 @@ def clean(item):
return item
if isinstance(item, (set, list, tuple)):
return _clean_list(item)
# Pydantic model
try:
# v2+
if hasattr(item, "model_dump") and callable(item.model_dump):
item = item.model_dump()
# v1
elif hasattr(item, "dict") and callable(item.dict):
item = item.dict()
except TypeError as e:
log.debug(f"Could not serialize Pydantic-like model: {e}")
pass

item = _clean_pydantic_model(item)
if isinstance(item, dict):
return _clean_dict(item)
if is_dataclass(item) and not isinstance(item, type):
return _clean_dataclass(item)
return _coerce_unicode(item)


def _clean_pydantic_model(item):
try:
model_dump = getattr(item, "model_dump", None)
if callable(model_dump):
return model_dump()
dict_method = getattr(item, "dict", None)
if callable(dict_method):
return dict_method()
except TypeError as e:
log.debug(f"Could not serialize Pydantic-like model: {e}")
return item


def _clean_list(list_):
return [clean(item) for item in list_]

Expand Down Expand Up @@ -237,27 +241,28 @@ def set_cached_flag(
self.access_times[distinct_id] = current_time

def invalidate_version(self, old_version):
users_to_remove = []
users_to_remove = [
distinct_id
for distinct_id, user_flags in self.cache.items()
if self._remove_flags_with_version(user_flags, old_version)
]

for distinct_id, user_flags in self.cache.items():
flags_to_remove = []
for flag_key, entry in user_flags.items():
if entry.flag_definition_version == old_version:
flags_to_remove.append(flag_key)

# Remove invalidated flags
for flag_key in flags_to_remove:
del user_flags[flag_key]

# Remove user entirely if no flags remain
if not user_flags:
users_to_remove.append(distinct_id)

# Clean up empty users
for distinct_id in users_to_remove:
del self.cache[distinct_id]
if distinct_id in self.access_times:
del self.access_times[distinct_id]
self._remove_user(distinct_id)

def _remove_flags_with_version(self, user_flags, old_version):
flags_to_remove = [
flag_key
for flag_key, entry in user_flags.items()
if entry.flag_definition_version == old_version
]
for flag_key in flags_to_remove:
del user_flags[flag_key]
return not user_flags

def _remove_user(self, distinct_id):
self.cache.pop(distinct_id, None)
self.access_times.pop(distinct_id, None)

def _evict_lru(self):
if not self.access_times:
Expand Down Expand Up @@ -378,29 +383,12 @@ def set_cached_flag(

def invalidate_version(self, old_version):
try:
# For Redis, we use a simple approach: scan for keys with old version
# and delete them. This could be expensive with many keys, but it's
# necessary for correctness.

cursor = 0
pattern = f"{self.key_prefix}*"

while True:
cursor, keys = self.redis.scan(cursor, match=pattern, count=100)

for key in keys:
if key.decode() == self.version_key:
continue

try:
data = self.redis.get(key)
if data:
entry_dict = json.loads(data)
if entry_dict.get("flag_version") == old_version:
self.redis.delete(key)
except (json.JSONDecodeError, KeyError):
# If we can't parse the entry, delete it to be safe
self.redis.delete(key)
self._delete_keys_with_version(keys, old_version)

if cursor == 0:
break # pragma: no mutate
Expand All @@ -409,6 +397,32 @@ def invalidate_version(self, old_version):
# Redis error - silently fail
pass

def _delete_keys_with_version(self, keys, old_version):
for key in keys:
if self._is_version_key(key):
continue
if self._key_has_version(key, old_version):
self.redis.delete(key)

def _is_version_key(self, key):
return self._redis_key_to_string(key) == self.version_key

def _redis_key_to_string(self, key):
if isinstance(key, bytes):
return key.decode()
return key

def _key_has_version(self, key, old_version):
try:
data = self.redis.get(key)
if not data:
return False
return json.loads(data).get("flag_version") == old_version
except (json.JSONDecodeError, KeyError):
# If we can't parse the entry, delete it to be safe
self.redis.delete(key)
return False
Comment on lines +415 to +424
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Hidden deletion side-effect in a predicate method

_key_has_version reads as a pure boolean predicate, but it silently deletes the Redis key when JSON parsing fails. The caller _delete_keys_with_version already handles deletion for the True return path, so a reader expects False to mean "no action taken." The covert delete on the error path will fire even if a caller simply wanted to check the version without intending to clean up. Consider separating the concern — either rename to something like _version_matches_or_delete_corrupt to make the side-effect visible, or move the self.redis.delete(key) call back into _delete_keys_with_version after catching the exception there.

Prompt To Fix With AI
This is a comment left during a code review.
Path: posthog/utils.py
Line: 415-424

Comment:
**Hidden deletion side-effect in a predicate method**

`_key_has_version` reads as a pure boolean predicate, but it silently deletes the Redis key when JSON parsing fails. The caller `_delete_keys_with_version` already handles deletion for the `True` return path, so a reader expects `False` to mean "no action taken." The covert delete on the error path will fire even if a caller simply wanted to check the version without intending to clean up. Consider separating the concern — either rename to something like `_version_matches_or_delete_corrupt` to make the side-effect visible, or move the `self.redis.delete(key)` call back into `_delete_keys_with_version` after catching the exception there.

How can I resolve this? If you propose a fix, please make it concise.


def clear(self):
try:
# Delete all keys matching our pattern
Expand Down Expand Up @@ -472,49 +486,50 @@ def str_iequals(value, comparand):
return str(value).casefold() == str(comparand).casefold()


def _platform_release():
release = getattr(platform, "release", None)
if callable(release):
return release()
return ""


def _get_windows_os_info():
win32_ver = getattr(platform, "win32_ver", None)
if callable(win32_ver):
return "Windows", win32_ver()[0] or "", ""
return "Windows", "", ""


def _get_macos_info():
mac_ver = getattr(platform, "mac_ver", None)
if callable(mac_ver):
return "Mac OS X", mac_ver()[0] or "", ""
return "Mac OS X", "", ""


def _get_linux_os_info():
linux_info = distro.info()
return "Linux", linux_info["version"] or "", distro.name() or ""


def _get_platform_os_info(platform_name):
if platform_name.startswith("win"):
return _get_windows_os_info()
if platform_name == "darwin":
return _get_macos_info()
if platform_name.startswith("linux"):
return _get_linux_os_info()
if platform_name.startswith("freebsd"):
return "FreeBSD", _platform_release(), ""
return platform_name, _platform_release(), ""


def get_os_info():
"""
Returns standardized OS name, version and distro (in case of Linux) information.
Similar to how user agent parsing works in JS.
"""
os_name = "" # pragma: no mutate
os_version = ""
os_distro = "" # pragma: no mutate

platform_name = sys.platform

if platform_name.startswith("win"):
os_name = "Windows"
if hasattr(platform, "win32_ver"):
win_version = platform.win32_ver()[0]
if win_version:
os_version = win_version

elif platform_name == "darwin":
os_name = "Mac OS X"
if hasattr(platform, "mac_ver"):
mac_version = platform.mac_ver()[0]
if mac_version:
os_version = mac_version

elif platform_name.startswith("linux"):
os_name = "Linux"
linux_info = distro.info()
if linux_info["version"]:
os_version = linux_info["version"]
linux_distro = distro.name()
if linux_distro:
os_distro = linux_distro

elif platform_name.startswith("freebsd"):
os_name = "FreeBSD"
if hasattr(platform, "release"):
os_version = platform.release()

else:
os_name = platform_name
if hasattr(platform, "release"):
os_version = platform.release()
os_name, os_version, os_distro = _get_platform_os_info(sys.platform)

info = {
"$os": os_name,
Expand Down
Loading