diff --git a/src/boost/docs/database.md b/src/boost/docs/database.md index 6bfc39a84..14e73a03b 100644 --- a/src/boost/docs/database.md +++ b/src/boost/docs/database.md @@ -113,6 +113,7 @@ To see how read / write connections should be configured, let's look at this exa 'connect_timeout' => 10.0, 'wait_timeout' => 3.0, 'heartbeat' => -1, + 'heartbeat_timeout' => 1.0, 'max_idle_time' => (float) env('DB_MAX_IDLE_TIME', 60), ], ], @@ -144,12 +145,13 @@ Each connection may define its own `pool` configuration: 'connect_timeout' => 10.0, 'wait_timeout' => 3.0, 'heartbeat' => -1, + 'heartbeat_timeout' => 1.0, 'max_idle_time' => (float) env('DB_MAX_IDLE_TIME', 60), ], ], ``` -The `min_connections` option determines the minimum number of connections kept in the pool, while the `max_connections` option determines the maximum number of connections that may be opened for the worker. The `connect_timeout` option controls how long Hypervel will wait while opening a new database connection. The `wait_timeout` option controls how long a coroutine may wait for an available connection when the pool is exhausted. The `heartbeat` option controls how often idle pooled connections are pinged to keep them alive and detect broken sockets; set this value to `-1` to disable heartbeats. The `max_idle_time` option controls how long an idle connection may remain in the pool before it is closed. +The `min_connections` option determines the minimum number of connections kept warm in the pool, while the `max_connections` option determines the maximum number of connections that may be opened for the worker. The `connect_timeout` option controls how long Hypervel will wait while opening a new database connection. The `wait_timeout` option controls how long a coroutine may wait for an available connection when the pool is exhausted. The `heartbeat` option controls how often Hypervel validates idle pooled connections; set this value to `-1` to disable heartbeats. When heartbeats are enabled, Hypervel keeps the minimum idle connection set alive with a raw `SELECT 1` ping that does not fire query events, query logs, or query duration handlers. The `heartbeat_timeout` option controls how long a heartbeat ping may run before the connection is discarded. The `max_idle_time` option controls how long idle connections above the minimum pool size may remain in the pool before they are closed. Hypervel's default database configuration also includes a `pgsql-pooled` connection. This connection is intended for PostgreSQL transaction poolers such as PgBouncer and uses separate `DB_POOLED_*` environment variables. It also sets `migrations_connection` to `pgsql`, allowing your application to use the pooled connection at runtime while migration commands use the direct PostgreSQL connection. diff --git a/src/boost/todo.md b/src/boost/todo.md index ecc71fbb7..1f3230bcf 100644 --- a/src/boost/todo.md +++ b/src/boost/todo.md @@ -25,7 +25,7 @@ ## Database -- Wire opt-in heartbeat support for database pools. `src/foundation/config/database.php` and the database docs advertise a `heartbeat` option in every database pool block, but `Hypervel\Database\Pool\PooledConnection` implements `Hypervel\Contracts\Pool\ConnectionInterface` directly and never consumes `PoolOption::getHeartbeat()`. Do not switch `PooledConnection` wholesale to `Hypervel\Pool\KeepaliveConnection`: that class uses a different `call()`-based lifecycle, makes `getConnection()` throw, stores the wrapped connection in a one-slot channel, treats `heartbeat <= 0` as a 10-second interval, and would bypass existing DB-specific release behavior such as state reset, transaction rollback, error-count handling, release events, and shared in-memory SQLite handling. Correct fix: keep `heartbeat => -1` as disabled with zero timer / ping overhead; when `heartbeat > 0`, have each worker-local `DbPool` start one timer for that pool, inspect only idle pooled connections, skip borrowed connections, close connections older than `max_idle_time`, and run a lightweight raw PDO ping such as `SELECT 1` on remaining idle connections without firing query events or mutating query logs / query-duration state. If the ping fails, close / discard the pooled connection so the next borrow creates a fresh connection. This is useful for long-lived workers behind load balancers, firewalls, NAT, or managed database proxies that drop idle TCP connections. +- Add max-lifetime recycling support for pooled database connections. Heartbeat keeps minimum idle connections validated for the worker lifetime, but rotating database poolers and managed proxies may still benefit from periodically replacing otherwise healthy long-lived sockets. Correct fix: add an opt-in pool option that recycles idle pooled DB connections older than the configured lifetime without affecting borrowed connections or normal query execution. ## Foundation diff --git a/src/contracts/src/Engine/CoroutineInterface.php b/src/contracts/src/Engine/CoroutineInterface.php index 7d2b4f0e1..3cb7c2cb3 100644 --- a/src/contracts/src/Engine/CoroutineInterface.php +++ b/src/contracts/src/Engine/CoroutineInterface.php @@ -61,10 +61,15 @@ public static function defer(callable $callable): void; public static function yield(): bool; /** - * Resume the coroutine by coroutine Id. + * Resume the coroutine by coroutine ID. */ public static function resumeById(int $id): bool; + /** + * Cancel the coroutine by coroutine ID. + */ + public static function cancelById(int $id, bool $throwException = false): bool; + /** * Get the coroutine stats. */ diff --git a/src/contracts/src/Pool/PoolOptionInterface.php b/src/contracts/src/Pool/PoolOptionInterface.php index b5ae4de7c..62a0c4c8e 100644 --- a/src/contracts/src/Pool/PoolOptionInterface.php +++ b/src/contracts/src/Pool/PoolOptionInterface.php @@ -31,6 +31,11 @@ public function getWaitTimeout(): float; */ public function getHeartbeat(): float; + /** + * Get the heartbeat timeout in seconds. + */ + public function getHeartbeatTimeout(): float; + /** * Get the maximum idle time in seconds before a connection is closed. */ diff --git a/src/database/src/Connection.php b/src/database/src/Connection.php index 3a09616fc..e3f6e7e13 100755 --- a/src/database/src/Connection.php +++ b/src/database/src/Connection.php @@ -977,6 +977,9 @@ public function resetForPool(): void // Reset record modification state $this->recordsModified = false; + + // Reset execution errors for the next borrow window. + $this->errorCount = 0; } /** diff --git a/src/database/src/Pool/DbPool.php b/src/database/src/Pool/DbPool.php index 3e9862a58..d1e583ed6 100644 --- a/src/database/src/Pool/DbPool.php +++ b/src/database/src/Pool/DbPool.php @@ -5,12 +5,16 @@ namespace Hypervel\Database\Pool; use Hypervel\Contracts\Container\Container; +use Hypervel\Contracts\Log\StdoutLoggerInterface; use Hypervel\Contracts\Pool\ConnectionInterface; +use Hypervel\Coordinator\Timer; use Hypervel\Pool\Frequency; use Hypervel\Pool\Pool; use Hypervel\Support\Arr; use InvalidArgumentException; use PDO; +use Psr\Log\LoggerInterface; +use Throwable; /** * Database connection pool. @@ -26,6 +30,12 @@ class DbPool extends Pool { protected array $config; + protected ?Timer $heartbeatTimer = null; + + protected ?int $heartbeatTimerId = null; + + protected int $heartbeatGeneration = 0; + /** * Shared PDO for in-memory SQLite. All pool slots must share the same PDO * instance, otherwise each would get its own empty database. @@ -52,11 +62,23 @@ public function __construct(Container $container, string $name) parent::__construct($container, $name, $poolOptions); + $this->heartbeatTimer = new Timer($this->getLogger()); + // For in-memory SQLite, pre-create a shared PDO so all pool slots // see the same database. This must happen after parent::__construct. if ($this->isInMemorySqlite()) { $this->sharedInMemorySqlitePdo = $this->createSharedInMemorySqlitePdo(); } + + $this->startHeartbeat(); + } + + /** + * Destroy the database pool. + */ + public function __destruct() + { + $this->clearHeartbeat(); } /** @@ -111,7 +133,137 @@ protected function isInMemorySqlite(): bool */ public function flushAll(): void { + $this->clearHeartbeat(); + parent::flushAll(); $this->sharedInMemorySqlitePdo = null; } + + /** + * Start the heartbeat timer if configured. + */ + protected function startHeartbeat(): void + { + if ($this->heartbeatTimer === null || $this->option->getHeartbeat() <= 0 || $this->sharedInMemorySqlitePdo !== null) { + return; + } + + $this->heartbeatTimerId = $this->heartbeatTimer->tick( + $this->option->getHeartbeat(), + function (bool $isClosing): ?string { + if ($isClosing) { + return Timer::STOP; + } + + $this->heartbeat(); + + return null; + } + ); + } + + /** + * Clear the heartbeat timer. + */ + protected function clearHeartbeat(): void + { + ++$this->heartbeatGeneration; + + if ($this->heartbeatTimer === null || $this->heartbeatTimerId === null) { + return; + } + + $this->heartbeatTimer->clear($this->heartbeatTimerId); + $this->heartbeatTimerId = null; + } + + /** + * Run one heartbeat sweep over currently idle connections. + */ + protected function heartbeat(): void + { + $connectionsToInspect = $this->getConnectionsInChannel(); + + for ($i = 0; $i < $connectionsToInspect; ++$i) { + $connection = $this->channel->pop(0.001); + + if (! $connection instanceof PooledConnection) { + break; + } + + $this->heartbeatConnection($connection); + } + } + + /** + * Heartbeat one idle connection. + */ + protected function heartbeatConnection(PooledConnection $connection): void + { + try { + if ($connection->isIdleExpired() && $this->currentConnections > $this->option->getMinConnections()) { + $this->discardHeartbeatConnection($connection); + + return; + } + + $heartbeatGeneration = $this->heartbeatGeneration; + + if ($connection->ping($this->option->getHeartbeatTimeout())) { + if ($heartbeatGeneration === $this->heartbeatGeneration) { + $this->release($connection); + } else { + $this->discardHeartbeatConnection($connection); + } + + return; + } + + $this->discardHeartbeatConnection($connection); + } catch (Throwable $exception) { + $this->logHeartbeatError('Database heartbeat failed: ' . $exception); + $this->discardHeartbeatConnection($connection); + } + } + + /** + * Discard an idle connection from the pool. + */ + protected function discardHeartbeatConnection(PooledConnection $connection): void + { + --$this->currentConnections; + + try { + if ($connection->hasOpenTransaction()) { + $this->logHeartbeatError('Database heartbeat found an idle connection with an open transaction.'); + } + + $connection->close(); + } catch (Throwable $exception) { + $this->logHeartbeatError('Database heartbeat close failed: ' . $exception); + } + } + + /** + * Log a heartbeat error without breaking pool cleanup. + */ + protected function logHeartbeatError(string $message): void + { + try { + $this->getLogger()?->error($message); + } catch (Throwable) { + } + } + + /** + * Get the logger instance if available. + */ + protected function getLogger(): ?LoggerInterface + { + if (! $this->container->has(StdoutLoggerInterface::class)) { + return null; + } + + return $this->container->make(StdoutLoggerInterface::class); + } } diff --git a/src/database/src/Pool/PooledConnection.php b/src/database/src/Pool/PooledConnection.php index 172dfb24d..384d6f79b 100644 --- a/src/database/src/Pool/PooledConnection.php +++ b/src/database/src/Pool/PooledConnection.php @@ -11,11 +11,17 @@ use Hypervel\Database\Connection; use Hypervel\Database\Connectors\ConnectionFactory; use Hypervel\Database\Events\ConnectionEstablished; +use Hypervel\Engine\Channel; +use Hypervel\Engine\Coroutine; use Hypervel\Pool\Events\ReleaseConnection; +use PDO; use Psr\Log\LoggerInterface; use RuntimeException; +use Swoole\Coroutine\CanceledException; use Throwable; +use function Hypervel\Coroutine\go; + /** * Wraps a database Connection for use with Hypervel's connection pool. * @@ -39,6 +45,8 @@ class PooledConnection implements PoolConnectionInterface protected float $lastReleaseTime = 0.0; + protected bool $invalid = false; + protected ?Dispatcher $dispatcher = null; public function __construct( @@ -126,6 +134,7 @@ public function reconnect(): bool } $this->lastUseTime = microtime(true); + $this->markValid(); return true; } @@ -135,6 +144,10 @@ public function reconnect(): bool */ public function check(): bool { + if ($this->invalid) { + return false; + } + if ($this->connection === null) { return false; } @@ -142,7 +155,7 @@ public function check(): bool $maxIdleTime = $this->pool->getOption()->getMaxIdleTime(); $now = microtime(true); - if ($now > $maxIdleTime + $this->lastUseTime) { + if ($now > $maxIdleTime + max($this->lastReleaseTime, $this->lastUseTime)) { return false; } @@ -151,6 +164,57 @@ public function check(): bool return true; } + /** + * Determine if this connection has been idle long enough to be evicted. + */ + public function isIdleExpired(?float $now = null): bool + { + if ($this->lastReleaseTime === 0.0) { + return false; + } + + return ($now ?? microtime(true)) > $this->pool->getOption()->getMaxIdleTime() + $this->lastReleaseTime; + } + + /** + * Ping already-open PDO connections. + */ + public function ping(float $timeout): bool + { + if ($this->invalid || ! $this->connection instanceof Connection) { + return false; + } + + $pdos = $this->getOpenPdos(); + + if ($pdos === []) { + return true; + } + + $result = new Channel(1); + + $started = go(static function () use ($pdos, $result) { + try { + $result->push(self::pingPdos($pdos), 0.0); + } catch (CanceledException) { + } + }); + + if ($started === false) { + return false; + } + + if ($result->pop($timeout) !== true) { + Coroutine::cancelById($started, throwException: true); + + return false; + } + + $this->lastUseTime = microtime(true); + + return true; + } + /** * Close the database connection. */ @@ -176,13 +240,15 @@ public function release(): void { try { if ($this->connection instanceof Connection) { + $errorCount = $this->connection->getErrorCount(); + // Reset all per-request state to prevent leaks between coroutines $this->connection->resetForPool(); // Check error count and mark as stale if too high - if ($this->connection->getErrorCount() > self::MAX_ERROR_COUNT) { + if ($errorCount > self::MAX_ERROR_COUNT) { $this->logger->warning('Connection has too many errors, marking as stale.'); - $this->lastUseTime = 0.0; + $this->markInvalid(); } // Roll back any uncommitted transactions (including nested savepoints) @@ -202,7 +268,7 @@ public function release(): void } catch (Throwable $exception) { $this->logger->error('Release connection failed: ' . $exception); // Mark as stale so it will be recreated - $this->lastUseTime = 0.0; + $this->markInvalid(); } finally { $this->pool->release($this); } @@ -224,6 +290,85 @@ public function getLastReleaseTime(): float return $this->lastReleaseTime; } + /** + * Determine if the underlying connection has an open transaction. + */ + public function hasOpenTransaction(): bool + { + return $this->connection instanceof Connection + && $this->connection->transactionLevel() > 0; + } + + /** + * Mark the connection as invalid. + */ + protected function markInvalid(): void + { + $this->invalid = true; + } + + /** + * Mark the connection as valid. + */ + protected function markValid(): void + { + $this->invalid = false; + } + + /** + * Get already-open PDO instances. + * + * @return PDO[] + */ + protected function getOpenPdos(): array + { + if (! $this->connection instanceof Connection) { + return []; + } + + $writePdo = $this->connection->getRawPdo(); + $readPdo = $this->connection->getRawReadPdo(); + $pdos = []; + + if ($writePdo instanceof PDO) { + $pdos[] = $writePdo; + } + + if ($readPdo instanceof PDO && $readPdo !== $writePdo) { + $pdos[] = $readPdo; + } + + return $pdos; + } + + /** + * Ping PDO instances. + * + * @param PDO[] $pdos + */ + protected static function pingPdos(array $pdos): bool + { + try { + foreach ($pdos as $pdo) { + $statement = $pdo->query('SELECT 1'); + + if ($statement === false) { + return false; + } + + $statement->closeCursor(); + } + + return true; + } catch (Throwable $exception) { + if ($exception instanceof CanceledException) { + throw $exception; + } + + return false; + } + } + /** * Refresh the PDO connections. */ diff --git a/src/engine/src/Coroutine.php b/src/engine/src/Coroutine.php index 916d6b767..2f045933b 100644 --- a/src/engine/src/Coroutine.php +++ b/src/engine/src/Coroutine.php @@ -132,6 +132,15 @@ public static function resumeById(int $id): bool return SwooleCo::resume($id); } + /** + * Cancel a coroutine by ID. + */ + public static function cancelById(int $id, bool $throwException = false): bool + { + /* @phpstan-ignore arguments.count (@TODO: Remove once PHPStan's bundled JetBrains Swoole stub includes Swoole 6.2's throw_exception parameter.) */ + return SwooleCo::cancel($id, $throwException); + } + /** * Get the coroutine statistics. */ @@ -143,7 +152,7 @@ public static function stats(): array /** * Determine if a coroutine exists. */ - public static function exists(?int $id = null): bool + public static function exists(int $id): bool { return SwooleCo::exists($id); } diff --git a/src/foundation/config/database.php b/src/foundation/config/database.php index 18264443d..60a304456 100644 --- a/src/foundation/config/database.php +++ b/src/foundation/config/database.php @@ -19,6 +19,17 @@ 'default' => env('DB_CONNECTION', 'sqlite'), + /* + |-------------------------------------------------------------------------- + | Database Connection Pools + |-------------------------------------------------------------------------- + | + | Database connections may define a "pool" array for long-lived workers. + | Heartbeats are disabled with -1; positive values keep the minimum + | idle connections validated without firing query events or logs. + | + */ + /* |-------------------------------------------------------------------------- | Database Connections @@ -69,6 +80,7 @@ 'connect_timeout' => 10.0, 'wait_timeout' => 3.0, 'heartbeat' => -1, + 'heartbeat_timeout' => 1.0, 'max_idle_time' => (float) env('DB_MAX_IDLE_TIME', 60), ], ], @@ -97,6 +109,7 @@ 'connect_timeout' => 10.0, 'wait_timeout' => 3.0, 'heartbeat' => -1, + 'heartbeat_timeout' => 1.0, 'max_idle_time' => (float) env('DB_MAX_IDLE_TIME', 60), ], ], @@ -123,6 +136,7 @@ 'connect_timeout' => 10.0, 'wait_timeout' => 3.0, 'heartbeat' => -1, + 'heartbeat_timeout' => 1.0, 'max_idle_time' => (float) env('DB_MAX_IDLE_TIME', 60), ], ], @@ -150,6 +164,7 @@ 'connect_timeout' => 10.0, 'wait_timeout' => 3.0, 'heartbeat' => -1, + 'heartbeat_timeout' => 1.0, 'max_idle_time' => (float) env('DB_POOLED_MAX_IDLE_TIME', 60), ], ], diff --git a/src/pool/src/Connection.php b/src/pool/src/Connection.php index 663ecd9ee..677216013 100644 --- a/src/pool/src/Connection.php +++ b/src/pool/src/Connection.php @@ -24,6 +24,8 @@ abstract class Connection implements ConnectionInterface protected float $lastReleaseTime = 0.0; + protected bool $invalid = false; + private ?Dispatcher $dispatcher = null; private ?StdoutLoggerInterface $logger = null; @@ -79,10 +81,14 @@ public function getConnection(): mixed */ public function check(): bool { + if ($this->invalid) { + return false; + } + $maxIdleTime = $this->pool->getOption()->getMaxIdleTime(); $now = microtime(true); - if ($now > $maxIdleTime + $this->lastUseTime) { + if ($now > $maxIdleTime + max($this->lastReleaseTime, $this->lastUseTime)) { return false; } @@ -107,6 +113,22 @@ public function getLastReleaseTime(): float return $this->lastReleaseTime; } + /** + * Mark the connection as invalid. + */ + protected function markInvalid(): void + { + $this->invalid = true; + } + + /** + * Mark the connection as valid. + */ + protected function markValid(): void + { + $this->invalid = false; + } + /** * Get the active connection, reconnecting if necessary. */ diff --git a/src/pool/src/Pool.php b/src/pool/src/Pool.php index b9689cd93..65c622dfc 100644 --- a/src/pool/src/Pool.php +++ b/src/pool/src/Pool.php @@ -171,6 +171,7 @@ protected function initOption(array $options = []): void connectTimeout: $options['connect_timeout'] ?? 10.0, waitTimeout: $options['wait_timeout'] ?? 3.0, heartbeat: $options['heartbeat'] ?? -1, + heartbeatTimeout: $options['heartbeat_timeout'] ?? 1.0, maxIdleTime: $options['max_idle_time'] ?? 60.0, events: $options['events'] ?? [], ); diff --git a/src/pool/src/PoolOption.php b/src/pool/src/PoolOption.php index 214cbaa10..7606271b3 100644 --- a/src/pool/src/PoolOption.php +++ b/src/pool/src/PoolOption.php @@ -17,6 +17,7 @@ class PoolOption implements PoolOptionInterface * @param float $connectTimeout Timeout in seconds for establishing a connection * @param float $waitTimeout Timeout in seconds for waiting to get a connection from pool * @param float $heartbeat Heartbeat interval in seconds (-1 to disable) + * @param float $heartbeatTimeout Heartbeat timeout in seconds * @param float $maxIdleTime Maximum idle time in seconds before connection is closed * @param array $events Events to trigger on connection lifecycle */ @@ -26,6 +27,7 @@ public function __construct( private float $connectTimeout = 10.0, private float $waitTimeout = 3.0, private float $heartbeat = -1, + private float $heartbeatTimeout = 1.0, private float $maxIdleTime = 60.0, private array $events = [], ) { @@ -93,6 +95,11 @@ public function getHeartbeat(): float return $this->heartbeat; } + public function getHeartbeatTimeout(): float + { + return $this->heartbeatTimeout; + } + /** * Set the heartbeat interval in seconds. * @@ -107,6 +114,20 @@ public function setHeartbeat(float $heartbeat): static return $this; } + /** + * Set the heartbeat timeout in seconds. + * + * Boot-only. The value persists on the worker-lifetime pool option and is + * read by every subsequent pool operation. Per-request use races across + * coroutines. + */ + public function setHeartbeatTimeout(float $heartbeatTimeout): static + { + $this->heartbeatTimeout = $heartbeatTimeout; + + return $this; + } + public function getWaitTimeout(): float { return $this->waitTimeout; diff --git a/src/redis/src/PhpRedisClusterConnection.php b/src/redis/src/PhpRedisClusterConnection.php index fb0096033..d28f61f1a 100644 --- a/src/redis/src/PhpRedisClusterConnection.php +++ b/src/redis/src/PhpRedisClusterConnection.php @@ -37,6 +37,7 @@ public function reconnect(): bool // RedisCluster doesn't support select(), no database selection. $this->connection = $redis; + $this->markValid(); $this->lastUseTime = microtime(true); if (($this->config['event']['enable'] ?? false) && $this->container->bound('events')) { diff --git a/src/redis/src/PhpRedisConnection.php b/src/redis/src/PhpRedisConnection.php index 1a021f1f9..b64afc57a 100644 --- a/src/redis/src/PhpRedisConnection.php +++ b/src/redis/src/PhpRedisConnection.php @@ -59,6 +59,7 @@ public function reconnect(): bool } $this->connection = $redis; + $this->markValid(); $this->lastUseTime = microtime(true); if (($this->config['event']['enable'] ?? false) && $this->container->bound('events')) { diff --git a/src/redis/src/RedisConnection.php b/src/redis/src/RedisConnection.php index b2f775c1e..ccb238ad2 100644 --- a/src/redis/src/RedisConnection.php +++ b/src/redis/src/RedisConnection.php @@ -623,7 +623,7 @@ protected function retry(string $name, array $arguments, RedisException $excepti return $this->executeCommand($name, $arguments); } catch (Throwable $exception) { - $this->lastUseTime = 0.0; + $this->markInvalid(); throw $exception; } } diff --git a/tests/Engine/CoroutineTest.php b/tests/Engine/CoroutineTest.php index 47709453d..25fbfda94 100644 --- a/tests/Engine/CoroutineTest.php +++ b/tests/Engine/CoroutineTest.php @@ -10,11 +10,12 @@ use Hypervel\Engine\Coroutine; use Hypervel\Engine\Exceptions\CoroutineDestroyedException; use Hypervel\Tests\TestCase; +use Swoole\Coroutine\CanceledException; use Throwable; class CoroutineTest extends TestCase { - public function testCoroutineCreate() + public function testCoroutineCreate(): void { $coroutine = new Coroutine(function () { $this->assertTrue(true); @@ -26,7 +27,7 @@ public function testCoroutineCreate() $this->assertIsInt($coroutine->getId()); } - public function testCoroutineCreateStatic() + public function testCoroutineCreateStatic(): void { $coroutine = Coroutine::create(function () { $this->assertTrue(true); @@ -36,7 +37,7 @@ public function testCoroutineCreateStatic() $this->assertIsInt($coroutine->getId()); } - public function testCoroutineContext() + public function testCoroutineContext(): void { $id = uniqid(); $coroutine = Coroutine::create(function () use ($id) { @@ -54,13 +55,13 @@ public function testCoroutineContext() $this->assertNull(Coroutine::getContextFor($coroutine->getId())); } - public function testCoroutineId() + public function testCoroutineId(): void { $this->assertIsInt($id = Coroutine::id()); $this->assertGreaterThan(0, $id); } - public function testCoroutinePid() + public function testCoroutinePid(): void { $pid = Coroutine::id(); Coroutine::create(function () use ($pid) { @@ -77,7 +78,7 @@ public function testCoroutinePid() }); } - public function testCoroutinePidHasBeenDestroyed() + public function testCoroutinePidHasBeenDestroyed(): void { $co = Coroutine::create(function () { }); @@ -90,12 +91,12 @@ public function testCoroutinePidHasBeenDestroyed() } } - public function testCoroutineInTopCoroutine() + public function testCoroutineInTopCoroutine(): void { $this->assertSame(0, Coroutine::pid()); } - public function testCoroutineDefer() + public function testCoroutineDefer(): void { $channel = new Channel(2); Coroutine::create(function () use ($channel) { @@ -110,7 +111,7 @@ public function testCoroutineDefer() $this->assertSame(2, $channel->pop()); } - public function testTheOrderForCoroutineDefer() + public function testTheOrderForCoroutineDefer(): void { $channel = new Channel(3); Coroutine::create(function () use ($channel) { @@ -129,7 +130,7 @@ public function testTheOrderForCoroutineDefer() $this->assertSame(2, $channel->pop()); } - public function testCoroutineResumeById() + public function testCoroutineResumeById(): void { $channel = new Channel(10); Coroutine::create(function () use ($channel) { @@ -151,14 +152,35 @@ public function testCoroutineResumeById() $this->assertSame(5, $channel->pop()); } - public function testCoroutineList() + public function testCoroutineCancelById(): void + { + $channel = new Channel(2); + $coroutine = Coroutine::create(function () use ($channel) { + try { + $channel->push(1); + usleep(100000); + $channel->push(2); + } catch (CanceledException) { + $channel->push('cancelled'); + } + }); + + $this->assertSame(1, $channel->pop()); + $this->assertTrue(Coroutine::exists($coroutine->getId())); + $this->assertTrue(Coroutine::cancelById($coroutine->getId(), throwException: true)); + $this->assertFalse(Coroutine::exists($coroutine->getId())); + $this->assertSame('cancelled', $channel->pop(0.01)); + $this->assertFalse($channel->pop(0.01)); + } + + public function testCoroutineList(): void { $list = Coroutine::list(); $this->assertIsIterable($list); $this->assertContains(Coroutine::id(), $list); } - public function testCoroutineListCount() + public function testCoroutineListCount(): void { Coroutine::create(function () { usleep(100000); diff --git a/tests/Integration/Database/PooledConnectionTest.php b/tests/Integration/Database/PooledConnectionTest.php index 68747f250..5c25dbb33 100644 --- a/tests/Integration/Database/PooledConnectionTest.php +++ b/tests/Integration/Database/PooledConnectionTest.php @@ -275,6 +275,62 @@ public function testLastUseTimeUpdatedOnCheck(): void $this->assertGreaterThan($initialTime, $pooledConnection->getLastUseTime()); } + public function testInvalidConnectionReconnectsEvenWithFreshReleaseTime(): void + { + $pool = new DbPool($this->app, 'pool_test'); + $pooledConnection = $this->createPooledConnection($pool); + $originalConnection = $pooledConnection->getConnection(); + + (new ReflectionProperty(PooledConnection::class, 'invalid'))->setValue($pooledConnection, true); + (new ReflectionProperty(PooledConnection::class, 'lastReleaseTime'))->setValue($pooledConnection, microtime(true)); + + $this->assertNotSame($originalConnection, $pooledConnection->getActiveConnection()); + } + + public function testReleaseSnapshotsErrorCountBeforeResettingConnection(): void + { + $pool = new DbPool($this->app, 'pool_test'); + + /** @var PooledConnection $pooledConnection */ + $pooledConnection = $pool->get(); + $connection = $pooledConnection->getConnection(); + + (new ReflectionProperty(Connection::class, 'errorCount'))->setValue($connection, 101); + + $pooledConnection->release(); + + $this->assertSame(0, $connection->getErrorCount()); + + /** @var PooledConnection $nextPooledConnection */ + $nextPooledConnection = $pool->get(); + + $this->assertNotSame($connection, $nextPooledConnection->getConnection()); + + $nextPooledConnection->release(); + } + + public function testReleaseResetsErrorCountForNextBorrowWindow(): void + { + $pool = new DbPool($this->app, 'pool_test'); + + /** @var PooledConnection $pooledConnection */ + $pooledConnection = $pool->get(); + $connection = $pooledConnection->getConnection(); + + (new ReflectionProperty(Connection::class, 'errorCount'))->setValue($connection, 1); + + $pooledConnection->release(); + + $this->assertSame(0, $connection->getErrorCount()); + + /** @var PooledConnection $nextPooledConnection */ + $nextPooledConnection = $pool->get(); + + $this->assertSame($connection, $nextPooledConnection->getConnection()); + + $nextPooledConnection->release(); + } + public function testSharedPdoForInMemorySqlite(): void { $pool = new DbPool($this->app, 'pool_test'); diff --git a/tests/Integration/Database/Sqlite/DbPoolHeartbeatTest.php b/tests/Integration/Database/Sqlite/DbPoolHeartbeatTest.php new file mode 100644 index 000000000..321a8573e --- /dev/null +++ b/tests/Integration/Database/Sqlite/DbPoolHeartbeatTest.php @@ -0,0 +1,481 @@ +make('config')->set('app.stdout_log.level', []); + } + + protected function setUp(): void + { + parent::setUp(); + + $this->databasePath = sys_get_temp_dir() . '/hypervel_db_pool_heartbeat_' . getmypid() . '_' . spl_object_id($this) . '.sqlite'; + touch($this->databasePath); + + $this->app->instance('db.connector.sqlite', new SQLiteConnector); + } + + protected function tearDown(): void + { + foreach ($this->pools as $pool) { + run(fn () => $pool->flushAll()); + } + + if (file_exists($this->databasePath)) { + @unlink($this->databasePath); + } + + parent::tearDown(); + } + + public function testDisabledHeartbeatDoesNotStartTimer(): void + { + $pool = $this->createPool([ + 'heartbeat' => -1, + ]); + + $this->assertSame(0, $pool->heartbeatTimerClosureCount()); + } + + public function testEnabledHeartbeatStartsTimerAndFlushAllClearsIt(): void + { + $pool = $this->createPool([ + 'heartbeat' => 0.001, + ]); + + $this->assertSame(1, $pool->heartbeatTimerClosureCount()); + + run(fn () => $pool->flushAll()); + + $this->assertSame(0, $pool->heartbeatTimerClosureCount()); + } + + public function testHeartbeatKeepsMinimumConnectionsWarmAndEvictsExpiredExtras(): void + { + run(function () { + $pool = $this->createPool([ + 'min_connections' => 1, + 'max_connections' => 3, + 'heartbeat' => -1, + 'max_idle_time' => 1.0, + ]); + + $connections = [ + $pool->get(), + $pool->get(), + $pool->get(), + ]; + + foreach ($connections as $connection) { + $connection->getConnection()->getPdo(); + $connection->release(); + $this->ageReleasedConnection($connection); + } + + $pool->runHeartbeatForTest(); + + $this->assertSame(1, $pool->getCurrentConnections()); + $this->assertSame(1, $pool->getConnectionsInChannel()); + }); + } + + public function testHeartbeatValidationKeepsMinimumConnectionCheckoutValid(): void + { + run(function () { + $pool = $this->createPool([ + 'min_connections' => 1, + 'max_connections' => 1, + 'heartbeat' => -1, + 'max_idle_time' => 1.0, + ]); + + $pooledConnection = $pool->get(); + $connection = $pooledConnection->getConnection(); + $pdo = $connection->getPdo(); + + $pooledConnection->release(); + $this->ageReleasedConnection($pooledConnection); + + $pool->runHeartbeatForTest(); + + /** @var PooledConnection $nextPooledConnection */ + $nextPooledConnection = $pool->get(); + + $this->assertSame($connection, $nextPooledConnection->getConnection()); + $this->assertSame($pdo, $nextPooledConnection->getConnection()->getPdo()); + + $nextPooledConnection->release(); + }); + } + + public function testHeartbeatDoesNotRealizeLazyPdoClosures(): void + { + run(function () { + $pool = $this->createPool([ + 'heartbeat' => -1, + ]); + + $pooledConnection = $pool->get(); + $connection = $pooledConnection->getConnection(); + + $this->assertInstanceOf(Closure::class, $connection->getRawPdo()); + + $pooledConnection->release(); + $pool->runHeartbeatForTest(); + + $this->assertInstanceOf(Closure::class, $connection->getRawPdo()); + }); + } + + public function testHeartbeatPingDoesNotFireQueryInstrumentation(): void + { + run(function () { + $pool = $this->createPool([ + 'heartbeat' => -1, + ]); + + $events = 0; + $this->app->make(Dispatcher::class)->listen(QueryExecuted::class, function () use (&$events) { + ++$events; + }); + + $pooledConnection = $pool->get(); + $connection = $pooledConnection->getConnection(); + $connection->getPdo(); + $pooledConnection->release(); + + $connection->enableQueryLog(); + $connection->whenQueryingForLongerThan(-1, function () use (&$events) { + ++$events; + }); + + $pool->runHeartbeatForTest(); + + $this->assertSame(0, $events); + $this->assertSame([], $connection->getQueryLog()); + $this->assertSame(0.0, $connection->totalQueryDuration()); + }); + } + + public function testHeartbeatOnlyTouchesIdleConnections(): void + { + run(function () { + $pool = $this->createPool([ + 'min_connections' => 1, + 'max_connections' => 2, + 'heartbeat' => -1, + 'max_idle_time' => 1.0, + ]); + + $borrowed = $pool->get(); + $idle = $pool->get(); + $idle->getConnection()->getPdo(); + $idle->release(); + $this->ageReleasedConnection($idle); + + $pool->runHeartbeatForTest(); + + $this->assertSame(1, $borrowed->getConnection()->selectOne('SELECT 1 as result')->result); + $this->assertSame(1, $pool->getCurrentConnections()); + $this->assertSame(0, $pool->getConnectionsInChannel()); + + $borrowed->release(); + }); + } + + public function testFailedHeartbeatPingDiscardsConnectionBelowMinimum(): void + { + run(function () { + $pool = $this->createPool([ + 'min_connections' => 1, + 'max_connections' => 1, + 'heartbeat' => -1, + ], FailingHeartbeatDbPool::class); + + $pooledConnection = $pool->get(); + $pooledConnection->release(); + + $pool->runHeartbeatForTest(); + + $this->assertSame(0, $pool->getCurrentConnections()); + $this->assertSame(0, $pool->getConnectionsInChannel()); + }); + } + + public function testHeartbeatDiscardsInvalidIdleConnectionBelowMinimum(): void + { + run(function () { + $pool = $this->createPool([ + 'min_connections' => 1, + 'max_connections' => 1, + 'heartbeat' => -1, + ]); + + $pooledConnection = $pool->get(); + $pooledConnection->release(); + + (new ReflectionProperty(PooledConnection::class, 'invalid'))->setValue($pooledConnection, true); + + $pool->runHeartbeatForTest(); + + $this->assertSame(0, $pool->getCurrentConnections()); + $this->assertSame(0, $pool->getConnectionsInChannel()); + }); + } + + public function testHeartbeatPingTimeoutDiscardsWithoutRequeueingLateCompletion(): void + { + run(function () { + SlowHeartbeatPdo::$coroutineId = null; + + $pool = $this->createPool([ + 'min_connections' => 1, + 'max_connections' => 1, + 'heartbeat' => -1, + 'heartbeat_timeout' => 0.001, + ], SlowHeartbeatDbPool::class); + + $pooledConnection = $pool->get(); + $pooledConnection->release(); + + $startedAt = microtime(true); + $pool->runHeartbeatForTest(); + $elapsed = microtime(true) - $startedAt; + + $this->assertLessThan(0.2, $elapsed); + $this->assertSame(0, $pool->getCurrentConnections()); + $this->assertSame(0, $pool->getConnectionsInChannel()); + $this->assertIsInt(SlowHeartbeatPdo::$coroutineId); + $deadline = microtime(true) + 0.1; + while (Coroutine::exists(SlowHeartbeatPdo::$coroutineId) && microtime(true) < $deadline) { + usleep(1000); + } + $this->assertFalse(Coroutine::exists(SlowHeartbeatPdo::$coroutineId)); + + usleep(100000); + + $this->assertSame(0, $pool->getConnectionsInChannel()); + }); + } + + public function testSuccessfulHeartbeatPingAfterFlushDiscardsConnection(): void + { + run(function () { + $pool = $this->createPool([ + 'min_connections' => 1, + 'max_connections' => 1, + 'heartbeat' => -1, + ], FlushingHeartbeatDbPool::class); + + $pooledConnection = $pool->get(); + $pooledConnection->release(); + + $pool->runHeartbeatForTest(); + + $this->assertSame(0, $pool->getCurrentConnections()); + $this->assertSame(0, $pool->getConnectionsInChannel()); + }); + } + + public function testHeartbeatDiscardOnlyDecrementsOnceWhenLoggerThrows(): void + { + run(function () { + $this->app->instance(StdoutLoggerInterface::class, new ThrowingHeartbeatLogger); + + $pool = $this->createPool([ + 'min_connections' => 1, + 'max_connections' => 1, + 'heartbeat' => -1, + ], OpenTransactionFailingHeartbeatDbPool::class); + + $pooledConnection = $pool->get(); + $pooledConnection->release(); + + $pool->runHeartbeatForTest(); + + $this->assertSame(0, $pool->getCurrentConnections()); + $this->assertSame(0, $pool->getConnectionsInChannel()); + }); + } + + /** + * @param array $poolOptions + */ + protected function createPool(array $poolOptions = [], string $poolClass = InspectableHeartbeatDbPool::class): InspectableHeartbeatDbPool + { + $this->app->make('config')->set('database.connections.heartbeat_test', [ + 'driver' => 'sqlite', + 'database' => $this->databasePath, + 'prefix' => '', + 'pool' => [ + 'min_connections' => 1, + 'max_connections' => 2, + 'connect_timeout' => 10.0, + 'wait_timeout' => 3.0, + 'heartbeat' => -1, + 'heartbeat_timeout' => 1.0, + 'max_idle_time' => 60.0, + ...$poolOptions, + ], + ]); + + $pool = new $poolClass($this->app, 'heartbeat_test'); + $this->pools[] = $pool; + + return $pool; + } + + protected function ageReleasedConnection(PooledConnection $connection): void + { + $lastReleaseTime = new ReflectionProperty(PooledConnection::class, 'lastReleaseTime'); + $lastUseTime = new ReflectionProperty(PooledConnection::class, 'lastUseTime'); + + $lastReleaseTime->setValue($connection, microtime(true) - 5.0); + $lastUseTime->setValue($connection, microtime(true) - 5.0); + } +} + +class InspectableHeartbeatDbPool extends DbPool +{ + public function runHeartbeatForTest(): void + { + $this->heartbeat(); + } + + public function heartbeatTimerClosureCount(): int + { + $timer = (new ReflectionProperty(DbPool::class, 'heartbeatTimer'))->getValue($this); + + return $timer === null ? 0 : count((new ClassInvoker($timer))->closures); + } +} + +class FailingHeartbeatDbPool extends InspectableHeartbeatDbPool +{ + protected function createConnection(): ConnectionInterface + { + return new FailingHeartbeatPooledConnection($this->container, $this, $this->config); + } +} + +class FailingHeartbeatPooledConnection extends PooledConnection +{ + public function ping(float $timeout): bool + { + return false; + } +} + +class FlushingHeartbeatDbPool extends InspectableHeartbeatDbPool +{ + protected function createConnection(): ConnectionInterface + { + return new FlushingHeartbeatPooledConnection($this->container, $this, $this->config); + } +} + +class FlushingHeartbeatPooledConnection extends PooledConnection +{ + public function ping(float $timeout): bool + { + $this->pool->flushAll(); + + return true; + } +} + +class OpenTransactionFailingHeartbeatDbPool extends InspectableHeartbeatDbPool +{ + protected function createConnection(): ConnectionInterface + { + return new OpenTransactionFailingHeartbeatPooledConnection($this->container, $this, $this->config); + } +} + +class OpenTransactionFailingHeartbeatPooledConnection extends FailingHeartbeatPooledConnection +{ + public function hasOpenTransaction(): bool + { + return true; + } +} + +class ThrowingHeartbeatLogger extends AbstractLogger implements StdoutLoggerInterface +{ + public function log($level, string|Stringable $message, array $context = []): void + { + throw new RuntimeException('Logger failed.'); + } +} + +class SlowHeartbeatDbPool extends InspectableHeartbeatDbPool +{ + protected function createConnection(): ConnectionInterface + { + return new SlowHeartbeatPooledConnection($this->container, $this, $this->config); + } +} + +class SlowHeartbeatPooledConnection extends PooledConnection +{ + protected function getOpenPdos(): array + { + return [new SlowHeartbeatPdo]; + } +} + +class SlowHeartbeatPdo extends PDO +{ + public static ?int $coroutineId = null; + + public function __construct() + { + } + + public function query(string $query, ?int $fetchMode = null, mixed ...$fetchModeArgs): PDOStatement|false + { + self::$coroutineId = Coroutine::id(); + + usleep(500000); + + return false; + } +} diff --git a/tests/Redis/RedisConnectionTest.php b/tests/Redis/RedisConnectionTest.php index 508a33b8f..91c0e04ae 100644 --- a/tests/Redis/RedisConnectionTest.php +++ b/tests/Redis/RedisConnectionTest.php @@ -259,7 +259,7 @@ public function testPipelineModeBypassesTransformedSet(): void $this->assertSame($redis, $result); } - public function testQueueingModeReshapesSetArgumentsButPreservesRawQueuedReturn() + public function testQueueingModeReshapesSetArgumentsButPreservesRawQueuedReturn(): void { $connection = $this->mockRedisConnection(transform: true); $redis = m::mock(Redis::class); @@ -277,7 +277,7 @@ public function testQueueingModeReshapesSetArgumentsButPreservesRawQueuedReturn( $this->assertSame($redis, $result); } - public function testQueueingModeReshapesHmsetArgumentsButPreservesRawQueuedReturn() + public function testQueueingModeReshapesHmsetArgumentsButPreservesRawQueuedReturn(): void { $connection = $this->mockRedisConnection(transform: true); $redis = m::mock(Redis::class); @@ -295,7 +295,7 @@ public function testQueueingModeReshapesHmsetArgumentsButPreservesRawQueuedRetur $this->assertSame($redis, $result); } - public function testQueueingModeReshapesLremArgumentsButPreservesRawQueuedReturn() + public function testQueueingModeReshapesLremArgumentsButPreservesRawQueuedReturn(): void { $connection = $this->mockRedisConnection(transform: true); $redis = m::mock(Redis::class); @@ -313,7 +313,7 @@ public function testQueueingModeReshapesLremArgumentsButPreservesRawQueuedReturn $this->assertSame($redis, $result); } - public function testQueueingModeReshapesZaddArgumentsButPreservesRawQueuedReturn() + public function testQueueingModeReshapesZaddArgumentsButPreservesRawQueuedReturn(): void { $connection = $this->mockRedisConnection(transform: true); $redis = m::mock(Redis::class); @@ -331,7 +331,7 @@ public function testQueueingModeReshapesZaddArgumentsButPreservesRawQueuedReturn $this->assertSame($redis, $result); } - public function testQueueingModeReshapesZrangebyscoreArgumentsButPreservesRawQueuedReturn() + public function testQueueingModeReshapesZrangebyscoreArgumentsButPreservesRawQueuedReturn(): void { $connection = $this->mockRedisConnection(transform: true); $redis = m::mock(Redis::class); @@ -349,7 +349,7 @@ public function testQueueingModeReshapesZrangebyscoreArgumentsButPreservesRawQue $this->assertSame($redis, $result); } - public function testQueueingModeReshapesZinterstoreArgumentsButPreservesRawQueuedReturn() + public function testQueueingModeReshapesZinterstoreArgumentsButPreservesRawQueuedReturn(): void { $connection = $this->mockRedisConnection(transform: true); $redis = m::mock(Redis::class); @@ -367,7 +367,7 @@ public function testQueueingModeReshapesZinterstoreArgumentsButPreservesRawQueue $this->assertSame($redis, $result); } - public function testPrepareEvalShapesArgumentsForQueueingMode() + public function testPrepareEvalShapesArgumentsForQueueingMode(): void { $connection = new class extends PhpRedisConnectionStub { public function prepareEvalForTest(mixed ...$arguments): array @@ -382,7 +382,7 @@ public function prepareEvalForTest(mixed ...$arguments): array ); } - public function testPrepareEvalshaFallsBackToEvalForQueueingMode() + public function testPrepareEvalshaFallsBackToEvalForQueueingMode(): void { $connection = new class extends PhpRedisConnectionStub { public function prepareEvalshaForTest(mixed ...$arguments): array @@ -397,7 +397,7 @@ public function prepareEvalshaForTest(mixed ...$arguments): array ); } - public function testQueueingModeReshapesExecuteRawArgumentsButPreservesRawQueuedReturn() + public function testQueueingModeReshapesExecuteRawArgumentsButPreservesRawQueuedReturn(): void { $connection = $this->mockRedisConnection(transform: true); $redis = m::mock(Redis::class); @@ -1638,7 +1638,7 @@ public function testSpopWithoutCountReturnsFalseForEmptySet(): void $this->assertFalse($result); } - public function testEvalReordersArguments() + public function testEvalReordersArguments(): void { // Can't mock eval() on phpredis — Mockery's proxy falls through to the // C extension which tries a real connection. Instead, override callEval @@ -1682,7 +1682,7 @@ protected function callEval(string $script, int $numberOfKeys, mixed ...$argumen $this->assertSame(['mykey'], $captured['arguments']); } - public function testEvalReordersMultipleArguments() + public function testEvalReordersMultipleArguments(): void { $captured = []; $connection = new class($this->getContainer(), $this->getMockedPool(), [], $captured) extends PhpRedisConnection { @@ -1723,7 +1723,7 @@ protected function callEval(string $script, int $numberOfKeys, mixed ...$argumen $this->assertSame(['mykey', 'myarg'], $captured['arguments']); } - public function testEvalWithNoKeysOrArguments() + public function testEvalWithNoKeysOrArguments(): void { $captured = []; $connection = new class($this->getContainer(), $this->getMockedPool(), [], $captured) extends PhpRedisConnection { @@ -1764,7 +1764,7 @@ protected function callEval(string $script, int $numberOfKeys, mixed ...$argumen $this->assertSame([], $captured['arguments']); } - public function testSubscribeThrowsOnPooledConnection() + public function testSubscribeThrowsOnPooledConnection(): void { $connection = $this->mockRedisConnection(); @@ -1774,7 +1774,7 @@ public function testSubscribeThrowsOnPooledConnection() $connection->__call('subscribe', [['channel1'], function () {}]); } - public function testPsubscribeThrowsOnPooledConnection() + public function testPsubscribeThrowsOnPooledConnection(): void { $connection = $this->mockRedisConnection(); @@ -1784,7 +1784,7 @@ public function testPsubscribeThrowsOnPooledConnection() $connection->__call('psubscribe', [['channel:*'], function () {}]); } - public function testReconnectSetsSerializerOption() + public function testReconnectSetsSerializerOption(): void { $pool = $this->getMockedPool(); $redis = m::mock(Redis::class); @@ -1809,7 +1809,7 @@ protected function createRedis(array $config): Redis }; } - public function testReconnectSetsPrefixOption() + public function testReconnectSetsPrefixOption(): void { $pool = $this->getMockedPool(); $redis = m::mock(Redis::class); @@ -1834,7 +1834,7 @@ protected function createRedis(array $config): Redis }; } - public function testReconnectSetsConnectionLevelPhpRedisOptions() + public function testReconnectSetsConnectionLevelPhpRedisOptions(): void { $pool = $this->getMockedPool(); $redis = m::mock(Redis::class); @@ -1871,7 +1871,7 @@ protected function createRedis(array $config): Redis }; } - public function testReconnectDoesNotSetReadTimeoutOptionWhenEmpty() + public function testReconnectDoesNotSetReadTimeoutOptionWhenEmpty(): void { $pool = $this->getMockedPool(); $redis = m::mock(Redis::class); @@ -1894,7 +1894,7 @@ protected function createRedis(array $config): Redis }; } - public function testReconnectSetsNumericBackoffAlgorithmAsIs() + public function testReconnectSetsNumericBackoffAlgorithmAsIs(): void { $pool = $this->getMockedPool(); $redis = m::mock(Redis::class); @@ -1919,7 +1919,7 @@ protected function createRedis(array $config): Redis }; } - public function testReconnectThrowsOnUnknownBackoffAlgorithm() + public function testReconnectThrowsOnUnknownBackoffAlgorithm(): void { $pool = $this->getMockedPool(); $redis = m::mock(Redis::class); @@ -1944,7 +1944,7 @@ protected function createRedis(array $config): Redis }; } - public function testReconnectThrowsOnUnknownOption() + public function testReconnectThrowsOnUnknownOption(): void { $pool = $this->getMockedPool(); $redis = m::mock(Redis::class); @@ -1969,7 +1969,7 @@ protected function createRedis(array $config): Redis }; } - public function testReconnectSetsNumericOptions() + public function testReconnectSetsNumericOptions(): void { $pool = $this->getMockedPool(); $redis = m::mock(Redis::class); @@ -1996,7 +1996,7 @@ protected function createRedis(array $config): Redis }; } - public function testReconnectAuthenticatesWhenAuthConfigured() + public function testReconnectAuthenticatesWhenAuthConfigured(): void { $pool = $this->getMockedPool(); $redis = m::mock(Redis::class); @@ -2021,7 +2021,7 @@ protected function createRedis(array $config): Redis }; } - public function testReconnectDoesNotAuthenticateWhenAuthEmpty() + public function testReconnectDoesNotAuthenticateWhenAuthEmpty(): void { $pool = $this->getMockedPool(); $redis = m::mock(Redis::class); @@ -2044,7 +2044,7 @@ protected function createRedis(array $config): Redis }; } - public function testCloseNullsConnection() + public function testCloseNullsConnection(): void { $connection = $this->mockRedisConnection(); @@ -2058,7 +2058,7 @@ public function testCloseNullsConnection() $this->assertNull($connection->client()); } - public function testCloseInvokesNativeRedisClose() + public function testCloseInvokesNativeRedisClose(): void { $connection = $this->mockRedisConnection(); $redis = m::mock(Redis::class); @@ -2073,7 +2073,7 @@ public function testCloseInvokesNativeRedisClose() $this->assertNull($connection->client()); } - public function testCloseSwallowsExceptionFromNativeRedisClose() + public function testCloseSwallowsExceptionFromNativeRedisClose(): void { $connection = $this->mockRedisConnection(); $redis = m::mock(Redis::class); @@ -2089,7 +2089,7 @@ public function testCloseSwallowsExceptionFromNativeRedisClose() $this->assertNull($connection->client()); } - public function testClusterTransformFiresInAtomicMode() + public function testClusterTransformFiresInAtomicMode(): void { $connection = new PhpRedisClusterConnectionStub; $connection->shouldTransform(true); @@ -2109,7 +2109,7 @@ public function testClusterTransformFiresInAtomicMode() $this->assertSame(1, $result); } - public function testRetryFailureZeroesLastUseTime() + public function testRetryFailureMarksConnectionInvalid(): void { $pool = $this->getMockedPool(); $redis = m::mock(Redis::class); @@ -2146,6 +2146,11 @@ public function getLastUseTime(): float { return $this->lastUseTime; } + + public function isInvalidForTest(): bool + { + return $this->invalid; + } }; // lastUseTime should be non-zero after initial construction @@ -2158,10 +2163,91 @@ public function getLastUseTime(): float $this->assertSame('reconnect failed', $exception->getMessage()); } - $this->assertSame(0.0, $connection->getLastUseTime()); + $this->assertGreaterThan(0.0, $connection->getLastUseTime()); + $this->assertTrue($connection->isInvalidForTest()); + } + + public function testReconnectClearsInvalidState(): void + { + $pool = $this->getMockedPool(); + $redis = m::mock(Redis::class); + $redis->shouldReceive('select')->andReturn(true); + + $connection = new class($this->getContainer(), $pool, ['host' => '127.0.0.1', 'port' => 6379, 'database' => 1], $redis) extends PhpRedisConnection { + public function __construct( + ContainerContract $container, + PoolInterface $pool, + array $config, + private Redis $fakeRedis, + ) { + parent::__construct($container, $pool, $config); + } + + protected function createRedis(array $config): Redis + { + return $this->fakeRedis; + } + + public function invalidateForTest(): void + { + $this->markInvalid(); + } + + public function isInvalidForTest(): bool + { + return $this->invalid; + } + }; + + $connection->invalidateForTest(); + + $this->assertTrue($connection->isInvalidForTest()); + + $connection->reconnect(); + + $this->assertFalse($connection->isInvalidForTest()); + } + + public function testInvalidStateIsNotMaskedByFreshReleaseTime(): void + { + $pool = m::mock(PoolInterface::class); + $pool->shouldReceive('getOption')->andReturn(new PoolOption(maxIdleTime: 60.0)); + + $redis = m::mock(Redis::class); + + $connection = new class($this->getContainer(), $pool, ['host' => '127.0.0.1', 'port' => 6379], $redis) extends PhpRedisConnection { + public function __construct( + ContainerContract $container, + PoolInterface $pool, + array $config, + private Redis $fakeRedis, + ) { + parent::__construct($container, $pool, $config); + } + + protected function createRedis(array $config): Redis + { + return $this->fakeRedis; + } + + public function invalidateForTest(): void + { + $this->markInvalid(); + } + + public function setLastReleaseTimeForTest(float $lastReleaseTime): void + { + $this->lastReleaseTime = $lastReleaseTime; + } + }; + + $connection->invalidateForTest(); + $connection->setLastReleaseTimeForTest(microtime(true)); + + $this->assertFalse($connection->check()); } - public function testScanWithArrayOptions() + public function testScanWithArrayOptions(): void { $connection = $this->mockRedisConnection(transform: true); $cursor = 0;