Skip to content

Feature/bq squashfs pipeline#79

Draft
mohamedelabbas1996 wants to merge 23 commits into
mainfrom
feature/bq-squashfs-pipeline
Draft

Feature/bq squashfs pipeline#79
mohamedelabbas1996 wants to merge 23 commits into
mainfrom
feature/bq-squashfs-pipeline

Conversation

@mohamedelabbas1996

Copy link
Copy Markdown

No description provided.

mohamedelabbas1996 and others added 23 commits May 1, 2026 11:58
Five-stage pipeline for building a WebDataset from the BigQuery
training_images table on the fir cluster:

  1. download   - parallel SLURM array job (×10), downloads images from
                  iNaturalist, verifies with PIL, writes results back to BQ,
                  packs into per-chunk sqfs files
  2. bq_export  - streams qualifying images metadata from BQ to CSV
  3. split      - stratified train/val/test split with occurrence-level
                  grouping to prevent data leakage
  4. webdataset - two implementations: fir-specific NVMe-optimised packer
                  (create_webdataset.py) and a generic version for
                  pre-mounted directories (create_webdataset_generic.py)
  5. train      - ResNet-50 classifier with auto-resume from checkpoint

README documents each stage, the task_0…task_9 sqfs scheme, why two
webdataset implementations exist, and a one-liner to chain the full pipeline.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…hfs error handling

Scale issues fixed from real 10M-image run logs:
- Retry with exponential backoff + jitter (fixes Errno 16, 429, 503, timeouts)
- Per-thread requests.Session to reduce socket churn
- BQ write retry (3 attempts, 30s backoff) — previously silent loss on failure
- mksquashfs failures now raise RuntimeError so SLURM marks task failed cleanly
- Chunk accumulation warning when >20 chunk sqfs files build up in staging

New behaviour:
- Inline MERGE into training_images after each chunk write via temp table —
  no separate update job needed; status is current by end of each chunk
- --table-prefix flag for testing against test_training_images tables
- Per-chunk throughput logging (img/s) to detect throttling

Documentation:
- Mid-chunk restart behaviour documented in module docstring (re-download cost,
  no data loss)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Creates test_training_images (50 rows, fetch_status=pending) and
test_training_images_downloads (empty) from production table samples.
Use --table-prefix test_ with download_images.py to run against these.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
22 tests covering all failure paths identified from real 10M-image run logs:

Retry logic (_fetch_with_retry):
- 429 rate limit → retries then succeeds
- 503 server error → retries then succeeds
- ConnectionError Errno 16 (too many sockets) → retries then succeeds
- Timeout → retries then succeeds
- 404 → raises immediately, no retry
- Exhausted retries (connection error / timeout) → raises

Download + verify:
- Valid JPEG → downloaded, dimensions populated
- Network failure → fetch_status=failed
- Corrupted/truncated image → fetch_status=corrupted

BQ write retry:
- Transient BQ error → retries then succeeds
- All retries exhausted → raises

SquashFS packing:
- mksquashfs non-zero exit → RuntimeError (SLURM marks task failed)
- Empty staging dir → returns None without calling mksquashfs

Inline MERGE:
- Only failed results → skips merge entirely
- Downloaded/corrupted results → load temp table + MERGE + cleanup
- MERGE failure → temp table still deleted (finally block)

Accumulation warning:
- Below threshold → no warning
- At or above threshold → prints WARNING

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
9 new tests covering the task partitioning logic:
- No overlap between tasks across 10-way split (100 images)
- Full coverage: union of all task subsets == complete image set
- Task 3 of 10 receives only photo_ids where id % 10 == 3
- Uneven split (101 images / 10 tasks): sizes differ by at most 1
- Single job (num_jobs=1): task 0 receives all images
- Resumability: LEFT JOIN correctly excludes already-attempted images
- force_redownload=True: query has no LEFT JOIN
- Normal query: LEFT JOIN present with correct MOD clause
- --limit N: LIMIT clause present in generated SQL

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
4 fixtures available to all tests under tests/ with no imports:

  mock_bq_client  — create_autospec(bigquery.Client) with defaults:
                    query().result() → [], load_table_from_dataframe → None
                    Rejects typos in method names unlike plain MagicMock

  small_df        — raw DataFrame: 5 species × 10 images, photo_ids 0-49
  small_csv       — same written to a CSV file (direct input for split/export)
                    photo_ids span all 10 tasks, 2 images share each gbif_id,
                    all species have >= 5 images for min_instances tests

  small_sqfs      — session-scoped real sqfs with 10 PIL JPEGs built via
                    mksquashfs; skipped automatically if binary unavailable

  sample_sql_file — minimal .sql file for bq_export.py tests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Each flag now explains:
  --staging-dir    : use scratch not home, inode quota warning, chunk files persist here
  --num-jobs       : must match SLURM --array range, typical value 10
  --task-id        : set to $SLURM_ARRAY_TASK_ID in array jobs
  --num-workers    : 320 concurrent connections at scale, Errno 16 context
  --chunk-size     : inode impact, one chunk_NNNN.sqfs per chunk
  --limit          : test-only, pair with --table-prefix
  --force-redownload: when to use it (staging deleted after failed pack)
  --table-prefix   : create test tables first with create_test_tables.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
8 new tests in TestMultiTaskDistributionAndMerge covering how work is
split across parallel jobs and how results are merged back:

Partitioning:
- num_jobs=2: task 0 + task 1 cover all images, no overlap
- num_jobs=10: all 10 tasks together cover every image exactly once
- task 0 completing does not affect task 1's pending query (disjoint UUIDs)
- non-sequential real iNat photo_ids (large ints) partition correctly
- empty task (0 images assigned) handled gracefully

BQ writes:
- both tasks append to downloads table independently (WRITE_APPEND, no conflict)
- each task's MERGE touches only its own rows, one temp table per task
- after all tasks complete, every image is accounted for

Also verified with real 2-task simulation against test BQ tables:
- task 0: 26 images (even photo_ids), chunk_0001.sqfs 7.7MB
- task 1: 24 images (odd photo_ids),  chunk_0001.sqfs 6.1MB
- BQ: 50/50 downloaded, 0 pending, 50 rows in downloads table
- resumability: re-run both tasks → 0 pending each

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Streams chunk sqfs files as a single continuous tar to stdout for piping
to sqfstar. Processes chunks one at a time (mount → stream → unmount) to
keep peak scratch usage manageable at scale.

Key behaviours tested:
  stream_dir_to_tar:
    - files added with paths relative to mount dir (bucket/filename.jpg)
    - dirs included, only files counted
    - multiple bucket dirs all streamed
  squashfuse_mount/unmount:
    - successful mount returns True
    - failed mount returns False + logs ERROR
    - unmount calls fusermount -u
  main flow:
    - empty/missing staging dir → exit 1
    - --dry-run lists chunks in sorted order without mounting
    - single chunk → valid tar stream
    - two chunks → one continuous stream (same tar object, one EOF)
    - --delete-after-stream removes each chunk after streaming
    - without flag, chunks preserved on disk
  error handling:
    - failed mount skipped, remaining chunks continue, exit 1
    - all mounts fail → exit 1
    - chunks always processed in sorted order

Verified with real simulation:
  2 tasks × 1 chunk each → 2 merged sqfs files
  task_0_test.sqfs: 26 images (7.7MB) | task_1_test.sqfs: 24 images (6.1MB)
  Total: 50/50 images ✓

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Previously BrokenPipeError produced a Python traceback that made it hard
to diagnose the root cause in SLURM logs. Now prints a clear message:

  FATAL: BrokenPipeError — the downstream process (sqfstar) died
  unexpectedly. This usually means sqfstar was OOM killed.
  Check sqfstar exit code and increase --mem in the job script.

Then exits with code 1 so the SLURM job is correctly marked failed.

Root cause context: in the real 10M-image run, sqfstar was OOM killed
when merging all 10 tasks globally (~10M files, inode table needs 4-8TB).
The BrokenPipeError appeared in every failed pack log alongside exit=137
from sqfstar. Fix was to merge per task (~1M files, 83-101GB RAM) instead.

Simulated and confirmed:
  - Global merge OOM → BrokenPipeError → exit=1 (correct, clear message)
  - Per-task merge success → 200/200 images across 4 task sqfs files

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Failures simulated from actual 10M-image run logs + new scenarios:

  Corrupt chunk (invalid sqfs bytes):
    - squashfuse rejects it with clear error + retries once
    - Skipped, remaining valid chunks continue streaming
    - exit=1 so job script detects partial failure

  Empty chunk (squashfuse mounts, 0 images found):
    - Previously: silent, exit=0 — could mask download failures
    - Now: WARNING logged, empty_chunks counter, exit=1

  SIGKILL crash (OOM / kill -9):
    - Previously: /tmp/sqfs_stream_* dirs leaked on disk
    - Now: atexit handler cleans up on any exit including SIGKILL

  SIGTERM (SLURM wall-time timeout):
    - Previously: unhandled, temp dirs leaked
    - Now: signal handler calls cleanup + exits 1

  squashfuse mount transient failure:
    - Previously: single attempt, permanent failure on transient error
    - Now: 1 retry with 5s delay before giving up

  squashfuse unmount failure (busy mount):
    - Previously: silently ignored via capture_output=True
    - Now: 3 retries with 2s delay, warning logged on all failures
    - Does not raise — node exit will clean up the mount

  sqfstar not in PATH (exit=127):
    - BrokenPipeError message now lists all common causes including
      "sqfstar not found (exit=127): check module load and PATH"

  --delete-after-stream data loss risk:
    - Warning printed before starting when flag is active

New summary line: total_images=N errors=M empty_chunks=K
exit=1 if errors>0 OR empty_chunks>0

23 tests total (18 original + 5 new hardening tests)

Verified with real 4-task simulation: 200/200 images across all scenarios

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…dening

Renamed for clarity — the script merges sqfs chunks, not just streams to tar.

Changes from stream_chunks_to_tar.py:
- --delete-after-stream removed entirely: chunks are never deleted by this
  script. Deletion is the job script's responsibility after verification,
  preventing all data loss on OOM or other failures.
- Full --help with WHAT IT DOES / INPUT / OUTPUT / DISK SPACE / EXIT CODES /
  TYPICAL USAGE sections including copy-paste SLURM examples
- staging_dir argument (was staging_base) with clear description
- Log prefix [stream] → [merge], temp dir prefix sqfs_stream_ → sqfs_merge_
- BrokenPipeError message updated: "Chunk files are preserved — fix the cause
  and re-submit the pack job" (removed the --delete-after-stream caveat)
- Empty chunk WARNING now mentions download_images.py as likely cause
- SIGTERM handler message updated to [merge] prefix

Hardening carried over:
  squashfuse retry, unmount retry, empty chunk detection, BrokenPipeError
  exit code 120 fix, SIGTERM cleanup, atexit cleanup

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Renamed test_stream_chunks_to_tar.py → test_merge_sqfs_chunks.py.

Updated references:
  - import merge_sqfs_chunks as sct (was stream_chunks_to_tar)
  - sys.argv uses merge_sqfs_chunks.py filename
  - log assertions check [merge] prefix (was [stream])

Test changes for --delete-after-stream removal:
  - test_delete_after_stream_removes_chunk → test_chunks_always_preserved_after_stream
    Verifies chunks remain on disk after streaming (deletion is job script's job)
  - test_delete_after_stream_warning_printed → test_delete_after_stream_flag_removed
    Verifies argparse rejects the removed flag with exit=2

22 tests, all passing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Previously failed rows (404, 403, exhausted retries) were written to
training_images_downloads as 'failed' but training_images stayed
'pending' permanently. This meant:
  - 'pending' was ambiguous: "not tried" vs "tried and failed"
  - retry_failed_downloads.py would waste time retrying 404s

Now all three outcomes are merged into training_images:
  downloaded → fetch_status='downloaded', dims + corrupted populated
  corrupted  → fetch_status='corrupted',  corrupted=True, dims NULL
  failed     → fetch_status='failed',     all extra fields NULL

Permanent failures are now excluded from future re-runs via
WHERE fetch_status='pending' without needing the LEFT JOIN check.
Retrying is still possible intentionally via retry_failed_downloads.py.

7 new/updated tests in TestMergeChunkIntoTrainingImages:
  - downloaded, corrupted, failed each trigger merge independently
  - all three statuses merged in one temp table + MERGE call
  - failed rows confirmed present in temp table dataframe
  - temp table cleanup on MERGE failure still works

Verified end-to-end against real BQ:
  - 5 scenarios: clean, corrupt, 404, 403, exhausted retries
  - all 5 statuses correct in training_images after merge
  - 0 rows re-queued on re-run (WHERE fetch_status='pending')
  - 400-image scale test: all downloaded, merged, verified ✓

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Three-stage merge job replacing the old version from backup branch:

  Stage 1 — Count expected images across all chunks (unsquashfs -l)
  Stage 2 — Stream chunks → sqfstar → task_N.sqfs
  Stage 3 — Verify: ACTUAL == EXPECTED before deleting anything
             Chunks only deleted when all three conditions pass:
               stream_exit=0, sqfstar_exit=0, image count matches

Safety design:
  - Chunks are NEVER deleted on failure — always preserved for retry
  - PIPESTATUS captured atomically: PIPE_STATUS=("${PIPESTATUS[@]}")
    (assigning PIPESTATUS[0] resets PIPESTATUS — common bash trap)
  - References merge_sqfs_chunks.py (renamed from stream_chunks_to_tar)
  - Updated --array=0-9 (was 2-9 from old incident where tasks 0+1
    had staging files deleted by a failed global pack run)
  - Uses merge_sqfs_chunks.py which never deletes chunks itself

Verified with 1000-image simulation:
  - Clean merge: verify passes, chunks deleted ✓
  - OOM: chunks preserved, re-submit succeeds ✓
  - Corrupt chunk: stream exits 1, chunks preserved ✓
  - Empty chunk: stream exits 1, chunks preserved ✓
  - sqfstar missing: BrokenPipeError caught, chunks preserved ✓

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Cleans training_images table in-place by removing 3 duplicate types:
1. Exact duplicate rows (same photo+taxon+gbif)
2. Same photo mapped to multiple taxa — drop strategy by default
3. Same photo+taxon with multiple gbif_ids — keep MIN gbif_id

Supports --dry-run, --min-images-per-taxon, --multi-taxon-strategy,
and --log-file for JSON output. Operates on any dataset via --dataset.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Two layers:
- Mock-based: SQL generation, BQ client calls, log arithmetic, main() wiring
- DuckDB integration: executes real CTE SQL against in-memory rows,
  covers all 3 dup types + edge cases (overlapping types, 3+ copies, empty table)

No real BQ connection needed — all tests run locally in ~2.5s.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add --dataset CLI arg (default: global_butterflies_2604, backwards compatible)
- Handle NULL fetch_status in get_pending_rows and MERGE condition for global_all_leps_2605
- Derive tmp_table dataset from training_table string, not hardcoded BQ_DATASET constant
- Add 6 new tests covering dataset routing, NULL query handling, and tmp_table derivation

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Pass --dataset global_all_leps_2605 to download_images.py
- Use separate staging dir /scratch/melabbas/global_all_leps_2605/task_{N}/
- Rename job to bq_dl_2605 for squeue clarity

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Concurrent array tasks each MERGE chunk results into training_images;
BigQuery aborts colliding DML with "Could not serialize access ...
due to concurrent update" (400), which the client does not retry.
Observed in production with 8 concurrent tasks (job 43176702_0).

Retry only serialization conflicts, up to 10 attempts with jittered
exponential backoff (2s base, 60s cap). Other BadRequest errors
still raise immediately.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- --dataset flag replaces hardcoded global_butterflies_2604
- sample WHERE includes fetch_status IS NULL (new datasets have no
  'downloaded' rows yet) and uses ORDER BY RAND() for unbiased
  size/distribution estimates
- test tables renamed with leading underscore
  (_test_training_images, _test_training_images_downloads) so test
  artifacts sort together, separate from production tables;
  use with --table-prefix _test_

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…eline

Each array task now performs the full lifecycle for its
MOD(photo_id, 60) slice — download (BQ-checkpointed 10k chunks) →
count-verify → stream-merge to task_N.sqfs → count-verify → upload
to Arbutus object store → byte-size verify → only then delete local
files. Peak scratch is bounded by the array throttle (%K × ~2× task
archive), not the dataset size.

Replaces the previous all-at-once design which required the whole
dataset on scratch before merging and never propagated failure exit
codes (last command was notify, so afterok chaining and FAIL mail
were broken).

Tasks are idempotent (skip if remote archive exists) and resume-safe
(prior chunks stashed to a resume_* subdir to avoid the chunk-numbering
overwrite; already-attempted images skipped via the downloads table).

Validated in production: jobs 43176702/43176730 (global_all_leps_2605).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented Jun 5, 2026

Copy link
Copy Markdown

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 6e8e3893-bbad-4fc0-99fb-482d3d96fce3

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/bq-squashfs-pipeline

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant