Skip to content

[VL] Fix broadcast hash table reuse for reused exchanges#12264

Open
wecharyu wants to merge 5 commits into
apache:mainfrom
wecharyu:fix_shared_bhj_table
Open

[VL] Fix broadcast hash table reuse for reused exchanges#12264
wecharyu wants to merge 5 commits into
apache:mainfrom
wecharyu:fix_shared_bhj_table

Conversation

@wecharyu

@wecharyu wecharyu commented Jun 8, 2026

Copy link
Copy Markdown
Contributor

What changes are proposed in this pull request?

Cache hash table data by droppedDuplicates flag in build side relation because the reused relation could generate different hash table data.

How was this patch tested?

Add UT.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Codex GPT-5.5

@github-actions

github-actions Bot commented Jun 8, 2026

Copy link
Copy Markdown

Run Gluten Clickhouse CI on x86

}
}

override def doCanonicalizeForBroadcastMode(mode: BroadcastMode): BroadcastMode = {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this part still needed when buildHashTableOncePerExecutor is disabled?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doCanonicalizeForBroadcastMode() is not invoked anywhere, broadcast canonicalization goes through ColumnarBroadcastExchangeExec.doCanonicalize().

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is actually useful — when buildHashTableOncePerExecutor is disabled, it provides more opportunities to reuse broadcast exchanges. Moreover, I now think that even when buildHashTableOncePerExecutor is enabled, the comment in doCanonicalizeForBroadcastMode still holds true: we still broadcast byte arrays and build HashRelation at the executor side.
@JkSelf Can you explain why this was removed? This allows us to reuse broadcast exchanges for different build keys with the same data.
We should either restore the code before ColumnarBroadcastExchangeExec.doCanonicalize, or at least follow the original logic when buildHashTableOncePerExecutor is disabled.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JkSelf Thank you for the explanation, I understand now. @wecharyu According to the instructions here, you can restore the original behavior of doCanonicalizeForBroadcastMode when enableBroadcastBuildOncePerExecutor=false.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

@github-actions

github-actions Bot commented Jun 9, 2026

Copy link
Copy Markdown

Run Gluten Clickhouse CI on x86

@wecharyu wecharyu force-pushed the fix_shared_bhj_table branch from 075dc89 to 7552606 Compare June 9, 2026 16:07
@github-actions

github-actions Bot commented Jun 9, 2026

Copy link
Copy Markdown

Run Gluten Clickhouse CI on x86

1 similar comment
@github-actions

github-actions Bot commented Jun 9, 2026

Copy link
Copy Markdown

Run Gluten Clickhouse CI on x86

@wecharyu wecharyu force-pushed the fix_shared_bhj_table branch from d7d69eb to d88b181 Compare June 10, 2026 02:08
@github-actions

Copy link
Copy Markdown

Run Gluten Clickhouse CI on x86

@JkSelf

JkSelf commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

@wecharyu Thanks for your work.

For ColumnarBroadcastExchangeExec, reuse is already decided by doCanonicalize(). It now directly uses mode.canonicalized, and that canonicalized BroadcastMode already captures the key BHJ semantics such as bound build keys and null-aware behavior. So we no longer need this handling for the unique broadcastId.

@wecharyu

wecharyu commented Jun 10, 2026

Copy link
Copy Markdown
Contributor Author

@JkSelf Thanks for the prompt response. HashedRelationBroadcastMode does not contains joinType, which could also change the build hash table data. For example in this PR's test the LEFT SEMI JOIN and INNER JOIN would use the same hash table, which cause the test failed before this PR. https://github.com/apache/spark/blob/62ae4db28f3be8a0ca2c3016d27ca5a62f02915d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L1152-L1153

And since current Gluten is still broadcast the raw build plan data bytes instead of hash table,I agree with @liujiayi771 that BroadcastExchangeExec can be reused for the same build plan even when the build keys differ. This reuse avoids broadcasting the same data multiple times.

I think now we can make following improvements:

  1. Move doCanonicalizeForBroadcastMode() back to reuse the broadcast exchange as much as possible.
  2. Associate the constructed hash table with the plan_id, build_keys, drop_duplicates, and null_aware flag, as these attributes uniquely identify the hash table.

@JkSelf

JkSelf commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

HashedRelationBroadcastMode does not contains joinType, which could also change the build hash table data. For example in this PR's test the LEFT SEMI JOIN and INNER JOIN would use the same hash table, which cause the test failed before this PR.

Gluten's implementation is the same as vanilla Spark's. Just out of curiosity, can Spark pass this failed test?

@wecharyu

Copy link
Copy Markdown
Contributor Author

Gluten's implementation is the same as vanilla Spark's. Just out of curiosity, can Spark pass this failed test?

Vanilla Spark does share the same hash relation, but it does not drop duplicates, so the two types of joins will not generate data issue. But Gluten native build hash table will drop duplicates for the LEFT SEMI JOIN, and the hash table is reused by the INNER JOIN, which cause the issue.

@wecharyu wecharyu force-pushed the fix_shared_bhj_table branch from d88b181 to 8e7e9b5 Compare June 16, 2026 05:09
@github-actions

Copy link
Copy Markdown

Run Gluten Clickhouse CI on x86

@wecharyu wecharyu force-pushed the fix_shared_bhj_table branch from 8e7e9b5 to da8f1f6 Compare June 16, 2026 07:24
@github-actions

Copy link
Copy Markdown

Run Gluten Clickhouse CI on x86

@wecharyu

Copy link
Copy Markdown
Contributor Author

@JkSelf @liujiayi771 Pls take a look again when you have time. Thanks!

case class BroadcastHashTable(
pointer: Long,
relation: BuildSideRelation,
droppedDuplicates: Boolean)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to consider ExistenceJoin and null-aware anti join value? Could you help to add tests to cover? Thanks.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark will convert ExistenceJoin to LeftSemiJoin, and null-aware anti join will not be reused because the HashedRelationBroadcastMode's isNullAware is different, add a test to cover this case.

@github-actions

Copy link
Copy Markdown

Run Gluten Clickhouse CI on x86

bloomFilterPushdownSize: Long,
buildHashTableTimeMetric: Option[SQLMetric] = None)
buildHashTableTimeMetric: Option[SQLMetric] = None) {
def droppedDuplicates: Boolean = {

@liujiayi771 liujiayi771 Jun 17, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also take the filter case into account here?

Native dropDuplicates_ (Velox PlanNode.h#L3247 / Gluten HashTableBuilder) is !withFilter && (semi || anti). With a filter the table keeps duplicates and stores payload columns. Since the join condition isn't part of HashedRelationBroadcastMode, a filtered and a non-filtered semi/anti could reuse the same broadcast exchange — and with the current SEMI || ANTI flag both get droppedDuplicates = true, so one might clone the other's key-only/deduped table.

Maybe droppedDuplicates should also include the !hasJoinFilter condition? Just want to make sure I'm not missing something here. And please add a test case for this as well.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: after a closer look, the "filtered and non-filtered semi reuse the same exchange" scenario is actually not reachable — if the filter references non-key build columns, column pruning makes the build plans differ, preventing reuse; if the filter only references key columns, the plans can match but the hash table structure difference (allowDuplicates flag) doesn't affect the result for semi/anti. So we can drop the test request. That said, aligning with !withFilter is still the right call — it ensures each join gets a hash table built for its actual native configuration rather than relying on coincidental result equivalence.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aligned it with native hash builder.

@JkSelf JkSelf left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for you fix.

@github-actions

Copy link
Copy Markdown

Run Gluten Clickhouse CI on x86

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

CORE works for Gluten Core VELOX

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants