diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d851c0..836483e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,25 @@ format changes. ## [Unreleased] +### Added + +- **`pxf.TableReader` and `pxf.bind_row`** (draft §3.4.4). Streaming + consumption for the `@table` directive, alternative to materializing + every row into `Result.tables` up front. Construct via + `pxf.TableReader.from_bytes(data)`; iterate with the standard for + loop or call `next_or_none()` until it returns `None`. The reader + exposes the header `type` / `columns` / `directives` properties and + a `tail()` method that returns the unconsumed buffer for chaining a + second reader on multi-`@table` documents. `bind_row(msg, columns, + row)` is the per-row binder used by `scan()` and exposed as a + free function for callers iterating `Result.tables[i].rows` from + the materializing path. Strategy is format-and-reparse, matching + the C++ port: cells are rendered as a synthetic PXF body and run + through `unmarshal`, reusing every branch of the existing decoder + (WKT timestamps / durations, wrapper-nullability, enum-by-name, + `pxf.required` / `pxf.default`, oneof). PR-2 takes input as bytes; + a file-like / chunked-IO bridge is a possible follow-up. + ### Changed - **CI pin to protowire-cpp v0.75.0.** The cpp sibling now ships the diff --git a/src/_protowire/module.cc b/src/_protowire/module.cc index 6cb790f..c6e36fc 100644 --- a/src/_protowire/module.cc +++ b/src/_protowire/module.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -235,6 +236,89 @@ PxfValidateDescriptor(nb::bytes fds_bytes, const std::string& full_name) { return out; } +// --- PyTableReader: streaming @table consumption ------------------------- +// +// Wraps protowire::pxf::TableReader. The reader takes a std::istream*; we +// hold the istringstream alongside the reader so its lifetime is bound to +// the Python object. Input is provided as bytes (PR-2 scope); a file-like +// streambuf bridge is a possible follow-up. +class PyTableReader { + public: + static std::unique_ptr FromBytes(nb::bytes data) { + auto out = std::unique_ptr(new PyTableReader()); + out->stream_ = std::make_unique( + std::string(data.c_str(), data.size())); + auto tr = protowire::pxf::TableReader::Create(out->stream_.get()); + if (!tr.ok()) { + throw nb::value_error(("pxf.TableReader: " + tr.status().ToString()).c_str()); + } + out->reader_ = std::move(*tr); + // Marshal the side-channel directives once at construction; they're + // fixed for the reader's lifetime. + for (const auto& d : out->reader_->Directives()) { + out->directives_.emplace_back( + d.name, d.prefixes, d.type, + nb::bytes(d.body.data(), d.body.size()), + d.has_body, d.pos.line, d.pos.column); + } + return out; + } + + const std::string& Type() const { return reader_->Type(); } + const std::vector& Columns() const { return reader_->Columns(); } + const std::vector& Directives() const { return directives_; } + bool Done() const { return reader_->Done(); } + + // Returns the next row as a Python list of cells, or None at EOF. + // Raises ValueError on parse error. + nb::object NextOrNone() { + if (reader_->Done()) return nb::none(); + protowire::pxf::TableRow row; + auto s = reader_->Next(&row); + if (!s.ok()) { + throw nb::value_error(("pxf.TableReader.next: " + s.ToString()).c_str()); + } + if (reader_->Done()) return nb::none(); + return RowToList(row); + } + + // Iterator protocol: __next__ raises StopIteration at EOF. + nb::object Next() { + if (reader_->Done()) throw nb::stop_iteration(); + protowire::pxf::TableRow row; + auto s = reader_->Next(&row); + if (!s.ok()) { + throw nb::value_error(("pxf.TableReader.next: " + s.ToString()).c_str()); + } + if (reader_->Done()) throw nb::stop_iteration(); + return RowToList(row); + } + + // Drains the remaining buffered + underlying bytes. Only meaningful + // after Done(); the Python wrapper exposes this as a method that + // returns bytes so callers can chain a second TableReader on + // multi-@table documents. + nb::bytes Tail() { + auto t = reader_->Tail(); + std::ostringstream buf; + buf << t->rdbuf(); + std::string s = buf.str(); + return nb::bytes(s.data(), s.size()); + } + + private: + static nb::object RowToList(const protowire::pxf::TableRow& row) { + std::vector cells; + cells.reserve(row.cells.size()); + for (const auto& cell : row.cells) cells.push_back(CellToPyTuple(cell)); + return nb::cast(cells); + } + + std::unique_ptr stream_; + std::unique_ptr reader_; + std::vector directives_; +}; + // Binary proto bytes -> PXF text. std::string PxfMarshal(nb::bytes msg_bytes, nb::bytes fds_bytes, const std::string& full_name) { @@ -418,6 +502,17 @@ NB_MODULE(_protowire, m) { m.def("pxf_marshal", &PxfMarshal, "msg_bytes"_a, "fds"_a, "full_name"_a); m.def("pxf_validate_descriptor", &PxfValidateDescriptor, "fds"_a, "full_name"_a); + nb::class_(m, "PxfTableReader") + .def_static("from_bytes", &PyTableReader::FromBytes, "data"_a) + .def_prop_ro("type", &PyTableReader::Type) + .def_prop_ro("columns", &PyTableReader::Columns) + .def_prop_ro("directives", &PyTableReader::Directives) + .def_prop_ro("done", &PyTableReader::Done) + .def("next_or_none", &PyTableReader::NextOrNone) + .def("tail", &PyTableReader::Tail) + .def("__iter__", [](PyTableReader& self) -> PyTableReader& { return self; }) + .def("__next__", &PyTableReader::Next); + nb::class_(m, "SbeCodec") .def_static("create", &SbeCodec::Create, "fds"_a, "file_names"_a) .def("marshal", &SbeCodec::Marshal, "msg_bytes"_a, "full_name"_a) diff --git a/src/protowire/pxf.py b/src/protowire/pxf.py index 5b9e2ca..81ca023 100644 --- a/src/protowire/pxf.py +++ b/src/protowire/pxf.py @@ -8,6 +8,7 @@ from __future__ import annotations +import base64 from dataclasses import dataclass, field from typing import Literal, Union @@ -261,3 +262,172 @@ def _normalize_cell(c) -> Cell: and stays as Python `bytes` after the round-trip. """ return c # already in the right shape + + +# --- TableReader (streaming @table consumption, draft §3.4.4) ------------ + + +class TableReader: + """Streaming row reader for a single `@table` directive. + + `unmarshal_full` materializes every row of a `@table` directive into + `Result.tables`. That works for small datasets and breaks for the + CSV-replacement workload `@table` was designed for. `TableReader` + pulls one row at a time from an in-memory buffer; working-set memory + is bounded by the size of the largest single row. + + Usage:: + + reader = pxf.TableReader.from_bytes(open("trades.pxf", "rb").read()) + for row in reader: + msg = TradeMsg() + pxf.bind_row(msg, reader.columns, row) + handle(msg) + + Per draft §3.4.4: per-row arity and v1 cell-grammar checks happen at + consume time, not deferred to EOF. The reader header (everything + from the start of the input through the closing `)` of the column + list) is capped at 64 KiB to fail-fast on misuse. + + NOTE: this PR-2 implementation reads from a `bytes` buffer. A + file-like / chunked-IO bridge is a possible follow-up. + """ + + def __init__(self, native: object) -> None: + # Private — construct via `from_bytes`. We keep the native handle + # only; type/columns/directives are forwarded on demand. + self._native = native + + @classmethod + def from_bytes(cls, data: bytes | str) -> "TableReader": + if isinstance(data, str): + data = data.encode("utf-8") + return cls(_protowire.PxfTableReader.from_bytes(bytes(data))) + + @property + def type(self) -> str: + return self._native.type + + @property + def columns(self) -> tuple[str, ...]: + return tuple(self._native.columns) + + @property + def directives(self) -> tuple[Directive, ...]: + return tuple( + Directive( + name=name, + prefixes=tuple(prefixes), + type=type_, + body=bytes(body), + has_body=has_body, + line=line, + column=column, + ) + for (name, prefixes, type_, body, has_body, line, column) in self._native.directives + ) + + @property + def done(self) -> bool: + return self._native.done + + def next_or_none(self) -> tuple[Cell, ...] | None: + """Returns the next row, or None at EOF.""" + cells = self._native.next_or_none() + return None if cells is None else tuple(cells) + + def __iter__(self) -> "TableReader": + return self + + def __next__(self) -> tuple[Cell, ...]: + return tuple(self._native.__next__()) + + def tail(self) -> bytes: + """Returns the bytes the reader has buffered but not consumed, + followed by any remaining bytes from the underlying source. + + Use to chain a second `TableReader` for documents containing + multiple `@table` directives:: + + tr1 = pxf.TableReader.from_bytes(data) + for _ in tr1: ... + tr2 = pxf.TableReader.from_bytes(tr1.tail()) + + MUST only be called after iteration has exhausted (i.e. `done` + is True). Calling earlier returns bytes the current reader + still intends to consume. + """ + return self._native.tail() + + def scan(self, msg: Message, *, skip_validate: bool = True) -> bool: + """Read the next row and bind its cells to `msg`'s fields by + column name. Returns True on success; returns False at EOF + (callers check `reader.done`).""" + cells = self.next_or_none() + if cells is None: + return False + bind_row(msg, self.columns, cells, skip_validate=skip_validate) + return True + + +# --- bind_row (per-row proto binding) ------------------------------------ + + +def bind_row( + msg: Message, + columns: tuple[str, ...] | list[str], + row: tuple[Cell, ...] | list[Cell], + *, + skip_validate: bool = True, +) -> None: + """Bind a `@table` row's cells to `msg`'s fields by column name. + + `columns` and `row` MUST have the same length. Cell-state semantics: + + - `None` — field absent. (pxf.default) applies if declared; + (pxf.required) errors if neither default nor + value is present. + - `("null", _)` — field cleared (draft §3.9). + - any other — field set to the cell's value. + + Strategy: render the row as a synthetic PXF body (` = ` + per non-None cell) and run it through `unmarshal`. This mirrors + `protowire-cpp`'s `BindRow` and reuses every branch of the existing + decoder — WKT timestamps / durations, wrapper-type nullability, + enum-by-name resolution, oneof handling — instead of growing a + parallel Cell→FieldDescriptor switch. + + `skip_validate` defaults to True: the descriptor was validated once + when the caller constructed the message factory, and re-running the + reserved-name check per row is wasteful in tight loops. + """ + if len(columns) != len(row): + raise ValueError( + f"bind_row: {len(columns)} columns vs {len(row)} cells" + ) + parts: list[str] = [] + for col, cell in zip(columns, row): + if cell is None: + continue + parts.append(f"{col} = {_cell_to_pxf(cell)}\n") + unmarshal("".join(parts), msg, skip_validate=skip_validate) + + +def _cell_to_pxf(cell: tuple[CellKind, object]) -> str: + kind, value = cell + if kind == "null": + return "null" + if kind == "string": + # Escape `"` and `\`; other characters round-trip verbatim because + # the lexer accepts UTF-8 in strings. + s = str(value).replace("\\", "\\\\").replace('"', '\\"') + return f'"{s}"' + if kind in ("int", "float", "timestamp", "duration"): + return str(value) # raw text + if kind == "bool": + return "true" if value else "false" + if kind == "bytes": + return 'b"' + base64.b64encode(value).decode("ascii") + '"' + if kind == "ident": + return str(value) + raise ValueError(f"bind_row: unknown cell kind {kind!r}") diff --git a/tests/test_pxf_table_reader.py b/tests/test_pxf_table_reader.py new file mode 100644 index 0000000..bf3ad89 --- /dev/null +++ b/tests/test_pxf_table_reader.py @@ -0,0 +1,235 @@ +# SPDX-License-Identifier: MIT +# Copyright (c) 2026 TrendVidia, LLC. +"""Tests for pxf.TableReader (streaming @table consumption) and pxf.bind_row +(per-row proto binding). PR 2 of the Python v0.72-v0.75 catch-up.""" + +from __future__ import annotations + +import pytest + +from protowire import pxf + + +# ---- TableReader.from_bytes header parsing ------------------------------- + + +def test_reads_header_and_exposes_type_and_columns(): + src = b"@table trades.v1.Trade ( px, qty )\n( 100, 5 )\n( 101, 7 )\n" + tr = pxf.TableReader.from_bytes(src) + assert tr.type == "trades.v1.Trade" + assert tr.columns == ("px", "qty") + assert tr.directives == () + + +def test_accepts_str_input(): + tr = pxf.TableReader.from_bytes("@table x.Row ( a )\n( 1 )\n") + assert tr.type == "x.Row" + + +def test_no_table_raises(): + with pytest.raises(ValueError, match="no @table"): + pxf.TableReader.from_bytes(b"@type foo.Msg\nname = \"x\"\n") + + +def test_empty_input_raises(): + with pytest.raises(ValueError): + pxf.TableReader.from_bytes(b"") + + +def test_leading_directives_preserved(): + src = b'''@header pkg.Hdr { id = "h" } +@frob alpha +@table trades.v1.Trade ( px, qty ) +( 1, 2 ) +''' + tr = pxf.TableReader.from_bytes(src) + assert len(tr.directives) == 2 + assert tr.directives[0].name == "header" + assert tr.directives[1].name == "frob" + assert tr.directives[0].body == b' id = "h" ' + + +def test_header_oversize_rejected(): + # >64 KiB of leading directive bytes before any @table. + big = b"@frob " + (b"x " * 35000) + b"\n@table x.Row ( a )\n" + with pytest.raises(ValueError, match="header exceeds"): + pxf.TableReader.from_bytes(big) + + +# ---- iteration ---------------------------------------------------------- + + +def test_iter_yields_rows_in_order(): + src = b"@table x.Row ( a, b )\n( 1, 2 )\n( 3, 4 )\n( 5, 6 )\n" + tr = pxf.TableReader.from_bytes(src) + rows = list(tr) + assert rows == [ + (("int", "1"), ("int", "2")), + (("int", "3"), ("int", "4")), + (("int", "5"), ("int", "6")), + ] + assert tr.done + + +def test_zero_rows_immediately_stops(): + tr = pxf.TableReader.from_bytes(b"@table x.Row ( a )\n") + rows = list(tr) + assert rows == [] + assert tr.done + + +def test_next_or_none_returns_none_at_eof(): + tr = pxf.TableReader.from_bytes(b"@table x.Row ( a )\n( 1 )\n") + first = tr.next_or_none() + assert first == (("int", "1"),) + assert tr.next_or_none() is None + assert tr.done + + +def test_cell_shapes_match_three_state_grammar(): + src = b"""@table x.Row ( a, b, c, d, e ) +( 42, "hi", true, null, ) +""" + tr = pxf.TableReader.from_bytes(src) + (row,) = list(tr) + assert row[0] == ("int", "42") + assert row[1] == ("string", "hi") + assert row[2] == ("bool", True) + assert row[3] == ("null", None) + assert row[4] is None # absent (empty cell at end) + + +def test_arity_mismatch_raises(): + src = b"@table x.Row ( a, b )\n( 1, 2, 3 )\n" + tr = pxf.TableReader.from_bytes(src) + with pytest.raises(ValueError, match="3 cells, expected 2"): + next(iter(tr)) + + +def test_parens_inside_string_not_row_boundary(): + src = b'@table x.Row ( a )\n( "hi ) there" )\n( "next" )\n' + tr = pxf.TableReader.from_bytes(src) + rows = list(tr) + assert rows == [ + (("string", "hi ) there"),), + (("string", "next"),), + ] + + +def test_comments_between_rows_ignored(): + src = b"""@table x.Row ( a ) +# leading +( 1 ) +// mid +( 2 ) +/* block + comment */ +( 3 ) +""" + tr = pxf.TableReader.from_bytes(src) + assert len(list(tr)) == 3 + + +# ---- tail() chaining ----------------------------------------------------- + + +def test_tail_chains_to_second_table(): + src = b"""@table a.Row ( x ) +( 1 ) +( 2 ) +@table b.Row ( y ) +( "p" ) +( "q" ) +""" + tr1 = pxf.TableReader.from_bytes(src) + assert tr1.type == "a.Row" + list(tr1) # drain + tr2 = pxf.TableReader.from_bytes(tr1.tail()) + assert tr2.type == "b.Row" + rows = list(tr2) + assert rows == [ + (("string", "p"),), + (("string", "q"),), + ] + + +# ---- bind_row + scan ----------------------------------------------------- + + +def test_bind_row_sets_fields_by_column(all_types_cls): + src = b'@table test.v1.AllTypes ( string_field, int32_field )\n( "alpha", 42 )\n' + tr = pxf.TableReader.from_bytes(src) + (row,) = list(tr) + msg = all_types_cls() + pxf.bind_row(msg, tr.columns, row) + assert msg.string_field == "alpha" + assert msg.int32_field == 42 + + +def test_scan_equivalent_to_next_plus_bind(all_types_cls): + src = b'@table test.v1.AllTypes ( string_field )\n( "row1" )\n( "row2" )\n' + tr = pxf.TableReader.from_bytes(src) + seen = [] + while True: + msg = all_types_cls() + ok = tr.scan(msg) + if not ok: + break + seen.append(msg.string_field) + assert seen == ["row1", "row2"] + + +def test_bind_row_absent_cell_leaves_default(all_types_cls): + # proto3 string default is ""; absent cell shouldn't stamp a value. + src = b'@table test.v1.AllTypes ( string_field, int32_field )\n( , 7 )\n' + tr = pxf.TableReader.from_bytes(src) + (row,) = list(tr) + msg = all_types_cls() + pxf.bind_row(msg, tr.columns, row) + assert msg.string_field == "" + assert msg.int32_field == 7 + + +def test_bind_row_null_clears_wrapper(all_types_cls): + # A `null` cell on a wrapper field clears it (draft §3.9). + src = b'@table test.v1.AllTypes ( nullable_string )\n( null )\n' + tr = pxf.TableReader.from_bytes(src) + (row,) = list(tr) + msg = all_types_cls() + msg.nullable_string.value = "stale" # populate to confirm clear + assert msg.HasField("nullable_string") + pxf.bind_row(msg, tr.columns, row) + # nullable_string is a StringValue — after `null`, HasField is False. + assert not msg.HasField("nullable_string") + + +def test_bind_row_bytes_cell(all_types_cls): + src = b'@table test.v1.AllTypes ( bytes_field )\n( b"YWJj" )\n' # "abc" + tr = pxf.TableReader.from_bytes(src) + (row,) = list(tr) + msg = all_types_cls() + pxf.bind_row(msg, tr.columns, row) + assert msg.bytes_field == b"abc" + + +def test_bind_row_mismatched_columns_errors(all_types_cls): + msg = all_types_cls() + with pytest.raises(ValueError, match="1 columns vs 2 cells"): + pxf.bind_row(msg, ("string_field",), (None, None)) + + +def test_bind_row_unknown_column_errors(all_types_cls): + msg = all_types_cls() + with pytest.raises(ValueError): + pxf.bind_row(msg, ("not_a_field",), (("int", "1"),)) + + +def test_bind_row_string_escape(all_types_cls): + # String values containing quotes and backslashes must round-trip + # via the synthetic body formatter. + src = b'@table test.v1.AllTypes ( string_field )\n( "she said \\"hi\\"" )\n' + tr = pxf.TableReader.from_bytes(src) + (row,) = list(tr) + msg = all_types_cls() + pxf.bind_row(msg, tr.columns, row) + assert msg.string_field == 'she said "hi"'