Text Splitting - Semantic Chunking
Splitting text by a fixed chunk size or by punctuation marks does not take the coherence and completeness of each chunk's content into account. Instead, we can use embeddings to group semantically similar sentences together.
Approaches worth considering include:
Hierarchical clustering with a positional reward: using hierarchical clustering alone (merging sentences one by one) runs into trouble when a short sentence follows a long one. For example, a short sentence such as "Do you understand?" may end up clustered far from the preceding sentence that carries the important information. Adding a positional reward handles this case better (see the sketch after the hierarchical clustering notes below).
Finding semantic break points between sentences with a sliding window: when a sentence is embedded on its own, a few keywords in that sentence dominate the vector representation. Neighbouring sentences usually carry similar semantics, so embedding a continuous window of sentences expresses the meaning more faithfully. While the sentences inside the window stay on the same topic, the vector remains relatively stable; when the topic shifts, the vector shifts with it, which makes the "break points" easier to detect.
Hierarchical clustering:
It groups data by building a hierarchy of nested clusters. The steps are: 1. compute a vector for every element; 2. merge the two closest elements; 3. repeat until everything is merged.
Unlike traditional partitioning methods such as K-means, hierarchical clustering does not require the number of clusters to be fixed in advance; it forms a hierarchical cluster tree by progressively merging (or splitting) clusters, usually drawn as a dendrogram.
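As a concrete illustration of the first approach, the snippet below is only a minimal sketch, not the exact method described above: agglomerative (hierarchical) clustering over sentence embeddings, with a small positional penalty mixed into the cosine distance so that a short sentence stays attached to its neighbours. The example sentences, the weight alpha, and the distance_threshold value are illustrative assumptions, and the metric="precomputed" argument assumes scikit-learn >= 1.2.
# Minimal sketch: agglomerative clustering over sentence embeddings, plus an
# assumed positional penalty so that nearby sentences are more likely to merge.
import numpy as np
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics.pairwise import cosine_distances
from langchain.embeddings import OpenAIEmbeddings

demo_sentences = [                                   # illustrative sentences, not from the essay below
    "The lecture covered gradient descent in detail.",
    "We then derived the update rule step by step.",
    "Do you understand?",
    "Next week the course moves on to transformers.",
]
emb = np.array(OpenAIEmbeddings().embed_documents(demo_sentences))

# Semantic distance plus a positional term: sentences far apart in the text are
# penalised, so the short question above clusters with its neighbours.
alpha = 0.02  # assumed weight for the positional reward
pos = np.arange(len(demo_sentences)).reshape(-1, 1)
dist = cosine_distances(emb) + alpha * np.abs(pos - pos.T)

clustering = AgglomerativeClustering(
    n_clusters=None, distance_threshold=0.5,  # no preset cluster count; the tree decides
    metric="precomputed", linkage="average",
)
print(clustering.fit_predict(dist))  # same label => same chunk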
Below is a concrete implementation of the second approach, finding semantic break points between sentences with a sliding window:
# Read the long text
with open('../../data/PGEssays/mit.txt') as file:
essay = file.read()
# Split on sentence-ending punctuation to obtain a list of sentences.
import re
# Splitting the essay on '.', '?', and '!'
single_sentences_list = re.split(r'(?<=[.?!])\s+', essay)
print (f"{len(single_sentences_list)} senteneces were found")
# Turn the sentence list into a list of dicts
sentences = [{'sentence': x, 'index' : i} for i, x in enumerate(single_sentences_list)]
# For each element, prepend the previous sentence and append the next one to form a sliding window, stored under the key combined_sentence.
def combine_sentences(sentences, buffer_size=1):
    # Go through each sentence dict
    for i in range(len(sentences)):
        # Create a string that will hold the sentences which are joined
        combined_sentence = ''
        # Add sentences before the current one, based on the buffer size.
        for j in range(i - buffer_size, i):
            # Check if the index j is not negative (to avoid index out of range on the first one)
            if j >= 0:
                # Add the sentence at index j to the combined_sentence string
                combined_sentence += sentences[j]['sentence'] + ' '
        # Add the current sentence
        combined_sentence += sentences[i]['sentence']
        # Add sentences after the current one, based on the buffer size
        for j in range(i + 1, i + 1 + buffer_size):
            # Check if the index j is within the range of the sentences list
            if j < len(sentences):
                # Add the sentence at index j to the combined_sentence string
                combined_sentence += ' ' + sentences[j]['sentence']
        # Store the combined sentence in the current sentence dict
        sentences[i]['combined_sentence'] = combined_sentence
    return sentences
sentences = combine_sentences(sentences)
# Compute an embedding for each sliding window and store it under the key combined_sentence_embedding
from langchain.embeddings import OpenAIEmbeddings
oaiembeds = OpenAIEmbeddings()
embeddings = oaiembeds.embed_documents([x['combined_sentence'] for x in sentences])
for i, sentence in enumerate(sentences):
    sentence['combined_sentence_embedding'] = embeddings[i]
# Compute the vector distance between each element and the next one.
from sklearn.metrics.pairwise import cosine_similarity
def calculate_cosine_distances(sentences):
    distances = []
    for i in range(len(sentences) - 1):
        embedding_current = sentences[i]['combined_sentence_embedding']
        embedding_next = sentences[i + 1]['combined_sentence_embedding']
        # Calculate cosine similarity
        similarity = cosine_similarity([embedding_current], [embedding_next])[0][0]
        # Convert to cosine distance
        distance = 1 - similarity
        # Append cosine distance to the list
        distances.append(distance)
        # Store distance in the dictionary
        sentences[i]['distance_to_next'] = distance
    # Optionally handle the last sentence
    # sentences[-1]['distance_to_next'] = None # or a default value
    return distances, sentences
distances, sentences = calculate_cosine_distances(sentences)
# Visualize the distribution of the distances
import matplotlib.pyplot as plt
plt.plot(distances);
# Take the distance at the 95th percentile as the threshold, get the indices of the split points, and plot the resulting chunks
import numpy as np
plt.plot(distances);
y_upper_bound = .2
plt.ylim(0, y_upper_bound)
plt.xlim(0, len(distances))
# We need to get the distance threshold that we'll consider an outlier
# We'll use numpy .percentile() for this
breakpoint_percentile_threshold = 95
breakpoint_distance_threshold = np.percentile(distances, breakpoint_percentile_threshold) # If you want more chunks, lower the percentile cutoff
plt.axhline(y=breakpoint_distance_threshold, color='r', linestyle='-');
# Then we'll see how many distances are actually above this one
num_distances_above_threshold = len([x for x in distances if x > breakpoint_distance_threshold]) # The amount of distances above your threshold
plt.text(x=(len(distances)*.01), y=y_upper_bound/50, s=f"{num_distances_above_threshold + 1} Chunks");
# Then we'll get the index of the distances that are above the threshold. This will tell us where we should split our text
indices_above_thresh = [i for i, x in enumerate(distances) if x > breakpoint_distance_threshold] # The indices of those breakpoints on your list
# Start of the shading and text
colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k']
for i, breakpoint_index in enumerate(indices_above_thresh):
    start_index = 0 if i == 0 else indices_above_thresh[i - 1]
    end_index = breakpoint_index if i < len(indices_above_thresh) - 1 else len(distances)
    plt.axvspan(start_index, end_index, facecolor=colors[i % len(colors)], alpha=0.25)
    plt.text(x=np.average([start_index, end_index]),
             y=breakpoint_distance_threshold + (y_upper_bound) / 20,
             s=f"Chunk #{i}", horizontalalignment='center',
             rotation='vertical')
# Additional step to shade from the last breakpoint to the end of the dataset
if indices_above_thresh:
    last_breakpoint = indices_above_thresh[-1]
    if last_breakpoint < len(distances):
        plt.axvspan(last_breakpoint, len(distances), facecolor=colors[len(indices_above_thresh) % len(colors)], alpha=0.25)
        plt.text(x=np.average([last_breakpoint, len(distances)]),
                 y=breakpoint_distance_threshold + (y_upper_bound) / 20,
                 s=f"Chunk #{i+1}",
                 rotation='vertical')
plt.title("PG Essay Chunks Based On Embedding Breakpoints")
plt.xlabel("Index of sentences in essay (Sentence Position)")
plt.ylabel("Cosine distance between sequential sentences")
plt.show()
# Split the sentences at the break points
# Initialize the start index
start_index = 0
# Create a list to hold the grouped sentences
chunks = []
# Iterate through the breakpoints to slice the sentences
for index in indices_above_thresh:
    # The end index is the current breakpoint
    end_index = index
    # Slice the sentence_dicts from the current start index to the end index
    group = sentences[start_index:end_index + 1]
    combined_text = ' '.join([d['sentence'] for d in group])
    chunks.append(combined_text)
    # Update the start index for the next group
    start_index = index + 1
# The last group, if any sentences remain
if start_index < len(sentences):
    combined_text = ' '.join([d['sentence'] for d in sentences[start_index:]])
    chunks.append(combined_text)
# Show the resulting chunks
for i, chunk in enumerate(chunks[:2]):
    buffer = 200
    print (f"Chunk #{i}")
    print (chunk[:buffer].strip())
    print ("...")
    print (chunk[-buffer:].strip())
    print ("\n")
License:
CC BY 4.0