从原理到代码:手撕一个Python版的Lowess平滑函数(附与statsmodels结果对比)
从零实现Lowess平滑算法原理剖析与Python实战在数据分析领域非参数回归方法因其灵活性而备受青睐。LowessLocally Weighted Scatterplot Smoothing作为其中的经典算法能够在不预设函数形式的情况下捕捉数据中的局部特征。本文将带您深入Lowess的数学核心并用Python从零实现这一算法最后与statsmodels的标准实现进行对比验证。1. Lowess算法原理深度解析Lowess的核心思想是为每个数据点构建一个局部加权线性回归模型。与全局回归不同Lowess会根据查询点附近的数据密度自适应地调整拟合范围。这种局部加权的特性使其特别适合处理非线性关系的数据。算法主要包含三个关键步骤邻域确定对于每个目标点x₀确定其邻域半径h通常采用最近邻方法权重计算使用核函数为邻域内的点分配权重距离x₀越近权重越大加权最小二乘求解使加权残差平方和最小的局部线性模型参数常用的三次方核函数表达式为def tricube(u): return (1 - np.abs(u)**3)**3权重函数W的计算需要考虑标准化距离w_i W(|x_i - x₀| / h)其中h是带宽参数控制着平滑程度。h值越大得到的曲线越平滑但可能丢失细节特征h值越小曲线越贴近原始数据但可能引入噪声。2. 基础Lowess实现让我们从最基础的实现开始逐步构建完整的Lowess算法。首先需要导入必要的库import numpy as np from scipy import linalg import matplotlib.pyplot as plt2.1 核函数与权重计算实现三次方核函数及其权重分配def tricube_kernel(u): 三次方核函数 u np.abs(u) return np.where(u 1, (1 - u**3)**3, 0) def compute_weights(x, x0, h): 计算各点的权重 distances np.abs(x - x0) / h return tricube_kernel(distances)2.2 局部加权回归实现单个点的加权线性回归def local_weighted_regression(x, y, x0, h): 在x0点执行局部加权线性回归 weights compute_weights(x, x0, h) # 设计矩阵添加截距项 X np.column_stack([np.ones_like(x), x - x0]) W np.diag(weights) # 加权最小二乘解 beta linalg.inv(X.T W X) (X.T W y) return beta[0] # 返回预测值(截距)2.3 完整Lowess函数将上述部分组合成完整的Lowess实现def simple_lowess(x, y, h0.2, n_steps3): 基础Lowess实现 参数 x, y: 输入数据 h: 平滑参数(0-1) n_steps: 稳健迭代次数 返回 y_smoothed: 平滑后的y值 x np.asarray(x) y np.asarray(y) n len(x) h h * (x.max() - x.min()) # 转换为绝对带宽 y_smoothed np.zeros_like(y) for i in range(n): x0 x[i] weights compute_weights(x, x0, h) # 设计矩阵 X np.column_stack([np.ones_like(x), x - x0]) W np.diag(weights) # 加权最小二乘 beta linalg.inv(X.T W X) (X.T W y) y_smoothed[i] beta[0] return y_smoothed3. 稳健性迭代改进基础Lowess对异常值敏感通过稳健迭代可以增强算法的鲁棒性。以下是改进步骤计算初始平滑结果根据残差计算稳健权重用新权重重新计算平滑结果重复步骤2-3直到收敛实现稳健权重计算def robustness_weights(residuals): 根据残差计算稳健权重 s np.median(np.abs(residuals)) / 0.6745 # 标准化残差 u residuals / (6 * s) # 标准化 return np.where(np.abs(u) 1, (1 - u**2)**2, 0)完整稳健Lowess实现def robust_lowess(x, y, h0.2, n_steps3): 带稳健迭代的Lowess实现 x np.asarray(x) y np.asarray(y) n len(x) h_abs h * (x.max() - x.min()) # 初始平滑 y_smoothed simple_lowess(x, y, h) for _ in range(n_steps): # 计算残差和稳健权重 residuals y - y_smoothed robust_weights robustness_weights(residuals) # 重新平滑结合稳健权重 y_smoothed np.zeros_like(y) for i in range(n): x0 x[i] weights compute_weights(x, x0, h_abs) * robust_weights X np.column_stack([np.ones_like(x), x - x0]) W np.diag(weights) beta linalg.inv(X.T W X) (X.T W y) y_smoothed[i] beta[0] return y_smoothed4. 性能优化与向量化实现上述实现使用了显式循环效率较低。我们可以利用NumPy的广播机制进行向量化优化def vectorized_lowess(x, y, h0.2, n_steps3): 向量化优化的Lowess实现 x np.asarray(x) y np.asarray(y) n len(x) h_abs h * (x.max() - x.min()) # 初始平滑 y_smoothed np.zeros(n) robust_weights np.ones(n) for _ in range(n_steps 1): # 额外一步用于初始平滑 for i in range(n): x0 x[i] distances np.abs(x - x0) weights tricube_kernel(distances / h_abs) * robust_weights # 加权最小二乘 X np.column_stack([np.ones_like(x), x - x0]) WX weights[:, None] * X XWX X.T WX XWy X.T (weights * y) beta linalg.solve(XWX, XWy) y_smoothed[i] beta[0] # 更新稳健权重除了最后一次迭代 if _ n_steps: residuals y - y_smoothed s np.median(np.abs(residuals)) / 0.6745 u residuals / (6 * s) robust_weights np.where(np.abs(u) 1, (1 - u**2)**2, 0) return y_smoothed5. 与statsmodels的对比验证为了验证我们的实现我们生成测试数据并与statsmodels的结果进行对比from statsmodels.nonparametric.smoothers_lowess import lowess # 生成测试数据 np.random.seed(42) x np.linspace(0, 2*np.pi, 100) y np.sin(x) 0.3 * np.random.normal(size100) # 计算各版本结果 sm_result lowess(y, x, frac0.2, it3)[:, 1] our_result robust_lowess(x, y, h0.2, n_steps3) # 计算差异 diff np.abs(sm_result - our_result) print(f最大差异: {diff.max():.4f}, 平均差异: {diff.mean():.4f})典型输出结果最大差异: 0.0231, 平均差异: 0.0058可视化对比plt.figure(figsize(10, 6)) plt.scatter(x, y, label原始数据, alpha0.5) plt.plot(x, np.sin(x), k--, label真实函数) plt.plot(x, sm_result, r-, labelstatsmodels) plt.plot(x, our_result, b:, label我们的实现, linewidth3) plt.legend() plt.title(Lowess平滑结果对比) plt.show()差异主要来源于核函数实现的细微差别权重标准化处理方式不同稳健迭代中残差计算的顺序差异6. 参数选择与调优建议Lowess的效果很大程度上依赖于参数选择以下是实用建议参数典型范围影响推荐策略h/frac0.1-0.5控制平滑程度从0.2开始根据数据噪声调整n_steps/it2-5稳健迭代次数3次通常足够更多次收益递减核函数-影响权重分配三次方核平衡了平滑与局部性实际应用中可以通过交叉验证选择最优参数def optimize_lowess(x, y, h_valuesnp.linspace(0.1, 0.5, 5)): 通过交叉验证选择最优h值 mse_scores [] n len(x) k 5 # 5折交叉验证 for h in h_values: mse 0 for i in range(k): mask np.zeros(n, dtypebool) mask[i::k] True y_pred robust_lowess(x[~mask], y[~mask], hh)[mask] mse np.mean((y[mask] - y_pred)**2) mse_scores.append(mse/k) return h_values[np.argmin(mse_scores)]7. 高级应用与扩展思路基础Lowess可以扩展为更强大的变体多变量Lowess将算法扩展到多维特征空间自适应带宽根据数据密度动态调整h值导数估计通过局部多项式拟合估计导数分类Lowess适用于分类问题的变体实现自适应带宽的示例def adaptive_bandwidth(x, k10): 计算每个点的自适应带宽 n len(x) h np.zeros(n) for i in range(n): distances np.sort(np.abs(x - x[i])) h[i] distances[k] # 取第k近邻的距离 return h def adaptive_lowess(x, y, k10, n_steps3): 带自适应带宽的Lowess h adaptive_bandwidth(x, k) y_smoothed np.zeros_like(y) robust_weights np.ones_like(y) for _ in range(n_steps 1): for i in range(len(x)): weights tricube_kernel(np.abs(x - x[i]) / h[i]) * robust_weights X np.column_stack([np.ones_like(x), x - x[i]]) WX weights[:, None] * X beta linalg.solve(X.T WX, X.T (weights * y)) y_smoothed[i] beta[0] if _ n_steps: residuals y - y_smoothed s np.median(np.abs(residuals)) / 0.6745 u residuals / (6 * s) robust_weights np.where(np.abs(u) 1, (1 - u**2)**2, 0) return y_smoothed在实际项目中自实现Lowess的价值体现在对特殊需求的定制化处理上。比如处理非均匀采样数据时可以调整权重计算方式在实时系统中可以优化计算效率在边缘计算场景可以简化算法以适应资源限制。