Shard Utilities

mmirage.shard_process

Main entry point for processing a single shard.

Main script for processing dataset shards with MMIRAGE.

Supports both text-only and multimodal (vision-language) processing.

mmirage.shard_process.rewrite_batch(batch, mapper, renderer, image_base_path=None)

Rewrite a batch of samples by applying transformations.

Parameters:
  • batch – Dictionary mapping column names to lists of values.

  • mapper – MMIRAGEMapper for processing transformations.

  • renderer – TemplateRenderer for generating output.

  • image_base_path – Optional base directory for resolving relative image paths.

Returns:

Dictionary mapping output keys to lists of rendered values.

Raises:

ValueError – If variables are not computable given the configuration.

Return type:

Dict[str, List[Any]]
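
To make the columnar batch contract concrete, here is a minimal sketch that mirrors the documented inputs and outputs. It is not the MMIRAGE implementation: toy_rewrite_batch is a hypothetical name, plain callables stand in for MMIRAGEMapper and TemplateRenderer, and both the "image" column name and the path-joining rule are assumptions.

```python
import os
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]


def toy_rewrite_batch(
    batch: Dict[str, List[Any]],
    transform: Callable[[Row], Row],
    render: Callable[[Row], Row],
    image_base_path: Optional[str] = None,
) -> Dict[str, List[Any]]:
    """Hypothetical stand-in for rewrite_batch built from plain callables."""
    columns = list(batch)
    n_rows = len(batch[columns[0]]) if columns else 0
    out: Dict[str, List[Any]] = {}
    for i in range(n_rows):
        # Columnar batch -> one dict per sample.
        row = {col: batch[col][i] for col in columns}
        # Assumption: images live in an "image" column; relative paths get the base prefix.
        image = row.get("image")
        if image_base_path and isinstance(image, str) and not os.path.isabs(image):
            row["image"] = os.path.join(image_base_path, image)
        rendered = render(transform(row))
        # Sample dict -> back into columnar output (assumes every sample yields the same keys).
        for key, value in rendered.items():
            out.setdefault(key, []).append(value)
    return out


batch = {"question": ["hi", "yo"], "image": ["a.png", "/abs/b.png"]}
result = toy_rewrite_batch(
    batch,
    transform=lambda r: {**r, "shout": r["question"].upper()},
    render=lambda r: {"text": f"Q: {r['shout']}", "image": r["image"]},
    image_base_path="/data/images",
)
```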

mmirage.shard_process.main()

Process a single shard of the dataset. Loads the configuration and datasets, processes the shard with MMIRAGE transformations (including multimodal ones), and saves the result to disk.

mmirage.shard_utils

Low-level helpers for shard state management, atomic saves, and status markers.

Utility functions for shard and merge processing.

This module contains helper functions for dataset sharding, state management, and file operations used in the MMIRAGE shard processing pipeline.
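
The module summary mentions atomic saves. One common way to get them is to write to a temporary file in the same directory and then rename it over the target with os.replace, so a reader sees either the old file or the complete new one. The sketch below shows that pattern for a JSON status file; atomic_write_json is a hypothetical helper, and this is not necessarily how mmirage.shard_utils implements it.

```python
import json
import os
import tempfile
from typing import Any, Dict


def atomic_write_json(path: str, payload: Dict[str, Any]) -> None:
    """Write JSON so readers never observe a partially written file (hypothetical helper)."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file lives next to the target: os.replace is only atomic within one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-", suffix=".json")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(payload, handle, indent=2)
            handle.flush()
            os.fsync(handle.fileno())  # make sure the bytes hit disk before the rename
        os.replace(tmp_path, path)  # atomic rename on POSIX filesystems
    except BaseException:
        # Remove the temporary file if anything went wrong, then re-raise.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```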

class mmirage.shard_utils.ShardStatus(status='unknown', retry_count=0, shard_id=None, started_at=None, finished_at=None, error=None, hostname=None, pid=None, slurm_job_id=None, slurm_array_task_id=None, datasets=None)

Bases: object

Typed representation of the shard status.json payload.

Parameters:
  • status (str)

  • retry_count (int)

  • shard_id (int | None)

  • started_at (str | None)

  • finished_at (str | None)

  • error (str | None)

  • hostname (str | None)

  • pid (int | None)

  • slurm_job_id (str | None)

  • slurm_array_task_id (str | None)

  • datasets (List[Dict[str, Any]] | None)

status: str = 'unknown'
retry_count: int = 0
shard_id: int | None = None
started_at: str | None = None
finished_at: str | None = None
error: str | None = None
hostname: str | None = None
pid: int | None = None
slurm_job_id: str | None = None
slurm_array_task_id: str | None = None
datasets: List[Dict[str, Any]] | None = None
classmethod from_dict(payload)

Build a status object from a JSON payload.

Parameters:

payload (Dict[str, Any])

Return type:

ShardStatus

to_dict()

Serialize status to the JSON payload written on disk.

Return type:

Dict[str, Any]
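
A from_dict/to_dict pair like this is typically a thin layer over a dataclass. The sketch below reimplements the documented fields under a hypothetical name, ShardStatusSketch; ignoring unknown keys in from_dict is an assumption, not documented behavior.

```python
from dataclasses import asdict, dataclass, fields
from typing import Any, Dict, List, Optional


@dataclass
class ShardStatusSketch:
    status: str = "unknown"
    retry_count: int = 0
    shard_id: Optional[int] = None
    started_at: Optional[str] = None
    finished_at: Optional[str] = None
    error: Optional[str] = None
    hostname: Optional[str] = None
    pid: Optional[int] = None
    slurm_job_id: Optional[str] = None
    slurm_array_task_id: Optional[str] = None
    datasets: Optional[List[Dict[str, Any]]] = None

    @classmethod
    def from_dict(cls, payload: Dict[str, Any]) -> "ShardStatusSketch":
        # Assumption: unknown keys are dropped; missing keys fall back to the defaults above.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in payload.items() if k in known})

    def to_dict(self) -> Dict[str, Any]:
        # Plain dict suitable for json.dump into status.json.
        return asdict(self)


status = ShardStatusSketch.from_dict(
    {"status": "failed", "retry_count": 1, "shard_id": 3, "extra": True}
)
```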

class mmirage.shard_utils.MergeReport(dataset_name, input_dir, output_dir, used_shards, merged_rows, skipped_invalid_dirs, skipped_zero_rows)

Bases: object

Summary of a merge operation for one dataset directory.

Parameters:
  • dataset_name (str)

  • input_dir (str)

  • output_dir (str)

  • used_shards (int)

  • merged_rows (int)

  • skipped_invalid_dirs (int)

  • skipped_zero_rows (int)

dataset_name: str
input_dir: str
output_dir: str
used_shards: int
merged_rows: int
skipped_invalid_dirs: int
skipped_zero_rows: int

mmirage.merge_shards

Dataset shard merging logic.

Merge processed dataset shards.

mmirage.merge_shards.merge_dataset_dir(dataset_dir, output_dir)

Merge one dataset directory containing shard_* folders.

Parameters:
  • dataset_dir (str) – Input directory containing shard_* folders.

  • output_dir (str) – Destination directory for merged dataset.

Returns:

MergeReport with summary details.

Return type:

MergeReport

mmirage.merge_shards.merge_input_dir(input_dir, output_dir)

Merge all shard datasets found under an input directory.

The input can be either:
  • one dataset directory containing shard_* folders directly, or

  • a parent directory containing multiple dataset subdirectories, each with its own shard_* folders.

Parameters:
  • input_dir (str)

  • output_dir (str)

Return type:

List[MergeReport]
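
The two input layouts accepted by merge_input_dir can be told apart by checking whether input_dir itself holds shard_* folders. This sketch covers only that detection step (no merging); find_dataset_dirs and has_shards are hypothetical names.

```python
import os
from typing import List


def has_shards(directory: str) -> bool:
    """True if directory directly contains at least one shard_* folder."""
    return any(
        name.startswith("shard_") and os.path.isdir(os.path.join(directory, name))
        for name in os.listdir(directory)
    )


def find_dataset_dirs(input_dir: str) -> List[str]:
    """Return the dataset directories to merge, covering both supported layouts."""
    if has_shards(input_dir):
        # Layout 1: input_dir is itself a dataset directory.
        return [input_dir]
    # Layout 2: input_dir is a parent of several dataset directories.
    return sorted(
        os.path.join(input_dir, name)
        for name in os.listdir(input_dir)
        if os.path.isdir(os.path.join(input_dir, name))
        and has_shards(os.path.join(input_dir, name))
    )
```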

mmirage.merge_shards.merge_from_config(cfg, output_root=None)

Merge shard outputs described in config.loading_params.datasets.

Parameters:
  • cfg (MMirageConfig) – Loaded MMIRAGE config.

  • output_root (str | None) – Optional destination root. If omitted, each dataset writes into <dataset.output_dir>/merged.

Returns:

Merge reports for each dataset entry.

Return type:

List[MergeReport]

mmirage.merge_shards.main()

CLI entrypoint for directory-based shard merging. Scans --input-dir for dataset subdirectories containing shard_* folders. For each dataset directory, merges the shard datasets and writes the result directly to the provided --output-dir.

mmirage.cli_utils.status

Shard status checking and retry helpers.

Shard status and retry helpers for the MMIRAGE CLI.

class mmirage.cli_utils.status.ShardSummary(total, successful, running, failed, max_retries_exceeded)

Bases: object

Compact status summary for shard execution.

Parameters:
  • total (int)

  • successful (int)

  • running (int)

  • failed (int)

  • max_retries_exceeded (int)

total: int
successful: int
running: int
failed: int
max_retries_exceeded: int
mmirage.cli_utils.status.max_allowed_attempts(max_retries)

Return the maximum total number of attempts allowed for a shard.

Total attempts = initial attempt + max_retries.

Parameters:

max_retries (int)

Return type:

int

mmirage.cli_utils.status.is_retry_budget_exceeded(attempt_count, max_retries)

Return whether a shard has exceeded the retry budget.

Parameters:
  • attempt_count (int)

  • max_retries (int)

Return type:

bool
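
The attempt arithmetic is simple: with max_retries = 2, a shard gets 1 + 2 = 3 attempts in total. The sketch below restates max_allowed_attempts from its docstring and pairs it with a budget check under the hypothetical name retry_budget_exhausted; whether the real is_retry_budget_exceeded compares with >= (as here) or > is an assumption, since the documentation does not say.

```python
def max_allowed_attempts(max_retries: int) -> int:
    # Documented rule: total attempts = initial attempt + max_retries.
    return 1 + max_retries


def retry_budget_exhausted(attempt_count: int, max_retries: int) -> bool:
    # Assumption: no attempts remain once attempt_count reaches the allowed total.
    return attempt_count >= max_allowed_attempts(max_retries)
```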

mmirage.cli_utils.status.shard_state_dir(state_root, shard_id)

Return the state directory for a shard.

Parameters:
  • state_root (str)

  • shard_id (int)

Return type:

str

mmirage.cli_utils.status.get_shard_status(state_dir)

Read the current status and attempt counter for a shard.

Parameters:

state_dir (str)

Return type:

Tuple[str, int]
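
A shard's state is persisted as a status.json payload (see ShardStatus). A minimal reader returning the documented (status, attempts) pair might look like this; read_shard_status is a hypothetical name, and both the ("unknown", 0) fallback for a missing file and the use of retry_count as the attempt counter are assumptions.

```python
import json
import os
from typing import Tuple


def read_shard_status(state_dir: str) -> Tuple[str, int]:
    """Hypothetical reader for <state_dir>/status.json."""
    path = os.path.join(state_dir, "status.json")
    if not os.path.exists(path):
        # No marker yet: treat the shard as never having run.
        return "unknown", 0
    with open(path, encoding="utf-8") as handle:
        payload = json.load(handle)
    # Assumption: the attempt counter is stored in the retry_count field.
    return payload.get("status", "unknown"), int(payload.get("retry_count", 0))
```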

mmirage.cli_utils.status.check_failed_shards(cfg)

Return retryable failed shards and a compact summary.

Parameters:

cfg (MMirageConfig)

Return type:

Tuple[List[int], ShardSummary]
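
check_failed_shards combines per-shard states into a retry list plus a ShardSummary. The sketch below shows one way to do that from already-loaded (status, attempt_count) pairs. The names ShardSummarySketch and summarize_shards are hypothetical, as are the status strings "success", "running" and "failed" and the choice to count budget-exhausted shards in both failed and max_retries_exceeded.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class ShardSummarySketch:
    total: int
    successful: int
    running: int
    failed: int
    max_retries_exceeded: int


def summarize_shards(
    states: Dict[int, Tuple[str, int]], max_retries: int
) -> Tuple[List[int], ShardSummarySketch]:
    """states maps shard_id -> (status, attempt_count) as read from each shard's marker."""
    retryable: List[int] = []
    successful = running = failed = exceeded = 0
    for shard_id, (status, attempts) in sorted(states.items()):
        if status == "success":
            successful += 1
        elif status == "running":
            running += 1
        elif status == "failed":
            failed += 1
            if attempts >= 1 + max_retries:
                exceeded += 1  # retry budget used up: reported, not resubmitted
            else:
                retryable.append(shard_id)
        # Any other status (e.g. "unknown") only counts toward the total in this sketch.
    summary = ShardSummarySketch(len(states), successful, running, failed, exceeded)
    return retryable, summary
```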

mmirage.cli_utils.status.confirm_retry(count, confirm_mode)

Return whether retry submission is confirmed.

Modes:
  • prompt: ask the user interactively

  • yes: submit without prompting

Parameters:
  • count (int)

  • confirm_mode (Literal['prompt', 'yes'])

Return type:

bool
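
The two confirmation modes map onto a simple branch. In this sketch (confirm_retry_sketch is a hypothetical name) the answer is read through an injectable ask callable, a hypothetical parameter defaulting to the built-in input, so the logic can be exercised without a terminal; the prompt wording and accepted answers are also assumptions.

```python
from typing import Callable, Literal


def confirm_retry_sketch(
    count: int,
    confirm_mode: Literal["prompt", "yes"],
    ask: Callable[[str], str] = input,
) -> bool:
    """Return True when resubmitting `count` failed shards is confirmed."""
    if confirm_mode == "yes":
        return True  # "yes": submit without prompting
    # "prompt": ask interactively; anything other than y/yes declines.
    answer = ask(f"Resubmit {count} failed shard(s)? [y/N] ")
    return answer.strip().lower() in {"y", "yes"}
```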

mmirage.cli_utils.status.status_exit_code(failed_shards, summary)

Map shard status to an exit code.

Parameters:
  • failed_shards

  • summary

Return type:

int

mmirage.cli_utils.status.submit_failed_shards(cfg, config_path, failed_shards, confirm_mode)

Submit retry jobs for failed shards when requested.

Parameters:
  • cfg

  • config_path

  • failed_shards

  • confirm_mode

Return type:

int

mmirage.cli_utils.slurm

SLURM submission and job monitoring helpers.

SLURM helpers for the MMIRAGE CLI.

mmirage.cli_utils.slurm.build_sbatch_script(cfg, config_path)

Build the sbatch payload executed for each array task.

Parameters:
  • cfg

  • config_path

Return type:

str

mmirage.cli_utils.slurm.submit_slurm_job(cfg, config_path, shard_ids=None)

Submit a SLURM array job and return its job ID.

Parameters:
  • cfg

  • config_path

  • shard_ids

Return type:

int | None
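
When only some shards need resubmitting, the shard IDs must become an sbatch --array value, which accepts comma-separated indices and ranges such as 0-2,5. sbatch reports the new job as "Submitted batch job <id>", or as "<id>" (optionally followed by ";<cluster>") when --parsable is used. The helpers below, array_spec and parse_job_id (both hypothetical), sketch those two steps; they are not taken from mmirage.cli_utils.slurm.

```python
import re
from typing import Iterable, List, Optional


def array_spec(shard_ids: Iterable[int]) -> str:
    """Compress shard IDs into an sbatch --array value, e.g. [0, 1, 2, 5] -> '0-2,5'."""
    ids = sorted(set(shard_ids))
    parts: List[str] = []
    i = 0
    while i < len(ids):
        j = i
        # Extend the run while the IDs stay consecutive.
        while j + 1 < len(ids) and ids[j + 1] == ids[j] + 1:
            j += 1
        parts.append(str(ids[i]) if i == j else f"{ids[i]}-{ids[j]}")
        i = j + 1
    return ",".join(parts)


def parse_job_id(sbatch_stdout: str) -> Optional[int]:
    """Return the job ID from default or --parsable sbatch output, else None."""
    match = re.search(r"Submitted batch job (\d+)", sbatch_stdout) or re.match(
        r"\s*(\d+)(?:;|\s*$)", sbatch_stdout
    )
    return int(match.group(1)) if match else None
```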

mmirage.cli_utils.slurm.wait_for_slurm_job(job_id, cfg)

Wait for a SLURM job array to leave the queue.

Parameters:
  • job_id

  • cfg

Return type:

None
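
Waiting for an array job to leave the queue usually means polling squeue until it stops listing the job (squeue -h suppresses the header, -j selects the job). In this sketch wait_until_gone and job_in_queue are hypothetical names and the 30-second default interval is arbitrary; the query and sleep functions are injectable so the loop can be tested without a cluster.

```python
import subprocess
import time
from typing import Callable


def job_in_queue(job_id: int) -> bool:
    """Return True while squeue still lists the job (or any of its array tasks)."""
    result = subprocess.run(
        ["squeue", "-h", "-j", str(job_id)],  # -h: no header, -j: filter by job ID
        capture_output=True,
        text=True,
        check=False,  # squeue may exit non-zero once the job is no longer known
    )
    return bool(result.stdout.strip())


def wait_until_gone(
    job_id: int,
    poll_seconds: float = 30.0,
    is_queued: Callable[[int], bool] = job_in_queue,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Block until the job has left the queue, checking every poll_seconds."""
    while is_queued(job_id):
        sleep(poll_seconds)
```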

mmirage.cli_utils.slurm.require_slurm(cfg, command_name)

Ensure the command can run only in SLURM mode.

Parameters:
  • cfg

  • command_name

Return type:

int

mmirage.cli_utils.runtime

Runtime environment setup helpers.

Runtime/path helpers for the MMIRAGE CLI.

mmirage.cli_utils.runtime.expand_path(path, project_root=None)

Expand environment variables and the user home directory (~), and resolve relative paths.

Parameters:
  • path (str)

  • project_root (str | None)

Return type:

str
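
The expansion steps can be sketched with os.path.expandvars and os.path.expanduser. expand_path_sketch is a hypothetical name, and resolving relative paths against project_root (falling back to the current working directory) is an assumption inferred from the signature.

```python
import os
from typing import Optional


def expand_path_sketch(path: str, project_root: Optional[str] = None) -> str:
    """Expand $VARS and ~, then anchor relative paths (assumed: at project_root)."""
    # Substitute $VAR / ${VAR} first, then expand a leading ~ to the home directory.
    expanded = os.path.expanduser(os.path.expandvars(path))
    if not os.path.isabs(expanded):
        # Assumption: relative paths resolve against project_root, else the CWD.
        expanded = os.path.join(project_root or os.getcwd(), expanded)
    return expanded
```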

mmirage.cli_utils.runtime.get_project_root(cfg)

Return the configured project root, or the current working directory.

Parameters:

cfg (MMirageConfig)

Return type:

str

mmirage.cli_utils.runtime.create_directories(paths)

Create directories if they do not already exist.

Parameters:

paths (Sequence[str])

Return type:

None

mmirage.cli_utils.runtime.validate_edf_env_path(cfg)

Validate the optional EDF environment file path.

Parameters:

cfg (MMirageConfig)

Return type:

None

mmirage.cli_utils.runtime.add_file_logging(log_file, level)

Add a file handler so logs are also written to disk.

Parameters:
  • log_file

  • level

Return type:

None
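
Mirroring logs to disk amounts to attaching a logging.FileHandler to the root logger. In the sketch below, add_file_logging_sketch is a hypothetical name and the format string is an assumption; unlike the documented function, which returns None, it returns the handler so it can be removed later. Records below the root logger's own level are dropped before they reach any handler.

```python
import logging


def add_file_logging_sketch(log_file: str, level: int = logging.INFO) -> logging.Handler:
    """Attach a FileHandler to the root logger so records are also written to disk."""
    handler = logging.FileHandler(log_file, encoding="utf-8")
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
    logging.getLogger().addHandler(handler)
    return handler  # returned so callers (or tests) can remove and close it
```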

mmirage.cli_utils.runtime.setup_runtime(cfg, log_level)

Initialize runtime-level logging.

Parameters:
  • cfg

  • log_level

Return type:

None