Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,25 @@ format changes.

## [Unreleased]

### Added

- **`pxf.TableReader` and `pxf.bind_row`** (draft §3.4.4). Streaming
consumption for the `@table` directive, alternative to materializing
every row into `Result.tables` up front. Construct via
`pxf.TableReader.from_bytes(data)`; iterate with a standard `for`
loop or call `next_or_none()` until it returns `None`. The reader
exposes the header `type` / `columns` / `directives` properties and
a `tail()` method that returns the unconsumed buffer for chaining a
second reader on multi-`@table` documents. `bind_row(msg, columns,
row)` is the per-row binder used by `TableReader.scan()`; it is also
exposed as a free function for callers iterating `Result.tables[i].rows`
from the materializing path. The strategy is format-and-reparse, matching
the C++ port: cells are rendered as a synthetic PXF body and run
through `unmarshal`, reusing every branch of the existing decoder
(WKT timestamps / durations, wrapper-nullability, enum-by-name,
`pxf.required` / `pxf.default`, oneof). PR-2 takes input as bytes;
a file-like / chunked-IO bridge is a possible follow-up.

### Changed

- **CI pin to protowire-cpp v0.75.0.** The cpp sibling now ships the
Expand Down
95 changes: 95 additions & 0 deletions src/_protowire/module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>
#include <optional>
#include <span>
#include <sstream>
#include <string>
#include <string_view>
#include <utility>
Expand Down Expand Up @@ -235,6 +236,89 @@ PxfValidateDescriptor(nb::bytes fds_bytes, const std::string& full_name) {
return out;
}

// --- PyTableReader: streaming @table consumption -------------------------
//
// Wraps protowire::pxf::TableReader. The reader takes a std::istream*; we
// hold the istringstream alongside the reader so its lifetime is bound to
// the Python object. Input is provided as bytes (PR-2 scope); a file-like
// streambuf bridge is a possible follow-up.
class PyTableReader {
public:
static std::unique_ptr<PyTableReader> FromBytes(nb::bytes data) {
auto out = std::unique_ptr<PyTableReader>(new PyTableReader());
out->stream_ = std::make_unique<std::istringstream>(
std::string(data.c_str(), data.size()));
auto tr = protowire::pxf::TableReader::Create(out->stream_.get());
if (!tr.ok()) {
throw nb::value_error(("pxf.TableReader: " + tr.status().ToString()).c_str());
}
out->reader_ = std::move(*tr);
// Marshal the side-channel directives once at construction; they're
// fixed for the reader's lifetime.
for (const auto& d : out->reader_->Directives()) {
out->directives_.emplace_back(
d.name, d.prefixes, d.type,
nb::bytes(d.body.data(), d.body.size()),
d.has_body, d.pos.line, d.pos.column);
}
return out;
}

const std::string& Type() const { return reader_->Type(); }
const std::vector<std::string>& Columns() const { return reader_->Columns(); }
const std::vector<PyDirective>& Directives() const { return directives_; }
bool Done() const { return reader_->Done(); }

// Returns the next row as a Python list of cells, or None at EOF.
// Raises ValueError on parse error.
nb::object NextOrNone() {
if (reader_->Done()) return nb::none();
protowire::pxf::TableRow row;
auto s = reader_->Next(&row);
if (!s.ok()) {
throw nb::value_error(("pxf.TableReader.next: " + s.ToString()).c_str());
}
if (reader_->Done()) return nb::none();
return RowToList(row);
}

// Iterator protocol: __next__ raises StopIteration at EOF.
nb::object Next() {
if (reader_->Done()) throw nb::stop_iteration();
protowire::pxf::TableRow row;
auto s = reader_->Next(&row);
if (!s.ok()) {
throw nb::value_error(("pxf.TableReader.next: " + s.ToString()).c_str());
}
if (reader_->Done()) throw nb::stop_iteration();
return RowToList(row);
}

// Drains the remaining buffered + underlying bytes. Only meaningful
// after Done(); the Python wrapper exposes this as a method that
// returns bytes so callers can chain a second TableReader on
// multi-@table documents.
nb::bytes Tail() {
auto t = reader_->Tail();
std::ostringstream buf;
buf << t->rdbuf();
std::string s = buf.str();
return nb::bytes(s.data(), s.size());
}

private:
static nb::object RowToList(const protowire::pxf::TableRow& row) {
std::vector<nb::object> cells;
cells.reserve(row.cells.size());
for (const auto& cell : row.cells) cells.push_back(CellToPyTuple(cell));
return nb::cast(cells);
}

std::unique_ptr<std::istringstream> stream_;
std::unique_ptr<protowire::pxf::TableReader> reader_;
std::vector<PyDirective> directives_;
};

// Binary proto bytes -> PXF text.
std::string PxfMarshal(nb::bytes msg_bytes, nb::bytes fds_bytes,
const std::string& full_name) {
Expand Down Expand Up @@ -418,6 +502,17 @@ NB_MODULE(_protowire, m) {
m.def("pxf_marshal", &PxfMarshal, "msg_bytes"_a, "fds"_a, "full_name"_a);
m.def("pxf_validate_descriptor", &PxfValidateDescriptor, "fds"_a, "full_name"_a);

nb::class_<PyTableReader>(m, "PxfTableReader")
.def_static("from_bytes", &PyTableReader::FromBytes, "data"_a)
.def_prop_ro("type", &PyTableReader::Type)
.def_prop_ro("columns", &PyTableReader::Columns)
.def_prop_ro("directives", &PyTableReader::Directives)
.def_prop_ro("done", &PyTableReader::Done)
.def("next_or_none", &PyTableReader::NextOrNone)
.def("tail", &PyTableReader::Tail)
.def("__iter__", [](PyTableReader& self) -> PyTableReader& { return self; })
.def("__next__", &PyTableReader::Next);

nb::class_<SbeCodec>(m, "SbeCodec")
.def_static("create", &SbeCodec::Create, "fds"_a, "file_names"_a)
.def("marshal", &SbeCodec::Marshal, "msg_bytes"_a, "full_name"_a)
Expand Down
170 changes: 170 additions & 0 deletions src/protowire/pxf.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from __future__ import annotations

import base64
from dataclasses import dataclass, field
from typing import Literal, Union

Expand Down Expand Up @@ -261,3 +262,172 @@ def _normalize_cell(c) -> Cell:
and stays as Python `bytes` after the round-trip.
"""
return c # already in the right shape


# --- TableReader (streaming @table consumption, draft §3.4.4) ------------


class TableReader:
    """Row-at-a-time reader for one `@table` directive.

    `unmarshal_full` decodes every row of a `@table` directive into
    `Result.tables` before returning, which is fine for small inputs but
    not for the CSV-replacement workloads `@table` targets. This class
    instead hands rows out one by one from an in-memory buffer, so only
    the row currently being processed has to be materialized.

    Usage::

        reader = pxf.TableReader.from_bytes(open("trades.pxf", "rb").read())
        for row in reader:
            msg = TradeMsg()
            pxf.bind_row(msg, reader.columns, row)
            handle(msg)

    Per draft §3.4.4, row arity and v1 cell-grammar checks run when a row
    is consumed rather than being deferred to EOF. The header (from the
    start of the input through the closing `)` of the column list) is
    capped at 64 KiB so misuse fails fast.

    NOTE: this PR-2 implementation only accepts a `bytes` buffer; a
    file-like / chunked-IO bridge is a possible follow-up.
    """

    def __init__(self, native: object) -> None:
        # Not part of the public API: use `from_bytes`. Only the native
        # handle is stored; every property below forwards to it.
        self._native = native

    @classmethod
    def from_bytes(cls, data: bytes | str) -> "TableReader":
        """Create a reader over `data`; `str` input is encoded as UTF-8."""
        payload = data.encode("utf-8") if isinstance(data, str) else data
        handle = _protowire.PxfTableReader.from_bytes(bytes(payload))
        return cls(handle)

    @property
    def type(self) -> str:
        """The `type` reported by the native reader's header."""
        return self._native.type

    @property
    def columns(self) -> tuple[str, ...]:
        """Column names from the header, as a tuple."""
        return tuple(self._native.columns)

    @property
    def directives(self) -> tuple[Directive, ...]:
        """Header directives, rebuilt as `Directive` objects on each access."""
        converted = []
        for record in self._native.directives:
            # Each native record is a 7-field sequence in this fixed order.
            name, prefixes, type_, body, has_body, line, column = record
            converted.append(
                Directive(
                    name=name,
                    prefixes=tuple(prefixes),
                    type=type_,
                    body=bytes(body),
                    has_body=has_body,
                    line=line,
                    column=column,
                )
            )
        return tuple(converted)

    @property
    def done(self) -> bool:
        """Whether the native reader reports that it is exhausted."""
        return self._native.done

    def next_or_none(self) -> tuple[Cell, ...] | None:
        """Return the next row as a tuple of cells, or None at EOF."""
        cells = self._native.next_or_none()
        if cells is None:
            return None
        return tuple(cells)

    def __iter__(self) -> "TableReader":
        return self

    def __next__(self) -> tuple[Cell, ...]:
        # StopIteration raised by the native handle propagates unchanged.
        return tuple(self._native.__next__())

    def tail(self) -> bytes:
        """Return the input the reader buffered but did not consume, plus
        whatever remains in the underlying source.

        This lets a second `TableReader` pick up where this one stopped
        in documents that contain several `@table` directives::

            tr1 = pxf.TableReader.from_bytes(data)
            for _ in tr1: ...
            tr2 = pxf.TableReader.from_bytes(tr1.tail())

        MUST only be called once iteration is exhausted (`done` is True);
        earlier calls return bytes this reader still intends to consume.
        """
        return self._native.tail()

    def scan(self, msg: Message, *, skip_validate: bool = True) -> bool:
        """Fetch the next row and bind its cells onto `msg` by column name.

        Returns True when a row was bound and False at EOF (callers can
        confirm with `reader.done`).
        """
        row = self.next_or_none()
        if row is not None:
            bind_row(msg, self.columns, row, skip_validate=skip_validate)
            return True
        return False


# --- bind_row (per-row proto binding) ------------------------------------


def bind_row(
    msg: Message,
    columns: tuple[str, ...] | list[str],
    row: tuple[Cell, ...] | list[Cell],
    *,
    skip_validate: bool = True,
) -> None:
    """Populate `msg` from one `@table` row, matching cells to fields by
    column name.

    `columns` and `row` MUST be the same length; a mismatch raises
    `ValueError` before anything is bound. How each cell is treated:

    - `None`         — field absent. (pxf.default) applies if declared;
                       (pxf.required) errors if neither default nor
                       value is present.
    - `("null", _)`  — field cleared (draft §3.9).
    - anything else  — field set to the cell's value.

    Implementation: every non-None cell is rendered as a `<col> = <val>`
    line, and the resulting synthetic PXF body is passed to `unmarshal`.
    This mirrors `protowire-cpp`'s `BindRow` and reuses the existing
    decoder (WKT timestamps / durations, wrapper-type nullability,
    enum-by-name resolution, oneof handling) rather than maintaining a
    separate Cell→FieldDescriptor switch.

    `skip_validate` defaults to True because the descriptor was already
    validated when the caller built the message factory; repeating the
    reserved-name check for every row is wasteful in tight loops.
    """
    n_columns, n_cells = len(columns), len(row)
    if n_columns != n_cells:
        raise ValueError(
            f"bind_row: {n_columns} columns vs {n_cells} cells"
        )
    # Absent (None) cells contribute no line, so their fields stay unset.
    body = "".join(
        f"{name} = {_cell_to_pxf(value)}\n"
        for name, value in zip(columns, row)
        if value is not None
    )
    unmarshal(body, msg, skip_validate=skip_validate)


def _cell_to_pxf(cell: tuple[CellKind, object]) -> str:
kind, value = cell
if kind == "null":
return "null"
if kind == "string":
# Escape `"` and `\`; other characters round-trip verbatim because
# the lexer accepts UTF-8 in strings.
s = str(value).replace("\\", "\\\\").replace('"', '\\"')
return f'"{s}"'
if kind in ("int", "float", "timestamp", "duration"):
return str(value) # raw text
if kind == "bool":
return "true" if value else "false"
if kind == "bytes":
return 'b"' + base64.b64encode(value).decode("ascii") + '"'
if kind == "ident":
return str(value)
raise ValueError(f"bind_row: unknown cell kind {kind!r}")
Loading
Loading