# MLOps 实践：从开发到运维

## 1. 技术分析

### 1.1 MLOps 概述

MLOps 是机器学习运维的方法论。

```
MLOps 核心组件
模型训练 → 模型部署 → 模型监控 → 模型更新
```

### 1.2 MLOps 工具栈

| 组件 | 功能 | 常用工具 |
| --- | --- | --- |
| 训练 | 模型训练 | PyTorch、TensorFlow |
| 实验追踪 | 实验管理 | MLflow、Weights & Biases |
| 模型存储 | 模型管理 | MLflow Model Registry |
| 部署 | 服务部署 | TorchServe、FastAPI |
| 监控 | 性能监控 | Prometheus、Evidently |
| 编排 | 工作流 | Airflow、Prefect |

### 1.3 MLOps 成熟度模型

```
MLOps 成熟度
Level 1: 手动流程
Level 2: 自动化训练
Level 3: 自动化部署
Level 4: 持续学习
```

## 2. 核心功能实现

### 2.1 MLOps 流水线

```python
import mlflow
from prefect import Flow, task
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

@task
def load_data():
    import pandas as pd
    return pd.read_csv("data.csv")

@task
def preprocess(data):
    data = data.dropna()
    return data

@task
def split_data(data):
    from sklearn.model_selection import train_test_split
    X = data.drop("target", axis=1)
    y = data["target"]
    return train_test_split(X, y, test_size=0.2)

@task
def train_model(X_train, y_train, params):
    with mlflow.start_run():
        mlflow.log_params(params)
        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)
        mlflow.sklearn.log_model(model, "model")
    return model

@task
def evaluate_model(model, X_val, y_val):
    predictions = model.predict(X_val)
    accuracy = accuracy_score(y_val, predictions)
    mlflow.log_metric("val_accuracy", accuracy)
    return accuracy

with Flow("ML Pipeline") as flow:
    data = load_data()
    cleaned_data = preprocess(data)
    X_train, X_val, y_train, y_val = split_data(cleaned_data)
    params = {"n_estimators": 100, "max_depth": 10}
    model = train_model(X_train, y_train, params)
    accuracy = evaluate_model(model, X_val, y_val)

class MLOpsPipeline:
    def __init__(self, config):
        self.config = config

    def run(self):
        flow.run()
```

### 2.2 模型生命周期管理

```python
class ModelLifecycleManager:
    def __init__(self):
        self.client = mlflow.tracking.MlflowClient()

    def create_model_version(self, model_uri, name):
        version = self.client.create_model_version(
            name=name,
            source=model_uri,
            run_id=mlflow.active_run().info.run_id
        )
        return version

    def transition_stage(self, name, version, stage):
        self.client.transition_model_version_stage(
            name=name,
            version=version,
            stage=stage
        )

    def retire_old_versions(self, name, keep_latest=3):
        versions = self.client.search_model_versions(f"name='{name}'")
        versions.sort(key=lambda v: int(v.version), reverse=True)
        for version in versions[keep_latest:]:
            self.client.transition_model_version_stage(
                name=name,
                version=version.version,
                stage="Archived"
            )

class ModelDeploymentManager:
    def __init__(self):
        self.servers = {}

    def deploy_model(self, model_name, version, environment):
        model_uri = f"models:/{model_name}/{version}"
        if environment == "staging":
            self._deploy_to_staging(model_uri)
        elif environment == "production":
            self._deploy_to_production(model_uri)

    def _deploy_to_staging(self, model_uri):
        print(f"Deploying {model_uri} to staging")

    def _deploy_to_production(self, model_uri):
        print(f"Deploying {model_uri} to production")
```

### 2.3 持续集成与部署

```python
class CI/CDPipeline:
    def __init__(self):
        self.stages = []

    def add_stage(self, name, function):
        self.stages.append({"name": name, "function": function})

    def run(self):
        for stage in self.stages:
            print(f"Running {stage['name']}...")
            stage["function"]()
            print(f"{stage['name']} completed")

def run_tests():
    print("Running unit tests...")

def build_model():
    print("Building model...")

def deploy():
    print("Deploying model...")

class GitLabCIConfig:
    def generate(self):
        config = """
stages:
  - test
  - build
  - deploy

test:
  script:
    - python -m pytest tests/

build:
  script:
    - python train.py

deploy:
  script:
    - python deploy.py
"""
        return config
```

## 3. 性能对比

### 3.1 MLOps 工具对比

| 工具 | 功能覆盖 | 易用性 | 扩展性 |
| --- | --- | --- | --- |
| Kubeflow | 全流程 | 中 | 很高 |
| Airflow | 编排 | 中 | 高 |
| MLflow | 实验追踪 | 高 | 中 |
| Prefect | 工作流 | 高 | 中 |

### 3.2 部署方式对比

| 方式 | 灵活性 | 可扩展性 | 运维复杂度 |
| --- | --- | --- | --- |
| 容器化 | 高 | 高 | 中 |
| Serverless | 高 | 很高 | 低 |
| 虚拟机 | 中 | 中 | 高 |

### 3.3 成熟度对比

| 成熟度 | 自动化程度 | 部署频率 | 风险 |
| --- | --- | --- | --- |
| Level 1 | 低 | 手动 | 高 |
| Level 2 | 中 | 每周 | 中 |
| Level 3 | 高 | 每日 | 低 |
| Level 4 | 很高 | 实时 | 低 |

## 4. 最佳实践

### 4.1 MLOps 架构设计

```python
def design_mlops_pipeline(config):
    pipeline = CI/CDPipeline()
    if config.get("testing", True):
        pipeline.add_stage("Test", run_tests)
    if config.get("building", True):
        pipeline.add_stage("Build", build_model)
    if config.get("deployment", True):
        pipeline.add_stage("Deploy", deploy)
    return pipeline

class MLOpsArchitecture:
    def __init__(self, config):
        self.pipeline = design_mlops_pipeline(config)
        self.lifecycle_manager = ModelLifecycleManager()

    def run(self):
        self.pipeline.run()
        model_name = config.get("model_name", "my_model")
        self.lifecycle_manager.retire_old_versions(model_name)
```

### 4.2 模型监控与更新

```python
class ModelMonitoringAndRetraining:
    def __init__(self, config):
        self.monitor = ModelPerformanceMonitor()
        self.retrainer = ModelRetrainer(config)

    def check_and_retrain(self):
        performance = self.monitor.get_performance()
        if performance["accuracy"] < config["threshold"]:
            print("Triggering retraining...")
            self.retrainer.retrain()
            new_model_version = self.lifecycle_manager.create_model_version(
                "models:/my_model/latest",
                "my_model"
            )
            self.lifecycle_manager.transition_stage(
                "my_model",
                new_model_version.version,
                "Production"
            )

class ModelRetrainer:
    def __init__(self, config):
        self.config = config

    def retrain(self):
        data = self._fetch_latest_data()
        model = self._train_model(data)
        self._save_model(model)

    def _fetch_latest_data(self):
        import pandas as pd
        return pd.read_csv("latest_data.csv")

    def _train_model(self, data):
        from sklearn.ensemble import RandomForestClassifier
        X = data.drop("target", axis=1)
        y = data["target"]
        model = RandomForestClassifier()
        model.fit(X, y)
        return model

    def _save_model(self, model):
        mlflow.sklearn.log_model(model, "model")
```

## 5. 总结

MLOps 是机器学习工程化的关键：

- 流水线：自动化模型训练和部署
- 生命周期管理：管理模型版本
- CI/CD：持续集成与部署
- 监控与更新：持续监控和自动重训练

对比数据如下：

- Kubeflow 是最全面的 MLOps 平台
- MLflow 是优秀的实验追踪工具
- 容器化部署是推荐的部署方式
- 推荐逐步提升 MLOps 成熟度