别再死记硬背公式了!用Python手把手带你实现UserCF和ItemCF(附MovieLens数据集实战)
从零实现UserCF与ItemCF用Python构建电影推荐系统推荐系统早已渗透进我们数字生活的每个角落——从电商平台的猜你喜欢到视频网站的推荐观看背后都离不开协同过滤算法的支撑。今天我将带大家用Python亲手实现两种最经典的协同过滤算法基于用户的协同过滤(UserCF)和基于物品的协同过滤(ItemCF)。我们将使用MovieLens数据集这个包含10万条电影评分的数据集是推荐系统领域的Hello World。1. 环境准备与数据加载在开始编码前我们需要准备好Python环境和必要的数据集。推荐使用Anaconda创建干净的虚拟环境conda create -n recommender python3.8 conda activate recommender pip install pandas numpy scikit-learnMovieLens 100K数据集可以从GroupLens官网下载解压后我们主要使用u.data文件它包含用户ID、电影ID、评分和时间戳的四列数据。让我们先加载并探索这个数据集import pandas as pd # 加载数据 ratings pd.read_csv(ml-100k/u.data, sep\t, names[user_id, item_id, rating, timestamp]) print(f数据集形状: {ratings.shape}) print(f用户数量: {ratings[user_id].nunique()}) print(f电影数量: {ratings[item_id].nunique()}) print(\n前5条数据:) print(ratings.head())输出结果会显示我们有943位用户对1682部电影的100,000条评分记录评分范围是1-5分。为了后续处理方便我们将数据转换为字典格式def convert_to_dict(df): user_item_dict {} for _, row in df.iterrows(): user_id str(row[user_id]) item_id str(row[item_id]) rating row[rating] if user_id not in user_item_dict: user_item_dict[user_id] {} user_item_dict[user_id][item_id] rating return user_item_dict data_dict convert_to_dict(ratings)2. 相似度计算的核心原理协同过滤算法的关键在于如何量化用户或物品之间的相似度。常用的相似度度量方法有余弦相似度(Cosine Similarity): 测量两个向量夹角的余弦值杰卡德相似系数(Jaccard Index): 适合二元交互数据(如点击/未点击)皮尔逊相关系数(Pearson Correlation): 考虑用户评分偏置在MovieLens这种显式评分数据集中余弦相似度通常表现良好。其数学表达式为$$ \text{sim}(u,v) \frac{N(u) \cap N(v)}{\sqrt{|N(u)| \times |N(v)|}} $$其中$N(u)$表示用户$u$评分过的电影集合。Python实现如下from math import sqrt def cosine_sim(user1, user2, user_item_dict): 计算两个用户之间的余弦相似度 common_items set(user_item_dict[user1]) set(user_item_dict[user2]) if not common_items: return 0 numerator len(common_items) denominator sqrt(len(user_item_dict[user1])) * sqrt(len(user_item_dict[user2])) return numerator / denominator3. 实现基于用户的协同过滤(UserCF)UserCF的核心思想是相似用户喜欢相似物品。算法分为三个步骤计算目标用户与其他用户的相似度选择最相似的K个用户(邻居)根据邻居的评分预测目标用户的兴趣3.1 构建用户相似度矩阵直接计算所有用户两两之间的相似度效率很低($O(n^2)$复杂度)。我们可以优化使用物品-用户倒排表def build_user_sim_matrix(user_item_dict): 使用倒排表优化用户相似度计算 # 建立物品-用户倒排表 item_users {} for user, items in user_item_dict.items(): for item in items: if item not in item_users: item_users[item] set() item_users[item].add(user) # 计算共同评分物品数 user_sim_matrix {} for item, users in item_users.items(): for u in users: if u not in user_sim_matrix: user_sim_matrix[u] {} for v in users: if u v: continue if v not in user_sim_matrix[u]: user_sim_matrix[u][v] 0 user_sim_matrix[u][v] 1 # 计算最终相似度 for u, related_users in user_sim_matrix.items(): for v, count in related_users.items(): user_sim_matrix[u][v] count / (sqrt(len(user_item_dict[u])) * sqrt(len(user_item_dict[v]))) return user_sim_matrix user_sim build_user_sim_matrix(data_dict)3.2 生成推荐列表有了相似度矩阵后我们可以为目标用户生成推荐def user_cf_recommend(target_user, user_item_dict, user_sim_matrix, k20): 基于用户的协同过滤推荐 # 存储物品的推荐得分 item_scores {} # 目标用户已评分的物品 rated_items set(user_item_dict[target_user]) # 遍历最相似的k个用户 similar_users sorted(user_sim_matrix[target_user].items(), keylambda x: x[1], reverseTrue)[:k] for similar_user, similarity in similar_users: # 遍历相似用户评分过的物品 for item, rating in user_item_dict[similar_user].items(): if item in rated_items: continue if item not in item_scores: item_scores[item] 0 # 累加相似度*评分 item_scores[item] similarity * float(rating) # 按得分排序返回推荐结果 return sorted(item_scores.items(), keylambda x: x[1], reverseTrue) # 为用户196生成推荐 recommendations user_cf_recommend(196, data_dict, user_sim, k10) print(UserCF推荐结果:, recommendations[:10])4. 实现基于物品的协同过滤(ItemCF)与UserCF不同ItemCF的核心是用户喜欢相似物品。其优势在于更适合物品数远少于用户数的场景推荐结果更稳定可解释性更强适合长尾物品的发现4.1 构建物品相似度矩阵同样使用倒排表优化计算def build_item_sim_matrix(user_item_dict): 构建物品相似度矩阵 # 计算物品被多少用户评分过 item_popularity {} for user, items in user_item_dict.items(): for item in items: if item not in item_popularity: item_popularity[item] 0 item_popularity[item] 1 # 建立用户-物品倒排表 user_items user_item_dict # 计算物品共现矩阵 item_sim_matrix {} for user, items in user_items.items(): for item1 in items: if item1 not in item_sim_matrix: item_sim_matrix[item1] {} for item2 in items: if item1 item2: continue if item2 not in item_sim_matrix[item1]: item_sim_matrix[item1][item2] 0 # 引入IUF(Inverse User Frequency)惩罚活跃用户 item_sim_matrix[item1][item2] 1 / math.log(1 len(items)) # 计算最终相似度 for item1, related_items in item_sim_matrix.items(): for item2, count in related_items.items(): item_sim_matrix[item1][item2] count / math.sqrt(item_popularity[item1] * item_popularity[item2]) return item_sim_matrix item_sim build_item_sim_matrix(data_dict)4.2 生成ItemCF推荐def item_cf_recommend(target_user, user_item_dict, item_sim_matrix, k20): 基于物品的协同过滤推荐 item_scores {} rated_items user_item_dict[target_user] for item, rating in rated_items.items(): # 找到与当前物品最相似的k个物品 similar_items sorted(item_sim_matrix[item].items(), keylambda x: x[1], reverseTrue)[:k] for similar_item, similarity in similar_items: if similar_item in rated_items: continue if similar_item not in item_scores: item_scores[similar_item] 0 item_scores[similar_item] similarity * float(rating) return sorted(item_scores.items(), keylambda x: x[1], reverseTrue) # 为用户196生成ItemCF推荐 item_recommendations item_cf_recommend(196, data_dict, item_sim, k10) print(ItemCF推荐结果:, item_recommendations[:10])5. 算法对比与性能优化UserCF和ItemCF在实际应用中各有优劣对比维度UserCFItemCF适用场景用户数较少社交属性强的领域物品数较少电商等场景实时性用户新行为会影响推荐结果物品相似度矩阵更新频率可以较低推荐多样性相对较低更容易发现长尾物品冷启动问题新用户难以处理新物品难以处理5.1 性能优化技巧相似度矩阵稀疏化只存储每个用户/物品最相似的TopN邻居大幅减少内存占用增量更新定期只更新变化部分的相似度而非全量计算并行计算使用多进程或Spark等分布式框架加速大规模计算相似度归一化将相似度缩放到[0,1]范围避免热门物品主导推荐# 相似度矩阵稀疏化示例 def sparsify_sim_matrix(sim_matrix, topk50): 只保留每个用户/物品最相似的topk个邻居 sparse_matrix {} for key, neighbors in sim_matrix.items(): sparse_matrix[key] dict(sorted(neighbors.items(), keylambda x: x[1], reverseTrue)[:topk]) return sparse_matrix sparse_user_sim sparsify_sim_matrix(user_sim)5.2 评估推荐质量我们可以将数据集划分为训练集和测试集使用准确率、召回率等指标评估from sklearn.model_selection import train_test_split # 划分训练测试集 train_data, test_data train_test_split(ratings, test_size0.2, random_state42) # 转换为字典格式 train_dict convert_to_dict(train_data) test_dict convert_to_dict(test_data) def evaluate(user_id, recommendations, test_dict, topk10): 评估推荐效果 if user_id not in test_dict: return 0, 0 test_items set(test_dict[user_id]) if not test_items: return 0, 0 recommended_items set([item for item, _ in recommendations[:topk]]) hit recommended_items test_items precision len(hit) / topk recall len(hit) / len(test_items) return precision, recall # 评估UserCF user_rec user_cf_recommend(196, train_dict, user_sim) precision, recall evaluate(196, user_rec, test_dict) print(fUserCF Precision{10}: {precision:.4f}, Recall{10}: {recall:.4f}) # 评估ItemCF item_rec item_cf_recommend(196, train_dict, item_sim) precision, recall evaluate(196, item_rec, test_dict) print(fItemCF Precision{10}: {precision:.4f}, Recall{10}: {recall:.4f})6. 工程实践中的挑战与解决方案在实际生产环境中实现协同过滤算法会遇到几个典型问题冷启动问题新用户采用混合推荐策略结合内容推荐或热门推荐新物品利用物品内容信息计算初始相似度数据稀疏性使用矩阵填充技术补全缺失值引入隐语义模型(LFM)降维处理可扩展性对于超大规模数据采用MinHash等近似算法使用分布式计算框架如Spark MLlib实时性要求在线学习更新用户最近兴趣构建两级推荐系统离线计算在线调整# 简单的在线学习示例 class OnlineRecommender: def __init__(self, initial_sim_matrix): self.sim_matrix initial_sim_matrix def update_with_new_rating(self, user, item, rating): 根据新评分动态更新相似度矩阵 # 简化示例实际实现会更复杂 if user not in self.sim_matrix: self.sim_matrix[user] {} # 更新与已有用户的相似度 for other_user in self.sim_matrix: if other_user user: continue # 简化的相似度更新逻辑 common_items ... # 计算共同评分物品 self.sim_matrix[user][other_user] len(common_items) / ... return self.sim_matrix在实现推荐系统时我经常遇到相似度计算耗时的瓶颈。一个实用的技巧是预先计算并缓存相似度矩阵然后定期(如每天)增量更新而不是实时计算。对于千万级用户规模的系统专业的向量数据库如Milvus或Faiss能极大加速最近邻搜索过程。