# 文档处理

## 文档处理流程
```
┌─────────────────────────────────────────────────────────────┐
│                      文档处理完整流程                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  📄 源文档                                                  │
│        │                                                    │
│        ▼                                                    │
│  🔍 解析提取 ──▶ 文本清洗 ──▶ 切分策略 ──▶ 向量化           │
│        │            │            │           │              │
│        ▼            ▼            ▼           ▼              │
│   PDF/MD/DOCX    去除噪声     语义切分     存储到           │
│   结构化提取     规范化文本   控制长度     向量数据库       │
│                                                             │
└─────────────────────────────────────────────────────────────┘
```

## 支持的文档格式
| 格式 | 解析方式 | 支持程度 |
|---|---|---|
| TXT | 直接读取 | ⭐⭐⭐⭐⭐ |
| Markdown | 解析结构 | ⭐⭐⭐⭐⭐ |
| PDF | 文字提取/表格 | ⭐⭐⭐⭐ |
| Word (.docx) | 解析XML | ⭐⭐⭐⭐ |
| HTML | 解析标签 | ⭐⭐⭐⭐ |
| PPT | 提取文字 | ⭐⭐⭐ |
| Excel | 表格数据 | ⭐⭐⭐ |
| 图片 | OCR识别 | ⭐⭐ |
## 文本清洗

### 常见清洗操作
python
import re
def clean_text(text):
    """Normalise raw document text before it is chunked.

    Steps, in order:
      1. remove HTML tags, URLs and e-mail addresses;
      2. convert curly quotes to straight ASCII quotes;
      3. drop every character that is not a word character, whitespace,
         a CJK ideograph, or common ASCII / full-width punctuation;
      4. collapse whitespace runs to one space and trim both ends.

    Tags, URLs and addresses are removed *before* the character filter:
    the filter deletes ``<``, ``>``, ``/`` and ``@``, so running it first
    would make those patterns unmatchable and leave their remnants behind.

    Args:
        text: The raw text to clean.

    Returns:
        The cleaned text on a single line.
    """
    text = re.sub(r'<[^>]+>', '', text)
    text = re.sub(r'https?://\S+', '', text)
    text = re.sub(r'\S+@\S+', '', text)

    # Curly quotes are spelled as escapes so the mapping survives tools that
    # normalise typographic characters to their ASCII look-alikes.
    text = text.translate(str.maketrans({
        '\u201c': '"',
        '\u201d': '"',
        '\u2018': "'",
        '\u2019': "'",
    }))

    # Whitelist: word chars, whitespace, CJK ideographs, ASCII .,!?;: and
    # their full-width forms (U+FF0C U+3002 U+FF01 U+FF1F U+FF1B U+FF1A),
    # straight quotes, and the bracket pairs () U+FF08/U+FF09 U+3010/U+3011
    # [] {}.
    text = re.sub(
        r'''[^\w\s\u4e00-\u9fff.,!?;:\uff0c\u3002\uff01\uff1f\uff1b\uff1a"'()\uff08\uff09\u3010\u3011\[\]{}]''',
        '',
        text,
    )

    # Collapse whitespace last: the removals above can leave adjacent gaps.
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

### 文档结构保留
python
def preserve_document_structure(text):
    """Tag Markdown structure line by line so it survives later processing.

    Headings are prefixed with ``[标题]``, list items (``- `` / ``* ``) are
    rewritten as ``[列表项] <item>``, and code fences are replaced with
    ``[代码块开始]`` / ``[代码块结束]`` markers.

    Fences are tracked with an open/closed flag, so an opening fence that
    carries a language tag (```` ```python ````) is still labelled as the
    start of a block and the bare closing fence as its end. Lines inside a
    code block are passed through untouched; otherwise a Python comment or a
    YAML list inside the block would be mistaken for a heading or list item.

    Args:
        text: Markdown source text.

    Returns:
        The text with structural markers inserted, joined with newlines.
    """
    tagged = []
    in_code_block = False
    for line in text.split('\n'):
        if line.startswith('```'):
            tagged.append("[代码块结束]" if in_code_block else "[代码块开始]")
            in_code_block = not in_code_block
        elif in_code_block:
            tagged.append(line)
        elif line.startswith('#'):
            tagged.append(f"[标题] {line}")
        else:
            stripped = line.strip()
            if stripped.startswith(('- ', '* ')):
                tagged.append(f"[列表项] {stripped[2:]}")
            else:
                tagged.append(line)
    return '\n'.join(tagged)

## 文档切分策略
### 1. 固定长度切分
python
def split_by_length(text, chunk_size=500, overlap=50):
    """Split text into fixed-size chunks that overlap their neighbours.

    Each chunk starts ``chunk_size - overlap`` characters after the previous
    one, so consecutive chunks share ``overlap`` characters of context.
    Iteration stops as soon as a chunk reaches the end of the text, which
    avoids emitting a trailing chunk made only of already-covered overlap.

    Args:
        text: The text to split.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Characters shared between consecutive chunks; must satisfy
            ``0 <= overlap < chunk_size``.

    Returns:
        The list of chunks, in order. Empty for empty input.

    Raises:
        ValueError: If ``chunk_size`` or ``overlap`` is out of range. An
            overlap that is not smaller than the chunk size would never
            advance through the text.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= len(text):
            break
    return chunks

### 2. 语义感知切分(推荐)
python
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Separators are tried in order, coarsest first. Chinese text uses full-width
# punctuation, so each mark is listed in its full-width form (written as an
# escape so it cannot be silently normalised to ASCII) next to the ASCII one.
text_splitter = RecursiveCharacterTextSplitter(
    separators=[
        "\n\n",     # paragraph break
        "\n",       # line break
        "。",       # ideographic full stop
        "\uff01",   # full-width exclamation mark
        "!",        # ASCII exclamation mark
        "\uff1f",   # full-width question mark
        "?",        # ASCII question mark
        "\uff1b",   # full-width semicolon
        ";",        # ASCII semicolon
        "\uff0c",   # full-width comma
        ",",        # ASCII comma
        " ",        # space
        "",         # last resort: split between any two characters
    ],
    chunk_size=500,       # target chunk size
    chunk_overlap=50,     # characters shared between neighbouring chunks
    length_function=len,  # chunk length is measured in characters
)
# NOTE(review): `document` is not defined in this snippet; it is expected to
# hold the text to split.
chunks = text_splitter.split_text(document)

### 3. 语义切分(高级)
python
from langchain_experimental.text_splitter import SemanticChunker
from langchain_community.embeddings import OpenAIEmbeddings
# Split on semantic similarity: a breakpoint is placed where the embedding
# distance between neighbouring sentences exceeds the threshold.
semantic_splitter = SemanticChunker(
    embeddings=OpenAIEmbeddings(),
    breakpoint_threshold_type="percentile",  # or "standard_deviation"
    # For "percentile" the amount is on a 0-100 scale. 0.95 would select the
    # 0.95th percentile and break at almost every sentence boundary.
    breakpoint_threshold_amount=95,
)
# NOTE(review): `long_text` is not defined in this snippet; it is expected to
# hold the text to split.
chunks = semantic_splitter.create_documents([long_text])

### 4. 针对不同文档类型
python
def split_markdown(documents):
    """Chunk Markdown texts with LangChain's ``MarkdownTextSplitter``.

    Args:
        documents: The texts to split; passed unchanged to
            ``create_documents``.

    Returns:
        Whatever ``create_documents`` returns for chunks of about 500
        characters with a 50-character overlap.
    """
    from langchain.text_splitter import MarkdownTextSplitter

    markdown_splitter = MarkdownTextSplitter(chunk_size=500, chunk_overlap=50)
    return markdown_splitter.create_documents(documents)
def split_code(documents, language="python"):
    """Chunk source-code texts with language-aware separators.

    Args:
        documents: The source texts to split; passed unchanged to
            ``create_documents``.
        language: A ``Language`` member or its string value (for example
            ``"python"``). It selects the separator set; previously it was
            ignored and Python was always used.

    Returns:
        Whatever ``create_documents`` returns for chunks of about 500
        characters with a 50-character overlap.

    Raises:
        ValueError: If ``language`` is not a valid ``Language`` value.
    """
    # Imported locally so the function does not depend on another snippet
    # having imported the splitter at module level.
    from langchain.text_splitter import Language, RecursiveCharacterTextSplitter

    splitter = RecursiveCharacterTextSplitter.from_language(
        language=Language(language),  # accepts both members and values
        chunk_size=500,
        chunk_overlap=50,
    )
    return splitter.create_documents(documents)

## 特殊文档处理
### PDF处理
python
import PyPDF2
from langchain_community.document_loaders import PyPDFLoader
# 方式1:使用PyPDF2
def extract_pdf_text(file_path):
    """Return the text of every page of a PDF, read with PyPDF2.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        The extracted page texts concatenated in page order, each followed
        by a blank line (``"\\n\\n"``).
    """
    with open(file_path, 'rb') as pdf_file:
        pdf_reader = PyPDF2.PdfReader(pdf_file)
        # Extraction happens inside the `with` block, while the file is open.
        return "".join(
            page.extract_text() + "\n\n" for page in pdf_reader.pages
        )
# Option 2: use the LangChain loader.
loader = PyPDFLoader("document.pdf")
# load() yields one Document per page, which is what `pages` is meant to
# hold. load_and_split() would additionally re-chunk the pages with a default
# text splitter.
pages = loader.load()

### Word文档处理
python
from docx import Document
from langchain_community.document_loaders import UnstructuredWordDocumentLoader
# 方式1:使用python-docx
def extract_word_text(file_path):
    """Return the paragraph text of a .docx file, read with python-docx.

    Args:
        file_path: Path of the Word document to read.

    Returns:
        The text of each entry in ``doc.paragraphs``, in order, each
        followed by a newline.
    """
    word_doc = Document(file_path)
    return "".join(paragraph.text + "\n" for paragraph in word_doc.paragraphs)
# Option 2: use the LangChain loader.
loader = UnstructuredWordDocumentLoader(file_path="document.docx")
docs = loader.load()

### 网页内容提取
python
from langchain_community.document_loaders import WebBaseLoader
import bs4
# Restrict parsing to <div> elements whose class is one of the listed
# content-container classes.
content_filter = bs4.SoupStrainer(
    "div",
    attrs={"class": ["article-content", "post-content"]},
)
loader = WebBaseLoader(
    web_paths=["https://example.com/article"],
    bs_kwargs={"parse_only": content_filter},
)
docs = loader.load()

## 元数据提取
### 自动生成元数据
python
from datetime import datetime
def extract_metadata(document, source_info):
    """Build the metadata record stored alongside a document chunk.

    Args:
        document: The chunk text.
        source_info: Mapping that may provide ``filename``, ``type``,
            ``created_at``, ``index`` and ``total``; missing keys fall back
            to defaults.

    Returns:
        A dict with the keys ``source``, ``file_type``, ``created_at``,
        ``chunk_index``, ``total_chunks``, ``char_count`` and ``first_line``
        (the first line of the chunk, capped at 100 characters).
    """
    lookup = source_info.get

    metadata = {}
    metadata["source"] = lookup("filename", "unknown")
    metadata["file_type"] = lookup("type", "text")
    # The current local time is the fallback when no timestamp is supplied.
    metadata["created_at"] = lookup("created_at", datetime.now().isoformat())
    metadata["chunk_index"] = lookup("index", 0)
    metadata["total_chunks"] = lookup("total", 1)
    metadata["char_count"] = len(document)
    if document:
        metadata["first_line"] = document.split('\n')[0][:100]
    else:
        metadata["first_line"] = ""
    return metadata

### 语义元数据(高级)
python
async def extract_semantic_metadata(chunk, llm):
    """Ask an LLM for semantic metadata about a chunk and parse its JSON reply.

    Only the first 500 characters of the chunk are sent.

    Args:
        chunk: The chunk text.
        llm: A model object exposing ``ainvoke(prompt)`` or, failing that,
            ``invoke(prompt)``. The result may be awaitable or plain, and
            may be a string or an object with a ``content`` attribute.

    Returns:
        The object decoded from the model's JSON reply.

    Raises:
        json.JSONDecodeError: If the reply is not valid JSON.
    """
    # Local imports keep the snippet self-contained.
    import inspect
    import json

    prompt = f"""
从以下文档片段中提取关键信息:
文档:{chunk[:500]}
请提取:
1. 主题(2-3个关键词)
2. 实体(人名、地名、机构名)
3. 文档类型(教程、指南、参考、说明等)
4. 难度级别(入门/中等/进阶)
输出JSON格式:
"""
    # Prefer the async entry point. A synchronous invoke() returns a plain
    # value, and awaiting that raises TypeError, so await only when needed.
    call = getattr(llm, "ainvoke", None) or llm.invoke
    response = call(prompt)
    if inspect.isawaitable(response):
        response = await response

    # Chat-style models return a message object; the text is in `.content`.
    raw = getattr(response, "content", response).strip()

    # Models often wrap JSON in a Markdown fence (```json ... ```).
    if raw.startswith("```"):
        raw = raw.strip("`")
        if raw[:4].lower() == "json":
            raw = raw[4:]
    return json.loads(raw)

## 文档处理最佳实践
### 1. 批量处理
python
import os
from concurrent.futures import ThreadPoolExecutor
def process_documents_batch(directory, max_workers=4):
    """Process every supported file in a directory on a thread pool.

    Files are routed by lower-cased extension: ``.pdf``, ``.doc``/``.docx``,
    ``.txt`` and ``.md``. Anything else is skipped. A file whose handler
    raises is reported on stdout and skipped, so one bad file does not abort
    the batch.

    Args:
        directory: Directory whose entries are processed (not recursive).
        max_workers: Size of the thread pool.

    Returns:
        The handler results that are not ``None``, in directory-listing
        order.
    """
    def _route(path):
        suffix = os.path.splitext(path)[1].lower()
        try:
            if suffix == '.pdf':
                return process_pdf(path)
            if suffix in ('.doc', '.docx'):
                return process_word(path)
            if suffix == '.txt':
                return process_txt(path)
            if suffix == '.md':
                return process_markdown(path)
        except Exception as e:
            print(f"处理失败 {path}: {e}")
        # Unsupported extension, or the handler failed.
        return None

    paths = [os.path.join(directory, name) for name in os.listdir(directory)]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return [
            outcome
            for outcome in pool.map(_route, paths)
            if outcome is not None
        ]

### 2. 进度跟踪
python
from tqdm import tqdm
def process_with_progress(files):
    """Process files one by one behind a progress bar, checkpointing as it goes.

    A checkpoint is saved after every tenth result and once more at the end
    if the last batch is incomplete, so the final (up to nine) results are
    not left unsaved.

    Args:
        files: The files to process; each is passed to ``process_single``.

    Returns:
        The list of results, in input order.
    """
    results = []
    for file in tqdm(files, desc="处理文档"):
        results.append(process_single(file))
        # Save after every 10 processed files.
        if len(results) % 10 == 0:
            save_checkpoint(results)

    # Flush the trailing partial batch.
    if len(results) % 10 != 0:
        save_checkpoint(results)
    return results

### 3. 错误恢复
python
import json
from pathlib import Path
def process_with_checkpoint(files, checkpoint_file="checkpoint.json"):
    """Process files with a resumable on-disk checkpoint.

    The checkpoint is a JSON list of the files already handled. Files found
    in it are skipped, and it is rewritten after every success so an
    interrupted run can resume where it stopped.

    Args:
        files: The files to process; each is passed to ``process_single``.
            Entries must be JSON-serialisable (for example path strings).
        checkpoint_file: Path of the JSON checkpoint file.

    Returns:
        The results for the files processed in this run only.
    """
    checkpoint = Path(checkpoint_file)

    # Start from an empty set so a first run (no checkpoint file yet) can
    # record progress. Without this, `processed` was undefined, every file
    # was reported as an error, and no checkpoint was ever written.
    processed = set()
    if checkpoint.exists():
        processed = set(json.loads(checkpoint.read_text()))
    pending = [f for f in files if f not in processed]

    results = []
    for file in pending:
        try:
            result = process_single(file)
            results.append(result)
            processed.add(file)
            # Persist progress after each file.
            checkpoint.write_text(json.dumps(list(processed)))
        except Exception as e:
            # Best effort: report the failure and move on to the next file.
            print(f"错误: {file}, {e}")
            continue
    return results