本文介绍的是本人实习期间做过的内容跟大家分享一下。解决的任务如何将带有图片和表格的pdf、docx文档进行切分。步骤1使用mineru解析工具进入mineru官网了解api的使用使用这个工具将上述文档解析成md文件以及多个其他文件文件目录如下具体的代码可以参考下方代码通过api将文件批量转换然后将解析后的结果下载到本地。# --- 配置区 --- TOKEN mineru的API替换成你的 INPUT_DIR 所要解析的文件夹路径 # 放待解析文件的文件夹 OUTPUT_ROOT ../RAG_Chunk/output # 结果存放根目录 CHUNK_SIZE 800 # 文本切片长度字符 HEADERS { Authorization: fBearer {TOKEN}, Content-Type: application/json } def process_files(): # 1. 扫描文件 files_to_process [f for f in os.listdir(INPUT_DIR) if f.lower().endswith((.pdf, .docx))] if not files_to_process: print(未发现 PDF 或 DOCX 文件。) return # 2. 申请上传 URL apply_url https://mineru.net/api/v4/file-urls/batch payload { files: [{name: f, data_id: f} for f in files_to_process], model_version: vlm } resp requests.post(apply_url, headersHEADERS, jsonpayload) res_data resp.json() if res_data.get(code) ! 0: print(f申请 URL 失败: {res_data.get(msg)}) return batch_id res_data[data][batch_id] file_urls res_data[data][file_urls] # 3. 执行文件上传 for idx, f_name in enumerate(files_to_process): file_path os.path.join(INPUT_DIR, f_name) with open(file_path, rb) as f: up_resp requests.put(file_urls[idx], dataf) if up_resp.status_code 200: print(f已上传: {f_name}) else: print(f上传失败: {f_name}) # 4. 轮询结果 print(f正在等待解析结果 (Batch ID: {batch_id})...) result_url fhttps://mineru.net/api/v4/extract-results/batch/{batch_id} while True: time.sleep(10) # 每10秒询问一次 try: response requests.get(result_url, headersHEADERS) res_poll response.json() if res_poll.get(code) 0: extract_data res_poll.get(data, {}) results_list extract_data.get(extract_result, []) if not results_list: print(服务器正在排队初始化...) continue # 检查是否所有文件都已经完成 (state done) # 如果有一个文件还是 running 或 pendingall_finished 就会是 False all_finished all(item.get(state) done for item in results_list) # 打印一下当前的进度方便你观察 for item in results_list: file_name item.get(file_name, 未知文件) state item.get(state) progress item.get(extract_progress, {}) pages f{progress.get(extracted_pages, 0)}/{progress.get(total_pages, 0)} print(f任务状态: [{state}] | 进度: {pages} | 文件: {file_name}) if all_finished: print(--- 所有文件解析完成开始执行后续处理 ---) handle_results(results_list) # 此时 results_list 里的 item 才会包含结果链接/内容 break else: print(部分文件仍在解析中请稍候...) else: print(f接口报错: {res_poll.get(msg)}) except Exception as e: print(f轮询发生异常: {e}) def handle_results(results_list): 下载 ZIP 并完整解压到对应文件夹随后对其中的 Markdown 进行切片 if not os.path.exists(OUTPUT_ROOT): os.makedirs(OUTPUT_ROOT) for item in results_list: file_name item.get(file_name, 未知文件) # 1. 创建对应的子文件夹例如附件2数据治理... folder_name os.path.splitext(file_name)[0] target_dir os.path.join(OUTPUT_ROOT, folder_name) os.makedirs(target_dir, exist_okTrue) # 2. 获取下载链接 zip_url item.get(full_zip_url) if not zip_url: print(f跳过 {file_name}未发现下载链接。) continue print(f正在下载并全量解压: {file_name}...) try: # 3. 下载 ZIP r requests.get(zip_url) r.raise_for_status() # 4. 完整解压到 target_dir with zipfile.ZipFile(io.BytesIO(r.content)) as z: z.extractall(target_dir) print(f已解压所有文件至: {target_dir}) # 5. 寻找解压后的 Markdown 文件用于切片 md_files [f for f in z.namelist() if f.endswith(.md)] if md_files: # 读取第一个找到的 md 文件内容 md_path os.path.join(target_dir, md_files[0]) with open(md_path, r, encodingutf-8) as f: content f.read() fix_content fix_markdown_headings(content) # 执行切片逻辑 perform_chunking(fix_content, target_dir, folder_name) else: print(f提示{file_name} 压缩包内没找到 .md 文件跳过切片。) except Exception as e: print(f处理文件 {file_name} 时出错: {e})步骤2对解析后的md文件正则化因为解析后的md文件所有的标题都变成了一级标题如果按照标题去切分的话再远数据中是看不到标题的层级关系的所以为了解决这个问题我们应该针对md文件编写一个正则化函数将其中的一级二级三级...标题的格式改正确下边是代码示例注意不同的文件应该对应写正则化函数下边的只能作为参考def fix_markdown_headings(text): lines text.split(\n) fixed_lines [] for line in lines: original_line line.strip() if not original_line: fixed_lines.append() continue # 剥洋葱去掉原本乱掉的 # core_content original_line.lstrip(#).strip() # --- 匹配逻辑 (按层级深度从 5 到 1 反向匹配或严格区分) --- # 1. 五级标题(1) 现场应用 if re.match(r^[\(]\d[\)], core_content): fixed_lines.append(f##### {core_content}) # 2. 四级标题2、智慧工地 (数字 顿号) elif re.match(r^\d、, core_content): fixed_lines.append(f#### {core_content}) # 3. 三级标题任务 2... 或 (一) 第一阶段 # 匹配 任务 开头 或 (一) 开头 elif re.match(r^任务\s*\d, core_content) or re.match(r^[\(][一二三四五六七八九十][\)], core_content): fixed_lines.append(f### {core_content}) # 4. 二级标题一、基础管理提升工程 elif re.match(r^[一二三四五六七八九十]、, core_content): fixed_lines.append(f## {core_content}) # 5. 一级标题第五章 重点任务 elif re.match(r^第[一二三四五六七八九十百][章部分], core_content): fixed_lines.append(f# {core_content}) else: # 不符合上述特征的保持原样 fixed_lines.append(original_line) return \n.join(fixed_lines)步骤三切分文档就是先按照标题切分Chunk然后如果当前块的大小超过了阈值就用字符切分最后保存成json文件代码如下def perform_chunking(content, target_dir, folder_name): 使用 LangChain 的两步走策略进行切片并保存为 JSON 格式 if not content: print(f内容为空跳过切片。) return # 1. 第一步按 Markdown 标题层级切分 headers_to_split_on [ (#, Header 1), (##, Header 2), (###, Header 3), (####, Header 4), (#####, Header 5), ] header_splitter MarkdownHeaderTextSplitter(headers_to_split_onheaders_to_split_on) header_action_chunks header_splitter.split_text(content) # 2. 第二步二次递归切分控制在 800 字符以内重叠 100 字符 text_splitter RecursiveCharacterTextSplitter( chunk_size800, chunk_overlap100 ) final_docs text_splitter.split_documents(header_action_chunks) # 3. 构造 JSON 数据结构 chunks_data [] for i, doc in enumerate(final_docs): chunks_data.append({ chunk_id: i 1, metadata: doc.metadata, # 包含 Header 1, Header 2 等层级信息 page_content: doc.page_content # 文本内容 }) # 4. 准备保存路径保存在对应的文件夹内 # 如果你希望所有 JSON 都在同一个根目录下可以修改这里的路径逻辑 json_file_path os.path.join(target_dir, f{folder_name}.json) # 5. 写入 JSON 文件 try: with open(json_file_path, w, encodingutf-8) as f: # ensure_asciiFalse 保证中文字符正常显示indent4 方便人类阅读 json.dump(chunks_data, f, ensure_asciiFalse, indent4) print(f[{folder_name}] 切片完成已生成 {len(chunks_data)} 个切片保存至 {json_file_path}) except Exception as e: print(f[{folder_name}] 保存 JSON 出错: {e})