Skip to content

openai

OpenAI integration for CLTK.

Internal; no stability guarantees

This module provides a small wrapper class (`OpenAIConnection`) around the OpenAI client and high-level helpers to generate linguistic annotations from LLMs for a given language (resolved by Glottolog ID).

OpenAIConnection

OpenAIConnection(
    model: AVAILABLE_OPENAI_MODELS,
    api_key: Optional[str] = None,
    temperature: float = 1.0,
)

Thin wrapper around the OpenAI client for CLTK use cases.

Parameters:

  • api_key (Optional[str], default: None ) –

    OpenAI API key.

  • model (AVAILABLE_OPENAI_MODELS) –

    Small set of supported model aliases.

  • temperature (float, default: 1.0 ) –

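    Sampling temperature (default 1.0). It is only passed to the API for models whose name contains "4.1"; requests for "-5" models do not send it.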

Attributes:

  • client

    OpenAI client instance.

Initialize the client, taking the API key from the api_key argument or, failing that, from the OPENAI_API_KEY environment variable.

Source code in cltk/genai/openai.py
def __init__(
    self,
    model: AVAILABLE_OPENAI_MODELS,
    api_key: Optional[str] = None,
    temperature: float = 1.0,
):
    """Store the settings, resolve the API key, and build the OpenAI client.

    When ``api_key`` is empty, ``load_env_file()`` is called and the key is
    then read from the ``OPENAI_API_KEY`` environment variable.

    Args:
        model: Model identifier; stored on ``self.model`` and bound into the
            logger context.
        api_key: API key handed to the client; optional.
        temperature: Sampling temperature stored on ``self.temperature``.

    Raises:
        ValueError: If no API key is available after the environment lookup.
        ImportError: If the module-level ``OpenAI`` name is ``None`` and the
            ``openai`` package cannot be imported.
    """
    self.api_key = api_key
    self.model: str = model
    self.temperature: float = temperature

    if not self.api_key:
        # No explicit key: fall back to the environment (after loading .env).
        load_env_file()
        self.api_key = os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            missing_key_msg: str = "OPENAI_API_KEY not found. Please set it in your environment or in a .env file."
            # self.log does not exist yet, so bind a one-off logger here.
            bind_context(model=str(model)).error(missing_key_msg)
            raise ValueError(missing_key_msg)

    # Prefer the module-level OpenAI name (tests may patch it); import the
    # real client lazily only when that name is None.
    client_cls = OpenAI
    if client_cls is None:  # pragma: no cover - import only if needed
        try:
            from openai import OpenAI as _LazyOpenAI
        except Exception as import_error:
            raise ImportError(
                "OpenAI client not installed. Install with: pip install 'cltk[openai]'"
            ) from import_error
        client_cls = _LazyOpenAI

    self.client = client_cls(api_key=self.api_key)
    # Structured logger carrying the model identifier.
    self.log = bind_context(model=str(self.model))

api_key instance-attribute

api_key = api_key

model instance-attribute

model: str = model

temperature instance-attribute

temperature: float = temperature

client instance-attribute

client = openai_cls(api_key=api_key)

log instance-attribute

log = bind_context(model=str(model))

generate

generate(
    prompt: str, max_retries: int = 2
) -> CLTKGenAIResponse

Call the OpenAI responses API synchronously with retries and code-block parsing.

Source code in cltk/genai/openai.py
def generate(
    self,
    prompt: str,
    max_retries: int = 2,
) -> CLTKGenAIResponse:
    """Call the OpenAI Responses API synchronously and return the normalized text.

    The request is repeated, up to ``max_retries`` attempts in total, until
    ``self._extract_code_blocks`` finds a code block in the response text.
    Token usage is summed over every attempt, including discarded ones.
    ``self.temperature`` is only sent for models whose name contains "4.1".

    Args:
        prompt: Text sent as the ``input`` of the request.
        max_retries: Maximum number of attempts (not additional retries);
            must be at least 1.

    Returns:
        A ``CLTKGenAIResponse`` whose ``response`` is the full
        ``output_text`` of the successful attempt passed through
        ``cltk_normalize`` and whose ``usage`` holds the accumulated
        ``input``/``output``/``total`` token counts.

    Raises:
        ValueError: If ``max_retries`` is less than 1, or the model name
            contains neither "4.1" nor "-5".
        OpenAIInferenceError: If the client raises ``OpenAIError``; API
            errors are not retried here.
        CLTKException: If no attempt yields a code block, whether because
            none was found or because extraction itself failed.
    """
    import os as _os

    # Reject an empty retry budget explicitly instead of relying on an
    # assert after the loop (asserts are stripped under ``python -O``).
    if max_retries < 1:
        raise ValueError(f"max_retries must be at least 1, got {max_retries}.")

    # Prompt/response text is only logged when the user opts in through
    # CLTK_LOG_CONTENT; read the flag once per call.
    log_content: bool = _os.getenv("CLTK_LOG_CONTENT", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    if log_content:
        self.log.debug(prompt)

    # The request parameters depend only on the model, so build them once.
    request_kwargs: dict[str, Any] = {"model": self.model, "input": prompt}
    # TODO: Disable 4.1
    if "4.1" in self.model:
        request_kwargs["temperature"] = self.temperature
    elif "-5" in self.model:
        # TODO: Add params for these
        request_kwargs["reasoning"] = {"effort": "low"}
        request_kwargs["text"] = {"verbosity": "low"}
    else:
        raise ValueError(f"Unsupported model: {self.model}.")

    code_block: Optional[str] = None
    openai_response: Optional[Any] = None
    # Accumulate tokens across attempts (including failed ones)
    agg_tokens: dict[str, int] = {"input": 0, "output": 0, "total": 0}
    for attempt in range(1, max_retries + 1):
        self.log.debug(f"Attempt {attempt} of {max_retries}")
        try:
            openai_response = self.client.responses.create(**request_kwargs)
        except OpenAIError as openai_error:
            # Chain explicitly so the original API error stays attached.
            raise OpenAIInferenceError(
                f"An error from OpenAI occurred: {openai_error}"
            ) from openai_error
        if log_content:
            self.log.debug(
                f"Raw response from OpenAI: {openai_response.output_text}"
            )
        # Usage accounting is best-effort: a failure here must not abort the
        # generation, but it is logged rather than silently dropped.
        try:
            tok = self._openai_response_tokens(openai_response)
            for k in ("input", "output", "total"):
                agg_tokens[k] += tok.get(k, 0)
        except Exception as usage_error:
            self.log.debug(f"Could not read token usage: {usage_error}")
        try:
            code_block = self._extract_code_blocks(text=openai_response.output_text)
        except Exception as e:
            self.log.error(f"Error extracting code block: {e}")
        else:
            if code_block:
                break  # Success, exit retry loop
            self.log.warning(
                f"Attempt {attempt}: No code block found in OpenAI response. Retrying..."
            )
        # Reached only without a usable code block. On the last attempt this
        # is a failure whether extraction raised or returned nothing, so an
        # unvalidated response is never returned as a success.
        if attempt == max_retries:
            final_err = "No code blocks found in OpenAI response after retries."
            self.log.error(final_err)
            raise CLTKException(final_err)

    # The loop is only left through ``break``, after a response was stored.
    assert openai_response is not None
    raw_openai_response_normalized: str = cltk_normalize(
        text=openai_response.output_text
    )
    if log_content:
        self.log.debug(
            f"raw_openai_response_normalized:\n{raw_openai_response_normalized}"
        )
    self.log.debug(f"Completed generation() after {attempt} attempts")
    # Report the usage accumulated across all attempts.
    return CLTKGenAIResponse(
        response=raw_openai_response_normalized, usage=agg_tokens
    )

AsyncOpenAIConnection

AsyncOpenAIConnection(
    model: AVAILABLE_OPENAI_MODELS,
    api_key: Optional[str] = None,
    temperature: float = 1.0,
)

Asynchronous variant of `OpenAIConnection`.

Provides an async generate_async() method and uses the AsyncOpenAI client under the hood. Mirrors the behavior and logging of the synchronous client while enabling concurrent requests.

Parameters:

  • model (AVAILABLE_OPENAI_MODELS) –

    Model alias to use (see AVAILABLE_OPENAI_MODELS).

  • api_key (Optional[str], default: None ) –

    Optional OpenAI API key. Falls back to OPENAI_API_KEY.

  • temperature (float, default: 1.0 ) –

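    Sampling temperature for generation. It is only passed to the API for models whose name contains "4.1"; requests for "-5" models do not send it.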

Source code in cltk/genai/openai.py
def __init__(
    self,
    model: AVAILABLE_OPENAI_MODELS,
    api_key: Optional[str] = None,
    temperature: float = 1.0,
) -> None:
    """Store the settings, resolve the API key, and build the async client.

    When ``api_key`` is empty, ``load_env_file()`` is called and the key is
    then read from the ``OPENAI_API_KEY`` environment variable.

    Raises:
        ValueError: If no API key is available after the environment lookup.
        ImportError: If the module-level ``AsyncOpenAI`` name is ``None`` and
            the ``openai`` package cannot be imported.
    """
    self.api_key = api_key
    self.model: str = model
    self.temperature: float = temperature

    if not self.api_key:
        # No explicit key: fall back to the environment (after loading .env).
        load_env_file()
        self.api_key = os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            missing_key_msg: str = "OPENAI_API_KEY not found. Please set it in your environment or in a .env file."
            # self.log does not exist yet, so bind a one-off logger here.
            bind_context(model=str(model)).error(missing_key_msg)
            raise ValueError(missing_key_msg)

    # Prefer the module-level AsyncOpenAI name; import the real client lazily
    # only when that name is None.
    client_cls = AsyncOpenAI
    if client_cls is None:  # pragma: no cover - import only if needed
        try:
            from openai import AsyncOpenAI as _LazyAsyncOpenAI
        except Exception as import_error:
            raise ImportError(
                "OpenAI client not installed. Install with: pip install 'cltk[openai]'"
            ) from import_error
        client_cls = _LazyAsyncOpenAI

    self.client = client_cls(api_key=self.api_key)
    # Structured logger carrying the model identifier.
    self.log = bind_context(model=str(self.model))

api_key instance-attribute

api_key = api_key

model instance-attribute

model: str = model

temperature instance-attribute

temperature: float = temperature

client instance-attribute

client = async_openai_cls(api_key=api_key)

log instance-attribute

log = bind_context(model=str(model))

generate_async async

generate_async(
    prompt: str, max_retries: int = 2
) -> CLTKGenAIResponse

Call the OpenAI responses API asynchronously with retries.

Source code in cltk/genai/openai.py
async def generate_async(
    self,
    prompt: str,
    max_retries: int = 2,
) -> CLTKGenAIResponse:
    """Call the OpenAI Responses API asynchronously with retries.

    The request is repeated, up to ``max_retries`` attempts in total, when
    the client raises ``OpenAIError`` or when ``self._extract_code_blocks``
    finds no code block in the response text. Token usage is summed over
    every attempt that produced a response. ``self.temperature`` is only
    sent for models whose name contains "4.1".

    Args:
        prompt: Text sent as the ``input`` of the request.
        max_retries: Maximum number of attempts (not additional retries);
            must be at least 1.

    Returns:
        A ``CLTKGenAIResponse`` whose ``response`` is the last response's
        ``output_text`` passed through ``cltk_normalize`` and whose ``usage``
        holds the accumulated ``input``/``output``/``total`` token counts.
        The last response is returned even when no code block was found in
        it; only a warning is logged in that case.

    Raises:
        ValueError: If ``max_retries`` is less than 1, or the model name
            contains neither "4.1" nor "-5".
        OpenAIInferenceError: If the final attempt fails with ``OpenAIError``.
    """
    import os as _os

    # Reject an empty retry budget explicitly instead of relying on an
    # assert after the loop (asserts are stripped under ``python -O``).
    if max_retries < 1:
        raise ValueError(f"max_retries must be at least 1, got {max_retries}.")

    # Prompt/response text is only logged when the user opts in through
    # CLTK_LOG_CONTENT; read the flag once per call.
    log_content: bool = _os.getenv("CLTK_LOG_CONTENT", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    if log_content:
        self.log.debug("[async] Prompt being sent to OpenAI:\n%s", prompt)

    # The request parameters depend only on the model, so build them once.
    request_kwargs: dict[str, Any] = {"model": self.model, "input": prompt}
    if "4.1" in self.model:
        request_kwargs["temperature"] = self.temperature
    elif "-5" in self.model:
        request_kwargs["reasoning"] = {"effort": "low"}
        request_kwargs["text"] = {"verbosity": "low"}
    else:
        raise ValueError(f"Unsupported model: {self.model}.")

    code_block: Optional[str] = None
    openai_response: Optional[Any] = None
    agg_tokens: dict[str, int] = {"input": 0, "output": 0, "total": 0}
    for attempt in range(1, max_retries + 1):
        self.log.debug("[async] Attempt %s of %s", attempt, max_retries)
        try:
            openai_response = await self.client.responses.create(**request_kwargs)
        except OpenAIError as openai_error:
            self.log.error(
                "[async] OpenAI error on attempt %s: %s", attempt, openai_error
            )
            if attempt == max_retries:
                # Chain explicitly so the original API error stays attached.
                raise OpenAIInferenceError(
                    f"An error from OpenAI occurred: {openai_error}"
                ) from openai_error
            continue

        # The raw model output is content too, so it is gated behind the same
        # opt-in flag as the prompt and the normalized text.
        if log_content:
            self.log.debug(
                "[async] Raw response from OpenAI: %s", openai_response.output_text
            )
        # Usage accounting is best-effort: a failure here must not abort the
        # generation, but it is logged rather than silently dropped.
        try:
            tok = self._openai_response_tokens(openai_response)
            for k in ("input", "output", "total"):
                agg_tokens[k] += tok.get(k, 0)
        except Exception as usage_error:
            self.log.debug("[async] Could not read token usage: %s", usage_error)
        try:
            code_block = self._extract_code_blocks(openai_response.output_text)
        except Exception as e:  # pragma: no cover - defensive
            self.log.error("[async] Error extracting code block: %s", e)
            code_block = None
        if code_block:
            break
        self.log.warning(
            "[async] Attempt %s: No code block found in response. Retrying...",
            attempt,
        )

    # The final attempt either stored a response or raised above.
    assert openai_response is not None
    raw_normalized: str = cltk_normalize(text=openai_response.output_text)
    if log_content:
        self.log.debug("[async] Normalized output text:\n%s", raw_normalized)
    return CLTKGenAIResponse(response=raw_normalized, usage=agg_tokens)