# SPDX-FileCopyrightText: 2016-2026 PyThaiNLP Project
# SPDX-FileType: SOURCE
# SPDX-License-Identifier: Apache-2.0
"""Corpus related functions."""
from __future__ import annotations
import json
import os
import re
import sys
import tarfile
import zipfile
from http.client import HTTPMessage, HTTPResponse
from importlib.resources import files
from typing import TYPE_CHECKING
from pythainlp import __version__
from pythainlp.corpus import corpus_db_path, corpus_db_url, corpus_path
from pythainlp.tools import get_full_data_path
if TYPE_CHECKING:
from typing import Any, Optional
# "1" puts PyThaiNLP in read-only mode: download() and remove() refuse to
# modify local data when this is set.
_CHECK_MODE: Optional[str] = os.getenv("PYTHAINLP_READ_MODE")
# User-Agent sent with catalog/corpus HTTP requests, e.g.
# "PyThaiNLP/5.0.0 (Python/3.11; linux)"
_USER_AGENT: str = (
    f"PyThaiNLP/{__version__} "
    f"(Python/{sys.version_info.major}.{sys.version_info.minor}; "
    f"{sys.platform})"
)
class _ResponseWrapper:
"""Wrapper to provide requests.Response-like interface for urllib response."""
status_code: int
headers: HTTPMessage
_content: bytes
def __init__(self, response: HTTPResponse) -> None:
self.status_code = response.status
self.headers = response.headers
self._content = response.read()
def json(self) -> dict[str, Any]:
"""Parse JSON content from response."""
try:
return json.loads(self._content.decode("utf-8")) # type: ignore[no-any-return]
except (json.JSONDecodeError, UnicodeDecodeError) as err:
raise ValueError(f"Failed to parse JSON response: {err}") from err
def get_corpus_db(url: str) -> Optional[_ResponseWrapper]:
    """Fetch the corpus catalog from a server.

    :param str url: URL of the corpus catalog
    :return: wrapped HTTP response, or None if the request failed

    Security Note: Uses HTTPS with certificate validation enabled by default
    in Python's urllib. Only download corpus from trusted URLs.
    """
    from urllib.error import HTTPError, URLError
    from urllib.request import Request, urlopen

    result: Optional[_ResponseWrapper] = None
    try:
        request = Request(url, headers={"User-Agent": _USER_AGENT})
        # SSL certificate verification is enabled by default
        with urlopen(request, timeout=10) as resp:
            result = _ResponseWrapper(resp)
    except HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
    except URLError as err:
        print(f"URL error occurred: {err}")
    except Exception as err:
        # Any other failure (bad URL, timeout, ...) is reported, not raised
        print(f"Error occurred: {err}")
    return result
def get_corpus_db_detail(name: str, version: str = "") -> dict[str, Any]:
    """Get details about a corpus, using information from local catalog.

    :param str name: name of corpus
    :param str version: corpus version; an empty string matches any version
    :return: details about the corpus, or an empty dict if not found
    :rtype: dict
    """
    with open(corpus_db_path(), encoding="utf-8-sig") as f:
        local_db = json.load(f)

    # Single pass: an empty version argument matches the first record
    # with this name (previously duplicated as two near-identical loops).
    for corpus in local_db["_default"].values():
        if corpus["name"] == name and (
            not version or corpus["version"] == version
        ):
            return corpus  # type: ignore[no-any-return]

    return {}
def path_pythainlp_corpus(filename: str) -> str:
    """Return the full path of a file bundled inside pythainlp.corpus.

    :param str filename: filename of the corpus to be read
    :return: path of corpus
    :rtype: str
    """
    base_dir = corpus_path()
    return os.path.join(base_dir, filename)
def get_corpus(filename: str, comments: bool = True) -> frozenset[str]:
    """Read corpus data from file and return a frozenset.

    Each line in the file will be a member of the set.

    Leading/trailing whitespace is stripped from each line; empty values
    and duplicates are removed.

    If comments is False, any text at any position after the character
    '#' in each line will be discarded.

    :param str filename: filename of the corpus to be read
    :param bool comments: keep comments
    :return: :class:`frozenset` consisting of lines in the file
    :rtype: :class:`frozenset`

    :Example:
    ::

        from pythainlp.corpus import get_corpus

        # input file (negations_th.txt):
        # แต่
        # ไม่

        get_corpus("negations_th.txt")
        # output:
        # frozenset({'แต่', 'ไม่'})

        # input file (ttc_freq.txt):
        # ตัวบท<tab>10
        # โดยนัยนี้<tab>1

        get_corpus("ttc_freq.txt")
        # output:
        # frozenset({'โดยนัยนี้\\t1',
        #    'ตัวบท\\t10',
        #    ...})

        # input file (icubrk_th.txt):
        # # Thai Dictionary for ICU BreakIterator
        # กก
        # กกขนาก

        get_corpus("icubrk_th.txt")
        # output:
        # frozenset({'กกขนาก',
        #    '# Thai Dictionary for ICU BreakIterator',
        #    'กก',
        #    ...})

        get_corpus("icubrk_th.txt", comments=False)
        # output:
        # frozenset({'กกขนาก',
        #    'กก',
        #    ...})
    """
    corpus_files = files("pythainlp.corpus")
    corpus_file = corpus_files.joinpath(filename)
    text = corpus_file.read_text(encoding="utf-8-sig")
    lines = text.splitlines()
    if not comments:
        # if the line has a '#' character, take only text before the first '#'
        lines = [line.split("#", 1)[0] for line in lines]
    # Fix: strip whitespace on BOTH paths (previously only the
    # comments=False path stripped, contradicting the docstring);
    # filter(None, ...) drops lines that became empty.
    lines = [line.strip() for line in lines]
    return frozenset(filter(None, lines))
def get_corpus_as_is(filename: str) -> list[str]:
    """Read corpus data from file, as it is, and return a list.

    Each line in the file will be a member of the list, in the original
    order and without any modification. If stripping or comment removal
    is needed, use get_corpus() instead.

    :param str filename: filename of the corpus to be read
    :return: :class:`list` consisting of lines in the file
    :rtype: :class:`list`

    :Example:
    ::

        from pythainlp.corpus import get_corpus

        # input file (negations_th.txt):
        # แต่
        # ไม่

        get_corpus_as_is("negations_th.txt")
        # output:
        # ['แต่', 'ไม่']
    """
    corpus_file = files("pythainlp.corpus").joinpath(filename)
    return corpus_file.read_text(encoding="utf-8-sig").splitlines()
def get_corpus_default_db(name: str, version: str = "") -> Optional[str]:
    """Get model path from default_db.json.

    :param str name: corpus name
    :param str version: corpus version; empty string selects the latest
    :return: path to the corpus or **None** if the corpus doesn't \
             exist on the device
    :rtype: str

    If you want to edit default_db.json, \
        you can edit pythainlp/corpus/default_db.json
    """
    db_file = files("pythainlp.corpus").joinpath("default_db.json")
    corpus_db = json.loads(db_file.read_text(encoding="utf-8-sig"))

    if name not in corpus_db:
        return None

    entry = corpus_db[name]
    if version in entry["versions"]:
        return path_pythainlp_corpus(entry["versions"][version]["filename"])
    if not version:
        # No version requested: fall back to the declared latest version
        latest = entry["latest_version"]
        return path_pythainlp_corpus(entry["versions"][latest]["filename"])
    # A specific version was requested but is not in the bundled catalog
    return None
def get_corpus_path(
    name: str, version: str = "", force: bool = False
) -> Optional[str]:
    """Get corpus path.

    Resolution order: hard-coded CUSTOMIZE overrides, then corpora bundled
    with the package (default_db.json), then the local downloaded catalog
    (downloading on demand when missing).

    :param str name: corpus name
    :param str version: version
    :param bool force: force downloading
    :return: path to the corpus or **None** if the corpus doesn't \
             exist on the device
    :rtype: str

    :Example:

    (Please see the filename in
    `this file
    <https://pythainlp.org/pythainlp-corpus/db.json>`_

    If the corpus already exists::

        from pythainlp.corpus import get_corpus_path

        print(get_corpus_path('ttc'))
        # output: /root/pythainlp-data/ttc_freq.txt

    If the corpus has not been downloaded yet::

        from pythainlp.corpus import download, get_corpus_path

        print(get_corpus_path('wiki_lm_lstm'))
        # output: None

        download('wiki_lm_lstm')
        # output:
        # Download: wiki_lm_lstm
        # wiki_lm_lstm 0.32
        # thwiki_lm.pth?dl=1: 1.05GB [00:25, 41.5MB/s]
        # /root/pythainlp-data/thwiki_model_lstm.pth

        print(get_corpus_path('wiki_lm_lstm'))
        # output: /root/pythainlp-data/thwiki_model_lstm.pth
    """
    # Hard-coded overrides: corpus name -> local path, bypassing both the
    # bundled defaults and the downloaded catalog.
    CUSTOMIZE: dict[str, str] = {
        # "the corpus name":"path"
    }
    if name in CUSTOMIZE:
        return CUSTOMIZE[name]

    # Corpora bundled with the package take precedence over downloads.
    default_path = get_corpus_default_db(name=name, version=version)
    if default_path is not None:
        return default_path

    # check if the corpus is in local catalog, download it if not
    corpus_db_detail = get_corpus_db_detail(name, version=version)
    if not corpus_db_detail or not corpus_db_detail.get("filename"):
        download(name, version=version, force=force)
        # Re-read the catalog: download() may have registered the corpus
        corpus_db_detail = get_corpus_db_detail(name, version=version)

    if corpus_db_detail and corpus_db_detail.get("filename"):
        # corpus is in the local catalog, get full path to the file
        if corpus_db_detail.get("is_folder"):
            # Extracted archives are referenced by their folder name
            foldername = corpus_db_detail.get("foldername")
            if foldername:
                path = get_full_data_path(foldername)
            else:
                return None
        else:
            filename = corpus_db_detail.get("filename")
            if filename:
                path = get_full_data_path(filename)
            else:
                return None

        # check if the corpus file actually exists, download it if not
        if not os.path.exists(path):
            download(name, version=version, force=force)
        if os.path.exists(path):
            return path

    return None
def _download(url: str, dst: str) -> int:
    """Download a file in chunks into the local data directory.

    @param: url URL for downloading file
    @param: dst place to put the file into
    @return: size in bytes from Content-Length (-1 if the header is absent)

    Security Note: Downloads use HTTPS with SSL certificate validation.
    Files are verified using MD5 checksums after download.
    """
    from urllib.request import Request, urlopen

    chunk_size = 64 * 1024  # 64 KiB
    req = Request(url, headers={"User-Agent": _USER_AGENT})
    # SSL certificate verification is enabled by default
    with urlopen(req, timeout=10) as response:
        file_size = int(response.info().get("Content-Length", -1))
        with open(get_full_data_path(dst), "wb") as f:
            # Progress bar is optional: tqdm may not be installed
            try:
                from tqdm.auto import tqdm

                progress = tqdm(total=file_size)
            except ImportError:
                progress = None
            while chunk := response.read(chunk_size):
                f.write(chunk)
                if progress:
                    progress.update(len(chunk))
            if progress:
                progress.close()
            else:
                print("Done.")
    return file_size
def _check_hash(dst: str, md5: str) -> None:
    """Verify the MD5 checksum of a downloaded file.

    @param: dst place the file was put into
    @param: md5 expected file hash (MD5); empty or "-" skips the check
    """
    if not md5 or md5 == "-":
        return

    import hashlib

    with open(get_full_data_path(dst), "rb") as f:
        # MD5 is insecure but sufficient here
        digest = hashlib.md5(f.read()).hexdigest()  # noqa: S324
    if digest != md5:
        raise ValueError("Hash does not match expected.")
def _is_within_directory(directory: str, target: str) -> bool:
"""Check if target path is within directory (prevent path traversal).
@param: directory base directory path
@param: target target file path to check
@return: True if target is within directory, False otherwise
Security Note: This function normalizes paths using os.path.abspath()
to handle relative paths and .. sequences. It does NOT follow symlinks
(unlike os.path.realpath()), because:
- Symlink validation is handled separately in extraction functions
- We want to check if the path string itself is safe, not where it points
- This prevents false negatives when symlinks don't exist yet
For symlink security, use the extraction function's symlink validation.
"""
# Use abspath to normalize paths but NOT realpath (which follows symlinks)
abs_directory = os.path.abspath(directory)
abs_target = os.path.abspath(target)
# Ensure directory ends with separator for proper prefix check
# This prevents /foo/bar from matching /foo/barz
if not abs_directory.endswith(os.sep):
abs_directory += os.sep
return abs_target.startswith(
abs_directory
) or abs_target == abs_directory.rstrip(os.sep)
def _safe_extract_tar(tar: tarfile.TarFile, path: str) -> None:
    """Safely extract tar archive, preventing path traversal attacks.

    @param: tar tarfile object
    @param: path destination path for extraction
    @raises: ValueError if any member would extract outside of path

    Security Note: This function prevents path traversal attacks including:
    - Files with .. in their path
    - Symlinks pointing outside the extraction directory
    - Files extracted through malicious symlinks

    For Python 3.12+, uses tarfile.data_filter for additional protection.
    For Python 3.9-3.11, implements custom validation of all members.
    """
    # Check if data_filter is available (Python 3.12+)
    if hasattr(tarfile, "data_filter"):
        # Use built-in filter which handles symlinks and other security issues
        try:
            tar.extractall(path=path, filter="data")
        except (
            tarfile.OutsideDestinationError,
            tarfile.LinkOutsideDestinationError,
        ) as e:
            # Re-raise as ValueError for consistency with older Python versions
            raise ValueError(str(e))
    else:
        # Manual validation for older Python versions:
        # validate every member first, extract only if all pass
        for member in tar.getmembers():
            # Check the member's target path
            member_path = os.path.join(path, member.name)
            if not _is_within_directory(path, member_path):
                raise ValueError(
                    f"Attempted path traversal in tar file: {member.name}"
                )
            # For symlinks, also validate the link target
            if member.issym() or member.islnk():
                # Get the link target (can be absolute or relative)
                link_target = member.linkname
                # If it's a relative symlink, resolve it relative to the member's directory
                if not os.path.isabs(link_target):
                    member_dir = os.path.dirname(member_path)
                    link_target = os.path.join(member_dir, link_target)
                else:
                    # Absolute symlinks are dangerous - make them relative to extraction path
                    link_target = os.path.join(
                        path, link_target.lstrip(os.sep)
                    )
                # Check if the resolved symlink target is within the directory
                if not _is_within_directory(path, link_target):
                    raise ValueError(
                        f"Symlink {member.name} points outside extraction directory: {member.linkname}"
                    )
        tar.extractall(path=path)
def _safe_extract_zip(zip_file: zipfile.ZipFile, path: str) -> None:
    """Safely extract zip archive, preventing path traversal attacks.

    @param: zip_file zipfile object
    @param: path destination path for extraction
    @raises: ValueError if any member would extract outside of path

    Security Note: This function prevents path traversal attacks including:
    - Files with .. in their path
    - Symlinks pointing outside the extraction directory (on Unix systems)

    Note: ZIP format has limited symlink support. Symlinks are primarily
    created by Unix-based archiving tools and may not be portable.
    """
    # Validate every member first; extractall() runs only if all pass
    for member in zip_file.namelist():
        member_path = os.path.join(path, member)
        if not _is_within_directory(path, member_path):
            raise ValueError(f"Attempted path traversal in zip file: {member}")
        # Check for potential symlinks in ZIP files
        # ZIP files can contain symlinks on Unix systems (external_attr indicates this)
        info = zip_file.getinfo(member)
        # Check if this is a symlink (Unix: external_attr with S_IFLNK set)
        # The high 16 bits of external_attr contain Unix file mode;
        # 0o170000 is the file-type mask, 0o120000 the symlink type
        is_symlink = (info.external_attr >> 16) & 0o170000 == 0o120000
        if is_symlink:
            # Read the symlink target from the file content
            link_target = zip_file.read(member).decode("utf-8")
            # Resolve the link target relative to the member's directory
            if not os.path.isabs(link_target):
                member_dir = os.path.dirname(member_path)
                resolved_target = os.path.join(member_dir, link_target)
            else:
                # Absolute symlinks - make them relative to extraction path
                resolved_target = os.path.join(
                    path, link_target.lstrip(os.sep)
                )
            # Check if the symlink target is within the directory
            if not _is_within_directory(path, resolved_target):
                raise ValueError(
                    f"Symlink {member} points outside extraction directory: {link_target}"
                )
    zip_file.extractall(path=path)
def _version2int(v: str) -> int:
"""X.X.X => X0X0X"""
if "-" in v:
v = v.split("-")[0]
if v.endswith(".*"):
v = v.replace(".*", ".0") # X.X.* => X.X.0
v_list = v.split(".")
if len(v_list) < 3:
v_list.append("0")
v_new = ""
for i, value in enumerate(v_list):
if i != 0:
if len(value) < 2:
v_new += "0" + value
else:
v_new += value
else:
v_new += value
return int(v_new)
def _check_version(cause: str) -> bool:
    """Check whether the running PyThaiNLP version satisfies a version spec.

    :param str cause: version specifier, one of: "*", "==X", ">=X",
        ">X", ">=X<Y", ">=X<=Y", ">X<Y", "<=X", "<X"
    :return: True if the current PyThaiNLP version matches the specifier
    :rtype: bool
    """
    check = False

    # Strip pre-release suffixes so e.g. "2.0dev1" compares as "2.0"
    version = __version__
    if "dev" in version:
        version = version.split("dev", maxsplit=1)[0]
    elif "beta" in version:
        version = version.split("beta", maxsplit=1)[0]
    v = _version2int(version)

    if cause == "*":
        check = True
    elif cause.startswith("==") and ">" not in cause and "<" not in cause:
        check = v == _version2int(cause.replace("==", ""))
    elif cause.startswith(">=") and "<" not in cause:
        check = v >= _version2int(cause.replace(">=", ""))
    elif cause.startswith(">") and "<" not in cause:
        check = v > _version2int(cause.replace(">", ""))
    elif cause.startswith(">=") and "<=" not in cause and "<" in cause:
        bounds = cause.replace(">=", "").split("<")
        check = _version2int(bounds[0]) <= v < _version2int(bounds[1])
    elif cause.startswith(">=") and "<=" in cause:
        bounds = cause.replace(">=", "").split("<=")
        check = _version2int(bounds[0]) <= v <= _version2int(bounds[1])
    elif cause.startswith(">") and "<" in cause:
        bounds = cause.replace(">", "").split("<")
        check = _version2int(bounds[0]) < v < _version2int(bounds[1])
    elif cause.startswith("<="):
        # Fix: compare against the full version string; the original used
        # temp[0], i.e. only the FIRST CHARACTER of the version
        check = v <= _version2int(cause.replace("<=", ""))
    elif cause.startswith("<"):
        # Fix: same first-character bug as the "<=" branch
        check = v < _version2int(cause.replace("<", ""))

    return check
def download(
    name: str, force: bool = False, url: str = "", version: str = ""
) -> bool:
    """Download corpus.

    The available corpus names can be seen in this file:
    https://pythainlp.org/pythainlp-corpus/db.json

    :param str name: corpus name
    :param bool force: force downloading
    :param str url: URL of the corpus catalog
    :param str version: version of the corpus
    :return: **True** if the corpus is found and successfully downloaded.
             Otherwise, it returns **False**.
    :rtype: bool

    :Example:
    ::

        from pythainlp.corpus import download

        download("wiki_lm_lstm", force=True)
        # output:
        # Corpus: wiki_lm_lstm
        # - Downloading: wiki_lm_lstm 0.1
        # thwiki_lm.pth:  26%|██▌       | 114k/434k [00:00<00:00, 690kB/s]

    By default, downloaded corpora and models will be saved in
    ``$HOME/pythainlp-data/``
    (e.g. ``/Users/bact/pythainlp-data/wiki_lm_lstm.pth``).
    """
    # Read-only mode (PYTHAINLP_READ_MODE=1) disallows local writes
    if _CHECK_MODE == "1":
        print("PyThaiNLP is read-only mode. It can't download.")
        return False

    if not url:
        url = corpus_db_url()

    # Fetch the remote catalog of all available corpora
    corpus_db = get_corpus_db(url)
    if not corpus_db:
        print(f"Cannot download corpus catalog from: {url}")
        return False

    corpus_db_dict = corpus_db.json()

    # check if corpus is available
    if name in corpus_db_dict:
        # Local catalog of already-downloaded corpora
        with open(corpus_db_path(), encoding="utf-8-sig") as f:
            local_db = json.load(f)

        corpus = corpus_db_dict[name]
        print("Corpus:", name)
        if not version:
            # Pick the last listed version compatible with this
            # PyThaiNLP release
            for v, file in corpus["versions"].items():
                if _check_version(file["pythainlp_version"]):
                    version = v

        # version may still be None here
        if version not in corpus["versions"]:
            print("Corpus not found.")
            return False
        elif (
            _check_version(corpus["versions"][version]["pythainlp_version"])
            is False
        ):
            print("Corpus version not supported.")
            return False

        corpus_versions = corpus["versions"][version]
        file_name = corpus_versions["filename"]

        # Look up the corpus record key in the local catalog
        found = ""
        for i, item in local_db["_default"].items():
            # Do not check version here
            if item["name"] == name:
                # Record corpus no. if found in local database
                found = i
                break

        # If not found in local, download it
        if force or not found:
            print(f"- Downloading: {name} {version}")
            _download(
                corpus_versions["download_url"],
                file_name,
            )
            # Verify integrity before extracting/registering the corpus
            _check_hash(
                file_name,
                corpus_versions["md5"],
            )

            # Archives get extracted into a "<name>_<version>" folder
            is_folder = False
            foldername = None
            if corpus_versions["is_tar_gz"] == "True":
                is_folder = True
                foldername = name + "_" + str(version)
                if not os.path.exists(get_full_data_path(foldername)):
                    os.mkdir(get_full_data_path(foldername))
                with tarfile.open(get_full_data_path(file_name)) as tar:
                    _safe_extract_tar(tar, get_full_data_path(foldername))
            elif corpus_versions["is_zip"] == "True":
                is_folder = True
                foldername = name + "_" + str(version)
                if not os.path.exists(get_full_data_path(foldername)):
                    os.mkdir(get_full_data_path(foldername))
                with zipfile.ZipFile(
                    get_full_data_path(file_name), "r"
                ) as zip_file:
                    _safe_extract_zip(zip_file, get_full_data_path(foldername))

            if found:
                # Update the existing local catalog record in place
                local_db["_default"][found]["version"] = version
                local_db["_default"][found]["filename"] = file_name
                local_db["_default"][found]["is_folder"] = is_folder
                local_db["_default"][found]["foldername"] = foldername
            else:
                # This awkward behavior is for backward-compatibility with
                # database files generated previously using TinyDB
                if local_db["_default"]:
                    corpus_no = max(int(no) for no in local_db["_default"]) + 1
                else:
                    corpus_no = 1
                local_db["_default"][str(corpus_no)] = {
                    "name": name,
                    "version": version,
                    "filename": file_name,
                    "is_folder": is_folder,
                    "foldername": foldername,
                }

            # Persist the updated local catalog
            with open(corpus_db_path(), "w", encoding="utf-8") as f:
                json.dump(local_db, f, ensure_ascii=False)
        # Check if versions match or if the corpus is found in local database
        # but a re-download is not forced
        else:
            current_ver = local_db["_default"][found]["version"]
            if current_ver == version:
                # Corpus of the same version already exists
                print("- Already up to date.")
            else:
                # Corpus exists but is of different version
                print(f"- Existing version: {current_ver}")
                print(f"- New version available: {version}")
                print("- Use download(data_name, force=True) to update")

        return True

    print("Corpus not found:", name)
    return False
def remove(name: str) -> bool:
    """Remove a downloaded corpus and its record from the local catalog.

    :param str name: corpus name
    :return: **True** if the corpus is found and successfully removed.
             Otherwise, it returns **False**.
    :rtype: bool

    :Example:
    ::

        from pythainlp.corpus import remove, get_corpus_path, get_corpus

        print(remove("ttc"))
        # output: True

        print(get_corpus_path("ttc"))
        # output: None
    """
    if _CHECK_MODE == "1":
        print("PyThaiNLP is read-only mode. It can't download.")
        return False

    with open(corpus_db_path(), encoding="utf-8-sig") as f:
        db = json.load(f)

    matches = [
        entry for entry in db["_default"].values() if entry["name"] == name
    ]
    if not matches:
        return False

    path = get_corpus_path(name)
    entry = matches[0]
    if entry.get("is_folder"):
        import shutil

        # Remove the downloaded archive file, then the extracted folder
        archive = entry.get("filename")
        if archive:
            os.remove(get_full_data_path(archive))
        if path:
            shutil.rmtree(path, ignore_errors=True)
    elif path:
        os.remove(path)

    # Drop every catalog record carrying this corpus name
    for key in [k for k, v in db["_default"].items() if v["name"] == name]:
        del db["_default"][key]

    with open(corpus_db_path(), "w", encoding="utf-8") as f:
        json.dump(db, f, ensure_ascii=False)
    return True
def get_path_folder_corpus(name: str, version: str, *path: str) -> str:
    """Return a path inside a downloaded corpus folder.

    :param str name: corpus name
    :param str version: corpus version
    :param str path: additional path components under the corpus folder
    :return: joined path
    :rtype: str
    :raises ValueError: if the corpus path cannot be resolved
    """
    base = get_corpus_path(name, version)
    if base is None:
        raise ValueError(f"Corpus path not found for {name} version {version}")
    return os.path.join(base, *path)
# Directory/file names reserved on Windows (case-insensitive)
_WINDOWS_RESERVED_NAMES = frozenset(
    {"CON", "PRN", "AUX", "NUL"}
    | {f"COM{i}" for i in range(1, 10)}
    | {f"LPT{i}" for i in range(1, 10)}
)


def make_safe_directory_name(name: str) -> str:
    """Make a directory name that is safe across common filesystems.

    :param str name: directory name
    :return: safe directory name
    :rtype: str
    """
    # Replace characters invalid on Windows/most filesystems with underscores
    safe_name = re.sub(r'[<>:"/\\|?*]', "_", name)
    # Remove leading/trailing spaces or periods (especially important for Windows)
    safe_name = safe_name.strip(" .")
    # Prevent names that are reserved on Windows
    # (frozenset membership test instead of scanning a list each call)
    if safe_name.upper() in _WINDOWS_RESERVED_NAMES:
        safe_name = f"_{safe_name}"  # Prepend underscore to avoid conflict
    return safe_name
def get_hf_hub(repo_id: str, filename: str = "") -> str:
    """Download from the Hugging Face Hub into :mod:`pythainlp` data directory.

    :param str repo_id: repository ID on the Hugging Face Hub
    :param str filename: filename (optional, default is empty string).
        If empty, downloads entire snapshot.
    :return: local path of the downloaded file or snapshot directory
    :rtype: str
    :raises ModuleNotFoundError: if huggingface-hub is not installed
    """
    try:
        # huggingface-hub is an optional dependency
        from huggingface_hub import hf_hub_download, snapshot_download
    except ModuleNotFoundError as err:
        # Fix: corrected message grammar; chain the original exception
        raise ModuleNotFoundError(
            """
            huggingface-hub isn't found!
            Please install the package via 'pip install huggingface-hub'.
            """
        ) from err
    except Exception as e:
        raise RuntimeError(f"An unexpected error occurred: {e}") from e

    hf_root = get_full_data_path("hf_models")
    # repo_id may contain "/", which is invalid in a directory name
    name_dir = make_safe_directory_name(repo_id)
    root_project = os.path.join(hf_root, name_dir)
    if filename:
        output_path = hf_hub_download(
            repo_id=repo_id, filename=filename, local_dir=root_project
        )
    else:
        output_path = snapshot_download(
            repo_id=repo_id, local_dir=root_project
        )
    return output_path  # type: ignore[no-any-return]