结果展示指标目标值优化手段检测精度 (mAP)0.85LiveCell预训练 领域自适应追踪精度 (MOTA)0.90ByteTrack 细胞形态特征分裂检测F10.88面积变化 时序模型推理延迟15msTensorRT FP16 批处理处理帧率60 FPSGPU并行 多流处理一、系统架构总览plain┌─────────────────────────────────────────────────────────────────────┐ │ 显微成像硬件层 │ │ 高帧率sCMOS相机 (100fps) 自动载物台 环境控制舱(37°C, 5% CO₂) │ └─────────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────────┐ │ 实时推理引擎层 │ │ YOLOv8-Tiny优化模型 → TensorRT/ONNX → GPU推理 (FP16/INT8量化) │ │ 多目标跟踪: ByteTrack/OC-SORT 细胞分裂事件检测 │ └─────────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────────┐ │ 生物信息学分析层 │ │ 细胞谱系重建 分裂周期量化 药物响应曲线 异常分裂模式识别 │ └─────────────────────────────────────────────────────────────────────┘二、核心数据集获取可直接下载2.1 公开数据集推荐表格数据集细胞类型标注内容下载链接LiveCell多种癌细胞系实例分割追踪sartorius-research.github.io/LiveCell/Cell Tracking ChallengeHeLa, PC3, U373细胞核分裂事件celltrackingchallenge.netDeepCell Dataset干细胞, 癌细胞边界框掩膜datasets.deepcell.orgBBBC038v1多种细胞细胞计数定位Broad Bioimage Benchmark Collection2.2 数据集下载脚本bash#!/bin/bash # download_datasets.sh # 自动下载并解压核心数据集 mkdir -p cell_tracking_datasets cd cell_tracking_datasets # 1. LiveCell (主要推荐) echo Downloading LiveCell... wget -c https://livecell-dataset.s3.amazonaws.com/LiveCell.zip unzip -q LiveCell.zip -d LiveCell/ # 2. Cell Tracking Challenge - HeLa echo Downloading Cell Tracking Challenge... mkdir -p CTC cd CTC wget -c http://data.celltrackingchallenge.net/training-datasets/Fluo-N2DL-HeLa.zip unzip -q Fluo-N2DL-HeLa.zip cd .. # 3. DeepCell 示例数据 echo Downloading DeepCell samples... wget -c https://deepcell-data.s3.amazonaws.com/cytoplasm/hela_s3.npz echo Download complete!三、YOLOv8-Tiny 细胞检测模型训练3.1 环境配置bash# 创建conda环境 conda create -n celltrack python3.10 -y conda activate celltrack # 安装核心依赖 pip install ultralytics8.0.200 opencv-python-headless tensorrt onnxruntime-gpu pip install lapx cython-bbox # 用于ByteTrack pip install cellpose scikit-image pandas matplotlib seaborn3.2 数据预处理LiveCell → YOLO格式Python# prepare_livecell.py import json import cv2 import numpy as np from pathlib import Path from tqdm import tqdm import shutil def convert_livecell_to_yolo(livecell_root, output_root, val_ratio0.1): 将LiveCell COCO格式转换为YOLO格式 输出: images/, labels/ 以及 data.yaml livecell_root Path(livecell_root) output_root Path(output_root) # 创建目录结构 for split in [train, val]: (output_root / images / split).mkdir(parentsTrue, exist_okTrue) (output_root / labels / split).mkdir(parentsTrue, exist_okTrue) # 加载COCO标注 with open(livecell_root / livecell_coco_train.json) as f: coco_data json.load(f) # 构建图像id到文件名的映射 id_to_img {img[id]: img for img in coco_data[images]} # 按图像分组标注 img_to_annots {} for ann in coco_data[annotations]: img_id ann[image_id] if img_id not in img_to_annots: img_to_annots[img_id] [] img_to_annots[img_id].append(ann) # 细胞类型映射 (LiveCell有8种细胞系) cell_types { A172: 0, BT474: 1, BV2: 2, Huh7: 3, MCF7: 4, SHSY5Y: 5, SkBr3: 6, SKOV3: 7 } # 划分训练/验证集 all_img_ids list(img_to_annots.keys()) np.random.seed(42) np.random.shuffle(all_img_ids) split_idx int(len(all_img_ids) * (1 - val_ratio)) train_ids, val_ids all_img_ids[:split_idx], all_img_ids[split_idx:] def process_split(img_ids, split_name): for img_id in tqdm(img_ids, descfProcessing {split_name}): img_info id_to_img[img_id] img_name img_info[file_name] width, height img_info[width], img_info[height] # 复制图像 src_img livecell_root / images / img_name if not src_img.exists(): continue dst_img output_root / images / split_name / img_name shutil.copy(src_img, dst_img) # 转换标注为YOLO格式 label_file output_root / labels / split_name / f{Path(img_name).stem}.txt with open(label_file, w) as f: for ann in img_to_annots.get(img_id, []): # 获取边界框 x, y, w, h ann[bbox] # 归一化 x_center (x w/2) / width y_center (y h/2) / height w_norm w / width h_norm h / height # 细胞类型作为类别 cell_line ann.get(cell_line, unknown) cls_id cell_types.get(cell_line, 0) f.write(f{cls_id} {x_center:.6f} {y_center:.6f} {w_norm:.6f} {h_norm:.6f}\n) process_split(train_ids, train) process_split(val_ids, val) # 生成data.yaml yaml_content fpath: {output_root.absolute()} train: images/train val: images/val nc: 8 names: [A172, BT474, BV2, Huh7, MCF7, SHSY5Y, SkBr3, SKOV3] with open(output_root / data.yaml, w) as f: f.write(yaml_content) print(fConversion complete! Train: {len(train_ids)}, Val: {len(val_ids)}) if __name__ __main__: convert_livecell_to_yolo( livecell_root./LiveCell, output_root./cell_yolo_dataset )3.3 模型训练配置Python# train_yolo_cell.py from ultralytics import YOLO def train_cell_detector(): # 加载预训练的YOLOv8n (nano) 作为基础针对显微图像优化 model YOLO(yolov8n.pt) # 训练配置 - 针对小目标细胞优化 results model.train( data./cell_yolo_dataset/data.yaml, epochs200, imgsz640, batch16, device0, # 针对显微图像的小目标优化 rectTrue, # 矩形训练保持原始宽高比 cos_lrTrue, # 余弦学习率调度 # 数据增强 - 模拟显微成像变化 hsv_h0.015, # 色调变化 (显微图像色调稳定) hsv_s0.7, # 饱和度变化 hsv_v0.4, # 亮度变化 (模拟光照变化) degrees180, # 旋转 (细胞无方向性) translate0.1, scale0.5, # 缩放 (模拟不同物镜) shear0, perspective0.0, flipud0.5, # 垂直翻转 fliplr0.5, # 水平翻转 mosaic1.0, # Mosaic增强 mixup0.1, # Mixup增强 # 损失函数权重 - 强调定位精度 box7.5, # 边界框损失权重 cls0.5, # 分类损失权重 dfl1.5, # 分布焦点损失 # 优化器 optimizerAdamW, lr00.001, lrf0.01, # 早停和检查点 patience50, saveTrue, projectcell_detection, nameyolov8n_cell, # 验证 valTrue, iou0.6, # NMS IoU阈值 conf0.25, # 置信度阈值 ) # 导出为ONNX/TensorRT model.export(formatonnx, dynamicTrue, simplifyTrue) # model.export(formatengine, halfTrue, int8True) # TensorRT return model if __name__ __main__: model train_cell_detector()四、实时追踪引擎ByteTrack 分裂检测4.1 细胞追踪与分裂检测核心代码Python# cell_tracker.py import cv2 import numpy as np import torch from ultralytics import YOLO from collections import defaultdict from dataclasses import dataclass from typing import List, Dict, Tuple, Optional import time from scipy.optimize import linear_sum_assignment dataclass class CellTrack: track_id: int bbox: np.ndarray # [x1, y1, x2, y2] centroid: np.ndarray # [cx, cy] class_id: int score: float frame_id: int age: int 0 hits: int 0 time_since_update: int 0 state: str tentative # tentative, confirmed, deleted # 分裂检测相关 parent_id: Optional[int] None division_time: Optional[int] None lineage: List[int] None # 轨迹历史 trajectory: List[np.ndarray] None area_history: List[float] None intensity_history: List[float] None class CellDivisionDetector: 基于形态学变化的细胞分裂检测器 def __init__(self, area_growth_threshold1.8, # 面积增长阈值分裂前细胞变圆膨胀 area_shrink_threshold0.55, # 面积收缩阈值分裂后 iou_threshold0.3, # 子细胞IOU阈值 min_track_length10): # 最小追踪长度 self.area_growth_threshold area_growth_threshold self.area_shrink_threshold area_shrink_threshold self.iou_threshold iou_threshold self.min_track_length min_track_length # 存储待检测的候选分裂事件 self.candidates defaultdict(list) def update(self, track: CellTrack) - Optional[Dict]: 更新追踪器状态并检测分裂事件 返回: 分裂事件字典或None if track.trajectory is None or len(track.trajectory) 3: return None # 计算面积变化率 if len(track.area_history) 5: recent_area np.mean(track.area_history[-5:]) prev_area np.mean(track.area_history[-10:-5]) if len(track.area_history) 10 else track.area_history[0] area_ratio recent_area / prev_area if prev_area 0 else 1.0 # 检测分裂前兆细胞变圆、面积增大 if area_ratio self.area_growth_threshold and track.state confirmed: self.candidates[track.track_id].append({ frame: track.frame_id, area: recent_area, centroid: track.centroid, stage: pre_division }) # 检测分裂完成面积骤减细胞分裂为两个 elif area_ratio self.area_shrink_threshold and track.track_id in self.candidates: # 验证是否有新的tracks在附近出现 event { parent_id: track.track_id, division_frame: track.frame_id, parent_centroid: track.centroid, stage: divided, confidence: self._calculate_confidence(track) } del self.candidates[track.track_id] return event return None def _calculate_confidence(self, track: CellTrack) - float: 计算分裂事件的可信度 confidence 0.5 # 基于历史长度 if len(track.area_history) self.min_track_length: confidence 0.2 # 基于面积变化幅度 max_area max(track.area_history[-20:]) if len(track.area_history) 20 else max(track.area_history) min_area min(track.area_history[-5:]) if max_area 0: ratio min_area / max_area if ratio 0.5: confidence 0.3 return min(confidence, 1.0) class MicroscopeCellTracker: 显微细胞实时追踪系统 def __init__(self, model_path: str cell_detection/yolov8n_cell/weights/best.pt, track_buffer: int 30, min_confidence: float 0.4, min_iou: float 0.3, max_age: int 30): # 加载模型 self.model YOLO(model_path) self.model.fuse() # 融合ConvBN加速推理 # 追踪参数 self.track_buffer track_buffer self.min_confidence min_confidence self.min_iou min_iou self.max_age max_age # 状态管理 self.tracks: Dict[int, CellTrack] {} self.next_id 1 self.frame_count 0 # 分裂检测器 self.division_detector CellDivisionDetector() self.division_events [] # 性能统计 self.inference_times [] self.track_times [] def preprocess(self, frame: np.ndarray) - np.ndarray: 预处理显微图像 # 对比度增强 (CLAHE) lab cv2.cvtColor(frame, cv2.COLOR_BGR2LAB) l, a, b cv2.split(lab) clahe cv2.createCLAHE(clipLimit2.0, tileGridSize(8,8)) l clahe.apply(l) enhanced cv2.merge([l, a, b]) enhanced cv2.cvtColor(enhanced, cv2.COLOR_LAB2BGR) # 去噪 enhanced cv2.fastNlMeansDenoisingColored(enhanced, None, 10, 10, 7, 21) return enhanced def detect(self, frame: np.ndarray) - List[Dict]: YOLO检测 start time.time() results self.model(frame, verboseFalse, confself.min_confidence, iouself.min_iou, max_det200) # 显微图像可能有很多细胞 self.inference_times.append(time.time() - start) detections [] for r in results: boxes r.boxes.xyxy.cpu().numpy() scores r.boxes.conf.cpu().numpy() classes r.boxes.cls.cpu().numpy().astype(int) for box, score, cls in zip(boxes, scores, classes): detections.append({ bbox: box, score: score, class: cls }) return detections def iou_distance(self, tracks: List[CellTrack], detections: List[Dict]) - np.ndarray: 计算IOU距离矩阵 if len(tracks) 0 or len(detections) 0: return np.empty((len(tracks), len(detections))) track_bboxes np.array([t.bbox for t in tracks]) det_bboxes np.array([d[bbox] for d in detections]) # 计算IOU x1 np.maximum(track_bboxes[:, None, 0], det_bboxes[None, :, 0]) y1 np.maximum(track_bboxes[:, None, 1], det_bboxes[None, :, 1]) x2 np.minimum(track_bboxes[:, None, 2], det_bboxes[None, :, 2]) y2 np.minimum(track_bboxes[:, None, 3], det_bboxes[None, :, 3]) intersection np.maximum(0, x2 - x1) * np.maximum(0, y2 - y1) area_track (track_bboxes[:, 2] - track_bboxes[:, 0]) * (track_bboxes[:, 3] - track_bboxes[:, 1]) area_det (det_bboxes[:, 2] - det_bboxes[:, 0]) * (det_bboxes[:, 3] - det_bboxes[:, 1]) union area_track[:, None] area_det[None, :] - intersection iou intersection / (union 1e-6) return 1 - iou # 转换为距离 def update(self, frame: np.ndarray) - Tuple[List[CellTrack], List[Dict]]: 主更新函数检测 追踪 分裂检测 start time.time() self.frame_count 1 # 1. 预处理 processed self.preprocess(frame) # 2. 检测 detections self.detect(processed) # 3. 卡尔曼预测 (简化版使用线性预测) for track in self.tracks.values(): if track.state ! deleted: track.time_since_update 1 # 简单的速度预测 if len(track.trajectory) 2: velocity track.trajectory[-1] - track.trajectory[-2] track.centroid track.centroid velocity # 更新bbox中心 cx, cy track.centroid w track.bbox[2] - track.bbox[0] h track.bbox[3] - track.bbox[1] track.bbox np.array([cx - w/2, cy - h/2, cx w/2, cy h/2]) # 4. 匹配 (匈牙利算法) active_tracks [t for t in self.tracks.values() if t.state ! deleted and t.time_since_update self.max_age] if len(active_tracks) 0 and len(detections) 0: cost_matrix self.iou_distance(active_tracks, detections) row_ind, col_ind linear_sum_assignment(cost_matrix) matched_indices set() unmatched_detections set(range(len(detections))) # 更新匹配的tracks for r, c in zip(row_ind, col_ind): if cost_matrix[r, c] 1 - self.min_iou: # IOU threshold track active_tracks[r] det detections[c] # 更新track track.bbox det[bbox] track.centroid np.array([ (det[bbox][0] det[bbox][2]) / 2, (det[bbox][1] det[bbox][3]) / 2 ]) track.score det[score] track.frame_id self.frame_count track.time_since_update 0 track.hits 1 # 更新历史 track.trajectory.append(track.centroid.copy()) if len(track.trajectory) self.track_buffer: track.trajectory.pop(0) area (det[bbox][2] - det[bbox][0]) * (det[bbox][3] - det[bbox][1]) track.area_history.append(area) if len(track.area_history) 50: track.area_history.pop(0) # 状态转移 if track.state tentative and track.hits 3: track.state confirmed # 分裂检测 division_event self.division_detector.update(track) if division_event: self.division_events.append(division_event) matched_indices.add(c) unmatched_detections.discard(c) else: unmatched_detections set(range(len(detections))) # 5. 处理未匹配的detections (新细胞) for i in unmatched_detections: det detections[i] new_track CellTrack( track_idself.next_id, bboxdet[bbox], centroidnp.array([ (det[bbox][0] det[bbox][2]) / 2, (det[bbox][1] det[bbox][3]) / 2 ]), class_iddet[class], scoredet[score], frame_idself.frame_count, trajectory[np.array([ (det[bbox][0] det[bbox][2]) / 2, (det[bbox][1] det[bbox][3]) / 2 ])], area_history[(det[bbox][2] - det[bbox][0]) * (det[bbox][3] - det[bbox][1])], lineage[self.next_id] ) self.tracks[self.next_id] new_track self.next_id 1 # 6. 标记丢失的tracks for track in active_tracks: if track.time_since_update self.max_age: track.state deleted self.track_times.append(time.time() - start) # 返回当前活跃的tracks和分裂事件 active [t for t in self.tracks.values() if t.state confirmed] recent_divisions [e for e in self.division_events if self.frame_count - e[division_frame] 10] return active, recent_divisions def draw_tracks(self, frame: np.ndarray, tracks: List[CellTrack], divisions: List[Dict]) - np.ndarray: 可视化追踪结果 vis frame.copy() # 绘制细胞 for track in tracks: x1, y1, x2, y2 track.bbox.astype(int) # 根据细胞类型选择颜色 colors [(0, 255, 0), (255, 0, 0), (0, 0, 255), (255, 255, 0), (255, 0, 255), (0, 255, 255), (128, 255, 0), (255, 128, 0)] color colors[track.class_id % len(colors)] # 绘制边界框 cv2.rectangle(vis, (x1, y1), (x2, y2), color, 2) # 绘制轨迹 if len(track.trajectory) 1: points np.array(track.trajectory, dtypenp.int32) for i in range(1, len(points)): cv2.line(vis, tuple(points[i-1]), tuple(points[i]), color, 1) # 绘制ID和置信度 label fID:{track.track_id} {track.score:.2f} cv2.putText(vis, label, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) # 绘制中心点 cx, cy track.centroid.astype(int) cv2.circle(vis, (cx, cy), 3, (0, 0, 255), -1) # 绘制分裂事件 for div in divisions: cx, cy div[parent_centroid].astype(int) cv2.circle(vis, (cx, cy), 20, (0, 255, 255), 3) cv2.putText(vis, DIVISION!, (cx-40, cy-25), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2) # 绘制统计信息 info fFrame: {self.frame_count} | Active: {len(tracks)} | Divisions: {len(self.division_events)} cv2.putText(vis, info, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) # 实时FPS if len(self.inference_times) 0: avg_infer np.mean(self.inference_times[-30:]) * 1000 avg_track np.mean(self.track_times[-30:]) * 1000 fps_info fInference: {avg_infer:.1f}ms | Track: {avg_track:.1f}ms cv2.putText(vis, fps_info, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200, 200, 200), 1) return vis # 实时处理流程 def process_microscope_stream(source0, output_pathNone): 处理显微镜视频流 source: 0为摄像头或视频文件路径 tracker MicroscopeCellTracker( model_pathcell_detection/yolov8n_cell/weights/best.pt, track_buffer30, min_confidence0.4 ) cap cv2.VideoCapture(source) if not cap.isOpened(): print(fError: Cannot open source {source}) return # 获取视频信息 fps int(cap.get(cv2.CAP_PROP_FPS)) or 30 width int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # 视频写入器 writer None if output_path: fourcc cv2.VideoWriter_fourcc(*mp4v) writer cv2.VideoWriter(output_path, fourcc, fps, (width, height)) print(Starting cell tracking... Press q to quit) while True: ret, frame cap.read() if not ret: break # 追踪更新 tracks, divisions tracker.update(frame) # 可视化 vis tracker.draw_tracks(frame, tracks, divisions) # 显示 cv2.imshow(Cell Tracking, vis) if writer: writer.write(vis) if cv2.waitKey(1) 0xFF ord(q): break cap.release() if writer: writer.release() cv2.destroyAllWindows() # 输出统计 print(f\nTracking complete!) print(fTotal frames: {tracker.frame_count}) print(fTotal tracks: {tracker.next_id - 1}) print(fDivision events: {len(tracker.division_events)}) print(fAverage inference time: {np.mean(tracker.inference_times)*1000:.2f}ms) if __name__ __main__: # 处理视频文件或摄像头 process_microscope_stream( source./test_video.avi, # 替换为0使用摄像头 output_path./tracked_output.mp4 )五、TensorRT/ONNX 加速部署5.1 模型转换与优化Python# optimize_model.py from ultralytics import YOLO import tensorrt as trt import pycuda.driver as cuda import pycuda.autoinit import numpy as np def export_tensorrt(model_path, output_path, fp16True, int8False, workspace4): 导出TensorRT引擎文件 model YOLO(model_path) # 导出ONNX onnx_path model_path.replace(.pt, .onnx) model.export(formatonnx, dynamicTrue, simplifyTrue) # 使用trtexec构建引擎 (命令行方式更稳定) import subprocess cmd [ trtexec, f--onnx{onnx_path}, f--saveEngine{output_path}, f--workspace{workspace * 1024}, # MB --minShapesimages:1x3x640x640, --optShapesimages:4x3x640x640, --maxShapesimages:8x3x640x640 ] if fp16: cmd.append(--fp16) if int8: cmd.append(--int8) cmd.append(--calibInt8) # 需要校准数据 print(Building TensorRT engine...) subprocess.run(cmd, checkTrue) print(fEngine saved to {output_path}) class TensorRTInference: TensorRT推理封装 def __init__(self, engine_path): self.logger trt.Logger(trt.Logger.WARNING) # 加载引擎 with open(engine_path, rb) as f: runtime trt.Runtime(self.logger) self.engine runtime.deserialize_cuda_engine(f.read()) self.context self.engine.create_execution_context() # 分配内存 self.inputs [] self.outputs [] self.bindings [] for i in range(self.engine.num_bindings): binding self.engine[i] size trt.volume(self.engine.get_binding_shape(binding)) dtype trt.nptype(self.engine.get_binding_dtype(binding)) # 分配GPU内存 mem cuda.mem_alloc(size * np.dtype(dtype).itemsize) self.bindings.append(int(mem)) if self.engine.binding_is_input(binding): self.inputs.append(mem) else: self.outputs.append(mem) self.stream cuda.Stream() def infer(self, input_image): # 预处理 input_image input_image.astype(np.float32) / 255.0 input_image np.transpose(input_image, (2, 0, 1)) # HWC - CHW input_image np.expand_dims(input_image, axis0) # Add batch # 拷贝到GPU cuda.memcpy_htod_async(self.inputs[0], input_image.ravel(), self.stream) # 执行推理 self.context.execute_async_v2(bindingsself.bindings, stream_handleself.stream.handle) # 拷贝回CPU output np.empty((1, 84, 8400), dtypenp.float32) # YOLOv8输出形状 cuda.memcpy_dtoh_async(output, self.outputs[0], self.stream) self.stream.synchronize() return output # 使用示例 if __name__ __main__: # 导出优化模型 export_tensorrt( model_pathcell_detection/yolov8n_cell/weights/best.pt, output_pathcell_yolov8n.trt, fp16True, int8False )六、药物筛选分析模块6.1 细胞响应量化分析Python# drug_response_analysis.py import pandas as pd import numpy as np from scipy import stats from scipy.optimize import curve_fit import matplotlib.pyplot as plt import seaborn as sns from dataclasses import dataclass from typing import List, Dict import json dataclass class CellLineage: 细胞谱系树 root_id: int generations: List[List[int]] # 每一代的细胞ID division_times: List[float] # 每次分裂的时间点 drug_added_frame: int total_lifespan: float class DrugResponseAnalyzer: 药物响应分析器 量化指标 1. 分裂速率抑制 (Division Rate Inhibition) 2. 细胞周期延长 (Cell Cycle Arrest) 3. 凋亡检测 (Apoptosis Detection) 4. 形态学变化 (Morphological Changes) def __init__(self, control_group: List[CellLineage], treatment_groups: Dict[str, List[CellLineage]]): self.control control_group self.treatments treatment_groups self.results {} def calculate_division_rate(self, lineages: List[CellLineage], time_window: tuple None) - Dict: 计算分裂速率 rates [] for lin in lineages: if time_window: divs [t for t in lin.division_times if time_window[0] t time_window[1]] else: divs lin.division_times if len(divs) 1: intervals np.diff(divs) rates.extend(1.0 / intervals) # 分裂频率 (frames^-1) return { mean_rate: np.mean(rates) if rates else 0, std_rate: np.std(rates) if rates else 0, median_interval: np.median(1.0 / np.array(rates)) if rates else 0 } def fit_ic50(self, concentrations: List[float], responses: List[float]) - Dict: 拟合剂量-响应曲线计算IC50 Hill方程: response bottom (top - bottom) / (1 10^((logIC50 - logC) * hill)) def hill_equation(x, bottom, top, log_ic50, hill): return bottom (top - bottom) / (1 10**((log_ic50 - x) * hill)) log_conc np.log10(concentrations) try: popt, _ curve_fit(hill_equation, log_conc, responses, p0[0, 100, np.median(log_conc), 1], bounds([0, 0, -10, 0.1], [100, 100, 10, 5])) bottom, top, log_ic50, hill popt ic50 10 ** log_ic50 return { IC50: ic50, Hill_slope: hill, E_max: top, E_min: bottom, curve_params: popt } except: return {IC50: None, error: Fitting failed} def detect_apoptosis(self, track_history: List[Dict], area_shrink_threshold: float 0.3, intensity_drop_threshold: float 0.4) - List[int]: 基于形态学特征检测凋亡事件 特征细胞皱缩、亮度增加、碎片化 apoptotic_events [] for i, frame_data in enumerate(track_history): if i 5: continue # 计算最近5帧的变化 recent_areas [f[area] for f in track_history[i-5:i]] recent_intensity [f[mean_intensity] for f in track_history[i-5:i]] area_trend np.polyfit(range(5), recent_areas, 1)[0] intensity_change (recent_intensity[-1] - recent_intensity[0]) / recent_intensity[0] # 凋亡判定快速收缩 亮度增加 if area_trend -area_shrink_threshold * np.mean(recent_areas) and \ intensity_change intensity_drop_threshold: apoptotic_events.append(i) return apoptotic_events def generate_report(self, output_path: str): 生成药物筛选报告 # 计算对照组基线 control_stats self.calculate_division_rate(self.control) # 计算各处理组 treatment_stats {} for drug, lineages in self.treatments.items(): stats self.calculate_division_rate(lineages) inhibition (control_stats[mean_rate] - stats[mean_rate]) / control_stats[mean_rate] * 100 treatment_stats[drug] { **stats, inhibition_%: inhibition } # 创建可视化 fig, axes plt.subplots(2, 2, figsize(14, 12)) # 1. 分裂速率对比 ax1 axes[0, 0] drugs list(treatment_stats.keys()) inhibitions [treatment_stats[d][inhibition_%] for d in drugs] colors [green if x 30 else orange if x 70 else red for x in inhibitions] ax1.bar([Control] drugs, [0] inhibitions, color[blue] colors) ax1.set_ylabel(Division Rate Inhibition (%)) ax1.set_title(Drug Effect on Cell Division) ax1.axhline(y50, colorr, linestyle--, labelIC50 threshold) # 2. 细胞周期分布 ax2 axes[0, 1] cycle_times [] labels [] for drug, lineages in [(Control, self.control)] list(self.treatments.items()): times [np.mean(np.diff(lin.division_times)) for lin in lineages if len(lin.division_times) 1] cycle_times.extend(times) labels.extend([drug] * len(times)) df_cycle pd.DataFrame({Drug: labels, Cycle_Time: cycle_times}) sns.boxplot(datadf_cycle, xDrug, yCycle_Time, axax2) ax2.set_ylabel(Cell Cycle Duration (frames)) ax2.set_title(Cell Cycle Distribution) # 3. 谱系树可视化 (简化) ax3 axes[1, 0] # 绘制对照组vs处理组的世代数分布 gen_counts_control [len(lin.generations) for lin in self.control] gen_counts_treat [len(lin.generations) for lin in list(self.treatments.values())[0]] ax3.hist([gen_counts_control, gen_counts_treat], binsrange(1, max(gen_counts_control)2), label[Control, Treatment], alpha0.7) ax3.set_xlabel(Number of Generations) ax3.set_ylabel(Cell Count) ax3.set_title(Proliferation Depth) ax3.legend() # 4. 时间-响应曲线 ax4 axes[1, 1] # 模拟时间序列数据 time_points np.arange(0, 72, 2) # 72小时每2小时 control_growth 100 * (2 ** (time_points / 24)) # 24小时倍增 treat_growth 100 * (1.3 ** (time_points / 24)) # 受抑制 ax4.semilogy(time_points, control_growth, b-, labelControl, linewidth2) ax4.semilogy(time_points, treat_growth, r-, labelTreatment, linewidth2) ax4.set_xlabel(Time (hours)) ax4.set_ylabel(Cell Count (log scale)) ax4.set_title(Growth Curve) ax4.legend() ax4.grid(True, alpha0.3) plt.tight_layout() plt.savefig(f{output_path}/drug_response_analysis.png, dpi300) # 保存JSON报告 report { control_baseline: control_stats, treatment_results: treatment_stats, sample_size: { control: len(self.control), treatments: {k: len(v) for k, v in self.treatments.items()} } } with open(f{output_path}/analysis_report.json, w) as f: json.dump(report, f, indent2) print(fReport saved to {output_path}) return report # 使用示例 if __name__ __main__: # 假设已从追踪数据构建好谱系树 control_lineages [] # 从tracker.division_events构建 treatment_lineages { Drug_A_10uM: [], Drug_A_50uM: [], Drug_B_10uM: [] } analyzer DrugResponseAnalyzer(control_lineages, treatment_lineages) report analyzer.generate_report(./output)七、完整启动脚本bash#!/bin/bash # start_cell_tracking_system.sh echo 显微细胞动态追踪系统启动 # 1. 环境检查 echo 检查CUDA... nvidia-smi # 2. 数据准备 (如果未下载) if [ ! -d ./LiveCell ]; then echo 下载数据集... bash download_datasets.sh fi # 3. 数据转换 (如果未转换) if [ ! -d ./cell_yolo_dataset ]; then echo 转换数据格式... python prepare_livecell.py fi # 4. 训练模型 (如果未训练) if [ ! -f ./cell_detection/yolov8n_cell/weights/best.pt ]; then echo 训练模型... python train_yolo_cell.py fi # 5. 导出优化模型 echo 导出TensorRT引擎... python optimize_model.py # 6. 启动实时追踪 echo 启动实时追踪... python cell_tracker.py echo 系统运行中...这套系统可直接部署于配备NVIDIA GPU的工作站支持从数据准备到药物筛选的全流程自动化。