背景痛点当模型评测遇上“成长的烦恼”最近在关注大模型的发展发现一个有趣的现象新的模型如雨后春笋般涌现但如何公平、高效地比较它们的能力却成了一个技术难题。传统的单机评测脚本在面对几十甚至上百个模型的并发请求时常常力不从心。我自己尝试搭建一个简易的评测平台时就遇到了几个典型的“痛点”并发压力剧增当需要同时评测多个模型对同一批问题的回复时简单的同步请求会导致响应时间直线上升用户体验极差。评测标准一致性如何确保同一个问题在不同时间、不同服务器实例上提交给模型时评测环境如参数、上下文是完全一致的这直接关系到结果的公平性。资源管理与隔离不同模型的资源消耗GPU内存、计算时间差异巨大。一个“慢吞吞”的大模型任务可能会阻塞整个队列影响其他轻量级模型的评测进度。任务状态追踪与容错一个评测任务可能耗时几分钟甚至更久如何让用户实时了解进度任务中途失败后如何优雅地重试或清理资源这些痛点促使我思考需要一个更健壮、可扩展的架构来支撑一个真正的“Chatbot Arena”评测网站。技术选型为什么是FastAPI Celery在技术选型阶段我重点对比了Python生态中几个流行的Web框架在构建此类实时、异步密集型应用时的表现。Flask轻量灵活生态成熟。但对于原生异步asyncio支持较弱通常需要配合gevent或eventlet来实现并发在管理大量并发WebSocket连接或异步IO任务时配置和调试相对复杂。Django功能全面开箱即用自带ORM和Admin后台。但其设计哲学偏重同步和“全能”在需要精细控制异步流程和高并发WebSocket的场景下显得有些笨重虽然Django Channels提供了支持但整体架构不如专为异步设计的框架纯粹。FastAPI基于Starlette和Pydantic原生支持asyncio性能卓越。它自动生成交互式API文档的特性对于需要暴露大量评测API的网站非常友好。最关键的是它对WebSocket的支持非常简洁直观非常适合用来推送评测任务的实时状态更新。基于以上分析我选择了FastAPI作为Web框架。对于后台耗时的模型调用评测任务自然引入了Celery作为分布式任务队列。Celery成熟稳定支持多种消息代理如Redis、RabbitMQ可以轻松地将任务分发到多台worker机器上执行实现评测任务的解耦和水平扩展。最终技术栈FastAPI (Web框架 WebSocket) Celery (任务队列) Redis (消息代理 缓存) PostgreSQL (评测结果存储) Prometheus/Grafana (监控)。核心实现构建高可用的评测流水线1. 系统架构图整个系统的核心是围绕一个异步任务队列构建的。用户通过Web界面或API提交一个评测任务例如用模型A和模型B回答10个问题Web服务层接收请求后并不直接调用模型而是向Celery队列抛出一个任务。Celery Worker们可以部署在多台服务器上从队列中领取任务执行具体的模型调用、结果记录等耗时操作最后将状态和结果回写。graph TD subgraph “Client Layer” A[User Browser/API Client] end subgraph “Web Service Layer (FastAPI)” B[FastAPI Server] B -- C[WebSocket Manager] B -- D[Task Dispatcher] end subgraph “Message Queue (Redis)” E[Celery Task Queue] end subgraph “Worker Layer (Celery)” F[Worker Node 1] G[Worker Node 2] H[Worker Node ...] end subgraph “External Services” I[LLM API e.g., OpenAI, Claude] J[Database] K[Cache] end subgraph “Monitoring” L[Prometheus Metrics] M[Grafana Dashboard] end A -- HTTP/WebSocket -- B D -- Push Task -- E E -- Consume -- F E -- Consume -- G E -- Consume -- H F -- Call API -- I G -- Call API -- I F -- Store Result -- J G -- Store Result -- J B -- Query/Write -- K F G B -- Expose Metrics -- L L -- Data Source -- M这个架构的关键在于解耦。Web服务层变得无状态且快速压力被转移到可水平扩展的Worker层。Redis作为消息代理确保了任务的不丢失配合持久化和高效传递。2. 评测任务的幂等性设计评测任务可能因为网络波动、Worker崩溃等原因被重复执行。我们必须保证同一任务相同唯一ID即使被多次执行也不会产生重复或错误的结果例如在数据库里插入两条相同的评测记录。这就是幂等性。实现思路是为每个评测请求生成一个全局唯一的任务ID如UUID并在任务执行前在Redis中设置一个锁SETNXkeytask_id, value‘running’并设置过期时间。如果设置成功才继续执行如果失败key已存在则直接返回之前的结果。下面是一个Celery任务示例包含了幂等性检查和基本的异常处理import uuid from typing import Dict, Any, Optional from celery import Celery from redis import Redis, RedisError from pydantic import BaseModel, Field import logging # 定义评测任务参数模型 class EvaluationRequest(BaseModel): model_id: str question_set_id: str parameters: Dict[str, Any] Field(default_factorydict) app Celery(evaluation_worker, brokerredis://localhost:6379/0) # 初始化Redis客户端用于分布式锁 redis_client Redis(hostlocalhost, port6379, db1, decode_responsesTrue) logger logging.getLogger(__name__) app.task(bindTrue, max_retries3) def evaluate_model_task(self, task_id: str, request_data: Dict[str, Any]) - Optional[Dict[str, Any]]: 执行模型评测的Celery任务。 通过Redis分布式锁实现幂等性。 request EvaluationRequest(**request_data) lock_key feval_lock:{task_id} result_key feval_result:{task_id} # 1. 尝试获取分布式锁实现幂等 try: # SETNX EXPIRE 的原子操作防止死锁 acquired redis_client.set(lock_key, processing, nxTrue, ex300) # 锁持有5分钟 if not acquired: # 锁已存在说明任务正在执行或已完成 logger.info(fTask {task_id} is already being processed or completed.) # 尝试获取已缓存的结果 cached_result redis_client.get(result_key) if cached_result: return {status: success, from_cache: True, result: cached_result} return {status: pending, message: Evaluation in progress.} except RedisError as e: logger.error(fRedis error when acquiring lock for task {task_id}: {e}) # Redis异常允许任务执行依赖数据库唯一约束作为最后防线 pass # 2. 执行业务逻辑模型调用、评估等 try: logger.info(fStarting evaluation for task {task_id}, model {request.model_id}) # 这里是模拟的评测逻辑实际中会调用对应的模型API # 例如response call_llm_api(request.model_id, request.question_set_id, request.parameters) simulated_result { score: 0.85, latency: 2.3, details: {...} } # 3. 将结果存入持久化存储如PostgreSQL和缓存 # save_to_database(task_id, request, simulated_result) # 伪代码 try: # 缓存结果设置1小时TTL redis_client.setex(result_key, 3600, str(simulated_result)) except RedisError as e: logger.warning(fFailed to cache result for task {task_id}: {e}) # 4. 清理锁 try: redis_client.delete(lock_key) except RedisError: pass return {status: success, from_cache: False, result: simulated_result} except Exception as e: logger.exception(fEvaluation failed for task {task_id}: {e}) # 任务失败清理锁允许重试 try: redis_client.delete(lock_key) except RedisError: pass # 触发Celery重试 raise self.retry(exce, countdown60) app.task def submit_evaluation_job(model_id: str, question_set_id: str) - str: 提交评测作业的入口函数由Web层调用 task_id str(uuid.uuid4()) request_data EvaluationRequest(model_idmodel_id, question_set_idquestion_set_id).dict() # 异步发送任务 evaluate_model_task.apply_async(args(task_id, request_data), task_idtask_id) return task_id # 返回任务ID用于查询状态3. 可观测性Prometheus监控指标埋点对于一个运行中的评测平台我们需要实时了解它的健康状态任务队列积压情况、Worker负载、不同模型的平均响应时间与成功率、API调用速率等。Prometheus是完成这个任务的绝佳工具。我们在FastAPI应用和Celery Worker中都埋点了指标。以下是在FastAPI中使用prometheus-fastapi-instrumentator进行关键指标采集的示例# main.py (FastAPI应用入口) from fastapi import FastAPI, Request from prometheus_fastapi_instrumentator import Instrumentator, metrics from prometheus_client import Counter, Histogram, Gauge import time app FastAPI() # 自定义指标 EVALUATION_REQUESTS Counter( evaluation_requests_total, Total number of evaluation requests submitted, [model_id, status] # 按模型和状态success, error打标签 ) EVALUATION_DURATION Histogram( evaluation_duration_seconds, Duration of evaluation processing in seconds, [model_id], buckets(0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, float(inf)) ) TASKS_IN_QUEUE Gauge( celery_tasks_in_queue, Current number of tasks waiting in the Celery queue, [queue_name] ) # 使用instrumentator自动添加默认指标请求数、延迟等 instrumentator Instrumentator() instrumentator.add(metrics.latency(buckets(0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, float(inf)))) instrumentator.add(metrics.requests()) instrumentator.instrument(app).expose(app) app.middleware(http) async def monitor_requests(request: Request, call_next): 中间件用于记录自定义请求指标 start_time time.time() model_id request.query_params.get(model_id, unknown) response await call_next(request) duration time.time() - start_time status success if response.status_code 400 else error # 记录指标 EVALUATION_REQUESTS.labels(model_idmodel_id, statusstatus).inc() EVALUATION_DURATION.labels(model_idmodel_id).observe(duration) return response # 一个定时任务可通过Celery Beat或APScheduler实现来更新队列长度指标 # 这里简化表示 # def update_queue_metrics(): # from celery import current_app # inspector current_app.control.inspect() # active_queues inspector.active_queues() or {} # for worker, queues in active_queues.items(): # for q in queues: # # 获取队列长度需要根据实际使用的broker实现如redis的LLEN # # length get_redis_queue_length(q[name]) # # TASKS_IN_QUEUE.labels(queue_nameq[name]).set(length)在Grafana中我们可以将这些指标绘制成丰富的仪表盘一眼就能看出系统的瓶颈在哪里是某个模型API变慢了还是任务队列积压了性能优化从200 QPS到1500 QPS的实战架构搭建好后我用Locust进行了压力测试。初始版本简单粗暴地为每个请求创建新的数据库和Redis连接在500并发用户下QPS只有可怜的200左右并且错误率飙升。优化点1连接池化这是提升最大的改动。为数据库如asyncpg或SQLAlchemywithaiomysql和Redisaioredis配置连接池避免频繁创建和销毁连接的开销。优化点2异步化与流式响应将可能阻塞的IO操作如读写数据库、调用外部模型API全部改为异步。对于模型返回的文本流采用Server-Sent Events (SSE) 或 WebSocket 进行流式推送避免前端长时间等待。优化点3缓存策略评测结果缓存如上文代码所示成功的评测结果缓存1小时。模型元信息缓存模型列表、配置信息等不常变化的数据缓存时间可以更长。问题集缓存评测用的标准问题集可以全部缓存在内存中。优化点4Celery Worker配置优化使用gevent或eventlet池提高Worker处理IO密集型任务网络请求的并发能力。根据任务类型划分队列例如fast队列处理轻量任务slow队列处理大模型任务并为它们分配不同数量和配置的Worker。经过上述优化再次进行压力测试QPS稳定达到了1500平均响应时间控制在200ms以内。避坑指南那些我踩过的“坑”模型冷启动超时某些云端模型服务在长时间无调用后首次请求会有数十秒的“冷启动”延迟直接导致Celery任务超时失败。解决方案为这类任务设置更长的超时时间task_soft_time_limit,task_time_limit。同时实现一个简单的“预热”机制定期向空闲模型发送一个轻量级请求保持其活跃状态。评测结果缓存的TTL陷阱最初我给所有结果设置了固定的24小时TTL。后来发现当模型更新后用户希望立即用新模型重新评测但看到的仍是旧的缓存结果。解决方案将缓存键与模型版本号绑定例如eval_result:{model_id}:v2.1:{question_set_id}。当模型升级时版本号变化自然就绕过了旧缓存。或者提供一个管理员接口用于主动清除指定模型的评测缓存。消息队列积压Backpressure当提交任务的速度持续超过Worker处理速度时Redis中的任务队列会无限增长最终可能导致Redis内存耗尽。解决方案实施背压Backpressure机制。在Web层实时监控队列长度通过Prometheus指标。当队列长度超过阈值如10000时API开始返回503 Service Unavailable或让用户进入排队状态从源头限制任务提交速率。WebSocket连接管理用户通过WebSocket连接查看任务实时日志。当用户刷新页面或断开连接时需要清理服务器端对应的连接资源否则会导致内存泄漏。解决方案使用WeakKeyDictionary或类似结构管理连接对象并确保在连接断开on_disconnect事件中移除所有对该连接的引用和订阅。延伸思考让AI来写评测报告基础的评分和排名已经很有用但一份优秀的评测报告还需要深度的分析和洞察。下一步我计划引入LLM例如豆包大模型来自动化这部分工作。设计思路数据聚合从数据库中提取一次评测活动中所有模型的表现数据得分、响应时间、错误类型分布等。提示词工程设计一个结构化的提示词Prompt要求LLM扮演一个“AI评测专家”基于提供的数据总结各模型优劣、指出在特定类型问题如逻辑推理、创意写作上的表现差异、并给出可视化的建议例如“建议用柱状图对比各模型在数学题上的准确率”。异步报告生成将数据聚合和调用LLM生成报告的过程封装成一个Celery任务。报告生成后可以存储为HTML或Markdown文件并提供下载链接。可定制化允许用户选择报告的重点如更关注速度还是质量并将其作为参数传递给提示词生成更具针对性的报告。这样一来每次评测活动结束后系统不仅能产出排名还能自动生成一份初步的分析报告极大提升了平台的价值和用户体验。总结与资源构建一个高可用的Chatbot Arena评测网站核心在于理解并处理好“异步”、“解耦”、“可观测”和“容错”这几个关键点。通过FastAPI处理实时前端交互CeleryRedis管理后台重型任务再辅以完善的监控和缓存策略就能搭建出一个稳定、可扩展的系统。这个项目让我对现代AI应用的后端架构有了更深的理解。如果你也对构建类似的AI评测平台或应用感兴趣不妨从从0打造个人豆包实时通话AI这个实验开始。它虽然聚焦实时语音但其“ASR - LLM - TTS”的异步任务编排思想与本文的“任务提交 - 模型调用 - 结果处理”流水线异曲同工。通过那个实验你可以更直观地感受到如何将多个AI服务串联起来构建一个完整的交互闭环这对于理解复杂AI应用的架构非常有帮助。进一步阅读建议Celery官方文档特别是关于任务路由、重试和监控的部分。FastAPI的依赖注入系统和后台任务BackgroundTasks机制用于处理轻量级异步操作。Prometheus的四种指标类型Counter, Gauge, Histogram, Summary及其适用场景。关于分布式锁和幂等性的更深入讨论可以查阅数据库相关文献。希望这篇从实战出发的解析能为你构建自己的AI评测系统或类似的高并发应用提供一些切实可行的思路。