最近在搞电商智能客服系统的优化踩了不少坑也积累了一些实战经验。电商大促期间客服系统简直就是“压力测试”的终极考场万级并发、意图识别不准、响应延迟这些问题一个比一个棘手。今天就来聊聊我们是怎么从架构和性能两个层面把一个“摇摇欲坠”的系统优化到能扛住大流量的。一、背景与痛点大促时的“惊心动魄”没优化之前我们的系统在大促时基本处于“半瘫痪”状态。主要问题集中在三个方面并发压力巨大零点秒杀瞬间涌入数万用户的咨询请求。传统的同步处理模型服务器线程池瞬间被打满大量请求排队超时用户体验极差。长尾问题处理乏力用户的问题千奇百怪规则引擎维护的成本越来越高且无法覆盖所有场景。传统机器学习模型如SVM、朴素贝叶斯在应对“商品A和商品B哪个更适合我妈妈”这类复杂、未见过的问题时准确率骤降。多轮对话状态维护混乱用户咨询退货需要先后提供订单号、退货原因、图片凭证。对话状态如果维护不好用户换个说法或者中途插入其他问题机器人就容易“失忆”导致流程重启用户非常恼火。这些问题直接导致了客服成本上升和用户满意度下降。优化势在必行。二、技术选型对比从规则到深度学习的演进在核心的意图识别和实体抽取任务上我们对比了几种方案规则引擎初期快速上线必备。优点是明确、可控、零延迟。缺点是维护成本指数级增长无法处理未预定义的句式灵活度极差。适合处理“查物流”、“找订单”这类高度结构化的问题。传统机器学习如SVM特征工程比规则引擎智能一些能处理一些模式相近的变体。但严重依赖人工设计特征如是否包含某些关键词、词性标签等对于语义相似但表述迥异的问题如“怎么付款”和“支付方式有哪些”效果一般且泛化能力有限。深度学习如Transformer/BERT当前的主流选择。通过预训练模型获取深层次的语义表示在意图分类和实体抽取任务上表现突出。特别是对于长尾、复杂问题通过微调Fine-tuning可以很好地适应我们的电商领域。缺点是模型较大推理有延迟且需要一定的标注数据。我们的结论是放弃“一招鲜”的想法采用混合策略。高频、简单的任务用规则引擎快速响应复杂的、语义相关的任务交给深度学习模型。下面重点分享深度学习方案的架构与优化。三、核心架构设计让系统变得“弹性”且“智能”整个系统我们分成了几个核心模块目标是实现高可用和高性能。异步消息队列削峰填谷这是应对高并发的第一道防线。所有用户请求不再直接冲击业务处理服务而是先丢到RabbitMQ或Kafka队列中。我们设计了多个队列根据请求优先级如“售前咨询” vs “物流查询”进行路由。后端的Worker服务从队列中消费消息进行异步处理。这样即使瞬间流量再大也只是让队列变长而不会击垮服务。基于Kubernetes的自动扩缩容光有队列还不够处理能力也得能弹性伸缩。我们将模型推理服务、业务逻辑服务都部署在K8s上并配置了HPAHorizontal Pod Autoscaler。监控指标主要看两个消息队列的堆积长度和服务的CPU使用率。当队列堆积超过阈值或CPU使用率持续高位时K8s会自动增加Pod副本数当流量低谷时自动缩减节省资源。领域自适应的预训练模型微调意图识别的核心是模型。我们选择BERT作为基座模型因为它强大的语义理解能力。但通用BERT对电商垂直领域的术语如“SKU”、“满减”、“预售定金”不敏感。因此我们进行了领域自适应预训练Domain-Adaptive Pre-training步骤一收集海量的电商场景文本如商品标题、描述、用户评论、客服历史对话脱敏后。步骤二用这些文本继续训练Continue Pre-trainingBERT模型让模型更好地学习电商领域的语言分布。这个过程也叫Domain-Adaptive Pre-training。步骤三用我们标注好的意图分类数据例如标注了“询问价格”、“咨询售后”、“比较商品”等类别对上述领域模型进行有监督微调Supervised Fine-tuning。经过这两步模型的意图识别准确率从通用BERT的85%左右提升到了92%以上。四、关键代码实现片段理论说完了上点干货看看部分核心代码是怎么实现的。1. 异步消息处理装饰器我们写了一个装饰器让任何处理函数都能轻松接入消息队列。import asyncio import functools from typing import Callable, Any import pika import json class AsyncMessageProcessor: 异步消息处理器装饰器类 def __init__(self, queue_name: str, hostlocalhost): self.queue_name queue_name self.host host def __call__(self, func: Callable) - Callable: functools.wraps(func) def wrapper(*args, **kwargs): # 这里简化了实际应将func的调用包装成消息发送到MQ # Worker端会从MQ取出消息并调用真正的func connection pika.BlockingConnection(pika.ConnectionParameters(hostself.host)) channel connection.channel() channel.queue_declare(queueself.queue_name, durableTrue) # 队列持久化 # 将任务信息序列化后放入队列 message_body json.dumps({ function: func.__name__, args: args, kwargs: kwargs }) channel.basic_publish( exchange, routing_keyself.queue_name, bodymessage_body, propertiespika.BasicProperties(delivery_mode2) # 消息持久化 ) connection.close() print(f[x] Sent task for {func.__name__} to {self.queue_name}) # 立即返回实现异步 return {status: queued, task_id: some_generated_id} return wrapper # 使用示例 AsyncMessageProcessor(queue_nameintent_classification_queue) def classify_intent(user_query: str) - dict: # 这里是实际的意图分类逻辑在Worker端执行 # 模拟一个耗时操作 import time time.sleep(0.5) return {intent: query_price, confidence: 0.95} # 用户调用时请求立刻返回任务后台处理 result classify_intent(这个手机多少钱) print(result) # 输出: {status: queued, task_id: some_generated_id}2. 动态负载均衡算法核心我们的模型服务部署了多个实例网关需要动态选择最合适的一个。这里实现一个简单的基于加权最小连接数的算法。import random from collections import defaultdict import time class DynamicLoadBalancer: 动态负载均衡器加权最小连接数 时间复杂度选择实例 O(N)更新状态 O(1) 空间复杂度O(N)N为实例数 def __init__(self): # 实例格式: {instance_id: {weight: 5, connections: 2, healthy: True}} self.instances {} self.instance_list [] def update_instance_status(self, instance_id: str, weight: int, current_connections: int, healthy: bool): 更新实例状态由健康检查器定期调用 self.instances[instance_id] { weight: weight, # 实例权重可基于CPU/内存动态计算 connections: current_connections, healthy: healthy, effective_load: current_connections / weight if weight 0 else float(inf) # 有效负载 } # 维护一个健康实例列表 self.instance_list [iid for iid, info in self.instances.items() if info[healthy]] def select_instance(self) - str: 选择有效负载最低的实例 if not self.instance_list: raise Exception(No healthy instances available) # 找出有效负载最小的实例 # 在实际生产中这里可以加缓存不需要每次遍历 min_load float(inf) selected_instance None for instance_id in self.instance_list: instance_info self.instances[instance_id] current_load instance_info[effective_load] if current_load min_load: min_load current_load selected_instance instance_id # 模拟选中后连接数1 if selected_instance: self.instances[selected_instance][connections] 1 self.instances[selected_instance][effective_load] self.instances[selected_instance][connections] / self.instances[selected_instance][weight] return selected_instance # 使用示例 lb DynamicLoadBalancer() lb.update_instance_status(model-pod-1, weight10, current_connections5, healthyTrue) lb.update_instance_status(model-pod-2, weight8, current_connections10, healthyTrue) for _ in range(5): selected lb.select_instance() print(fRequest routed to: {selected})3. 模型服务化接口FastAPI将训练好的模型封装成HTTP API供业务系统调用。from fastapi import FastAPI, HTTPException from pydantic import BaseModel import torch from transformers import BertTokenizer, BertForSequenceClassification import numpy as np import asyncio from typing import List app FastAPI(title电商智能客服意图识别API) # 模型和分词器全局加载实际应考虑懒加载或模型池 MODEL_PATH ./models/domain_adapted_bert tokenizer BertTokenizer.from_pretrained(MODEL_PATH) model BertForSequenceClassification.from_pretrained(MODEL_PATH) model.eval() # 切换到评估模式 device torch.device(cuda if torch.cuda.is_available() else cpu) model.to(device) # 意图标签映射 id2label {0: 问候, 1: 查询价格, 2: 咨询售后, 3: 比较商品, 4: 其他} class QueryRequest(BaseModel): text: str session_id: str None # 用于多轮对话状态管理 class IntentResponse(BaseModel): intent: str confidence: float session_id: str None app.post(/predict/intent, response_modelIntentResponse) async def predict_intent(request: QueryRequest): 意图识别预测接口 try: # 1. 文本编码 inputs tokenizer(request.text, return_tensorspt, paddingTrue, truncationTrue, max_length128) inputs {k: v.to(device) for k, v in inputs.items()} # 2. 模型推理 with torch.no_grad(): # 禁用梯度计算提升推理速度 outputs model(**inputs) predictions torch.nn.functional.softmax(outputs.logits, dim-1) # 3. 解析结果 probs predictions.cpu().numpy()[0] intent_id np.argmax(probs) confidence float(probs[intent_id]) return IntentResponse( intentid2label.get(intent_id, 其他), confidenceconfidence, session_idrequest.session_id ) except Exception as e: raise HTTPException(status_code500, detailfModel prediction error: {str(e)}) if __name__ __main__: import uvicorn uvicorn.run(app, host0.0.0.0, port8000)五、性能优化与压测实践架构和代码写好了效果如何还得用数据说话。使用Locust进行压力测试我们编写了Locust脚本模拟用户在大促期间的行为逐步增加并发用户数观察系统的响应时间RT和失败率。测试场景混合场景包括简单的问候、复杂的商品比较和售后咨询。关键指标我们重点关注P95和P99响应时间即95%和99%的请求在多少毫秒内完成以及系统在持续高并发下的错误率。配置Locustfile.py示例核心部分from locust import HttpUser, task, between class QuickstartUser(HttpUser): wait_time between(0.5, 2.5) task(3) # 权重为3更频繁 def ask_price(self): self.client.post(/predict/intent, json{text: 这款羽绒服现在有活动吗}) task(1) # 权重为1 def complex_compare(self): self.client.post(/predict/intent, json{text: 请帮我对比一下iPhone14和华为Mate50的摄像头和电池})缓存策略的威力我们引入了Redis作为缓存层。缓存什么高频且结果稳定的查询。例如“你好”、“在吗”这类问候语以及“运费多少”、“发货地是哪里”等有固定答案的问题的意图识别结果。效果对于命中缓存的请求响应时间从平均200ms模型推理业务逻辑直接降到5ms以内。在大促期间近30%的请求通过缓存直接返回极大减轻了模型服务的压力。注意需要设置合理的TTL生存时间和缓存淘汰策略避免数据陈旧。六、避坑指南那些年我们踩过的“坑”对话状态管理初期我们简单地把整个对话历史字符串拼接起来传给模型效果很差且长度容易超限。正确做法维护一个独立的Session State服务可以用Redis。记录当前对话的“阶段”如等待输入订单号和已收集的“槽位”Slots信息如{“退货原因”: “尺寸不符”}。模型只处理当前轮次的用户语句并结合Session State来判断意图和决定回复。模型冷启动新模型上线或服务重启后第一次推理特别慢可能达到几秒因为要加载模型权重、初始化CUDA上下文等。解决方案预热Warm-up在服务启动后、接收真实流量前先构造一批模拟请求对模型进行“预热”让所有计算图和内存分配就绪。常驻进程使用像TorchServe或Triton Inference Server这样的专业模型服务框架它们会保持模型常驻内存。我们在FastAPI的startup事件中增加了预热逻辑。敏感词过滤电商客服必须合规。实现我们采用了“多级过滤”机制。一级过滤使用高效的AC自动机算法对用户输入和机器人输出进行实时敏感词匹配和脱敏如替换为***。二级过滤会将疑似违规的对话命中某些高风险关键词组合转入人工审核队列并记录日志。千万不能只依赖模型自己“学”会不说敏感词必须在输出前进行强规则过滤。七、总结与思考经过上述一系列架构升级和优化我们的智能客服系统在大促期间的表现稳定了很多。QPS每秒查询率提升了不止3倍核心的意图识别准确率也稳在90%以上。更重要的是系统具备了弹性能够应对流量的剧烈波动。最后留一个开放性问题也是我们持续在探索的如何更好地平衡模型精度与推理延迟使用更大的模型如BERT-large通常精度更高但推理速度慢。使用蒸馏后的小模型如TinyBERT或专门优化的架构如ALBERT速度飞快但精度可能有轻微损失。未来我们正在尝试模型动态路由简单的查询走轻量级模型复杂、置信度低的查询走大型模型。同时硬件加速如TensorRT和模型量化如INT8量化也是我们下一步的重点优化方向。AI电商客服的优化之路没有终点每一次大促都是新的挑战。希望这篇笔记里的架构思路和实战代码能给大家带来一些启发。欢迎一起交流探讨