Skip to content

文档处理

文档处理流程

┌─────────────────────────────────────────────────────────────┐
│                    文档处理完整流程                           │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  📄 源文档                                                    │
│     │                                                       │
│     ▼                                                       │
│  🔍 解析提取 ──▶ 文本清洗 ──▶ 切分策略 ──▶ 向量化           │
│     │              │              │             │           │
│     ▼              ▼              ▼             ▼           │
│  PDF/MD/DOCX   去除噪声     语义切分      存储到            │
│  结构化提取    规范化文本    控制长度       向量数据库        │
│                                                             │
└─────────────────────────────────────────────────────────────┘

支持的文档格式

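| 格式 | 解析方式 | 支持程度 |
| --- | --- | --- |
| TXT | 直接读取 | ⭐⭐⭐⭐⭐ |
| Markdown | 解析结构 | ⭐⭐⭐⭐⭐ |
| PDF | 文字提取/表格 | ⭐⭐⭐⭐ |
| Word (.docx) | 解析XML | ⭐⭐⭐⭐ |
| HTML | 解析标签 | ⭐⭐⭐⭐ |
| PPT | 提取文字 | ⭐⭐⭐ |
| Excel | 表格数据 | ⭐⭐⭐ |
| 图片 | OCR识别 | ⭐⭐ |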

文本清洗

常见清洗操作

python
import re

def clean_text(text):
    """Clean raw text before it is chunked and embedded.

    The steps run in an order that lets every pattern see the characters it
    depends on: HTML tags, URLs and e-mail addresses are removed *before* the
    special-character filter, because that filter strips ``<``, ``>``, ``/``
    and ``@`` and would otherwise leave tag names and URL fragments behind.
    Whitespace is collapsed last so gaps left by removed spans disappear too.

    Args:
        text: The raw string to clean. ``None`` or ``""`` yields ``""``.

    Returns:
        The cleaned string, with runs of whitespace collapsed to one space and
        no leading or trailing whitespace.
    """
    if not text:
        return ""

    # 1. Remove HTML tags while the angle brackets are still present.
    #    This must precede URL removal, otherwise `\S+` would swallow the
    #    rest of an attribute such as href="https://...">link</a>.
    text = re.sub(r'<[^>]+>', '', text)

    # 2. Remove URLs.
    text = re.sub(r'https?://\S+', '', text)

    # 3. Remove e-mail addresses.
    text = re.sub(r'\S+@\S+', '', text)

    # 4. Normalize curly quotes to straight quotes. Escapes are used so the
    #    characters cannot be silently flattened by an editor or a copy/paste.
    text = text.replace('\u201c', '"').replace('\u201d', '"')
    text = text.replace('\u2018', "'").replace('\u2019', "'")

    # 5. Remove special characters, keeping word characters (which include
    #    CJK ideographs), whitespace, and common ASCII / full-width punctuation.
    keep = (
        r'\w\s\u4e00-\u9fff'
        r'.,!?;:"()\[\]{}'
        "'"
        # Full-width comma, ideographic full stop, full-width ! ? ; : ( )
        # and the lenticular brackets.
        r'\uff0c\u3002\uff01\uff1f\uff1b\uff1a\uff08\uff09\u3010\u3011'
    )
    text = re.sub(f'[^{keep}]', '', text)

    # 6. Collapse every run of whitespace into a single space.
    text = re.sub(r'\s+', ' ', text)

    # 7. Trim leading and trailing whitespace.
    return text.strip()

文档结构保留

python
def preserve_document_structure(text):
    """Annotate Markdown-style lines with structural markers.

    Headings, list items and code-fence lines are tagged so that structure
    survives later plain-text processing. Fence lines are tracked with a
    toggle: the first fence (with or without a language tag) opens a code
    block and the next one closes it. Lines inside a code block are passed
    through untouched, so a ``# comment`` or ``- flag`` in code is not
    mistaken for a heading or a list item.

    Args:
        text: The document as a single string with ``\\n`` line breaks.

    Returns:
        The annotated document, joined with ``\\n``.
    """
    annotated = []
    in_code_block = False

    for line in text.split('\n'):
        if line.startswith('```'):
            # A fence flips the state; the marker reflects the state we are
            # leaving, not whether the fence carries a language tag.
            annotated.append("[代码块结束]" if in_code_block else "[代码块开始]")
            in_code_block = not in_code_block
        elif in_code_block:
            # Code is kept verbatim.
            annotated.append(line)
        elif line.startswith('#'):
            annotated.append(f"[标题] {line}")
        else:
            stripped = line.strip()
            if stripped.startswith(('- ', '* ')):
                # Drop the two-character bullet prefix.
                annotated.append(f"[列表项] {stripped[2:]}")
            else:
                annotated.append(line)

    return '\n'.join(annotated)

文档切分策略

1. 固定长度切分

python
def split_by_length(text, chunk_size=500, overlap=50):
    """Split text into fixed-length chunks that overlap by a fixed amount.

    Args:
        text: The string to split.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Number of characters shared by consecutive chunks, kept so
            that context carries across a boundary. Must satisfy
            ``0 <= overlap < chunk_size``.

    Returns:
        A list of chunks in document order; empty for empty input.

    Raises:
        ValueError: If ``chunk_size`` or ``overlap`` is out of range. An
            overlap that is not smaller than the chunk size would keep the
            window from ever advancing.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    step = chunk_size - overlap
    text_length = len(text)
    chunks = []
    start = 0

    while start < text_length:
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= text_length:
            # The text is fully covered; stepping back by `overlap` again
            # would only emit a tail already contained in this chunk.
            break
        start += step

    return chunks

2. 语义感知切分(推荐)

python
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Separators are tried in order, from the coarsest boundary to the finest.
# Chinese text uses full-width punctuation, so both the full-width and the
# ASCII form of each mark are listed; the full-width ones are written as
# escapes so they cannot be flattened to ASCII by tooling.
text_splitter = RecursiveCharacterTextSplitter(
    separators=[
        "\n\n",      # paragraph break
        "\n",        # line break
        "。",        # ideographic full stop
        "\uff01",    # full-width exclamation mark
        "!",         # ASCII exclamation mark
        "\uff1f",    # full-width question mark
        "?",         # ASCII question mark
        "\uff1b",    # full-width semicolon
        ";",         # ASCII semicolon
        "\uff0c",    # full-width comma
        ",",         # ASCII comma
        " ",         # space
        "",          # last resort: split between characters so an unbroken
                     # run can still be brought under chunk_size
    ],
    chunk_size=500,        # target chunk size
    chunk_overlap=50,      # characters shared by neighbouring chunks
    length_function=len,   # chunk length is measured in characters
)

# `document` is expected to hold the text to split; it is not defined here.
chunks = text_splitter.split_text(document)

3. 语义切分(高级)

python
from langchain_experimental.text_splitter import SemanticChunker
from langchain_community.embeddings import OpenAIEmbeddings

# 基于语义相似度切分
# Split where the embedding distance between neighbouring sentences spikes.
semantic_splitter = SemanticChunker(
    embeddings=OpenAIEmbeddings(),
    breakpoint_threshold_type="percentile",  # or "standard_deviation"
    # For "percentile" the amount is on a 0-100 scale: 95 means "break at
    # distances above the 95th percentile". The previous value 0.95 asked for
    # the 0.95th percentile, which puts a breakpoint at almost every sentence.
    breakpoint_threshold_amount=95,
)

# `long_text` is expected to hold the text to split; it is not defined here.
chunks = semantic_splitter.create_documents([long_text])

4. 针对不同文档类型

python
def split_markdown(documents):
    """Split Markdown texts into chunk documents.

    Args:
        documents: The Markdown texts handed to ``create_documents``.

    Returns:
        Whatever ``MarkdownTextSplitter.create_documents`` produces for the
        given texts, using a chunk size of 500 and an overlap of 50.
    """
    # Imported lazily so the dependency is only needed when this is called.
    from langchain.text_splitter import MarkdownTextSplitter

    markdown_splitter = MarkdownTextSplitter(chunk_size=500, chunk_overlap=50)
    return markdown_splitter.create_documents(documents)

def split_code(documents, language="python"):
    """Split source-code texts along language-aware boundaries.

    Args:
        documents: The source-code texts handed to ``create_documents``.
        language: The programming language of the texts, either a
            ``Language`` member or its string value (for example
            ``"python"``). Previously this argument was ignored and Python
            was always used.

    Returns:
        Whatever ``create_documents`` produces for the given texts, using a
        chunk size of 500 and an overlap of 50.

    Raises:
        ValueError: If ``language`` is not a value of the ``Language`` enum.
    """
    # Both names are imported here so the function does not depend on an
    # import made elsewhere in the file.
    from langchain.text_splitter import Language, RecursiveCharacterTextSplitter

    splitter = RecursiveCharacterTextSplitter.from_language(
        # Calling the enum accepts a member as well as its string value.
        language=Language(language),
        chunk_size=500,
        chunk_overlap=50,
    )

    return splitter.create_documents(documents)

特殊文档处理

PDF处理

python
import PyPDF2
from langchain_community.document_loaders import PyPDFLoader

# 方式1:使用PyPDF2
def extract_pdf_text(file_path):
    """Return the extracted text of every page of a PDF.

    Args:
        file_path: Path of the PDF file, opened in binary mode.

    Returns:
        The text of all pages in order, each page followed by a blank line.
    """
    with open(file_path, 'rb') as pdf_file:
        pdf_reader = PyPDF2.PdfReader(pdf_file)
        # Extraction happens while the file is still open.
        page_texts = [page.extract_text() + "\n\n" for page in pdf_reader.pages]
    return "".join(page_texts)

# 方式2:使用LangChain loader
# Build a loader for "document.pdf" and keep its output in `pages`.
loader = PyPDFLoader("document.pdf")
# Original note: "load page by page".
# NOTE(review): load_and_split() presumably also runs a text splitter over the
# loaded pages — confirm whether plain load() was intended for per-page output.
pages = loader.load_and_split()  # 按页面加载

Word文档处理

python
from docx import Document
from langchain_community.document_loaders import UnstructuredWordDocumentLoader

# 方式1:使用python-docx
def extract_word_text(file_path):
    """Return the paragraph text of a Word document.

    Args:
        file_path: Path of the document passed to ``Document``.

    Returns:
        The text of every paragraph in order, each followed by a newline.
    """
    word_doc = Document(file_path)
    paragraph_lines = [paragraph.text + "\n" for paragraph in word_doc.paragraphs]
    return "".join(paragraph_lines)

# 方式2:使用LangChain loader
# Build a loader for "document.docx" and keep what it loads in `docs`.
# Note that this rebinds the module-level name `loader`.
loader = UnstructuredWordDocumentLoader("document.docx")
docs = loader.load()

网页内容提取

python
from langchain_community.document_loaders import WebBaseLoader
import bs4

# Load one article page, restricting HTML parsing to the <div> elements whose
# class is "article-content" or "post-content".
loader = WebBaseLoader(
    web_paths=["https://example.com/article"],
    # bs_kwargs carries the parse_only filter built with bs4.SoupStrainer.
    # presumably these are forwarded to BeautifulSoup — verify against the loader docs.
    bs_kwargs=dict(parse_only=bs4.SoupStrainer(
        "div",
        attrs={"class": ["article-content", "post-content"]}
    ))
)

# Keep the loaded result in `docs`.
docs = loader.load()

元数据提取

自动生成元数据

python
from datetime import datetime

def extract_metadata(document, source_info):
    """Build the metadata record for one document chunk.

    Args:
        document: The chunk text.
        source_info: Mapping that may provide ``filename``, ``type``,
            ``created_at``, ``index`` and ``total``; missing keys fall back
            to defaults.

    Returns:
        A dict with the keys ``source``, ``file_type``, ``created_at``,
        ``chunk_index``, ``total_chunks``, ``char_count`` and ``first_line``
        (the first line of the chunk, truncated to 100 characters).
    """
    lookup = source_info.get

    metadata = {}
    metadata["source"] = lookup("filename", "unknown")
    metadata["file_type"] = lookup("type", "text")
    # The current time is only used when the source gives no creation time.
    fallback_timestamp = datetime.now().isoformat()
    metadata["created_at"] = lookup("created_at", fallback_timestamp)
    metadata["chunk_index"] = lookup("index", 0)
    metadata["total_chunks"] = lookup("total", 1)
    metadata["char_count"] = len(document)

    if document:
        metadata["first_line"] = document.split('\n')[0][:100]
    else:
        metadata["first_line"] = ""

    return metadata

语义元数据(高级)

python
async def extract_semantic_metadata(chunk, llm):
    """Ask an LLM for semantic metadata about a chunk and parse its JSON reply.

    Only the first 500 characters of the chunk are sent. The prompt asks for
    topics, entities, document type and difficulty level as JSON.

    Args:
        chunk: The chunk text.
        llm: The model client. Its ``ainvoke`` coroutine is used when present;
            otherwise ``invoke`` is called and its result awaited only if it
            is awaitable, so both async and sync clients work.

    Returns:
        The object decoded from the model's JSON reply.

    Raises:
        json.JSONDecodeError: If the reply is not valid JSON.
    """
    # Local imports keep the function usable regardless of what the rest of
    # the file has imported by the time it runs.
    import inspect
    import json

    prompt = f"""
    从以下文档片段中提取关键信息:
    
    文档:{chunk[:500]}
    
    请提取:
    1. 主题(2-3个关键词)
    2. 实体(人名、地名、机构名)
    3. 文档类型(教程、指南、参考、说明等)
    4. 难度级别(入门/中等/进阶)
    
    输出JSON格式:
    """

    if hasattr(llm, "ainvoke"):
        response = await llm.ainvoke(prompt)
    else:
        # Awaiting the result of a synchronous `invoke` would raise, so the
        # result is awaited only when it actually is awaitable.
        response = llm.invoke(prompt)
        if inspect.isawaitable(response):
            response = await response

    # Chat-style clients return a message object carrying the text in
    # `.content`; plain-text clients return the string itself.
    raw_reply = getattr(response, "content", response)
    return json.loads(raw_reply)

文档处理最佳实践

1. 批量处理

python
import os
from concurrent.futures import ThreadPoolExecutor

def process_documents_batch(directory, max_workers=4):
    """Process every entry of a directory in parallel.

    Each entry is routed by its lower-cased file extension to the matching
    ``process_*`` handler. Entries with any other extension are skipped, and
    a failure while handling one entry is printed without stopping the rest.

    Args:
        directory: Directory whose entries are processed.
        max_workers: Size of the thread pool.

    Returns:
        The handler results in directory-listing order, without the ``None``
        results of skipped or failed entries.
    """
    paths = [os.path.join(directory, entry) for entry in os.listdir(directory)]

    def handle(file_path):
        try:
            # Pick the handler from the extension.
            suffix = os.path.splitext(file_path)[1].lower()

            if suffix == '.pdf':
                return process_pdf(file_path)
            if suffix in ['.doc', '.docx']:
                return process_word(file_path)
            if suffix == '.txt':
                return process_txt(file_path)
            if suffix == '.md':
                return process_markdown(file_path)
            return None
        except Exception as e:
            # Best effort: report the failure and carry on with other files.
            print(f"处理失败 {file_path}: {e}")
            return None

    # Run the handlers on a thread pool and drop the empty results.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        outcomes = list(pool.map(handle, paths))

    kept = []
    for outcome in outcomes:
        if outcome is not None:
            kept.append(outcome)
    return kept

2. 进度跟踪

python
from tqdm import tqdm

def process_with_progress(files):
    """Process files one by one while showing a progress bar.

    Args:
        files: The files to hand to ``process_single``.

    Returns:
        The list of ``process_single`` results, in input order.
    """
    collected = []
    for done, file in enumerate(tqdm(files, desc="处理文档"), start=1):
        collected.append(process_single(file))

        # Hand the results gathered so far to save_checkpoint after every
        # tenth document.
        if done % 10 == 0:
            save_checkpoint(collected)

    return collected

3. 错误恢复

python
import json
from pathlib import Path

def process_with_checkpoint(files, checkpoint_file="checkpoint.json", process_fn=None):
    """Process files with a checkpoint so an interrupted run can resume.

    The checkpoint file holds a JSON list of the files already processed.
    Files listed there are skipped, and the list is rewritten after each
    successfully processed file.

    Args:
        files: The files to process.
        checkpoint_file: Path of the JSON checkpoint file.
        process_fn: Callable applied to each file. Defaults to the
            module-level ``process_single``.

    Returns:
        The results for the files processed in this run, in input order.
        Files that raised are reported with ``print`` and left out.
    """
    if process_fn is None:
        # Looked up only when needed, so callers that pass their own
        # function do not require `process_single` to exist.
        process_fn = process_single

    checkpoint = Path(checkpoint_file)
    # Start from an empty set so the first run, which has no checkpoint
    # file yet, can still record its progress.
    processed = set()
    if checkpoint.exists():
        processed = set(json.loads(checkpoint.read_text()))
    pending = [f for f in files if f not in processed]

    results = []

    for file in pending:
        try:
            result = process_fn(file)
            results.append(result)
            processed.add(file)

            # Rewrite the checkpoint after every file so a crash loses at
            # most the file in flight.
            checkpoint.write_text(json.dumps(list(processed)))
        except Exception as e:
            # Best effort: report and move on to the next file.
            print(f"错误: {file}, {e}")
            continue

    return results

Released under the MIT License.