diff --git a/Dockerfile b/Dockerfile index 2d6a28f..c328707 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,7 +8,7 @@ COPY composer.json /src/ RUN composer install --ignore-platform-reqs --optimize-autoloader \ --no-plugins --no-scripts --prefer-dist -FROM php:8.3.3-cli-alpine3.19 as final +FROM php:8.4.22-cli-alpine3.24 as final LABEL maintainer="team@appwrite.io" diff --git a/composer.json b/composer.json index 1959250..eaf16a9 100644 --- a/composer.json +++ b/composer.json @@ -12,23 +12,26 @@ "scripts": { "lint": "./vendor/bin/pint --test", "format": "./vendor/bin/pint", - "check": "./vendor/bin/phpstan analyse --level max src tests", + "check": "./vendor/bin/phpstan analyse -c phpstan.neon --memory-limit=1G", "test": "./vendor/bin/phpunit --configuration phpunit.xml --testsuite \"Test Suite\"", "bench": "./vendor/bin/phpunit --configuration phpunit.xml --testsuite Benchmark" }, "minimum-stability": "stable", "require": { - "php": ">=8.0", - "utopia-php/fetch": "^1.1", + "php": ">=8.4", + "utopia-php/client": "^0.1|^0.2", "utopia-php/database": "5.*|6.0.*", "utopia-php/query": "0.1.*" }, "require-dev": { "phpunit/phpunit": "^9.5", "utopia-php/cache": "^3.0", - "phpstan/phpstan": "1.*", + "phpstan/phpstan": "^2.0", "laravel/pint": "1.*" }, + "suggest": { + "ext-curl": "Required by the default cURL transport for the ClickHouse adapter. Not needed if you inject a non-cURL utopia-php/client into the adapter." + }, "autoload": { "psr-4": { "Utopia\\Usage\\": "src/Usage" diff --git a/composer.lock b/composer.lock index 75a2640..608f1dc 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "54b275990951e47c22f32d319f95db39", + "content-hash": "5b3770da8c02f718d34e73b85e3ad64a", "packages": [ { "name": "brick/math", @@ -2147,6 +2147,62 @@ }, "time": "2026-05-29T12:12:23+00:00" }, + { + "name": "utopia-php/client", + "version": "0.2.0", + "source": { + "type": "git", + "url": "https://github.com/utopia-php/client.git", + "reference": "26df7973340b363e3fb597498e6971553bc3f085" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/utopia-php/client/zipball/26df7973340b363e3fb597498e6971553bc3f085", + "reference": "26df7973340b363e3fb597498e6971553bc3f085", + "shasum": "" + }, + "require": { + "php": ">=8.4", + "psr/http-client": "^1.0", + "psr/http-factory": "^1.0", + "psr/http-message": "^1.1 || ^2.0", + "utopia-php/pools": "^1.0", + "utopia-php/span": "^1.1 || ^3.0 || ^4.0" + }, + "require-dev": { + "swoole/ide-helper": "^6.0" + }, + "suggest": { + "ext-curl": "Required to use the cURL HTTP client adapter.", + "ext-simplexml": "Required to decode XML responses with Response::xml().", + "ext-swoole": "Required to use the Swoole coroutine HTTP client adapter." + }, + "type": "library", + "autoload": { + "psr-4": { + "Utopia\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "A lightweight PSR-18 HTTP client with cURL and Swoole coroutine backends", + "keywords": [ + "client", + "curl", + "http", + "php", + "psr-18", + "swoole", + "utopia" + ], + "support": { + "issues": "https://github.com/utopia-php/client/issues", + "source": "https://github.com/utopia-php/client/tree/0.2.0" + }, + "time": "2026-06-23T12:43:32+00:00" + }, { "name": "utopia-php/console", "version": "0.1.1", @@ -2255,46 +2311,6 @@ }, "time": "2026-06-18T10:37:40+00:00" }, - { - "name": "utopia-php/fetch", - "version": "1.1.2", - "source": { - "type": "git", - "url": "https://github.com/utopia-php/fetch.git", - "reference": "64f2b3a789480f1deb102ce684dac4217d8e98d5" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/utopia-php/fetch/zipball/64f2b3a789480f1deb102ce684dac4217d8e98d5", - "reference": "64f2b3a789480f1deb102ce684dac4217d8e98d5", - "shasum": "" - }, - "require": { - "php": ">=8.1" - }, - "require-dev": { - "laravel/pint": "^1.5.0", - "phpstan/phpstan": "^1.10", - "phpunit/phpunit": "^9.5", - "swoole/ide-helper": "^6.0" - }, - "type": "library", - "autoload": { - "psr-4": { - "Utopia\\Fetch\\": "src/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "description": "A simple library that provides an interface for making HTTP Requests.", - "support": { - "issues": "https://github.com/utopia-php/fetch/issues", - "source": "https://github.com/utopia-php/fetch/tree/1.1.2" - }, - "time": "2026-04-29T11:19:19+00:00" - }, { "name": "utopia-php/mongo", "version": "1.2.0", @@ -2455,6 +2471,46 @@ }, "time": "2026-03-03T09:05:14+00:00" }, + { + "name": "utopia-php/span", + "version": "4.0.1", + "source": { + "type": "git", + "url": "https://github.com/utopia-php/span.git", + "reference": "d11f2714324cb22b286b2afbf3ea9de32c68de83" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/utopia-php/span/zipball/d11f2714324cb22b286b2afbf3ea9de32c68de83", + "reference": "d11f2714324cb22b286b2afbf3ea9de32c68de83", + "shasum": "" + }, + "require": { + "php": ">=8.2" + }, + "require-dev": { + "swoole/ide-helper": "^5.0" + }, + "suggest": { + "ext-swoole": "Required for coroutine-based storage" + }, + "type": "library", + "autoload": { + "psr-4": { + "Utopia\\Span\\": "src/Span/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "Simple span tracing library for PHP with coroutine support", + "support": { + "issues": "https://github.com/utopia-php/span/issues", + "source": "https://github.com/utopia-php/span/tree/4.0.1" + }, + "time": "2026-06-20T09:45:06+00:00" + }, { "name": "utopia-php/telemetry", "version": "0.4.1", @@ -2924,15 +2980,15 @@ }, { "name": "phpstan/phpstan", - "version": "1.12.33", + "version": "2.2.2", "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/37982d6fc7cbb746dda7773530cda557cdf119e1", - "reference": "37982d6fc7cbb746dda7773530cda557cdf119e1", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/e5cc34d491a90e79c216d824f60fe21fd4d93bd6", + "reference": "e5cc34d491a90e79c216d824f60fe21fd4d93bd6", "shasum": "" }, "require": { - "php": "^7.2|^8.0" + "php": "^7.4|^8.0" }, "conflict": { "phpstan/phpstan-shim": "*" @@ -2951,6 +3007,17 @@ "license": [ "MIT" ], + "authors": [ + { + "name": "Ondřej Mirtes" + }, + { + "name": "Markus Staab" + }, + { + "name": "Vincent Langlet" + } + ], "description": "PHPStan - PHP Static Analysis Tool", "keywords": [ "dev", @@ -2973,7 +3040,7 @@ "type": "github" } ], - "time": "2026-02-28T20:30:03+00:00" + "time": "2026-06-05T09:00:01+00:00" }, { "name": "phpunit/php-code-coverage", @@ -4473,7 +4540,7 @@ "prefer-stable": false, "prefer-lowest": false, "platform": { - "php": ">=8.0" + "php": ">=8.4" }, "platform-dev": {}, "plugin-api-version": "2.9.0" diff --git a/phpstan.neon b/phpstan.neon new file mode 100644 index 0000000..1494f51 --- /dev/null +++ b/phpstan.neon @@ -0,0 +1,5 @@ +parameters: + level: max + paths: + - src + - tests diff --git a/src/Usage/Adapter/ClickHouse.php b/src/Usage/Adapter/ClickHouse.php index 407c77b..2a8826e 100644 --- a/src/Usage/Adapter/ClickHouse.php +++ b/src/Usage/Adapter/ClickHouse.php @@ -6,7 +6,10 @@ use DateTimeZone; use Exception; use Throwable; -use Utopia\Fetch\Client; +use Utopia\Client; +use Utopia\Client\Adapter\Curl\Client as CurlAdapter; +use Utopia\Psr7\Method; +use Utopia\Psr7\Request\Factory as RequestFactory; use Utopia\Query\Query; use Utopia\Usage\Metric; use Utopia\Usage\Usage; @@ -92,6 +95,8 @@ class ClickHouse extends SQL private Client $client; + private RequestFactory $requestFactory; + protected ?string $tenant = null; protected bool $sharedTables = false; @@ -104,21 +109,9 @@ class ClickHouse extends SQL /** @var array, duration: float, timestamp: float, success: bool, error?: string}> Query execution log */ private array $queryLog = []; - /** @var bool Whether to enable gzip compression for HTTP requests/responses */ - private bool $enableCompression = false; - - /** @var bool Whether to enable HTTP keep-alive for connection pooling */ - private bool $enableKeepAlive = true; - /** @var int Number of requests made using this adapter instance */ private int $requestCount = 0; - /** @var int Maximum number of retry attempts for failed requests (0 = no retries) */ - private int $maxRetries = 3; - - /** @var int Initial retry delay in milliseconds (doubles with each retry) */ - private int $retryDelay = 100; - /** @var string|null Current operation context for better error messages */ private ?string $operationContext = null; @@ -155,13 +148,17 @@ class ClickHouse extends SQL * @param string $password ClickHouse password (default: '') * @param int $port ClickHouse HTTP port (default: 8123) * @param bool $secure Whether to use HTTPS (default: false) + * @param Client|null $client HTTP transport. Defaults to a cURL client + * with persistent connection reuse. Inject your own to control timeouts, + * TLS, or retries — e.g. wrap an adapter in `Utopia\Client\Decorator\Retry`. */ public function __construct( string $host, string $username = 'default', string $password = '', int $port = self::DEFAULT_PORT, - bool $secure = false + bool $secure = false, + ?Client $client = null ) { $this->validateHost($host); $this->validatePort($port); @@ -172,30 +169,12 @@ public function __construct( $this->password = $password; $this->secure = $secure; - // Initialize the HTTP client for connection reuse - $this->client = new Client(); - $this->client->addHeader('X-ClickHouse-User', $this->username); - $this->client->addHeader('X-ClickHouse-Key', $this->password); - $this->client->setTimeout(30_000); // 30 seconds - } - - /** - * Set the HTTP request timeout in milliseconds. - * - * @param int $milliseconds Timeout in milliseconds (min: 1000ms, max: 600000ms) - * @return self - * @throws Exception If timeout is out of valid range - */ - public function setTimeout(int $milliseconds): self - { - if ($milliseconds < 1000) { - throw new Exception('Timeout must be at least 1000 milliseconds (1 second)'); - } - if ($milliseconds > 600000) { - throw new Exception('Timeout cannot exceed 600000 milliseconds (10 minutes)'); - } - $this->client->setTimeout($milliseconds); - return $this; + // `withConnectionReuse()` keeps the underlying cURL handle alive across + // requests so the TCP/TLS handshake is paid once. Auth and database are + // layered on each request via the factory, so an injected client stays + // a pure transport. + $this->client = $client ?? new Client((new CurlAdapter())->withConnectionReuse()); + $this->requestFactory = new RequestFactory(); } /** @@ -210,63 +189,6 @@ public function enableQueryLogging(bool $enable = true): self return $this; } - /** - * Enable or disable gzip compression for HTTP requests/responses. - * - * @param bool $enable Whether to enable compression - * @return self - */ - public function setCompression(bool $enable): self - { - $this->enableCompression = $enable; - return $this; - } - - /** - * Enable or disable HTTP keep-alive for connection pooling. - * - * @param bool $enable Whether to enable keep-alive (default: true) - * @return self - */ - public function setKeepAlive(bool $enable): self - { - $this->enableKeepAlive = $enable; - return $this; - } - - /** - * Set maximum number of retry attempts for failed requests. - * - * @param int $maxRetries Maximum retry attempts (0-10, 0 = no retries) - * @return self - * @throws Exception If maxRetries is out of valid range - */ - public function setMaxRetries(int $maxRetries): self - { - if ($maxRetries < 0 || $maxRetries > 10) { - throw new Exception('Max retries must be between 0 and 10'); - } - $this->maxRetries = $maxRetries; - return $this; - } - - /** - * Set initial retry delay in milliseconds. - * Delay doubles with each retry attempt (exponential backoff). - * - * @param int $milliseconds Initial delay in milliseconds (10-5000ms) - * @return self - * @throws Exception If delay is out of valid range - */ - public function setRetryDelay(int $milliseconds): self - { - if ($milliseconds < 10 || $milliseconds > 5000) { - throw new Exception('Retry delay must be between 10 and 5000 milliseconds'); - } - $this->retryDelay = $milliseconds; - return $this; - } - /** * Enable or disable ClickHouse async inserts (server-side batching). * @@ -333,17 +255,13 @@ public function setDualReadSampleRate(float $rate): self /** * Get connection statistics for monitoring. * - * @return array{request_count: int, keep_alive_enabled: bool, compression_enabled: bool, query_logging_enabled: bool, max_retries: int, retry_delay: int, async_inserts: bool, async_insert_wait: bool} + * @return array{request_count: int, query_logging_enabled: bool, async_inserts: bool, async_insert_wait: bool} */ public function getConnectionStats(): array { return [ 'request_count' => $this->requestCount, - 'keep_alive_enabled' => $this->enableKeepAlive, - 'compression_enabled' => $this->enableCompression, 'query_logging_enabled' => $this->enableQueryLogging, - 'max_retries' => $this->maxRetries, - 'retry_delay' => $this->retryDelay, 'async_inserts' => $this->asyncInserts, 'async_insert_wait' => $this->asyncInsertWait, ]; @@ -399,9 +317,9 @@ public function healthCheck(): array try { // Simple connectivity test $response = $this->query('SELECT 1 as ping FORMAT JSON'); - $json = json_decode($response, true); + $rows = $this->decodeRows($response); - if (!is_array($json) || !isset($json['data'][0]['ping'])) { + if (!isset($rows[0]['ping'])) { $result['error'] = 'Invalid response format'; return $result; } @@ -409,11 +327,11 @@ public function healthCheck(): array // Get server version and uptime try { $versionResponse = $this->query('SELECT version() as version, uptime() as uptime FORMAT JSON'); - $versionJson = json_decode($versionResponse, true); + $versionRows = $this->decodeRows($versionResponse); - if (is_array($versionJson) && isset($versionJson['data'][0])) { - $result['version'] = (string) $versionJson['data'][0]['version']; - $result['uptime'] = (int) $versionJson['data'][0]['uptime']; + if (isset($versionRows[0])) { + $result['version'] = self::toStr($versionRows[0]['version'] ?? null); + $result['uptime'] = self::toInt($versionRows[0]['uptime'] ?? null); } } catch (Exception $e) { // Version info is optional, don't fail health check @@ -658,9 +576,8 @@ private function buildTableReference(string $tableName): string * @param float $duration Execution duration in seconds * @param bool $success Whether the query succeeded * @param string|null $error Error message if query failed - * @param int $retryAttempt Current retry attempt number */ - private function logQuery(string $sql, array $params, float $duration, bool $success, ?string $error = null, int $retryAttempt = 0): void + private function logQuery(string $sql, array $params, float $duration, bool $success, ?string $error = null): void { if (!$this->enableQueryLogging) { return; @@ -674,10 +591,6 @@ private function logQuery(string $sql, array $params, float $duration, bool $suc 'success' => $success, ]; - if ($retryAttempt > 0) { - $logEntry['retry_attempt'] = $retryAttempt; - } - if ($error !== null) { $logEntry['error'] = $error; } @@ -685,39 +598,6 @@ private function logQuery(string $sql, array $params, float $duration, bool $suc $this->queryLog[] = $logEntry; } - /** - * Determine if an error is retryable. - * - * @param int|null $httpCode HTTP status code if available - * @param string $errorMessage Error message - * @return bool True if the error is retryable - */ - private function isRetryableError(?int $httpCode, string $errorMessage): bool - { - if ($httpCode !== null) { - if (in_array($httpCode, [408, 429, 500, 502, 503, 504], true)) { - return true; - } - if ($httpCode >= 400 && $httpCode < 500) { - return false; - } - } - - $retryablePatterns = [ - 'connection', 'timeout', 'timed out', 'refused', 'reset', - 'broken pipe', 'network', 'temporary', 'unavailable', - ]; - - $lowerMessage = strtolower($errorMessage); - foreach ($retryablePatterns as $pattern) { - if (strpos($lowerMessage, $pattern) !== false) { - return true; - } - } - - return false; - } - /** * Set the current operation context for better error messages. * @@ -729,44 +609,6 @@ private function setOperationContext(?string $context): void $this->operationContext = $context; } - /** - * Execute an operation with automatic retry logic and exponential backoff. - * - * @template T - * @param callable(int): T $operation - * @param callable(Exception, int|null): bool $shouldRetry - * @param callable(Exception, int): Exception $buildException - * @return T - * @throws Exception - */ - private function executeWithRetry(callable $operation, callable $shouldRetry, callable $buildException): mixed - { - $attempt = 0; - $lastException = null; - - while ($attempt <= $this->maxRetries) { - try { - return $operation($attempt); - } catch (Exception $e) { - $lastException = $e; - - if ($attempt < $this->maxRetries && $shouldRetry($e, $attempt)) { - $attempt++; - $delay = $this->retryDelay * (2 ** ($attempt - 1)); - usleep($delay * 1000); - continue; - } - - throw $buildException($e, $attempt); - } - } - - throw $buildException( - $lastException ?? new Exception('Unknown error occurred'), - $this->maxRetries - ); - } - /** * Build a contextual error message. * @@ -810,80 +652,109 @@ private function query(string $sql, array $params = []): string $queryId = $this->nextQueryId; $this->nextQueryId = null; - return $this->executeWithRetry( - function (int $attempt) use ($sql, $params, $queryId): string { - $startTime = microtime(true); - $scheme = $this->secure ? 'https' : 'http'; - $url = "{$scheme}://{$this->host}:{$this->port}/"; - if ($queryId !== null) { - $url .= '?' . http_build_query(['query_id' => $queryId]); - } - - $this->client->addHeader('X-ClickHouse-Database', $this->database); + $startTime = microtime(true); + $scheme = $this->secure ? 'https' : 'http'; - if ($this->enableKeepAlive) { - $this->client->addHeader('Connection', 'keep-alive'); - } else { - $this->client->addHeader('Connection', 'close'); - } + // ClickHouse reads `query` and `param_*` from a multipart/form-data + // body (the pattern the pre-migration cURL transport used). Keeping the + // SQL and bound parameters in the body — rather than the URL query + // string — avoids request-line length limits (HTTP 414) on large + // `equal`/tag filters. ClickHouse does NOT parse + // application/x-www-form-urlencoded bodies, so multipart is required. + // Only the tiny query_id, which has no size concern, stays in the URL. + $parts = ['query' => $sql]; + foreach ($params as $key => $value) { + $parts['param_' . $key] = $this->formatParamValue($value); + } + $url = "{$scheme}://{$this->host}:{$this->port}/"; + if ($queryId !== null) { + $url .= '?' . http_build_query(['query_id' => $queryId]); + } - if ($this->enableCompression) { - $this->client->addHeader('Accept-Encoding', 'gzip'); - } + $this->requestCount++; - if ($attempt === 0) { - $this->requestCount++; - } + $request = $this->requestFactory->multipart(Method::POST, $url, $parts, $this->buildHeaders()); - $body = ['query' => $sql]; - foreach ($params as $key => $value) { - $body['param_' . $key] = $this->formatParamValue($value); - } + try { + $response = $this->client->sendRequest($request); + } catch (Throwable $e) { + $duration = microtime(true) - $startTime; + $errorMsg = $this->buildErrorMessage("ClickHouse query failed: {$e->getMessage()}", null, $sql); + $this->logQuery($sql, $params, $duration, false, $errorMsg); + throw new Exception($errorMsg, 0, $e); + } - $response = $this->client->fetch( - url: $url, - method: Client::METHOD_POST, - body: $body - ); - $httpCode = $response->getStatusCode(); + $httpCode = $response->getStatusCode(); + $bodyStr = (string) $response->getBody(); + $duration = microtime(true) - $startTime; - if ($httpCode !== 200) { - $bodyStr = $response->getBody(); - $bodyStr = is_string($bodyStr) ? $bodyStr : ''; - $duration = microtime(true) - $startTime; - $baseError = "ClickHouse query failed with HTTP {$httpCode}: {$bodyStr}"; - $errorMsg = $this->buildErrorMessage($baseError, null, $sql); - $this->logQuery($sql, $params, $duration, false, $errorMsg, $attempt); + if ($httpCode !== 200) { + $errorMsg = $this->buildErrorMessage("ClickHouse query failed with HTTP {$httpCode}: {$bodyStr}", null, $sql); + $this->logQuery($sql, $params, $duration, false, $errorMsg); + throw new Exception($errorMsg); + } - throw new Exception($errorMsg . '|HTTP_CODE:' . $httpCode); - } + $this->logQuery($sql, $params, $duration, true); + return $bodyStr; + } - $body = $response->getBody(); - $result = is_string($body) ? $body : ''; - $duration = microtime(true) - $startTime; - $this->logQuery($sql, $params, $duration, true, null, $attempt); - return $result; - }, - function (Exception $e, ?int $httpCode): bool { - $exceptionHttpCode = null; - if (preg_match('/\|HTTP_CODE:(\d+)$/', $e->getMessage(), $matches)) { - $exceptionHttpCode = (int) $matches[1]; - } - return $this->isRetryableError($exceptionHttpCode, $e->getMessage()); - }, - function (Exception $e, int $attempt) use ($sql): Exception { - $cleanMessage = preg_replace('/\|HTTP_CODE:\d+$/', '', $e->getMessage()); - $cleanMessage = is_string($cleanMessage) ? $cleanMessage : $e->getMessage(); - - if (strpos($cleanMessage, '[Operation:') !== false) { - return new Exception($cleanMessage, 0, $e); - } + /** + * Decode a ClickHouse `FORMAT JSON` response body into its data rows. + * Returns an empty list when the body is not the expected envelope. + * + * @return array> + */ + private function decodeRows(string $response): array + { + $json = json_decode($response, true); + if (!is_array($json) || !isset($json['data']) || !is_array($json['data'])) { + return []; + } - $baseError = "ClickHouse query execution failed after " . ($attempt + 1) . " attempt(s): {$cleanMessage}"; - $errorMsg = $this->buildErrorMessage($baseError, null, $sql); - return new Exception($errorMsg, 0, $e); + $rows = []; + foreach ($json['data'] as $row) { + if (!is_array($row)) { + continue; } - ); + $typed = []; + foreach ($row as $key => $value) { + $typed[(string) $key] = $value; + } + $rows[] = $typed; + } + + return $rows; + } + + private static function toInt(mixed $value): int + { + return is_numeric($value) ? (int) $value : 0; + } + + private static function toFloat(mixed $value): float + { + return is_numeric($value) ? (float) $value : 0.0; + } + + private static function toStr(mixed $value): string + { + return is_scalar($value) ? (string) $value : ''; + } + + /** + * Build the per-request headers ClickHouse expects: credentials and target + * database. Applied to every request so the injected transport client stays + * auth-agnostic. + * + * @return array + */ + private function buildHeaders(): array + { + return [ + 'X-ClickHouse-User' => $this->username, + 'X-ClickHouse-Key' => $this->password, + 'X-ClickHouse-Database' => $this->database, + ]; } /** @@ -899,91 +770,50 @@ private function insert(string $table, array $data): void return; } - $this->executeWithRetry( - function (int $attempt) use ($table, $data): void { - $startTime = microtime(true); - $scheme = $this->secure ? 'https' : 'http'; - $escapedTable = $this->escapeIdentifier($table); - - $queryParams = ['query' => "INSERT INTO {$escapedTable} FORMAT JSONEachRow"]; - if ($this->asyncInserts) { - $queryParams['async_insert'] = '1'; - $queryParams['wait_for_async_insert'] = $this->asyncInsertWait ? '1' : '0'; - } - $url = "{$scheme}://{$this->host}:{$this->port}/?" . http_build_query($queryParams); - - $this->client->addHeader('X-ClickHouse-Database', $this->database); - $this->client->addHeader('Content-Type', 'application/x-ndjson'); - - if ($this->enableKeepAlive) { - $this->client->addHeader('Connection', 'keep-alive'); - } else { - $this->client->addHeader('Connection', 'close'); - } - - if ($this->enableCompression) { - $this->client->addHeader('Accept-Encoding', 'gzip'); - } - - if ($attempt === 0) { - $this->requestCount++; - } - - $body = implode("\n", $data); + // Inserts are not idempotent: the MergeTree engine has no row-level + // deduplication, so a retried insert that reaches the server twice + // leaves duplicate rows behind. The default transport does not retry + // POST; any injected retry strategy must keep it that way. + $startTime = microtime(true); + $scheme = $this->secure ? 'https' : 'http'; + $escapedTable = $this->escapeIdentifier($table); + $rowCount = count($data); - $sql = "INSERT INTO {$escapedTable} FORMAT JSONEachRow"; - $params = ['rows' => count($data), 'bytes' => strlen($body)]; + $queryParams = ['query' => "INSERT INTO {$escapedTable} FORMAT JSONEachRow"]; + if ($this->asyncInserts) { + $queryParams['async_insert'] = '1'; + $queryParams['wait_for_async_insert'] = $this->asyncInsertWait ? '1' : '0'; + } + $url = "{$scheme}://{$this->host}:{$this->port}/?" . http_build_query($queryParams); - try { - $response = $this->client->fetch( - url: $url, - method: Client::METHOD_POST, - body: $body - ); + $this->requestCount++; - $httpCode = $response->getStatusCode(); + $body = implode("\n", $data); + $sql = "INSERT INTO {$escapedTable} FORMAT JSONEachRow"; + $params = ['rows' => $rowCount, 'bytes' => strlen($body)]; - if ($httpCode !== 200) { - $bodyStr = $response->getBody(); - $bodyStr = is_string($bodyStr) ? $bodyStr : ''; - $duration = microtime(true) - $startTime; - $rowCount = count($data); - $baseError = "ClickHouse insert failed with HTTP {$httpCode}: {$bodyStr}"; - $errorMsg = $this->buildErrorMessage($baseError, $table, "INSERT INTO {$table} ({$rowCount} rows)"); - $this->logQuery($sql, $params, $duration, false, $errorMsg, $attempt); + $request = $this->requestFactory->body(Method::POST, $url, $body, 'application/x-ndjson', $this->buildHeaders()); - throw new Exception($errorMsg . '|HTTP_CODE:' . $httpCode); - } + try { + $response = $this->client->sendRequest($request); + } catch (Throwable $e) { + $duration = microtime(true) - $startTime; + $errorMsg = $this->buildErrorMessage("ClickHouse insert failed: {$e->getMessage()}", $table, "INSERT INTO {$table} ({$rowCount} rows)"); + $this->logQuery($sql, $params, $duration, false, $errorMsg); + throw new Exception($errorMsg, 0, $e); + } - $duration = microtime(true) - $startTime; - $this->logQuery($sql, $params, $duration, true, null, $attempt); - } finally { - $this->client->removeHeader('Content-Type'); - } - }, - function (Exception $e, ?int $httpCode): bool { - // Never retry inserts. The underlying MergeTree engine has - // no row-level deduplication, so a retried insert that hits - // the server twice (network blip + first request actually - // succeeded) leaves duplicate rows behind. Surface the - // failure to the caller instead — they can replay the - // batch from durable storage if they choose. - return false; - }, - function (Exception $e, int $attempt) use ($table, $data): Exception { - $cleanMessage = preg_replace('/\|HTTP_CODE:\d+$/', '', $e->getMessage()); - $cleanMessage = is_string($cleanMessage) ? $cleanMessage : $e->getMessage(); + $httpCode = $response->getStatusCode(); + $duration = microtime(true) - $startTime; - if (strpos($cleanMessage, '[Operation:') !== false) { - return new Exception($cleanMessage, 0, $e); - } + if ($httpCode !== 200) { + $bodyStr = (string) $response->getBody(); + $errorMsg = $this->buildErrorMessage("ClickHouse insert failed with HTTP {$httpCode}: {$bodyStr}", $table, "INSERT INTO {$table} ({$rowCount} rows)"); + $this->logQuery($sql, $params, $duration, false, $errorMsg); + throw new Exception($errorMsg); + } - $rowCount = count($data); - $baseError = "ClickHouse insert execution failed after " . ($attempt + 1) . " attempt(s): {$cleanMessage}"; - $errorMsg = $this->buildErrorMessage($baseError, $table, "INSERT INTO {$table} ({$rowCount} rows)"); - return new Exception($errorMsg, 0, $e); - } - ); + $this->logQuery($sql, $params, $duration, true); } /** @@ -1560,10 +1390,6 @@ private function validateMetricData(string $metric, int $value, string $type, ar if ($type !== Usage::TYPE_EVENT && $type !== Usage::TYPE_GAUGE) { throw new \InvalidArgumentException($prefix . "Invalid type '{$type}'. Allowed: " . Usage::TYPE_EVENT . ', ' . Usage::TYPE_GAUGE); } - - if (!is_array($tags)) { - throw new Exception($prefix . 'Tags must be an array'); - } } /** @@ -1977,26 +1803,16 @@ private function parseAggregatedResults(string $result, string $type = 'event'): return []; } - $json = json_decode($result, true); - - if (!is_array($json) || !isset($json['data']) || !is_array($json['data'])) { - return []; - } - - $rows = $json['data']; + $rows = $this->decodeRows($result); $metrics = []; foreach ($rows as $row) { - if (!is_array($row)) { - continue; - } - $document = []; foreach ($row as $key => $value) { if ($key === 'bucket') { // Map 'bucket' back to 'time' for consistent Metric objects - $parsedTime = (string) $value; + $parsedTime = self::toStr($value); if (strpos($parsedTime, 'T') === false) { $parsedTime = str_replace(' ', 'T', $parsedTime) . '+00:00'; } @@ -2111,13 +1927,13 @@ private function countFromTable(array $queries, string $type, ?int $max = null): } $result = $this->query($sql, $params); - $json = json_decode($result, true); + $rows = $this->decodeRows($result); - if (!is_array($json) || !isset($json['data'][0]['total'])) { + if (!isset($rows[0]['total'])) { return 0; } - return (int) $json['data'][0]['total']; + return self::toInt($rows[0]['total']); } /** @@ -2392,7 +2208,7 @@ private function translateInclusiveMidnightForDaily(array $queries): array { $result = []; foreach ($queries as $q) { - if (!($q instanceof Query) || $q->getAttribute() !== 'time') { + if ($q->getAttribute() !== 'time') { $result[] = $q; continue; } @@ -2479,7 +2295,7 @@ private function tightenUpperBound(?string $current, ?string $candidate): ?strin * lower bound while the raw branch keeps the caller's original literal. * * @param array $queries - * @return array{nonTime: array, time: array} + * @return array{nonTime: array, time: array} */ private function splitTimeQueries(array $queries): array { @@ -2791,13 +2607,13 @@ private function sumHybridDailyAndRaw(array $queries, array $plan): int "; $result = $this->query($sql, $rawWhere['params']); - $json = json_decode($result, true); + $rows = $this->decodeRows($result); - if (!is_array($json) || !isset($json['data'][0]['total'])) { + if (!isset($rows[0]['total'])) { return 0; } - return (int) $json['data'][0]['total']; + return self::toInt($rows[0]['total']); } /** @@ -2830,13 +2646,13 @@ private function sumFromTable(array $queries, string $attribute, string $type): $result = $this->query($sql, $params); - $json = json_decode($result, true); + $rows = $this->decodeRows($result); - if (!is_array($json) || !isset($json['data'][0]['total'])) { + if (!isset($rows[0]['total'])) { return 0; } - return (int) $json['data'][0]['total']; + return self::toInt($rows[0]['total']); } /** @@ -2914,9 +2730,9 @@ public function sumDaily(array $queries = [], string $attribute = 'value'): int $sql = "SELECT sum({$escapedAttribute}) as total FROM {$fromTable}{$whereData['clause']} FORMAT JSON"; $result = $this->query($sql, $whereData['params']); - $json = json_decode($result, true); + $rows = $this->decodeRows($result); - return (is_array($json) && isset($json['data'][0]['total'])) ? (int) $json['data'][0]['total'] : 0; + return isset($rows[0]['total']) ? self::toInt($rows[0]['total']) : 0; } /** @@ -2976,14 +2792,12 @@ public function sumDailyBatch(array $metrics, array $queries = []): array "; $result = $this->query($sql, $params); - $json = json_decode($result, true); + $rows = $this->decodeRows($result); - if (is_array($json) && isset($json['data']) && is_array($json['data'])) { - foreach ($json['data'] as $row) { - $metricName = $row['metric'] ?? ''; - if (isset($totals[$metricName])) { - $totals[$metricName] = (int) ($row['total'] ?? 0); - } + foreach ($rows as $row) { + $metricName = self::toStr($row['metric'] ?? null); + if (isset($totals[$metricName])) { + $totals[$metricName] = self::toInt($row['total'] ?? null); } } @@ -3140,36 +2954,34 @@ private function getTimeSeriesFromTable(array $metrics, string $interval, string "; $result = $this->query($sql, $params); - $json = json_decode($result, true); + $rows = $this->decodeRows($result); // Initialize result structure $output = []; foreach ($metrics as $metric) { - $output[$metric] = ['total' => 0, 'data' => []]; + $output[$metric] = ['total' => 0.0, 'data' => []]; } - if (is_array($json) && isset($json['data']) && is_array($json['data'])) { - foreach ($json['data'] as $row) { - $metricName = $row['metric'] ?? ''; - $bucketTime = (string) ($row['bucket'] ?? ''); - $value = (float) ($row['agg_value'] ?? 0); - - if (!isset($output[$metricName])) { - continue; - } + foreach ($rows as $row) { + $metricName = self::toStr($row['metric'] ?? null); + $bucketTime = self::toStr($row['bucket'] ?? null); + $value = self::toFloat($row['agg_value'] ?? null); - // Format bucket time - $formattedDate = $bucketTime; - if (strpos($bucketTime, 'T') === false) { - $formattedDate = str_replace(' ', 'T', $bucketTime) . '+00:00'; - } + if (!isset($output[$metricName])) { + continue; + } - $output[$metricName]['total'] += $value; - $output[$metricName]['data'][] = [ - 'value' => $value, - 'date' => $formattedDate, - ]; + // Format bucket time + $formattedDate = $bucketTime; + if (strpos($bucketTime, 'T') === false) { + $formattedDate = str_replace(' ', 'T', $bucketTime) . '+00:00'; } + + $output[$metricName]['total'] += $value; + $output[$metricName]['data'][] = [ + 'value' => $value, + 'date' => $formattedDate, + ]; } return $output; @@ -3299,13 +3111,13 @@ private function getTotalFromGauges(string $metric, array $queries): int "; $result = $this->query($sql, $whereData['params']); - $json = json_decode($result, true); + $rows = $this->decodeRows($result); - if (!is_array($json) || !isset($json['data'][0]['total'])) { + if (!isset($rows[0]['total'])) { return 0; } - return (int) $json['data'][0]['total']; + return self::toInt($rows[0]['total']); } /** @@ -3385,33 +3197,31 @@ public function getTotalBatch(array $metrics, array $queries = [], ?string $type "; $result = $this->query($sql, $params); - $json = json_decode($result, true); - - if (is_array($json) && isset($json['data']) && is_array($json['data'])) { - foreach ($json['data'] as $row) { - $metricName = $row['metric'] ?? ''; + $rows = $this->decodeRows($result); - if (!isset($totals[$metricName])) { - continue; - } + foreach ($rows as $row) { + $metricName = self::toStr($row['metric'] ?? null); - $rowValue = (int) ($row['agg_val'] ?? 0); - if ($rowValue === 0) { - continue; - } + if (!isset($totals[$metricName])) { + continue; + } - if ($type === null - && isset($contributingType[$metricName]) - && $contributingType[$metricName] !== $queryType) { - throw new Exception( - "Metric '{$metricName}' exists in both event and gauge tables. " - . "Specify \$type explicitly to avoid ambiguous aggregation." - ); - } + $rowValue = self::toInt($row['agg_val'] ?? null); + if ($rowValue === 0) { + continue; + } - $contributingType[$metricName] = $queryType; - $totals[$metricName] = $rowValue; + if ($type === null + && isset($contributingType[$metricName]) + && $contributingType[$metricName] !== $queryType) { + throw new Exception( + "Metric '{$metricName}' exists in both event and gauge tables. " + . "Specify \$type explicitly to avoid ambiguous aggregation." + ); } + + $contributingType[$metricName] = $queryType; + $totals[$metricName] = $rowValue; } } @@ -3661,7 +3471,7 @@ private function buildOrderBySql(array $orderAttributes, bool $flip = false): ar * * @param array $queries * @param string $type 'event' or 'gauge' — used for attribute validation - * @return array{filters: array, params: array, orderBy?: array, orderAttributes?: array, limit?: int, offset?: int, groupByInterval?: string, groupBy?: array, cursor?: array, cursorDirection?: string} + * @return array{filters: array, params: array, orderBy?: array, orderAttributes?: array, limit?: int, offset?: int, groupByInterval?: string, groupBy?: array, cursor?: array, cursorDirection?: string} * @throws Exception */ private function parseQueries(array $queries, string $type = 'event'): array @@ -3875,7 +3685,7 @@ private function parseQueries(array $queries, string $type = 'event'): array break; case Query::TYPE_LIMIT: - $limitVal = is_array($values) && !empty($values) ? $values[0] : $values; + $limitVal = !empty($values) ? $values[0] : $values; if (!\is_int($limitVal)) { throw new Exception('Invalid limit value. Expected int'); } @@ -3884,7 +3694,7 @@ private function parseQueries(array $queries, string $type = 'event'): array break; case Query::TYPE_OFFSET: - $offsetVal = is_array($values) && !empty($values) ? $values[0] : $values; + $offsetVal = !empty($values) ? $values[0] : $values; if (!\is_int($offsetVal)) { throw new Exception('Invalid offset value. Expected int'); } @@ -3966,28 +3776,19 @@ private function parseResults(string $result, string $type = 'event'): array return []; } - $json = json_decode($result, true); - - if (!is_array($json) || !isset($json['data']) || !is_array($json['data'])) { - return []; - } - - $rows = $json['data']; + $rows = $this->decodeRows($result); $metrics = []; foreach ($rows as $row) { - if (!is_array($row)) { - continue; - } $document = []; foreach ($row as $key => $value) { if ($key === 'tenant') { - $document[$key] = $value !== null ? (string) $value : null; + $document[$key] = $value !== null ? self::toStr($value) : null; } elseif ($key === 'value') { - $document[$key] = $value !== null ? (int) $value : null; + $document[$key] = $value !== null ? self::toInt($value) : null; } elseif ($key === 'time') { - $parsedTime = (string)$value; + $parsedTime = self::toStr($value); if (strpos($parsedTime, 'T') === false) { $parsedTime = str_replace(' ', 'T', $parsedTime) . '+00:00'; } diff --git a/tests/Benchmark/BenchmarkBase.php b/tests/Benchmark/BenchmarkBase.php index a31c72a..5749ae8 100644 --- a/tests/Benchmark/BenchmarkBase.php +++ b/tests/Benchmark/BenchmarkBase.php @@ -253,11 +253,14 @@ protected function captureCHStats(string $queryId): array $raw = $query->invoke($this->adapter, $sql, []); $rawString = is_string($raw) ? $raw : ''; $json = json_decode($rawString, true); - if (is_array($json) && isset($json['data'][0]) && is_array($json['data'][0])) { + if (is_array($json) && isset($json['data']) && is_array($json['data']) && isset($json['data'][0]) && is_array($json['data'][0])) { $row = $json['data'][0]; - $stats['rows_read'] = (int) ($row['rows_read'] ?? 0); - $stats['read_bytes'] = (int) ($row['read_bytes'] ?? 0); - $stats['query_duration_ms'] = (float) ($row['query_duration_ms'] ?? 0); + $rowsRead = $row['rows_read'] ?? 0; + $readBytes = $row['read_bytes'] ?? 0; + $queryDuration = $row['query_duration_ms'] ?? 0; + $stats['rows_read'] = is_numeric($rowsRead) ? (int) $rowsRead : 0; + $stats['read_bytes'] = is_numeric($readBytes) ? (int) $readBytes : 0; + $stats['query_duration_ms'] = is_numeric($queryDuration) ? (float) $queryDuration : 0.0; } } catch (Throwable $e) { } @@ -292,7 +295,7 @@ protected function captureProjectionsUsed(string $queryId): array $raw = $query->invoke($this->adapter, $sql, []); $rawString = is_string($raw) ? $raw : ''; $json = json_decode($rawString, true); - if (!is_array($json) || empty($json['data'])) { + if (!is_array($json) || empty($json['data']) || !is_array($json['data'])) { return []; } $row = $json['data'][0]; diff --git a/tests/Usage/Adapter/ClickHouseDimRoutingTest.php b/tests/Usage/Adapter/ClickHouseDimRoutingTest.php index ac8a1a2..2f26f02 100644 --- a/tests/Usage/Adapter/ClickHouseDimRoutingTest.php +++ b/tests/Usage/Adapter/ClickHouseDimRoutingTest.php @@ -276,11 +276,11 @@ private function projectionsForQueryId(string $queryId): array $raw = $this->queryRaw($this->adapter, $sql); $json = json_decode($raw, true); - if (!is_array($json) || empty($json['data'])) { + if (!is_array($json) || empty($json['data']) || !is_array($json['data'])) { return []; } $row = $json['data'][0]; - $projections = $row['projections'] ?? []; + $projections = is_array($row) ? ($row['projections'] ?? []) : []; $out = []; foreach (is_array($projections) ? $projections : [] as $p) { if (is_string($p)) { diff --git a/tests/Usage/Adapter/ClickHouseGaugeDimRoutingTest.php b/tests/Usage/Adapter/ClickHouseGaugeDimRoutingTest.php index 46e5a31..d3c7c01 100644 --- a/tests/Usage/Adapter/ClickHouseGaugeDimRoutingTest.php +++ b/tests/Usage/Adapter/ClickHouseGaugeDimRoutingTest.php @@ -219,11 +219,11 @@ private function projectionsForQueryId(string $queryId): array $raw = $this->queryRaw($this->adapter, $sql); $json = json_decode($raw, true); - if (!is_array($json) || empty($json['data'])) { + if (!is_array($json) || empty($json['data']) || !is_array($json['data'])) { return []; } $row = $json['data'][0]; - $projections = $row['projections'] ?? []; + $projections = is_array($row) ? ($row['projections'] ?? []) : []; $out = []; foreach (is_array($projections) ? $projections : [] as $p) { if (is_string($p)) { diff --git a/tests/Usage/Adapter/ClickHouseSchemaTest.php b/tests/Usage/Adapter/ClickHouseSchemaTest.php index 952dd77..50049b6 100644 --- a/tests/Usage/Adapter/ClickHouseSchemaTest.php +++ b/tests/Usage/Adapter/ClickHouseSchemaTest.php @@ -132,8 +132,11 @@ public function testSetupBackfillsServiceResourceOnLegacyGaugesTable(): void $rawString = $this->queryRaw($legacyAdapter, "SHOW CREATE TABLE {$fullName} FORMAT JSON"); $json = json_decode($rawString, true); $ddl = ''; - if (is_array($json) && isset($json['data'][0]['statement']) && is_string($json['data'][0]['statement'])) { - $ddl = $json['data'][0]['statement']; + if (is_array($json) && isset($json['data']) && is_array($json['data']) && isset($json['data'][0]) && is_array($json['data'][0])) { + $statement = $json['data'][0]['statement'] ?? ''; + if (is_string($statement)) { + $ddl = $statement; + } } $this->assertStringContainsString('`service` LowCardinality(Nullable(String))', $ddl); @@ -145,7 +148,7 @@ private function showCreate(string $table): string $database = $this->databaseName($this->adapter); $raw = $this->queryRaw($this->adapter, "SHOW CREATE TABLE `{$database}`.`{$table}` FORMAT JSON"); $json = json_decode($raw, true); - if (is_array($json) && isset($json['data'][0]) && is_array($json['data'][0])) { + if (is_array($json) && isset($json['data']) && is_array($json['data']) && isset($json['data'][0]) && is_array($json['data'][0])) { $statement = $json['data'][0]['statement'] ?? ''; if (is_string($statement)) { return $statement; diff --git a/tests/Usage/Adapter/ClickHouseTest.php b/tests/Usage/Adapter/ClickHouseTest.php index c145765..92f0a83 100644 --- a/tests/Usage/Adapter/ClickHouseTest.php +++ b/tests/Usage/Adapter/ClickHouseTest.php @@ -681,7 +681,6 @@ public function testHealthCheck(): void $health = $adapter->healthCheck(); // Assert basic structure - $this->assertIsArray($health); $this->assertArrayHasKey('healthy', $health); $this->assertArrayHasKey('host', $health); $this->assertArrayHasKey('port', $health); @@ -712,7 +711,6 @@ public function testHealthCheckFailure(): void $health = $adapter->healthCheck(); // Assert basic structure - $this->assertIsArray($health); $this->assertArrayHasKey('healthy', $health); $this->assertArrayHasKey('host', $health); @@ -733,152 +731,6 @@ public function testHealthCheckFailure(): void } } - /** - * Test setTimeout() method with valid timeout - */ - public function testSetTimeoutValid(): void - { - $host = getenv('CLICKHOUSE_HOST') ?: 'clickhouse'; - $username = getenv('CLICKHOUSE_USER') ?: 'default'; - $password = getenv('CLICKHOUSE_PASSWORD') ?: 'clickhouse'; - $port = (int) (getenv('CLICKHOUSE_PORT') ?: 8123); - $secure = (bool) (getenv('CLICKHOUSE_SECURE') ?: false); - - $adapter = new ClickHouseAdapter($host, $username, $password, $port, $secure); - - // Test setting valid timeout - $result = $adapter->setTimeout(5000); // 5 seconds - - // Should return self for chaining - $this->assertInstanceOf(ClickHouseAdapter::class, $result); - - // Test that it still works after setting timeout - $health = $adapter->healthCheck(); - $this->assertTrue($health['healthy']); - } - - /** - * Test setTimeout() with minimum timeout (1 second) - */ - public function testSetTimeoutMinimum(): void - { - $host = getenv('CLICKHOUSE_HOST') ?: 'clickhouse'; - $username = getenv('CLICKHOUSE_USER') ?: 'default'; - $password = getenv('CLICKHOUSE_PASSWORD') ?: 'clickhouse'; - $port = (int) (getenv('CLICKHOUSE_PORT') ?: 8123); - - $adapter = new ClickHouseAdapter($host, $username, $password, $port); - $adapter->setTimeout(1000); // 1 second minimum - - $this->assertTrue(true); // If we reach here, no exception was thrown - } - - /** - * Test setTimeout() with maximum timeout (10 minutes) - */ - public function testSetTimeoutMaximum(): void - { - $host = getenv('CLICKHOUSE_HOST') ?: 'clickhouse'; - $username = getenv('CLICKHOUSE_USER') ?: 'default'; - $password = getenv('CLICKHOUSE_PASSWORD') ?: 'clickhouse'; - $port = (int) (getenv('CLICKHOUSE_PORT') ?: 8123); - - $adapter = new ClickHouseAdapter($host, $username, $password, $port); - $adapter->setTimeout(600000); // 10 minutes maximum - - $this->assertTrue(true); // If we reach here, no exception was thrown - } - - /** - * Test setTimeout() with timeout below minimum - */ - public function testSetTimeoutBelowMinimum(): void - { - $this->expectException(\Exception::class); - $this->expectExceptionMessage('Timeout must be at least 1000 milliseconds'); - - $host = getenv('CLICKHOUSE_HOST') ?: 'clickhouse'; - $username = getenv('CLICKHOUSE_USER') ?: 'default'; - $password = getenv('CLICKHOUSE_PASSWORD') ?: 'clickhouse'; - $port = (int) (getenv('CLICKHOUSE_PORT') ?: 8123); - - $adapter = new ClickHouseAdapter($host, $username, $password, $port); - $adapter->setTimeout(999); // Below minimum - } - - /** - * Test setTimeout() with timeout above maximum - */ - public function testSetTimeoutAboveMaximum(): void - { - $this->expectException(\Exception::class); - $this->expectExceptionMessage('Timeout cannot exceed 600000 milliseconds'); - - $host = getenv('CLICKHOUSE_HOST') ?: 'clickhouse'; - $username = getenv('CLICKHOUSE_USER') ?: 'default'; - $password = getenv('CLICKHOUSE_PASSWORD') ?: 'clickhouse'; - $port = (int) (getenv('CLICKHOUSE_PORT') ?: 8123); - - $adapter = new ClickHouseAdapter($host, $username, $password, $port); - $adapter->setTimeout(600001); // Above maximum - } - - /** - * Test compression functionality - */ - public function testCompression(): void - { - $host = getenv('CLICKHOUSE_HOST') ?: 'clickhouse'; - $username = getenv('CLICKHOUSE_USER') ?: 'default'; - $password = getenv('CLICKHOUSE_PASSWORD') ?: 'clickhouse'; - $port = (int) (getenv('CLICKHOUSE_PORT') ?: 8123); - $secure = (bool) (getenv('CLICKHOUSE_SECURE') ?: false); - - $adapter = new ClickHouseAdapter($host, $username, $password, $port, $secure); - $adapter->setNamespace('utopia_usage_compression_test'); - $adapter->setTenant('1'); - - if ($database = getenv('CLICKHOUSE_DATABASE')) { - $adapter->setDatabase($database); - } - - $usage = new Usage($adapter); - $usage->setup(); - - // Test enabling compression - $result = $adapter->setCompression(true); - $this->assertInstanceOf(ClickHouseAdapter::class, $result); - - // Test disabling compression - $result = $adapter->setCompression(false); - $this->assertInstanceOf(ClickHouseAdapter::class, $result); - - // Enable compression for all subsequent operations - $adapter->setCompression(true); - - // Insert data using addBatch with compression enabled - $batchResult = $usage->addBatch([ - ['metric' => 'compression.test.batch', 'value' => 50, 'tags' => ['service' => 'batch']], - ['metric' => 'compression.test.batch', 'value' => 75, 'tags' => ['service' => 'batch']], - ['metric' => 'compression.test.single', 'value' => 100, 'tags' => ['service' => 'single']], - ], Usage::TYPE_EVENT); - $this->assertTrue($batchResult); - - // Verify find query works with compression - $metrics = $usage->find([], Usage::TYPE_EVENT); - $this->assertIsArray($metrics); - - // Verify count query works with compression - $count = $usage->count([], Usage::TYPE_EVENT); - $this->assertIsInt($count); - - // Verify sum operation works with compression - $sum = $usage->sum([ - \Utopia\Query\Query::equal('metric', ['compression.test.batch']), - ], 'value', Usage::TYPE_EVENT); - $this->assertIsInt($sum); - } - /** * Test connection pooling functionality */ @@ -901,25 +753,12 @@ public function testConnectionPooling(): void $usage = new Usage($adapter); $usage->setup(); - // Test enabling keep-alive - $result = $adapter->setKeepAlive(true); - $this->assertInstanceOf(ClickHouseAdapter::class, $result); - - // Test disabling keep-alive - $result = $adapter->setKeepAlive(false); - $this->assertInstanceOf(ClickHouseAdapter::class, $result); - - // Re-enable for testing - $adapter->setKeepAlive(true); - - // Get initial stats + // Connection reuse is always on (the transport client holds the cURL + // handle for its lifetime). Stats expose the request counter so callers + // can confirm requests are flowing over the pooled connection. $stats = $adapter->getConnectionStats(); - $this->assertIsArray($stats); $this->assertArrayHasKey('request_count', $stats); - $this->assertArrayHasKey('keep_alive_enabled', $stats); - $this->assertArrayHasKey('compression_enabled', $stats); $this->assertArrayHasKey('query_logging_enabled', $stats); - $this->assertTrue($stats['keep_alive_enabled']); $initialCount = $stats['request_count']; @@ -936,114 +775,6 @@ public function testConnectionPooling(): void $this->assertGreaterThanOrEqual(3, $newStats['request_count'] - $initialCount); } - /** - * Test retry logic configuration - */ - public function testRetryConfiguration(): void - { - $host = getenv('CLICKHOUSE_HOST') ?: 'clickhouse'; - $username = getenv('CLICKHOUSE_USER') ?: 'default'; - $password = getenv('CLICKHOUSE_PASSWORD') ?: 'clickhouse'; - $port = (int) (getenv('CLICKHOUSE_PORT') ?: 8123); - $secure = (bool) (getenv('CLICKHOUSE_SECURE') ?: false); - - $adapter = new ClickHouseAdapter($host, $username, $password, $port, $secure); - - // Test setting max retries - $result = $adapter->setMaxRetries(5); - $this->assertInstanceOf(ClickHouseAdapter::class, $result); - - // Test setting retry delay - $result = $adapter->setRetryDelay(200); - $this->assertInstanceOf(ClickHouseAdapter::class, $result); - - // Verify stats reflect configuration - $stats = $adapter->getConnectionStats(); - $this->assertSame(5, $stats['max_retries']); - $this->assertSame(200, $stats['retry_delay']); - - // Test valid retry range (0-10) - $adapter->setMaxRetries(0); - $stats = $adapter->getConnectionStats(); - $this->assertSame(0, $stats['max_retries']); - - $adapter->setMaxRetries(10); - $stats = $adapter->getConnectionStats(); - $this->assertSame(10, $stats['max_retries']); - } - - /** - * Test retry validation errors - */ - public function testRetryValidation(): void - { - $host = getenv('CLICKHOUSE_HOST') ?: 'clickhouse'; - $username = getenv('CLICKHOUSE_USER') ?: 'default'; - $password = getenv('CLICKHOUSE_PASSWORD') ?: 'clickhouse'; - $port = (int) (getenv('CLICKHOUSE_PORT') ?: 8123); - $secure = (bool) (getenv('CLICKHOUSE_SECURE') ?: false); - - $adapter = new ClickHouseAdapter($host, $username, $password, $port, $secure); - - // Test max retries below minimum - $this->expectException(\Exception::class); - $this->expectExceptionMessage('Max retries must be between 0 and 10'); - $adapter->setMaxRetries(-1); - } - - /** - * Test retry delay validation - */ - public function testRetryDelayValidation(): void - { - $host = getenv('CLICKHOUSE_HOST') ?: 'clickhouse'; - $username = getenv('CLICKHOUSE_USER') ?: 'default'; - $password = getenv('CLICKHOUSE_PASSWORD') ?: 'clickhouse'; - $port = (int) (getenv('CLICKHOUSE_PORT') ?: 8123); - $secure = (bool) (getenv('CLICKHOUSE_SECURE') ?: false); - - $adapter = new ClickHouseAdapter($host, $username, $password, $port, $secure); - - // Test retry delay below minimum - $this->expectException(\Exception::class); - $this->expectExceptionMessage('Retry delay must be between 10 and 5000 milliseconds'); - $adapter->setRetryDelay(5); - } - - /** - * Test retry logic with successful operations - */ - public function testRetryWithSuccessfulOperations(): void - { - $host = getenv('CLICKHOUSE_HOST') ?: 'clickhouse'; - $username = getenv('CLICKHOUSE_USER') ?: 'default'; - $password = getenv('CLICKHOUSE_PASSWORD') ?: 'clickhouse'; - $port = (int) (getenv('CLICKHOUSE_PORT') ?: 8123); - $secure = (bool) (getenv('CLICKHOUSE_SECURE') ?: false); - - $adapter = new ClickHouseAdapter($host, $username, $password, $port, $secure); - $adapter->setNamespace('utopia_usage_retry_test'); - $adapter->setTenant('1'); - $adapter->setMaxRetries(2); - $adapter->setRetryDelay(50); - - if ($database = getenv('CLICKHOUSE_DATABASE')) { - $adapter->setDatabase($database); - } - - $usage = new Usage($adapter); - $usage->setup(); - - // These operations should succeed on first attempt (no retries needed) - $result = $usage->addBatch([ - ['metric' => 'retry.test', 'value' => 100, 'tags' => ['service' => 'success']], - ], Usage::TYPE_EVENT); - $this->assertTrue($result); - - $count = $usage->count([], Usage::TYPE_EVENT); - $this->assertIsInt($count); - } - /** * Test error messages include operation context */ @@ -1059,7 +790,6 @@ public function testErrorMessagesIncludeContext(): void $adapter->setNamespace('utopia_usage_error_test'); $adapter->setDatabase('nonexistent_db_for_testing_errors_12345'); $adapter->setTenant('1'); - $adapter->setMaxRetries(0); // Disable retries for faster test $usage = new Usage($adapter); diff --git a/tests/Usage/Adapter/DatabaseTest.php b/tests/Usage/Adapter/DatabaseTest.php index 779450c..2d142f8 100644 --- a/tests/Usage/Adapter/DatabaseTest.php +++ b/tests/Usage/Adapter/DatabaseTest.php @@ -207,7 +207,6 @@ public function testHealthCheck(): void $health = $adapter->healthCheck(); // Assert basic structure - $this->assertIsArray($health); $this->assertArrayHasKey('healthy', $health); // Assert connection is healthy @@ -246,7 +245,6 @@ public function testHealthCheckWithNonExistentDatabase(): void $health = $adapter->healthCheck(); // Assert basic structure - $this->assertIsArray($health); $this->assertArrayHasKey('healthy', $health); // Assert connection failed diff --git a/tests/Usage/MetricTest.php b/tests/Usage/MetricTest.php index f542b97..eba1029 100644 --- a/tests/Usage/MetricTest.php +++ b/tests/Usage/MetricTest.php @@ -14,7 +14,6 @@ public function testGetEventSchemaReturnsAttributeDefinitions(): void { $schema = Metric::getEventSchema(); - $this->assertIsArray($schema); // 3 core (metric, value, time) + 24 dimension columns from EVENT_COLUMNS. $this->assertCount(3 + count(Metric::EVENT_COLUMNS), $schema); @@ -47,7 +46,6 @@ public function testGetGaugeSchemaReturnsAttributeDefinitions(): void { $schema = Metric::getGaugeSchema(); - $this->assertIsArray($schema); // 3 core (metric, value, time) + 4 GAUGE_COLUMNS. $this->assertCount(3 + count(Metric::GAUGE_COLUMNS), $schema); @@ -78,8 +76,6 @@ public function testGetEventIndexesReturnsIndexDefinitions(): void { $indexes = Metric::getEventIndexes(); - $this->assertIsArray($indexes); - $ids = array_column($indexes, '$id'); $this->assertNotContains('index-userAgent', $ids, 'userAgent index must be dropped'); } @@ -91,7 +87,6 @@ public function testGetGaugeIndexesReturnsIndexDefinitions(): void { $indexes = Metric::getGaugeIndexes(); - $this->assertIsArray($indexes); $this->assertCount(count(Metric::GAUGE_COLUMNS), $indexes); } @@ -155,7 +150,7 @@ public function testValidateAcceptsValidEventData(): void // Should not throw exception Metric::validate($validData, 'event'); - $this->assertTrue(true); + $this->expectNotToPerformAssertions(); } /** @@ -172,7 +167,7 @@ public function testValidateAcceptsValidGaugeData(): void ]; Metric::validate($validData, 'gauge'); - $this->assertTrue(true); + $this->expectNotToPerformAssertions(); } /** @@ -186,7 +181,7 @@ public function testValidateAcceptsMinimalData(): void ]; Metric::validate($minimalData, 'event'); - $this->assertTrue(true); + $this->expectNotToPerformAssertions(); } /** @@ -269,7 +264,7 @@ public function testValidateAcceptsDateTimeForTime(): void ]; Metric::validate($data, 'event'); - $this->assertTrue(true); + $this->expectNotToPerformAssertions(); } /** @@ -284,7 +279,7 @@ public function testValidateAcceptsDatetimeStringForTime(): void ]; Metric::validate($data, 'event'); - $this->assertTrue(true); + $this->expectNotToPerformAssertions(); } /** @@ -489,7 +484,6 @@ public function testGetAttributesReturnsAllAttributes(): void $metric = new Metric($data); $attributes = $metric->getAttributes(); - $this->assertIsArray($attributes); $this->assertEquals('metric-1', $attributes['$id']); $this->assertEquals('requests', $attributes['metric']); $this->assertEquals(100, $attributes['value']); @@ -600,7 +594,6 @@ public function testToArrayReturnsArray(): void $metric = new Metric($data); $array = $metric->toArray(); - $this->assertIsArray($array); $this->assertEquals('metric-1', $array['$id']); $this->assertEquals('requests', $array['metric']); $this->assertEquals(100, $array['value']); diff --git a/tests/Usage/UsageBase.php b/tests/Usage/UsageBase.php index 3e8d568..1dcbd3f 100644 --- a/tests/Usage/UsageBase.php +++ b/tests/Usage/UsageBase.php @@ -137,7 +137,6 @@ public function testGetTotalBatch(): void // Event metrics batch $totals = $this->usage->getTotalBatch(['requests', 'bandwidth'], [], Usage::TYPE_EVENT); - $this->assertIsArray($totals); $this->assertArrayHasKey('requests', $totals); $this->assertArrayHasKey('bandwidth', $totals); @@ -160,7 +159,6 @@ public function testGetTotalBatchWithMissingMetric(): void public function testGetTotalBatchEmpty(): void { $totals = $this->usage->getTotalBatch([]); - $this->assertIsArray($totals); $this->assertEmpty($totals); } @@ -179,7 +177,6 @@ public function testGetTimeSeries(): void Usage::TYPE_EVENT, ); - $this->assertIsArray($results); $this->assertArrayHasKey('requests', $results); $this->assertArrayHasKey('total', $results['requests']); $this->assertArrayHasKey('data', $results['requests']); @@ -662,7 +659,6 @@ public function testGroupByWithoutGroupByIntervalReturnsDimOnlyAggregate(): void Query::equal('metric', ['gbi-requests']), ], Usage::TYPE_EVENT); - $this->assertIsArray($rows); foreach ($rows as $row) { $this->assertArrayNotHasKey('time', $row->getArrayCopy()); $this->assertArrayHasKey('service', $row->getArrayCopy()); diff --git a/tests/Usage/UsageQueryTest.php b/tests/Usage/UsageQueryTest.php index 64bf7fe..924c6e3 100644 --- a/tests/Usage/UsageQueryTest.php +++ b/tests/Usage/UsageQueryTest.php @@ -110,7 +110,6 @@ public function testRemoveGroupByInterval(): void public function testValidIntervalsConstant(): void { - $this->assertIsArray(UsageQuery::VALID_INTERVALS); $this->assertArrayHasKey('1m', UsageQuery::VALID_INTERVALS); $this->assertArrayHasKey('5m', UsageQuery::VALID_INTERVALS); $this->assertArrayHasKey('15m', UsageQuery::VALID_INTERVALS);