Source code for pythainlp.summarize.keybert
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2016-2024 PyThaiNLP Project
# SPDX-License-Identifier: Apache-2.0
"""
Minimal re-implementation of KeyBERT.
KeyBERT is a minimal and easy-to-use keyword extraction technique
that leverages BERT embeddings to create keywords and keyphrases
that are most similar to a document.
https://github.com/MaartenGr/KeyBERT
"""
from typing import List, Optional, Iterable, Tuple, Union
from collections import Counter
import numpy as np
from transformers import pipeline
from pythainlp.corpus import thai_stopwords
from pythainlp.tokenize import word_tokenize
class KeyBERT:
    def __init__(
self, model_name: str = "airesearch/wangchanberta-base-att-spm-uncased"
):
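        # Hugging Face "feature-extraction" pipeline: returns the model's
        # last hidden state for every token of the input text.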
self.ft_pipeline = pipeline(
"feature-extraction",
tokenizer=model_name,
model=model_name,
revision="main",
)
    def extract_keywords(
self,
text: str,
keyphrase_ngram_range: Tuple[int, int] = (1, 2),
max_keywords: int = 5,
min_df: int = 1,
tokenizer: str = "newmm",
        return_similarity: bool = False,
stop_words: Optional[Iterable[str]] = None,
) -> Union[List[str], List[Tuple[str, float]]]:
"""
Extract Thai keywords and/or keyphrases with KeyBERT algorithm.
See https://github.com/MaartenGr/KeyBERT.
        :param str text: text from which keywords will be extracted
        :param Tuple[int, int] keyphrase_ngram_range: lower and upper bounds on the number of tokens
            that make up a keyphrase. The token unit varies with `tokenizer`.
            For instance, (1, 1) means each single token (unigram) can be a keyword (e.g. "เสา", "ไฟฟ้า"),
            while (1, 2) means both one and two consecutive tokens (unigrams and bigrams) can be keywords
            (e.g. "เสา", "ไฟฟ้า", "เสาไฟฟ้า") (default: (1, 2))
        :param int max_keywords: Maximum number of keywords to be returned. (default: 5)
:param int min_df: Minimum frequency required to be a keyword. (default: 1)
        :param str tokenizer: Name of the tokenizer engine to use.
            Refer to options in :func:`pythainlp.tokenize.word_tokenize` (default: 'newmm')
:param bool return_similarity: If `True`, return keyword scores. (default: False)
        :param Optional[Iterable[str]] stop_words: A list of stop words (i.e. words to be ignored).
If not specified, :func:`pythainlp.corpus.thai_stopwords` is used. (default: None)
        :return: list of keywords, or list of (keyword, similarity score) tuples
            if `return_similarity` is True
:Example:
::
from pythainlp.summarize.keybert import KeyBERT
text = '''
อาหาร หมายถึง ของแข็งหรือของเหลว
ที่กินหรือดื่มเข้าสู่ร่างกายแล้ว
จะทำให้เกิดพลังงานและความร้อนแก่ร่างกาย
ทำให้ร่างกายเจริญเติบโต
ซ่อมแซมส่วนที่สึกหรอ ควบคุมการเปลี่ยนแปลงต่างๆ ในร่างกาย
ช่วยทำให้อวัยวะต่างๆ ทำงานได้อย่างปกติ
อาหารจะต้องไม่มีพิษและไม่เกิดโทษต่อร่างกาย
'''
kb = KeyBERT()
            keywords = kb.extract_keywords(text)
# output: ['อวัยวะต่างๆ',
# 'ซ่อมแซมส่วน',
# 'เจริญเติบโต',
# 'ควบคุมการเปลี่ยนแปลง',
# 'มีพิษ']
            keywords = kb.extract_keywords(text, max_keywords=10, return_similarity=True)
# output: [('อวัยวะต่างๆ', 0.3228477063109462),
# ('ซ่อมแซมส่วน', 0.31320597838000375),
# ('เจริญเติบโต', 0.29115434699705506),
# ('ควบคุมการเปลี่ยนแปลง', 0.2678430841321016),
# ('มีพิษ', 0.24996827960821494),
# ('ทำให้ร่างกาย', 0.23876962942443258),
# ('ร่างกายเจริญเติบโต', 0.23191285218852364),
# ('จะทำให้เกิด', 0.22425422716846247),
# ('มีพิษและ', 0.22162962875299588),
# ('เกิดโทษ', 0.20773497763458507)]
"""
try:
text = text.strip()
except AttributeError:
raise AttributeError(
f"Unable to process data of type {type(text)}. "
f"Please provide input of string type."
)
if not text:
return []
# generate all lists of keywords / keyphrases
stop_words_ = stop_words if stop_words else thai_stopwords()
kw_candidates = _generate_ngrams(
text, keyphrase_ngram_range, min_df, tokenizer, stop_words_
)
# create document and word vectors
doc_vector = self.embed(text)
kw_vectors = self.embed(kw_candidates)
# rank keywords
keywords = _rank_keywords(
doc_vector, kw_vectors, kw_candidates, max_keywords
)
if return_similarity:
return keywords
else:
return [kw for kw, _ in keywords]
    def embed(self, docs: Union[str, List[str]]) -> np.ndarray:
"""
Create an embedding of each input in `docs` by averaging vectors from the last hidden layer.
"""
embs = self.ft_pipeline(docs)
if isinstance(docs, str) or len(docs) == 1:
# embed doc. return shape = [1, hidden_size]
emb_mean = np.array(embs).mean(axis=1)
else:
# mean of embedding of each word
# return shape = [len(docs), hidden_size]
emb_mean = np.stack(
[np.array(emb[0]).mean(axis=0) for emb in embs]
)
return emb_mean
def _generate_ngrams(
doc: str,
keyphrase_ngram_range: Tuple[int, int],
min_df: int,
tokenizer_engine: str,
stop_words: Iterable[str],
) -> List[str]:
assert keyphrase_ngram_range[0] >= 1, (
f"`keyphrase_ngram_range` must start from 1. "
f"current value={keyphrase_ngram_range}."
)
assert keyphrase_ngram_range[0] <= keyphrase_ngram_range[1], (
f"The value first argument of `keyphrase_ngram_range` must not exceed the second. "
f"current value={keyphrase_ngram_range}."
)
def _join_ngram(ngrams: List[Tuple[str, str]]) -> List[str]:
ngrams_joined = []
for ng in ngrams:
joined = "".join(ng)
if joined.strip() == joined:
# ngram must not start or end with whitespace as this may cause duplication.
ngrams_joined.append(joined)
return ngrams_joined
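    # Thai script does not delimit words with spaces, so candidate n-grams are
    # joined back together without a separator after word tokenization.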
words = word_tokenize(doc, engine=tokenizer_engine)
all_grams = []
ngram_range = (keyphrase_ngram_range[0], keyphrase_ngram_range[1] + 1)
for n in range(*ngram_range):
if n == 1:
# filter out space
ngrams = [word for word in words if word.strip()]
else:
ngrams_tuple = zip(*[words[i:] for i in range(n)])
ngrams = _join_ngram(ngrams_tuple)
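        # count candidate frequencies, then keep only those that occur at least
        # `min_df` times and are not stop words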
ngrams_cnt = Counter(ngrams)
ngrams = [
word
for word, freq in ngrams_cnt.items()
if (freq >= min_df) and (word not in stop_words)
]
all_grams.extend(ngrams)
return all_grams
def _rank_keywords(
doc_vector: np.ndarray,
word_vectors: np.ndarray,
keywords: List[str],
max_keywords: int,
) -> List[Tuple[str, float]]:
def l2_norm(v: np.ndarray) -> np.ndarray:
vec_size = v.shape[1]
result = np.divide(
v,
np.linalg.norm(v, axis=1).reshape(-1, 1).repeat(vec_size, axis=1),
)
assert np.isclose(
np.linalg.norm(result, axis=1), 1
).all(), "Cannot normalize a vector to unit vector."
return result
def cosine_sim(a: np.ndarray, b: np.ndarray) -> np.ndarray:
return (np.matmul(a, b.T).T).sum(axis=1)
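    # With L2-normalized vectors, the dot product between the document vector and
    # each candidate vector equals their cosine similarity; candidates are then
    # ranked in descending order of similarity to the document.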
doc_vector = l2_norm(doc_vector)
word_vectors = l2_norm(word_vectors)
cosine_sims = cosine_sim(doc_vector, word_vectors)
ranking_desc = np.argsort(-cosine_sims)
final_ranks = [
(keywords[r], cosine_sims[r]) for r in ranking_desc[:max_keywords]
]
return final_ranks