Cog 417 chunking unit tests #205
```diff
@@ -9,25 +9,25 @@ class TextChunker():

     chunk_index = 0
     chunk_size = 0
-    paragraph_chunks = []

     def __init__(self, document, get_text: callable, chunk_size: int = 1024):
         self.document = document
         self.max_chunk_size = chunk_size
         self.get_text = get_text

     def read(self):
+        paragraph_chunks = []
         for content_text in self.get_text():
             for chunk_data in chunk_by_paragraph(
                 content_text,
                 self.max_chunk_size,
                 batch_paragraphs = True,
             ):
                 if self.chunk_size + chunk_data["word_count"] <= self.max_chunk_size:
-                    self.paragraph_chunks.append(chunk_data)
+                    paragraph_chunks.append(chunk_data)
                     self.chunk_size += chunk_data["word_count"]
                 else:
-                    if len(self.paragraph_chunks) == 0:
+                    if len(paragraph_chunks) == 0:
                         yield DocumentChunk(
                             id = chunk_data["chunk_id"],
                             text = chunk_data["text"],
```
```diff
@@ -36,35 +36,35 @@ def read(self):
                             chunk_index = self.chunk_index,
                             cut_type = chunk_data["cut_type"],
                         )
-                        self.paragraph_chunks = []
+                        paragraph_chunks = []
                         self.chunk_size = 0
                     else:
-                        chunk_text = " ".join(chunk["text"] for chunk in self.paragraph_chunks)
+                        chunk_text = " ".join(chunk["text"] for chunk in paragraph_chunks)
                         try:
                             yield DocumentChunk(
                                 id = uuid5(NAMESPACE_OID, f"{str(self.document.id)}-{self.chunk_index}"),
                                 text = chunk_text,
                                 word_count = self.chunk_size,
                                 is_part_of = self.document,
                                 chunk_index = self.chunk_index,
-                                cut_type = self.paragraph_chunks[len(self.paragraph_chunks) - 1]["cut_type"],
+                                cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"],
                             )
                         except Exception as e:
                             print(e)
-                        self.paragraph_chunks = [chunk_data]
+                        paragraph_chunks = [chunk_data]
                         self.chunk_size = chunk_data["word_count"]

                     self.chunk_index += 1

-        if len(self.paragraph_chunks) > 0:
+        if len(paragraph_chunks) > 0:
             try:
                 yield DocumentChunk(
                     id = uuid5(NAMESPACE_OID, f"{str(self.document.id)}-{self.chunk_index}"),
-                    text = " ".join(chunk["text"] for chunk in self.paragraph_chunks),
+                    text = " ".join(chunk["text"] for chunk in paragraph_chunks),
                     word_count = self.chunk_size,
                     is_part_of = self.document,
                     chunk_index = self.chunk_index,
-                    cut_type = self.paragraph_chunks[len(self.paragraph_chunks) - 1]["cut_type"],
+                    cut_type = paragraph_chunks[len(paragraph_chunks) - 1]["cut_type"],
                 )
             except Exception as e:
                 print(e)
```

Comment on lines +59 to +67:

🛠️ Refactor suggestion: Reduce code duplication by extracting common chunk processing logic.

The chunk processing logic is duplicated between the main loop and the final chunk handling. This makes maintenance harder and increases the chance of inconsistencies. Consider extracting the common logic into a helper method:

```diff
+    def _create_document_chunk(self, chunks, chunk_text):
+        try:
+            return DocumentChunk(
+                id = uuid5(NAMESPACE_OID, f"{str(self.document.id)}-{self.chunk_index}"),
+                text = chunk_text,
+                word_count = self.chunk_size,
+                is_part_of = self.document,
+                chunk_index = self.chunk_index,
+                cut_type = chunks[-1]["cut_type"],
+            )
+        except Exception as e:
+            import logging
+            logging.error(f"Failed to create DocumentChunk: {e}")
+            return None

 # Then use it in both places:
         if len(paragraph_chunks) > 0:
             chunk_text = " ".join(chunk["text"] for chunk in paragraph_chunks)
-            try:
-                yield DocumentChunk(...)
-            except Exception as e:
-                print(e)
+            chunk = self._create_document_chunk(paragraph_chunks, chunk_text)
+            if chunk:
+                yield chunk
```
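Since this PR is about unit tests, a sketch of how TextChunker.read might be exercised in one. The import path, the FakeDocument stand-in, and DocumentChunk's tolerance of it are assumptions for illustration, not taken from this diff:

```python
# Hypothetical test sketch - import path and FakeDocument are assumptions.
from cognee.modules.chunking import TextChunker  # assumed location

class FakeDocument:
    """Minimal stand-in; the real DocumentChunk may require a Document model."""
    id = "test-document-1"

def get_text():
    # The chunker pulls text through this callable, one piece at a time.
    yield "First sentence. Second sentence.\n\nAnother paragraph follows here."

chunks = list(TextChunker(FakeDocument(), get_text, chunk_size=64).read())

# Every produced chunk should respect the configured maximum size.
assert all(chunk.word_count <= 64 for chunk in chunks)
```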
```diff
@@ -1,69 +1,79 @@
 from uuid import uuid5, NAMESPACE_OID
+from typing import Dict, Any, Iterator
 from .chunk_by_sentence import chunk_by_sentence

-def chunk_by_paragraph(data: str, paragraph_length: int = 1024, batch_paragraphs = True):
-    paragraph = ""
-    last_cut_type = None
-    last_paragraph_id = None
-    paragraph_word_count = 0
-    paragraph_chunk_index = 0
-
-    for (paragraph_id, __, sentence, word_count, end_type) in chunk_by_sentence(data):
-        if paragraph_word_count > 0 and paragraph_word_count + word_count > paragraph_length:
-            if batch_paragraphs is True:
-                chunk_id = uuid5(NAMESPACE_OID, paragraph)
-                yield dict(
-                    text = paragraph.strip(),
-                    word_count = paragraph_word_count,
-                    id = chunk_id, # When batching paragraphs, the paragraph_id is the same as chunk_id.
-                                   # paragraph_id doesn't mean anything since multiple paragraphs are merged.
-                    chunk_id = chunk_id,
-                    chunk_index = paragraph_chunk_index,
-                    cut_type = last_cut_type,
-                )
-            else:
-                yield dict(
-                    text = paragraph.strip(),
-                    word_count = paragraph_word_count,
-                    id = last_paragraph_id,
-                    chunk_id = uuid5(NAMESPACE_OID, paragraph),
-                    chunk_index = paragraph_chunk_index,
-                    cut_type = last_cut_type,
-                )
-
-            paragraph_chunk_index += 1
-            paragraph_word_count = 0
-            paragraph = ""
-
-        paragraph += (" " if len(paragraph) > 0 else "") + sentence
-        paragraph_word_count += word_count
-
-        if end_type == "paragraph_end" or end_type == "sentence_cut":
-            if batch_paragraphs is True:
-                paragraph += "\n\n" if end_type == "paragraph_end" else ""
-            else:
-                yield dict(
-                    text = paragraph.strip(),
-                    word_count = paragraph_word_count,
-                    paragraph_id = paragraph_id,
-                    chunk_id = uuid5(NAMESPACE_OID, paragraph),
-                    chunk_index = paragraph_chunk_index,
-                    cut_type = end_type,
-                )
-
-                paragraph_chunk_index = 0
-                paragraph_word_count = 0
-                paragraph = ""
-
-        last_cut_type = end_type
-        last_paragraph_id = paragraph_id
-
-    if len(paragraph) > 0:
-        yield dict(
-            chunk_id = uuid5(NAMESPACE_OID, paragraph),
-            text = paragraph,
-            word_count = paragraph_word_count,
-            paragraph_id = last_paragraph_id,
-            chunk_index = paragraph_chunk_index,
-            cut_type = last_cut_type,
-        )
+def chunk_by_paragraph(data: str, paragraph_length: int = 1024, batch_paragraphs: bool = True) -> Iterator[Dict[str, Any]]:
+    """
+    Chunks text by paragraph while preserving exact text reconstruction capability.
+    When chunks are joined with empty string "", they reproduce the original text exactly.
+    """
+    current_chunk = ""
+    current_word_count = 0
+    chunk_index = 0
+    last_cut_type = None
+    last_paragraph_id = None
+
+    for paragraph_id, _, sentence, word_count, end_type in chunk_by_sentence(data, maximum_length=paragraph_length):
+        assert word_count <= paragraph_length, f"{paragraph_length = } is smaller than {word_count = }"
+
+        # Check if this sentence would exceed length limit
+        if current_word_count > 0 and current_word_count + word_count > paragraph_length:
+            # Yield current chunk
+            chunk_dict = {
+                "text": current_chunk,
+                "word_count": current_word_count,
+                "chunk_id": uuid5(NAMESPACE_OID, current_chunk),
+                "chunk_index": chunk_index,
+                "cut_type": last_cut_type
+            }
+
+            if batch_paragraphs:
+                chunk_dict["id"] = chunk_dict["chunk_id"]
+            else:
+                chunk_dict["id"] = last_paragraph_id
+
+            yield chunk_dict
+
+            # Start new chunk with current sentence
+            current_chunk = sentence
+            current_word_count = word_count
+            chunk_index += 1
+        else:
+            # Just concatenate directly - no space handling
+            current_chunk += sentence
+            current_word_count += word_count
+
+        # Handle end of paragraph
+        if end_type in ("paragraph_end", "sentence_cut") and not batch_paragraphs:
+            # For non-batch mode, yield each paragraph separately
+            chunk_dict = {
+                "text": current_chunk,
+                "word_count": current_word_count,
+                "id": paragraph_id,
+                "chunk_id": uuid5(NAMESPACE_OID, current_chunk),
+                "chunk_index": chunk_index,
+                "cut_type": end_type
+            }
+            yield chunk_dict
+            current_chunk = ""
+            current_word_count = 0
+            chunk_index = 0
+
+        last_cut_type = end_type
+        last_paragraph_id = paragraph_id
+
+    # Yield any remaining text
+    if current_chunk:
+        chunk_dict = {
+            "text": current_chunk,
+            "word_count": current_word_count,
+            "chunk_id": uuid5(NAMESPACE_OID, current_chunk),
+            "chunk_index": chunk_index,
+            "cut_type": last_cut_type
+        }
+
+        if batch_paragraphs:
+            chunk_dict["id"] = chunk_dict["chunk_id"]
+        else:
+            chunk_dict["id"] = last_paragraph_id
+
+        yield chunk_dict
```
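The new docstring's reconstruction guarantee is exactly the kind of property the unit tests in this PR can pin down. A minimal sketch of such a test; the import path and the sample text are assumptions, not part of this diff:

```python
# Property-test sketch - the import path is an assumption.
from cognee.tasks.chunks import chunk_by_paragraph  # assumed location

def test_chunk_by_paragraph_reconstructs_input():
    text = "First sentence. Second one follows.\n\nA new paragraph starts here."
    chunks = chunk_by_paragraph(text, paragraph_length=12, batch_paragraphs=False)
    # Per the docstring: joining chunk texts with "" reproduces the input exactly.
    assert "".join(chunk["text"] for chunk in chunks) == text
```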
```diff
@@ -1,60 +1,77 @@
 import re

+SENTENCE_ENDINGS = r"[.;!?…]"
+PARAGRAPH_ENDINGS = r"[\n\r]"
+
 def chunk_by_word(data: str):
-    sentence_endings = r"[.;!?…]"
-    paragraph_endings = r"[\n\r]"
+    """
+    Chunks text into words and endings while preserving whitespace.
+    Whitespace is included with the preceding word.
+    Outputs can be joined with "" to recreate the original input.
+    """
     last_processed_character = ""

-    word = ""
+    current_chunk = ""
     i = 0

+    # Handle leading whitespace if any
+    while i < len(data) and (re.match(PARAGRAPH_ENDINGS, data[i]) or data[i] == " "):
+        current_chunk += data[i]
+        i += 1
+    if current_chunk:
+        yield (current_chunk, "word")
+        current_chunk = ""
+
     while i < len(data):
         character = data[i]

-        if word == "" and (re.match(paragraph_endings, character) or character == " "):
-            i = i + 1
-            continue
-
         def is_real_paragraph_end():
-            if re.match(sentence_endings, last_processed_character):
+            if re.match(SENTENCE_ENDINGS, last_processed_character):
                 return True
             j = i + 1
             next_character = data[j] if j < len(data) else None
-            while next_character is not None and (re.match(paragraph_endings, next_character) or next_character == " "):
+            while next_character is not None and (re.match(PARAGRAPH_ENDINGS, next_character) or next_character == " "):
                 j += 1
                 next_character = data[j] if j < len(data) else None
             if next_character and next_character.isupper():
                 return True
             return False

-        if re.match(paragraph_endings, character):
-            yield (word, "paragraph_end" if is_real_paragraph_end() else "word")
-            word = ""
-            i = i + 1
+        if re.match(PARAGRAPH_ENDINGS, character):
+            if current_chunk:
+                yield (current_chunk, "word")
+                current_chunk = ""
+            yield (character, "paragraph_end" if is_real_paragraph_end() else "word")
+            i += 1
             continue

-        if character == " ":
-            yield [word, "word"]
-            word = ""
-            i = i + 1
-            continue
-
-        word += character
+        current_chunk += character
         last_processed_character = character

-        if re.match(sentence_endings, character):
-            # Check for ellipses.
-            if i + 2 <= len(data) and data[i] == "." and data[i + 1] == "." and data[i + 2] == ".":
-                word += ".."
-                i = i + 2
+        if character == " ":
+            yield (current_chunk, "word")
+            current_chunk = ""
+            i += 1
+            continue

-            is_paragraph_end = i + 1 < len(data) and re.match(paragraph_endings, data[i + 1])
-            yield (word, "paragraph_end" if is_paragraph_end else "sentence_end")
-            word = ""
+        if re.match(SENTENCE_ENDINGS, character):
+            # Check for ellipses
+            if i + 2 < len(data) and data[i:i+3] == "...":
+                current_chunk += ".."
+                i += 2
+
+            # Look ahead for whitespace
+            next_i = i + 1
+            while next_i < len(data) and data[next_i] == " ":
+                current_chunk += data[next_i]
+                next_i += 1
+
+            is_paragraph_end = next_i < len(data) and re.match(PARAGRAPH_ENDINGS, data[next_i])
+            yield (current_chunk, "paragraph_end" if is_paragraph_end else "sentence_end")
+            current_chunk = ""
+            i = next_i
+            continue

         i += 1

-    if len(word) > 0:
-        yield (word, "word")
+    if current_chunk:
+        yield (current_chunk, "word")
```
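chunk_by_word makes the same join-with-"" promise in its new docstring, so an analogous round-trip check applies. A sketch, with the import path assumed:

```python
# Round-trip sketch for chunk_by_word - the import path is an assumption.
from cognee.tasks.chunks import chunk_by_word  # assumed location

def test_chunk_by_word_round_trip():
    text = "Hello world. This is a test...\nNew paragraph here."
    pieces = [piece for piece, _ in chunk_by_word(text)]
    # Whitespace rides along with the preceding word, so "" joining is lossless.
    assert "".join(pieces) == text
```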
Comment: I don't know if paragraph_chunks is used somewhere else. I mean, it can be deleted, but I think it's good to be able to see what the chunks were from a debugging point of view, because this way it's going to be local and lost once the method finishes. But maybe @borisarzentar has more context here.

Reply: Within TextChunker, paragraph_chunks is only used within read. The attribute is never accessed from outside the class anywhere in the repo, and for a dynamic attribute like this one it would also not be good practice to do so, since that would break encapsulation.
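For readers outside the thread, the encapsulation point rests on a standard Python pitfall: a mutable class attribute like the old paragraph_chunks = [] is shared across every instance of the class. A minimal illustration with a toy class, not repo code:

```python
class Chunker:
    buffer = []  # class attribute: one shared list for ALL instances

a = Chunker()
b = Chunker()
a.buffer.append("chunk from a")
print(b.buffer)  # ['chunk from a'] - b sees a's data

# A local variable inside the method (as this PR does) sidesteps the sharing,
# at the cost of losing the buffer for post-hoc debugging once read() returns.
```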