diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c2e330..7801073 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,63 @@ format changes. ### Added +- **`TableReader` streaming + `scan(Message.Builder)` + `BindRow` helper.** + Companion to `protowire-go` v0.74 (`pxf.TableReader`) and v0.75 + (`TableReader.scan` / `BindRow`). Reads rows from an + {@link java.io.InputStream} one at a time with working-set memory bounded + by the size of the largest single row — the shape consumers need for + CSV-replacement datasets that don't fit in memory. + + ```java + try (var tr = new TableReader(in)) { + while (true) { + var b = AllTypes.newBuilder(); + if (!tr.scan(b)) break; + process(b.build()); + } + } + ``` + + Cell-state semantics match `BindRow`: a `null` cell leaves the field + absent (`pxf.default` applied, `pxf.required` errors), an `Ast.NullVal` + cell clears wrappers / optional / oneof per §3.9, any other value sets + the field. WKT timestamps and durations, enum-by-name, proto3 wrappers + all bind correctly because `BindRow` re-uses the existing unmarshal + pipeline (format-and-reparse). + + Implementation: byte-level row-boundary scanner pulls bytes from the + source `InputStream` on demand and slices one `( ... )` row range at a + time, which is then handed to `Parser.parseTableRow` for cell decoding. + The scanner is string / bytes-literal / line-comment / block-comment + aware so embedded parens or `)` inside literals don't trip it. Header + parsing reuses `Parser.parse()` against the buffered header prefix, so + the standalone constraint and dotted-column rejection get the same + enforcement the materializing path uses. Header byte budget caps at + 64 KiB — fail-fast against a `TableReader` pointed at a giant + body-only document with no `@table` ever. + + Multi-table documents chain via `tr.tail()`, which returns an + `InputStream` yielding the bytes the reader buffered but didn't consume + followed by the remaining source. + + Public API additions: + - `TableReader(InputStream)`, `type()`, `columns()`, `directives()`, + `tail()`, `next()`, `scan(Message.Builder)` + - `BindRow.bindRow(Message.Builder, List, Ast.TableRow)` + + Errors are sticky: once `next()` or `scan` throws, subsequent calls + rethrow the same exception (matches the Go port's contract). + + Tests in `TableReaderTest` (24 cases): basic streaming, three cell + states, side-channel directives before header, sticky errors, + list/block cells rejected mid-stream, strings / block + line comments + with embedded parens, byte-at-a-time `InputStream` (adversarial for + buffer boundaries), multi-table via `tail()`, equivalence with the + materializing path, oversized-header rejection, `scan` happy path, + `scan` empty-cell-leaves-field-at-zero, `null`-on-wrapper clearing, + WKT timestamp binding, `BindRow` against the materializing path, + arity mismatch, non-leaf-cell rejection. + - **`Result.directives()` / `Result.tables()` accessors.** `FastDecoder` used to consume named directives and `@table` directives at the document head without storing them (PR #35 parser-side port did the diff --git a/pxf/src/main/java/org/protowire/pxf/BindRow.java b/pxf/src/main/java/org/protowire/pxf/BindRow.java new file mode 100644 index 0000000..1b2e488 --- /dev/null +++ b/pxf/src/main/java/org/protowire/pxf/BindRow.java @@ -0,0 +1,125 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2026 TrendVidia, LLC. +package org.protowire.pxf; + +import com.google.protobuf.Message; + +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.List; + +/** + * Per-row proto-binding helper for {@code @table} rows. Sits atop the + * streaming {@link TableReader} (via {@link TableReader#scan}) and is also + * exported as a standalone helper for callers that iterate the + * materializing path's {@link Result#tables()} rows. + * + *

Implementation strategy: convert each non-{@code null} cell back to + * its PXF text representation, concatenate as a {@code = \n} + * body, and run through the existing unmarshal pipeline with + * {@link UnmarshalOptions#skipValidate()} on. That reuses every branch of + * the existing decoder — WKT timestamps and durations, wrapper-type + * nullability, enum-by-name resolution, {@code pxf.required} / + * {@code pxf.default}, oneof handling — instead of growing a parallel + * Value-to-FieldDescriptor switch with ~50 arms. The cost is a small + * format-and-reparse per row; that's an acceptable trade for a streaming + * convenience API whose consumers have already opted into the convenience + * tier. Same trade {@code protowire-go} made in {@code table_bind.go}. + */ +public final class BindRow { + private BindRow() {} + + /** + * Bind the cells of {@code row} to the fields of {@code builder} by + * column name. The {@code columns} list MUST have the same length as + * {@code row.cells()}; mismatch raises {@link IllegalArgumentException}. + * + *

Cell-state semantics (mirrors draft §3.4.4): + *

+ * + *

{@code builder}'s descriptor MUST contain fields whose names + * appear in {@code columns}; a column referring to an unknown field + * surfaces as a "field not found" error from the underlying unmarshal + * call (unless {@link UnmarshalOptions#discardUnknown} is set). + */ + public static void bindRow(Message.Builder builder, List columns, Ast.TableRow row) { + if (columns.size() != row.cells().size()) { + throw new IllegalArgumentException( + "BindRow: " + columns.size() + " columns vs " + row.cells().size() + " cells"); + } + byte[] body = rowToPxfBody(columns, row); + // Run the synthetic body through the standard unmarshal pipeline. + // SkipValidate avoids re-running the reserved-name check per row + // (the caller's TableReader / unmarshalFull already validated the + // descriptor once at bind time). + UnmarshalOptions.defaults().withSkipValidate(true).unmarshal(body, builder); + } + + /** + * Render a row as a PXF body: one {@code = } entry per + * non-{@code null} cell, in column order. Empty cells produce no + * entry — the field stays absent from the decoder's perspective. + */ + static byte[] rowToPxfBody(List columns, Ast.TableRow row) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < row.cells().size(); i++) { + Ast.Value cell = row.cells().get(i); + if (cell == null) continue; + sb.setLength(0); + sb.append(columns.get(i)).append(" = "); + writeCellValue(sb, cell); + sb.append('\n'); + out.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8)); + } + return out.toByteArray(); + } + + /** + * Format a single cell value as PXF text. v1 {@code @table} cells are + * scalar-shaped (no list, no block), so only the leaf-value variants + * appear; list and block AST nodes are unreachable here because + * {@code parseTableRow} / {@code consumeRowCell} rejects them before + * the streaming reader hands them to {@code bindRow}. Hand-constructed + * TableRow values bypass that check, so guard defensively. + * + *

The {@code NullVal} / {@code ListVal} / {@code BlockVal} cases + * don't need to read the bound variable, so they're checked via + * {@code instanceof} before the value-using switch. Java 21 standard + * pattern matching requires a variable binding on every {@code case} + * label; routing the no-binding cases out keeps the switch tidy and + * sidesteps CodeQL's "unused local variable" check. + */ + static void writeCellValue(StringBuilder sb, Ast.Value v) { + if (v instanceof Ast.NullVal) { + sb.append("null"); + return; + } + if (v instanceof Ast.ListVal || v instanceof Ast.BlockVal) { + throw new IllegalArgumentException( + "BindRow: unexpected " + (v instanceof Ast.ListVal ? "list" : "block") + + " value in cell (v1 @table cells are scalar-shaped)"); + } + switch (v) { + case Ast.StringVal s -> + sb.append('"').append(Format.escape(s.value())).append('"'); + case Ast.IntVal i -> sb.append(i.raw()); + case Ast.FloatVal f -> sb.append(f.raw()); + case Ast.BoolVal b -> sb.append(b.value() ? "true" : "false"); + case Ast.BytesVal by -> + sb.append("b\"").append(Base64.getEncoder().encodeToString(by.value())).append('"'); + case Ast.IdentVal id -> sb.append(id.name()); + case Ast.TimestampVal t -> sb.append(t.raw()); + case Ast.DurationVal d -> sb.append(d.raw()); + default -> throw new IllegalArgumentException( + "BindRow: unexpected cell value type " + v.getClass().getSimpleName()); + } + } +} diff --git a/pxf/src/main/java/org/protowire/pxf/Format.java b/pxf/src/main/java/org/protowire/pxf/Format.java index 4ec5d31..983a342 100644 --- a/pxf/src/main/java/org/protowire/pxf/Format.java +++ b/pxf/src/main/java/org/protowire/pxf/Format.java @@ -103,13 +103,21 @@ private static void formatEntries(StringBuilder sb, List entries, int } private static void formatValue(StringBuilder sb, Ast.Value v, int level) { + // NullVal has no payload to format — the bound variable would be + // unused, which CodeQL flags. Java 21 standard pattern matching + // requires a binding on every `case` label (unnamed `_` is a + // preview feature this project doesn't enable), so we route the + // no-binding case out before the switch. + if (v instanceof Ast.NullVal) { + sb.append("null"); + return; + } switch (v) { - case Ast.StringVal s -> { sb.append('"').append(escape(s.value())).append('"'); } + case Ast.StringVal s -> sb.append('"').append(escape(s.value())).append('"'); case Ast.IntVal i -> sb.append(i.raw()); case Ast.FloatVal f -> sb.append(f.raw()); case Ast.BoolVal b -> sb.append(b.value() ? "true" : "false"); case Ast.BytesVal by -> sb.append("b\"").append(Base64.getEncoder().encodeToString(by.value())).append('"'); - case Ast.NullVal n -> sb.append("null"); case Ast.IdentVal id -> sb.append(id.name()); case Ast.TimestampVal t -> sb.append(t.raw()); case Ast.DurationVal d -> sb.append(d.raw()); @@ -130,6 +138,8 @@ private static void formatValue(StringBuilder sb, Ast.Value v, int level) { writeIndent(sb, level); sb.append('}'); } + default -> throw new IllegalStateException( + "Format: unexpected value type " + v.getClass().getSimpleName()); } } diff --git a/pxf/src/main/java/org/protowire/pxf/Parser.java b/pxf/src/main/java/org/protowire/pxf/Parser.java index 57d6d1d..9b0ce02 100644 --- a/pxf/src/main/java/org/protowire/pxf/Parser.java +++ b/pxf/src/main/java/org/protowire/pxf/Parser.java @@ -30,6 +30,19 @@ public static Ast.Document parse(String input) { return parse(input.getBytes(StandardCharsets.UTF_8)); } + /** + * Parse a single {@code ( cell, cell, ... )} tuple as a {@code @table} + * row. Used by {@link TableReader} to decode each row's byte slice + * without re-running the full document grammar. {@code input} MUST + * start with {@code (} and contain a balanced row tuple. + * + * @param input row bytes including the surrounding parens + * @param expected expected cell count (column arity) + */ + static Ast.TableRow parseTableRow(byte[] input, int expected) { + return new Parser(input).parseTableRow(expected); + } + private void advance() { while (true) { current = lex.next(); diff --git a/pxf/src/main/java/org/protowire/pxf/TableReader.java b/pxf/src/main/java/org/protowire/pxf/TableReader.java new file mode 100644 index 0000000..ddcab76 --- /dev/null +++ b/pxf/src/main/java/org/protowire/pxf/TableReader.java @@ -0,0 +1,431 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2026 TrendVidia, LLC. +package org.protowire.pxf; + +import com.google.protobuf.Message; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * Streaming consumption for the {@code @table} directive (draft §3.4.4 + * "Streaming consumption"). Pulls bytes from an {@link InputStream} on + * demand and yields one {@link Ast.TableRow} per {@link #next()} call, + * with working-set memory bounded by the size of the largest single row. + * Use it for datasets too large to materialize via {@link Pxf#unmarshal} / + * {@link Pxf#unmarshalFull}. + * + *

Per the spec: a streaming API MUST enforce per-row arity and the v1 + * cell-grammar rule on each row as it is consumed (not deferred to end of + * input), and MUST yield rows in source order. Both invariants fall out + * of the implementation here: the row-boundary scanner produces one + * {@code ( ... )} byte slice at a time, and {@link Parser#parseTableRow} + * decodes it. + * + *

A TableReader is positioned at the first row after + * {@link #TableReader(InputStream)} returns. Call {@link #next()} in a + * loop until it returns {@code null}; the row sequence is exhausted at + * that point. For documents containing multiple {@code @table} + * directives, construct a second TableReader from {@link #tail()}. + * + *

A TableReader is NOT safe for concurrent use. + */ +public final class TableReader { + + /** + * Cap on the byte budget for the {@code @table} header (leading + * directives + {@code @table TYPE (col1, col2, ...)}). Real headers + * are tiny; the cap exists to fail-fast on misuse — a TableReader + * pointed at a multi-gigabyte document with no {@code @table} + * directive shouldn't OOM trying to find one. + */ + private static final int DEFAULT_HEADER_MAX_BYTES = 64 * 1024; + + /** + * Chunk size for {@code InputStream.read} pulls. Larger reduces + * syscall pressure; smaller bounds per-row peak buffer occupancy. + */ + private static final int STREAM_PULL_SIZE = 4096; + + private final InputStream src; + private byte[] pending = new byte[0]; // bytes pulled from src but not yet consumed + private boolean srcEof; // src.read has returned -1 + private boolean finished; // next() has returned null at least once + private RuntimeException stickyError; + + private String type; + private List columns; + private List directives; + + /** + * Consume any leading directives ({@code @type}, {@code @}, etc.) + * and the {@code @table TYPE ( cols )} header, returning a reader + * positioned at the first row. + * + * @throws NoSuchElementException if the input ends before any + * {@code @table} directive is seen + * @throws PxfException on a malformed header + * @throws IOException on an underlying {@link InputStream} + * read failure + */ + public TableReader(InputStream src) throws IOException { + this.src = src; + readHeader(); + } + + /** The row message type declared by the {@code @table} header. */ + public String type() { return type; } + + /** The column field names declared by the {@code @table} header, in source order. */ + public List columns() { return columns; } + + /** + * The side-channel directives ({@code @} / {@code @entry} / + * etc., NOT {@code @type} or {@code @table}) that appeared before the + * {@code @table} header. Stable for the reader's lifetime. + */ + public List directives() { return directives; } + + /** + * Returns an {@link InputStream} that yields the bytes the reader has + * buffered but not consumed, followed by any remaining bytes from the + * underlying source. Use it to chain a second TableReader for + * documents containing multiple {@code @table} directives: + * + *

{@code
+     * var tr1 = new TableReader(src);
+     * // ... iterate tr1.next() until null ...
+     * var tr2 = new TableReader(tr1.tail());
+     * }
+ * + *

MUST only be called after {@link #next()} has returned {@code null}. + * Calling it earlier returns bytes the current reader still intends to + * consume, which will desync the next reader. + */ + public InputStream tail() { + if (pending.length == 0) return src; + return new SequenceInputStream(new ByteArrayInputStream(pending), src); + } + + /** + * Reads the next row. Returns {@code null} when the table's row + * sequence is exhausted. After {@code null} (or any other error), all + * subsequent calls return {@code null} or rethrow the sticky error. + * + *

The returned {@link Ast.TableRow}'s cells list is freshly + * allocated; reading the next row does not invalidate it. + */ + public Ast.TableRow next() throws IOException { + if (stickyError != null) throw stickyError; + if (finished) return null; + for (;;) { + int[] rowRange = findNextRow(pending); + if (rowRange != null) { + int start = rowRange[0]; + int end = rowRange[1]; + byte[] rowBytes = sliceBytes(pending, start, end + 1); + Ast.TableRow row; + try { + row = Parser.parseTableRow(rowBytes, columns.size()); + } catch (PxfException e) { + stickyError = e; + throw e; + } + // Advance pending past the consumed row. + pending = sliceBytes(pending, end + 1, pending.length); + return row; + } + // Not found — either need more bytes or the row sequence is over. + if (srcEof) { + finished = true; + return null; + } + pull(STREAM_PULL_SIZE); + } + } + + /** + * Reads the next row and binds its cells to the fields of + * {@code builder} by column name. Returns {@code false} when the + * table's row sequence is exhausted. Cell-state semantics match + * {@link BindRow#bindRow}: a {@code null} cell leaves the field + * absent, an {@link Ast.NullVal} cell clears the field, any other + * value sets the field. + */ + public boolean scan(Message.Builder builder) throws IOException { + Ast.TableRow row = next(); + if (row == null) return false; + BindRow.bindRow(builder, columns, row); + return true; + } + + // -- internals -------------------------------------------------------- + + private void readHeader() throws IOException { + for (;;) { + int headerEnd = scanHeaderEnd(pending); + if (headerEnd >= 0) { + // Parse the header prefix as a (rowless) PXF document. + // Parser is happy with an @table directive that has no + // rows yet, and validates everything we care about + // (leading-directive shape, @type/@table conflict, + // dotted columns, etc.). + byte[] headerBytes = sliceBytes(pending, 0, headerEnd + 1); + Ast.Document doc = Parser.parse(headerBytes); + if (doc.tables().isEmpty()) { + // Should not happen — scanHeaderEnd found an @table — + // but defensive. + throw new NoSuchElementException("pxf: no @table directive in stream"); + } + Ast.TableDirective tbl = doc.tables().get(0); + this.type = tbl.type(); + this.columns = tbl.columns(); + this.directives = doc.directives(); + this.pending = sliceBytes(pending, headerEnd + 1, pending.length); + return; + } + if (srcEof) { + throw new NoSuchElementException("pxf: no @table directive in stream"); + } + if (pending.length >= DEFAULT_HEADER_MAX_BYTES) { + throw new PxfException(Position.UNKNOWN, + "pxf: @table header exceeds " + DEFAULT_HEADER_MAX_BYTES + " bytes; " + + "check that the input begins with `@table TYPE (cols)`"); + } + pull(STREAM_PULL_SIZE); + } + } + + private void pull(int n) throws IOException { + if (srcEof) return; + byte[] buf = new byte[n]; + int read = src.read(buf); + if (read < 0) { + srcEof = true; + return; + } + if (read > 0) { + byte[] grown = new byte[pending.length + read]; + System.arraycopy(pending, 0, grown, 0, pending.length); + System.arraycopy(buf, 0, grown, pending.length, read); + pending = grown; + } + } + + // -- byte-level row-boundary scanner -------------------------------- + // + // Finds row boundaries with string / bytes-literal / line-comment / + // block-comment awareness. Mirrors protowire-go's table_stream.go. + + /** + * Search {@code input} for the first complete + * {@code @table TYPE ( cols )} directive and return the index of the + * {@code )} that closes its column list. Returns -1 if the input + * ends before the header is complete (caller should pull more bytes). + * Throws {@link PxfException} on malformed string/comment. + */ + static int scanHeaderEnd(byte[] input) { + int atIdx = findAtTable(input); + if (atIdx < 0) return -1; + int lparen = findNextChar(input, atIdx + "@table".length(), '('); + if (lparen < 0) return -1; + return findMatchingParen(input, lparen); + } + + /** + * Return the byte offset of the next {@code @table} keyword outside + * strings/comments. The match must be followed by a non-identifier + * byte so we don't false-match {@code @tableau}. Returns -1 when not + * found or when the input ends mid-construct. + */ + static int findAtTable(byte[] input) { + int i = 0; + while (i < input.length) { + int j = skipStringOrComment(input, i); + if (j == NEED_MORE) return -1; + if (j != i) { i = j; continue; } + if (input[i] == '@' && i + 6 <= input.length + && input[i + 1] == 't' && input[i + 2] == 'a' && input[i + 3] == 'b' + && input[i + 4] == 'l' && input[i + 5] == 'e') { + int after = i + 6; + if (after == input.length) { + // `@table` followed by more bytes we haven't seen yet + // — be conservative. + return -1; + } + if (!isIdentPart(input[after])) { + return i; + } + } + i++; + } + return -1; + } + + /** Return the offset of the next {@code ch} outside strings/comments. */ + static int findNextChar(byte[] input, int startFrom, char ch) { + int i = startFrom; + while (i < input.length) { + int j = skipStringOrComment(input, i); + if (j == NEED_MORE) return -1; + if (j != i) { i = j; continue; } + if (input[i] == ch) return i; + i++; + } + return -1; + } + + /** Find the matching {@code )} for the {@code (} at {@code openIdx}. */ + static int findMatchingParen(byte[] input, int openIdx) { + int depth = 1; + int i = openIdx + 1; + while (i < input.length) { + int j = skipStringOrComment(input, i); + if (j == NEED_MORE) return -1; + if (j != i) { i = j; continue; } + switch (input[i]) { + case '(' -> { depth++; i++; } + case ')' -> { + depth--; + if (depth == 0) return i; + i++; + } + default -> i++; + } + } + return -1; + } + + /** + * Find the next {@code ( ... )} row in {@code input}, skipping + * leading whitespace + comments. Returns an int[2] of [start, end] + * (both inclusive of the parens) on success, or {@code null} when + * either the input ran out mid-scan (caller pulls more) or the next + * significant byte is not {@code (} (row sequence over). + */ + static int[] findNextRow(byte[] input) { + int i = 0; + while (i < input.length) { + byte ch = input[i]; + if (ch == ' ' || ch == '\t' || ch == '\r' || ch == '\n') { i++; continue; } + int j = skipStringOrComment(input, i); + if (j == NEED_MORE) return null; + if (j != i) { i = j; continue; } + break; + } + if (i >= input.length) return null; + if (input[i] != '(') return null; + int end = findMatchingParen(input, i); + if (end < 0) return null; + return new int[] {i, end}; + } + + private static final int NEED_MORE = -1; + + /** + * If {@code input[i]} opens a string / bytes-literal / line-comment / + * block-comment, advance past it and return the new index. Return + * {@code i} unchanged if it doesn't open one. Return {@link #NEED_MORE} + * (= -1) if the construct is incomplete (caller treats as "pull + * more bytes"). Throw on a malformed construct that can't be fixed + * by more bytes (e.g. unterminated single-line string with a + * literal newline). + */ + static int skipStringOrComment(byte[] input, int i) { + if (i >= input.length) return i; + byte ch = input[i]; + if (ch == '"') { + if (i + 2 < input.length && input[i + 1] == '"' && input[i + 2] == '"') { + return skipTripleString(input, i); + } + return skipSimpleString(input, i); + } + if (ch == 'b' && i + 1 < input.length && input[i + 1] == '"') { + return skipBytesLiteral(input, i); + } + if (ch == '#') return skipLineComment(input, i + 1); + if (ch == '/' && i + 1 < input.length && input[i + 1] == '/') return skipLineComment(input, i + 2); + if (ch == '/' && i + 1 < input.length && input[i + 1] == '*') return skipBlockComment(input, i + 2); + return i; + } + + private static int skipSimpleString(byte[] input, int i) { + int j = i + 1; + while (j < input.length) { + byte b = input[j]; + if (b == '\\') { + if (j + 1 >= input.length) return NEED_MORE; + j += 2; + continue; + } + if (b == '"') return j + 1; + if (b == '\n') { + throw new PxfException(Position.UNKNOWN, "pxf: unterminated string literal"); + } + j++; + } + return NEED_MORE; + } + + private static int skipTripleString(byte[] input, int i) { + int j = i + 3; + while (j + 2 < input.length) { + if (input[j] == '"' && input[j + 1] == '"' && input[j + 2] == '"') return j + 3; + j++; + } + return NEED_MORE; + } + + private static int skipBytesLiteral(byte[] input, int i) { + int j = i + 2; // past `b"` + while (j < input.length) { + byte b = input[j]; + if (b == '"') return j + 1; + if (b == '\n') { + throw new PxfException(Position.UNKNOWN, "pxf: unterminated bytes literal"); + } + j++; + } + return NEED_MORE; + } + + private static int skipLineComment(byte[] input, int from) { + int j = from; + while (j < input.length && input[j] != '\n') j++; + return j; + } + + private static int skipBlockComment(byte[] input, int from) { + int j = from; + while (j + 1 < input.length) { + if (input[j] == '*' && input[j + 1] == '/') return j + 2; + j++; + } + return NEED_MORE; + } + + private static boolean isIdentPart(byte b) { + return (b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') + || (b >= '0' && b <= '9') || b == '_' || b == '.'; + } + + /** Slice a copy of {@code [from, to)} from {@code src}. */ + private static byte[] sliceBytes(byte[] src, int from, int to) { + if (from < 0 || to > src.length || from > to) { + throw new IllegalArgumentException("invalid slice [" + from + "," + to + ")"); + } + byte[] out = new byte[to - from]; + System.arraycopy(src, from, out, 0, to - from); + return out; + } + + // Suppress "method may be static" warning — kept as instance for parity + // with future state-aware variants (e.g. per-instance maxHeaderBytes). + @SuppressWarnings("unused") + private void noop() { Collections.emptyList(); } +} diff --git a/pxf/src/test/java/org/protowire/pxf/TableReaderTest.java b/pxf/src/test/java/org/protowire/pxf/TableReaderTest.java new file mode 100644 index 0000000..7479358 --- /dev/null +++ b/pxf/src/test/java/org/protowire/pxf/TableReaderTest.java @@ -0,0 +1,431 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2026 TrendVidia, LLC. +package org.protowire.pxf; + +import com.google.protobuf.DynamicMessage; +import org.junit.jupiter.api.Test; +import org.protowire.pxf.testproto.AllTypes; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * Streaming {@code @table} consumption tests. Mirrors protowire-go's + * table_stream_test.go. + */ +class TableReaderTest { + + private static InputStream s(String in) { + return new ByteArrayInputStream(in.getBytes(StandardCharsets.UTF_8)); + } + + // --- Happy path --- + + @Test + void basicStreaming() throws IOException { + var tr = new TableReader(s(""" + @table trades.v1.Trade (symbol, price, qty) + ("AAPL", 192.34, 100) + ("MSFT", 410.10, 50) + ("GOOG", 142.00, 25)""")); + assertEquals("trades.v1.Trade", tr.type()); + assertEquals(List.of("symbol", "price", "qty"), tr.columns()); + assertEquals(List.of(), tr.directives()); + + List rows = new ArrayList<>(); + for (Ast.TableRow r; (r = tr.next()) != null; ) rows.add(r); + assertEquals(3, rows.size()); + var sv = (Ast.StringVal) rows.get(0).cells().get(0); + assertEquals("AAPL", sv.value()); + } + + @Test + void emptyTableReturnsNullImmediately() throws IOException { + var tr = new TableReader(s("@table trades.v1.Trade (symbol, price)")); + assertNull(tr.next()); + assertNull(tr.next()); // sticky + } + + // --- Three cell states --- + + @Test + void cellStates() throws IOException { + var tr = new TableReader(s(""" + @table t.T (a, b, c) + ("x", 1, true) + (null, , 3) + (, "y", null)""")); + + var r1 = tr.next(); + assertNotNull(r1); + assertInstanceOf(Ast.StringVal.class, r1.cells().get(0)); + assertInstanceOf(Ast.IntVal.class, r1.cells().get(1)); + assertInstanceOf(Ast.BoolVal.class, r1.cells().get(2)); + + var r2 = tr.next(); + assertInstanceOf(Ast.NullVal.class, r2.cells().get(0)); + assertNull(r2.cells().get(1)); + assertInstanceOf(Ast.IntVal.class, r2.cells().get(2)); + + var r3 = tr.next(); + assertNull(r3.cells().get(0)); + assertInstanceOf(Ast.StringVal.class, r3.cells().get(1)); + assertInstanceOf(Ast.NullVal.class, r3.cells().get(2)); + + assertNull(tr.next()); + } + + // --- Leading directives --- + + @Test + void sideChannelDirectivesBeforeHeader() throws IOException { + var tr = new TableReader(s(""" + @header meta.v1.H { generated_at = 2026-05-12T10:00:00Z } + @table trades.v1.Trade (symbol) + ("AAPL") + ("MSFT")""")); + + assertEquals(1, tr.directives().size()); + assertEquals("header", tr.directives().get(0).name()); + assertEquals("meta.v1.H", tr.directives().get(0).type()); + + int count = 0; + while (tr.next() != null) count++; + assertEquals(2, count); + } + + // --- Standalone constraint enforced at header read --- + + @Test + void rejectsAtTypeWithAtTable() { + var ex = assertThrows(PxfException.class, () -> + new TableReader(s(""" + @type some.Other + @table trades.v1.Trade (symbol) + ("AAPL")"""))); + assertTrue(ex.getMessage().contains("@type")); + } + + // --- No @table --- + + @Test + void noTableInStream() { + assertThrows(NoSuchElementException.class, () -> + new TableReader(s("string_field = \"x\""))); + } + + @Test + void emptyInput() { + assertThrows(NoSuchElementException.class, () -> new TableReader(s(""))); + } + + // --- Errors mid-stream are sticky --- + + @Test + void errorsAreSticky() throws IOException { + var tr = new TableReader(s(""" + @table T (a, b, c) + ("x", 1, 2) + ("y", 1)""")); // arity mismatch + assertNotNull(tr.next()); + var ex = assertThrows(PxfException.class, tr::next); + // Subsequent calls rethrow the same sticky error. + var ex2 = assertThrows(PxfException.class, tr::next); + assertEquals(ex.getMessage(), ex2.getMessage()); + } + + @Test + void rejectsListCellMidStream() throws IOException { + var tr = new TableReader(s(""" + @table T (a, b) + ("ok", 1) + ("bad", [1, 2])""")); + assertNotNull(tr.next()); + var ex = assertThrows(PxfException.class, tr::next); + assertTrue(ex.getMessage().contains("list values")); + } + + // --- Strings / comments inside cells don't trip row scanner --- + + @Test + void stringWithParens() throws IOException { + var tr = new TableReader(s(""" + @table T (note, n) + ("contains (paren) inside", 1) + ("normal", 2)""")); + var r1 = tr.next(); + assertEquals("contains (paren) inside", ((Ast.StringVal) r1.cells().get(0)).value()); + var r2 = tr.next(); + assertEquals("normal", ((Ast.StringVal) r2.cells().get(0)).value()); + } + + @Test + void blockCommentBetweenRows() throws IOException { + var tr = new TableReader(s(""" + @table T (a) + ("x") + /* this comment ) has ( parens + spanning multiple lines */ + ("y")""")); + int count = 0; + while (tr.next() != null) count++; + assertEquals(2, count); + } + + @Test + void lineCommentBetweenRows() throws IOException { + var tr = new TableReader(s(""" + @table T (a) + ("x") + # this is a comment, with ( a paren ) inside + ("y") + // another comment ) here + ("z")""")); + int count = 0; + while (tr.next() != null) count++; + assertEquals(3, count); + } + + // --- Chunked InputStream: rows split across reads --- + + private static class ChunkedStream extends InputStream { + private final byte[] data; + private int i = 0; + ChunkedStream(byte[] data) { this.data = data; } + + @Override + public int read() { + if (i >= data.length) return -1; + return data[i++] & 0xff; + } + + @Override + public int read(byte[] b, int off, int len) { + if (i >= data.length) return -1; + b[off] = data[i++]; + return 1; // one byte at a time — adversarial for any buffering bug + } + } + + @Test + void handlesByteAtATimeReader() throws IOException { + var tr = new TableReader(new ChunkedStream(""" + @table T (a, b, c) + ("hello", 42, true) + ("world", 99, false) + ("end", 0, null)""".getBytes(StandardCharsets.UTF_8))); + int count = 0; + while (tr.next() != null) count++; + assertEquals(3, count); + } + + // --- Multi-table via tail() --- + + @Test + void multipleTablesViaTail() throws IOException { + var tr1 = new TableReader(s(""" + @table events.v1.Created (id, ts) + ("e-1", 2026-05-12T10:00:00Z) + ("e-2", 2026-05-12T10:00:01Z) + @table events.v1.Deleted (id, ts) + ("e-9", 2026-05-12T10:00:02Z)""")); + assertEquals("events.v1.Created", tr1.type()); + int c1 = 0; + while (tr1.next() != null) c1++; + assertEquals(2, c1); + + var tr2 = new TableReader(tr1.tail()); + assertEquals("events.v1.Deleted", tr2.type()); + int c2 = 0; + while (tr2.next() != null) c2++; + assertEquals(1, c2); + } + + // --- Streaming and materializing produce equivalent rows --- + + @Test + void equivalentToMaterializingPath() throws IOException { + String in = """ + @table t.T (a, b, c) + ("alpha", 1, true) + ("beta", null, false) + (, , ) + ("gamma", 99, true)"""; + // Materializing. + var doc = Parser.parse(in); + assertEquals(1, doc.tables().size()); + var mat = doc.tables().get(0).rows(); + + // Streaming. + var tr = new TableReader(s(in)); + List stream = new ArrayList<>(); + for (Ast.TableRow r; (r = tr.next()) != null; ) stream.add(r); + + assertEquals(mat.size(), stream.size()); + for (int i = 0; i < mat.size(); i++) { + assertEquals(mat.get(i).cells().size(), stream.get(i).cells().size()); + for (int j = 0; j < mat.get(i).cells().size(); j++) { + var m = mat.get(i).cells().get(j); + var sCell = stream.get(i).cells().get(j); + if (m == null) assertNull(sCell); + else { + assertNotNull(sCell); + assertEquals(m.getClass(), sCell.getClass()); + } + } + } + } + + // --- Header size limit --- + + @Test + void rejectsOversizedHeader() { + // 70 KiB identifier > 64 KiB cap. + String long_ = "a".repeat(70 * 1024); + var ex = assertThrows(PxfException.class, () -> + new TableReader(s("@table " + long_ + ".T (col)\n(1)"))); + assertTrue(ex.getMessage().contains("header exceeds")); + } + + // --- scan(Message.Builder) --- + + @Test + void scanHappyPath() throws IOException { + var tr = new TableReader(s(""" + @table test.v1.AllTypes (string_field, int32_field, bool_field, enum_field) + ("alpha", 1, true, STATUS_ACTIVE) + ("beta", 2, false, STATUS_INACTIVE) + ("gamma", 3, true, STATUS_UNSPECIFIED)""")); + var desc = AllTypes.getDescriptor(); + int count = 0; + while (true) { + var b = DynamicMessage.newBuilder(desc); + if (!tr.scan(b)) break; + count++; + } + assertEquals(3, count); + } + + @Test + void scanReturnsFalseOnEof() throws IOException { + var tr = new TableReader(s(""" + @table test.v1.AllTypes (string_field) + ("x")""")); + var b1 = DynamicMessage.newBuilder(AllTypes.getDescriptor()); + assertTrue(tr.scan(b1)); + var b2 = DynamicMessage.newBuilder(AllTypes.getDescriptor()); + assertFalse(tr.scan(b2)); + } + + @Test + void scanEmptyCellLeavesFieldUnset() throws IOException { + var tr = new TableReader(s(""" + @table test.v1.AllTypes (string_field, int32_field) + ("present", 7) + (, 99) + ("set", )""")); + var stringFd = AllTypes.getDescriptor().findFieldByName("string_field"); + var intFd = AllTypes.getDescriptor().findFieldByName("int32_field"); + + var b1 = DynamicMessage.newBuilder(AllTypes.getDescriptor()); + tr.scan(b1); + assertEquals("present", b1.build().getField(stringFd)); + assertEquals(7, b1.build().getField(intFd)); + + var b2 = DynamicMessage.newBuilder(AllTypes.getDescriptor()); + tr.scan(b2); + assertEquals("", b2.build().getField(stringFd)); + assertEquals(99, b2.build().getField(intFd)); + + var b3 = DynamicMessage.newBuilder(AllTypes.getDescriptor()); + tr.scan(b3); + assertEquals("set", b3.build().getField(stringFd)); + assertEquals(0, b3.build().getField(intFd)); + } + + @Test + void scanNullOnWrapperClears() throws IOException { + var tr = new TableReader(s(""" + @table test.v1.AllTypes (string_field, nullable_int) + ("with-value", 42) + ("nullified", null)""")); + var nullableIntFd = AllTypes.getDescriptor().findFieldByName("nullable_int"); + + var b1 = DynamicMessage.newBuilder(AllTypes.getDescriptor()); + tr.scan(b1); + assertTrue(b1.build().hasField(nullableIntFd)); + + var b2 = DynamicMessage.newBuilder(AllTypes.getDescriptor()); + tr.scan(b2); + assertFalse(b2.build().hasField(nullableIntFd)); + } + + @Test + void scanWellKnownTimestamp() throws IOException { + var tr = new TableReader(s(""" + @table test.v1.AllTypes (string_field, ts_field) + ("first", 2026-05-12T10:30:00Z)""")); + var tsFd = AllTypes.getDescriptor().findFieldByName("ts_field"); + + var b = DynamicMessage.newBuilder(AllTypes.getDescriptor()); + tr.scan(b); + var tsMsg = (com.google.protobuf.Message) b.build().getField(tsFd); + var secondsFd = tsMsg.getDescriptorForType().findFieldByName("seconds"); + long expected = java.time.Instant.parse("2026-05-12T10:30:00Z").getEpochSecond(); + assertEquals(expected, tsMsg.getField(secondsFd)); + } + + // --- BindRow against the materializing path --- + + @Test + void bindRowAgainstMaterializingPath() { + var doc = Parser.parse(""" + @table test.v1.AllTypes (string_field, int32_field) + ("alpha", 1) + ("beta", 2)"""); + var tbl = doc.tables().get(0); + for (int i = 0; i < tbl.rows().size(); i++) { + var b = DynamicMessage.newBuilder(AllTypes.getDescriptor()); + BindRow.bindRow(b, tbl.columns(), tbl.rows().get(i)); + } + } + + @Test + void bindRowArityMismatch() { + var b = DynamicMessage.newBuilder(AllTypes.getDescriptor()); + var row = new Ast.TableRow(Position.UNKNOWN, + java.util.Collections.singletonList(new Ast.StringVal(Position.UNKNOWN, "x"))); + var ex = assertThrows(IllegalArgumentException.class, + () -> BindRow.bindRow(b, List.of("a", "b"), row)); + assertTrue(ex.getMessage().contains("columns vs")); + } + + @Test + void bindRowRejectsNonLeafCell() { + // Hand-construct a row with a ListVal cell — the parser rejects + // these earlier, but a caller that builds a TableRow manually + // bypasses that check. + var b = DynamicMessage.newBuilder(AllTypes.getDescriptor()); + var row = new Ast.TableRow(Position.UNKNOWN, + java.util.Collections.singletonList( + new Ast.ListVal(Position.UNKNOWN, + List.of(new Ast.StringVal(Position.UNKNOWN, "x"))))); + var ex = assertThrows(IllegalArgumentException.class, + () -> BindRow.bindRow(b, List.of("string_field"), row)); + assertTrue(ex.getMessage().contains("scalar-shaped")); + } +}