From c0be417d0d85c3a2ee2b337681060121bdf254bf Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Sat, 30 May 2026 13:39:44 -0400 Subject: [PATCH] feat(waterdata): add get_cql for generalized CQL2 queries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds get_cql(service, cql, ...), a single public entry point for querying any Water Data OGC API collection with an arbitrary CQL2 filter — for predicates the typed getters (get_daily, get_continuous, …) can't express: a top-level or, like with % wildcards, comparison operators, nested boolean trees, and geometry predicates beyond a bounding box. The CQL2 body (str or dict) is POSTed verbatim; the result is shaped like the typed getters (wire id renamed, columns ordered/sorted, dtypes coerced) and returned as (DataFrame, BaseMetadata). Like get_stats_data, it's a single request (the CQL body is opaque, so nothing to chunk); server-side CQL errors surface as the module's standard typed errors. Reuses existing machinery rather than duplicating it: _construct_cql_request shares the skipGeometry/limit/bbox/properties URL block with _construct_api_requests via _ogc_query_params; the non-chunked anyio-portal fetch path (_run_sync) is shared with get_stats_data; result shaping goes through the existing _finalize_ogc hook. WATERDATA_SERVICES enumerates the 11 OGC collections, kept in sync with _OUTPUT_ID_BY_SERVICE (guarded by a test). Also adopts _OUTPUT_ID_BY_SERVICE across the typed getters: get_ogc_data derives output_id from service via the map (single source of truth) instead of each getter hardcoding it; output_id becomes an optional override that get_reference_table still passes for its metadata collections. Tests: unit (request construction, skip_geometry omission, service validation, the WATERDATA_SERVICES/_OUTPUT_ID_BY_SERVICE sync invariant) + live (compound AND/IN, str/dict equivalence, id translation, LIKE wildcard); all typed getters re-verified against the live API after the refactor. Addresses #198 Co-Authored-By: Claude Opus 4.8 (1M context) --- dataretrieval/waterdata/__init__.py | 4 + dataretrieval/waterdata/api.py | 183 ++++++++++++++++++++++++---- dataretrieval/waterdata/types.py | 18 +++ dataretrieval/waterdata/utils.py | 182 ++++++++++++++++++++++++--- tests/waterdata_test.py | 121 ++++++++++++++++++ 5 files changed, 466 insertions(+), 42 deletions(-) diff --git a/dataretrieval/waterdata/__init__.py b/dataretrieval/waterdata/__init__.py index f81966c4..9b5ca610 100644 --- a/dataretrieval/waterdata/__init__.py +++ b/dataretrieval/waterdata/__init__.py @@ -15,6 +15,7 @@ get_codes, get_combined_metadata, get_continuous, + get_cql, get_daily, get_field_measurements, get_field_measurements_metadata, @@ -37,6 +38,7 @@ PROFILE_LOOKUP, PROFILES, SERVICES, + WATERDATA_SERVICES, ) __all__ = [ @@ -45,10 +47,12 @@ "PROFILES", "PROFILE_LOOKUP", "SERVICES", + "WATERDATA_SERVICES", "get_channel", "get_codes", "get_combined_metadata", "get_continuous", + "get_cql", "get_daily", "get_field_measurements", "get_field_measurements_metadata", diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index ddbc31b1..b969cca4 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -28,13 +28,22 @@ METADATA_COLLECTIONS, PROFILES, SERVICES, + WATERDATA_SERVICES, ) from dataretrieval.waterdata.utils import ( + _OUTPUT_ID_BY_SERVICE, + GEOPANDAS, SAMPLES_URL, + _as_str_list, _check_profiles, + _construct_cql_request, _default_headers, + _finalize_ogc, _get_args, _raise_for_non_200, + _run_sync, + _switch_properties_id, + _walk_pages, get_ogc_data, get_stats_data, ) @@ -252,12 +261,11 @@ def get_daily( ... ) """ service = "daily" - output_id = "daily_id" # Build argument dictionary, omitting None values args = _get_args(locals()) - return get_ogc_data(args, output_id, service) + return get_ogc_data(args, service) def get_continuous( @@ -440,12 +448,11 @@ def get_continuous( ... ) """ service = "continuous" - output_id = "continuous_id" # Build argument dictionary, omitting None values args = _get_args(locals()) - return get_ogc_data(args, output_id, service) + return get_ogc_data(args, service) def get_monitoring_locations( @@ -738,12 +745,11 @@ def get_monitoring_locations( ... ) """ service = "monitoring-locations" - output_id = "monitoring_location_id" # Build argument dictionary, omitting None values args = _get_args(locals()) - return get_ogc_data(args, output_id, service) + return get_ogc_data(args, service) def get_time_series_metadata( @@ -961,12 +967,11 @@ def get_time_series_metadata( ... ) """ service = "time-series-metadata" - output_id = "time_series_id" # Build argument dictionary, omitting None values args = _get_args(locals()) - return get_ogc_data(args, output_id, service) + return get_ogc_data(args, service) def get_combined_metadata( @@ -1194,11 +1199,10 @@ def get_combined_metadata( """ service = "combined-metadata" - output_id = "combined_meta_id" args = _get_args(locals()) - return get_ogc_data(args, output_id, service) + return get_ogc_data(args, service) def get_latest_continuous( @@ -1388,12 +1392,11 @@ def get_latest_continuous( ... ) """ service = "latest-continuous" - output_id = "latest_continuous_id" # Build argument dictionary, omitting None values args = _get_args(locals()) - return get_ogc_data(args, output_id, service) + return get_ogc_data(args, service) def get_latest_daily( @@ -1584,12 +1587,11 @@ def get_latest_daily( ... ) """ service = "latest-daily" - output_id = "latest_daily_id" # Build argument dictionary, omitting None values args = _get_args(locals()) - return get_ogc_data(args, output_id, service) + return get_ogc_data(args, service) def get_field_measurements( @@ -1774,12 +1776,11 @@ def get_field_measurements( ... ) """ service = "field-measurements" - output_id = "field_measurement_id" # Build argument dictionary, omitting None values args = _get_args(locals()) - return get_ogc_data(args, output_id, service) + return get_ogc_data(args, service) def get_field_measurements_metadata( @@ -1892,11 +1893,10 @@ def get_field_measurements_metadata( """ service = "field-measurements-metadata" - output_id = "field_series_id" args = _get_args(locals()) - return get_ogc_data(args, output_id, service) + return get_ogc_data(args, service) def get_peaks( @@ -2012,11 +2012,10 @@ def get_peaks( """ service = "peaks" - output_id = "peak_id" args = _get_args(locals()) - return get_ogc_data(args, output_id, service) + return get_ogc_data(args, service) def get_reference_table( @@ -2846,8 +2845,148 @@ def get_channel( ... ) """ service = "channel-measurements" - output_id = "channel_measurements_id" args = _get_args(locals()) - return get_ogc_data(args, output_id, service) + return get_ogc_data(args, service) + + +def get_cql( + service: WATERDATA_SERVICES, + cql: str | dict, + *, + properties: str | Iterable[str] | None = None, + bbox: list[float] | None = None, + limit: int | None = None, + skip_geometry: bool | None = None, + convert_type: bool = True, +) -> tuple[pd.DataFrame, BaseMetadata]: + """Query a Water Data OGC API collection with an arbitrary CQL2 filter. + + Sends ``cql`` as a CQL2 filter against ``service`` and returns the matching + features, shaped like the typed getters (``get_daily``, ``get_continuous``, + …): the wire ``id`` renamed to the service's id column, columns ordered and + sorted, and dtypes coerced. Use it when you need a predicate the typed + getters can't express — a top-level ``or``, ``like`` with ``%`` wildcards, + comparison operators, nested boolean trees, or a geometry predicate beyond a + bounding box; prefer a typed getter when one covers the query. + + The request is a single POST with the ``cql`` body sent verbatim, so there + are no multi-value arguments to chunk: narrow a query whose URL or body + would exceed the server's size cap rather than relying on automatic + chunking. + + The CQL2 grammar is documented at + https://api.waterdata.usgs.gov/docs/ogcapi/complex-queries/. + + Parameters + ---------- + service : str + OGC collection name. Must be one of + :data:`dataretrieval.waterdata.types.WATERDATA_SERVICES` + (e.g. ``"daily"``, ``"monitoring-locations"``). + cql : str or dict + CQL2 query. A ``dict`` is JSON-serialized for transport; a ``str`` is + sent through unchanged. The query goes into the HTTP POST body with + ``Content-Type: application/query-cql-json``. + properties : str or iterable of str, optional + Server-side property whitelist (passed as ``properties=`` on the URL). + Reduces payload size. ``"id"`` resolves to the service's ``output_id`` + (e.g. ``daily_id``) the same way it does in the typed wrappers. + bbox : list of float, optional + Bounding box ``[xmin, ymin, xmax, ymax]`` in CRS 4326. Combines with the + CQL filter as an additional spatial predicate. + limit : int, optional + Page size, clamped server-side to 50,000. + skip_geometry : bool, optional + If True, the server omits geometry from each feature + (``skipGeometry=true``). + convert_type : bool, default True + Coerce date/datetime/numeric columns to typed dtypes after the + DataFrame is built. + + Returns + ------- + df : pandas.DataFrame or geopandas.GeoDataFrame + Result of the query. GeoDataFrame when ``geopandas`` is installed and + geometry is present. + md : :class:`dataretrieval.utils.BaseMetadata` + Request metadata (URL, query time, response headers). + + Examples + -------- + .. code:: + + >>> # Daily values for two parameter codes at two sites + >>> # (compound AND-of-INs). + >>> from dataretrieval import waterdata + >>> cql = { + ... "op": "and", + ... "args": [ + ... { + ... "op": "in", + ... "args": [ + ... {"property": "parameter_code"}, + ... ["00060", "00065"], + ... ], + ... }, + ... { + ... "op": "in", + ... "args": [ + ... {"property": "monitoring_location_id"}, + ... ["USGS-07367300", "USGS-03277200"], + ... ], + ... }, + ... ], + ... } + >>> df, md = waterdata.get_cql(service="daily", cql=cql) + + >>> # Monitoring locations whose HUC starts with "02070010" + >>> # (LIKE with the CQL2 ``%`` wildcard). + >>> df, md = waterdata.get_cql( + ... service="monitoring-locations", + ... cql='{"op": "like", "args": [' + ... '{"property": "hydrologic_unit_code"},' + ... ' "02070010%"]}', + ... ) + """ + if service not in _OUTPUT_ID_BY_SERVICE: + raise ValueError( + f"Unknown service {service!r}. Valid services: " + f"{sorted(_OUTPUT_ID_BY_SERVICE)}." + ) + output_id = _OUTPUT_ID_BY_SERVICE[service] + + # ``dict`` is the pythonic input — serialize on the way out. ``str`` is sent + # verbatim so callers who already have a CQL2 doc (e.g. imported from a + # config file) don't need to re-parse it. + body = json.dumps(cql, separators=(",", ":")) if isinstance(cql, dict) else cql + + properties_list = _as_str_list(properties, "properties") + + # Translate user-facing names (``daily_id``/``id``) to the wire ``id`` the + # OGC API expects, matching the typed getters. + wire_properties = _switch_properties_id(properties_list, output_id, service) + + req = _construct_cql_request( + service, + body, + properties=wire_properties, + bbox=bbox, + limit=limit, + skip_geometry=skip_geometry, + ) + + async def _run() -> tuple[pd.DataFrame, httpx.Response]: + return await _walk_pages(geopd=GEOPANDAS, req=req) + + df, response = _run_sync(_run, service=service) + + return _finalize_ogc( + df, + response, + properties=properties_list, + output_id=output_id, + convert_type=convert_type, + service=service, + ) diff --git a/dataretrieval/waterdata/types.py b/dataretrieval/waterdata/types.py index f5e1496b..20753d3f 100644 --- a/dataretrieval/waterdata/types.py +++ b/dataretrieval/waterdata/types.py @@ -40,6 +40,24 @@ "results", ] +# OGC API time-series/monitoring collections queryable via ``get_cql``. +# Keep in sync with ``utils._OUTPUT_ID_BY_SERVICE`` (same keys): that dict maps +# each service to its user-facing ``id`` column and is the runtime source of +# truth ``get_cql`` validates against. +WATERDATA_SERVICES = Literal[ + "channel-measurements", + "combined-metadata", + "continuous", + "daily", + "field-measurements", + "field-measurements-metadata", + "latest-continuous", + "latest-daily", + "monitoring-locations", + "peaks", + "time-series-metadata", +] + PROFILES = Literal[ "actgroup", "actmetric", diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index 0f86730f..0a9f1c71 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -67,6 +67,25 @@ STATISTICS_API_VERSION = "v0" STATISTICS_API_URL = f"{BASE_URL}/statistics/{STATISTICS_API_VERSION}" +# Maps each OGC waterdata service to its user-facing ``id`` column (the name the +# typed getters rename the wire ``id`` to, e.g. ``daily`` -> ``daily_id``). +# ``get_cql`` validates its ``service`` argument against these keys and +# uses the value as the ``output_id`` for result shaping. Keep in sync with the +# ``types.WATERDATA_SERVICES`` Literal (same keys). +_OUTPUT_ID_BY_SERVICE: dict[str, str] = { + "channel-measurements": "channel_measurements_id", + "combined-metadata": "combined_meta_id", + "continuous": "continuous_id", + "daily": "daily_id", + "field-measurements": "field_measurement_id", + "field-measurements-metadata": "field_series_id", + "latest-continuous": "latest_continuous_id", + "latest-daily": "latest_daily_id", + "monitoring-locations": "monitoring_location_id", + "peaks": "peak_id", + "time-series-metadata": "time_series_id", +} + def _switch_arg_id(ls: dict[str, Any], id_name: str, service: str): """ @@ -561,6 +580,36 @@ def _paginated_failure_message(pages_collected: int, cause: BaseException) -> st ) +def _ogc_query_params( + params: dict[str, Any], + *, + properties: list[str] | None, + bbox: list[float] | None, + limit: int | None, + skip_geometry: bool | None, +) -> dict[str, Any]: + """Add the shared OGC query knobs to ``params`` (mutated in place). + + Factors out the ``skipGeometry``/``limit``/``bbox``/``properties`` block + common to every OGC request so the typed getters + (:func:`_construct_api_requests`) and the generalized CQL2 path + (:func:`_construct_cql_request`) build identical URL parameters. + + ``skip_geometry=None`` leaves ``skipGeometry`` unset (the server defaults to + including geometry); the typed getters always pass a bool, so their behavior + is unchanged. + """ + if skip_geometry is not None: + params["skipGeometry"] = skip_geometry + params["limit"] = 50000 if limit is None or limit > 50000 else limit + # `len()` instead of truthiness: a numpy ndarray would raise on `if bbox:`. + if bbox is not None and len(bbox) > 0: + params["bbox"] = ",".join(map(str, bbox)) + if properties: + params["properties"] = ",".join(properties) + return params + + def _construct_api_requests( service: str, properties: list[str] | None = None, @@ -631,14 +680,13 @@ def _construct_api_requests( for k, v in kwargs.items() } - params["skipGeometry"] = skip_geometry - params["limit"] = 50000 if limit is None or limit > 50000 else limit - - # `len()` instead of truthiness: a numpy ndarray would raise on `if bbox:`. - if bbox is not None and len(bbox) > 0: - params["bbox"] = ",".join(map(str, bbox)) - if properties: - params["properties"] = ",".join(properties) + _ogc_query_params( + params, + properties=properties, + bbox=bbox, + limit=limit, + skip_geometry=skip_geometry, + ) # Translate CQL filter Python names to the hyphenated URL parameter that # the OGC API expects. The Python kwarg is `filter_lang` because hyphens @@ -665,6 +713,59 @@ def _construct_api_requests( ) +def _construct_cql_request( + service: str, + cql_body: str, + *, + properties: list[str] | None = None, + bbox: list[float] | None = None, + limit: int | None = None, + skip_geometry: bool | None = None, +) -> httpx.Request: + """Build a POST/CQL2 request from a verbatim CQL2 body. + + The OGC-API counterpart to :func:`_construct_api_requests` for the + generalized :func:`~dataretrieval.waterdata.api.get_cql` path: the + caller supplies an already-serialized CQL2 JSON document (any predicate the + grammar allows), sent unchanged as the request body, while + ``properties``/``bbox``/``limit``/``skip_geometry`` go on the URL via the + shared :func:`_ogc_query_params` — so a generalized query and an equivalent + typed getter produce the same URL parameters. + + Parameters + ---------- + service : str + OGC collection name (e.g. ``"daily"``). + cql_body : str + Serialized CQL2 JSON document, sent as the POST body verbatim. + properties, bbox, limit, skip_geometry + See :func:`_ogc_query_params`. ``properties`` are wire-format + (``id``-translated) names. + + Returns + ------- + httpx.Request + A POST request with ``Content-Type: application/query-cql-json``. + """ + service_url = f"{OGC_API_URL}/collections/{service}/items" + params = _ogc_query_params( + {}, + properties=properties, + bbox=bbox, + limit=limit, + skip_geometry=skip_geometry, + ) + headers = _default_headers() + headers["Content-Type"] = "application/query-cql-json" + return httpx.Request( + method="POST", + url=service_url, + headers=headers, + content=cql_body, + params=params, + ) + + def _next_req_url( resp: httpx.Response, *, body: dict[str, Any] | None = None ) -> str | None: @@ -1322,8 +1423,8 @@ def _finalize_ogc( def get_ogc_data( args: dict[str, Any], - output_id: str, service: str, + output_id: str | None = None, max_rows: int | None = None, ) -> tuple[pd.DataFrame, BaseMetadata]: """ @@ -1338,11 +1439,13 @@ def get_ogc_data( ---------- args : Dict[str, Any] Dictionary of request arguments for the OGC service. - output_id : str - The name of the output identifier to use in the request. service : str The OGC API collection name (e.g., ``"daily"``, ``"monitoring-locations"``, ``"continuous"``). + output_id : str, optional + The user-facing id column the wire ``id`` is renamed to. Defaults + to ``_OUTPUT_ID_BY_SERVICE[service]``; pass it explicitly only for + collections outside that map (e.g. reference-table collections). max_rows : int, optional Stop paginating once this many rows have been collected and truncate the result to exactly ``max_rows``. ``None`` (default) @@ -1375,6 +1478,13 @@ def get_ogc_data( ): raise ValueError(f"max_rows must be a positive integer (got {max_rows!r}).") + # Each service renames its wire ``id`` to a service-specific column; that + # name is derived from ``service`` via the canonical map so the getters + # don't each repeat it. Callers for collections outside the map (e.g. + # get_reference_table's metadata collections) pass output_id explicitly. + if output_id is None: + output_id = _OUTPUT_ID_BY_SERVICE[service] + args = args.copy() args["service"] = service args = _switch_arg_id(args, id_name=output_id, service=service) @@ -1584,6 +1694,28 @@ def _expand_percentiles(df: pd.DataFrame) -> pd.DataFrame: return df +def _run_sync( + make_coro: Callable[[], Awaitable[tuple[pd.DataFrame, httpx.Response]]], + *, + service: str, +) -> tuple[pd.DataFrame, httpx.Response]: + """Drive an async OGC fetch to completion from synchronous code. + + Opens the service progress context and runs ``make_coro()`` through a + short-lived ``anyio`` blocking portal (a worker thread), so the + non-chunked getters work whether or not the caller is already inside an + event loop (Jupyter/async apps). The portal copies the calling context, + so the active progress reporter still reaches the sub-requests. + + Shared by the non-chunked fetch paths (:func:`get_stats_data`, + :func:`get_cql`); the chunked OGC getters drive their own portal + inside :meth:`chunking.ChunkedCall.resume`. + """ + with _progress.progress_context(service=service): + with start_blocking_portal() as portal: + return portal.call(make_coro) + + def get_stats_data( args: dict[str, Any], service: str, @@ -1660,13 +1792,7 @@ async def _run() -> tuple[pd.DataFrame, httpx.Response]: client=client, ) - # The stats path opens its own progress context (it doesn't go through - # ``multi_value_chunked``); ``_paginate`` reports pages/rate-limit - # into it. The portal copies the calling context, so the reporter still - # reaches the worker thread. - with _progress.progress_context(service=service): - with start_blocking_portal() as portal: - df, response = portal.call(_run) + df, response = _run_sync(_run, service=service) if expand_percentiles: df = _expand_percentiles(df) @@ -1772,6 +1898,23 @@ def _normalize_str_iterable( return values +def _as_str_list( + value: str | Iterable[str] | None, + param_name: str = "value", +) -> list[str] | None: + """Normalize ``value`` to ``list[str]`` (``None`` passes through). + + Wraps a bare ``str`` in a single-element list — so a later + ``",".join(...)`` doesn't iterate it character-by-character — and + materializes any other iterable via :func:`_normalize_str_iterable`. + """ + return ( + [value] + if isinstance(value, str) + else _normalize_str_iterable(value, param_name) + ) + + def _check_monitoring_location_id( monitoring_location_id: str | Iterable[str] | None, ) -> str | list[str] | None: @@ -1871,8 +2014,7 @@ def _get_args( if k == "monitoring_location_id": args[k] = _check_monitoring_location_id(v) elif k == "properties": - # `",".join(properties)` would iterate a bare string as characters. - args[k] = [v] if isinstance(v, str) else _normalize_str_iterable(v, k) + args[k] = _as_str_list(v, k) elif ( k in _NO_NORMALIZE_PARAMS or isinstance(v, str) diff --git a/tests/waterdata_test.py b/tests/waterdata_test.py index 92c978de..d541fed3 100644 --- a/tests/waterdata_test.py +++ b/tests/waterdata_test.py @@ -15,6 +15,7 @@ get_channel, get_combined_metadata, get_continuous, + get_cql, get_daily, get_field_measurements, get_field_measurements_metadata, @@ -33,6 +34,7 @@ _check_monitoring_location_id, _check_profiles, _construct_api_requests, + _construct_cql_request, _normalize_str_iterable, ) @@ -201,6 +203,61 @@ def test_construct_api_requests_monitoring_locations_post(): assert predicate["args"][1] == ["010802050102", "010802050103"] +def test_construct_cql_request_post_verbatim_body(): + """get_cql's request builder POSTs the CQL2 body verbatim with the + right content-type, and puts the OGC knobs on the URL.""" + body = json.dumps( + {"op": "like", "args": [{"property": "hydrologic_unit_code"}, "02070010%"]}, + separators=(",", ":"), + ) + req = _construct_cql_request( + "daily", + body, + properties=["id", "value"], + bbox=[-90.0, 40.0, -89.0, 41.0], + limit=10, + skip_geometry=True, + ) + assert req.method == "POST" + assert req.headers["Content-Type"] == "application/query-cql-json" + assert str(req.url).startswith( + "https://api.waterdata.usgs.gov/ogcapi/v0/collections/daily/items" + ) + # The body is sent through unchanged, not re-serialized. + assert req.content.decode() == body + url = str(req.url) + assert "skipGeometry=true" in url + assert "limit=10" in url + assert "bbox=-90.0%2C40.0%2C-89.0%2C41.0" in url + assert "properties=id%2Cvalue" in url + + +def test_construct_cql_request_skip_geometry_none_omits_param(): + """skip_geometry=None leaves skipGeometry unset (server default), so it never + reaches the URL — matching get_cql's default.""" + req = _construct_cql_request("daily", "{}") + assert "skipGeometry" not in str(req.url) + + +def test_get_cql_unknown_service_raises(): + """An unknown service is rejected before any network call.""" + with pytest.raises(ValueError, match="Unknown service"): + get_cql("not-a-service", {"op": "isNull", "args": [{"property": "x"}]}) + + +def test_waterdata_services_literal_matches_output_id_map(): + """The WATERDATA_SERVICES Literal and _OUTPUT_ID_BY_SERVICE must enumerate + the same services: get_cql validates against the dict while the Literal + types the public signature, so drift would let one accept a service the other + rejects.""" + from typing import get_args + + from dataretrieval.waterdata.types import WATERDATA_SERVICES + from dataretrieval.waterdata.utils import _OUTPUT_ID_BY_SERVICE + + assert set(get_args(WATERDATA_SERVICES)) == set(_OUTPUT_ID_BY_SERVICE) + + def test_construct_api_requests_single_value_stays_get(): """A length-1 list (or scalar) reaches the URL as a plain value, not a comma-separated form, so existing single-site callers see no change.""" @@ -406,6 +463,70 @@ def test_get_monitoring_locations_hucs(): } +def test_get_cql_compound_and_in(): + """Generalized CQL2: a compound AND-of-INs routed through get_cql. + Confirms the (df, md) shape matches the typed getters — wire ``id`` renamed + and ordered last.""" + cql = { + "op": "and", + "args": [ + {"op": "in", "args": [{"property": "parameter_code"}, ["00060", "00065"]]}, + { + "op": "in", + "args": [{"property": "monitoring_location_id"}, ["USGS-05427718"]], + }, + ], + } + df, md = get_cql("latest-daily", cql) + assert len(df) >= 1 + assert "latest_daily_id" in df.columns and "id" not in df.columns + assert df.columns[-1] == "latest_daily_id" + assert set(df["parameter_code"]).issubset({"00060", "00065"}) + assert set(df["monitoring_location_id"]) == {"USGS-05427718"} + assert hasattr(md, "url") + assert hasattr(md, "query_time") + + +def test_get_cql_str_body_matches_dict(): + """A CQL2 ``str`` body is sent verbatim and yields the same result as the + equivalent ``dict``.""" + cql = { + "op": "in", + "args": [{"property": "monitoring_location_id"}, ["USGS-05427718"]], + } + df_dict, _ = get_cql("latest-daily", cql) + df_str, _ = get_cql("latest-daily", json.dumps(cql)) + assert list(df_str.columns) == list(df_dict.columns) + assert len(df_str) == len(df_dict) + + +def test_get_cql_properties_id_translation(): + """``properties=['id', ...]`` resolves ``id`` to the service's output_id + column, just like the typed getters, preserving the requested order.""" + cql = { + "op": "in", + "args": [{"property": "monitoring_location_id"}, ["USGS-05427718"]], + } + df, _ = get_cql( + "latest-daily", + cql, + properties=["monitoring_location_id", "id", "parameter_code", "value"], + ) + assert df.columns[1] == "latest_daily_id" + + +def test_get_cql_like_wildcard(): + """Generalized CQL2 unlocks predicates the typed getters can't express, e.g. + a LIKE with a ``%`` wildcard.""" + cql = { + "op": "like", + "args": [{"property": "hydrologic_unit_code"}, "020700100101%"], + } + df, _ = get_cql("monitoring-locations", cql) + assert len(df) >= 1 + assert df["hydrologic_unit_code"].astype(str).str.startswith("020700100101").all() + + def test_get_latest_continuous(): df, md = get_latest_continuous( monitoring_location_id=["USGS-05427718", "USGS-05427719"],