Source code for pythainlp.tokenize.newmm

# -*- coding: utf-8 -*-
"""
Dictionary-based Thai Word Segmentation
using maximal matching algorithm and Thai Character Cluster (TCC).
The code is based on the notebooks created by Korakot Chaovavanich.
:See Also:
    * \
        https://colab.research.google.com/notebook#fileId=1V1Z657_5eSWPo8rLfVRwA0A5E4vkg7SI
    * \
        https://colab.research.google.com/drive/14Ibg-ngZXj15RKwjNwoZlOT32fQBOrBx#scrollTo=MYZ7NzAR7Dmw
"""
import re
from collections import defaultdict, deque
from heapq import heappop, heappush
from typing import Generator, List

from pythainlp.tokenize import DEFAULT_DICT_TRIE

from .tcc import tcc_pos
from .trie import Trie

# match non-Thai tokens
_PAT_NONTHAI = re.compile(
    r"""(?x)
[-a-zA-Z]+|   # Latin characters (and hyphens)
\d[\d,\.]*|   # number
[ \t]+|       # space
\r?\n         # newline
"""
)
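
# For intuition, strings the pattern does and does not match:
#
#     >>> _PAT_NONTHAI.match("Chiang Mai").group()
#     'Chiang'
#     >>> _PAT_NONTHAI.match("1,234.5 บาท").group()
#     '1,234.5'
#     >>> _PAT_NONTHAI.match("ไทย") is None
#     True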

# match Thai tokens of at most two consonants
_PAT_THAI_TWOCHARS = re.compile("[ก-ฮ]{,2}$")
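
# For example, "กข" (two bare consonants) matches, but "มาก" does not,
# because the vowel "า" falls outside the ก-ฮ consonant range:
#
#     >>> bool(_PAT_THAI_TWOCHARS.match("กข"))
#     True
#     >>> bool(_PAT_THAI_TWOCHARS.match("มาก"))
#     False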


# maximum graph size before cutoff
_MAX_GRAPH_SIZE = 50

# window size for safe mode
_TEXT_SCAN_POINT = 120
_TEXT_SCAN_LEFT = 20
_TEXT_SCAN_RIGHT = 20
_TEXT_SCAN_BEGIN = _TEXT_SCAN_POINT - _TEXT_SCAN_LEFT
_TEXT_SCAN_END = _TEXT_SCAN_POINT + _TEXT_SCAN_RIGHT
del _TEXT_SCAN_POINT
del _TEXT_SCAN_LEFT
del _TEXT_SCAN_RIGHT
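
# A quick sanity check of the window arithmetic (values follow from the
# constants above): in safe mode, segment() looks for a cut point inside
# text[_TEXT_SCAN_BEGIN:_TEXT_SCAN_END] of the remaining text:
#
#     >>> _TEXT_SCAN_BEGIN, _TEXT_SCAN_END
#     (100, 140)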


def _bfs_paths_graph(
    graph: defaultdict, start: int, goal: int
) -> Generator[List[int], None, None]:
    # breadth-first search over the segmentation graph, generating
    # paths from start to goal; paths with fewer vertices come first
    queue = deque([(start, [start])])
    while queue:
        vertex, path = queue.popleft()
        for pos in graph[vertex]:
            if pos == goal:
                yield path + [pos]
            else:
                queue.append((pos, path + [pos]))
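
# A toy illustration on a hypothetical graph (0 -> {2, 4}, 2 -> {4}):
# the path with the fewest vertices comes out first, which is exactly
# what maximal matching wants (fewest words):
#
#     >>> g = defaultdict(list, {0: [2, 4], 2: [4]})
#     >>> list(_bfs_paths_graph(g, 0, 4))
#     [[0, 4], [0, 2, 4]]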


def _onecut(text: str, custom_dict: Trie) -> Generator[str, None, None]:
    # main data structure:
    # - key is begin position (int)
    # - value is possible end positions (List[int])
    # if key is not found, value is empty list
    graph = defaultdict(list)
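    # e.g. for "ทางหลวง" with a hypothetical dictionary
    # {"ทาง", "หลวง", "ทางหลวง"}, the finished graph would be
    # {0: [3, 7], 3: [7]}, encoding both candidate segmentations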

    graph_size = 0  # keep track of graph size; force a cutoff if it grows too big

    valid_poss = tcc_pos(text)  # breaking positions that are TCC-valid

    pos_list = [0]  # priority queue of possible breaking positions
    end_pos = 0
    while pos_list[0] < len(text):
        begin_pos = heappop(pos_list)
        for word in custom_dict.prefixes(text[begin_pos:]):
            end_pos_candidate = begin_pos + len(word)
            if end_pos_candidate in valid_poss:
                graph[begin_pos].append(end_pos_candidate)
                graph_size = graph_size + 1

                if end_pos_candidate not in pos_list:
                    heappush(pos_list, end_pos_candidate)

                if graph_size > _MAX_GRAPH_SIZE:
                    break

        if len(pos_list) == 1:  # one candidate, no longer ambiguous
            end_pos_candidates = next(
                _bfs_paths_graph(graph, end_pos, pos_list[0]))
            graph_size = 0
            for _pos in end_pos_candidates[1:]:
                yield text[end_pos:_pos]
                end_pos = _pos
        elif len(pos_list) == 0:  # no candidate, deal with non-dictionary word
            m = _PAT_NONTHAI.match(text[begin_pos:])
            if m:  # non-Thai token, skip to the end
                end_pos = begin_pos + m.end()
            else:  # Thai token, find minimum skip
                for _pos in range(begin_pos + 1, len(text)):
                    if _pos in valid_poss:
                        words = [
                            word
                            for word in custom_dict.prefixes(text[_pos:])
                            if ((_pos + len(word) in valid_poss) and
                                not _PAT_THAI_TWOCHARS.match(word))
                        ]
                        if words:  # a Thai token that is not just 1-2 consonants
                            end_pos = _pos
                            break

                        # is a non-Thai token
                        if _PAT_NONTHAI.match(text[_pos:]):
                            end_pos = _pos
                            break
                else:
                    end_pos = len(text)

            graph[begin_pos].append(end_pos)
            graph_size = graph_size + 1
            yield text[begin_pos:end_pos]
            heappush(pos_list, end_pos)


def segment(
    text: str,
    custom_dict: Trie = DEFAULT_DICT_TRIE,
    safe_mode: bool = False,
) -> List[str]:
    """
    Dictionary-based maximal matching word segmentation, constrained by
    Thai Character Cluster boundaries.

    :param str text: text to be tokenized into words
    :param pythainlp.trie.Trie custom_dict: dictionary for tokenization
    :param bool safe_mode: True to help avoid long processing time for
        text with long and continuous runs of ambiguous breaking points.
        Long processing time may still occur. Default is False.
    :return: list of words, tokenized from the text
    """
    if not text or not isinstance(text, str):
        return []

    if not custom_dict:
        custom_dict = DEFAULT_DICT_TRIE

    if not safe_mode or len(text) < _TEXT_SCAN_END:
        return list(_onecut(text, custom_dict))

    # if the text is longer than the limit,
    # break it into smaller chunks, then tokenize each chunk
    text_parts = []
    while len(text) >= _TEXT_SCAN_END:
        sample = text[_TEXT_SCAN_BEGIN:_TEXT_SCAN_END]

        # find a possible break position
        cut_pos = _TEXT_SCAN_END

        # try to break at a space first
        space_idx = sample.rfind(" ")
        if space_idx >= 0:
            # convert the position in sample to a position in text
            cut_pos = _TEXT_SCAN_BEGIN + space_idx + 1
        else:
            tokens = list(_onecut(sample, custom_dict))
            token_max_idx = 0
            token_max_len = 0
            for i, token in enumerate(tokens):
                if len(token) > token_max_len:
                    token_max_len = len(token)
                    token_max_idx = i

            # choose the position that covers the longest token
            cut_pos = _TEXT_SCAN_BEGIN
            for i in range(0, token_max_idx):
                cut_pos = cut_pos + len(tokens[i])

        text_parts.append(text[:cut_pos])
        text = text[cut_pos:]

    # append remaining text
    if len(text):
        text_parts.append(text)

    # tokenize each text part
    tokens = []
    for text_part in text_parts:
        tokens.extend(list(_onecut(text_part, custom_dict)))

    return tokens
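
# Usage sketch (illustrative; "very_long_text" is a placeholder):
#
#     >>> from pythainlp.tokenize.newmm import segment
#     >>> segment("สวัสดีครับ")                     # plain maximal matching
#     >>> segment(very_long_text, safe_mode=True)  # chunked tokenization
#
# safe_mode trades some segmentation quality near chunk boundaries for
# a bound on worst-case running time on long text without spaces.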