Feature/bq squashfs pipeline#79
Draft
mohamedelabbas1996 wants to merge 23 commits into
Draft
Conversation
Five-stage pipeline for building a WebDataset from the BigQuery
training_images table on the fir cluster:
1. download - parallel SLURM array job (×10), downloads images from
iNaturalist, verifies with PIL, writes results back to BQ,
packs into per-chunk sqfs files
2. bq_export - streams qualifying images metadata from BQ to CSV
3. split - stratified train/val/test split with occurrence-level
grouping to prevent data leakage
4. webdataset - two implementations: fir-specific NVMe-optimised packer
(create_webdataset.py) and a generic version for
pre-mounted directories (create_webdataset_generic.py)
5. train - ResNet-50 classifier with auto-resume from checkpoint
README documents each stage, the task_0…task_9 sqfs scheme, why two
webdataset implementations exist, and a one-liner to chain the full pipeline.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…hfs error handling Scale issues fixed from real 10M-image run logs: - Retry with exponential backoff + jitter (fixes Errno 16, 429, 503, timeouts) - Per-thread requests.Session to reduce socket churn - BQ write retry (3 attempts, 30s backoff) — previously silent loss on failure - mksquashfs failures now raise RuntimeError so SLURM marks task failed cleanly - Chunk accumulation warning when >20 chunk sqfs files build up in staging New behaviour: - Inline MERGE into training_images after each chunk write via temp table — no separate update job needed; status is current by end of each chunk - --table-prefix flag for testing against test_training_images tables - Per-chunk throughput logging (img/s) to detect throttling Documentation: - Mid-chunk restart behaviour documented in module docstring (re-download cost, no data loss) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Creates test_training_images (50 rows, fetch_status=pending) and test_training_images_downloads (empty) from production table samples. Use --table-prefix test_ with download_images.py to run against these. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
22 tests covering all failure paths identified from real 10M-image run logs: Retry logic (_fetch_with_retry): - 429 rate limit → retries then succeeds - 503 server error → retries then succeeds - ConnectionError Errno 16 (too many sockets) → retries then succeeds - Timeout → retries then succeeds - 404 → raises immediately, no retry - Exhausted retries (connection error / timeout) → raises Download + verify: - Valid JPEG → downloaded, dimensions populated - Network failure → fetch_status=failed - Corrupted/truncated image → fetch_status=corrupted BQ write retry: - Transient BQ error → retries then succeeds - All retries exhausted → raises SquashFS packing: - mksquashfs non-zero exit → RuntimeError (SLURM marks task failed) - Empty staging dir → returns None without calling mksquashfs Inline MERGE: - Only failed results → skips merge entirely - Downloaded/corrupted results → load temp table + MERGE + cleanup - MERGE failure → temp table still deleted (finally block) Accumulation warning: - Below threshold → no warning - At or above threshold → prints WARNING Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
9 new tests covering the task partitioning logic: - No overlap between tasks across 10-way split (100 images) - Full coverage: union of all task subsets == complete image set - Task 3 of 10 receives only photo_ids where id % 10 == 3 - Uneven split (101 images / 10 tasks): sizes differ by at most 1 - Single job (num_jobs=1): task 0 receives all images - Resumability: LEFT JOIN correctly excludes already-attempted images - force_redownload=True: query has no LEFT JOIN - Normal query: LEFT JOIN present with correct MOD clause - --limit N: LIMIT clause present in generated SQL Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
4 fixtures available to all tests under tests/ with no imports:
mock_bq_client — create_autospec(bigquery.Client) with defaults:
query().result() → [], load_table_from_dataframe → None
Rejects typos in method names unlike plain MagicMock
small_df — raw DataFrame: 5 species × 10 images, photo_ids 0-49
small_csv — same written to a CSV file (direct input for split/export)
photo_ids span all 10 tasks, 2 images share each gbif_id,
all species have >= 5 images for min_instances tests
small_sqfs — session-scoped real sqfs with 10 PIL JPEGs built via
mksquashfs; skipped automatically if binary unavailable
sample_sql_file — minimal .sql file for bq_export.py tests
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Each flag now explains: --staging-dir : use scratch not home, inode quota warning, chunk files persist here --num-jobs : must match SLURM --array range, typical value 10 --task-id : set to $SLURM_ARRAY_TASK_ID in array jobs --num-workers : 320 concurrent connections at scale, Errno 16 context --chunk-size : inode impact, one chunk_NNNN.sqfs per chunk --limit : test-only, pair with --table-prefix --force-redownload: when to use it (staging deleted after failed pack) --table-prefix : create test tables first with create_test_tables.py Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
8 new tests in TestMultiTaskDistributionAndMerge covering how work is split across parallel jobs and how results are merged back: Partitioning: - num_jobs=2: task 0 + task 1 cover all images, no overlap - num_jobs=10: all 10 tasks together cover every image exactly once - task 0 completing does not affect task 1's pending query (disjoint UUIDs) - non-sequential real iNat photo_ids (large ints) partition correctly - empty task (0 images assigned) handled gracefully BQ writes: - both tasks append to downloads table independently (WRITE_APPEND, no conflict) - each task's MERGE touches only its own rows, one temp table per task - after all tasks complete, every image is accounted for Also verified with real 2-task simulation against test BQ tables: - task 0: 26 images (even photo_ids), chunk_0001.sqfs 7.7MB - task 1: 24 images (odd photo_ids), chunk_0001.sqfs 6.1MB - BQ: 50/50 downloaded, 0 pending, 50 rows in downloads table - resumability: re-run both tasks → 0 pending each Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Streams chunk sqfs files as a single continuous tar to stdout for piping
to sqfstar. Processes chunks one at a time (mount → stream → unmount) to
keep peak scratch usage manageable at scale.
Key behaviours tested:
stream_dir_to_tar:
- files added with paths relative to mount dir (bucket/filename.jpg)
- dirs included, only files counted
- multiple bucket dirs all streamed
squashfuse_mount/unmount:
- successful mount returns True
- failed mount returns False + logs ERROR
- unmount calls fusermount -u
main flow:
- empty/missing staging dir → exit 1
- --dry-run lists chunks in sorted order without mounting
- single chunk → valid tar stream
- two chunks → one continuous stream (same tar object, one EOF)
- --delete-after-stream removes each chunk after streaming
- without flag, chunks preserved on disk
error handling:
- failed mount skipped, remaining chunks continue, exit 1
- all mounts fail → exit 1
- chunks always processed in sorted order
Verified with real simulation:
2 tasks × 1 chunk each → 2 merged sqfs files
task_0_test.sqfs: 26 images (7.7MB) | task_1_test.sqfs: 24 images (6.1MB)
Total: 50/50 images ✓
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Previously BrokenPipeError produced a Python traceback that made it hard to diagnose the root cause in SLURM logs. Now prints a clear message: FATAL: BrokenPipeError — the downstream process (sqfstar) died unexpectedly. This usually means sqfstar was OOM killed. Check sqfstar exit code and increase --mem in the job script. Then exits with code 1 so the SLURM job is correctly marked failed. Root cause context: in the real 10M-image run, sqfstar was OOM killed when merging all 10 tasks globally (~10M files, inode table needs 4-8TB). The BrokenPipeError appeared in every failed pack log alongside exit=137 from sqfstar. Fix was to merge per task (~1M files, 83-101GB RAM) instead. Simulated and confirmed: - Global merge OOM → BrokenPipeError → exit=1 (correct, clear message) - Per-task merge success → 200/200 images across 4 task sqfs files Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Failures simulated from actual 10M-image run logs + new scenarios:
Corrupt chunk (invalid sqfs bytes):
- squashfuse rejects it with clear error + retries once
- Skipped, remaining valid chunks continue streaming
- exit=1 so job script detects partial failure
Empty chunk (squashfuse mounts, 0 images found):
- Previously: silent, exit=0 — could mask download failures
- Now: WARNING logged, empty_chunks counter, exit=1
SIGKILL crash (OOM / kill -9):
- Previously: /tmp/sqfs_stream_* dirs leaked on disk
- Now: atexit handler cleans up on any exit including SIGKILL
SIGTERM (SLURM wall-time timeout):
- Previously: unhandled, temp dirs leaked
- Now: signal handler calls cleanup + exits 1
squashfuse mount transient failure:
- Previously: single attempt, permanent failure on transient error
- Now: 1 retry with 5s delay before giving up
squashfuse unmount failure (busy mount):
- Previously: silently ignored via capture_output=True
- Now: 3 retries with 2s delay, warning logged on all failures
- Does not raise — node exit will clean up the mount
sqfstar not in PATH (exit=127):
- BrokenPipeError message now lists all common causes including
"sqfstar not found (exit=127): check module load and PATH"
--delete-after-stream data loss risk:
- Warning printed before starting when flag is active
New summary line: total_images=N errors=M empty_chunks=K
exit=1 if errors>0 OR empty_chunks>0
23 tests total (18 original + 5 new hardening tests)
Verified with real 4-task simulation: 200/200 images across all scenarios
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…dening Renamed for clarity — the script merges sqfs chunks, not just streams to tar. Changes from stream_chunks_to_tar.py: - --delete-after-stream removed entirely: chunks are never deleted by this script. Deletion is the job script's responsibility after verification, preventing all data loss on OOM or other failures. - Full --help with WHAT IT DOES / INPUT / OUTPUT / DISK SPACE / EXIT CODES / TYPICAL USAGE sections including copy-paste SLURM examples - staging_dir argument (was staging_base) with clear description - Log prefix [stream] → [merge], temp dir prefix sqfs_stream_ → sqfs_merge_ - BrokenPipeError message updated: "Chunk files are preserved — fix the cause and re-submit the pack job" (removed the --delete-after-stream caveat) - Empty chunk WARNING now mentions download_images.py as likely cause - SIGTERM handler message updated to [merge] prefix Hardening carried over: squashfuse retry, unmount retry, empty chunk detection, BrokenPipeError exit code 120 fix, SIGTERM cleanup, atexit cleanup Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Renamed test_stream_chunks_to_tar.py → test_merge_sqfs_chunks.py.
Updated references:
- import merge_sqfs_chunks as sct (was stream_chunks_to_tar)
- sys.argv uses merge_sqfs_chunks.py filename
- log assertions check [merge] prefix (was [stream])
Test changes for --delete-after-stream removal:
- test_delete_after_stream_removes_chunk → test_chunks_always_preserved_after_stream
Verifies chunks remain on disk after streaming (deletion is job script's job)
- test_delete_after_stream_warning_printed → test_delete_after_stream_flag_removed
Verifies argparse rejects the removed flag with exit=2
22 tests, all passing.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Previously failed rows (404, 403, exhausted retries) were written to training_images_downloads as 'failed' but training_images stayed 'pending' permanently. This meant: - 'pending' was ambiguous: "not tried" vs "tried and failed" - retry_failed_downloads.py would waste time retrying 404s Now all three outcomes are merged into training_images: downloaded → fetch_status='downloaded', dims + corrupted populated corrupted → fetch_status='corrupted', corrupted=True, dims NULL failed → fetch_status='failed', all extra fields NULL Permanent failures are now excluded from future re-runs via WHERE fetch_status='pending' without needing the LEFT JOIN check. Retrying is still possible intentionally via retry_failed_downloads.py. 7 new/updated tests in TestMergeChunkIntoTrainingImages: - downloaded, corrupted, failed each trigger merge independently - all three statuses merged in one temp table + MERGE call - failed rows confirmed present in temp table dataframe - temp table cleanup on MERGE failure still works Verified end-to-end against real BQ: - 5 scenarios: clean, corrupt, 404, 403, exhausted retries - all 5 statuses correct in training_images after merge - 0 rows re-queued on re-run (WHERE fetch_status='pending') - 400-image scale test: all downloaded, merged, verified ✓ Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Three-stage merge job replacing the old version from backup branch:
Stage 1 — Count expected images across all chunks (unsquashfs -l)
Stage 2 — Stream chunks → sqfstar → task_N.sqfs
Stage 3 — Verify: ACTUAL == EXPECTED before deleting anything
Chunks only deleted when all three conditions pass:
stream_exit=0, sqfstar_exit=0, image count matches
Safety design:
- Chunks are NEVER deleted on failure — always preserved for retry
- PIPESTATUS captured atomically: PIPE_STATUS=("${PIPESTATUS[@]}")
(assigning PIPESTATUS[0] resets PIPESTATUS — common bash trap)
- References merge_sqfs_chunks.py (renamed from stream_chunks_to_tar)
- Updated --array=0-9 (was 2-9 from old incident where tasks 0+1
had staging files deleted by a failed global pack run)
- Uses merge_sqfs_chunks.py which never deletes chunks itself
Verified with 1000-image simulation:
- Clean merge: verify passes, chunks deleted ✓
- OOM: chunks preserved, re-submit succeeds ✓
- Corrupt chunk: stream exits 1, chunks preserved ✓
- Empty chunk: stream exits 1, chunks preserved ✓
- sqfstar missing: BrokenPipeError caught, chunks preserved ✓
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Cleans training_images table in-place by removing 3 duplicate types: 1. Exact duplicate rows (same photo+taxon+gbif) 2. Same photo mapped to multiple taxa — drop strategy by default 3. Same photo+taxon with multiple gbif_ids — keep MIN gbif_id Supports --dry-run, --min-images-per-taxon, --multi-taxon-strategy, and --log-file for JSON output. Operates on any dataset via --dataset. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Two layers: - Mock-based: SQL generation, BQ client calls, log arithmetic, main() wiring - DuckDB integration: executes real CTE SQL against in-memory rows, covers all 3 dup types + edge cases (overlapping types, 3+ copies, empty table) No real BQ connection needed — all tests run locally in ~2.5s. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add --dataset CLI arg (default: global_butterflies_2604, backwards compatible) - Handle NULL fetch_status in get_pending_rows and MERGE condition for global_all_leps_2605 - Derive tmp_table dataset from training_table string, not hardcoded BQ_DATASET constant - Add 6 new tests covering dataset routing, NULL query handling, and tmp_table derivation Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Pass --dataset global_all_leps_2605 to download_images.py
- Use separate staging dir /scratch/melabbas/global_all_leps_2605/task_{N}/
- Rename job to bq_dl_2605 for squeue clarity
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Concurrent array tasks each MERGE chunk results into training_images; BigQuery aborts colliding DML with "Could not serialize access ... due to concurrent update" (400), which the client does not retry. Observed in production with 8 concurrent tasks (job 43176702_0). Retry only serialization conflicts, up to 10 attempts with jittered exponential backoff (2s base, 60s cap). Other BadRequest errors still raise immediately. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- --dataset flag replaces hardcoded global_butterflies_2604 - sample WHERE includes fetch_status IS NULL (new datasets have no 'downloaded' rows yet) and uses ORDER BY RAND() for unbiased size/distribution estimates - test tables renamed with leading underscore (_test_training_images, _test_training_images_downloads) so test artifacts sort together, separate from production tables; use with --table-prefix _test_ Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…eline Each array task now performs the full lifecycle for its MOD(photo_id, 60) slice — download (BQ-checkpointed 10k chunks) → count-verify → stream-merge to task_N.sqfs → count-verify → upload to Arbutus object store → byte-size verify → only then delete local files. Peak scratch is bounded by the array throttle (%K × ~2× task archive), not the dataset size. Replaces the previous all-at-once design which required the whole dataset on scratch before merging and never propagated failure exit codes (last command was notify, so afterok chaining and FAIL mail were broken). Tasks are idempotent (skip if remote archive exists) and resume-safe (prior chunks stashed to a resume_* subdir to avoid the chunk-numbering overwrite; already-attempted images skipped via the downloads table). Validated in production: jobs 43176702/43176730 (global_all_leps_2605). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.