feat: Granary v2 pipeline — vLLM 0.19.1, throughput metadata export, standardized audio stages#1905

Closed
mohammadaaftabv wants to merge 66 commits into NVIDIA-NeMo:mmkrtchyan/qwen-omni-inprocess from mohammadaaftabv:aaftabv/standardize-audio-stages

Conversation

@mohammadaaftabv
Contributor

Summary

  • Upgrade vLLM to 0.19.1 with torch==2.10.0 for Qwen3.5 MoE architecture support
  • Add throughput metadata export: per-stage inference timing via _log_metrics() in InferenceQwenOmniStage and PnCRestorationStage, with perf_summary.json aggregate output in ShardedManifestWriterStage (a sketch of such a timing helper follows this list)
  • Pin dependencies (vllm==0.19.1, torch==2.10.0+cu128, transformers==5.7.0) for stable ABI compatibility
  • Fix config paths (/src/Curator -> /opt/Curator) for Docker image alignment
  • Add pipeline architecture documentation
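
The summary references _log_metrics() and perf_summary.json without showing them. Below is a minimal sketch of what such a per-stage timing helper could look like; the class name InferenceTimer and the JSON layout are illustrative assumptions, not the PR's actual code.

import json
import time


class InferenceTimer:
    """Hypothetical per-stage timing helper; mirrors what _log_metrics() could record."""

    def __init__(self, stage_name: str) -> None:
        self.stage_name = stage_name
        self.total_seconds = 0.0
        self.num_items = 0

    def record(self, elapsed: float, num_items: int) -> None:
        self.total_seconds += elapsed
        self.num_items += num_items

    def summary(self) -> dict:
        rate = self.num_items / self.total_seconds if self.total_seconds else 0.0
        return {
            "stage": self.stage_name,
            "total_seconds": round(self.total_seconds, 3),
            "items": self.num_items,
            "items_per_second": round(rate, 3),
        }


# Wrap each inference call, then dump the aggregate once at the end of the run.
timer = InferenceTimer("InferenceQwenOmniStage")
start = time.perf_counter()
# outputs = llm.generate(batch)  # placeholder for the real vLLM call
timer.record(time.perf_counter() - start, num_items=16)
with open("perf_summary.json", "w", encoding="utf-8") as f:
    json.dump([timer.summary()], f, indent=2)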

Changes

  • nemo_curator/stages/audio/inference/qwen_omni.py — add inference timing + utterance count metrics
  • nemo_curator/stages/audio/text_filtering/pnc_restoration.py — add inference timing + restoration count metrics
  • nemo_curator/stages/audio/io/sharded_manifest_writer.py — add perf_summary.json + per-shard _perf.jsonl export
  • tutorials/audio/qwen_omni_inprocess/main.py — log perf summary after pipeline completion
  • pyproject.toml / uv.lock — dependency pins

Test plan

  • Verified pipeline completes successfully on Kratos with 17,197 utterances (25.5 min)
  • Docker image built and pushed with correct dependency versions
  • Verify perf_summary.json output in next Kratos run

Made with Cursor

Nune Tadevosyan and others added 30 commits April 21, 2026 04:48
Signed-off-by: Nune Tadevosyan <ntadevosyan@cw-dfw-cs-001-login-01.cm.cluster>
fix
Signed-off-by: Nune Tadevosyan <ntadevosyan@cw-dfw-cs-001-login-01.cm.cluster>
Signed-off-by: Meline Mkrtchyan <mmkrtchyan@nvidia.com>
Converts spoken-form text to written form (e.g. "fourteen dollars" → "$14")
using batched vLLM inference with Qwen3.5-35B-A3B-FP8. Runs after PnC
restoration, validates outputs against 3 hallucination checks (word insertion,
novel content, excessive deletion), and falls back to input on failure.

Includes bundled default prompt with 18 ITN conversion rule categories.
Enabled via --enable_itn in run_pipeline.py.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
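
A minimal sketch of the three hallucination checks described above, with the fallback to input on failure. The function name and thresholds are assumptions; the real constants appear in the refactor commit below.

# Assumed thresholds; the actual constants live in itn_restoration.py per the
# refactor commit that follows (_MIN_WORDS_FOR_DELETION_CHECK, _MAX_DELETION_RATIO).
_MIN_WORDS_FOR_DELETION_CHECK = 5
_MAX_DELETION_RATIO = 0.4


def validate_itn_output(source: str, converted: str) -> bool:
    """Return True if the ITN output passes all three hallucination checks."""
    src_words = source.lower().split()
    out_words = converted.lower().split()

    # Check 1: word insertion -- the output should not grow far beyond the input.
    if len(out_words) > len(src_words) + 2:
        return False

    # Check 2: novel content -- alphabetic words absent from the source are suspect
    # (numerals such as "$14" are expected to be novel, so skip non-alphabetic tokens).
    if any(w.isalpha() and w not in set(src_words) for w in out_words):
        return False

    # Check 3: excessive deletion -- a large shrink suggests dropped content,
    # but very short inputs legitimately compress ("fourteen dollars" -> "$14").
    if len(src_words) >= _MIN_WORDS_FOR_DELETION_CHECK:
        deletion_ratio = (len(src_words) - len(out_words)) / len(src_words)
        if deletion_ratio > _MAX_DELETION_RATIO:
            return False

    return True


# Fall back to the input on failure, as the commit describes.
source, converted = "fourteen dollars", "$14"
final = converted if validate_itn_output(source, converted) else source
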
- Extract _check_novel_words() to reduce _validate_itn_output complexity (C901)
- Move magic numbers to module constants _MIN_WORDS_FOR_DELETION_CHECK, _MAX_DELETION_RATIO (PLR2004)
- Use or-operator for tensor_parallel_size fallback (FURB110)
- Fix import sorting in run_pipeline.py (I001)
- Apply ruff formatter to both files

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…rompts

Signed-off-by: nithinraok <nithinrao.koluguri@gmail.com>
…ion-test

# Conflicts:
#	examples/audio/qwen_omni_inprocess/run_pipeline.py
…koluguri/integration-test

# Conflicts:
#	examples/audio/qwen_omni_inprocess/run_pipeline.py
#	nemo_curator/models/qwen_omni.py
#	nemo_curator/stages/audio/text_filtering/fasttext_lid.py
Signed-off-by: nithinraok <nithinrao.koluguri@gmail.com>
…c prompt, add prefill caching

Signed-off-by: nithinraok <nithinrao.koluguri@gmail.com>
Signed-off-by: Nune Tadevosyan <ntadevosyan@cw-dfw-cs-001-login-01.cm.cluster>
- tutorials/audio/README.md: add all 6 tutorials with decision table,
  composability section, and GPU requirements column
- qwen_omni.py, qwen_asr.py, pnc_restoration.py, itn_restoration.py:
  add GPU VRAM, throughput, and model download size to docstrings
- post_processsing_pipeline.py: add --backend flag (xenna/ray_data)
  following the fleurs/ pattern instead of hardcoding XennaExecutor
- granary_v2_postprocessing/README.md: add Parameter Tuning section
  with justification for each default, and Expected Filtering Ratios
- granary_v2_postprocessing/requirements.txt: new file documenting deps
- nemo_curator/stages/audio/README.md: add GPU memory table, throughput
  estimates, and composability guidance for the Qwen Omni pipeline

Signed-off-by: Mohammad Aaftab V <aaftabv@nvidia.com>
- Fix ModuleNotFoundError in segmentation stages by changing import from
  nonexistent backends.experimental.utils to backends.utils
- Fix test_fasttext_lid.py: mock _model.predict() instead of _lid.score_document(),
  add missing source_lang key, use multi-word text for min_word_count threshold
- Fix whisper_hallucination setup() to strip trailing frequency counts from
  phrase files (production format: Thank you 1297 -> Thank you; sketched below)

Signed-off-by: Mohammad Aaftab V <aaftabv@nvidia.com>
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
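
The frequency-count stripping mentioned above can be a single trailing-digits regex; a sketch, not the PR's exact code.

import re


def strip_frequency_count(line: str) -> str:
    """Drop a trailing whitespace-separated count: 'Thank you 1297' -> 'Thank you'."""
    return re.sub(r"\s+\d+\s*$", "", line)


assert strip_frequency_count("Thank you 1297") == "Thank you"
assert strip_frequency_count("Thank you") == "Thank you"  # lines without counts pass through
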
… integration)

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
…dpoint fix

- Add _prefetch_models() for concurrent HF model downloads (ThreadPoolExecutor)
- Fix lambda closure for omni_model in prefetch (both sketched below)
- Set s3_endpoint_url to https://pdx.s8k.io for Kratos Swift access
- Add PIPELINE_DEEP_DIVE.md with line-by-line stage breakdown

Signed-off-by: Mohammad Aaftab V <aaftabv@nvidia.com>
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
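
A minimal sketch of concurrent prefetch with the closure pitfall fixed. The function name and the placeholder repo IDs are assumptions; the PR's _prefetch_models may differ.

from concurrent.futures import ThreadPoolExecutor

from huggingface_hub import snapshot_download


def prefetch_models(model_ids: list[str]) -> None:
    """Download all model weights concurrently before the pipeline starts."""
    with ThreadPoolExecutor(max_workers=len(model_ids)) as pool:
        # Default-argument binding fixes the classic closure bug: a bare
        # `lambda: snapshot_download(mid)` would capture `mid` by reference,
        # so every worker would fetch the last model in the list.
        futures = [pool.submit(lambda mid=mid: snapshot_download(mid)) for mid in model_ids]
        for future in futures:
            future.result()  # surface download errors eagerly


prefetch_models(["org/omni-model", "org/pnc-model"])  # placeholder repo IDs

Passing snapshot_download and the ID directly to submit would avoid the lambda entirely; the lambda form is shown because it mirrors the closure fix this commit describes.
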
… docs

NvLLMOps now owns all S3 data staging — Curator only sees local paths.
Nulled s3_endpoint_url in Hydra config and updated PIPELINE_DEEP_DIVE.md
with _prefetch_models docs and NvLLMOps integration notes.

Signed-off-by: Mohammad Aaftab V <aaftabv@nvidia.com>
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
These are runtime dependencies needed by the Granary v2 audio stages
(QwenOmni, PnCRestoration, QwenASR) that were missing from the extra.

Signed-off-by: Mohammad Aaftab V <aaftabv@nvidia.com>
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
FastTextLIDStage requires fasttext at runtime, but it was only
declared in the text_cpu extra, not in audio_cuda12.

Signed-off-by: Mohammad Aaftab V <aaftabv@nvidia.com>
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
Qwen3.5 MoE architecture (used by PnC model Qwen3.5-35B-A3B-FP8)
requires transformers 5.x. Removed <5.0 cap and relaxed hf-hub
upper bound accordingly.

Signed-off-by: Mohammad Aaftab V <aaftabv@nvidia.com>
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
nemo-toolkit pins transformers<4.58 so we cannot use >=5.0 in
pyproject.toml. NvLLMOps will force-upgrade at runtime instead.

Signed-off-by: Mohammad Aaftab V <aaftabv@nvidia.com>
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
- Add prefix_caching_hash_algo=xxhash to all LLM constructors
- Add languages param to QwenTextLLM for multilingual PnC
- Return skipped_indices from QwenOmni.generate()
- Add empty waveform filtering in QwenASR and QwenOmni (sketched below)
- Set itn_restoration defaults to match reference
- Add source_lang_key to PnCRestorationStage
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
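
A sketch of what empty-waveform filtering with skipped-index tracking could look like; function and variable names are assumptions, not the stages' actual code.

import numpy as np


def filter_empty_waveforms(waveforms: list[np.ndarray]) -> tuple[list[np.ndarray], list[int]]:
    """Drop zero-length waveforms before batching; return the kept batch plus skipped indices."""
    kept: list[np.ndarray] = []
    skipped_indices: list[int] = []
    for i, wav in enumerate(waveforms):
        if wav.size == 0:
            skipped_indices.append(i)  # caller can mark these via skip_me_key downstream
        else:
            kept.append(wav)
    return kept, skipped_indices


batch, skipped = filter_empty_waveforms([np.zeros(16000), np.array([])])
assert skipped == [1]
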
- Remove transformers<5.0 cap from constraint-dependencies
- Remove huggingface-hub<1.0 cap from override-dependencies
- Override nemo-toolkit's transformers<4.58 pin
- Override video_cuda12's torch<=2.9.1 pin
- Point transformers/vllm/huggingface-hub to PyPI (NVIDIA mirror behind)
- Remove torch<=2.9.1 from build dependency-group

Resulting versions:
  vLLM: 0.16.0 -> 0.19.1
  torch: 2.9.1+cu128 -> 2.11.0+cu128
  transformers: 4.57.6 -> 5.7.0
  huggingface-hub: 0.36.2 -> 1.13.0

vLLM 0.19.1 wheels are compiled against torch 2.10.0, while
torch 2.11.0 has an incompatible C++ ABI, causing:
  undefined symbol: _ZN3c1013MessageLoggerC1EPKciib
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
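
Given that failure mode, an import smoke test inside the built image catches an ABI mismatch before a cluster run; a sketch against this PR's pins.

# The undefined-symbol error surfaces at import time, so importing vllm eagerly
# is enough to verify the torch/vLLM ABI pairing inside the built image.
import torch
import vllm

assert torch.__version__.startswith("2.10.0"), f"unexpected torch {torch.__version__}"
print(f"torch {torch.__version__} / vllm {vllm.__version__} import cleanly")
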
@mohammadaaftabv mohammadaaftabv requested review from a team as code owners May 1, 2026 18:01
@mohammadaaftabv mohammadaaftabv requested review from weijiac0619 and removed request for a team May 1, 2026 18:01
copy-pr-bot Bot commented May 1, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@mohammadaaftabv
Contributor Author

Closing in favor of #1881, which targets main.


greptile-apps Bot commented May 1, 2026

Greptile Summary

This PR upgrades vLLM to 0.19.1 (torch 2.10.0) for Qwen3.5 MoE support and introduces a substantial set of new audio pipeline stages: ShardedManifestWriterStage, PnCRestorationStage, InferenceQwenASRStage, QwenTextLLM, plus throughput metadata export via _log_metrics() and perf_summary.json. The dependency constraint changes (removing transformers<5.0 and huggingface-hub<1.0 caps) are intentional and well-commented.

  • P1: PnCRestorationStage.setup_on_node calls self._model.setup(), performing a full vLLM GPU allocation at node level. Unlike InferenceQwenOmniStage.setup_on_node which only prefetches weights, this leaks GPU memory in the node process — in distributed (Xenna) deployment the stage is serialized to workers which each re-allocate the engine independently, risking OOM before the pipeline starts.
  • P2: InferenceQwenOmniStage.skip_me_key defaults to "_skipme" while PnCRestorationStage.skip_me_key defaults to "_skip_me" — these don't match, so the skip flag set by the omni stage is silently ignored by the PnC stage.

Confidence Score: 3/5

The P1 OOM risk in PnCRestorationStage.setup_on_node should be resolved before merging for distributed/multi-GPU deployments.

One P1 finding (GPU memory double-allocation in distributed setup_on_node) that could silently cause OOM at pipeline start in production Kratos runs; score pulled below the P1 ceiling of 4 due to combined risk with the skip_me_key inconsistency and the large surface area of 70 files.

nemo_curator/stages/audio/text_filtering/pnc_restoration.py (setup_on_node GPU allocation), nemo_curator/stages/audio/inference/qwen_omni.py (skip_me_key default mismatch)

Important Files Changed

  • nemo_curator/stages/audio/text_filtering/pnc_restoration.py — New PnC restoration stage; setup_on_node performs a full GPU model load (P1 OOM risk in distributed), while the pattern from InferenceQwenOmniStage only prefetches weights at node level.
  • nemo_curator/stages/audio/inference/qwen_omni.py — Adds inference timing, language-aware prompt interpolation, and skipped_indices tracking; the skip_me_key default differs from PnCRestorationStage, causing silent skip-flag misalignment.
  • nemo_curator/stages/audio/io/sharded_manifest_writer.py — New stage; writes per-shard JSONL + perf stats, but opens files per task (no buffering), a throughput bottleneck for large pipelines.
  • nemo_curator/models/qwen_text_llm.py — New two-step completeness-check + PnC restoration LLM; clean design with thread-pool preprocessing and proper teardown.
  • tutorials/audio/qwen_omni_inprocess/main.py — Hydra entry point for the Granary v2 pipeline; parallel model prefetch is well implemented, though the auth token written only to the parent-process env may miss spawned vLLM workers.
  • nemo_curator/stages/audio/inference/qwen_asr.py — New Qwen3-ASR inference stage; imports private _LANG_CODE_TO_NAME from a sibling module rather than a shared utility location.
  • pyproject.toml — Pins vllm==0.19.1 and torch==2.10.0, removes the transformers<5.0 cap; transformers>=4.56.0 appears in both constraint-dependencies and override-dependencies (redundant).
  • nemo_curator/stages/audio/common.py — Adds ManifestReader, ManifestReaderStage, ManifestWriterStage, and a read_jsonl_manifests helper; well structured, with fsspec for cloud path support.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[NemoTarredAudioReader] --> B[InitializeFieldsStage]
    B --> C[InferenceQwenOmniStage\nvLLM 0.19.1 sets _skipme]
    C --> D{followup_prompt?}
    D -- yes --> E[DisfluencyWerGuardStage]
    D -- no --> F[WhisperHallucinationStage omni]
    E --> F
    F --> G{asr_model_id?}
    G -- yes --> H[InferenceQwenASRStage]
    G -- no --> I[SelectBestPredictionStage]
    H --> H2[WhisperHallucinationStage asr]
    H2 --> I
    I --> J[FastTextLIDStage]
    J --> K[RegexSubstitutionStage]
    K --> L[AbbreviationConcatStage]
    L --> M{skip_pnc?}
    M -- no --> N[PnCRestorationStage\nsetup_on_node allocates GPU P1]
    N --> O[PnCContentGuardStage]
    O --> P{enable_itn?}
    M -- yes --> P
    P -- yes --> Q[ITNRestorationStage]
    P -- no --> R[ShardedManifestWriterStage\nper-shard JSONL + perf_summary.json]
    Q --> R


Comment on lines +145 to +152
    def setup_on_node(
        self,
        _node_info: NodeInfo | None = None,
        _worker_metadata: WorkerMetadata | None = None,
    ) -> None:
        self._model = self._create_model()
        self._model.setup()
        logger.info("PnCRestoration model ready on node")

P1 setup_on_node allocates GPU memory — diverges from intended node-level prefetch

setup_on_node calls self._model.setup(), which triggers a full vLLM engine allocation (GPU memory). In contrast, InferenceQwenOmniStage.setup_on_node explicitly only calls snapshot_download and states "no GPU allocation" in its docstring. In the Xenna executor, setup_on_node runs once per node process, and the stage is then serialized to per-worker actors — _model is lost on deserialization, so setup() re-allocates the full engine anyway. The node-level GPU allocation is therefore leaked: GPU memory is consumed and never freed in the node process, while each worker independently allocates again. On a node with multiple pipeline stages, this double allocation can cause OOM before the first batch is processed.

The fix is to align with the pattern used in InferenceQwenOmniStage: call only snapshot_download in setup_on_node and leave full model initialization to setup().
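
Concretely, the aligned pattern could look like the following sketch (class shown standalone for illustration; model_id and the placeholder repo ID are assumptions):

from huggingface_hub import snapshot_download


class PnCRestorationStageSketch:
    """Illustrative only: weights-only prefetch at node level, engine allocation per worker."""

    model_id = "org/pnc-model"  # placeholder repo ID

    def setup_on_node(self, _node_info=None, _worker_metadata=None) -> None:
        # Prefetch weights into the node-local HF cache; no vLLM engine, no GPU memory.
        snapshot_download(self.model_id)

    def setup(self, _worker_metadata=None) -> None:
        # Full engine allocation happens here, once per worker actor after deserialization.
        self._model = self._create_model()
        self._model.setup()

    def _create_model(self):
        raise NotImplementedError("stands in for the stage's real model factory")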

Comment on lines 100 to +102
    pred_text_key: str = "qwen3_prediction_s1"
    disfluency_text_key: str = "qwen3_prediction_s2"
    skip_me_key: str = "_skipme"

P2 skip_me_key default mismatches PnCRestorationStage

InferenceQwenOmniStage.skip_me_key defaults to "_skipme" (no second underscore), while PnCRestorationStage.skip_me_key defaults to "_skip_me". When InferenceQwenOmniStage marks a task as skipped, PnCRestorationStage reads the wrong key and gets an empty string, treating the task as eligible. In practice the downstream text_key is also empty for such tasks (preventing actual inference), but the skip-flag is silently missed for any stage that relies on it between the two. The defaults should be aligned to a single canonical field name across both stages.
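
One way to align the defaults is a single module-level constant that both stages reference; a sketch, with the constant's name and home assumed:

from dataclasses import dataclass

# e.g. in nemo_curator/stages/audio/common.py (location assumed)
SKIP_ME_KEY = "_skipme"


@dataclass
class OmniStageSketch:
    skip_me_key: str = SKIP_ME_KEY


@dataclass
class PnCStageSketch:
    skip_me_key: str = SKIP_ME_KEY  # both stages now share one canonical field name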

from nemo_curator.stages.base import ProcessingStage
from nemo_curator.stages.resources import Resources
from nemo_curator.tasks import AudioTask


P2 Importing private symbol from sibling module

_LANG_CODE_TO_NAME is a module-private dict defined in qwen_omni.py and imported here with its private-by-convention name. Sharing private symbols across module boundaries makes the dependency implicit and fragile — renaming or moving the dict in qwen_omni.py silently breaks qwen_asr.py. Consider moving _LANG_CODE_TO_NAME to a shared utility location (e.g., nemo_curator/stages/audio/common.py) and exporting it as a public constant.
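
The suggested refactor, sketched below; the module path follows the comment's own suggestion, and the mapping entries shown are illustrative.

# nemo_curator/stages/audio/common.py (suggested home) -- promoted to a public name
LANG_CODE_TO_NAME: dict[str, str] = {
    "en": "English",
    "de": "German",
    # ... remaining entries moved verbatim from qwen_omni.py
}

# qwen_omni.py and qwen_asr.py would then share the import:
# from nemo_curator.stages.audio.common import LANG_CODE_TO_NAME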


Comment on lines +107 to +135
"""Append one task's stage perf chain to the shard's perf JSONL."""
perf_path = os.path.join(self.output_dir, f"{shard_key}_perf.jsonl")
line = {
"task_id": task.task_id,
"stages": _serialize_stage_perf(task._stage_perf or []),
}
with open(perf_path, "a", encoding="utf-8") as f:
f.write(json.dumps(line, ensure_ascii=False) + "\n")

def process(self, task: AudioTask) -> FileGroupTask:
shard_key = task._metadata.get("_shard_key", "unknown/shard_0")

out_path = os.path.join(self.output_dir, f"{shard_key}.jsonl")
os.makedirs(os.path.dirname(out_path), exist_ok=True)

with open(out_path, "a", encoding="utf-8") as f:
f.write(json.dumps(task.data, ensure_ascii=False) + "\n")

if self.write_perf_stats:
self._accumulate_perf(task)
self._write_perf_line(task, shard_key)

self._shard_counts[shard_key] += 1

shard_total = task._metadata.get("_shard_total", 0)
if shard_total > 0 and self._shard_counts[shard_key] >= shard_total:
done_path = os.path.join(self.output_dir, f"{shard_key}.jsonl.done")
with open(done_path, "w") as f:
f.write(f"{self._shard_counts[shard_key]}\n")

P2 Per-task file open/close is a throughput bottleneck

Both process() and _write_perf_line() open and close their respective files for every single AudioTask. On a 17K-utterance pipeline run, this is ~34K syscall pairs per shard file. For high-throughput pipelines this will be a significant bottleneck that can be avoided by buffering writes (e.g., holding an open file handle in stage state after setup_on_node, flushing in teardown). Since num_workers() already returns 1, thread-safety is not a concern.
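
A sketch of the buffering the comment proposes: one open handle per shard held in stage state and closed in teardown. Class and method names are assumptions.

import json
import os
from typing import IO


class BufferedShardWriterSketch:
    """Keep one open handle per shard instead of an open/close pair per task."""

    def __init__(self, output_dir: str) -> None:
        self.output_dir = output_dir
        self._handles: dict[str, IO[str]] = {}

    def _handle(self, shard_key: str) -> IO[str]:
        if shard_key not in self._handles:
            path = os.path.join(self.output_dir, f"{shard_key}.jsonl")
            os.makedirs(os.path.dirname(path), exist_ok=True)
            self._handles[shard_key] = open(path, "a", encoding="utf-8")
        return self._handles[shard_key]

    def write(self, shard_key: str, record: dict) -> None:
        self._handle(shard_key).write(json.dumps(record, ensure_ascii=False) + "\n")

    def teardown(self) -> None:
        # Flush and close everything once, when the stage shuts down.
        for f in self._handles.values():
            f.close()
        self._handles.clear()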

Comment thread: pyproject.toml
Comment on lines 276 to 295
"starlette>=0.49.1", # Address CVE GHSA-7f5h-v6xp-fcq8
"urllib3>=2.6.3", # Address CVE GHSA-38jv-5279-wg99
"wheel>=0.46.2", # Address CVE GHSA-8rrh-rw8j-w5fx
"transformers>=4.56.0,<5.0", # tranfomers>5 breaks with hf_hub<1.0 added in override
"transformers>=4.56.0", # removed <5.0 cap to allow qwen3_5_moe support
]
override-dependencies = [
"apex; sys_platform == 'never'",
"distance; sys_platform == 'never'",
"huggingface-hub>=0.34,<1.0", # Override huggingface-hub, transformers and data-designer require two different versions of hugging-face hub
"huggingface-hub>=0.34", # removed <1.0 cap — vllm 0.17+ / transformers 5.x need hf_hub>=1.0
"transformers>=4.56.0", # Override nemo-toolkit's <4.58 pin — qwen3_5_moe needs newer transformers
"kaldiio; sys_platform == 'never'",
"levenshtein; sys_platform == 'never'",
"numpy>=2.0.0,<=2.2.0", # Override nemo-toolkits constraint of <2.0.0, upperbounds for Numba compatibility
"protobuf>=5.29.5", # Override nemo-toolkits constraint of ~=5.29.5
"setuptools>=80.10.1", # Override setuptools range in other dependencies to address CVE GHSA-58pv-8j8x-9vj2
"nixl-cu12>=0.10.0; (platform_machine == 'x86_64' and platform_system != 'Darwin')", # Override ray[llm]'s unconditional nixl dep for ARM
"xgrammar>=0.1.32", # Override vllm's ==0.1.29 pin to address CVE GHSA-7rgv-gqhr-fxg3 (DoS via multi-layer nesting)
"torch==2.10.0", # Override video_cuda12's <=2.9.1 pin — must match vllm 0.19.1's compiled ABI
]


P2 transformers>=4.56.0 appears in both constraint-dependencies and override-dependencies

After this PR, transformers>=4.56.0 is listed in both sections. While this probably does not break resolution today (the override supersedes the constraint), it is redundant and makes future version bumps error-prone — a maintainer may update only one entry, causing the two entries to diverge. Consider keeping the transformers pin only in override-dependencies and removing it from constraint-dependencies.
