fix: align native_datafusion Parquet schema checks with Spark's vectorized reader#4229
fix: align native_datafusion Parquet schema checks with Spark's vectorized reader#4229andygrove wants to merge 13 commits into
Conversation
When COMET_SCHEMA_EVOLUTION_ENABLED is false, the native_datafusion scan path now rejects reading Parquet INT32 as INT64, FLOAT as DOUBLE, and INT32 as DOUBLE — matching the existing validation in native_iceberg_compat. The allow_type_promotion flag is passed from JVM via protobuf and checked in replace_with_spark_cast() before allowing widening casts. Closes apache#3720 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Format the SchemaColumnConvertNotSupportedException message produced by the type-promotion check so it matches Spark's vectorized reader output: column rendered as [name], expected as Spark catalog string (bigint), found as Parquet primitive name (INT32). This lets the SPARK-35640 and "row group skipping doesn't overflow" tests pass, and updates 3.4.3.diff to remove their IgnoreCometNativeDataFusion tags. The TimestampLTZ to TimestampNTZ case (SPARK-36182) and decimal precision/scale case (SPARK-34212) remain ignored, tracked under apache#4219 and apache#3720 respectively. Also reverts the cfg(test) gate on parquet/util/test_common so the parquet_read benchmark builds.
Run the 3720-tagged tests in dev/diffs/3.5.8.diff, 4.0.2.diff, and 4.1.1.diff against patched Spark trees with the type-promotion fix applied, then drop the IgnoreCometNativeDataFusion tag for tests that now pass and keep it on tests that still fail. 3.5.8: Drop tags for SPARK-35640 (int as long) and "row group skipping doesn't overflow", repoint SPARK-36182 at issue apache#4219. Same scope as 3.4.3, since the test source matches. 4.0.2 and 4.1.1: Drop tags for SPARK-47447 (TimestampLTZ as TimestampNTZ) and "row group skipping". 4.1.1 also drops the tag for SPARK-45604 (timestamp_ntz to array<timestamp_ntz>). Tests for SPARK-35640 (binary as timestamp), SPARK-34212 (decimal precision/scale), the schema-mismatch vectorized-reader test, and the parameterized ParquetTypeWideningSuite cases (unsupported parquet conversion, unsupported parquet timestamp conversion, parquet decimal precision change, parquet decimal precision and scale change) still fail and remain ignored under apache#3720.
The test reads a partitioned dataset where one partition is an empty parquet file written with INT32 schema and the other has 10 rows of INT64. Spark's vectorized reader silently skips the type check for the empty file because no row groups are scanned. The native_datafusion adapter rejects the INT32 to INT64 promotion at plan time regardless of file row count, so the test now fails when allow_type_promotion is false (Spark 3.x default). Tag the test with IgnoreCometNativeDataFusion under the existing 3720 umbrella in 3.4.3.diff and 3.5.8.diff. Spark 4.x defaults allow_type_promotion to true so its diffs are unaffected.
|
Nice reduction in ignored tests. One concern on scope. The three-case match in But I ran a probe against this PR on Spark 3.5 with
The top three rows are what the PR fixes and look right under both settings. The bottom seven are wrong-answer paths under both settings: silent overflow on narrowing, silent precision loss on widening Spark doesn't allow, silent raw-int-as-epoch-seconds reinterpretation for Not asking you to fix all of them in this PR. But I think the framing in the commit message and code comment (
Either is fine by me. I'd lean toward (2) to keep this PR scoped. Probe used (slimmed, put under package org.apache.comet.parquet
import scala.util.Try
import org.apache.spark.sql.{CometTestBase, DataFrame}
import org.apache.spark.sql.internal.SQLConf
import org.apache.comet.CometConf
class TypePromotionProbeSuite extends CometTestBase {
import testImplicits._
private def probe(label: String)(body: => Any): Unit = {
val result = Try(body)
// scalastyle:off println
println(s"[PROBE] $label -> ${result match {
case scala.util.Success(v) => s"OK value=$v"
case scala.util.Failure(e) => s"THROW ${e.getClass.getSimpleName}"
}}")
// scalastyle:on println
}
private def runAll(ev: Boolean): Unit = withSQLConf(
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> ev.toString,
SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
def run(label: String, df: DataFrame, writeType: String, readAs: String): Unit =
probe(s"$label (ev=$ev)") {
withTempPath { dir =>
df.selectExpr(s"cast(c as $writeType) as c").write.parquet(dir.getCanonicalPath)
spark.read.schema(s"c $readAs").parquet(dir.getCanonicalPath)
.collect().map(_.get(0)).toSeq
}
}
run("int->long", Seq(1, 2, 3).toDF("c"), "int", "bigint")
run("float->double", Seq(1.0f, 2.0f, 3.0f).toDF("c"), "float", "double")
run("int->double", Seq(1, 2, 3).toDF("c"), "int", "double")
run("long->int narrowing", Seq(1L, 2L, 3L, Int.MaxValue.toLong + 5L).toDF("c"), "bigint", "int")
run("double->float narrowing",Seq(1.5, 2.5, 1e40).toDF("c"), "double", "float")
run("float->long", Seq(1.5f, 2.5f).toDF("c"), "float", "bigint")
run("long->double", Seq(1L, 2L, (1L << 54) + 1L).toDF("c"), "bigint", "double")
run("int->float", Seq(1, 2, (1 << 25) + 1).toDF("c"), "int", "float")
run("int->timestamp", Seq(1, 2, 3).toDF("c"), "int", "timestamp")
run("double->long", Seq(1.0, 2.0, 3.0).toDF("c"), "double", "bigint")
}
test("probe ev=false") { runAll(ev = false) }
test("probe ev=true") { runAll(ev = true) }
} |
|
I guess the bigger question to me becomes: why do we have |
|
Thanks for the probe — that table makes the remaining surface concrete. Going with option (2): filed #4297 to track the seven unconditionally-rejected conversions, with your probe table and code copied over so the behavior is captured. Tightened the code comment and PR description here to be explicit that this PR only closes the three On the bigger question about deprecating |
…ion-validation # Conflicts: # dev/diffs/3.4.3.diff # dev/diffs/3.5.8.diff # native/core/src/parquet/parquet_support.rs # native/proto/src/proto/operator.proto # spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
…-Spark-version default
Removes the public `spark.comet.schemaEvolution.enabled` ConfigEntry and reads
type-promotion permissiveness directly from the per-Spark-version constant in
`ShimCometConf` (false on 3.x, true on 4.x). Mirrors what Spark's vectorized
reader does without requiring a user-tunable knob that historically existed only
for the now-dead Java Iceberg-Comet integration.
- Promote `COMET_SCHEMA_EVOLUTION_ENABLED` to a public `val` in `ShimCometConf`
- Drop the `internal()` ConfigEntry from `CometConf`
- Swap Java callers in `AbstractColumnReader` and `TypeUtil` to read the constant
- Swap `CometNativeScan` to set the proto field from the constant
- Rewrite `ParquetReadSuite` tests that flipped the conf:
- `schema evolution`: drop the parametrization, branch on the constant
- `type widening` and `read byte, int, short, long together`: gate with
`assume(...COMET_SCHEMA_EVOLUTION_ENABLED)` since they only apply on 4.x
Closes apache#4298.
Also regenerates `dev/diffs/3.4.3.diff` and `dev/diffs/3.5.8.diff` to include the
SPARK-26709 IgnoreCometNativeDataFusion tag alongside main's SPARK-33084 tag
(both branches added separate hunks to SQLQuerySuite; merge needed a re-diff).
|
@mbutrovich I removed |
…ion-validation # Conflicts: # dev/diffs/4.0.2.diff # dev/diffs/4.1.1.diff
…-26709 The plan-time check in `replace_with_spark_cast` rejects the three widenings (INT32->INT64, FLOAT->DOUBLE, INT32->DOUBLE) regardless of file row count. Spark's vectorized reader only invokes `ParquetVectorUpdaterFactory.getUpdater` while decoding a row group, so a Parquet file with no row groups (e.g. written from an empty DataFrame) passes silently. SPARK-26709's mixed-partition case hit this: one partition is an empty INT32 file, another has 10 rows of INT64. Replace the eager `return Err(...)` with a `RejectOnNonEmpty` PhysicalExpr that returns an empty array of the target type when the input batch has 0 rows and raises `ParquetSchemaConvert` otherwise. The JVM shim converts the error to `SchemaColumnConvertNotSupportedException` with the same Spark-compatible column-name and type formatting. Drops the `IgnoreCometNativeDataFusion` tag for SPARK-26709 in 3.4.3.diff and 3.5.8.diff (both diffs regenerated from clean Spark trees).
Match Spark's `_LEGACY_ERROR_TEMP_2063` exactly for the two BINARY-related rejection paths in `replace_with_spark_cast`: - Existing BINARY -> non-string/binary/decimal rejection: format column as `[name]`, emit Parquet primitive names (`BINARY`) and Spark catalog names (`int`, `timestamp`, ...) instead of Arrow datatype debug names. - New non-BINARY -> string/binary rejection: Spark's vectorized reader has no `int -> string` or `long -> string` updater in `ParquetVectorUpdaterFactory`, so reject these to match (previously we silently produced strings via `spark_expr::Cast`). Extends `parquet_primitive_name` / `spark_catalog_name` with Utf8, Binary, Date32, and Timestamp entries needed by the new error format. Un-ignores tests now passing: - `schema mismatch failure error message for parquet vectorized reader` (all four diffs, tests both directions) - `SPARK-35640: read binary as timestamp should throw schema incompatible error` (4.0.2, 4.1.1) - `SPARK-35640: int as long should throw schema incompatible error` (3.4.3, 3.5.8) was already enabled by the existing type-promotion rejection but the upmerge regen had re-added the ignore tag; drop it here. Replaces the existing `parquet_roundtrip_int_as_string` test (which was asserting silent wrong-answer behavior) with `parquet_int_read_as_string_errors` plus a companion `parquet_string_read_as_int_errors`.
|
I iterated more on this PR and it now unignores more tests and no longer adds any new ignores - let's see if CI passes |
…er to runtime CI regressed on legitimate Parquet reads where the physical Arrow type is something other than a basic numeric primitive: FIXED_LEN_BYTE_ARRAY (FixedSizeBinary) and partitioned iceberg reads (Dictionary / constant / other) both hit the new "non-BINARY -> string/binary" rejection. Spark allows those reads. Narrow the check to fire only when the physical type is BOOLEAN, INT32-class (Int8/Int16/Int32), INT64, FLOAT, or DOUBLE -- the set with no "read as string" updater in `ParquetVectorUpdaterFactory`. Defer to runtime via `RejectOnNonEmpty` so the error flows through the JVM shim and surfaces as `SchemaColumnConvertNotSupportedException` with the single-bracket Spark-compatible column format, instead of bubbling out as `CometNativeException` at plan-build time with the Rust display.
Which issue does this PR close?
Closes #3720. Also closes #4298 (deprecation of
spark.comet.schemaEvolution.enabled).Rationale for this change
Under
spark.comet.scan.impl=native_datafusion, several Spark SQL tests that expectSchemaColumnConvertNotSupportedExceptionon incompatible Parquet reads pass silently: DataFusion's reader coerces mismatched numeric/string types instead of erroring. This PR adds the rejections Spark's vectorized reader performs inParquetVectorUpdaterFactory.getUpdater, with the error message formatted to match_LEGACY_ERROR_TEMP_2063/FAILED_READ_FILE.PARQUET_COLUMN_DATA_TYPE_MISMATCH.Scope: BINARY-related rejections plus the three Spark-3.x-gated widenings (
INT32 -> INT64,FLOAT -> DOUBLE,INT32 -> DOUBLE). Conversions Spark rejects unconditionally on every supported version (e.g.long -> int,double -> float,int -> timestamp) are tracked in #4297.What changes are included in this PR?
In
replace_with_spark_cast:RejectOnNonEmptyPhysicalExpr, returning an empty array for batches with zero rows and raising otherwise. This mirrors Spark's per-row-group check, so files with no row groups (e.g. an empty DataFrame write) pass silently and SPARK-26709's mixed-partition case keeps working.StringType/BinaryType. Spark's vectorized reader has no such updater; previously Comet silently produced strings viaspark_expr::Cast.[name], physical type asBINARY/INT32/..., logical type asint/string/timestamp/....Other:
spark.comet.schemaEvolution.enabledconf in favor of a per-Spark-version constant inShimCometConf(false on 3.x, true on 4.x).dev/diffs/{3.4.3,3.5.8,4.0.2,4.1.1}.diffthat now pass:schema mismatch failure error message for parquet vectorized reader,SPARK-35640: int as long(3.x),SPARK-35640: read binary as timestamp(4.x),row group skipping doesn't overflow(all), and (on 4.x)SPARK-47447andSPARK-45604.How are these changes tested?
schema_adapter.rscovering both directions for both BINARY and the type-promotion rejection, plus the empty-file pass-through:parquet_empty_file_disallowed_wideningparquet_non_empty_file_disallowed_widening_errorsparquet_int_read_as_string_errorsparquet_string_read_as_int_errors