用Python处理交通流数据实战从纽约出租车GPS到高速卡口数据清洗当我们谈论智慧城市、交通大脑或是出行即服务MaaS时一切宏大的构想都始于最基础、也最磨人的一步数据清洗。作为一名数据工程师或算法开发者你可能不止一次面对过这样的场景从学术论文或开源项目中兴奋地找到一个“完美”的数据集链接下载解压后却发现自己面对的是格式混乱的CSV、含义模糊的列名、大量缺失的GPS点以及时间戳里隐藏的时区陷阱。论文里的模型代码跑得飞快而你80%的时间都卡在了如何将原始数据转化为模型能够“消化”的特征上。这不仅仅是体力活更是一场对数据理解深度和工程化能力的考验。本文将以两个极具代表性的公开数据集——纽约出租车行程数据和英国高速公路观测点数据——作为实战蓝本。我们不空谈理论而是直接切入代码手把手演示如何从原始、粗糙的数据文件中一步步清洗、转换、整合最终得到干净、结构化的数据为后续的时空预测、流量分析或轨迹挖掘打下坚实基础。无论你是正在复现顶会论文的研究者还是需要构建实际交通分析系统的工程师这里提供的思路和代码片段都能直接嵌入你的工作流帮你跳过那些“坑”把精力聚焦在更有价值的建模与创新上。1. 数据获取与初步探索认识你的“原材料”在动手清洗之前我们必须像厨师了解食材一样彻底认识原始数据的结构和潜在问题。盲目开始编码往往会导致后续流程的反复与重构。1.1 纽约出租车数据一份时空数据的“富矿”纽约出租车与豪华轿车委员会TLC提供的行程记录数据是交通数据领域的经典。它按月提供CSV文件每个文件都包含数百万行记录字段极其丰富包括上下车时间地点、行程距离、费用明细、支付类型等。首先我们下载并加载一个月份的数据。由于文件体积庞大通常超过1GB我们需要使用pandas的高效读取策略。import pandas as pd import numpy as np # 指定数据类型以优化内存对于分类字段使用‘category’ dtype_spec { VendorID: category, passenger_count: int8, RatecodeID: category, store_and_fwd_flag: category, payment_type: category, trip_type: category } # 仅读取我们关心的列跳过其他列以加速 usecols [ tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, PULocationID, DOLocationID, trip_distance, fare_amount, tip_amount ] # 使用chunksize进行分块读取适合内存有限的机器 chunk_iter pd.read_csv(yellow_tripdata_2023-01.csv, dtypedtype_spec, usecolsusecols, parse_dates[tpep_pickup_datetime, tpep_dropoff_datetime], chunksize100000) # 合并所有块 df_taxi pd.concat(chunk_iter, ignore_indexTrue) print(f数据形状: {df_taxi.shape}) print(df_taxi.head()) print(df_taxi.info())初步查看后你可能会立刻发现几个典型问题时间戳格式虽然我们指定了parse_dates但原始CSV中时间格式是否统一是否存在极端的未来或过去时间数据录入错误地理位置标识PULocationID和DOLocationID是区域编码我们需要另一份映射表TLC提供才能将其转换为具体的经纬度或行政区划。异常值trip_distance为0但fare_amount很高passenger_count为0或负数这些都需要逻辑清洗。提示对于超大数据集始终优先使用dtype参数指定类型、usecols筛选列、以及chunksize分块处理。这能避免内存溢出OOM错误并显著提升加载速度。1.2 英国高速公路数据结构化时间序列的挑战英国高速公路局Highways England提供的交通流量数据是另一种典型来自固定观测点的、按固定时间间隔如15分钟聚合的数据流。每个站点单独提供CSV文件内容通常包括时间、流量、平均速度、占有率等。# 加载单个观测点文件示例 df_motorway pd.read_csv(MIDAS_20230101_JD_0001.csv, skiprows5) # 前几行可能是元数据 print(df_motorway.head()) # 查看列名和基本信息 print(df_motorway.columns) print(df_motorway.describe())这里常见的问题包括文件头不一致不同站点或不同日期的文件表头行数可能不同需要动态检测。时间列格式可能是YYYY-MM-DD HH:MM:SS也可能是01/01/2023 00:00甚至可能是两个单独的日期列和时间列。缺失值与占位符流量数据可能用-1、NaN或NULL表示缺失需要统一处理。多文件整合如何高效地将成百上千个站点的每日/每月文件合并为一个统一的数据集2. 核心清洗策略针对时空数据的“手术刀”数据探索完毕我们进入核心清洗阶段。这一阶段的目标是产出一致性高、可信度强的数据。2.1 时间维度的标准化与修复时间是交通流分析的核心维度必须保证其绝对准确和一致性。处理时区与夏令时纽约出租车数据时间戳通常是UTC-5东部时间且不包含时区信息。如果分析涉及跨日期或与其它时区数据融合必须明确处理。import pytz # 假设原始时间为美国东部时间无夏令时自动转换 eastern pytz.timezone(US/Eastern) df_taxi[pickup_dt_local] df_taxi[tpep_pickup_datetime].dt.tz_localize(eastern, ambiguousNaT, nonexistentshift_forward) df_taxi[dropoff_dt_local] df_taxi[tpep_dropoff_datetime].dt.tz_localize(eastern, ambiguousNaT, nonexistentshift_forward) # 转换为UTC时间以便于统一计算和存储 df_taxi[pickup_dt_utc] df_taxi[pickup_dt_local].dt.tz_convert(pytz.UTC) df_taxi[dropoff_dt_utc] df_taxi[dropoff_dt_local].dt.tz_convert(pytz.UTC) # 处理因夏令时转换导致的无效时间NaT print(f无效的取件时间数量: {df_taxi[pickup_dt_local].isna().sum()}) # 通常可以选择删除或向前/向后插值这些极少量的无效记录 df_taxi df_taxi.dropna(subset[pickup_dt_local, dropoff_dt_local])检测并修复时间逻辑错误下车时间早于上车时间行程时长过长如超过24小时或过短如少于10秒时间戳是否在合理的采集时间范围内如数据集覆盖的年份内# 计算行程时长秒 df_taxi[trip_duration] (df_taxi[dropoff_dt_utc] - df_taxi[pickup_dt_utc]).dt.total_seconds() # 定义合理范围过滤器 valid_time_mask ( (df_taxi[trip_duration] 60) # 大于1分钟 (df_taxi[trip_duration] 3600 * 3) # 小于3小时 (df_taxi[pickup_dt_utc].dt.year 2023) # 年份在数据集范围内 ) df_taxi_clean df_taxi[valid_time_mask].copy()2.2 空间数据的匹配与映射出租车数据中的位置ID需要映射到真实地理坐标。TLC提供了出租车区域Taxi Zone的Shapefile或GeoJSON文件。import geopandas as gpd # 加载纽约出租车区域地理数据 zones_gdf gpd.read_file(taxi_zones.geojson) # 假设zones_gdf有一个LocationID列与数据中的PULocationID/DOLocationID对应 zones_lookup zones_gdf.set_index(LocationID)[[geometry, borough, zone]].to_dict(index) # 为出租车数据添加上下车区域的几何信息和行政区 def add_zone_info(row, loc_id_col): loc_id row[loc_id_col] info zones_lookup.get(loc_id, {}) return pd.Series([info.get(geometry), info.get(borough), info.get(zone)]) # 应用函数注意这在大数据集上可能较慢可以考虑向量化优化或合并操作 pu_info df_taxi_clean.apply(add_zone_info, loc_id_colPULocationID, axis1) pu_info.columns [pu_geometry, pu_borough, pu_zone] df_taxi_clean pd.concat([df_taxi_clean, pu_info], axis1) # 同样处理下车点...对于高速公路数据空间信息是固定的观测点经纬度。我们需要另一份元数据文件来匹配站点ID和其地理位置。# 加载站点元数据 stations_df pd.read_csv(motorway_stations_metadata.csv) # 假设包含列: Site_ID, Latitude, Longitude, Road, Direction # 将站点地理信息合并到流量数据中 df_motorway df_motorway.merge(stations_df[[Site_ID, Latitude, Longitude]], left_onSite ID, # 根据实际列名调整 right_onSite_ID, howleft)2.3 数值字段的异常值检测与清洗交通数据中充斥着传感器错误、人为录入错误导致的异常值。基于业务规则的清洗出租车行程距离应与根据经纬度计算的球面距离大致相符。严重不符的可能是GPS漂移或数据错误。高速公路速度不可能为负数也有一个合理的上限如150 km/h。连续多个时间片的零速度可能表示传感器故障。流量不应为负且在固定时间间隔内应有合理范围。统计方法检测对于看似合理但实则异常的值可以使用统计方法。# 以高速公路速度为例 speed_col Average Speed (mph) # 方法1基于分位数IQR过滤 Q1 df_motorway[speed_col].quantile(0.25) Q3 df_motorway[speed_col].quantile(0.75) IQR Q3 - Q1 lower_bound Q1 - 1.5 * IQR upper_bound Q3 1.5 * IQR # 方法2结合业务逻辑设定绝对边界 abs_lower 0 abs_upper 100 final_mask ( (df_motorway[speed_col] max(lower_bound, abs_lower)) (df_motorway[speed_col] min(upper_bound, abs_upper)) (df_motorway[speed_col].notna()) ) df_motorway_clean df_motorway[final_mask].copy() print(f清洗掉的速度异常记录: {len(df_motorway) - len(df_motorway_clean)})3. 数据重构与特征工程为分析建模做准备清洗后的干净数据是“砖石”而我们需要用它们构建“房屋”。这一步我们将数据重构成便于时空分析的结构。3.1 时间序列重采样与对齐高速公路数据通常是15分钟粒度但我们的模型可能需要小时级或天级数据。pandas的resample方法是利器。# 确保时间列是DatetimeIndex df_motorway_clean[Timestamp] pd.to_datetime(df_motorway_clean[Date] df_motorway_clean[Time]) df_motorway_clean.set_index(Timestamp, inplaceTrue) # 按站点分组然后对每个站点的时间序列进行重采样 resampled_data [] for site_id, group in df_motorway_clean.groupby(Site_ID): # 重采样为1小时聚合流量为求和速度为平均值 hourly_flow group[Flow (veh/15min)].resample(1H).sum() hourly_speed group[Average Speed (mph)].resample(1H).mean() temp_df pd.DataFrame({ Site_ID: site_id, Hourly_Flow: hourly_flow, Hourly_Avg_Speed: hourly_speed }) resampled_data.append(temp_df) df_motorway_hourly pd.concat(resampled_data).reset_index()对于出租车数据我们可能想生成某个区域在不同时间片如每小时的上下车需求流量。# 为出租车数据创建时间片例如每小时 df_taxi_clean[pickup_hour] df_taxi_clean[pickup_dt_utc].dt.floor(H) df_taxi_clean[dropoff_hour] df_taxi_clean[dropoff_dt_utc].dt.floor(H) # 计算每个区域每小时的上下车次数 pickup_demand df_taxi_clean.groupby([pu_zone, pickup_hour]).size().reset_index(namepickup_count) dropoff_demand df_taxi_clean.groupby([pu_zone, dropoff_hour]).size().reset_index(namedropoff_count) # 可以进一步数据透视形成以区域为行、小时为列的需求矩阵 pickup_pivot pickup_demand.pivot(indexpu_zone, columnspickup_hour, valuespickup_count).fillna(0)3.2 构建时空特征原始字段经过组合与计算能产生更有预测力的特征。对于出租车数据行程速度trip_distance / trip_duration。时段特征一天中的小时、是否周末、是否节假日、是否高峰时段如7-9点17-19点。空间特征上下车区域的组合OD对、上车区域所属的行政区borough。派生统计特征某个区域在过去1小时内的平均上车数量、某个OD对在过去一天内的行程次数。对于高速公路数据时间特征同出租车数据。滞后特征前1个时间片、前2个时间片...前N个时间片的流量和速度用于时间序列预测。空间邻接特征同一路段上下游站点的流量需要路段拓扑关系数据。聚合特征同一道路方向所有站点的平均速度反映路段整体状况。下面是一个为高速公路数据创建滞后特征的例子def create_lag_features(df, group_col, value_cols, lag_periods[1, 2, 3, 24]): 为每个站点创建滞后特征。 df: 包含[Timestamp, group_col] value_cols的DataFrame group_col: 分组列如Site_ID value_cols: 需要创建滞后特征的数值列列表如[Hourly_Flow, Hourly_Avg_Speed] lag_periods: 滞后的时间步长列表单位与时间索引频率一致 df_lagged df.copy() df_lagged df_lagged.sort_values([group_col, Timestamp]).set_index(Timestamp) for col in value_cols: for lag in lag_periods: new_col_name f{col}_lag_{lag} df_lagged[new_col_name] df_lagged.groupby(group_col)[col].shift(lag) return df_lagged.reset_index() # 使用示例 df_motorway_with_lags create_lag_features(df_motorway_hourly, group_colSite_ID, value_cols[Hourly_Flow, Hourly_Avg_Speed], lag_periods[1, 2, 3, 24])4. 可视化与质量验证让数据自己“说话”清洗和重构后的数据是否真的可靠可视化是最直观的质检工具。4.1 时空分布可视化使用geopandas和matplotlib或folium查看出租车需求的时空热点。import matplotlib.pyplot as plt # 聚合一天中某个小时各区域的上车数量 morning_pickup pickup_demand[pickup_demand[pickup_hour].dt.hour 8].groupby(pu_zone)[pickup_count].sum().reset_index() # 合并到地理数据框 zones_plot zones_gdf.merge(morning_pickup, left_onzone, right_onpu_zone, howleft) zones_plot[pickup_count] zones_plot[pickup_count].fillna(0) # 绘制分级统计图 fig, ax plt.subplots(1, 1, figsize(12, 10)) zones_plot.plot(columnpickup_count, axax, legendTrue, legend_kwds{label: Morning (8AM) Pickup Demand, orientation: horizontal}, cmapOrRd, edgecolorblack, linewidth0.2) ax.set_title(Taxi Pickup Demand by Zone (8:00 AM)) ax.set_axis_off() plt.tight_layout() plt.show()4.2 时间序列模式检查绘制高速公路某个站点一周的流量和速度曲线检查是否存在规律的日模式、周模式以及明显的异常点是否已被清除。# 选取一个特定站点 sample_site df_motorway_hourly[Site_ID].iloc[0] site_data df_motorway_hourly[df_motorway_hourly[Site_ID] sample_site].set_index(Timestamp) # 绘制一周的数据 fig, axes plt.subplots(2, 1, figsize(15, 10), sharexTrue) site_data_last_week site_data.last(7D) axes[0].plot(site_data_last_week.index, site_data_last_week[Hourly_Flow], markero, linestyle-, markersize3) axes[0].set_ylabel(Hourly Flow (veh/hr)) axes[0].set_title(fSite {sample_site} - Traffic Flow (Last 7 Days)) axes[0].grid(True, alpha0.3) axes[1].plot(site_data_last_week.index, site_data_last_week[Hourly_Avg_Speed], markers, linestyle-, colorgreen, markersize3) axes[1].set_ylabel(Average Speed (mph)) axes[1].set_xlabel(Date) axes[1].set_title(fSite {sample_site} - Average Speed (Last 7 Days)) axes[1].grid(True, alpha0.3) plt.xticks(rotation45) plt.tight_layout() plt.show()4.3 数据完整性报告最后生成一份简单的数据质量报告量化清洗过程。def generate_data_quality_report(df_original, df_cleaned, df_name): 生成数据质量对比报告 report {} report[数据集] df_name report[原始记录数] len(df_original) report[清洗后记录数] len(df_cleaned) report[数据保留率] f{(len(df_cleaned)/len(df_original)*100):.2f}% # 检查各列缺失值 missing_original df_original.isnull().sum() missing_cleaned df_cleaned.isnull().sum() report[原始数据缺失列] missing_original[missing_original 0].to_dict() report[清洗后缺失列] missing_cleaned[missing_cleaned 0].to_dict() return report # 生成报告 taxi_report generate_data_quality_report(df_taxi, df_taxi_clean, NYC Taxi Data) motorway_report generate_data_quality_report(df_motorway, df_motorway_clean, UK Motorway Data) print(出租车数据质量报告:) for key, value in taxi_report.items(): print(f {key}: {value}) print(\n高速公路数据质量报告:) for key, value in motorway_report.items(): print(f {key}: {value})经过以上四个步骤的系统化处理我们从原始、杂乱的CSV文件中得到了两份规整、可信、富含特征的“成品”数据集。纽约出租车数据现在包含了精确的时空信息、清洗后的业务指标并可以方便地聚合为区域-时间需求矩阵。英国高速公路数据则被重采样对齐并构建了关键的滞后特征可以直接输入到LSTM、Transformer等时间序列预测模型中。整个流程中最深的体会是数据清洗没有银弹。每个数据集都有其独特的“脾气”和“暗坑”。本文提供的代码和思路是一个强大的工具箱和一份避坑地图但真正面对一个新数据集时依然需要你保持耐心从最小的数据样本开始一步步探索、验证、清洗。当你对数据的来龙去脉了如指掌时后续的建模工作才会事半功倍。记住高质量的数据管道其价值丝毫不亚于一个精巧的算法模型。