From 37a6aeca4a18df53c424c6d11989102c4435c683 Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Fri, 15 May 2026 18:41:48 -0400 Subject: [PATCH 1/2] chore: add targeted utils mutation tests --- .github/workflows/ci.yml | 76 +++++ .gitignore | 1 + posthog/test/test_size_limited_dict.py | 10 + posthog/test/test_utils.py | 423 ++++++++++++++++++++++++- posthog/utils.py | 40 ++- pyproject.toml | 7 + 6 files changed, 541 insertions(+), 16 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index afc203eb..d8906fa1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -77,6 +77,82 @@ jobs: run: | pytest --verbose --timeout=30 + mutation-tests: + name: Targeted mutation tests + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@85e6279cec87321a52edac9c87bce653a07cf6c2 + with: + fetch-depth: 0 + + - name: Check targeted mutation inputs changed + id: changes + shell: bash + run: | + set -euo pipefail + + if [[ "${{ github.event_name }}" == "pull_request" ]]; then + base="${{ github.event.pull_request.base.sha }}" + head="${{ github.event.pull_request.head.sha }}" + else + base="${{ github.event.before }}" + head="${{ github.sha }}" + fi + + changed_files="$(git diff --name-only "$base" "$head" -- \ + posthog/utils.py \ + posthog/test/test_utils.py \ + posthog/test/test_size_limited_dict.py)" + + if [[ -n "$changed_files" ]]; then + echo "changed=true" >> "$GITHUB_OUTPUT" + echo "Targeted mutation inputs changed:" + echo "$changed_files" + else + echo "changed=false" >> "$GITHUB_OUTPUT" + echo "Skipping targeted mutation tests: inputs unchanged." + fi + + - name: Set up Python 3.11 + if: steps.changes.outputs.changed == 'true' + uses: actions/setup-python@8d9ed9ac5c53483de85588cdf95a591a75ab9f55 + with: + python-version: 3.11.11 + + - name: Install uv + if: steps.changes.outputs.changed == 'true' + uses: astral-sh/setup-uv@cec208311dfd045dd5311c1add060b2062131d57 # v8.0.0 + with: + enable-cache: true + + - name: Restore mutmut cache + id: mutmut-cache + if: steps.changes.outputs.changed == 'true' + uses: actions/cache@v4 + with: + path: mutants + key: mutmut-${{ runner.os }}-py311-${{ hashFiles('posthog/utils.py', 'posthog/test/test_utils.py', 'posthog/test/test_size_limited_dict.py') }} + restore-keys: | + mutmut-${{ runner.os }}-py311- + + - name: Skip mutation tests on exact cache hit + if: steps.changes.outputs.changed == 'true' && steps.mutmut-cache.outputs.cache-hit == 'true' + run: | + echo "Skipping targeted mutation tests: exact mutmut cache hit." + + - name: Run targeted mutation tests + if: steps.changes.outputs.changed == 'true' && steps.mutmut-cache.outputs.cache-hit != 'true' + shell: bash + run: | + set -euo pipefail + UV_PROJECT_ENVIRONMENT=$pythonLocation uv run --extra test --with mutmut mutmut run --max-children 1 + results="$(UV_PROJECT_ENVIRONMENT=$pythonLocation uv run --extra test --with mutmut mutmut results)" + if [[ -n "$results" ]]; then + echo "$results" + exit 1 + fi + import-check: name: Python ${{ matrix.python-version }} import check runs-on: ubuntu-latest diff --git a/.gitignore b/.gitignore index 561aa7c3..42fbc82d 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ pyrightconfig.json .DS_Store posthog-python-references.json .claude/settings.local.json +mutants/ diff --git a/posthog/test/test_size_limited_dict.py b/posthog/test/test_size_limited_dict.py index 3cef2137..4f9c79a7 100644 --- a/posthog/test/test_size_limited_dict.py +++ b/posthog/test/test_size_limited_dict.py @@ -22,3 +22,13 @@ def test_size_limited_dict(self, size: int, iterations: int) -> None: self.assertIsNone(values.get(i - 3)) self.assertIsNone(values.get(i - 5)) self.assertIsNone(values.get(i - 9)) + + def test_size_limited_dict_forwards_defaultdict_args_and_kwargs(self) -> None: + values = utils.SizeLimitedDict( + 3, lambda: "missing", {"existing": "value"}, other="item" + ) + + assert values["missing"] == "missing" + assert values["existing"] == "value" + assert values["other"] == "item" + assert values.max_size == 3 diff --git a/posthog/test/test_utils.py b/posthog/test/test_utils.py index ed19005a..2c6ef6f7 100644 --- a/posthog/test/test_utils.py +++ b/posthog/test/test_utils.py @@ -1,8 +1,10 @@ +import json import sys import time import unittest +from unittest import mock from dataclasses import dataclass -from datetime import date, datetime, timedelta, timezone +from datetime import date, datetime, timedelta, timezone, tzinfo from decimal import Decimal from typing import Optional from uuid import UUID @@ -18,6 +20,57 @@ FAKE_TEST_API_KEY = "random_key" +class FakeRedis: + def __init__(self, fail=False): + self.store = {} + self.fail = fail + self.setex_calls = [] + self.scan_calls = [] + self._last_scan_keys = [] + + def _key(self, key): + return key.decode() if isinstance(key, bytes) else key + + def get(self, key): + if self.fail: + raise RuntimeError("redis unavailable") + return self.store.get(self._key(key)) + + def setex(self, key, ttl, value): + if self.fail: + raise RuntimeError("redis unavailable") + self.setex_calls.append((self._key(key), ttl, value)) + self.store[self._key(key)] = value + + def set(self, key, value): + if self.fail: + raise RuntimeError("redis unavailable") + self.store[self._key(key)] = value + + def scan(self, cursor, match=None, count=None): + if self.fail: + raise RuntimeError("redis unavailable") + self.scan_calls.append((cursor, match, count)) + prefix = match[:-1] if match and match.endswith("*") else match + if cursor == 0: + self._last_scan_keys = [ + key.encode() + for key in sorted(self.store) + if prefix is None or key.startswith(prefix) + ] + keys = self._last_scan_keys + midpoint = max(1, len(keys) // 2) + if cursor == 0 and len(keys) > 1: + return 1, keys[:midpoint] + return 0, keys[midpoint:] if cursor == 1 else keys + + def delete(self, *keys): + if self.fail: + raise RuntimeError("redis unavailable") + for key in keys: + self.store.pop(self._key(key), None) + + class TestUtils(unittest.TestCase): @parameterized.expand( [ @@ -43,6 +96,23 @@ def test_timezone_utils(self): shouldnt_be_edited = utils.guess_timezone(utcnow) assert utcnow == shouldnt_be_edited + old_naive = datetime(2000, 1, 1) + fixed_old = utils.guess_timezone(old_naive) + assert fixed_old == old_naive.replace(tzinfo=timezone.utc) + + def test_total_seconds(self): + delta = timedelta(days=2, seconds=3, microseconds=4) + assert utils.total_seconds(delta) == 172803.000004 + + def test_is_naive_when_tzinfo_has_no_offset(self): + class NoOffset(tzinfo): + def utcoffset(self, dt): + if dt is None: + return timedelta(hours=1) + return None + + assert utils.is_naive(datetime(2024, 1, 1, tzinfo=NoOffset())) is True + def test_clean(self): simple = { "decimal": Decimal("0.142857"), @@ -116,6 +186,11 @@ class ModelV1(BaseModelV1): class NestedModel(BaseModel): foo: ModelV2 + class ModelDumpOnly: + def model_dump(self): + return {"foo": "model_dump"} + + assert utils.clean(ModelDumpOnly()) == {"foo": "model_dump"} assert utils.clean(ModelV2(foo="1", bar=2)) == { "foo": "1", "bar": 2, @@ -137,7 +212,138 @@ def model_dump(self, required_param: str) -> dict: # and this entire object would be None, and we would log an error # let's allow ourselves to clean `Dummy` as None, # without blatting the `test` key - assert utils.clean({"test": Dummy()}) == {"test": None} + with mock.patch.object(utils.log, "debug") as debug: + assert utils.clean({"test": Dummy()}) == {"test": None} + debug.assert_called_once() + assert debug.call_args.args[0].startswith( + "Could not serialize Pydantic-like model:" + ) + + def test_clean_containers_and_invalid_dict_values(self): + assert utils.clean( + (Decimal("1.5"), UUID("12345678123456781234567812345678")) + ) == [ + 1.5, + "12345678-1234-5678-1234-567812345678", + ] + + bad_value = object() + + def clean_or_raise(value): + if value is bad_value: + raise TypeError("unsupported") + return value + + with ( + mock.patch("posthog.utils.clean", side_effect=clean_or_raise), + mock.patch.object(utils.log, "warning") as warning, + ): + assert utils._clean_dict({"ok": 1, "bad": bad_value}) == {"ok": 1} + + warning.assert_called_once() + assert warning.call_args.args[0] == ( + 'Dictionary values must be serializeable to JSON "%s" value %s of type %s is unsupported.' + ) + assert warning.call_args.args[1:] == ("bad", bad_value, type(bad_value)) + + def test_coerce_unicode(self): + assert utils._coerce_unicode("already unicode") == "already unicode" + assert utils._coerce_unicode(b"bytes") == "bytes" + assert utils._coerce_unicode(123) is None + + with mock.patch.object(utils.log, "warning") as warning: + assert utils._coerce_unicode(b"\xff") is None + warning.assert_called_once() + assert warning.call_args.args[0] == "Error decoding: %s" + assert "invalid start byte" in warning.call_args.args[1] + + class UndecodableBytes(bytes): + def decode(self, *args, **kwargs): + raise Exception("left", "right") + + with mock.patch.object(utils.log, "warning") as warning: + assert utils._coerce_unicode(UndecodableBytes(b"broken")) is None + assert warning.call_args.args[1] == "left:right" + + def test_regex_datetime_and_case_helpers(self): + assert utils.is_valid_regex("^posthog.*") is True + assert utils.is_valid_regex("[") is False + + naive = datetime(2024, 1, 1) + aware = datetime(2024, 1, 1, tzinfo=timezone.utc) + assert utils.convert_to_datetime_aware(naive) == aware + assert utils.convert_to_datetime_aware(aware) is aware + + assert utils.str_icontains("Hello World", "WORLD") is True + assert utils.str_icontains("Hello World", "python") is False + assert utils.str_iequals("Hello World", "hello world") is True + assert utils.str_iequals("Hello World", "hello") is False + + def test_get_os_info_branches(self): + with ( + mock.patch.object(utils.sys, "platform", "win32"), + mock.patch.object( + utils.platform, "win32_ver", return_value=("11", "", "", "") + ), + ): + assert utils.get_os_info() == {"$os": "Windows", "$os_version": "11"} + + with ( + mock.patch.object(utils.sys, "platform", "win32"), + mock.patch.object( + utils.platform, "win32_ver", return_value=("", "", "", "") + ), + ): + assert utils.get_os_info() == {"$os": "Windows", "$os_version": ""} + + with ( + mock.patch.object(utils.sys, "platform", "darwin"), + mock.patch.object( + utils.platform, "mac_ver", return_value=("14.4", ("", "", ""), "") + ), + ): + assert utils.get_os_info() == {"$os": "Mac OS X", "$os_version": "14.4"} + + with ( + mock.patch.object(utils.sys, "platform", "linux"), + mock.patch.object(utils.distro, "info", return_value={"version": "24.04"}), + mock.patch.object(utils.distro, "name", return_value="Ubuntu"), + ): + assert utils.get_os_info() == { + "$os": "Linux", + "$os_version": "24.04", + "$os_distro": "Ubuntu", + } + + with ( + mock.patch.object(utils.sys, "platform", "freebsd13"), + mock.patch.object(utils.platform, "release", return_value="13.2"), + ): + assert utils.get_os_info() == {"$os": "FreeBSD", "$os_version": "13.2"} + + with ( + mock.patch.object(utils.sys, "platform", "sunos"), + mock.patch.object(utils.platform, "release", return_value="5.11"), + ): + assert utils.get_os_info() == {"$os": "sunos", "$os_version": "5.11"} + + def test_system_context(self): + with ( + mock.patch.object( + utils.platform, "python_implementation", return_value="CPython" + ), + mock.patch.object( + utils, "get_os_info", return_value={"$os": "TestOS", "$os_version": "1"} + ), + ): + context = utils.system_context() + + assert context == { + "$python_runtime": "CPython", + "$python_version": f"{sys.version_info[0]}.{sys.version_info[1]}.{sys.version_info[2]}", + "$os": "TestOS", + "$os_version": "1", + } def test_clean_dataclass(self): @dataclass @@ -185,6 +391,25 @@ def setUp(self): "test-flag", True, None ) + def test_default_cache_settings(self): + cache = utils.FlagCache() + assert cache.max_size == 10000 + assert cache.default_ttl == 300 + + def test_cache_entry_validity(self): + entry = utils.FlagCacheEntry( + self.flag_result, flag_definition_version=1, timestamp=100 + ) + + assert entry.is_valid(current_time=109, ttl=10, current_flag_version=1) is True + assert entry.is_valid(current_time=110, ttl=10, current_flag_version=1) is False + assert entry.is_valid(current_time=111, ttl=10, current_flag_version=1) is False + assert entry.is_valid(current_time=109, ttl=10, current_flag_version=2) is False + assert entry.is_stale_but_usable(current_time=109, max_stale_age=10) is True + assert entry.is_stale_but_usable(current_time=110, max_stale_age=10) is False + assert entry.is_stale_but_usable(current_time=3700) is False + assert entry.is_stale_but_usable(current_time=3700.5) is False + def test_cache_basic_operations(self): distinct_id = "user123" flag_key = "test-flag" @@ -235,6 +460,7 @@ def test_cache_version_invalidation(self): # Should hit with old version result = self.cache.get_cached_flag(distinct_id, flag_key, old_version) assert result is not None + assert self.cache.cache[distinct_id][flag_key].timestamp <= time.time() # Should miss with new version result = self.cache.get_cached_flag(distinct_id, flag_key, new_version) @@ -246,6 +472,45 @@ def test_cache_version_invalidation(self): # Should miss even with old version after invalidation result = self.cache.get_cached_flag(distinct_id, flag_key, old_version) assert result is None + assert distinct_id not in self.cache.access_times + + def test_cache_version_invalidation_keeps_users_with_other_flags(self): + self.cache.set_cached_flag("user123", "old-flag", self.flag_result, 1) + self.cache.set_cached_flag("user123", "new-flag", self.flag_result, 2) + + self.cache.invalidate_version(1) + + assert "old-flag" not in self.cache.cache["user123"] + assert "new-flag" in self.cache.cache["user123"] + assert "user123" in self.cache.access_times + + old_empty_user = "old-empty-user" + self.cache.set_cached_flag(old_empty_user, "old-flag", self.flag_result, 1) + self.cache.invalidate_version(1) + assert old_empty_user not in self.cache.cache + assert old_empty_user not in self.cache.access_times + + def test_stale_cache_misses(self): + assert self.cache.get_stale_cached_flag("missing-user", "test-flag") is None + + self.cache.cache["user123"] = {} + assert self.cache.get_stale_cached_flag("user123", "missing-flag") is None + + def test_stale_cache_passes_current_time_and_max_age(self): + class StrictEntry: + flag_result = "stale-result" + + def is_stale_but_usable(self, current_time, max_stale_age=3600): + assert current_time == 1234 + assert max_stale_age == 99 + return True + + self.cache.cache["user123"] = {"test-flag": StrictEntry()} + with mock.patch.object(utils.time, "time", return_value=1234): + assert ( + self.cache.get_stale_cached_flag("user123", "test-flag", 99) + == "stale-result" + ) def test_stale_cache_functionality(self): distinct_id = "user123" @@ -297,3 +562,157 @@ def test_lru_eviction(self): # user3 should be there (just added) result = self.cache.get_cached_flag("user3", "test-flag", flag_version) assert result is not None + + def test_lru_eviction_removes_twenty_percent(self): + cache = utils.FlagCache(max_size=10, default_ttl=60) + for i in range(10): + cache.set_cached_flag(f"user{i}", "test-flag", self.flag_result, 1) + cache.access_times[f"user{i}"] = i + + cache.set_cached_flag("user10", "test-flag", self.flag_result, 1) + + assert "user0" not in cache.cache + assert "user1" not in cache.cache + assert "user0" not in cache.access_times + assert "user1" not in cache.access_times + assert len(cache.cache) == 9 + + def test_empty_lru_eviction_and_clear(self): + self.cache._evict_lru() + assert self.cache.cache == {} + assert self.cache.access_times == {} + + self.cache.set_cached_flag("user123", "test-flag", self.flag_result, 1) + self.cache.clear() + assert self.cache.cache == {} + assert self.cache.access_times == {} + + +class TestRedisFlagCache(unittest.TestCase): + def setUp(self): + self.redis = FakeRedis() + self.cache = utils.RedisFlagCache( + self.redis, default_ttl=10, stale_ttl=60, key_prefix="test:flags:" + ) + + def test_default_cache_settings(self): + default_cache = utils.RedisFlagCache(self.redis) + assert default_cache.default_ttl == 300 + assert default_cache.stale_ttl == 3600 + assert default_cache.key_prefix == "posthog:flags:" + assert default_cache.version_key == "posthog:flags:version" + + def test_cache_key_and_serialization(self): + assert self.cache._get_cache_key("user123", "beta") == "test:flags:user123:beta" + + generated_timestamp = json.loads(self.cache._serialize_entry(True, 3))[ + "timestamp" + ] + assert isinstance(generated_timestamp, float) + + serialized = self.cache._serialize_entry( + {"enabled": True, "count": Decimal("1.5")}, 3, timestamp=123 + ) + assert json.loads(serialized) == { + "flag_result": {"enabled": True, "count": 1.5}, + "flag_version": 3, + "timestamp": 123, + } + + entry = self.cache._deserialize_entry(serialized) + assert entry.flag_result == {"enabled": True, "count": 1.5} + assert entry.flag_definition_version == 3 + assert entry.timestamp == 123 + assert self.cache._deserialize_entry("not json") is None + assert self.cache._deserialize_entry(json.dumps({"flag_result": True})) is None + + def test_get_set_and_stale_cached_flags(self): + self.cache.set_cached_flag("user123", "beta", True, 7) + + assert self.cache.get_cached_flag("user123", "beta", 7) is True + assert self.cache.get_cached_flag("user123", "beta", 8) is None + assert self.redis.store["test:flags:version"] == 7 + assert self.redis.setex_calls[0][1] == 60 + + stale_key = self.cache._get_cache_key("user123", "old-beta") + self.redis.store[stale_key] = self.cache._serialize_entry( + True, 7, timestamp=time.time() - 20 + ) + assert self.cache.get_cached_flag("user123", "old-beta", 7) is None + assert ( + self.cache.get_stale_cached_flag("user123", "old-beta", max_stale_age=30) + is True + ) + assert ( + self.cache.get_stale_cached_flag("user123", "old-beta", max_stale_age=5) + is None + ) + + default_stale_key = self.cache._get_cache_key("user123", "default-stale") + self.redis.store[default_stale_key] = self.cache._serialize_entry( + True, 7, timestamp=time.time() - 30 + ) + assert self.cache.get_stale_cached_flag("user123", "default-stale") is True + + boundary_key = self.cache._get_cache_key("user123", "boundary-stale") + self.redis.store[boundary_key] = self.cache._serialize_entry( + True, 7, timestamp=time.time() - 3600.5 + ) + assert self.cache.get_stale_cached_flag("user123", "boundary-stale") is None + + def test_redis_errors_fall_back_to_miss(self): + failing_cache = utils.RedisFlagCache(FakeRedis(fail=True)) + + assert failing_cache.get_cached_flag("user123", "beta", 1) is None + assert failing_cache.get_stale_cached_flag("user123", "beta") is None + failing_cache.set_cached_flag("user123", "beta", True, 1) + failing_cache.invalidate_version(1) + failing_cache.clear() + + def test_invalidate_version(self): + old_key = self.cache._get_cache_key("user123", "old") + new_key = self.cache._get_cache_key("user123", "new") + invalid_key = self.cache._get_cache_key("user123", "invalid") + self.redis.store[old_key] = self.cache._serialize_entry(True, 1, timestamp=100) + self.redis.store[new_key] = self.cache._serialize_entry(True, 2, timestamp=100) + self.redis.store[invalid_key] = "not json" + self.redis.store[self.cache.version_key] = 2 + + self.cache.invalidate_version(1) + + assert self.redis.scan_calls == [ + (0, "test:flags:*", 100), + (1, "test:flags:*", 100), + ] + assert old_key not in self.redis.store + assert invalid_key not in self.redis.store + assert new_key in self.redis.store + assert self.cache.version_key in self.redis.store + + def test_invalidate_version_continues_after_version_key_in_scan_batch(self): + self.redis.store[self.cache.version_key] = 2 + old_key = self.cache._get_cache_key("zzz-user", "old-beta") + newer_key = self.cache._get_cache_key("zzzz-user", "new-beta") + self.redis.store[old_key] = self.cache._serialize_entry(False, 1) + self.redis.store[newer_key] = self.cache._serialize_entry(True, 2) + self.redis.store[self.cache._get_cache_key("zzzzz-user", "newer-beta")] = ( + self.cache._serialize_entry(True, 2) + ) + + self.cache.invalidate_version(1) + + assert old_key not in self.redis.store + assert newer_key in self.redis.store + + def test_clear(self): + self.redis.store[self.cache._get_cache_key("user123", "beta")] = "value" + self.redis.store[self.cache.version_key] = 1 + self.redis.store["other:key"] = "value" + + self.cache.clear() + + assert self.redis.scan_calls == [ + (0, "test:flags:*", 100), + (1, "test:flags:*", 100), + ] + assert self.redis.store == {"other:key": "value"} diff --git a/posthog/utils.py b/posthog/utils.py index 8c28a091..40626926 100644 --- a/posthog/utils.py +++ b/posthog/utils.py @@ -18,7 +18,7 @@ def is_naive(dt): """Determines if a given datetime.datetime is naive.""" - return dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None + return dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None # pragma: no mutate def total_seconds(delta): @@ -33,7 +33,7 @@ def guess_timezone(dt): # attempts to guess the datetime.datetime.now() local timezone # case, and then defaults to utc delta = datetime.now() - dt - if total_seconds(delta) < 5: + if total_seconds(delta) < 5: # pragma: no mutate # this was created using datetime.datetime.now(), # so use the current system local timezone return dt.replace(tzinfo=datetime.now().astimezone().tzinfo) @@ -123,7 +123,7 @@ def _coerce_unicode(cmplx: Any) -> Optional[str]: item = None try: if isinstance(cmplx, bytes): - item = cmplx.decode("utf-8", "strict") + item = cmplx.decode("utf-8", "strict") # pragma: no mutate elif isinstance(cmplx, str): item = cmplx except Exception as exception: @@ -154,6 +154,12 @@ def __setitem__(self, key, value): super().__setitem__(key, value) +CACHE_MAX_SIZE = 10000 +CACHE_TTL = 300 +CACHE_STALE_TTL = 3600 +CACHE_KEY_PREFIX = "posthog:flags:" + + class FlagCacheEntry: def __init__(self, flag_result, flag_definition_version, timestamp=None): self.flag_result = flag_result @@ -165,12 +171,12 @@ def is_valid(self, current_time, ttl, current_flag_version): version_valid = self.flag_definition_version == current_flag_version return time_valid and version_valid - def is_stale_but_usable(self, current_time, max_stale_age=3600): + def is_stale_but_usable(self, current_time, max_stale_age=CACHE_STALE_TTL): return (current_time - self.timestamp) < max_stale_age class FlagCache: - def __init__(self, max_size=10000, default_ttl=300): + def __init__(self, max_size=CACHE_MAX_SIZE, default_ttl=CACHE_TTL): self.cache = {} # distinct_id -> {flag_key: FlagCacheEntry} self.access_times = {} # distinct_id -> last_access_time self.max_size = max_size @@ -193,7 +199,9 @@ def get_cached_flag(self, distinct_id, flag_key, current_flag_version): return None - def get_stale_cached_flag(self, distinct_id, flag_key, max_stale_age=3600): + # fmt: off + def get_stale_cached_flag(self, distinct_id, flag_key, max_stale_age=CACHE_STALE_TTL): + # fmt: on current_time = time.time() if distinct_id not in self.cache: @@ -223,9 +231,9 @@ def set_cached_flag( self.cache[distinct_id] = {} # Store the flag result - self.cache[distinct_id][flag_key] = FlagCacheEntry( - flag_result, flag_definition_version, current_time - ) + # fmt: off + self.cache[distinct_id][flag_key] = FlagCacheEntry(flag_result, flag_definition_version) + # fmt: on self.access_times[distinct_id] = current_time def invalidate_version(self, old_version): @@ -272,7 +280,11 @@ def clear(self): class RedisFlagCache: def __init__( - self, redis_client, default_ttl=300, stale_ttl=3600, key_prefix="posthog:flags:" + self, + redis_client, + default_ttl=CACHE_TTL, + stale_ttl=CACHE_STALE_TTL, + key_prefix=CACHE_KEY_PREFIX, ): self.redis = redis_client self.default_ttl = default_ttl @@ -391,7 +403,7 @@ def invalidate_version(self, old_version): self.redis.delete(key) if cursor == 0: - break + break # pragma: no mutate except Exception: # Redis error - silently fail @@ -408,7 +420,7 @@ def clear(self): if keys: self.redis.delete(*keys) if cursor == 0: - break + break # pragma: no mutate except Exception: # Redis error - silently fail pass @@ -465,9 +477,9 @@ def get_os_info(): Returns standardized OS name, version and distro (in case of Linux) information. Similar to how user agent parsing works in JS. """ - os_name = "" + os_name = "" # pragma: no mutate os_version = "" - os_distro = "" + os_distro = "" # pragma: no mutate platform_name = sys.platform diff --git a/pyproject.toml b/pyproject.toml index 946a34da..1595c73f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -112,6 +112,13 @@ asyncio_default_fixture_loop_scope = "function" testpaths = ["posthog/test"] norecursedirs = ["integration_tests"] +[tool.mutmut] +paths_to_mutate = ["posthog/utils.py"] +do_not_mutate = ["posthog/test/*"] +tests_dir = ["posthog/test/test_utils.py", "posthog/test/test_size_limited_dict.py"] +also_copy = ["posthog"] +pytest_add_cli_args = ["--timeout=30"] + [dependency-groups] dev = [ "claude-agent-sdk>=0.1.50", From e812061777f71a713fc29f13845879dc0b7e5389 Mon Sep 17 00:00:00 2001 From: Manoel Aranda Neto Date: Fri, 15 May 2026 18:44:54 -0400 Subject: [PATCH 2/2] test: cover flag cache miss paths --- posthog/test/test_utils.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/posthog/test/test_utils.py b/posthog/test/test_utils.py index 2c6ef6f7..5bb4958c 100644 --- a/posthog/test/test_utils.py +++ b/posthog/test/test_utils.py @@ -490,12 +490,24 @@ def test_cache_version_invalidation_keeps_users_with_other_flags(self): assert old_empty_user not in self.cache.cache assert old_empty_user not in self.cache.access_times + def test_cache_misses_when_user_exists_without_flag(self): + self.cache.cache["user123"] = {} + + assert self.cache.get_cached_flag("user123", "missing-flag", 1) is None + def test_stale_cache_misses(self): assert self.cache.get_stale_cached_flag("missing-user", "test-flag") is None self.cache.cache["user123"] = {} assert self.cache.get_stale_cached_flag("user123", "missing-flag") is None + def test_stale_cache_returns_none_when_entry_is_too_old(self): + self.cache.cache["user123"] = { + "test-flag": utils.FlagCacheEntry(self.flag_result, 1, timestamp=100) + } + + assert self.cache.get_stale_cached_flag("user123", "test-flag", 10) is None + def test_stale_cache_passes_current_time_and_max_age(self): class StrictEntry: flag_result = "stale-result"