Add filtration and PnCWithLLM stages for audio tagging pipeline #1863
sushmitha-deva-09 wants to merge 112 commits into NVIDIA-NeMo:main from
Conversation
```python
def setup_on_node(
    self, _node_info: NodeInfo | None = None, _worker_metadata: WorkerMetadata | None = None
) -> None:
    """Pre-download model weights and tokenizer (called once per node)."""
    self._vllm.setup_on_node()

def setup(self, _worker_metadata: WorkerMetadata | None = None) -> None:
    """Load tokenizer and vLLM engine (called once per worker)."""
    self._vllm.setup()
```
setup() and setup_on_node() crash after teardown()
teardown() sets self._vllm = None. Both setup() (line 258) and setup_on_node() (line 254) then call self._vllm.setup() / self._vllm.setup_on_node() with no null guard. If the pipeline framework ever calls setup() a second time after teardown (e.g., on worker restart or pipeline retry), both methods raise AttributeError: 'NoneType' object has no attribute 'setup'. A guard like if self._vllm is None: return — or re-constructing self._vllm — would prevent the crash.
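A minimal sketch of the suggested guard, applied to the methods shown above (only the null check is new; the rest is from the diff):

```python
def setup_on_node(
    self, _node_info: NodeInfo | None = None, _worker_metadata: WorkerMetadata | None = None
) -> None:
    """Pre-download model weights and tokenizer (called once per node)."""
    if self._vllm is None:  # guard: teardown() may already have nulled the engine
        return
    self._vllm.setup_on_node()

def setup(self, _worker_metadata: WorkerMetadata | None = None) -> None:
    """Load tokenizer and vLLM engine (called once per worker)."""
    if self._vllm is None:
        return
    self._vllm.setup()
```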
```python
    return []

try:
    return self.tokenizer.apply_chat_template(entry_chat, **self.chat_template_params)
except (TypeError, ValueError, KeyError, RuntimeError) as e:
    logger.error(f"Error applying chat template: {e}")
    return entry_chat
```
Fallback returns entry_chat (list of dicts) when use_chat_api=False
When apply_chat_template raises, the except block returns the raw entry_chat (a list[dict]) instead of a formatted string. When use_chat_api=False, process_batch routes to llm.generate(), which expects string prompts. A list-of-dicts prompt in entry_prompts causes a TypeError inside llm.generate(), re-raised as RuntimeError("Generation failed"), aborting the entire batch — the opposite of the graceful-degradation goal of this fallback. For the use_chat_api=False path, the fallback should return [] (to signal a skip) rather than the raw chat dict.
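A hedged sketch of that fallback, assuming a `use_chat_api` flag is available on the stage (attribute name taken from this comment, not verified against the code):

```python
try:
    return self.tokenizer.apply_chat_template(entry_chat, **self.chat_template_params)
except (TypeError, ValueError, KeyError, RuntimeError) as e:
    logger.error(f"Error applying chat template: {e}")
    # llm.generate() expects string prompts when use_chat_api=False; returning
    # the raw chat dicts would crash the whole batch downstream. Return [] to
    # signal "skip this entry" on that path instead.
    return entry_chat if self.use_chat_api else []
```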
```python
language: str = "en"
hypothesis_text_key: str = "text"
reference_text_key: str = "text"
num_words_threshold: int = 200
num_words_look_back: int = 5
compute_pnc_wer: bool = False
pnc_chars: str = "،؟.、?¿!,?।"  # noqa: RUF001
edge_length: int = 12
```
Same default for hypothesis_text_key and reference_text_key silently produces zero WER
Both parameters default to "text", so unless a caller explicitly sets different keys, hypothesis_clean and reference_clean will be identical strings and every WER/CER metric will always be 0.0. There is no warning or validation that the two keys are distinct, so pipelines using the defaults will silently produce useless metrics without any indication that something is wrong.
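A minimal validation sketch, assuming a dataclass-style stage config (the `__post_init__` hook and message wording are illustrative):

```python
def __post_init__(self) -> None:
    if self.hypothesis_text_key == self.reference_text_key:
        logger.warning(
            "hypothesis_text_key and reference_text_key are both %r; "
            "WER/CER will trivially be 0.0. Set distinct keys to compare "
            "a hypothesis against a reference.",
            self.hypothesis_text_key,
        )
```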
```python
SamplingParams = None


class VLLMInference:
```
Since https://github.com/NVIDIA-NeMo/Curator/blob/main/nemo_curator/models/vllm_model.py is supposed to be as generic as possible, let's try to use it here.
```python
cfg.hf_token = os.getenv("HF_TOKEN", "")

pipeline = create_pipeline_from_yaml(cfg)
executor = XennaExecutor(config={"execution_mode": "batch"})
```
```yaml
text_key1: text
text_key2: text_2
resources:
  cpus: 2
```
```yaml
- _target_: nemo_curator.stages.audio.tagging.metrics.bandwidth.BandwidthEstimationStage
  name: "BandwidthEstimation"
  resources: ${resources}
```
```python
for (task_idx, key_path), output in zip(result_map, outputs, strict=True):
    self._store_generation(tasks[task_idx].data, key_path, output.outputs[0].text, self.generation_field)
```
IndexError on empty vLLM outputs
output.outputs[0].text raises IndexError if vLLM returns an empty outputs list for a request. This can occur when a request is silently dropped or aborted (e.g., due to an OOM on a single request in a large batch). The sibling VLLMModel.generate() explicitly guards against this with out.outputs[0].text if out.outputs else "", but PNCwithvLLMInferenceStage.process_batch() has no equivalent guard, so a single failed vLLM request aborts generation for all remaining tasks in the batch via an unhandled IndexError.
+1
This assumes every vLLM result has at least one completion in output.outputs. zip(..., strict=True) catches a result-count mismatch, but it does not protect against an individual RequestOutput(outputs=[]), which would raise IndexError here.
Can we add a small helper to safely extract generated text and either fail loudly with task/audio id + prompt index, or mark the segment with something like pnc_generation_error? This should also have tests for empty completions and partial/mismatched vLLM outputs.
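A sketch of the kind of helper being requested (the helper name and the `pnc_generation_error` field are illustrative, not from the PR):

```python
def _safe_generation_text(self, output, task_idx: int) -> str | None:
    """Return the first completion's text, or None if vLLM produced none."""
    if not output.outputs:
        logger.error(f"Empty vLLM output for prompt index {task_idx}")
        return None
    return output.outputs[0].text

# in process_batch():
for (task_idx, key_path), output in zip(result_map, outputs, strict=True):
    text = self._safe_generation_text(output, task_idx)
    if text is None:
        # mark the segment instead of aborting the whole batch
        self._store_generation(tasks[task_idx].data, key_path, "", "pnc_generation_error")
        continue
    self._store_generation(tasks[task_idx].data, key_path, text, self.generation_field)
```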
```python
def load_model(self) -> None:
    """Instantiate the ``vllm.LLM`` engine."""
    os.environ["VLLM_USE_V1"] = "0"
```
load_model() unconditionally sets VLLM_USE_V1=0, silently overriding any user- or system-level environment variable. The import-time os.environ.setdefault(...) correctly preserves user overrides, but this second, unconditional assignment undoes that guarantee. A user who exports VLLM_USE_V1=1 for compatibility with a specific vLLM build will have it silently clobbered on every load_model() call, causing hard-to-diagnose engine-initialisation failures.
Suggested change:

```diff
 def load_model(self) -> None:
     """Instantiate the ``vllm.LLM`` engine."""
-    os.environ["VLLM_USE_V1"] = "0"
+    os.environ.setdefault("VLLM_USE_V1", "0")
```
```python
def teardown(self) -> None:
    if self._vllm is not None:
        self._vllm.clean_up()
        self._vllm = None
```
teardown() nullifies _vllm, breaking subsequent setup() calls
teardown() sets self._vllm = None. Both setup_on_node() (line 107) and setup() (line 111) call self._vllm.setup_on_node() / self._vllm.setup() with no null-guard. If the pipeline framework calls setup() after teardown() — for example on a worker restart or pipeline retry — both methods raise AttributeError: 'NoneType' object has no attribute 'setup'. Similarly, process_batch() enters _collect_prompts() which calls self._vllm.get_entry_prompt(...) after teardown, crashing on the same null reference.
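The earlier comment's alternative fix, re-constructing `self._vllm`, might look like this sketch (the constructor arguments are hypothetical; whatever config the stage already holds would be passed through):

```python
def setup(self, _worker_metadata: WorkerMetadata | None = None) -> None:
    """Load tokenizer and vLLM engine (called once per worker)."""
    if self._vllm is None:
        # teardown() nulled the engine; rebuild it before re-use
        self._vllm = VLLMInference(self._vllm_config)  # hypothetical ctor/config
    self._vllm.setup()
```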
```python
for (task_idx, key_path), output in zip(result_map, outputs, strict=True):
    self._store_generation(tasks[task_idx].data, key_path, output.outputs[0].text, self.generation_field)
```
IndexError on empty vLLM outputs list and strict=True zip length mismatch
Two failure paths converge here. First, output.outputs[0].text raises IndexError when vLLM returns an empty outputs list for a request (e.g., due to an OOM on a single request in a large batch). Second, _collect_prompts appends (prompt, key_path) unconditionally even when get_entry_prompt returns [] on failure; those empty-list entries may be dropped or rejected by vLLM silently, making len(outputs) < len(result_map), which causes zip(..., strict=True) to raise ValueError and abort the entire task batch. The sibling VLLMModel.generate() guards against the empty-outputs case with out.outputs[0].text if out.outputs else "".
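A sketch of the prompt-side fix inside `_collect_prompts` (variable names assumed from context):

```python
prompt = self._vllm.get_entry_prompt(entry)
if not prompt:
    # get_entry_prompt returned [] on failure; skipping here keeps prompts and
    # result_map the same length, so zip(..., strict=True) cannot raise later.
    logger.warning(f"Skipping entry with empty prompt: key_path={key_path}")
    continue
prompts.append(prompt)
result_map.append((task_idx, key_path))
```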
```python
for segment in data_entry["segments"]:
    duration = segment["end"] - segment["start"]
```
KeyError when start/end are absent from a segment
segment["end"] - segment["start"] is evaluated before the hypothesis_text_key / reference_text_key guard on line 168. Any segment that passes inputs() validation but lacks "start" or "end" raises KeyError here, aborting processing for all remaining segments in the entry rather than just skipping the malformed one. Using .get("end", 0) / .get("start", 0) plus a if duration <= 0: continue guard would prevent the hard crash.
```python
    logger.error(f"Failed to load audio path: {audio_path}, exception={ex}")
    return task
segments = data_entry.get("segments", [])

for segment in segments:
    if segment.get("speaker") == "no-speaker" or segment.get("text", "").strip() == "":
        continue
```
Zero-length segment crashes _estimate_bandwidth
There is no guard for start >= end before slicing audio and passing the result to _estimate_bandwidth. When start == end, audio_segment is an empty NumPy array; librosa.stft on a zero-length input raises ParameterError, which propagates uncaught through process() and aborts the entire entry. squim.py explicitly checks if num_frames <= 0: continue for the same scenario — the same guard is needed here.
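A sketch of the same guard squim.py uses, applied before the STFT (the slicing variables `audio`, `sr`, `start`, `end` are assumed from context):

```python
audio_segment = audio[int(start * sr) : int(end * sr)]
if audio_segment.size == 0:
    # empty slice (start == end): librosa.stft would raise ParameterError
    continue
bandwidth = self._estimate_bandwidth(audio_segment, sr)
```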
```python
start = segment["start"]
end = segment["end"]
```
Direct segment["start"]/segment["end"] access raises KeyError for malformed segments
start = segment["start"] and end = segment["end"] on lines 117–118 are unguarded direct accesses. If a segment in the pipeline lacks either key, a KeyError propagates outside the inner try/except block (lines 134–149), aborting processing for ALL remaining segments in the entry. The num_frames <= 0 guard on line 124 is present for zero-length protection, but does not help when the keys are absent entirely. Using segment.get("start", 0) / segment.get("end", 0) with an early continue when start >= end (consistent with the zero-length guard) would prevent the hard crash.
```python
if chunk_end < len(words) and any(c.isdigit() for c in words[chunk_end]):
    shorter_strings.append(
        " ".join(prev_string + words[chunk_start : chunk_end - self.num_words_look_back])
    )
    prev_string = words[chunk_end - self.num_words_look_back : chunk_end]
else:
    shorter_strings.append(" ".join(prev_string + words[chunk_start:chunk_end]))
    prev_string = []
remainder_start = chunk_end
```
Text silently dropped when num_words_look_back >= num_words_threshold
When num_words_look_back is set greater than or equal to num_words_threshold, chunk_end - self.num_words_look_back becomes zero or negative. The slice words[chunk_start : non_positive_index] is empty, so the entire content of that chunk is silently discarded from shorter_strings. The prev_string carry-over from this iteration then bleeds into the next chunk, producing incorrect normalized text with no warning or error. A validation at construction time (if self.num_words_look_back >= self.num_words_threshold) would surface this misconfiguration immediately.
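A construction-time check along the lines suggested (a sketch; the exact init hook depends on how the stage is defined):

```python
def __post_init__(self) -> None:
    if self.num_words_look_back >= self.num_words_threshold:
        msg = (
            f"num_words_look_back ({self.num_words_look_back}) must be smaller "
            f"than num_words_threshold ({self.num_words_threshold}); otherwise "
            "chunk slices become empty and text is silently dropped."
        )
        raise ValueError(msg)
```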
```python
start = segment["start"]
end = segment["end"]
```
Direct segment["start"]/segment["end"] access raises KeyError for incomplete segments
segment["start"] and segment["end"] at lines 102–103 are unguarded direct dictionary accesses. Any segment missing these keys raises KeyError, which propagates uncaught through process() and aborts all remaining segments in the entry. squim.py has the same pattern. Both stages should use .get("start", 0) / .get("end", 0) with an early continue when start >= end (matching squim.py's own zero-length guard logic) to remain consistent and resilient to malformed inputs.
| metrics["char_rate"] = self.get_char_rate(segment[self.hypothesis_text_key], duration) | ||
| metrics["word_rate"] = self.get_word_rate(segment[self.hypothesis_text_key], duration) | ||
|
|
||
| wer_val, tokens, ins_rate, del_rate, sub_rate = word_error_rate_detail( |
This checks that the raw hypothesis/reference keys exist, but the normalized strings can still be empty. For example, empty text, whitespace-only text, punctuation-only text, or text stripped by the normalizer can produce hypothesis_clean == "" or reference_clean == "".
Can we guard empty normalized hypothesis/reference before calling word_error_rate_detail()? I think the stage should either skip the metric with an explicit metric_skip_reason or write None for WER/CER, rather than relying on the WER helper's behavior for empty inputs. It would be good to add tests for empty, whitespace-only, punctuation-only, and normalized-to-empty cases.
Proposed Fix
Before each WER/CER call, explicitly check the normalized strings. For example:

```python
if not reference_clean.strip() or not hypothesis_clean.strip():
    metrics["wer"] = None
    metrics["cer"] = None
    metrics["metric_skip_reason"] = "empty_normalized_text"
    segment["metrics"] = metrics
    continue
```

Or use a helper, safe_word_error_rate_detail(...), that returns None/skip metadata for empty refs/hyps.
Even if one text is empty, we'll still need to call the WER helper method and save all the insertion/deletion rate values.
```python
pipeline.add_stage(BandwidthEstimationStage(name="BandwidthEstimation").with_(resources=Resources(cpus=cpus)))

# Audio quality metrics (PESQ, STOI, SI-SDR)
pipeline.add_stage(TorchSquimQualityMetricsStage(name="SquimMetrics").with_(resources=Resources(gpus=0.05)))
```
Can we add a short comment that these fractional GPU settings are benchmark-specific empirical values, not general production defaults? They may be valid for this single-GPU benchmark, but values like 0.05 GPU for SQUIM are hardware/workload dependent and could be misleading if copied into other pipeline configs.
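For instance, something like (comment wording illustrative):

```python
# NOTE: fractional GPU values here are empirical settings for this single-GPU
# benchmark, not general production defaults; tune per hardware and workload.
pipeline.add_stage(TorchSquimQualityMetricsStage(name="SquimMetrics").with_(resources=Resources(gpus=0.05)))
```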
Can this be integrated with https://github.com/NVIDIA-NeMo/Curator/blob/main/nemo_curator/stages/audio/metrics/get_wer.py ?
```python
    return self._llm.get_tokenizer()


class VLLMInference:
```
I guess I am wondering what the overlaps of VLLMModel and VLLMInference are? Like is there some parent class we should create for them to share?
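One possible shape for a shared parent, purely as a sketch (the class name and which methods actually overlap are assumptions; only `setup`, `setup_on_node`, and `clean_up` appear in this PR's snippets):

```python
from abc import ABC, abstractmethod

class VLLMBase(ABC):
    """Hypothetical shared lifecycle for vLLM-backed components."""

    @abstractmethod
    def setup_on_node(self) -> None:
        """Pre-download weights/tokenizer once per node."""

    @abstractmethod
    def setup(self) -> None:
        """Instantiate the engine once per worker."""

    def clean_up(self) -> None:
        """Release engine resources; subclasses override as needed."""
```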
Description
Add new audio tagging pipeline stages for text processing, quality metrics, and LLM-based Punctuation & Capitalization (PnC).
New Stages
Other Changes
Tests
Checklist