Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CONTRACT.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ The supported import surface is:
- `ResultSummary`
- `RunHistoryItem`
- `WorkspaceSelection`
- `ManagedDatabase`
- `ManagedTable`
- `LoadManagedTableResult`
- `MANAGED_SOURCE_TYPE`
- `DEFAULT_SCHEMA`
- `build_managed_config`
- `create_connection_request`
- `is_parquet_path`

Adapters should import from `hotdata_runtime` and treat this surface as the stable API.

Expand All @@ -49,6 +57,15 @@ Adapters should import from `hotdata_runtime` and treat this surface as the stab
- `list_qualified_table_names(...)` returns sorted fully qualified table names.
- `columns_for_qualified(qualified, connection_id=...)` resolves table columns, and
adapters should pass `connection_id` when known.
- `uploads()` returns the uploads API wrapper for parquet staging.
- `list_managed_databases()` returns managed-catalog connections (`source_type: managed`).
- `resolve_managed_database(name_or_id)` resolves a managed database by name or id.
- `create_managed_database(name, schema=..., tables=...)` creates a managed database and optionally declares tables up front.
- `delete_managed_database(name_or_id)` deletes a managed database connection.
- `list_managed_tables(database, schema=...)` lists tables in a managed database.
- `upload_parquet(path)` uploads a local parquet file and returns an upload id.
- `load_managed_table(database, table, schema=..., upload_id=..., file=...)` publishes parquet data into a declared managed table.
- `delete_managed_table(database, table, schema=...)` deletes a managed table.

### `QueryResult`

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Runtime boundary and guarantees are defined in `CONTRACT.md`.
- **SQL execution helper** — run SQL through `POST /v1/query`, poll async query runs when needed, and return a `QueryResult`.
- **Result utilities** — convert query results to records, pandas DataFrames, or metadata dictionaries for adapter display layers.
- **History helpers** — list recent results and query run history with normalized dataclasses.
- **Managed databases** — create Hotdata-owned catalogs, declare tables, upload parquet, and load managed tables (mirrors `hotdata databases` in the CLI).
- **Health helpers** — build compact API/workspace health summaries for UI integrations.

Install:
Expand Down
18 changes: 18 additions & 0 deletions hotdata_runtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@
RunHistoryItem,
from_env,
)
from hotdata_runtime.databases import (
DEFAULT_SCHEMA,
LoadManagedTableResult,
ManagedDatabase,
ManagedTable,
MANAGED_SOURCE_TYPE,
build_managed_config,
create_connection_request,
is_parquet_path,
)
from hotdata_runtime.env import (
default_api_key,
default_host,
Expand All @@ -29,8 +39,16 @@

__all__ = [
"__version__",
"DEFAULT_SCHEMA",
"HotdataClient",
"LoadManagedTableResult",
"MANAGED_SOURCE_TYPE",
"ManagedDatabase",
"ManagedTable",
"QueryResult",
"build_managed_config",
"create_connection_request",
"is_parquet_path",
"workspace_health_lines",
"default_api_key",
"default_host",
Expand Down
151 changes: 151 additions & 0 deletions hotdata_runtime/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
from hotdata.api.query_api import QueryApi
from hotdata.api.query_runs_api import QueryRunsApi
from hotdata.api.results_api import ResultsApi
from hotdata.api.uploads_api import UploadsApi
from hotdata.exceptions import ApiException
from hotdata.models.async_query_response import AsyncQueryResponse
from hotdata.models.query_request import QueryRequest
from hotdata.models.query_response import QueryResponse
from hotdata.models.load_managed_table_request import LoadManagedTableRequest
from hotdata.models.table_info import TableInfo

from hotdata_runtime.env import (
Expand All @@ -26,6 +28,17 @@
normalize_host,
pick_workspace,
)
from hotdata_runtime.databases import (
DEFAULT_SCHEMA,
LoadManagedTableResult,
ManagedDatabase,
ManagedTable,
MANAGED_SOURCE_TYPE,
api_error_message,
create_connection_request,
is_parquet_path,
managed_database_from_connection,
)
from hotdata_runtime.http import default_http_retries
from hotdata_runtime.result import QueryResult

Expand Down Expand Up @@ -135,6 +148,144 @@ def query_runs(self) -> QueryRunsApi:
def results(self) -> ResultsApi:
return self._results_api()

def uploads(self) -> UploadsApi:
return UploadsApi(self._api)

def list_managed_databases(self) -> list[ManagedDatabase]:
listing = self.connections().list_connections()
return [
managed_database_from_connection(c)
for c in listing.connections
if c.source_type == MANAGED_SOURCE_TYPE
]

def resolve_managed_database(self, name_or_id: str) -> ManagedDatabase:
listing = self.connections().list_connections()
match = None
for c in listing.connections:
if c.id == name_or_id or c.name == name_or_id:
match = c
break
if match is None:
raise KeyError(f"No database named or with id {name_or_id!r}")
if match.source_type != MANAGED_SOURCE_TYPE:
raise ValueError(
f"{match.name!r} is not a managed database "
f"(source_type: {match.source_type})"
)
return managed_database_from_connection(match)

def create_managed_database(
self,
name: str,
*,
schema: str = DEFAULT_SCHEMA,
tables: list[str] | None = None,
) -> ManagedDatabase:
request = create_connection_request(name, schema=schema, tables=tables)
try:
created = self.connections().create_connection(request)
except ApiException as e:
raise RuntimeError(api_error_message(e)) from e
return managed_database_from_connection(created)

def delete_managed_database(self, name_or_id: str) -> None:
db = self.resolve_managed_database(name_or_id)
try:
self.connections().delete_connection(db.id)
except ApiException as e:
raise RuntimeError(api_error_message(e)) from e

def list_managed_tables(
self,
database: str,
*,
schema: str | None = None,
) -> list[ManagedTable]:
db = self.resolve_managed_database(database)
rows: list[ManagedTable] = []
for t in self.iter_tables(connection_id=db.id):
if schema is not None and t.var_schema != schema:
continue
rows.append(
ManagedTable(
full_name=f"{db.name}.{t.var_schema}.{t.table}",
schema=t.var_schema,
table=t.table,
synced=t.synced,
last_sync=t.last_sync,
)
)
rows.sort(key=lambda row: (row.schema, row.table))
return rows

def upload_parquet(self, path: str) -> str:
if not is_parquet_path(path):
raise ValueError(
f"Managed table loads require a parquet file (got {path!r})"
)
with open(path, "rb") as f:
data = f.read()
try:
uploaded = self.uploads().upload_file(
data,
_content_type="application/octet-stream",
)
except ApiException as e:
raise RuntimeError(api_error_message(e)) from e
return uploaded.id

def load_managed_table(
self,
database: str,
table: str,
*,
schema: str = DEFAULT_SCHEMA,
upload_id: str | None = None,
file: str | None = None,
) -> LoadManagedTableResult:
if (upload_id is None) == (file is None):
raise ValueError("Exactly one of upload_id or file is required")
db = self.resolve_managed_database(database)
if upload_id is not None:
resolved_upload_id = upload_id
else:
assert file is not None
resolved_upload_id = self.upload_parquet(file)
request = LoadManagedTableRequest(
mode="replace",
upload_id=resolved_upload_id,
)
try:
loaded = self.connections().load_managed_table(
db.id,
schema,
table,
request,
)
except ApiException as e:
raise RuntimeError(api_error_message(e)) from e
return LoadManagedTableResult(
connection_id=loaded.connection_id,
schema_name=loaded.schema_name,
table_name=loaded.table_name,
row_count=loaded.row_count,
full_name=f"{db.name}.{loaded.schema_name}.{loaded.table_name}",
)

def delete_managed_table(
self,
database: str,
table: str,
*,
schema: str = DEFAULT_SCHEMA,
) -> None:
db = self.resolve_managed_database(database)
try:
self.connections().delete_managed_table(db.id, schema, table)
except ApiException as e:
raise RuntimeError(api_error_message(e)) from e

def list_recent_results(
self,
*,
Expand Down
91 changes: 91 additions & 0 deletions hotdata_runtime/databases.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""Managed database helpers (Hotdata-owned catalogs with parquet table loads)."""

from __future__ import annotations

from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any

from hotdata.exceptions import ApiException
from hotdata.models.create_connection_request import CreateConnectionRequest

MANAGED_SOURCE_TYPE = "managed"
DEFAULT_SCHEMA = "public"


@dataclass(frozen=True)
class ManagedDatabase:
id: str
name: str
source_type: str

def to_dict(self) -> dict[str, Any]:
return asdict(self)


@dataclass(frozen=True)
class ManagedTable:
full_name: str
schema: str
table: str
synced: bool
last_sync: str | None

def to_dict(self) -> dict[str, Any]:
return asdict(self)


@dataclass(frozen=True)
class LoadManagedTableResult:
connection_id: str
schema_name: str
table_name: str
row_count: int
full_name: str

def to_dict(self) -> dict[str, Any]:
return asdict(self)


def is_parquet_path(path: str) -> bool:
return Path(path).suffix.lower() == ".parquet"
Comment thread
eddietejeda marked this conversation as resolved.


def build_managed_config(schema: str, tables: list[str]) -> dict[str, Any]:
if not tables:
return {}
return {
"schemas": [
{
"name": schema,
"tables": [{"name": table} for table in tables],
}
]
}


def create_connection_request(
name: str,
*,
schema: str = DEFAULT_SCHEMA,
tables: list[str] | None = None,
) -> CreateConnectionRequest:
table_list = tables or []
return CreateConnectionRequest(
name=name,
source_type=MANAGED_SOURCE_TYPE,
config=build_managed_config(schema, table_list),
skip_discovery=True,
)


def managed_database_from_connection(conn: Any) -> ManagedDatabase:
return ManagedDatabase(
id=str(conn.id),
name=str(conn.name),
source_type=str(conn.source_type),
)


def api_error_message(exc: ApiException) -> str:
return exc.reason or str(exc)
8 changes: 6 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ build-backend = "hatchling.build"

[project]
name = "hotdata-runtime"
version = "0.1.0"
version = "0.1.1"
description = "Workspace/session runtime primitives for Hotdata integrations"
readme = "README.md"
requires-python = ">=3.10"
license = { text = "MIT" }
dependencies = [
"hotdata>=0.1.0",
"hotdata>=0.2.0",
"pandas>=2.0",
]

Expand All @@ -23,6 +23,10 @@ dev = [
[tool.uv]
default-groups = ["dev"]

# Resolve hotdata from a sibling checkout until v0.2.0 is on PyPI.
[tool.uv.sources]
hotdata = { path = "../sdk-python", editable = true }

[tool.hatch.build.targets.wheel]
packages = ["hotdata_runtime"]

Expand Down
8 changes: 8 additions & 0 deletions tests/test_contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,16 @@
def test_public_exports_contract():
assert hr.__all__ == [
"__version__",
"DEFAULT_SCHEMA",
"HotdataClient",
"LoadManagedTableResult",
"MANAGED_SOURCE_TYPE",
"ManagedDatabase",
"ManagedTable",
"QueryResult",
"build_managed_config",
"create_connection_request",
"is_parquet_path",
"workspace_health_lines",
"default_api_key",
"default_host",
Expand Down
Loading
Loading