# 数据管理:从采集到特征存储
## 1. 技术分析

### 1.1 数据管理流程

数据管理是机器学习工程的基础。数据管理流程:

数据采集 → 数据存储 → 数据清洗 → 特征工程 → 特征存储

### 1.2 数据存储方案对比

| 方案 | 类型 | 特点 | 适用场景 |
| --- | --- | --- | --- |
| PostgreSQL | 关系型 | 结构化数据 | 交易数据 |
| MongoDB | NoSQL | 文档型 | 非结构化数据 |
| Apache Parquet | 列存储 | 高效查询 | 大数据 |
| Feast | 特征存储 | 特征管理 | ML特征 |

### 1.3 数据质量维度

数据质量维度:

- 完整性: 数据是否完整
- 准确性: 数据是否准确
- 一致性: 数据格式一致
- 时效性: 数据是否及时

## 2. 核心功能实现

### 2.1 数据采集

```python
import pandas as pd
import numpy as np


class DataCollector:
    """Collects data from all registered sources into one DataFrame."""

    def __init__(self):
        self.sources = {}

    def add_source(self, name, source):
        """Register a named source object exposing a ``fetch()`` method."""
        self.sources[name] = source

    def collect(self):
        """Fetch every source and concatenate the results."""
        dataframes = []
        for name, source in self.sources.items():
            df = source.fetch()
            df["source"] = name  # tag each row with its origin
            dataframes.append(df)
        return pd.concat(dataframes, ignore_index=True)


class DatabaseSource:
    """Data source backed by a PostgreSQL query."""

    def __init__(self, connection_string, query):
        self.connection_string = connection_string
        self.query = query

    def fetch(self):
        """Run the query and return the result as a DataFrame."""
        import psycopg2

        conn = psycopg2.connect(self.connection_string)
        try:
            return pd.read_sql(self.query, conn)
        finally:
            conn.close()  # always release the connection, even on error


class APIDataSource:
    """Data source backed by a JSON HTTP API."""

    def __init__(self, url, params=None):
        self.url = url
        self.params = params or {}

    def fetch(self):
        """GET the URL and convert the JSON payload into a DataFrame."""
        import requests

        response = requests.get(self.url, params=self.params)
        data = response.json()
        return pd.DataFrame(data)


class FileDataSource:
    """Data source backed by a CSV / Parquet / JSON file."""

    def __init__(self, file_path):
        self.file_path = file_path

    def fetch(self):
        """Load the file into a DataFrame based on its extension.

        Raises:
            ValueError: if the extension is not one of .csv/.parquet/.json.
        """
        if self.file_path.endswith(".csv"):
            return pd.read_csv(self.file_path)
        elif self.file_path.endswith(".parquet"):
            return pd.read_parquet(self.file_path)
        elif self.file_path.endswith(".json"):
            return pd.read_json(self.file_path)
        # Fail loudly instead of silently returning None.
        raise ValueError(f"Unsupported file format: {self.file_path}")
```

### 2.2 数据清洗

```python
class DataCleaner:
    """Applies an ordered chain of cleaning rules (callables) to a DataFrame."""

    def __init__(self):
        self.rules = []

    def add_rule(self, rule):
        """Append a rule: any callable mapping DataFrame -> DataFrame."""
        self.rules.append(rule)

    def clean(self, df):
        """Run every rule in registration order and return the result."""
        for rule in self.rules:
            df = rule(df)
        return df


class MissingValueHandler:
    """Missing-value rule; strategy is 'drop', 'fill_mean' or 'fill_median'."""

    def __init__(self, strategy="drop"):
        self.strategy = strategy

    def __call__(self, df):
        if self.strategy == "drop":
            return df.dropna()
        elif self.strategy == "fill_mean":
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].mean())
            return df
        elif self.strategy == "fill_median":
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].median())
            return df
        return df  # unknown strategy: leave the data unchanged


class OutlierHandler:
    """Outlier-removal rule for numeric columns (IQR method)."""

    def __init__(self, method="iqr"):
        self.method = method

    def __call__(self, df):
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if self.method == "iqr":
                q1 = df[col].quantile(0.25)
                q3 = df[col].quantile(0.75)
                iqr = q3 - q1
                lower_bound = q1 - 1.5 * iqr
                upper_bound = q3 + 1.5 * iqr
                # Keep only rows inside the [Q1 - 1.5*IQR, Q3 + 1.5*IQR] fence.
                df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)]
        return df


class DuplicateHandler:
    """Rule that drops duplicated rows."""

    def __call__(self, df):
        return df.drop_duplicates()
```

### 2.3 特征存储

```python
import feast


class FeatureStore:
    """Thin wrapper around a Feast feature-store repository."""

    def __init__(self, repo_path):
        self.repo_path = repo_path
        self.store = feast.FeatureStore(repo_path=repo_path)

    def register_feature(self, feature_def):
        """Apply a feature definition to the registry."""
        self.store.apply([feature_def])

    def materialize_features(self, start_date, end_date):
        """Load features for the given window into the online store."""
        self.store.materialize(start_date, end_date)

    def get_features(self, entity_df, features):
        """Return historical (point-in-time correct) features as a DataFrame."""
        return self.store.get_historical_features(
            entity_df=entity_df,
            features=features,
        ).to_df()


class FeatureRegistry:
    """In-memory registry mapping feature names to definitions."""

    def __init__(self):
        self.features = {}

    def register(self, name, feature):
        self.features[name] = feature

    def get(self, name):
        return self.features.get(name)

    def list_features(self):
        return list(self.features.keys())


class OnlineFeatureStore:
    """In-memory online store: entity_id -> feature dict."""

    def __init__(self):
        self.features = {}

    def update(self, entity_id, features):
        """Merge new feature values into the entity's current record."""
        if entity_id not in self.features:
            self.features[entity_id] = {}
        self.features[entity_id].update(features)

    def get(self, entity_id):
        return self.features.get(entity_id, {})

    def batch_get(self, entity_ids):
        return {eid: self.get(eid) for eid in entity_ids}
```

## 3. 性能对比

### 3.1 数据存储对比

| 存储 | 读取速度 | 写入速度 | 查询灵活性 | 适用场景 |
| --- | --- | --- | --- | --- |
| CSV | 慢 | 慢 | 低 | 小规模 |
| Parquet | 快 | 中 | 中 | 大规模 |
| PostgreSQL | 中 | 中 | 高 | 事务性 |
| Feast | 快 | 快 | 高 | ML特征 |

### 3.2 数据清洗方法对比

| 方法 | 效果 | 计算开销 | 适用场景 |
| --- | --- | --- | --- |
| 删除缺失值 | 简单 | 低 | 缺失率低 |
| 均值填充 | 保持分布 | 低 | 数值特征 |
| 中位数填充 | 抗异常值 | 低 | 偏态分布 |

### 3.3 特征存储对比

| 工具 | 在线服务 | 离线服务 | 版本管理 | 部署复杂度 |
| --- | --- | --- | --- | --- |
| Feast | 是 | 是 | 是 | 中 |
| Tecton | 是 | 是 | 是 | 高 |
| Hopsworks | 是 | 是 | 是 | 高 |
## 4. 最佳实践

### 4.1 数据管理流程

```python
def build_data_pipeline(config):
    """Build a (collector, cleaner) pair from a pipeline config dict.

    ``config["sources"]`` is a list of source descriptors with a ``type``
    of 'database', 'api' or 'file'; ``config["missing_strategy"]`` selects
    the missing-value strategy (defaults to 'drop').

    Raises:
        ValueError: for an unknown source type.
    """
    collector = DataCollector()
    for source_config in config["sources"]:
        if source_config["type"] == "database":
            source = DatabaseSource(
                source_config["connection_string"], source_config["query"]
            )
        elif source_config["type"] == "api":
            source = APIDataSource(
                source_config["url"], source_config.get("params")
            )
        elif source_config["type"] == "file":
            source = FileDataSource(source_config["path"])
        else:
            # Fail loudly instead of leaving `source` unbound.
            raise ValueError(f"Unknown source type: {source_config['type']}")
        collector.add_source(source_config["name"], source)

    cleaner = DataCleaner()
    cleaner.add_rule(MissingValueHandler(config.get("missing_strategy", "drop")))
    cleaner.add_rule(OutlierHandler())
    cleaner.add_rule(DuplicateHandler())
    return collector, cleaner


class DataManagementWorkflow:
    """End-to-end workflow: collect -> clean -> materialize features."""

    def __init__(self, config):
        self.collector, self.cleaner = build_data_pipeline(config)
        self.feature_store = FeatureStore(config.get("feature_store_path", "."))

    def run(self):
        """Run the full pipeline and return the cleaned DataFrame.

        NOTE(review): assumes the cleaned data has a ``timestamp`` column —
        verify against the upstream sources.
        """
        print("Collecting data...")
        raw_data = self.collector.collect()

        print("Cleaning data...")
        cleaned_data = self.cleaner.clean(raw_data)

        print("Storing features...")
        self.feature_store.materialize_features(
            cleaned_data["timestamp"].min(),
            cleaned_data["timestamp"].max(),
        )
        return cleaned_data
```

### 4.2 数据质量检查

```python
class DataQualityChecker:
    """Runs completeness / uniqueness / range checks on a DataFrame."""

    def __init__(self):
        pass

    def check_completeness(self, df):
        """Return {column: percentage of non-null values}."""
        completeness = (1 - df.isnull().sum() / len(df)) * 100
        return completeness.to_dict()

    def check_uniqueness(self, df, unique_columns):
        """Return {column: True if every value in the column is unique}."""
        results = {}
        for col in unique_columns:
            results[col] = df[col].nunique() == len(df)
        return results

    def check_range(self, df, column_ranges):
        """Return {column: True if all values fall within [min_val, max_val]}."""
        results = {}
        for col, (min_val, max_val) in column_ranges.items():
            if col in df.columns:
                within_range = ((df[col] >= min_val) & (df[col] <= max_val)).all()
                results[col] = within_range
        return results

    def run_all_checks(self, df, unique_columns=None, column_ranges=None):
        """Run every check; avoids mutable default arguments."""
        unique_columns = unique_columns if unique_columns is not None else []
        column_ranges = column_ranges if column_ranges is not None else {}
        return {
            "completeness": self.check_completeness(df),
            "uniqueness": self.check_uniqueness(df, unique_columns),
            "range_check": self.check_range(df, column_ranges),
        }
```
## 5. 总结

数据管理是机器学习工程的基础:

- 数据采集:从多种来源获取数据
- 数据清洗:确保数据质量
- 特征存储:管理和复用特征
- 数据质量:持续监控数据质量

对比数据如下:

- Feast 是最佳的开源特征存储工具
- Parquet 是大规模数据的最佳格式
- 推荐使用 Feast 管理 ML 特征
- 数据质量检查应自动化