
fix: align native_datafusion Parquet schema checks with Spark's vectorized reader #4229

Draft
andygrove wants to merge 13 commits into apache:main from andygrove:native-df-type-promotion-validation

Conversation

@andygrove
Member

@andygrove andygrove commented May 5, 2026

Which issue does this PR close?

Closes #3720. Also closes #4298 (deprecation of spark.comet.schemaEvolution.enabled).

Rationale for this change

Under spark.comet.scan.impl=native_datafusion, several Spark SQL tests that expect SchemaColumnConvertNotSupportedException on incompatible Parquet reads pass silently: DataFusion's reader coerces mismatched numeric/string types instead of erroring. This PR adds the rejections Spark's vectorized reader performs in ParquetVectorUpdaterFactory.getUpdater, with the error message formatted to match _LEGACY_ERROR_TEMP_2063 / FAILED_READ_FILE.PARQUET_COLUMN_DATA_TYPE_MISMATCH.

Scope: BINARY-related rejections plus the three Spark-3.x-gated widenings (INT32 -> INT64, FLOAT -> DOUBLE, INT32 -> DOUBLE). Conversions Spark rejects unconditionally on every supported version (e.g. long -> int, double -> float, int -> timestamp) are tracked in #4297.

What changes are included in this PR?

In replace_with_spark_cast:

  • Reject the three Spark-3.x widenings when type promotion is disabled. The rejection is deferred to evaluation time via a new RejectOnNonEmpty PhysicalExpr, returning an empty array for batches with zero rows and raising otherwise. This mirrors Spark's per-row-group check, so files with no row groups (e.g. an empty DataFrame write) pass silently and SPARK-26709's mixed-partition case keeps working. A behavioral sketch follows this list.
  • Reject non-BINARY physical -> StringType/BinaryType. Spark's vectorized reader has no such updater; previously Comet silently produced strings via spark_expr::Cast.
  • Format both BINARY rejections to match Spark's error params byte-for-byte: column as [name], physical type as BINARY/INT32/..., logical type as int/string/timestamp/....
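
A minimal behavioral sketch of the first two rejections under Spark 3.x defaults (type promotion disabled). The suite name and assertions are illustrative only; SchemaColumnConvertNotSupportedException and its import path come from Spark, and the helpers are the same CometTestBase ones used by the probe later in this thread:

package org.apache.comet.parquet

import org.apache.spark.sql.CometTestBase
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
import org.apache.spark.sql.internal.SQLConf
import org.apache.comet.CometConf

class NativeDataFusionRejectionSketch extends CometTestBase {
  import testImplicits._

  // Walk the cause chain for the exception the JVM shim surfaces.
  private def convertError(t: Throwable): Option[SchemaColumnConvertNotSupportedException] =
    Iterator.iterate(t)(_.getCause).takeWhile(_ != null)
      .collectFirst { case e: SchemaColumnConvertNotSupportedException => e }

  test("sketch: rejections under native_datafusion") {
    withSQLConf(
      CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
      SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
      withTempPath { dir =>
        Seq(1, 2, 3).toDF("c").write.parquet(dir.getCanonicalPath) // physical INT32
        // INT32 -> bigint: rejected at evaluation time when promotion is disabled (3.x).
        val widened = scala.util.Try(
          spark.read.schema("c bigint").parquet(dir.getCanonicalPath).collect())
        assert(widened.failed.toOption.flatMap(convertError).isDefined)
        // INT32 -> string: no such updater in Spark's vectorized reader, so reject.
        val asString = scala.util.Try(
          spark.read.schema("c string").parquet(dir.getCanonicalPath).collect())
        assert(asString.failed.toOption.flatMap(convertError).isDefined)
      }
      withTempPath { dir =>
        // A file with no row groups (empty DataFrame write) passes silently,
        // mirroring Spark's per-row-group check.
        Seq.empty[Int].toDF("c").write.parquet(dir.getCanonicalPath)
        assert(spark.read.schema("c bigint").parquet(dir.getCanonicalPath).count() === 0)
      }
    }
  }
}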

Other:

  • Deprecate the public spark.comet.schemaEvolution.enabled conf in favor of a per-Spark-version constant in ShimCometConf (false on 3.x, true on 4.x).
  • Un-ignore tests across dev/diffs/{3.4.3,3.5.8,4.0.2,4.1.1}.diff that now pass: schema mismatch failure error message for parquet vectorized reader, SPARK-35640: int as long (3.x), SPARK-35640: read binary as timestamp (4.x), row group skipping doesn't overflow (all), and (on 4.x) SPARK-47447 and SPARK-45604.

How are these changes tested?

  • New Rust tests in schema_adapter.rs covering both directions for both BINARY and the type-promotion rejection, plus the empty-file pass-through:
    • parquet_empty_file_disallowed_widening
    • parquet_non_empty_file_disallowed_widening_errors
    • parquet_int_read_as_string_errors
    • parquet_string_read_as_int_errors
  • Affected Spark SQL tests run under CI with the regenerated diffs.

andygrove and others added 5 commits May 5, 2026 12:52
When COMET_SCHEMA_EVOLUTION_ENABLED is false, the native_datafusion scan
path now rejects reading Parquet INT32 as INT64, FLOAT as DOUBLE, and
INT32 as DOUBLE — matching the existing validation in native_iceberg_compat.

The allow_type_promotion flag is passed from JVM via protobuf and checked
in replace_with_spark_cast() before allowing widening casts.

Closes apache#3720

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Format the SchemaColumnConvertNotSupportedException message produced by
the type-promotion check so it matches Spark's vectorized reader output:
column rendered as [name], expected as Spark catalog string (bigint),
found as Parquet primitive name (INT32). This lets the SPARK-35640 and
"row group skipping doesn't overflow" tests pass, and updates 3.4.3.diff
to remove their IgnoreCometNativeDataFusion tags.

The TimestampLTZ to TimestampNTZ case (SPARK-36182) and decimal
precision/scale case (SPARK-34212) remain ignored, tracked under apache#4219
and apache#3720 respectively. Also reverts the cfg(test) gate on
parquet/util/test_common so the parquet_read benchmark builds.
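For reference, the resulting exception fields for an INT32 column read as bigint end up shaped like this. This is a hedged sketch that assumes a CometTestBase test with native_datafusion selected via withSQLConf; the getter names come from Spark's SchemaColumnConvertNotSupportedException:

// Sketch only: run inside a withSQLConf block that selects native_datafusion.
withTempPath { dir =>
  Seq(1, 2, 3).toDF("c").write.parquet(dir.getCanonicalPath) // physical INT32
  val thrown = scala.util.Try(
    spark.read.schema("c bigint").parquet(dir.getCanonicalPath).collect()).failed.get
  val cause = Iterator.iterate(thrown)(_.getCause).takeWhile(_ != null)
    .collectFirst { case c: SchemaColumnConvertNotSupportedException => c }.get
  assert(cause.getColumn === "[c]")          // column rendered in brackets
  assert(cause.getPhysicalType === "INT32")  // Parquet primitive name
  assert(cause.getLogicalType === "bigint")  // Spark catalog string
}
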
Run the 3720-tagged tests in dev/diffs/3.5.8.diff, 4.0.2.diff, and
4.1.1.diff against patched Spark trees with the type-promotion fix
applied, then drop the IgnoreCometNativeDataFusion tag for tests that
now pass and keep it on tests that still fail.

3.5.8: Drop tags for SPARK-35640 (int as long) and "row group skipping
doesn't overflow", repoint SPARK-36182 at issue apache#4219. Same scope as
3.4.3, since the test source matches.

4.0.2 and 4.1.1: Drop tags for SPARK-47447 (TimestampLTZ as TimestampNTZ)
and "row group skipping". 4.1.1 also drops the tag for SPARK-45604
(timestamp_ntz to array<timestamp_ntz>). Tests for SPARK-35640 (binary as
timestamp), SPARK-34212 (decimal precision/scale), the schema-mismatch
vectorized-reader test, and the parameterized ParquetTypeWideningSuite
cases (unsupported parquet conversion, unsupported parquet timestamp
conversion, parquet decimal precision change, parquet decimal precision
and scale change) still fail and remain ignored under apache#3720.
The test reads a partitioned dataset where one partition is an
empty parquet file written with INT32 schema and the other has 10
rows of INT64. Spark's vectorized reader silently skips the type
check for the empty file because no row groups are scanned. The
native_datafusion adapter rejects the INT32 to INT64 promotion at
plan time regardless of file row count, so the test now fails
when allow_type_promotion is false (Spark 3.x default).

Tag the test with IgnoreCometNativeDataFusion under the existing
3720 umbrella in 3.4.3.diff and 3.5.8.diff. Spark 4.x defaults
allow_type_promotion to true so its diffs are unaffected.
@andygrove andygrove marked this pull request as ready for review May 5, 2026 23:04
@andygrove andygrove added this to the 0.16.0 milestone May 6, 2026
@andygrove andygrove moved this to In progress in Comet Development May 6, 2026
@mbutrovich
Contributor

Nice reduction in ignored tests. One concern on scope.

The three-case match in replace_with_spark_cast exactly mirrors the three Comet-specific permissive branches in Comet's TypeUtil.checkParquetType (INT32→Long and FLOAT→Double gated on allowTypePromotion, INT32→Double gated on isSpark40Plus), so native_datafusion and native_iceberg_compat now line up for those three. Good.

But native_datafusion also silently accepts a bunch of conversions that Spark's vectorized reader rejects on every supported version (Spark's ParquetVectorUpdaterFactory.getUpdater falls through to constructConvertNotSupportedException). From Spark's ParquetTypeWideningSuite expectError = true list on 4.0.2: Long→Int, Double→Float, Float→Long, Long→Double, Int→Float, Int→TimestampType, Date→TimestampType. None of these is gated by the new flag, and none of them throws today.

I ran a probe against this PR on Spark 3.5 with COMET_NATIVE_SCAN_IMPL=native_datafusion, sweeping spark.comet.schemaEvolution.enabled on and off. Results:

| Case | Written | Spark ref behavior | schemaEvolution=false | schemaEvolution=true |
| --- | --- | --- | --- | --- |
| int→long | [1, 2, 3] | throws (3.x) / ok (4.0) | throws | [1, 2, 3] |
| float→double | [1.0, 2.0, 3.0] | throws (3.x) / ok (4.0) | throws | [1.0, 2.0, 3.0] |
| int→double | [1, 2, 3] | throws (3.x) / ok (4.0) | throws | [1.0, 2.0, 3.0] |
| long→int (narrowing) | [1, 2, 3, 2147483652] | throws | [1, 2, 3, -2147483644] | [1, 2, 3, -2147483644] |
| double→float (narrowing) | [1.5, 2.5, 1e40] | throws | [1.5, 2.5, Infinity] | [1.5, 2.5, Infinity] |
| float→long | [1.5, 2.5] | throws | [1, 2] (truncated) | [1, 2] (truncated) |
| long→double | [1, 2, 2^54+1] | throws | [1.0, 2.0, 1.8014398509481984E16] (lost +1) | same |
| int→float | [1, 2, 2^25+1] | throws | [1.0, 2.0, 3.3554432E7] (lost +1) | same |
| int→timestamp | [1, 2, 3] | throws | [1969-12-31 16:00:01 … 03] (PST, int-as-seconds) | same |
| double→long | [1.0, 2.0, 3.0] | throws | [1, 2, 3] | [1, 2, 3] |

The top three rows are what the PR fixes and look right under both settings. The bottom seven are wrong-answer paths under both settings: silent overflow on narrowing, silent precision loss on widening Spark doesn't allow, silent raw-int-as-epoch-seconds reinterpretation for int→timestamp. These are the same class of gap #3720 enumerates for STRING→INT and decimal precision narrowing, just for primitive-to-primitive conversions.

Not asking you to fix all of them in this PR. But I think the framing in the commit message and code comment (mirrors TypeUtil.checkParquetType) undersells the remaining surface. Two options worth considering:

  1. Invert the check to an allowlist of Spark-supported (physical, target) pairs (essentially mirror the accept cases in Spark's ParquetVectorUpdaterFactory.getUpdater per Spark version), so anything else raises ParquetSchemaConvert. This closes the whole category; a rough sketch follows below.
  2. Land this as-is and file a followup issue tracking the seven cases above, linking this probe so behavior is captured.

Either is fine by me. I'd lean toward (2) to keep this PR scoped.
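
For concreteness, option (1) would amount to something like the following allowlist. This is a rough sketch only; the pair list is approximate (decimal and INT96 nuances omitted) and the real source of truth is getUpdater per Spark version:

// Approximate allowlist keyed by (Parquet physical type, Spark read type).
// Anything not listed would raise ParquetSchemaConvert.
def allowedPairs(allowTypePromotion: Boolean): Set[(String, String)] = {
  val base = Set(
    "BOOLEAN" -> "boolean",
    "INT32" -> "byte", "INT32" -> "short", "INT32" -> "int", "INT32" -> "date",
    "INT64" -> "bigint", "INT64" -> "timestamp",
    "FLOAT" -> "float",
    "DOUBLE" -> "double",
    "BINARY" -> "string", "BINARY" -> "binary",
    "FIXED_LEN_BYTE_ARRAY" -> "binary")
  // The three version-gated widenings this PR handles.
  val gatedWidenings = Set("INT32" -> "bigint", "FLOAT" -> "double", "INT32" -> "double")
  if (allowTypePromotion) base ++ gatedWidenings else base
}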


Probe used (slimmed, put under spark/src/test/scala/org/apache/comet/parquet/, runs with ./mvnw test -Pspark-3.5 -Dtest=none -Dsuites=org.apache.comet.parquet.TypePromotionProbeSuite -Dscalastyle.skip=true):

package org.apache.comet.parquet

import scala.util.Try
import org.apache.spark.sql.{CometTestBase, DataFrame}
import org.apache.spark.sql.internal.SQLConf
import org.apache.comet.CometConf

class TypePromotionProbeSuite extends CometTestBase {
  import testImplicits._

  private def probe(label: String)(body: => Any): Unit = {
    val result = Try(body)
    // scalastyle:off println
    println(s"[PROBE] $label -> ${result match {
        case scala.util.Success(v) => s"OK value=$v"
        case scala.util.Failure(e) => s"THROW ${e.getClass.getSimpleName}"
      }}")
    // scalastyle:on println
  }

  private def runAll(ev: Boolean): Unit = withSQLConf(
    CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
    CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> ev.toString,
    SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
    def run(label: String, df: DataFrame, writeType: String, readAs: String): Unit =
      probe(s"$label (ev=$ev)") {
        withTempPath { dir =>
          df.selectExpr(s"cast(c as $writeType) as c").write.parquet(dir.getCanonicalPath)
          spark.read.schema(s"c $readAs").parquet(dir.getCanonicalPath)
            .collect().map(_.get(0)).toSeq
        }
      }
    run("int->long",              Seq(1, 2, 3).toDF("c"),                              "int",    "bigint")
    run("float->double",          Seq(1.0f, 2.0f, 3.0f).toDF("c"),                     "float",  "double")
    run("int->double",            Seq(1, 2, 3).toDF("c"),                              "int",    "double")
    run("long->int narrowing",    Seq(1L, 2L, 3L, Int.MaxValue.toLong + 5L).toDF("c"), "bigint", "int")
    run("double->float narrowing",Seq(1.5, 2.5, 1e40).toDF("c"),                       "double", "float")
    run("float->long",            Seq(1.5f, 2.5f).toDF("c"),                           "float",  "bigint")
    run("long->double",           Seq(1L, 2L, (1L << 54) + 1L).toDF("c"),              "bigint", "double")
    run("int->float",             Seq(1, 2, (1 << 25) + 1).toDF("c"),                  "int",    "float")
    run("int->timestamp",         Seq(1, 2, 3).toDF("c"),                              "int",    "timestamp")
    run("double->long",           Seq(1.0, 2.0, 3.0).toDF("c"),                        "double", "bigint")
  }

  test("probe ev=false") { runAll(ev = false) }
  test("probe ev=true")  { runAll(ev = true) }
}

@mbutrovich
Contributor

I guess the bigger question to me becomes: why do we still have the spark.comet.schemaEvolution.enabled config at all? Maybe we should deprecate it first to simplify the story. I think it's legacy from when Comet's Parquet decoder could be called from Iceberg, which has different schema evolution semantics.

@andygrove
Member Author

Thanks for the probe — that table makes the remaining surface concrete.

Going with option (2): filed #4297 to track the seven unconditionally-rejected conversions, with your probe table and code copied over so the behavior is captured. Tightened the code comment and PR description here to be explicit that this PR only closes the three schemaEvolution-gated widenings, not the broader category.

On the bigger question about deprecating spark.comet.schemaEvolution.enabled — filed #4298 for that investigation. It looks like the per-version defaults (false on 3.x, true on 4.x via ShimCometConf) already encode what Spark itself does, so if there are no remaining callers flipping it, hardcoding it into the shim seems reasonable.

andygrove added 3 commits May 12, 2026 08:09
…ion-validation

# Conflicts:
#	dev/diffs/3.4.3.diff
#	dev/diffs/3.5.8.diff
#	native/core/src/parquet/parquet_support.rs
#	native/proto/src/proto/operator.proto
#	spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
…-Spark-version default

Removes the public `spark.comet.schemaEvolution.enabled` ConfigEntry and reads
type-promotion permissiveness directly from the per-Spark-version constant in
`ShimCometConf` (false on 3.x, true on 4.x). Mirrors what Spark's vectorized
reader does without requiring a user-tunable knob that historically existed only
for the now-dead Java Iceberg-Comet integration.

- Promote `COMET_SCHEMA_EVOLUTION_ENABLED` to a public `val` in `ShimCometConf`
- Drop the `internal()` ConfigEntry from `CometConf`
- Swap Java callers in `AbstractColumnReader` and `TypeUtil` to read the constant
- Swap `CometNativeScan` to set the proto field from the constant
- Rewrite `ParquetReadSuite` tests that flipped the conf:
  - `schema evolution`: drop the parametrization, branch on the constant
  - `type widening` and `read byte, int, short, long together`: gate with
    `assume(...COMET_SCHEMA_EVOLUTION_ENABLED)` since they only apply on 4.x (sketch at the end of this message)

Closes apache#4298.

Also regenerates `dev/diffs/3.4.3.diff` and `dev/diffs/3.5.8.diff` to include the
SPARK-26709 IgnoreCometNativeDataFusion tag alongside main's SPARK-33084 tag
(both branches added separate hunks to SQLQuerySuite; merge needed a re-diff).
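
For reference, the assume(...) gating described above looks roughly like this. Sketch only: reaching the constant through CometConf (which mixes in ShimCometConf) is an assumption here, not something this change pins down.

// Sketch of the assume(...) gating in ParquetReadSuite.
test("type widening") {
  assume(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED) // false on Spark 3.x, true on 4.x
  // existing widening assertions run only where Spark itself permits the reads
}
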
@andygrove
Member Author

@mbutrovich I removed COMET_SCHEMA_EVOLUTION_ENABLED. I did not remove the references from the Iceberg diffs yet, but there is no harm in them being there.

andygrove added 3 commits May 13, 2026 07:32
…ion-validation

# Conflicts:
#	dev/diffs/4.0.2.diff
#	dev/diffs/4.1.1.diff
…-26709

The plan-time check in `replace_with_spark_cast` rejects the three widenings
(INT32->INT64, FLOAT->DOUBLE, INT32->DOUBLE) regardless of file row count.
Spark's vectorized reader only invokes `ParquetVectorUpdaterFactory.getUpdater`
while decoding a row group, so a Parquet file with no row groups (e.g. written
from an empty DataFrame) passes silently. SPARK-26709's mixed-partition case
hit this: one partition is an empty INT32 file, another has 10 rows of INT64.

Replace the eager `return Err(...)` with a `RejectOnNonEmpty` PhysicalExpr that
returns an empty array of the target type when the input batch has 0 rows and
raises `ParquetSchemaConvert` otherwise. The JVM shim converts the error to
`SchemaColumnConvertNotSupportedException` with the same Spark-compatible
column-name and type formatting.

Drops the `IgnoreCometNativeDataFusion` tag for SPARK-26709 in 3.4.3.diff and
3.5.8.diff (both diffs regenerated from clean Spark trees).
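
For illustration, the now-passing shape of that scenario looks roughly like this (sketch only, assuming CometTestBase scaffolding with native_datafusion enabled; the real test is the SQLQuerySuite case):

withTempPath { dir =>
  val base = dir.getCanonicalPath
  // Partition p=0: empty DataFrame write, so an INT32 schema with no row groups.
  Seq.empty[Int].toDF("c").write.parquet(s"$base/p=0")
  // Partition p=1: 10 rows of INT64.
  spark.range(10).toDF("c").write.parquet(s"$base/p=1")
  // The empty file yields no rows, so RejectOnNonEmpty returns an empty array
  // instead of raising; the INT64 partition needs no conversion.
  assert(spark.read.schema("c bigint, p int").parquet(base).collect().length === 10)
}
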
Match Spark's `_LEGACY_ERROR_TEMP_2063` exactly for the two BINARY-related
rejection paths in `replace_with_spark_cast`:

- Existing BINARY -> non-string/binary/decimal rejection: format column as
  `[name]`, emit Parquet primitive names (`BINARY`) and Spark catalog names
  (`int`, `timestamp`, ...) instead of Arrow datatype debug names.
- New non-BINARY -> string/binary rejection: Spark's vectorized reader has no
  `int -> string` or `long -> string` updater in `ParquetVectorUpdaterFactory`,
  so reject these to match (previously we silently produced strings via
  `spark_expr::Cast`).

Extends `parquet_primitive_name` / `spark_catalog_name` with Utf8, Binary,
Date32, and Timestamp entries needed by the new error format.

Un-ignores tests now passing:
- `schema mismatch failure error message for parquet vectorized reader`
  (all four diffs, tests both directions)
- `SPARK-35640: read binary as timestamp should throw schema incompatible error`
  (4.0.2, 4.1.1)
- `SPARK-35640: int as long should throw schema incompatible error` (3.4.3,
  3.5.8) was already enabled by the existing type-promotion rejection but the
  upmerge regen had re-added the ignore tag; drop it here.

Replaces the existing `parquet_roundtrip_int_as_string` test (which was
asserting silent wrong-answer behavior) with `parquet_int_read_as_string_errors`
plus a companion `parquet_string_read_as_int_errors`.
@andygrove andygrove changed the title from "fix: reject disallowed type promotions in native_datafusion scan" to "fix: align native_datafusion Parquet schema checks with Spark's vectorized reader" May 13, 2026
@andygrove
Member Author

I iterated more on this PR and it now un-ignores more tests and no longer adds any new ignores. Let's see if CI passes.

@andygrove andygrove marked this pull request as draft May 13, 2026 15:05
…er to runtime

CI regressed on legitimate Parquet reads where the physical Arrow type is
something other than a basic numeric primitive: FIXED_LEN_BYTE_ARRAY
(FixedSizeBinary) and partitioned iceberg reads (Dictionary / constant /
other) both hit the new "non-BINARY -> string/binary" rejection. Spark
allows those reads.

Narrow the check to fire only when the physical type is BOOLEAN, INT32-class
(Int8/Int16/Int32), INT64, FLOAT, or DOUBLE -- the set with no
"read as string" updater in `ParquetVectorUpdaterFactory`.

Defer to runtime via `RejectOnNonEmpty` so the error flows through the JVM
shim and surfaces as `SchemaColumnConvertNotSupportedException` with the
single-bracket Spark-compatible column format, instead of bubbling out as
`CometNativeException` at plan-build time with the Rust display.
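
For illustration, here is the class of legitimate read the narrowed check must leave alone, next to one that should still fail at runtime. Hedged sketch, assuming CometTestBase scaffolding with native_datafusion selected via withSQLConf; whether the decimal case exercises the exact FixedSizeBinary path is an assumption.

// Sketch: run inside a withSQLConf block that selects native_datafusion.
withTempPath { dir =>
  // decimal(20, 2) is written as FIXED_LEN_BYTE_ARRAY; reading it back as its
  // own type must not trip the string/binary rejection.
  Seq(BigDecimal("1.23"), BigDecimal("4.56")).toDF("c")
    .selectExpr("cast(c as decimal(20, 2)) as c").write.parquet(dir.getCanonicalPath)
  assert(spark.read.parquet(dir.getCanonicalPath).collect().length === 2)
}
withTempPath { dir =>
  // INT32 read as string still has no updater in Spark's vectorized reader; the
  // rejection now surfaces at runtime as SchemaColumnConvertNotSupportedException.
  Seq(1, 2).toDF("c").write.parquet(dir.getCanonicalPath)
  assertThrows[Exception](
    spark.read.schema("c string").parquet(dir.getCanonicalPath).collect())
}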