feat: Granary v2 pipeline — vLLM 0.19.1, throughput metadata export, standardized audio stages #1905
Conversation
Signed-off-by: Nune Tadevosyan <ntadevosyan@cw-dfw-cs-001-login-01.cm.cluster>
Signed-off-by: Meline Mkrtchyan <mmkrtchyan@nvidia.com>
Converts spoken-form text to written form (e.g. "fourteen dollars" → "$14") using batched vLLM inference with Qwen3.5-35B-A3B-FP8. Runs after PnC restoration, validates outputs against 3 hallucination checks (word insertion, novel content, excessive deletion), and falls back to input on failure. Includes bundled default prompt with 18 ITN conversion rule categories. Enabled via --enable_itn in run_pipeline.py. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
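The validate-then-fall-back gate described above can be pictured with a minimal sketch. The constant and helper names mirror those mentioned in the lint commit below, but the thresholds and check bodies are illustrative (two of the three checks shown), not the PR's actual logic:

```python
_MIN_WORDS_FOR_DELETION_CHECK = 5  # illustrative threshold
_MAX_DELETION_RATIO = 0.5          # illustrative threshold


def _check_novel_words(source: str, output: str) -> bool:
    """Flag alphabetic words in the output that never appear in the source."""
    src = {w.lower() for w in source.split()}
    return any(w.isalpha() and w.lower() not in src for w in output.split())


def _validate_itn_output(source: str, output: str) -> bool:
    """Return True only if the generated ITN text passes the hallucination checks."""
    src_words, out_words = source.split(), output.split()
    if _check_novel_words(source, output):  # novel-content check
        return False
    if (
        len(src_words) >= _MIN_WORDS_FOR_DELETION_CHECK
        and len(out_words) < len(src_words) * _MAX_DELETION_RATIO
    ):  # excessive-deletion check
        return False
    return True


def apply_itn(source: str, generated: str) -> str:
    # Fall back to the unmodified input when validation fails.
    return generated if _validate_itn_output(source, generated) else source
```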
- Extract _check_novel_words() to reduce _validate_itn_output complexity (C901)
- Move magic numbers to module constants _MIN_WORDS_FOR_DELETION_CHECK, _MAX_DELETION_RATIO (PLR2004)
- Use or-operator for tensor_parallel_size fallback (FURB110)
- Fix import sorting in run_pipeline.py (I001)
- Apply ruff formatter to both files

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…rompts Signed-off-by: nithinraok <nithinrao.koluguri@gmail.com>
…ion-test

# Conflicts:
#	examples/audio/qwen_omni_inprocess/run_pipeline.py

…koluguri/integration-test

# Conflicts:
#	examples/audio/qwen_omni_inprocess/run_pipeline.py
#	nemo_curator/models/qwen_omni.py
#	nemo_curator/stages/audio/text_filtering/fasttext_lid.py
Signed-off-by: nithinraok <nithinrao.koluguri@gmail.com>
…c prompt, add prefill caching Signed-off-by: nithinraok <nithinrao.koluguri@gmail.com>
Signed-off-by: Nune Tadevosyan <ntadevosyan@cw-dfw-cs-001-login-01.cm.cluster>
- tutorials/audio/README.md: add all 6 tutorials with decision table, composability section, and GPU requirements column
- qwen_omni.py, qwen_asr.py, pnc_restoration.py, itn_restoration.py: add GPU VRAM, throughput, and model download size to docstrings
- post_processsing_pipeline.py: add --backend flag (xenna/ray_data) following the fleurs/ pattern instead of hardcoding XennaExecutor
- granary_v2_postprocessing/README.md: add Parameter Tuning section with justification for each default, and Expected Filtering Ratios
- granary_v2_postprocessing/requirements.txt: new file documenting deps
- nemo_curator/stages/audio/README.md: add GPU memory table, throughput estimates, and composability guidance for the Qwen Omni pipeline

Signed-off-by: Mohammad Aaftab V <aaftabv@nvidia.com>
- Fix ModuleNotFoundError in segmentation stages by changing import from nonexistent backends.experimental.utils to backends.utils
- Fix test_fasttext_lid.py: mock _model.predict() instead of _lid.score_document(), add missing source_lang key, use multi-word text for min_word_count threshold
- Fix whisper_hallucination setup() to strip trailing frequency counts from phrase files (production format: "Thank you 1297" -> "Thank you"; see the sketch below)

Signed-off-by: Mohammad Aaftab V <aaftabv@nvidia.com>
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
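For the phrase-file fix in particular, a one-line sketch of the stripping behavior; only the input/output format comes from the commit, the regex itself is an assumption:

```python
import re


def strip_frequency_count(line: str) -> str:
    # "Thank you 1297" -> "Thank you"; leaves lines without a trailing count untouched.
    return re.sub(r"\s+\d+$", "", line.rstrip("\n"))


assert strip_frequency_count("Thank you 1297\n") == "Thank you"
```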
… integration) Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
…dpoint fix

- Add _prefetch_models() for concurrent HF model downloads (ThreadPoolExecutor; sketched below)
- Fix lambda closure for omni_model in prefetch
- Set s3_endpoint_url to https://pdx.s8k.io for Kratos Swift access
- Add PIPELINE_DEEP_DIVE.md with line-by-line stage breakdown

Signed-off-by: Mohammad Aaftab V <aaftabv@nvidia.com>
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
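A plausible shape for _prefetch_models(), assuming huggingface_hub.snapshot_download is the download primitive (the signature and names here are illustrative, not the PR's code). Binding the loop variable as a default argument is the standard fix for the late-binding closure bug this commit mentions:

```python
from concurrent.futures import ThreadPoolExecutor

from huggingface_hub import snapshot_download


def _prefetch_models(model_ids: list[str]) -> None:
    """Download all HF model snapshots concurrently before the pipeline starts."""
    with ThreadPoolExecutor(max_workers=max(1, len(model_ids))) as pool:
        # `m=m` binds each model id eagerly; a bare `lambda: snapshot_download(m)`
        # would late-bind and download only the last model, several times.
        futures = [pool.submit(lambda m=m: snapshot_download(m)) for m in model_ids]
        for f in futures:
            f.result()  # surface any download error immediately
```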
… docs NvLLMOps now owns all S3 data staging — Curator only sees local paths. Nulled s3_endpoint_url in Hydra config and updated PIPELINE_DEEP_DIVE.md with _prefetch_models docs and NvLLMOps integration notes. Signed-off-by: Mohammad Aaftab V <aaftabv@nvidia.com> Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
These are runtime dependencies needed by Granary v2 audio stages (QwenOmni, PnCRestoration, QwenASR) but were missing from the extra. Signed-off-by: Mohammad Aaftab V <aaftabv@nvidia.com> Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
FastTextLIDStage requires fasttext at runtime but it was only declared in text_cpu extra, not audio_cuda12. Signed-off-by: Mohammad Aaftab V <aaftabv@nvidia.com> Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
Qwen3.5 MoE architecture (used by PnC model Qwen3.5-35B-A3B-FP8) requires transformers 5.x. Removed <5.0 cap and relaxed hf-hub upper bound accordingly. Signed-off-by: Mohammad Aaftab V <aaftabv@nvidia.com> Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
nemo-toolkit pins transformers<4.58 so we cannot use >=5.0 in pyproject.toml. NvLLMOps will force-upgrade at runtime instead. Signed-off-by: Mohammad Aaftab V <aaftabv@nvidia.com> Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
- Add prefix_caching_hash_algo=xxhash to all LLM constructors (see the sketch after this list)
- Add languages param to QwenTextLLM for multilingual PnC
- Return skipped_indices from QwenOmni.generate()
- Add empty waveform filtering in QwenASR and QwenOmni
- Set itn_restoration defaults to match reference
- Add source_lang_key to PnCRestorationStage
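For the first item, a hedged sketch of such a constructor call: enable_prefix_caching is a known vLLM engine argument, while prefix_caching_hash_algo="xxhash" and the model repo path are taken on faith from this PR:

```python
from vllm import LLM

# Illustrative only; the model repo path is an assumption.
llm = LLM(
    model="Qwen/Qwen3.5-35B-A3B-FP8",
    enable_prefix_caching=True,         # reuse KV cache across shared prompt prefixes
    prefix_caching_hash_algo="xxhash",  # hash algorithm this PR standardizes on
    tensor_parallel_size=1,
)
```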
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
- Remove transformers<5.0 cap from constraint-dependencies
- Remove huggingface-hub<1.0 cap from override-dependencies
- Override nemo-toolkit's transformers<4.58 pin
- Override video_cuda12's torch<=2.9.1 pin
- Point transformers/vllm/huggingface-hub to PyPI (NVIDIA mirror behind)
- Remove torch<=2.9.1 from build dependency-group

Resulting versions:
- vLLM: 0.16.0 -> 0.19.1
- torch: 2.9.1+cu128 -> 2.11.0+cu128
- transformers: 4.57.6 -> 5.7.0
- huggingface-hub: 0.36.2 -> 1.13.0
vLLM 0.19.1 wheels are compiled against torch 2.10.0. torch 2.11.0 has an incompatible C++ ABI, causing `undefined symbol: _ZN3c1013MessageLoggerC1EPKciib` at import time.
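A quick way to check the pairing before a long run, using only the standard `__version__` attributes (the expected torch version comes from the note above):

```python
# Sanity-check the torch/vLLM pairing before launching a long pipeline run.
import torch

assert torch.__version__.startswith("2.10."), (
    f"vLLM 0.19.1 wheels expect torch 2.10.x, found {torch.__version__}"
)

import vllm  # on an ABI mismatch this import itself fails with the undefined-symbol error

print("torch", torch.__version__, "/ vllm", vllm.__version__)
```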
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
Closing in favor of #1881, which targets main.
Greptile Summary

This PR upgrades vLLM to 0.19.1 (torch 2.10.0) for Qwen3.5 MoE support and introduces a substantial set of new audio pipeline stages.
Confidence Score: 3/5

The P1 OOM risk in PnCRestorationStage.setup_on_node should be resolved before merging for distributed/multi-GPU deployments. One P1 finding (GPU memory double-allocation in distributed setup_on_node) could silently cause OOM at pipeline start in production Kratos runs; the score is pulled below the P1 ceiling of 4 by the combined risk with the skip_me_key inconsistency and the large surface area of 70 files.

Key files: nemo_curator/stages/audio/text_filtering/pnc_restoration.py (setup_on_node GPU allocation), nemo_curator/stages/audio/inference/qwen_omni.py (skip_me_key default mismatch)

Important Files Changed
Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[NemoTarredAudioReader] --> B[InitializeFieldsStage]
B --> C[InferenceQwenOmniStage\nvLLM 0.19.1 sets _skipme]
C --> D{followup_prompt?}
D -- yes --> E[DisfluencyWerGuardStage]
D -- no --> F[WhisperHallucinationStage omni]
E --> F
F --> G{asr_model_id?}
G -- yes --> H[InferenceQwenASRStage]
G -- no --> I[SelectBestPredictionStage]
H --> H2[WhisperHallucinationStage asr]
H2 --> I
I --> J[FastTextLIDStage]
J --> K[RegexSubstitutionStage]
K --> L[AbbreviationConcatStage]
L --> M{skip_pnc?}
M -- no --> N[PnCRestorationStage\nsetup_on_node allocates GPU P1]
N --> O[PnCContentGuardStage]
O --> P{enable_itn?}
M -- yes --> P
P -- yes --> Q[ITNRestorationStage]
P -- no --> R[ShardedManifestWriterStage\nper-shard JSONL + perf_summary.json]
  Q --> R
```
Reviews (1): Last reviewed commit: "docs: add Granary v2 pipeline architectu..."
```python
def setup_on_node(
    self,
    _node_info: NodeInfo | None = None,
    _worker_metadata: WorkerMetadata | None = None,
) -> None:
    self._model = self._create_model()
    self._model.setup()
    logger.info("PnCRestoration model ready on node")
```
setup_on_node allocates GPU memory — diverges from intended node-level prefetch
setup_on_node calls self._model.setup(), which triggers a full vLLM engine allocation (GPU memory). In contrast, InferenceQwenOmniStage.setup_on_node explicitly only calls snapshot_download and states "no GPU allocation" in its docstring. In the Xenna executor, setup_on_node runs once per node process, and the stage is then serialized to per-worker actors — _model is lost on deserialization, so setup() re-allocates the full engine anyway. The node-level GPU allocation is therefore leaked: GPU memory is consumed and never freed in the node process, while each worker independently allocates again. On a node with multiple pipeline stages, this double allocation can cause OOM before the first batch is processed.
The fix is to align with the pattern used in InferenceQwenOmniStage: call only snapshot_download in setup_on_node and leave full model initialization to setup().
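A minimal sketch of the suggested split, assuming huggingface_hub.snapshot_download and a `model_id` attribute on the stage (both assumptions, mirroring the pattern the comment attributes to InferenceQwenOmniStage):

```python
from huggingface_hub import snapshot_download


def setup_on_node(
    self,
    _node_info: NodeInfo | None = None,
    _worker_metadata: WorkerMetadata | None = None,
) -> None:
    # Node-level prefetch only: warm the HF cache, allocate no GPU memory.
    snapshot_download(self.model_id)  # `model_id` attribute is assumed
    logger.info("PnCRestoration weights prefetched on node")


def setup(self, _worker_metadata: WorkerMetadata | None = None) -> None:
    # Per-worker: the only place the vLLM engine (and GPU memory) is created.
    self._model = self._create_model()
    self._model.setup()
```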
```python
pred_text_key: str = "qwen3_prediction_s1"
disfluency_text_key: str = "qwen3_prediction_s2"
skip_me_key: str = "_skipme"
```
skip_me_key default mismatches PnCRestorationStage
InferenceQwenOmniStage.skip_me_key defaults to "_skipme" (no second underscore), while PnCRestorationStage.skip_me_key defaults to "_skip_me". When InferenceQwenOmniStage marks a task as skipped, PnCRestorationStage reads the wrong key and gets an empty string, treating the task as eligible. In practice the downstream text_key is also empty for such tasks (preventing actual inference), but the skip-flag is silently missed for any stage that relies on it between the two. The defaults should be aligned to a single canonical field name across both stages.
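One way to make the field name canonical is a module-level constant that both dataclasses default to; the file location and constant name below are hypothetical:

```python
from dataclasses import dataclass

# Hypothetical shared module, e.g. nemo_curator/stages/audio/common.py
SKIP_ME_KEY = "_skip_me"


@dataclass
class InferenceQwenOmniStage:  # fields reduced to the relevant one
    skip_me_key: str = SKIP_ME_KEY


@dataclass
class PnCRestorationStage:
    skip_me_key: str = SKIP_ME_KEY  # both stages now read and write the same field
```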
```python
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.stages.resources import Resources
from nemo_curator.tasks import AudioTask
```
Importing private symbol from sibling module
_LANG_CODE_TO_NAME is a module-private dict defined in qwen_omni.py and imported here with its private-by-convention name. Sharing private symbols across module boundaries makes the dependency implicit and fragile — renaming or moving the dict in qwen_omni.py silently breaks qwen_asr.py. Consider moving _LANG_CODE_TO_NAME to a shared utility location (e.g., nemo_curator/stages/audio/common.py) and exporting it as a public constant.
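A sketch of the suggested move; the module path and public name follow the comment's suggestion, and the dict contents are illustrative:

```python
# nemo_curator/stages/audio/common.py (suggested location)
LANG_CODE_TO_NAME: dict[str, str] = {
    "en": "English",  # contents illustrative; the real mapping lives in qwen_omni.py
    "de": "German",
}

# qwen_omni.py and qwen_asr.py then both import the public constant:
# from nemo_curator.stages.audio.common import LANG_CODE_TO_NAME
```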
| """Append one task's stage perf chain to the shard's perf JSONL.""" | ||
| perf_path = os.path.join(self.output_dir, f"{shard_key}_perf.jsonl") | ||
| line = { | ||
| "task_id": task.task_id, | ||
| "stages": _serialize_stage_perf(task._stage_perf or []), | ||
| } | ||
| with open(perf_path, "a", encoding="utf-8") as f: | ||
| f.write(json.dumps(line, ensure_ascii=False) + "\n") | ||
|
|
||
| def process(self, task: AudioTask) -> FileGroupTask: | ||
| shard_key = task._metadata.get("_shard_key", "unknown/shard_0") | ||
|
|
||
| out_path = os.path.join(self.output_dir, f"{shard_key}.jsonl") | ||
| os.makedirs(os.path.dirname(out_path), exist_ok=True) | ||
|
|
||
| with open(out_path, "a", encoding="utf-8") as f: | ||
| f.write(json.dumps(task.data, ensure_ascii=False) + "\n") | ||
|
|
||
| if self.write_perf_stats: | ||
| self._accumulate_perf(task) | ||
| self._write_perf_line(task, shard_key) | ||
|
|
||
| self._shard_counts[shard_key] += 1 | ||
|
|
||
| shard_total = task._metadata.get("_shard_total", 0) | ||
| if shard_total > 0 and self._shard_counts[shard_key] >= shard_total: | ||
| done_path = os.path.join(self.output_dir, f"{shard_key}.jsonl.done") | ||
| with open(done_path, "w") as f: | ||
| f.write(f"{self._shard_counts[shard_key]}\n") |
Per-task file open/close is a throughput bottleneck
Both process() and _write_perf_line() open and close their respective files for every single AudioTask. On a 17K-utterance pipeline run, this is ~34K syscall pairs per shard file. For high-throughput pipelines this will be a significant bottleneck that can be avoided by buffering writes (e.g., holding an open file handle in stage state after setup_on_node, flushing in teardown). Since num_workers() already returns 1, thread-safety is not a concern.
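A sketch of the buffered variant under the comment's own assumptions: handles opened lazily per shard and closed in a teardown() hook that the executor is assumed to call. The class and method names here are illustrative, not the PR's code:

```python
import json
import os
from typing import IO


class BufferedShardWriter:
    """Illustrative buffer for ShardedManifestWriterStage-style appends."""

    def __init__(self, output_dir: str) -> None:
        self.output_dir = output_dir
        self._handles: dict[str, IO[str]] = {}

    def _handle(self, shard_key: str) -> IO[str]:
        # Open each shard file once; every later task reuses the handle.
        if shard_key not in self._handles:
            path = os.path.join(self.output_dir, f"{shard_key}.jsonl")
            os.makedirs(os.path.dirname(path), exist_ok=True)
            self._handles[shard_key] = open(path, "a", encoding="utf-8")
        return self._handles[shard_key]

    def write(self, shard_key: str, record: dict) -> None:
        self._handle(shard_key).write(json.dumps(record, ensure_ascii=False) + "\n")

    def teardown(self) -> None:
        # Flush and close everything once at shutdown, instead of per task.
        for f in self._handles.values():
            f.close()
        self._handles.clear()
```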
| "starlette>=0.49.1", # Address CVE GHSA-7f5h-v6xp-fcq8 | ||
| "urllib3>=2.6.3", # Address CVE GHSA-38jv-5279-wg99 | ||
| "wheel>=0.46.2", # Address CVE GHSA-8rrh-rw8j-w5fx | ||
| "transformers>=4.56.0,<5.0", # tranfomers>5 breaks with hf_hub<1.0 added in override | ||
| "transformers>=4.56.0", # removed <5.0 cap to allow qwen3_5_moe support | ||
| ] | ||
| override-dependencies = [ | ||
| "apex; sys_platform == 'never'", | ||
| "distance; sys_platform == 'never'", | ||
| "huggingface-hub>=0.34,<1.0", # Override huggingface-hub, transformers and data-designer require two different versions of hugging-face hub | ||
| "huggingface-hub>=0.34", # removed <1.0 cap — vllm 0.17+ / transformers 5.x need hf_hub>=1.0 | ||
| "transformers>=4.56.0", # Override nemo-toolkit's <4.58 pin — qwen3_5_moe needs newer transformers | ||
| "kaldiio; sys_platform == 'never'", | ||
| "levenshtein; sys_platform == 'never'", | ||
| "numpy>=2.0.0,<=2.2.0", # Override nemo-toolkits constraint of <2.0.0, upperbounds for Numba compatibility | ||
| "protobuf>=5.29.5", # Override nemo-toolkits constraint of ~=5.29.5 | ||
| "setuptools>=80.10.1", # Override setuptools range in other dependencies to address CVE GHSA-58pv-8j8x-9vj2 | ||
| "nixl-cu12>=0.10.0; (platform_machine == 'x86_64' and platform_system != 'Darwin')", # Override ray[llm]'s unconditional nixl dep for ARM | ||
| "xgrammar>=0.1.32", # Override vllm's ==0.1.29 pin to address CVE GHSA-7rgv-gqhr-fxg3 (DoS via multi-layer nesting) | ||
| "torch==2.10.0", # Override video_cuda12's <=2.9.1 pin — must match vllm 0.19.1's compiled ABI | ||
| ] | ||
|
|
transformers>=4.56.0 appears in both constraint-dependencies and override-dependencies
After this PR, transformers>=4.56.0 is listed in both sections. While this probably does not break resolution today (the override supersedes the constraint), it is redundant and makes future version bumps error-prone — a maintainer may update only one entry, causing the two entries to diverge. Consider keeping the transformers pin only in override-dependencies and removing it from constraint-dependencies.
Summary

- `_log_metrics()` in `InferenceQwenOmniStage` and `PnCRestorationStage`, with `perf_summary.json` aggregate output in `ShardedManifestWriterStage`

Changes

- `nemo_curator/stages/audio/inference/qwen_omni.py` — add inference timing + utterance count metrics
- `nemo_curator/stages/audio/text_filtering/pnc_restoration.py` — add inference timing + restoration count metrics
- `nemo_curator/stages/audio/io/sharded_manifest_writer.py` — add `perf_summary.json` + per-shard `_perf.jsonl` export
- `tutorials/audio/qwen_omni_inprocess/main.py` — log perf summary after pipeline completion
- `pyproject.toml` / `uv.lock` — dependency pins

Test plan

- `perf_summary.json` output in next Kratos run

Made with Cursor