Skip to content

spec_io

I/O helpers for pipeline specifications.

load_pipeline_spec

load_pipeline_spec(path: str | Path) -> PipelineSpec

Load a TOML pipeline spec from disk.

Source code in cltk/pipeline/spec_io.py
def load_pipeline_spec(path: str | Path) -> PipelineSpec:
    """Load a TOML pipeline spec from disk."""
    spec_path = Path(path)
    data = _toml.loads(spec_path.read_text(encoding="utf-8"))
    if not isinstance(data, dict):
        raise ValueError("Pipeline TOML must decode to a table.")

    step_table = data.get("step") or {}
    step_entries = (
        _flatten_step_table(step_table) if isinstance(step_table, dict) else {}
    )

    overrides_table = data.get("step_overrides") or {}
    overrides = (
        _flatten_step_table(overrides_table)
        if isinstance(overrides_table, dict)
        else {}
    )

    steps_order = data.get("steps")
    preset_name = data.get("preset")
    language = data.get("language")
    meta = data.get("meta") or {}

    base_steps: list[StepSpec] = []
    base_configs: dict[str, dict[str, Any]] = {}
    base_enabled: dict[str, bool] = {}

    if steps_order is None:
        if not preset_name:
            presets = ", ".join(list_presets())
            raise ValueError(
                f"Missing steps list. Provide 'steps' or a 'preset'. Available presets: {presets}"
            )
        preset = get_preset(preset_name)
        base_steps = preset.steps or []
        base_configs = {step.id: dict(step.config) for step in base_steps}
        base_enabled = {step.id: step.enabled for step in base_steps}
        if language is None:
            language = preset.language
    else:
        if not isinstance(steps_order, list) or not all(
            isinstance(step, str) for step in steps_order
        ):
            raise ValueError("'steps' must be a list of process_id strings.")

    ordered_ids: list[str] = (
        [step.id for step in base_steps] if steps_order is None else list(steps_order)
    )

    extras = [step_id for step_id in step_entries if step_id not in ordered_ids]
    if extras and steps_order is None:
        logger.info("Appending un-ordered steps from [step.*]: %s", ", ".join(extras))
        ordered_ids.extend(sorted(extras))

    merged_overrides = {
        step_id: dict(cfg)
        for step_id, cfg in overrides.items()
        if isinstance(cfg, dict)
    }

    steps: list[StepSpec] = []
    for step_id in ordered_ids:
        config: dict[str, Any] = {}
        enabled = base_enabled.get(step_id, True)
        if step_id in base_configs:
            config.update(base_configs[step_id])
        entry = step_entries.get(step_id, {})
        if entry:
            entry = dict(entry)
            enabled = bool(entry.pop("enabled", enabled))
            config.update(entry)
        if step_id in merged_overrides:
            config.update(merged_overrides[step_id])
        steps.append(StepSpec(id=step_id, enabled=enabled, config=config))

    return PipelineSpec(
        preset=preset_name,
        language=language,
        steps=steps,
        step_overrides=merged_overrides,
        meta=meta if isinstance(meta, dict) else {},
    )