feat: add experimental support for Spark regexp expressions via JVM UDF framework #4239
andygrove wants to merge 49 commits
Conversation
Also fix CometArrayExpressionSuite compilation by qualifying the Spark udf() call, which was shadowed by the new org.apache.comet.udf package.
Implements a DataFusion PhysicalExpr that evaluates child expressions, exports the results as Arrow FFI arrays, calls CometUdfBridge.evaluate() via JNI, and imports the output array. Adds datafusion-comet-jni-bridge as a dependency of the spark-expr crate.
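A minimal sketch of the JVM half of that round-trip, assuming the bridge receives Arrow C Data Interface struct addresses over JNI; the method shape, the object name, and the bare `FieldVector => FieldVector` stand-in for the UDF are assumptions, not the actual `CometUdfBridge` API:

```scala
import org.apache.arrow.c.{ArrowArray, ArrowSchema, Data}
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.FieldVector

object UdfBridgeSketch {
  private val allocator = new RootAllocator()

  // Native code passes FFI struct addresses over JNI; the JVM imports the
  // input, runs the UDF, and exports the result back through out-pointers.
  def evaluate(
      udf: FieldVector => FieldVector,
      arrayAddr: Long,
      schemaAddr: Long,
      outArrayAddr: Long,
      outSchemaAddr: Long): Unit = {
    val input = Data.importVector(
      allocator, ArrowArray.wrap(arrayAddr), ArrowSchema.wrap(schemaAddr), null)
    val result = udf(input)
    Data.exportVector(
      allocator, result, null,
      ArrowArray.wrap(outArrayAddr), ArrowSchema.wrap(outSchemaAddr))
  }
}
```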
…UDF class via context classloader

Wrap the JNI body in try/finally so input ValueVectors and the result vector are always closed, even when the UDF or Arrow export throws. Resolve the CometUDF class through the thread context classloader so user-supplied UDF jars (added via spark.jars) are visible from the bridge.
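A sketch of both fixes under illustrative names; `applyUdf` and `exportResult` are hypothetical stand-ins for the bridge's dispatch and FFI export:

```scala
import org.apache.arrow.vector.FieldVector

object SafeEvaluateSketch {
  def evaluateSafely(inputs: Seq[FieldVector], udfClassName: String): Unit = {
    var result: FieldVector = null
    try {
      // Resolve through the context classloader so user UDF jars added via
      // spark.jars are visible, not just classes on the system classpath.
      val loader = Option(Thread.currentThread().getContextClassLoader)
        .getOrElse(getClass.getClassLoader)
      val udf = loader.loadClass(udfClassName)
        .getDeclaredConstructor().newInstance()
      result = applyUdf(udf, inputs)
      exportResult(result)
    } finally {
      // Always release Arrow buffers, even when the UDF or export throws.
      inputs.foreach(_.close())
      if (result != null) result.close()
    }
  }

  // Hypothetical stand-ins for the bridge's UDF dispatch and FFI export.
  private def applyUdf(udf: Any, inputs: Seq[FieldVector]): FieldVector = ???
  private def exportResult(result: FieldVector): Unit = ???
}
```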
…ns fall back to Spark

When routing RLike through the JVM UDF, reject Literal(null) and patterns that fail Pattern.compile during planning. Both cases now produce withInfo + None, letting Spark evaluate the expression instead of crashing the executor task with PatternSyntaxException or NullPointerException.
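A sketch of the planning-time guard; in the real convert path the rejected cases are paired with `withInfo` to record the fallback reason, and the shape here is illustrative:

```scala
import java.util.regex.{Pattern, PatternSyntaxException}
import org.apache.spark.sql.catalyst.expressions.{Literal, RLike}
import org.apache.spark.sql.types.StringType

object RLikePlanGuard {
  // Returns false (fall back to Spark) for null or uncompilable patterns,
  // so the executor never hits PatternSyntaxException or
  // NullPointerException at runtime.
  def patternSupported(expr: RLike): Boolean = expr.right match {
    case Literal(null, _) => false // null pattern: fall back
    case Literal(pattern, StringType) =>
      try { Pattern.compile(pattern.toString); true }
      catch { case _: PatternSyntaxException => false } // invalid: fall back
    case _ => false // non-literal pattern: fall back
  }
}
```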
Make comet_udf_bridge an Option in JVMClasses so a missing org.apache.comet.udf.CometUdfBridge class (e.g. shading dropped org.apache.comet.udf.*) no longer crashes executor JVM init. The JVM-UDF dispatch path returns a clear ExecutionError when the bridge is unavailable. Also clarify the FFI lifetime contract on the result import.
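The lookup itself lives on the native side (the Rust JVMClasses), but the defensive pattern translates directly; a JVM-side analogue with illustrative names:

```scala
// Resolve the bridge class once and tolerate its absence; only the
// JVM-UDF dispatch path pays for the missing class, with a clear error
// instead of a crash at JVM init.
object BridgeLookupSketch {
  lazy val cometUdfBridge: Option[Class[_]] =
    try Some(Class.forName("org.apache.comet.udf.CometUdfBridge"))
    catch { case _: ClassNotFoundException => None }

  def requireBridge(): Class[_] = cometUdfBridge.getOrElse {
    throw new IllegalStateException(
      "org.apache.comet.udf.CometUdfBridge not found (possibly dropped " +
        "by shading); the JVM regexp engine is unavailable")
  }
}
```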
Replace string literals "rust"/"java" used for the regexp engine selector with named constants on CometConf. Tighten CometRLike.getSupportLevel so it only reports Compatible(None) when the pattern is a Literal, matching the actual constraint enforced by the convert path.
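A sketch of both cleanups; the constant names and the local `SupportLevel` stand-ins are assumptions mirroring Comet's serde types, not the actual definitions:

```scala
import org.apache.spark.sql.catalyst.expressions.{Literal, RLike}

object RegexpEngineConf {
  val REGEXP_ENGINE_RUST: String = "rust"
  val REGEXP_ENGINE_JAVA: String = "java"
}

sealed trait SupportLevel
case class Compatible(notes: Option[String]) extends SupportLevel
case class Incompatible(notes: Option[String]) extends SupportLevel

object CometRLikeSketch {
  // Only report Compatible when the pattern is a Literal, matching the
  // constraint the convert path actually enforces.
  def getSupportLevel(expr: RLike): SupportLevel = expr.right match {
    case _: Literal => Compatible(None)
    case _ => Incompatible(Some("only literal regexp patterns are supported"))
  }
}
```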
Literal-folded children no longer get expanded to batch-row count before crossing JNI; ColumnarValue::Scalar is materialized at length 1, avoiding an O(rows) copy of values that never vary across the batch. Document the contract on CometUDF: scalar inputs arrive as length-1 vectors, vector inputs at the batch row count, and the result must match the longest input.
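A sketch of how a UDF consuming these vectors might honor that contract; the names and the single-type `VarCharVector` shape are illustrative:

```scala
import org.apache.arrow.vector.VarCharVector

object ScalarContractSketch {
  // The result must match the longest input: scalar inputs arrive at
  // length 1, vector inputs at the batch row count.
  def resultLength(inputs: Seq[VarCharVector]): Int =
    inputs.map(_.getValueCount).max

  def valueAt(v: VarCharVector, row: Int): Array[Byte] =
    if (v.getValueCount == 1) v.get(0) // scalar: one value for all rows
    else v.get(row)                    // vector: per-row value
}
```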
# Conflicts:
#   common/src/main/java/org/apache/comet/udf/CometUdfBridge.java
#   common/src/main/scala/org/apache/comet/udf/CometUDF.scala
#   native/jni-bridge/src/lib.rs
#   native/spark-expr/src/jvm_udf/mod.rs
The bridge now caches a single shared CometUDF instance per class across native worker threads, so UDF instance state must be thread-safe. Replace the per-instance LinkedHashMap LRU (which mutates on get when accessOrder is true) with ConcurrentHashMap.
Atomic and shorter. Avoids two threads compiling the same pattern under contention.
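A sketch of the resulting cache shape; `computeIfAbsent` compiles each distinct pattern at most once, even when two threads race on the same key, and reads are lock-free afterwards:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.regex.Pattern

object PatternCacheSketch {
  private val cache = new ConcurrentHashMap[String, Pattern]()

  // Compile-once lookup: the mapping function runs at most once per key.
  def compiled(regex: String): Pattern =
    cache.computeIfAbsent(regex, Pattern.compile(_))
}
```

Unlike the access-ordered LinkedHashMap it replaces, this cache has no eviction, trading bounded memory for thread safety; the working assumption is that a query contains few distinct literal patterns.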
Marking this as draft while we discuss and compare with #4267.
@mbutrovich I'm going to start reviewing your PR, but it is 7k LOC and there is a lot in there, so it may take some time. I have a couple of questions:
The native Rust engine remains the default. The JVM-backed engine is available as an experimental opt-in for workloads that need 100% Java-regex-compatible semantics.
That does raise the question of how aggressively I should review these if they'll be dead code soon. There are a number of optimizations we're leaving on the table, but maybe that doesn't matter right now.
My argument for allowing this PR in would be that it adds all the regression tests and gives us an immediate performance win today. The refactor to update the UDF implementations to use the new framework should, I assume, be a net reduction in line count and easy to review?
Likely not due to the codegen work I've done, but we'll cross that bridge when we come to it. If you mark this ready for review today I'll review it. |
These SQL test files exercise Java-only regex features (backreferences, lookahead, embedded flags) and previously relied on the JVM engine being the default. After the default reverted to the Rust engine, they need to explicitly opt in via spark.comet.exec.regexp.engine=java.
# Conflicts:
#   spark/src/main/scala/org/apache/comet/serde/strings.scala
@mbutrovich following on from our discussion about configs yesterday, I filed an issue where we can have that discussion. #4310 |
Which issue does this PR close?
Closes #.
Rationale for this change
This PR extends the JVM UDF framework (introduced in #4232) to support all Spark regular expression functions with full Java regex compatibility. The native Rust engine remains the default; the JVM-backed engine is offered as an experimental opt-in for workloads that need 100% Java-regex-compatible semantics (backreferences, lookaround, embedded flags, etc.) at the cost of a JNI round-trip per batch.
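For example, opting in for a session uses the standard Spark conf setter with the config key introduced here (a sketch, assuming a `spark` SparkSession in scope):

```scala
// Select the experimental JVM-backed engine; the default remains "rust".
spark.conf.set("spark.comet.exec.regexp.engine", "java")
```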
What changes are included in this PR?
- Support for `regexp_extract`, `regexp_extract_all`, `regexp_instr`, `regexp_replace`, and `split`
- A `spark.comet.exec.regexp.engine` config (default `rust`) that selects between the native Rust engine and the experimental Java engine
- SQL test files (`*_rust.sql`, `*_rust_enabled.sql`, `*_java.sql`)
- Documentation in `docs/source/user-guide/latest/compatibility/regex.md`, including a note that the Java engine is experimental and may change

How are these changes tested?
- `CometRegExpJvmSuite`: 45 tests covering all regexp expressions with the Java engine explicitly enabled
- `CometExpressionSuite` and `CometStringExpressionSuite` tests continue to pass