
Add filtration and PnCWithLLM stages for audio tagging pipeline #1863

Open

sushmitha-deva-09 wants to merge 112 commits into NVIDIA-NeMo:main from sushmitha-deva-09:audio_core_2

Conversation

sushmitha-deva-09 (Contributor) commented Apr 23, 2026

Description

Add new audio tagging pipeline stages for text processing, quality metrics, and LLM-based Punctuation & Capitalization (PnC).

New Stages

  • PNCwithvLLMInferenceStage / CleanLLMOutputStage — LLM-powered PnC via vLLM with CER-based validation and BERT fallback
  • VLLMInference — Reusable vLLM lifecycle helper (weight pre-download, engine setup, cleanup)
  • ComputeWER/CER/SQUIM/BandwidthStage — Transcript and audio quality metrics (see the wiring sketch after this list)
  • PrepareModuleSegmentsStage — Punctuation-based segmentation for training-ready chunks
  • ITN / ArabicRemoveDiacritics / ChineseConversion — Text normalization stages
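
For reference, a minimal wiring sketch (assumes an existing pipeline object and the Resources helper, as in the benchmark script in this PR; the cpus value is illustrative):

# Hedged sketch -- `pipeline` and `Resources` are assumed to be in scope.
pipeline.add_stage(BandwidthEstimationStage(name="BandwidthEstimation").with_(resources=Resources(cpus=2)))
pipeline.add_stage(TorchSquimQualityMetricsStage(name="SquimMetrics").with_(resources=Resources(gpus=0.05)))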

Other Changes

  • Moved sortformer.py to inference/speaker_diarization/
  • Added nemo_text_processing, opencc-python-reimplemented, pyarabic deps
  • Force VLLM_USE_V1=0 to prevent Ray actor conflicts
  • Expanded tutorials/audio/tagging/README.md with pipeline diagram and PnC guide

Tests

  • Unit tests for all new stages (51+ tests)
  • GPU E2E tests: PnC LLM (simple, first-pass, second-pass ASR) and full ASR pipeline
  • Reference manifests for deterministic validation

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

Signed-off-by: Sushmitha Deva <sdeva@nvidia.com>
sushmitha-deva-09 (Contributor, Author):

/ok to test cc0e8b5

Comment on lines +250 to +258
def setup_on_node(
    self, _node_info: NodeInfo | None = None, _worker_metadata: WorkerMetadata | None = None
) -> None:
    """Pre-download model weights and tokenizer (called once per node)."""
    self._vllm.setup_on_node()

def setup(self, _worker_metadata: WorkerMetadata | None = None) -> None:
    """Load tokenizer and vLLM engine (called once per worker)."""
    self._vllm.setup()
Contributor:

P1 setup() and setup_on_node() crash after teardown()

teardown() sets self._vllm = None. Both setup() (line 258) and setup_on_node() (line 254) then call self._vllm.setup() / self._vllm.setup_on_node() with no null guard. If the pipeline framework ever calls setup() a second time after teardown (e.g., on worker restart or pipeline retry), both methods raise AttributeError: 'NoneType' object has no attribute 'setup'. A guard like if self._vllm is None: return — or re-constructing self._vllm — would prevent the crash.
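
A minimal sketch of the suggested guard, assuming the stage keeps enough state to rebuild the helper (the _make_vllm factory is hypothetical):

def setup(self, _worker_metadata: WorkerMetadata | None = None) -> None:
    """Load tokenizer and vLLM engine (called once per worker)."""
    if self._vllm is None:
        # teardown() already ran (e.g., worker restart): rebuild the helper via a
        # hypothetical factory that re-applies the original constructor arguments.
        self._vllm = self._make_vllm()
    self._vllm.setup()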

Comment on lines +186 to +192
return []

try:
    return self.tokenizer.apply_chat_template(entry_chat, **self.chat_template_params)
except (TypeError, ValueError, KeyError, RuntimeError) as e:
    logger.error(f"Error applying chat template: {e}")
    return entry_chat
Contributor:

P1 Fallback returns entry_chat (list of dicts) when use_chat_api=False

When apply_chat_template raises, the except block returns the raw entry_chat (a list[dict]) instead of a formatted string. When use_chat_api=False, process_batch routes to llm.generate(), which expects string prompts. A list-of-dicts prompt in entry_prompts causes a TypeError inside llm.generate(), re-raised as RuntimeError("Generation failed"), aborting the entire batch — the opposite of the graceful-degradation goal of this fallback. For the use_chat_api=False path, the fallback should return [] (to signal a skip) rather than the raw chat dict.
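
A hedged sketch of that fallback, assuming the stage exposes a use_chat_api flag (attribute name assumed):

try:
    return self.tokenizer.apply_chat_template(entry_chat, **self.chat_template_params)
except (TypeError, ValueError, KeyError, RuntimeError) as e:
    logger.error(f"Error applying chat template: {e}")
    # llm.generate() expects string prompts when use_chat_api=False, so return []
    # to signal "skip this entry" rather than leaking raw chat dicts downstream.
    return entry_chat if self.use_chat_api else []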

Comment thread nemo_curator/stages/audio/tagging/text/pnc.py
Comment thread nemo_curator/stages/audio/tagging/metrics/wer.py
sushmitha-deva-09 (Contributor, Author):

/ok to test 7772fa6

sushmitha-deva-09 (Contributor, Author):

/ok to test a78cce7

Comment thread nemo_curator/stages/audio/tagging/split.py Outdated
sushmitha-deva-09 (Contributor, Author):

/ok to test 6efc74e

Comment on lines +50 to +57
language: str = "en"
hypothesis_text_key: str = "text"
reference_text_key: str = "text"
num_words_threshold: int = 200
num_words_look_back: int = 5
compute_pnc_wer: bool = False
pnc_chars: str = "،؟.、?¿!,?।" # noqa: RUF001
edge_length: int = 12
Contributor:

P1 Same default for hypothesis_text_key and reference_text_key silently produces zero WER

Both parameters default to "text", so unless a caller explicitly sets different keys, hypothesis_clean and reference_clean will be identical strings and every WER/CER metric will always be 0.0. There is no warning or validation that the two keys are distinct, so pipelines using the defaults will silently produce useless metrics without any indication that something is wrong.
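
A construction-time check would surface this; a sketch, assuming the stage is a dataclass:

def __post_init__(self) -> None:
    # Identical keys make hypothesis and reference the same string, so every
    # WER/CER value degenerates to 0.0; warn loudly unless this is intentional.
    if self.hypothesis_text_key == self.reference_text_key:
        logger.warning(
            f"hypothesis_text_key and reference_text_key are both "
            f"'{self.hypothesis_text_key}'; WER/CER will always be 0.0."
        )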

Comment thread benchmarking/scripts/audio_tagging_benchmark.py Outdated
SamplingParams = None


class VLLMInference:
Contributor:

Since https://github.com/NVIDIA-NeMo/Curator/blob/main/nemo_curator/models/vllm_model.py is supposed to be as generic as possible, let's try to use it here.

Comment thread nemo_curator/stages/audio/tagging/text/pnc.py Outdated
Comment thread nemo_curator/stages/audio/tagging/text/pnc.py
cfg.hf_token = os.getenv("HF_TOKEN", "")

pipeline = create_pipeline_from_yaml(cfg)
executor = XennaExecutor(config={"execution_mode": "batch"})
Contributor:

Bumping this.

text_key1: text
text_key2: text_2
resources:
  cpus: 2
Contributor:

- _target_: nemo_curator.stages.audio.tagging.metrics.bandwidth.BandwidthEstimationStage
  name: "BandwidthEstimation"
  resources: ${resources}
Contributor:

Same comment as above.

Comment thread tutorials/audio/tagging/README.md
Comment on lines +173 to +174
for (task_idx, key_path), output in zip(result_map, outputs, strict=True):
    self._store_generation(tasks[task_idx].data, key_path, output.outputs[0].text, self.generation_field)
Contributor:

P1 IndexError on empty vLLM outputs

output.outputs[0].text raises IndexError if vLLM returns an empty outputs list for a request. This can occur when a request is silently dropped or aborted (e.g., due to an OOM on a single request in a large batch). The sibling VLLMModel.generate() explicitly guards against this with out.outputs[0].text if out.outputs else "", but PNCwithvLLMInferenceStage.process_batch() has no equivalent guard, so a single failed vLLM request aborts generation for all remaining tasks in the batch via an unhandled IndexError.

Contributor:

+1
This assumes every vLLM result has at least one completion in output.outputs. zip(..., strict=True) catches a result-count mismatch, but it does not protect against an individual RequestOutput(outputs=[]), which would raise IndexError here.

Can we add a small helper to safely extract generated text and either fail loudly with task/audio id + prompt index, or mark the segment with something like pnc_generation_error? This should also have tests for empty completions and partial/mismatched vLLM outputs.
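
A sketch of such a helper (the name, signature, and error-marking convention are hypothetical):

def _safe_generated_text(output, task_id: str, prompt_idx: int) -> str | None:
    """Return the first completion's text, or None when vLLM produced none."""
    if not output.outputs:
        logger.error(f"Empty vLLM output for task={task_id}, prompt_idx={prompt_idx}")
        return None
    return output.outputs[0].text

Callers could then write a pnc_generation_error marker into the segment instead of letting an IndexError abort the whole batch.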

sushmitha-deva-09 (Contributor, Author):

/ok to test 8eece4b

Comment thread nemo_curator/models/vllm_model.py Outdated
Comment on lines +357 to +359
def load_model(self) -> None:
    """Instantiate the ``vllm.LLM`` engine."""
    os.environ["VLLM_USE_V1"] = "0"
Contributor:

P1 load_model() unconditionally sets VLLM_USE_V1=0, silently overriding any user- or system-level environment variable. The import-time os.environ.setdefault(...) correctly preserves user overrides, but this second, unconditional assignment undoes that guarantee. A user who exports VLLM_USE_V1=1 for compatibility with a specific vLLM build will have it silently clobbered on every load_model() call, causing hard-to-diagnose engine-initialisation failures.

Suggested change
    def load_model(self) -> None:
        """Instantiate the ``vllm.LLM`` engine."""
-       os.environ["VLLM_USE_V1"] = "0"
+       os.environ.setdefault("VLLM_USE_V1", "0")

Comment on lines +113 to +116
def teardown(self) -> None:
    if self._vllm is not None:
        self._vllm.clean_up()
    self._vllm = None
Contributor:

P1 teardown() nullifies _vllm, breaking subsequent setup() calls

teardown() sets self._vllm = None. Both setup_on_node() (line 107) and setup() (line 111) call self._vllm.setup_on_node() / self._vllm.setup() with no null-guard. If the pipeline framework calls setup() after teardown() — for example on a worker restart or pipeline retry — both methods raise AttributeError: 'NoneType' object has no attribute 'setup'. Similarly, process_batch() enters _collect_prompts() which calls self._vllm.get_entry_prompt(...) after teardown, crashing on the same null reference.

Comment on lines +173 to +174
for (task_idx, key_path), output in zip(result_map, outputs, strict=True):
    self._store_generation(tasks[task_idx].data, key_path, output.outputs[0].text, self.generation_field)
Contributor:

P1 IndexError on empty vLLM outputs list and strict=True zip length mismatch

Two failure paths converge here. First, output.outputs[0].text raises IndexError when vLLM returns an empty outputs list for a request (e.g., due to an OOM on a single request in a large batch). Second, _collect_prompts appends (prompt, key_path) unconditionally even when get_entry_prompt returns [] on failure; those empty-list entries may be dropped or rejected by vLLM silently, making len(outputs) < len(result_map), which causes zip(..., strict=True) to raise ValueError and abort the entire task batch. The sibling VLLMModel.generate() guards against the empty-outputs case with out.outputs[0].text if out.outputs else "".
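
For the second failure path, _collect_prompts could skip empty prompts so result_map and outputs stay aligned; a sketch with assumed local variable names:

prompt = self._vllm.get_entry_prompt(entry)
if not prompt:  # get_entry_prompt() returns [] on failure
    logger.warning(f"Skipping entry with empty prompt (task_idx={task_idx})")
    continue
prompts.append(prompt)
result_map.append((task_idx, key_path))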

Comment on lines +165 to +166
for segment in data_entry["segments"]:
    duration = segment["end"] - segment["start"]
Contributor:

P1 KeyError when start/end are absent from a segment

segment["end"] - segment["start"] is evaluated before the hypothesis_text_key / reference_text_key guard on line 168. Any segment that passes inputs() validation but lacks "start" or "end" raises KeyError here, aborting processing for all remaining segments in the entry rather than just skipping the malformed one. Using .get("end", 0) / .get("start", 0) plus a if duration <= 0: continue guard would prevent the hard crash.

Comment on lines +94 to +100
logger.error(f"Failed to load audio path: {audio_path}, exception={ex}")
return task
segments = data_entry.get("segments", [])

for segment in segments:
if segment.get("speaker") == "no-speaker" or segment.get("text", "").strip() == "":
continue
Contributor:

P1 Zero-length segment crashes _estimate_bandwidth

There is no guard for start >= end before slicing audio and passing the result to _estimate_bandwidth. When start == end, audio_segment is an empty NumPy array; librosa.stft on a zero-length input raises ParameterError, which propagates uncaught through process() and aborts the entire entry. squim.py explicitly checks if num_frames <= 0: continue for the same scenario — the same guard is needed here.

sushmitha-deva-09 (Contributor, Author):

/ok to test a94ee81

Comment on lines +117 to +118
start = segment["start"]
end = segment["end"]
Contributor:

P1 Direct segment["start"]/segment["end"] access raises KeyError for malformed segments

start = segment["start"] and end = segment["end"] on lines 117–118 are unguarded direct accesses. If a segment in the pipeline lacks either key, a KeyError propagates outside the inner try/except block (lines 134–149), aborting processing for ALL remaining segments in the entry. The num_frames <= 0 guard on line 124 is present for zero-length protection, but does not help when the keys are absent entirely. Using segment.get("start", 0) / segment.get("end", 0) with an early continue when start >= end (consistent with the zero-length guard) would prevent the hard crash.

Comment on lines +93 to +101
if chunk_end < len(words) and any(c.isdigit() for c in words[chunk_end]):
    shorter_strings.append(
        " ".join(prev_string + words[chunk_start : chunk_end - self.num_words_look_back])
    )
    prev_string = words[chunk_end - self.num_words_look_back : chunk_end]
else:
    shorter_strings.append(" ".join(prev_string + words[chunk_start:chunk_end]))
    prev_string = []
remainder_start = chunk_end
Contributor:

P1 Text silently dropped when num_words_look_back >= num_words_threshold

When num_words_look_back is set greater than or equal to num_words_threshold, chunk_end - self.num_words_look_back becomes zero or negative. The slice words[chunk_start : non_positive_index] is empty, so the entire content of that chunk is silently discarded from shorter_strings. The prev_string carry-over from this iteration then bleeds into the next chunk, producing incorrect normalized text with no warning or error. A validation at construction time (if self.num_words_look_back >= self.num_words_threshold) would surface this misconfiguration immediately.
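
A sketch of that validation, assuming the stage is a dataclass:

def __post_init__(self) -> None:
    if self.num_words_look_back >= self.num_words_threshold:
        raise ValueError(
            f"num_words_look_back ({self.num_words_look_back}) must be smaller than "
            f"num_words_threshold ({self.num_words_threshold}); otherwise chunk "
            f"slices become empty and text is silently dropped."
        )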

Comment on lines +102 to +103
start = segment["start"]
end = segment["end"]
Contributor:

P1 Direct segment["start"]/segment["end"] access raises KeyError for incomplete segments

segment["start"] and segment["end"] at lines 102–103 are unguarded direct dictionary accesses. Any segment missing these keys raises KeyError, which propagates uncaught through process() and aborts all remaining segments in the entry. squim.py has the same pattern. Both stages should use .get("start", 0) / .get("end", 0) with an early continue when start >= end (matching squim.py's own zero-length guard logic) to remain consistent and resilient to malformed inputs.

Comment thread nemo_curator/models/vllm_model.py Outdated
Comment thread nemo_curator/stages/audio/tagging/text/pnc.py
metrics["char_rate"] = self.get_char_rate(segment[self.hypothesis_text_key], duration)
metrics["word_rate"] = self.get_word_rate(segment[self.hypothesis_text_key], duration)

wer_val, tokens, ins_rate, del_rate, sub_rate = word_error_rate_detail(
Contributor:

This checks that the raw hypothesis/reference keys exist, but the normalized strings can still be empty. For example, empty text, whitespace-only text, punctuation-only text, or text stripped by the normalizer can produce hypothesis_clean == "" or reference_clean == "".

Can we guard empty normalized hypothesis/reference before calling word_error_rate_detail()? I think the stage should either skip the metric with an explicit metric_skip_reason or write None for WER/CER, rather than relying on the WER helper's behavior for empty inputs. It would be good to add tests for empty, whitespace-only, punctuation-only, and normalized-to-empty cases.

Proposed Fix

Before each WER/CER call, explicitly check normalized strings.

For example:

if not reference_clean.strip() or not hypothesis_clean.strip():
    metrics["wer"] = None
    metrics["cer"] = None
    metrics["metric_skip_reason"] = "empty_normalized_text"
    segment["metrics"] = metrics
    continue
Or use a helper such as safe_word_error_rate_detail(...) that returns None/skip metadata for empty refs/hyps.
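
A sketch of that wrapper, matching the 5-tuple the snippet above unpacks (the wrapper name and skip convention are hypothetical, and word_error_rate_detail is assumed to accept hypotheses/references lists):

def safe_word_error_rate_detail(hypothesis: str, reference: str):
    """Return (wer, tokens, ins_rate, del_rate, sub_rate); None metrics for empty text."""
    if not hypothesis.strip() or not reference.strip():
        return None, 0, None, None, None
    return word_error_rate_detail(hypotheses=[hypothesis], references=[reference])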

Contributor (Author):

Even if one text is empty, we still need to call the WER helper and save all the insertion/deletion rate values.

Comment thread nemo_curator/stages/audio/tagging/metrics/squim.py Outdated
Comment thread nemo_curator/stages/audio/tagging/metrics/squim.py Outdated
Comment thread nemo_curator/stages/audio/tagging/prepare_module_segments.py Outdated
Comment thread tutorials/audio/tagging/asr_pipeline.yaml Outdated
Comment thread nemo_curator/stages/audio/tagging/text/pnc.py
pipeline.add_stage(BandwidthEstimationStage(name="BandwidthEstimation").with_(resources=Resources(cpus=cpus)))

# Audio quality metrics (PESQ, STOI, SI-SDR)
pipeline.add_stage(TorchSquimQualityMetricsStage(name="SquimMetrics").with_(resources=Resources(gpus=0.05)))
Contributor:

Can we add a short comment that these fractional GPU settings are benchmark-specific empirical values, not general production defaults? They may be valid for this single-GPU benchmark, but values like 0.05 GPU for SQUIM are hardware/workload dependent and could be misleading if copied into other pipeline configs.
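
For example (the comment text is a suggestion; the call is quoted from the snippet above):

# NOTE: the fractional GPU values below are empirical settings for this
# single-GPU benchmark, not general production defaults.
pipeline.add_stage(TorchSquimQualityMetricsStage(name="SquimMetrics").with_(resources=Resources(gpus=0.05)))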

sushmitha-deva-09 (Contributor, Author):

/ok to test da03db0

sushmitha-deva-09 (Contributor, Author):

/ok to test fde3e80

    return self._llm.get_tokenizer()


class VLLMInference:
Contributor:

I guess I am wondering what the overlaps of VLLMModel and VLLMInference are? Like is there some parent class we should create for them to share?
