Skip to content

cltk.pipeline

Declarative pipeline specifications and compilation helpers.

PipelineSpec

Bases: BaseModel

Declarative pipeline definition.

preset class-attribute instance-attribute

preset: Optional[str] = None

language class-attribute instance-attribute

language: Optional[str] = None

steps class-attribute instance-attribute

steps: Optional[list[StepSpec]] = None

step_overrides class-attribute instance-attribute

step_overrides: dict[str, dict[str, Any]] = Field(
    default_factory=dict
)

meta class-attribute instance-attribute

meta: dict[str, Any] = Field(default_factory=dict)

model_config class-attribute instance-attribute

model_config = {'extra': 'forbid'}

StepSpec

Bases: BaseModel

Single pipeline step definition.

id instance-attribute

id: str

enabled class-attribute instance-attribute

enabled: bool = True

config class-attribute instance-attribute

config: dict[str, Any] = Field(default_factory=dict)

model_config class-attribute instance-attribute

model_config = {'extra': 'forbid'}

compile_pipeline

compile_pipeline(spec: PipelineSpec) -> Pipeline

Compile a PipelineSpec into a Pipeline instance.

Source code in cltk/pipeline/compiler.py
def compile_pipeline(spec: PipelineSpec) -> Pipeline:
    """Build a runnable ``Pipeline`` from a declarative ``PipelineSpec``.

    The returned pipeline carries the compiled processes, the spec's
    ``language`` as its ``glottolog_id``, and a new ``PipelineSpec`` whose
    ``steps`` are the result of ``_resolve_steps(spec)``; the other spec
    fields are passed through from ``spec`` as-is.

    Args:
        spec: Declarative description of the pipeline to compile.

    Returns:
        The ``Pipeline`` constructed from ``spec``.
    """
    # Resolve first so a resolution failure surfaces before any process is
    # instantiated by ``compile_processes``.
    resolved_steps = _resolve_steps(spec)
    processes = compile_processes(spec)

    # Snapshot of the spec with the resolved step list attached.
    resolved_spec = PipelineSpec(
        preset=spec.preset,
        language=spec.language,
        steps=resolved_steps,
        step_overrides=spec.step_overrides,
        meta=spec.meta,
    )
    return Pipeline(
        processes=processes,
        glottolog_id=spec.language,
        spec=resolved_spec,
    )

compile_processes

compile_processes(spec: PipelineSpec) -> list[Any]

Compile a PipelineSpec into an ordered list of Process instances.

Source code in cltk/pipeline/compiler.py
def compile_processes(spec: PipelineSpec) -> list[Any]:
    """Instantiate the enabled steps of a ``PipelineSpec`` in order.

    For each step returned by ``_resolve_steps(spec)`` that is enabled, the
    process class is looked up in ``ProcessRegistry`` by step id and called
    with keyword arguments layered as follows (later wins):

    1. the step's own ``config``;
    2. ``glottolog_id`` set to ``spec.language`` when the spec has a
       language and the step config does not already define the key;
    3. the entry for the step id in ``spec.step_overrides``, if any.

    Args:
        spec: Declarative description of the pipeline.

    Returns:
        The instantiated processes, in resolved step order. Disabled steps
        are omitted.
    """
    instances: list[Any] = []
    overrides = spec.step_overrides
    for step in _resolve_steps(spec):
        if step.enabled:
            factory = ProcessRegistry.get_process(step.id)
            # Copy so the step's stored config is never mutated.
            kwargs = dict(step.config)
            if spec.language:
                # Only fills the key in when the step did not set it itself.
                kwargs.setdefault("glottolog_id", spec.language)
            if overrides and step.id in overrides:
                kwargs.update(overrides[step.id])
            instances.append(factory(**kwargs))
    return instances

get_preset

get_preset(name: str) -> PipelineSpec

Return a deep copy of the named preset.

Source code in cltk/pipeline/presets.py
def get_preset(name: str) -> PipelineSpec:
    """Look up a preset by name and return an independent deep copy.

    Args:
        name: Key of the preset in the module's preset table.

    Returns:
        A deep copy of the stored preset (via ``model_copy(deep=True)``), so
        callers can modify it without affecting the stored one.

    Raises:
        KeyError: If ``name`` is not a known preset. The message lists the
            available preset names in sorted order, and the original lookup
            error is chained as the cause.
    """
    try:
        template = _PRESETS[name]
    except KeyError as missing:
        available = ", ".join(sorted(_PRESETS))
        message = f"Unknown preset '{name}'. Available: {available}"
        raise KeyError(message) from missing
    else:
        return template.model_copy(deep=True)

list_presets

list_presets() -> list[str]

Return available preset names.

Source code in cltk/pipeline/presets.py
def list_presets() -> list[str]:
    """List the names of all registered presets in sorted order.

    Returns:
        A new list of preset names, sorted ascending.
    """
    names = list(_PRESETS)
    names.sort()
    return names

load_pipeline_spec

load_pipeline_spec(path: str | Path) -> PipelineSpec

Load a TOML pipeline spec from disk.

Source code in cltk/pipeline/spec_io.py
def load_pipeline_spec(path: str | Path) -> PipelineSpec:
    """Read a TOML file from disk and build the ``PipelineSpec`` it describes.

    Recognised top-level keys are ``steps``, ``preset``, ``language``,
    ``meta``, ``step`` and ``step_overrides``. The ``step`` and
    ``step_overrides`` tables are passed through ``_flatten_step_table``;
    when either is not a table it is treated as empty.

    Step order comes from ``steps`` when present. Otherwise a ``preset`` is
    required: its steps supply the order, the base config and enabled flag
    for each step, and the language when the file does not set one. In the
    preset case, ``[step.*]`` entries whose id is not in the preset are
    appended after the preset steps in sorted order.

    Each step's config is layered, later sources winning: preset config,
    then the ``[step.*]`` entry (with its ``enabled`` key removed and
    coerced to ``bool``), then the ``step_overrides`` entry.

    Args:
        path: Location of the TOML file; read as UTF-8.

    Returns:
        The assembled ``PipelineSpec``. ``meta`` falls back to an empty dict
        when the file's value is not a table.

    Raises:
        ValueError: If the decoded document is not a dict, if neither
            ``steps`` nor ``preset`` is provided, or if ``steps`` is not a
            list of strings.
    """
    document = _toml.loads(Path(path).read_text(encoding="utf-8"))
    if not isinstance(document, dict):
        raise ValueError("Pipeline TOML must decode to a table.")

    # Per-step tables; anything that is not a table is ignored.
    raw_step_table = document.get("step") or {}
    step_entries: dict[str, Any] = {}
    if isinstance(raw_step_table, dict):
        step_entries = _flatten_step_table(raw_step_table)

    raw_override_table = document.get("step_overrides") or {}
    overrides: dict[str, Any] = {}
    if isinstance(raw_override_table, dict):
        overrides = _flatten_step_table(raw_override_table)

    requested_order = document.get("steps")
    preset_name = document.get("preset")
    language = document.get("language")
    meta = document.get("meta") or {}

    # Defaults inherited from the preset; stay empty when 'steps' is explicit.
    preset_configs: dict[str, dict[str, Any]] = {}
    preset_enabled: dict[str, bool] = {}
    ordered_ids: list[str]

    if requested_order is not None:
        is_string_list = isinstance(requested_order, list) and all(
            isinstance(item, str) for item in requested_order
        )
        if not is_string_list:
            raise ValueError("'steps' must be a list of process_id strings.")
        ordered_ids = list(requested_order)
    else:
        if not preset_name:
            presets = ", ".join(list_presets())
            raise ValueError(
                f"Missing steps list. Provide 'steps' or a 'preset'. Available presets: {presets}"
            )
        preset = get_preset(preset_name)
        preset_steps = preset.steps or []
        for preset_step in preset_steps:
            preset_configs[preset_step.id] = dict(preset_step.config)
            preset_enabled[preset_step.id] = preset_step.enabled
        if language is None:
            language = preset.language
        ordered_ids = [preset_step.id for preset_step in preset_steps]

    # [step.*] ids that the chosen order does not mention. They are only
    # appended when the order came from a preset; with an explicit 'steps'
    # list they are left out.
    extras = [step_id for step_id in step_entries if step_id not in ordered_ids]
    if requested_order is None and extras:
        logger.info("Appending un-ordered steps from [step.*]: %s", ", ".join(extras))
        ordered_ids.extend(sorted(extras))

    # Keep only table-shaped overrides, copied so the parsed data is untouched.
    merged_overrides: dict[str, dict[str, Any]] = {}
    for override_id, override_cfg in overrides.items():
        if isinstance(override_cfg, dict):
            merged_overrides[override_id] = dict(override_cfg)

    steps: list[StepSpec] = []
    for step_id in ordered_ids:
        # Layer 1: preset defaults.
        config: dict[str, Any] = dict(preset_configs.get(step_id, {}))
        enabled = preset_enabled.get(step_id, True)

        # Layer 2: the file's [step.<id>] table; 'enabled' is a flag, not config.
        entry = step_entries.get(step_id, {})
        if entry:
            entry_values = dict(entry)
            enabled = bool(entry_values.pop("enabled", enabled))
            config.update(entry_values)

        # Layer 3: explicit overrides win over everything else.
        config.update(merged_overrides.get(step_id, {}))

        steps.append(StepSpec(id=step_id, enabled=enabled, config=config))

    return PipelineSpec(
        preset=preset_name,
        language=language,
        steps=steps,
        step_overrides=merged_overrides,
        meta=meta if isinstance(meta, dict) else {},
    )

Submodules