Source code for cltk.embeddings.embeddings

"""Module for accessing pre-trained `fastText word embeddings
<https://fasttext.cc/>`_ and `Word2Vec embeddings from NLPL
<http://vectors.nlpl.eu/repository/>`_. Two sets of models are available
from fastText, one being trained only on corpora taken from
Wikipedia (`249 languages
<https://fasttext.cc/docs/en/pretrained-vectors.html>`_) and
the other being a combination of Wikipedia and Common Crawl
(`157 languages, a subset of the former
<https://fasttext.cc/docs/en/crawl-vectors.html>`_).

The Word2Vec models are in two versions, ``txt`` and ``bin``, with the
``txt`` being approximately twice the size and containing information
for retraining.

Note: In Oct 2022, we changed from the ``fasttext`` library to spaCy's ``floret``,
which contains ``fasttext``'s source but without its packaging problems.

# TODO: Classes ``Word2VecEmbeddings`` and ``FastTextEmbeddings`` contain duplicative code. Consider combining them.

# TODO: Instead of returning `None`, return an empty numpy array of correct len.
"""

import os
from typing import Optional
from zipfile import ZipFile

import numpy as np
from gensim import models  # type: ignore

from cltk.core.cltk_logger import logger
from cltk.core.exceptions import CLTKException, UnimplementedAlgorithmError
from cltk.data.fetch import FetchCorpus
from cltk.languages.utils import get_lang
from cltk.utils import CLTK_DATA_DIR, get_file_with_progress_bar, query_yes_no
from cltk.utils.file_operations import make_cltk_path

# Languages for which ``CLTKWord2VecEmbeddings`` accepts an ``iso_code``
# (only membership of the key is checked by that class).
MAP_CLTK_SELF_HOSTED_LANGS: dict[str, str] = {"enm": "enm"}

# Language code -> URL of the zipped NLPL Word2Vec model
# downloaded by ``Word2VecEmbeddings``.
MAP_NLPL_LANG_TO_URL: dict[str, str] = {
    "arb": "http://vectors.nlpl.eu/repository/20/31.zip",
    "chu": "http://vectors.nlpl.eu/repository/20/60.zip",
    "grc": "http://vectors.nlpl.eu/repository/20/30.zip",
    "lat": "http://vectors.nlpl.eu/repository/20/56.zip",
}

# CLTK language code -> language code used in fastText file names and URLs.
MAP_LANGS_CLTK_FASTTEXT: dict[str, str] = dict(
    ang="ang",  # Anglo-Saxon
    arb="ar",  # Arabic
    arc="arc",  # Aramaic
    got="got",  # Gothic
    lat="la",  # Latin
    pli="pi",  # Pali
    san="sa",  # Sanskrit
)


class CLTKWord2VecEmbeddings:
    """Wrapper for self-hosted Word2Vec embeddings.

    Instantiating the class validates the parameters, fetches the model
    repository with ``FetchCorpus`` when the model directory is missing
    (or when ``overwrite=True``), and then loads the model with Gensim.
    """

    def __init__(
        self,
        iso_code: str,
        model_type: str = "txt",
        interactive: bool = True,
        silent: bool = False,
        overwrite: bool = False,
    ):
        """Constructor for ``CLTKWord2VecEmbeddings`` class.

        Args:
            iso_code: Language code; must be a key of
                ``MAP_CLTK_SELF_HOSTED_LANGS``.
            model_type: Model file extension, ``"txt"`` or ``"bin"``.
            interactive: Ask for confirmation before downloading.
            silent: Suppress the download notice when not interactive.
            overwrite: Fetch the models even if the model directory exists.

        Raises:
            ValueError: If ``interactive`` and ``silent`` are both true, or
                if ``model_type`` is not valid.
            UnimplementedAlgorithmError: If no self-hosted model exists for
                ``iso_code``.
            CLTKException: If the user declines the download.
        """
        self.iso_code: str = iso_code
        self.model_type: str = model_type
        self.interactive: bool = interactive
        self.silent: bool = silent
        self.overwrite: bool = overwrite

        if self.interactive and self.silent:
            raise ValueError(
                "``interactive`` and ``silent`` options are not compatible with each other."
            )

        self._check_input_params()
        self.model_path: str = make_cltk_path(
            self.iso_code, "model", f"{self.iso_code}_models_cltk", "semantics"
        )

        # load model after all checks OK
        self.fp_model: str = self._build_filepath()
        if not self._is_model_present() or self.overwrite:
            self._download_cltk_self_hosted_models()
        else:
            message: str = (
                f"Model for '{self.iso_code}' / '{self.model_type}' already present "
                f"at '{self.fp_model}' and ``overwrite=False``."
            )
            logger.info(message)
        self.model: models.word2vec.Word2Vec = self._load_model()

    def _build_filepath(self) -> str:
        """Create filepath where chosen language should be found.

        Note: the returned path is relative; ``_load_model`` only uses its
        basename, joined onto ``self.model_path``.
        """
        model_dir: str = os.path.join(
            self.iso_code, "models", f"{self.iso_code}_models_cltk", "semantics"
        )
        return os.path.join(model_dir, f"me_word_embeddings_model.{self.model_type}")

    def get_word_vector(self, word: str) -> Optional[np.ndarray]:
        """Return embedding array, or ``None`` if ``word`` is not in the model."""
        try:
            return self.model.wv.get_vector(word)
        except KeyError:
            return None

    def get_embedding_length(self) -> int:
        """Return the embedding length for selected model."""
        return self.model.vector_size

    def get_sims(self, word: str):
        """Get similar words."""
        return self.model.wv.most_similar(word)

    def _check_input_params(self) -> None:
        """Confirm that input parameters are valid and in a
        valid configuration.

        Raises:
            UnimplementedAlgorithmError: If there is no self-hosted model
                for ``self.iso_code``.
            ValueError: If ``self.model_type`` is not ``"bin"`` or ``"txt"``.
        """
        # 1. check if lang valid
        get_lang(self.iso_code)  # check if iso_code valid

        # 2. check if any self-hosted embeddings for this lang
        if self.iso_code not in MAP_CLTK_SELF_HOSTED_LANGS:
            available_embeddings_str: str = "', '".join(MAP_CLTK_SELF_HOSTED_LANGS)
            raise UnimplementedAlgorithmError(
                f"No embedding available for language '{self.iso_code}'."
                f" Self-hosted Word2Vec models available for: '{available_embeddings_str}'."
            )

        # 3. assert that model type is valid
        valid_types: list[str] = ["bin", "txt"]
        if self.model_type not in valid_types:
            valid_types_str: str = "', '".join(valid_types)
            raise ValueError(
                f"Invalid ``model_type`` {self.model_type}. Valid model types: {valid_types_str}."
            )

    def _download_cltk_self_hosted_models(self) -> None:
        """Perform complete download of Word2Vec models and save
        them in appropriate ``cltk_data`` dir.

        Raises:
            CLTKException: If, in interactive mode, the user declines the
                download.
        """
        # Same repository name as the one embedded in ``self.model_path``;
        # both interactive and non-interactive modes must fetch it.
        corpus_name: str = f"{self.iso_code}_models_cltk"

        if self.interactive:
            print(  # pragma: no cover
                "CLTK message: This part of the CLTK depends upon word embedding "
                "models hosted by the CLTK project."
            )  # pragma: no cover
            dl_is_allowed: bool = query_yes_no(
                f"Do you want to download the {self.iso_code} models to '{self.model_path}'?"
            )
            if not dl_is_allowed:
                raise CLTKException("Impossible to download the model.")
        elif not self.silent:
            print("CLTK message: Going to download the model ...")  # pragma: no cover

        fetch_corpus = FetchCorpus(language=self.iso_code)
        fetch_corpus.import_corpus(corpus_name=corpus_name, branch="main")

    def _is_model_present(self) -> bool:
        """Check whether the model directory exists.

        Only the directory is checked, not the model file itself.
        """
        return os.path.isdir(self.model_path)

    def _load_model(self) -> models.word2vec.Word2Vec:
        """Load model into memory.

        Raises:
            UnicodeDecodeError: Re-raised from Gensim after printing which
                file could not be opened.
        """
        model_fp: str = os.path.join(self.model_path, os.path.basename(self.fp_model))
        try:
            return models.word2vec.Word2Vec.load(model_fp)
        except UnicodeDecodeError:
            print(f"Cannot open file '{model_fp}' with Gensim 'Word2Vec.load'.")
            # Bare ``raise`` keeps the original exception; ``UnicodeDecodeError``
            # cannot be instantiated without its five arguments.
            raise
class Word2VecEmbeddings:
    """Wrapper for Word2Vec embeddings. Note: For models
    provided by fastText, use class ``FastTextEmbeddings``.

    Instantiating the class validates the parameters, downloads and unzips
    the NLPL archive when the model file is missing (or when
    ``overwrite=True``), and then loads the vectors with Gensim.
    """

    def __init__(
        self,
        iso_code: str,
        model_type: str = "txt",
        interactive: bool = True,
        silent: bool = False,
        overwrite: bool = False,
    ):
        """Constructor for ``Word2VecEmbeddings`` class.

        Args:
            iso_code: Language code; must be a key of ``MAP_NLPL_LANG_TO_URL``.
            model_type: Model file extension, ``"txt"`` or ``"bin"``.
            interactive: Ask for confirmation before downloading.
            silent: Suppress the download notice when not interactive.
            overwrite: Download again even if the model file exists.

        Raises:
            ValueError: If ``interactive`` and ``silent`` are both true, or
                if ``model_type`` is not valid.
            UnimplementedAlgorithmError: If no NLPL model is registered for
                ``iso_code``.
            CLTKException: If the user declines the download.
        """
        self.iso_code = iso_code
        self.model_type = model_type
        self.interactive = interactive
        self.silent = silent
        self.overwrite = overwrite

        if self.interactive and self.silent:
            raise ValueError(
                "``interactive`` and ``silent`` options are not compatible with each other."
            )

        self._check_input_params()

        # load model after all checks OK
        self.fp_zip: str = self._build_zip_filepath()
        self.fp_model: str = self._build_nlpl_filepath()
        # The archive is extracted into the directory that holds the zip file.
        self.fp_model_dirs: str = os.path.split(self.fp_zip)[0]
        if not self._is_nlpl_model_present() or self.overwrite:
            self._download_nlpl_models()
            self._unzip_nlpl_model()
        else:
            message: str = (
                f"Model for '{self.iso_code}' / '{self.model_type}' already present "
                f"at '{self.fp_model}' and ``overwrite=False``."
            )
            logger.info(message)
        self.model: models.keyedvectors.Word2VecKeyedVectors = self._load_model()

    def get_word_vector(self, word: str) -> Optional[np.ndarray]:
        """Return embedding array, or ``None`` if ``word`` is not in the model."""
        try:
            return self.model.get_vector(word)
        except KeyError:
            return None

    def get_embedding_length(self) -> int:
        """Return the embedding length for selected model."""
        return self.model.vector_size

    def get_sims(self, word: str):
        """Get similar words."""
        return self.model.most_similar(word)

    def _check_input_params(self) -> None:
        """Confirm that input parameters are valid and in a
        valid configuration.

        Raises:
            UnimplementedAlgorithmError: If there is no NLPL model for
                ``self.iso_code``.
            ValueError: If ``self.model_type`` is not ``"bin"`` or ``"txt"``.
        """
        # 1. check if lang valid
        get_lang(self.iso_code)  # check if iso_code valid

        # 2. check if any NLPL embeddings for this lang
        if self.iso_code not in MAP_NLPL_LANG_TO_URL:
            available_embeddings_str = "', '".join(MAP_NLPL_LANG_TO_URL)
            raise UnimplementedAlgorithmError(
                f"No embedding available for language '{self.iso_code}'. "
                f"Word2Vec models available for: '{available_embeddings_str}'."
            )

        # 3. assert that model type is valid
        valid_types = ["bin", "txt"]
        if self.model_type not in valid_types:
            valid_types_str = "', '".join(valid_types)
            raise ValueError(
                f"Invalid ``model_type`` {self.model_type}. Valid model types: {valid_types_str}."
            )

    def _build_zip_filepath(self) -> str:
        """Create filepath where .zip file will be saved."""
        # ".../repository/20/31.zip" -> ".../repository/20/31" -> 31
        url_frag: str = MAP_NLPL_LANG_TO_URL[self.iso_code].split(".")[-2]
        nlpl_id: int = int(url_frag.split("/")[-1])
        fp_zip: str = os.path.join(
            CLTK_DATA_DIR, f"{self.iso_code}/embeddings/nlpl/{nlpl_id}.zip"
        )
        return fp_zip

    def _build_nlpl_filepath(self) -> str:
        """Create filepath where chosen language should be found."""
        model_dir: str = os.path.join(
            CLTK_DATA_DIR, f"{self.iso_code}/embeddings/nlpl/"
        )
        return os.path.join(model_dir, f"model.{self.model_type}")

    def _is_nlpl_model_present(self) -> bool:
        """Check if the model file exists at ``self.fp_model``."""
        return os.path.isfile(self.fp_model)

    def _download_nlpl_models(self) -> None:
        """Perform complete download of Word2Vec models and save
        them in appropriate ``cltk_data`` dir.

        Raises:
            CLTKException: If, in interactive mode, the user declines the
                download.
        """
        model_url = MAP_NLPL_LANG_TO_URL[self.iso_code]

        if self.interactive:
            print(  # pragma: no cover
                "CLTK message: This part of the CLTK depends upon word embedding models from the NLPL project."
            )  # pragma: no cover
            dl_is_allowed: bool = query_yes_no(
                f"Do you want to download file '{model_url}' to '{self.fp_zip}'?"
            )
            if not dl_is_allowed:
                raise CLTKException(
                    f"Download of necessary NLPL model declined for '{self.iso_code}'. "
                    f"Unable to continue with loading the Word2Vec embeddings."
                )
        elif not self.silent:
            print(
                f"CLTK message: Going to download file '{model_url}' to '{self.fp_zip}' ..."
            )  # pragma: no cover

        get_file_with_progress_bar(model_url=model_url, file_path=self.fp_zip)

    def _unzip_nlpl_model(self) -> None:
        """Unzip the downloaded archive into ``self.fp_model_dirs``."""
        with ZipFile(self.fp_zip, "r") as zipfile_obj:
            zipfile_obj.extractall(path=self.fp_model_dirs)

    def _load_model(self) -> models.keyedvectors.Word2VecKeyedVectors:
        """Load model into memory.

        TODO: When testing show that this is a Gensim type
        TODO: Suppress Gensim info printout from screen

        Raises:
            UnicodeDecodeError: Re-raised from Gensim after printing which
                file could not be opened.
        """
        # KJ added these two checks because NLPL embeddings
        # began erring in Gensim (Oct 2021)
        is_binary: bool = self.fp_model.endswith(".bin")
        unicode_errors: str = "ignore" if self.fp_model.endswith(".txt") else "strict"
        try:
            return models.KeyedVectors.load_word2vec_format(
                self.fp_model,
                binary=is_binary,
                unicode_errors=unicode_errors,
            )
        except UnicodeDecodeError:
            msg = f"Cannot open file '{self.fp_model}' with Gensim 'load_word2vec_format'."
            print(msg)
            # Bare ``raise`` keeps the original exception; ``UnicodeDecodeError``
            # cannot be instantiated without its five arguments.
            raise
class FastTextEmbeddings:
    """Wrapper for fastText embeddings.

    Instantiating the class validates the parameters, downloads the model
    file when it is missing (or when ``overwrite=True``), and then loads it
    with Gensim.
    """

    def __init__(
        self,
        iso_code: str,
        training_set: str = "wiki",
        model_type: str = "vec",
        interactive: bool = True,
        overwrite: bool = False,
        silent: bool = False,
    ):
        """Constructor for ``FastTextEmbeddings`` class.

        Args:
            iso_code: Language code; must be a key of ``MAP_LANGS_CLTK_FASTTEXT``.
            training_set: ``"wiki"`` or ``"common_crawl"``.
            model_type: ``"vec"`` or ``"bin"``.
            interactive: Ask for confirmation before downloading.
            overwrite: Download again even if the model file exists.
            silent: Suppress the download notice when not interactive.

        Raises:
            ValueError: If ``interactive`` and ``silent`` are both true.
            UnimplementedAlgorithmError: If no fastText model is registered
                for ``iso_code``.
            CLTKException: If ``model_type`` or ``training_set`` is invalid,
                if the training set is unavailable for the language, or if
                the user declines the download.
        """
        self.iso_code = iso_code
        self.training_set = training_set
        self.model_type = model_type
        self.interactive = interactive
        self.silent = silent
        self.overwrite = overwrite

        if self.interactive and self.silent:
            raise ValueError(
                "``interactive`` and ``silent`` options are not compatible with each other."
            )

        self._check_input_params()

        # load model after all checks OK
        self.model_fp = self._build_fasttext_filepath()
        if not self._is_model_present() or self.overwrite:
            self.download_fasttext_models()
        else:
            message = (
                f"Model for '{self.iso_code}' / '{self.training_set}' / '{self.model_type}' already present "
                f"at '{self.model_fp}' and ``overwrite=False``."
            )
            logger.info(message)
        self.model = self._load_model()

    def get_word_vector(self, word: str) -> Optional[np.ndarray]:
        """Return embedding array, or ``None`` if ``word`` is not in the model."""
        try:
            return self.model.get_vector(word)
        except KeyError:
            # TODO: To get an embedding from an OOV for sub-words, load the ``.bin`` file, too:
            # `<https://radimrehurek.com/gensim/models/fasttext.html#gensim.models.fasttext.load_facebook_model>`_
            return None

    def get_embedding_length(self) -> int:
        """Return the embedding length for selected model."""
        return self.model.vector_size

    def get_sims(self, word: str):
        """Get similar words."""
        return self.model.most_similar(word)

    def download_fasttext_models(self) -> None:
        """Perform complete download of fastText models and save
        them in appropriate ``cltk_data`` dir.

        TODO: Add tests
        TODO: Implement ``overwrite``
        TODO: error out better or continue to _load_model?

        Raises:
            CLTKException: If, in interactive mode, the user declines the
                download.
        """
        model_url = self._build_fasttext_url()

        if self.interactive:
            print(  # pragma: no cover
                "CLTK message: This part of the CLTK depends upon word embedding models from the Fasttext project."
            )  # pragma: no cover
            dl_is_allowed: bool = query_yes_no(
                f"Do you want to download file '{model_url}' to '{self.model_fp}'?"
            )
            if not dl_is_allowed:
                raise CLTKException(
                    f"Download of necessary fastText model declined for '{self.iso_code}'. "
                    f"Unable to continue with loading the fastText embeddings."
                )
        elif not self.silent:
            print(
                f"CLTK message: Going to download file '{model_url}' to '{self.model_fp}' ..."
            )  # pragma: no cover

        get_file_with_progress_bar(model_url=model_url, file_path=self.model_fp)

    def _is_model_present(self) -> bool:
        """Check if the model file exists at ``self.model_fp``."""
        return os.path.isfile(self.model_fp)

    def _check_input_params(self) -> None:
        """Look at combination of parameters give to class
        and determine if any invalid combination or missing
        models.

        Raises:
            UnimplementedAlgorithmError: If no fastText model is registered
                for ``self.iso_code``.
            CLTKException: If ``self.model_type`` or ``self.training_set``
                is invalid, or if the training set is not available for
                the language.
        """
        # 1. check if lang valid
        get_lang(self.iso_code)  # check if iso_code valid

        # 2. check if any fasttext embeddings for this lang
        if not self._is_fasttext_lang_available():
            available_embeddings_str = "', '".join(MAP_LANGS_CLTK_FASTTEXT)
            raise UnimplementedAlgorithmError(
                f"No embedding available for language '{self.iso_code}'. "
                f"FastTextEmbeddings available for: '{available_embeddings_str}'."
            )

        # 3. check if requested model type is available for fasttext
        valid_model_types = ["bin", "vec"]
        if self.model_type not in valid_model_types:
            valid_model_types_str = "', '".join(valid_model_types)
            raise CLTKException(
                f"Invalid model type '{self.model_type}'. Choose: '{valid_model_types_str}'."
            )

        # 4. check if requested training set is available for language for fasttext
        vectors_by_training_set = {
            "common_crawl": ["arb", "lat", "san"],
            "wiki": ["ang", "arb", "arc", "got", "lat", "pli", "san"],
        }
        if self.training_set not in vectors_by_training_set:
            training_sets_str = "', '".join(vectors_by_training_set)
            raise CLTKException(
                f"Invalid ``training_set`` '{self.training_set}'. Available: '{training_sets_str}'."
            )
        available_vectors = vectors_by_training_set[self.training_set]
        if self.iso_code not in available_vectors:
            available_vectors_str = "', '".join(available_vectors)
            raise CLTKException(
                f"Training set '{self.training_set}' not available for language '{self.iso_code}'. "
                f"Languages available for this training set: '{available_vectors_str}'."
            )

    def _load_model(self):
        """Load model into memory.

        TODO: When testing show that this is a Gensim type
        TODO: Suppress Gensim info printout from screen
        """
        return models.KeyedVectors.load_word2vec_format(self.model_fp)

    def _is_fasttext_lang_available(self) -> bool:
        """Returns whether any vectors are available, for
        fastText, for the input language. This is not comprehensive
        of all fastText embeddings, only those added into the CLTK.
        """
        get_lang(iso_code=self.iso_code)
        return self.iso_code in MAP_LANGS_CLTK_FASTTEXT

    def _build_fasttext_filepath(self) -> str:
        """Create filepath at which to save a downloaded
        fasttext model.

        .. todo::
           Do better than test for just name. Try trimming up to user home dir.

        >>> from cltk.embeddings.embeddings import FastTextEmbeddings  # doctest: +SKIP
        >>> embeddings_obj = FastTextEmbeddings(iso_code="lat", interactive=False, silent=True)  # doctest: +SKIP
        >>> vec_fp = embeddings_obj._build_fasttext_filepath()  # doctest: +SKIP
        >>> os.path.split(vec_fp)[1]  # doctest: +SKIP
        'wiki.la.vec'
        >>> embeddings_obj = FastTextEmbeddings(iso_code="lat", model_type="bin", interactive=False, silent=True)  # doctest: +SKIP
        >>> bin_fp = embeddings_obj._build_fasttext_filepath()  # doctest: +SKIP
        >>> os.path.split(bin_fp)[1]  # doctest: +SKIP
        'wiki.la.bin'
        >>> embeddings_obj = FastTextEmbeddings(iso_code="lat", training_set="common_crawl", model_type="vec", interactive=False, silent=True)  # doctest: +SKIP
        >>> vec_fp = embeddings_obj._build_fasttext_filepath()  # doctest: +SKIP
        >>> os.path.split(vec_fp)[1]  # doctest: +SKIP
        'cc.la.300.vec'
        >>> embeddings_obj = FastTextEmbeddings(iso_code="lat", training_set="common_crawl", model_type="bin", interactive=False, silent=True)  # doctest: +SKIP
        >>> bin_fp = embeddings_obj._build_fasttext_filepath()  # doctest: +SKIP
        >>> os.path.split(bin_fp)[1]  # doctest: +SKIP
        'cc.la.300.bin'

        Raises:
            CLTKException: If ``self.training_set`` is neither ``"wiki"``
                nor ``"common_crawl"``.
        """
        fasttext_code = MAP_LANGS_CLTK_FASTTEXT[self.iso_code]

        if self.training_set == "wiki":
            file_name = f"wiki.{fasttext_code}.{self.model_type}"
        elif self.training_set == "common_crawl":
            file_name = f"cc.{fasttext_code}.300.{self.model_type}"
        else:
            raise CLTKException(f"Unexpected ``training_set`` ``{self.training_set}``.")

        return os.path.join(
            CLTK_DATA_DIR, self.iso_code, "embeddings", "fasttext", file_name
        )

    def _build_fasttext_url(self) -> str:
        """Make the URL at which the requested model may be downloaded.

        Raises:
            CLTKException: If ``self.training_set`` is neither ``"wiki"``
                nor ``"common_crawl"``.
        """
        fasttext_code = MAP_LANGS_CLTK_FASTTEXT[self.iso_code]

        # NOTE(review): the wiki ``bin`` URL points at a ``.zip`` and every
        # common-crawl URL at a ``.gz``, yet ``download_fasttext_models``
        # saves the download under ``self.model_fp`` and this class has no
        # unpacking step -- presumably those files need decompressing before
        # ``_load_model`` can read them; confirm.
        if self.training_set == "wiki":
            ending = "vec" if self.model_type == "vec" else "zip"  # zip for .bin
            return f"https://dl.fbaipublicfiles.com/fasttext/vectors-wiki/wiki.{fasttext_code}.{ending}"
        if self.training_set == "common_crawl":
            return (
                "https://dl.fbaipublicfiles.com/fasttext/vectors-crawl/"
                f"cc.{fasttext_code}.300.{self.model_type}.gz"
            )
        raise CLTKException("Unexpected exception.")