Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dataretrieval/waterdata/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
get_codes,
get_combined_metadata,
get_continuous,
get_cql,
get_daily,
get_field_measurements,
get_field_measurements_metadata,
Expand All @@ -37,6 +38,7 @@
PROFILE_LOOKUP,
PROFILES,
SERVICES,
WATERDATA_SERVICES,
)

__all__ = [
Expand All @@ -45,10 +47,12 @@
"PROFILES",
"PROFILE_LOOKUP",
"SERVICES",
"WATERDATA_SERVICES",
"get_channel",
"get_codes",
"get_combined_metadata",
"get_continuous",
"get_cql",
"get_daily",
"get_field_measurements",
"get_field_measurements_metadata",
Expand Down
183 changes: 161 additions & 22 deletions dataretrieval/waterdata/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,22 @@
METADATA_COLLECTIONS,
PROFILES,
SERVICES,
WATERDATA_SERVICES,
)
from dataretrieval.waterdata.utils import (
_OUTPUT_ID_BY_SERVICE,
GEOPANDAS,
SAMPLES_URL,
_as_str_list,
_check_profiles,
_construct_cql_request,
_default_headers,
_finalize_ogc,
_get_args,
_raise_for_non_200,
_run_sync,
_switch_properties_id,
_walk_pages,
get_ogc_data,
get_stats_data,
)
Expand Down Expand Up @@ -252,12 +261,11 @@ def get_daily(
... )
"""
service = "daily"
output_id = "daily_id"

# Build argument dictionary, omitting None values
args = _get_args(locals())

return get_ogc_data(args, output_id, service)
return get_ogc_data(args, service)


def get_continuous(
Expand Down Expand Up @@ -440,12 +448,11 @@ def get_continuous(
... )
"""
service = "continuous"
output_id = "continuous_id"

# Build argument dictionary, omitting None values
args = _get_args(locals())

return get_ogc_data(args, output_id, service)
return get_ogc_data(args, service)


def get_monitoring_locations(
Expand Down Expand Up @@ -738,12 +745,11 @@ def get_monitoring_locations(
... )
"""
service = "monitoring-locations"
output_id = "monitoring_location_id"

# Build argument dictionary, omitting None values
args = _get_args(locals())

return get_ogc_data(args, output_id, service)
return get_ogc_data(args, service)


def get_time_series_metadata(
Expand Down Expand Up @@ -961,12 +967,11 @@ def get_time_series_metadata(
... )
"""
service = "time-series-metadata"
output_id = "time_series_id"

# Build argument dictionary, omitting None values
args = _get_args(locals())

return get_ogc_data(args, output_id, service)
return get_ogc_data(args, service)


def get_combined_metadata(
Expand Down Expand Up @@ -1194,11 +1199,10 @@ def get_combined_metadata(

"""
service = "combined-metadata"
output_id = "combined_meta_id"

args = _get_args(locals())

return get_ogc_data(args, output_id, service)
return get_ogc_data(args, service)


def get_latest_continuous(
Expand Down Expand Up @@ -1388,12 +1392,11 @@ def get_latest_continuous(
... )
"""
service = "latest-continuous"
output_id = "latest_continuous_id"

# Build argument dictionary, omitting None values
args = _get_args(locals())

return get_ogc_data(args, output_id, service)
return get_ogc_data(args, service)


def get_latest_daily(
Expand Down Expand Up @@ -1584,12 +1587,11 @@ def get_latest_daily(
... )
"""
service = "latest-daily"
output_id = "latest_daily_id"

# Build argument dictionary, omitting None values
args = _get_args(locals())

return get_ogc_data(args, output_id, service)
return get_ogc_data(args, service)


def get_field_measurements(
Expand Down Expand Up @@ -1774,12 +1776,11 @@ def get_field_measurements(
... )
"""
service = "field-measurements"
output_id = "field_measurement_id"

# Build argument dictionary, omitting None values
args = _get_args(locals())

return get_ogc_data(args, output_id, service)
return get_ogc_data(args, service)


def get_field_measurements_metadata(
Expand Down Expand Up @@ -1892,11 +1893,10 @@ def get_field_measurements_metadata(

"""
service = "field-measurements-metadata"
output_id = "field_series_id"

args = _get_args(locals())

return get_ogc_data(args, output_id, service)
return get_ogc_data(args, service)


def get_peaks(
Expand Down Expand Up @@ -2012,11 +2012,10 @@ def get_peaks(

"""
service = "peaks"
output_id = "peak_id"

args = _get_args(locals())

return get_ogc_data(args, output_id, service)
return get_ogc_data(args, service)


def get_reference_table(
Expand Down Expand Up @@ -2846,8 +2845,148 @@ def get_channel(
... )
"""
service = "channel-measurements"
output_id = "channel_measurements_id"

args = _get_args(locals())

return get_ogc_data(args, output_id, service)
return get_ogc_data(args, service)


def get_cql(
service: WATERDATA_SERVICES,
cql: str | dict,
*,
properties: str | Iterable[str] | None = None,
bbox: list[float] | None = None,
limit: int | None = None,
skip_geometry: bool | None = None,
convert_type: bool = True,
) -> tuple[pd.DataFrame, BaseMetadata]:
"""Query a Water Data OGC API collection with an arbitrary CQL2 filter.

Sends ``cql`` as a CQL2 filter against ``service`` and returns the matching
features, shaped like the typed getters (``get_daily``, ``get_continuous``,
…): the wire ``id`` renamed to the service's id column, columns ordered and
sorted, and dtypes coerced. Use it when you need a predicate the typed
getters can't express — a top-level ``or``, ``like`` with ``%`` wildcards,
comparison operators, nested boolean trees, or a geometry predicate beyond a
bounding box; prefer a typed getter when one covers the query.

The request is a single POST with the ``cql`` body sent verbatim, so there
are no multi-value arguments to chunk: narrow a query whose URL or body
would exceed the server's size cap rather than relying on automatic
chunking.

The CQL2 grammar is documented at
https://api.waterdata.usgs.gov/docs/ogcapi/complex-queries/.

Parameters
----------
service : str
OGC collection name. Must be one of
:data:`dataretrieval.waterdata.types.WATERDATA_SERVICES`
(e.g. ``"daily"``, ``"monitoring-locations"``).
cql : str or dict
CQL2 query. A ``dict`` is JSON-serialized for transport; a ``str`` is
sent through unchanged. The query goes into the HTTP POST body with
``Content-Type: application/query-cql-json``.
properties : str or iterable of str, optional
Server-side property whitelist (passed as ``properties=`` on the URL).
Reduces payload size. ``"id"`` resolves to the service's ``output_id``
(e.g. ``daily_id``) the same way it does in the typed wrappers.
bbox : list of float, optional
Bounding box ``[xmin, ymin, xmax, ymax]`` in CRS 4326. Combines with the
CQL filter as an additional spatial predicate.
limit : int, optional
Page size, clamped server-side to 50,000.
skip_geometry : bool, optional
If True, the server omits geometry from each feature
(``skipGeometry=true``).
convert_type : bool, default True
Coerce date/datetime/numeric columns to typed dtypes after the
DataFrame is built.

Returns
-------
df : pandas.DataFrame or geopandas.GeoDataFrame
Result of the query. GeoDataFrame when ``geopandas`` is installed and
geometry is present.
md : :class:`dataretrieval.utils.BaseMetadata`
Request metadata (URL, query time, response headers).

Examples
--------
.. code::

>>> # Daily values for two parameter codes at two sites
>>> # (compound AND-of-INs).
>>> from dataretrieval import waterdata
>>> cql = {
... "op": "and",
... "args": [
... {
... "op": "in",
... "args": [
... {"property": "parameter_code"},
... ["00060", "00065"],
... ],
... },
... {
... "op": "in",
... "args": [
... {"property": "monitoring_location_id"},
... ["USGS-07367300", "USGS-03277200"],
... ],
... },
... ],
... }
>>> df, md = waterdata.get_cql(service="daily", cql=cql)

>>> # Monitoring locations whose HUC starts with "02070010"
>>> # (LIKE with the CQL2 ``%`` wildcard).
>>> df, md = waterdata.get_cql(
... service="monitoring-locations",
... cql='{"op": "like", "args": ['
... '{"property": "hydrologic_unit_code"},'
... ' "02070010%"]}',
... )
"""
if service not in _OUTPUT_ID_BY_SERVICE:
raise ValueError(
f"Unknown service {service!r}. Valid services: "
f"{sorted(_OUTPUT_ID_BY_SERVICE)}."
)
output_id = _OUTPUT_ID_BY_SERVICE[service]

# ``dict`` is the pythonic input — serialize on the way out. ``str`` is sent
# verbatim so callers who already have a CQL2 doc (e.g. imported from a
# config file) don't need to re-parse it.
body = json.dumps(cql, separators=(",", ":")) if isinstance(cql, dict) else cql

properties_list = _as_str_list(properties, "properties")

# Translate user-facing names (``daily_id``/``id``) to the wire ``id`` the
# OGC API expects, matching the typed getters.
wire_properties = _switch_properties_id(properties_list, output_id, service)

req = _construct_cql_request(
service,
body,
properties=wire_properties,
bbox=bbox,
limit=limit,
skip_geometry=skip_geometry,
)

async def _run() -> tuple[pd.DataFrame, httpx.Response]:
return await _walk_pages(geopd=GEOPANDAS, req=req)

df, response = _run_sync(_run, service=service)

return _finalize_ogc(
df,
response,
properties=properties_list,
output_id=output_id,
convert_type=convert_type,
service=service,
)
18 changes: 18 additions & 0 deletions dataretrieval/waterdata/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,24 @@
"results",
]

# OGC API time-series/monitoring collections queryable via ``get_cql``.
# Keep in sync with ``utils._OUTPUT_ID_BY_SERVICE`` (same keys): that dict maps
# each service to its user-facing ``id`` column and is the runtime source of
# truth ``get_cql`` validates against.
WATERDATA_SERVICES = Literal[
"channel-measurements",
"combined-metadata",
"continuous",
"daily",
"field-measurements",
"field-measurements-metadata",
"latest-continuous",
"latest-daily",
"monitoring-locations",
"peaks",
"time-series-metadata",
]

PROFILES = Literal[
"actgroup",
"actmetric",
Expand Down
Loading
Loading