Skip to content

[GLUTEN-12298][FLINK] Support nexmark source multi-parallelism via ParallelSplit#12304

Open
ggjh-159 wants to merge 5 commits into
apache:mainfrom
ggjh-159:fix/nexmark-source-multi-parallelism
Open

[GLUTEN-12298][FLINK] Support nexmark source multi-parallelism via ParallelSplit#12304
ggjh-159 wants to merge 5 commits into
apache:mainfrom
ggjh-159:fix/nexmark-source-multi-parallelism

Conversation

@ggjh-159

@ggjh-159 ggjh-159 commented Jun 16, 2026

Copy link
Copy Markdown

What changes are proposed in this pull request?

fix: #12298
depends: bigo-sg/velox4j#36bigo-sg/velox#45

This PR consumes the ParallelSplit abstraction introduced by the companion velox4j PR:

  • NexmarkSourceFactory.buildVeloxSource iterates over all per-subtask splits from getSplits(parallelism) and packs them into a single NexmarkConnectorSplit whose subtaskSplits carries one entry per subtask.
  • GlutenSourceFunction.initSession detects ParallelSplit with parallelism > 1 and selects the per-subtask split via getSubtaskSplit(subtaskIndex, totalParallelism); otherwise it behaves as before.

How was this patch tested?

Manual run on a standalone Flink cluster with parallelism.default = 2, nexmark events.num = 10000, tps = 2000, query q0.

Signal Before After
Bid input rows ~18400 (duplicated) 9200 (no duplicates)
dateTime span single timestamp ~5 seconds
Subtask splits both share full range subtask 0 firstEventId=1, maxEvents=5000; subtask 1 firstEventId=5001, maxEvents=5000

@lgbo-ustc

Copy link
Copy Markdown
Contributor

Is there any test to cover this case?

@lgbo-ustc

lgbo-ustc commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

you need to update github workflow config to refer the right velox4j branch/commit

Comment on lines +92 to +94
subtaskSplits.add(
new NexmarkConnectorSplit(
"connector-nexmark", toVeloxGeneratorConfig(generatorConfig), null));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for safty, subtask splits shoud use a non-ParallelSplit class

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. Split into two types:

  • NexmarkConnectorSplit — leaf split, extends ConnectorSplit, only carries
    NexmarkGeneratorConfig config. Serialized across the Java/C++ boundary as before.
  • NexmarkParallelSplit — container, extends ParallelSplit, holds
    List<NexmarkConnectorSplit> subtaskSplits. Only this type implements
    getSubtaskSplit, so a leaf split can no longer be mistaken for a parallel
    container.

NexmarkSourceFactory now builds a NexmarkParallelSplit wrapping the
per-subtask leaf splits, and the instanceof ParallelSplit check in
GlutenSourceFunction cleanly distinguishes container vs leaf.

@github-actions github-actions Bot added the INFRA label Jun 18, 2026
@ggjh-159

Copy link
Copy Markdown
Author

cc @lgbo-ustc UTs were added and the velox4j branch was set.

@ggjh-159 ggjh-159 requested a review from lgbo-ustc June 18, 2026 09:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FLINK] Nexmark Source Behaves Incorrectly Under Multi-Parallelism

2 participants