Nanbeige 4.1-3B Streamlit WebUI教程添加用户行为埋点使用数据分析看板1. 引言从炫酷界面到数据洞察你已经成功部署了那个拥有《蔚蓝档案》MomoTalk风格的精美聊天界面。每次看到AI的回复像手机短信一样优雅地弹出那种沉浸感确实很棒。但作为一个开发者或项目负责人你心里可能开始冒出一些新的疑问用户最喜欢问哪些问题对话的平均长度是多少模型在哪些问题上响应时间比较长用户活跃度随时间有什么变化一个只有前端交互的应用就像一间没有安装监控的商店。你知道有顾客进来却不知道他们看了什么、停留了多久、对什么商品感兴趣。今天我们就来解决这个问题。我将手把手教你如何为这个极简的Nanbeige WebUI添加用户行为埋点并搭建一个实时数据分析看板让你真正“看见”用户与AI的每一次互动。通过本教程你将学会在Streamlit应用中无侵入地记录关键用户行为。将行为数据持久化存储到本地数据库。利用Streamlit的强大组件创建一个动态、可视化的数据仪表盘。基于数据优化你的模型服务和用户体验。整个过程不需要复杂的后端服务所有功能都将集成在现有的单文件app.py中。让我们开始吧。2. 埋点设计我们要记录什么在写代码之前我们先明确目标。对于一个大语言模型聊天应用哪些用户行为数据最有价值这里我为你梳理了四个核心维度2.1 会话级数据 (Session-level)每次用户打开浏览器标签页Streamlit会生成一个唯一的会话ID。我们可以用它来关联同一用户的一系列操作。会话ID (session_id): Streamlit内置唯一标识一次浏览器会话。开始时间 (start_time): 会话首次建立的时间戳。最后活动时间 (last_activity): 用户最后一次操作的时间戳。2.2 消息级数据 (Message-level)这是最核心的数据记录了每一次完整的“提问-回答”交互。消息ID (message_id): 每条消息的唯一标识。用户输入 (user_input): 用户发送的问题文本。AI回复 (ai_response): 模型生成的完整回复文本。请求时间戳 (request_time): 用户点击“发送”的时间。响应完成时间戳 (response_time): AI流式输出结束的时间。思考过程 (reasoning_content): 如果模型启用了CoT思维链记录被折叠的思考内容。响应状态 (status): 成功 (success) 或失败 (error)。2.3 性能数据 (Performance Metrics)这些数据帮助你评估服务的稳定性和效率。响应时长 (response_duration): 从请求到响应完成的时间差毫秒。输入token数 (input_tokens): 用户问题的粗略token计数可用字数近似估算。输出token数 (output_tokens): AI回复的粗略token计数。2.4 界面交互数据 (UI Interaction)记录用户与界面控件的交互了解使用习惯。“清空记录”按钮点击。侧边栏设置项的更改如果你未来添加了温度、最大生成长度等设置。基于以上设计我们可以开始动手改造app.py了。3. 第一步改造你的app.py集成数据记录层我们将采用轻量级的SQLite数据库来存储数据它无需额外服务一个文件搞定。代码的添加会尽量保持模块化和无侵入性。3.1 导入必要的库并初始化数据库打开你的app.py文件在文件顶部导入区域添加以下库# 新增的导入 import sqlite3 import json from datetime import datetime, timedelta import pandas as pd from pathlib import Path import hashlib紧接着在模型加载代码之前或文件靠前的位置添加数据库初始化的函数。我建议创建一个单独的init_database()函数def init_database(): 初始化SQLite数据库创建所需的数据表 db_path Path(nanbeige_chat_analytics.db) conn sqlite3.connect(db_path) cursor conn.cursor() # 表1: 会话表 cursor.execute( CREATE TABLE IF NOT EXISTS chat_sessions ( session_id TEXT PRIMARY KEY, start_time TIMESTAMP NOT NULL, last_activity TIMESTAMP NOT NULL ) ) # 表2: 消息记录表 (核心表) cursor.execute( CREATE TABLE IF NOT EXISTS chat_messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT NOT NULL, message_id TEXT NOT NULL, user_input TEXT, ai_response TEXT, reasoning_content TEXT, request_time TIMESTAMP NOT NULL, response_time TIMESTAMP, response_duration INTEGER, -- 单位: 毫秒 input_tokens INTEGER, output_tokens INTEGER, status TEXT DEFAULT success, FOREIGN KEY (session_id) REFERENCES chat_sessions (session_id) ) ) # 表3: 界面交互事件表 cursor.execute( CREATE TABLE IF NOT EXISTS ui_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT NOT NULL, event_type TEXT NOT NULL, -- 例如clear_chat, sidebar_change event_data TEXT, -- 存储JSON格式的额外数据 event_time TIMESTAMP NOT NULL, FOREIGN KEY (session_id) REFERENCES chat_sessions (session_id) ) ) conn.commit() conn.close() print(f[INFO] 数据库已初始化: {db_path.absolute()})然后在你的主函数通常是main()或Streamlit应用启动的入口处调用这个函数if __name__ __main__: init_database() # 确保数据库表存在 # ... 你原有的模型加载和Streamlit启动代码3.2 创建数据记录工具函数在同一个文件中添加几个工具函数用于向数据库插入数据。这会让主逻辑更清晰def get_db_connection(): 获取数据库连接 return sqlite3.connect(nanbeige_chat_analytics.db) def ensure_session_recorded(session_id): 确保当前会话在数据库中有记录 conn get_db_connection() cursor conn.cursor() now datetime.now().isoformat() cursor.execute( INSERT OR IGNORE INTO chat_sessions (session_id, start_time, last_activity) VALUES (?, ?, ?), (session_id, now, now) ) # 即使已存在也更新最后活动时间 cursor.execute( UPDATE chat_sessions SET last_activity ? WHERE session_id ?, (now, session_id) ) conn.commit() conn.close() def record_chat_message(session_id, message_data): 记录一条完整的聊天消息 message_data: 字典包含 user_input, ai_response, reasoning_content, request_time, response_time, status 等 conn get_db_connection() cursor conn.cursor() # 生成一个唯一消息ID (例如使用时间戳和输入内容的哈希) input_hash hashlib.md5(message_data.get(user_input, ).encode()).hexdigest()[:8] message_id f{int(datetime.now().timestamp())}_{input_hash} # 计算响应时长和token数简易估算 if message_data.get(request_time) and message_data.get(response_time): req_time datetime.fromisoformat(message_data[request_time]) resp_time datetime.fromisoformat(message_data[response_time]) duration_ms int((resp_time - req_time).total_seconds() * 1000) else: duration_ms None input_tokens len(message_data.get(user_input, ).split()) * 1.3 # 粗略估算 output_tokens len(message_data.get(ai_response, ).split()) * 1.3 cursor.execute( INSERT INTO chat_messages (session_id, message_id, user_input, ai_response, reasoning_content, request_time, response_time, response_duration, input_tokens, output_tokens, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) , ( session_id, message_id, message_data.get(user_input), message_data.get(ai_response), message_data.get(reasoning_content), message_data.get(request_time), message_data.get(response_time), duration_ms, int(input_tokens), int(output_tokens), message_data.get(status, success) )) conn.commit() conn.close() return message_id def record_ui_event(session_id, event_type, event_dataNone): 记录一个UI交互事件 conn get_db_connection() cursor conn.cursor() data_json json.dumps(event_data) if event_data else None cursor.execute( INSERT INTO ui_events (session_id, event_type, event_data, event_time) VALUES (?, ?, ?, ?), (session_id, event_type, data_json, datetime.now().isoformat()) ) conn.commit() conn.close()3.3 在聊天主逻辑中植入埋点现在找到你处理用户输入和AI流式输出的核心代码段。通常它在一个表单提交或按钮点击的回调函数里。我们需要在关键节点调用上面的记录函数。假设你原有的消息处理逻辑大致如下伪代码user_input st.chat_input(请输入您的问题...) if user_input: # 显示用户消息 with st.chat_message(user): st.markdown(user_input) # 显示AI回复流式 with st.chat_message(assistant): message_placeholder st.empty() full_response # 调用模型生成回复假设是model.generate_stream之类的函数 for chunk in generate_stream(user_input): full_response chunk message_placeholder.markdown(full_response ▌) message_placeholder.markdown(full_response)我们需要将其改造为import streamlit as st user_input st.chat_input(请输入您的问题...) if user_input: # 1. 确保当前会话被记录 session_id st.session_state.get(session_id, default_session) ensure_session_recorded(session_id) # 2. 记录请求开始时间 request_time datetime.now().isoformat() # 3. 显示用户消息原有逻辑 with st.chat_message(user): st.markdown(user_input) # 4. 显示AI回复流式 with st.chat_message(assistant): message_placeholder st.empty() full_response reasoning_content # 用于捕获思考过程 # 调用模型生成回复 for chunk in generate_stream(user_input): # 原有的流式显示逻辑 full_response chunk message_placeholder.markdown(full_response ▌) # 新增捕获思考过程如果模型输出包含特定标记如think.../think if /think in chunk and think in full_response: # 这是一个简化的提取逻辑你可能需要根据模型实际输出调整 try: start full_response.find(think) end full_response.find(/think) len(/think) reasoning_content full_response[start:end] # 从主回复中移除思考过程保持界面清爽如果你的UI已经做了折叠这里可能不需要 # full_response full_response.replace(reasoning_content, ) except: pass # 流式结束显示最终回复 message_placeholder.markdown(full_response) # 5. 记录响应完成时间 response_time datetime.now().isoformat() # 6. 将本次交互存入数据库 message_data { user_input: user_input, ai_response: full_response, reasoning_content: reasoning_content, request_time: request_time, response_time: response_time, status: success } record_chat_message(session_id, message_data)对于“清空记录”按钮的埋点 找到你清空聊天记录的按钮代码在清空逻辑后添加事件记录if st.button(清空记录, keyclear_btn): # 原有的清空逻辑例如清空 st.session_state.messages st.session_state.messages [] st.rerun() # 或使用 experimental_rerun # 记录UI事件 record_ui_event(session_id, clear_chat)至此数据埋点部分就完成了。你的应用现在已经开始默默记录每一次对话。接下来我们让这些数据“说话”。4. 第二步创建数据分析看板我们将利用Streamlit原生组件在另一个页面或同一个应用的侧边栏/新标签页中创建一个动态数据看板。4.1 设计看板布局与指标一个实用的看板通常包含以下几个部分关键指标卡片会话总数、消息总数、平均响应时间等。趋势图表每日消息量趋势、活跃时段分布。内容分析高频用户问题词云、消息长度分布。性能监控响应时间分布、错误率。原始数据浏览可筛选和排序的详细数据表格。我们在app.py中新增一个页面或一个展开的侧边栏区域来实现它。这里我以在侧边栏添加一个“数据分析”扩展区域为例。4.2 实现看板组件在app.py中找到侧边栏配置部分如果没有可以在合适位置创建添加一个扩展器expander来容纳看板# 在侧边栏添加数据分析面板 with st.sidebar.expander( 数据分析看板, expandedFalse): # 默认折叠 st.markdown(### 聊天数据洞察) # 连接到数据库 conn get_db_connection() # 1. 关键指标卡片 col1, col2, col3 st.columns(3) with col1: total_sessions pd.read_sql(SELECT COUNT(*) as count FROM chat_sessions, conn).iloc[0][count] st.metric(总会话数, total_sessions) with col2: total_messages pd.read_sql(SELECT COUNT(*) as count FROM chat_messages, conn).iloc[0][count] st.metric(总消息数, total_messages) with col3: avg_duration pd.read_sql(SELECT AVG(response_duration) as avg FROM chat_messages WHERE response_duration IS NOT NULL, conn).iloc[0][avg] st.metric(平均响应时间, f{avg_duration:.0f} ms if avg_duration else N/A) st.divider() # 2. 每日消息趋势图 st.markdown(#### 每日消息量趋势) df_messages pd.read_sql( SELECT DATE(request_time) as date, COUNT(*) as message_count FROM chat_messages WHERE request_time IS NOT NULL GROUP BY DATE(request_time) ORDER BY date , conn, parse_dates[date]) if not df_messages.empty: st.line_chart(df_messages.set_index(date)[message_count]) else: st.info(暂无足够的数据生成趋势图。) # 3. 高频问题词云需要安装wordcloud库: pip install wordcloud try: from wordcloud import WordCloud import matplotlib.pyplot as plt st.markdown(#### ️ 高频用户问题) user_inputs pd.read_sql(SELECT user_input FROM chat_messages WHERE LENGTH(user_input) 5, conn)[user_input].tolist() if user_inputs: text .join(user_inputs) wordcloud WordCloud(width800, height400, background_colorwhite, font_pathNone).generate(text) fig, ax plt.subplots(figsize(10, 5)) ax.imshow(wordcloud, interpolationbilinear) ax.axis(off) st.pyplot(fig) else: st.info(暂无用户问题数据。) except ImportError: st.warning(要显示词云请安装 wordcloud 库pip install wordcloud) # 4. 响应时间分布直方图 st.markdown(#### ⏱️ 响应时间分布) df_duration pd.read_sql( SELECT response_duration FROM chat_messages WHERE response_duration IS NOT NULL AND response_duration 30000 -- 过滤超长响应 , conn) if not df_duration.empty: st.bar_chart(df_duration[response_duration].value_counts().sort_index().head(20)) else: st.info(暂无响应时间数据。) # 5. 原始数据表格带筛选 st.markdown(#### 最近对话记录) df_recent pd.read_sql( SELECT session_id, substr(user_input, 1, 50) || ... as user_input_preview, LENGTH(ai_response) as response_length, response_duration, datetime(request_time) as request_time FROM chat_messages ORDER BY request_time DESC LIMIT 50 , conn) if not df_recent.empty: st.dataframe(df_recent, use_container_widthTrue, hide_indexTrue) # 提供数据导出 csv df_recent.to_csv(indexFalse).encode(utf-8) st.download_button( label 导出最近50条记录 (CSV), datacsv, file_namenanbeige_chat_recent.csv, mimetext/csv, ) else: st.info(暂无对话记录。) conn.close() st.divider() st.caption(数据看板每60秒自动刷新。)代码说明关键指标使用st.metric展示核心数据。趋势图使用st.line_chart展示消息量随时间变化。词云可视化用户的高频提问关键词需要额外安装wordcloud库。分布图使用st.bar_chart展示响应时间的集中区间。数据表格使用st.dataframe展示最近的原始记录并提供CSV导出功能。4.3 启用自动刷新为了让看板数据实时更新我们可以在页面顶部设置一个自动刷新的间隔。在app.py的Streamlit配置部分或页面顶部添加# 设置页面自动刷新每60秒保持看板数据最新 st.set_page_config( page_titleNanbeige Chat, page_icon, layoutwide, initial_sidebar_stateexpanded ) # 注意Streamlit社区版暂无直接设置全局自动刷新的API。 # 一个替代方案是使用st.auto_refresh实验性功能或在看板部分添加一个手动刷新按钮。一个简单的替代方法是添加一个手动刷新按钮到看板中if st.sidebar.button( 刷新数据看板, keyrefresh_dashboard): st.rerun()5. 第三步运行与验证保存并运行保存所有对app.py的修改在终端重新运行streamlit run app.py。生成数据在聊天界面中进行几次对话并点击“清空记录”按钮。查看看板点击侧边栏的“ 数据分析看板”展开它。你应该能看到指标卡片上显示了会话数和消息数。趋势图开始绘制。词云逐渐形成如果你安装了wordcloud。表格中出现了你刚才的对话记录。检查数据库你会在项目根目录看到一个名为nanbeige_chat_analytics.db的文件这就是SQLite数据库。你可以用DB Browser for SQLite等工具打开它查看详细数据。6. 总结与进阶思考恭喜你你现在拥有的是一个不仅好看而且“聪明”的Nanbeige聊天应用。它不仅能提供沉浸式的对话体验还能为你提供宝贵的用户洞察。6.1 本教程回顾我们完成了三件核心事情设计并实现了无侵入的数据埋点在关键用户路径上发送消息、清空记录插入了数据记录代码。构建了本地数据存储层使用轻量级SQLite数据库结构清晰地存储了会话、消息和事件数据。创建了交互式数据看板利用Streamlit内置的图表和组件在侧边栏实时可视化数据让分析触手可及。6.2 你可以进一步探索这个基础框架为你打开了更多可能性更精细的Token统计集成transformers库的tokenizer进行精确的token计数更准确地评估成本。用户反馈收集在每条AI回复后添加“/”按钮记录反馈用于评估回复质量。错误监控与告警记录模型生成失败的错误信息并设置阈值在错误率升高时发出通知。将看板独立成页使用Streamlit的多页面Multi-Page功能创建一个独立、功能更强大的数据分析门户。数据定期清理编写脚本自动清理过期的历史数据防止数据库无限膨胀。通过数据驱动决策你可以更好地理解你的用户优化模型提示词评估服务器资源需求最终打造出更受欢迎、更稳定的AI应用。现在就去你的数据看板里发现那些有趣的模式吧获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。