# PLECS 自动化场景教程:参数扫描、寻优与自动化测试

## 前言

本文面向已经了解 PLECS XML-RPC 基本原理(见核心原理篇)的读者,通过 5 个实际场景演示如何编写完整的自动化脚本。每个场景都提供可独立运行的代码模板,只需修改配置部分即可用于你自己的 PLECS 模型。

前提条件:PLECS 已开启服务器模式(Preferences → Enable server mode),端口默认 1080。

## 准备工作:提取通用工具模块

在开始编写各场景脚本之前,先创建一个 `plecs_helper.py` 文件,将通信层和辅助工具类封装起来,后续所有场景脚本共用。这样做的好处是:核心代码只需维护一份,修改重试次数这种需求只需改一个地方。

文件:`plecs_helper.py`

```python
import xmlrpc.client
import numpy as np
import pandas as pd
import os
import math
import time


class PLECSConnector:
    """与 PLECS 的 XML-RPC 连接管理(带重试机制)"""

    def __init__(self, url, port, model_path):
        self.url = f"{url}:{port}"
        self.model_path = model_path
        self.model_name = os.path.splitext(os.path.basename(model_path))[0]
        self.proxy = None

    def connect(self):
        print(f"Connecting to PLECS at {self.url}...")
        self.proxy = xmlrpc.client.ServerProxy(self.url)
        self.proxy.plecs.load(self.model_path)
        print(f"Model loaded: {self.model_name}")
        return self.proxy

    def simulate(self, opts=None, max_retries=3, retry_delay=1.0):
        if not self.proxy:
            self.connect()
        for attempt in range(max_retries):
            try:
                if opts:
                    return self.proxy.plecs.simulate(self.model_name, opts)
                else:
                    return self.proxy.plecs.simulate(self.model_name)
            except Exception as e:
                print(f"[Warn] Attempt {attempt + 1}/{max_retries}: {e}")
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
        raise RuntimeError("All simulation attempts failed")

    @staticmethod
    def test_connection(url, port):
        try:
            proxy = xmlrpc.client.ServerProxy(f"{url}:{port}")
            ver = proxy.plecs.version()
            return True, f"Connected. PLECS version: {ver}"
        except Exception as e:
            return False, str(e)


class OptimizationUtils:
    """优化辅助工具:自适应罚函数 + Pareto 前沿提取"""

    @staticmethod
    def calculate_penalty(kpis, constraint_func_str, current_obj_value):
        if not constraint_func_str or constraint_func_str.strip() == "[]":
            return 0.0
        expr_str = constraint_func_str.replace("return ", "").strip()
        try:
            constraints = eval(expr_str, {"np": np, "math": math}, {"kpis": kpis})
            if not isinstance(constraints, list):
                return 0.0
            violation = sum(max(0, g) ** 2 for g in constraints)
            if violation > 0:
                base = abs(current_obj_value) if not math.isinf(current_obj_value) else 1000.0
                penalty = max(base, 1000.0) * 2 + violation * 1e4
                print(f"  [Constraint] Violation! Penalty: {penalty:.2f}")
                return penalty
        except Exception as e:
            print(f"  [Error] Constraint check failed: {e}")
            return float("inf")
        return 0.0

    @staticmethod
    def get_pareto_front(df, objectives, directions):
        if df.empty or not objectives:
            return df
        data = df[objectives].values.copy()
        data = np.nan_to_num(data, nan=1e12)
        for i, d in enumerate(directions):
            if d == "maximize":
                data[:, i] = -data[:, i]
        n = data.shape[0]
        is_pareto = np.ones(n, dtype=bool)
        for i in range(n):
            for j in range(n):
                if i == j:
                    continue
                if np.all(data[j] <= data[i]) and np.any(data[j] < data[i]):
                    is_pareto[i] = False
                    break
        return df[is_pareto].copy()


class EarlyStopper:
    """早停机制:连续 patience 次无改善则停止"""

    def __init__(self, patience=20, min_delta=1e-4):
        self.patience = patience
        self.min_delta = min_delta
        self.best_value = float("inf")
        self.wait = 0

    def check(self, current_value):
        if current_value < self.best_value - self.min_delta:
            self.best_value = current_value
            self.wait = 0
            return False
        self.wait += 1
        if self.wait >= self.patience:
            print(f"\n[Early Stopping] No improvement for {self.patience} trials.")
            return True
        return False
```

## 场景一:参数扫描 — 我想看参数变化对结果的趋势

### 需求

对 DC-DC 变换器中电感 `L1` 和输出电容 `C_out` 进行网格扫描,输出效率(`efficiency`)和电压纹波(`ripple`),保存为 CSV 供后续分析。

### 完整脚本

文件:`sweep_dcdc.py`

```python
import numpy as np
import pandas as pd
from itertools import product
import time
import sys
import os

sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from plecs_helper import PLECSConnector

# ============ 修改这里的配置 ============
CONFIG = {
    "connection": {
        "server_url": "http://localhost",
        "port": 1080,
        "model_path": r"D:/models/dcdc_converter.plecs",
    },
    "parameters": {
        "L1": {"type": "linspace", "start": 50e-6, "stop": 200e-6, "num": 10},
        "C_out": {"type": "linspace", "start": 47e-6, "stop": 470e-6, "num": 10},
    },
    "output_csv": "sweep_results.csv",
}
# =======================================


def get_param_combinations():
    """根据配置生成所有参数组合"""
    ranges = []
    names = []
    for name, cfg in CONFIG["parameters"].items():
        names.append(name)
        if cfg["type"] == "linspace":
            ranges.append(np.linspace(cfg["start"], cfg["stop"], cfg["num"]))
        elif cfg["type"] == "logspace":
            ranges.append(np.logspace(np.log10(cfg["start"]), np.log10(cfg["stop"]), cfg["num"]))
        elif cfg["type"] == "list":
            ranges.append(cfg["values"])
    return names, list(product(*ranges))


def extract_kpis(result):
    """提取 KPI:效率(Outport 0)、纹波(Outport 1)"""
    if not isinstance(result, dict) or "Values" not in result:
        return {"efficiency": np.nan, "ripple": np.nan}
    vals = result["Values"]
    return {
        "efficiency": vals[0][-1] if len(vals) > 0 else np.nan,  # Outport 0: 取稳态值
        "ripple": vals[1].max() if len(vals) > 1 else np.nan,  # Outport 1: 取最大纹波
    }


if __name__ == "__main__":
    start_time = time.time()
    print("=== PLECS Parameter Sweep ===")

    conn = PLECSConnector(
        CONFIG["connection"]["server_url"],
        CONFIG["connection"]["port"],
        CONFIG["connection"]["model_path"],
    )
    proxy = conn.connect()

    param_names, combinations = get_param_combinations()
    optStructs = [{"ModelVars": dict(zip(param_names, combo))} for combo in combinations]
    print(f"Total simulations: {len(optStructs)}")

    all_results = []
    for i, opt in enumerate(optStructs):
        print(f"Run {i + 1}/{len(optStructs)}: {opt['ModelVars']}")
        try:
            result = conn.simulate(opt)
            kpis = extract_kpis(result)
            all_results.append({**opt["ModelVars"], **kpis})
            print(f"  -> efficiency={kpis['efficiency']:.4f}, ripple={kpis['ripple']:.6f}")
        except Exception as e:
            print(f"  [Error] {e}")

    if all_results:
        df = pd.DataFrame(all_results)
        df.to_csv(CONFIG["output_csv"], index=False)
        print(f"\nResults saved to: {CONFIG['output_csv']}")
    print(f"Total time: {time.time() - start_time:.1f}s")
```

### 运行方式

```bash
# 1. 确保 PLECS 已打开且启用了 Server Mode
# 2. 将模型中的扫参数改为变量名(如 L1, C_out)
# 3. 添加两个 Output Port:Port 0 接 Efficiency,Port 1 接 Ripple
# 4. 运行脚本
python sweep_dcdc.py
```

### 输出

生成的 `sweep_results.csv` 格式:

| L1 | C_out | efficiency | ripple |
|---|---|---|---|
| 5.0e-05 | 4.7e-05 | 0.954 | 0.000123 |
| 5.0e-05 | 9.4e-05 | 0.957 | 0.000098 |
| ... | ... | ... | ... |

可以直接用 Excel、MATLAB 或 Python 画 2D/3D 曲面图进行趋势分析。

## 场景二:Optuna 参数寻优 — 我想自动找到使 THD 最小的参数

### 需求

使用 Optuna 的 TPE(Tree-structured Parzen Estimator)算法,自动搜索使变换器 THD(总谐波失真)最小的控制参数 `Final_Vset` 和 `Final_ps`。

### 完整脚本

文件:`optimize_dcdc.py`

```python
import numpy as np
import pandas as pd
import optuna
import time
import sys
import os

sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from plecs_helper import PLECSConnector, OptimizationUtils, EarlyStopper

# ============ 修改这里的配置 ============
CONFIG = {
    "connection": {
        "server_url": "http://localhost",
        "port": 1080,
        "model_path": r"D:/models/dbsrc_forward.plecs",
    },
    "parameters": [
        {"name": "Final_Vset", "type": "Real", "min": 300.0, "max": 450.0, "scale": "Linear"},
        {"name": "Final_ps", "type": "Real", "min": 0.05, "max": 0.25, "scale": "Linear"},
    ],
    "kpis": [
        {"name": "THD", "source": "Outport", "index": 0, "func": "lambda arr: arr[-1]"},
    ],
    "run": {
        "n_trials": 50,
        "direction": "minimize",
        "objective_kpi": "THD",
        "output_csv": "optimize_results.csv",
        # 可选:早停
        "enable_early_stop": True,
        "early_stop_patience": 20,
    },
}
# =======================================


def extract_kpis(result, proxy):
    """从仿真结果提取 KPI"""
    kpi_values = {}
    if not isinstance(result, dict) or "Values" not in result:
        for k in CONFIG["kpis"]:
            kpi_values[k["name"]] = np.nan
        return kpi_values

    all_values = result["Values"]
    for kpi_cfg in CONFIG["kpis"]:
        try:
            if kpi_cfg["source"] == "Outport":
                idx = int(kpi_cfg["index"])
                func = eval(kpi_cfg["func"])
                val = float(func(all_values[idx])) if idx < len(all_values) else np.nan
                kpi_values[kpi_cfg["name"]] = val
        except Exception:
            kpi_values[kpi_cfg["name"]] = np.nan
    return kpi_values


class Objective:
    def __init__(self):
        self.conn = PLECSConnector(
            CONFIG["connection"]["server_url"],
            CONFIG["connection"]["port"],
            CONFIG["connection"]["model_path"],
        )
        self.proxy = self.conn.connect()
        self.log = []
        self.stopper = None
        if CONFIG["run"].get("enable_early_stop"):
            self.stopper = EarlyStopper(CONFIG["run"].get("early_stop_patience", 20))

    def __call__(self, trial):
        # 1. 提议参数
        p = {}
        for pc in CONFIG["parameters"]:
            if pc["type"] == "Real":
                p[pc["name"]] = trial.suggest_float(
                    pc["name"], pc["min"], pc["max"], log=(pc["scale"] == "Log")
                )
        print(f"\n--- Trial {trial.number} ---")
        print(f"  Params: {p}")

        # 2. 运行仿真
        try:
            result = self.conn.simulate({"ModelVars": p})
        except Exception:
            return float("inf")

        # 3. 提取 KPI
        kpis = extract_kpis(result, self.proxy)
        print(f"  KPIs: {kpis}")
        self.log.append({**p, **kpis})

        # 4. 计算目标值
        obj_val = kpis.get(CONFIG["run"]["objective_kpi"], np.nan)
        if np.isnan(obj_val):
            return float("inf")
        direction = CONFIG["run"].get("direction", "minimize")
        score = -obj_val if direction == "maximize" else obj_val
        print(f"  Score: {score:.4f}")

        # 5. 早停检查
        if self.stopper and self.stopper.check(score):
            trial.study.stop()
        return score

    def save(self):
        if self.log:
            pd.DataFrame(self.log).to_csv(CONFIG["run"]["output_csv"], index=False)
            print(f"Results saved to {CONFIG['run']['output_csv']}")


if __name__ == "__main__":
    study = optuna.create_study(
        direction="minimize",
        sampler=optuna.samplers.TPESampler(seed=42),
    )
    obj = Objective()
    try:
        study.optimize(obj, n_trials=CONFIG["run"]["n_trials"])
    except KeyboardInterrupt:
        print("\nInterrupted by user.")
    finally:
        obj.save()

    # 输出最优结果
    print(f"\n=== Optimization Complete ===")
    print(f"Best THD: {study.best_value:.6f}")
    print(f"Best params: {study.best_params}")
```

### 运行方式

```bash
pip install optuna
python optimize_dcdc.py
```

### 输出

```text
--- Trial 0 ---
  Params: {'Final_Vset': 375.2, 'Final_ps': 0.142}
  KPIs: {'THD': 0.0234}
  Score: 0.0234
...
=== Optimization Complete ===
Best THD: 0.0123
Best params: {'Final_Vset': 400.5, 'Final_ps': 0.118}
```

### 补充:使用 SKOptimize (高斯过程)

如果更喜欢 scikit-optimize 的高斯过程代理模型,替换 Objective 类和主逻辑:

```python
from skopt import gp_minimize
from skopt.space import Real, Integer


def objective_function(args):
    p = dict(zip(["Final_Vset", "Final_ps"], args))
    try:
        result = conn.simulate({"ModelVars": p})
        thd = result["Values"][0][-1]
        print(f"  THD = {thd:.6f} @ {p}")
        return thd
    except Exception:
        return float("inf")


space = [Real(300, 450, name="Final_Vset"), Real(0.05, 0.25, name="Final_ps")]
result = gp_minimize(objective_function, space, n_calls=50, random_state=42)
print(f"Best THD: {result.fun:.6f}")
print(f"Best params: {dict(zip(['Final_Vset', 'Final_ps'], result.x))}")
```

## 场景三:多目标优化 — 我想同时最小化损耗和 THD

### 需求

在变换器设计中,损耗和 THD 往往是一对矛盾指标。使用加权和或 Pareto 前沿方法进行多目标优化。

### 加权和模式(Weighted Sum)

将两个目标合并为一个综合指标:

```python
def calculate_weighted_sum(kpis, objectives):
    """
    objectives = [{"kpi_name": "loss", "direction": "minimize", "weight": 1.0},
                  {"kpi_name": "thd", "direction": "minimize", "weight": 1.0}]
    """
    score = 0.0
    for obj in objectives:
        val = kpis.get(obj["kpi_name"], np.nan)
        if np.isnan(val):
            return float("inf")
        w = obj.get("weight", 1.0)
        # minimize 时直接累加,maximize 时取负
        score += val * w if obj["direction"] == "minimize" else -val * w
    return score


# 在 Objective.__call__ 中使用:
kpis = extract_kpis(result, proxy)
weighted_score = calculate_weighted_sum(kpis, [
    {"kpi_name": "loss", "direction": "minimize", "weight": 1.0},
    {"kpi_name": "thd", "direction": "minimize", "weight": 0.5},
])
return weighted_score
```

### Pareto 前沿模式

使用 Optuna 的 NSGA-II 采样器直接搜索 Pareto 前沿:

```python
import optuna


def objective_multi(trial):
    p = {
        "Final_Vset": trial.suggest_float("Final_Vset", 300, 450),
        "Final_ps": trial.suggest_float("Final_ps", 0.05, 0.25),
    }
    try:
        result = conn.simulate({"ModelVars": p})
    except Exception:
        return float("inf"), float("inf")
    loss = result["Values"][0][-1]  # Outport 0
    thd = result["Values"][1][-1]  # Outport 1
    return loss, thd


study = optuna.create_study(
    directions=["minimize", "minimize"],
    sampler=optuna.samplers.NSGAIISampler(),
)
study.optimize(objective_multi, n_trials=100)

# 查看 Pareto 前沿上的解
for i, trial in enumerate(study.best_trials):
    print(f"Solution {i}: loss={trial.values[0]:.4f}, thd={trial.values[1]:.4f}, params={trial.params}")
```

## 场景四:波形导出 — 我想把仿真波形完整导出供外部分析

### 需求

单次仿真,将 Time 向量和所有 Outport 信号的完整波形导出为 CSV,便于在 MATLAB 或 Python 中做 FFT、时域分析等。

### 完整脚本

文件:`export_waveform.py`

```python
import xmlrpc.client
import numpy as np
import pandas as pd
import time
import os

# ============ 修改这里的配置 ============
CONFIG = {
    "server_url": "http://localhost",
    "port": 1080,
    "model_path": r"D:/models/dcdc_converter.plecs",
    "output_csv": "waveform_output.csv",
    "signals": [
        {"name": "Vout", "index": 0},
        {"name": "I_L1", "index": 1},
        {"name": "Efficiency", "index": 2},
    ],
}
# =======================================


def main():
    model_name = os.path.splitext(os.path.basename(CONFIG["model_path"]))[0]
    url = f"{CONFIG['server_url']}:{CONFIG['port']}"

    print(f"Connecting to {url}...")
    proxy = xmlrpc.client.ServerProxy(url)
    proxy.plecs.load(CONFIG["model_path"])
    print(f"Model loaded: {model_name}")

    print("Running simulation...")
    result = proxy.plecs.simulate(model_name)

    if "Time" not in result:
        print("[Error] No 'Time' in result")
        return

    df = pd.DataFrame()
    df["Time"] = result["Time"]

    all_values = result.get("Values", [])
    for sig in CONFIG["signals"]:
        idx = sig["index"]
        if idx < len(all_values):
            df[sig["name"]] = all_values[idx]
            print(f"  Exported: {sig['name']} (index {idx})")
        else:
            print(f"  [Warn] Signal '{sig['name']}' index {idx} out of range")

    df.to_csv(CONFIG["output_csv"], index=False, float_format="%.15g")
    print(f"Waveform saved to: {CONFIG['output_csv']}")


if __name__ == "__main__":
    main()
```

### 输出格式

```text
Time,Vout,I_L1,Efficiency
0.0,0.0,0.0,0.0
1e-06,0.12,0.05,0.0
2e-06,0.25,0.12,0.0
...
```

## 场景五:自动化回归测试 — 我想确保改代码后设计指标没退化

### 需求

在电路设计迭代过程中,每次修改参数或拓扑后,自动跑一批测试用例,检查关键 KPI 是否在规定的阈值内,生成 Pass/Fail 报告。

### 完整脚本

文件:`regression_test.py`

```python
import xmlrpc.client
import numpy as np
import pandas as pd
import time
import sys
import os

sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from plecs_helper import PLECSConnector

# ============ 修改这里的配置 ============
CONFIG = {
    "connection": {
        "server_url": "http://localhost",
        "port": 1080,
        "model_path": r"D:/models/dcdc_converter.plecs",
    },
    # 测试用例:工况参数 + 期望阈值
    "test_cases": [
        {
            "name": "满载效率",
            "params": {"Vin": 400, "Pout": 3000},
            "checks": {
                "efficiency": {"min": 0.95, "max": 1.0},
                "ripple": {"max": 0.01},
            },
        },
        {
            "name": "轻载效率",
            "params": {"Vin": 400, "Pout": 300},
            "checks": {
                "efficiency": {"min": 0.90, "max": 1.0},
                "ripple": {"max": 0.02},
            },
        },
        {
            "name": "低压满载",
            "params": {"Vin": 300, "Pout": 3000},
            "checks": {
                "efficiency": {"min": 0.93, "max": 1.0},
                "ripple": {"max": 0.015},
            },
        },
    ],
    "report_csv": "test_report.csv",
}
# =======================================


def extract_kpis(result):
    """提取效率 (Outport 0 稳态值) 和纹波 (Outport 1 最大值)"""
    if not isinstance(result, dict) or "Values" not in result:
        return {"efficiency": np.nan, "ripple": np.nan}
    vals = result["Values"]
    return {
        "efficiency": vals[0][-1] if len(vals) > 0 else np.nan,
        "ripple": vals[1].max() if len(vals) > 1 else np.nan,
    }


def run_test_case(conn, test_case):
    """执行单个测试用例,返回结果和状态"""
    print(f"\n--- Test: {test_case['name']} ---")
    print(f"  Params: {test_case['params']}")

    status = "PASS"
    messages = []

    try:
        result = conn.simulate({"ModelVars": test_case["params"]})
        kpis = extract_kpis(result)
    except Exception as e:
        return {"name": test_case["name"], "status": "ERROR", "detail": str(e), "kpis": {}}

    for kpi_name, thresholds in test_case["checks"].items():
        actual = kpis.get(kpi_name, np.nan)
        print(f"  {kpi_name}: {actual}")

        if np.isnan(actual):
            status = "FAIL"
            messages.append(f"{kpi_name}: NaN")
            continue

        if "min" in thresholds and actual < thresholds["min"]:
            status = "FAIL"
            messages.append(f"{kpi_name}: {actual:.4f} < {thresholds['min']:.4f}")
        if "max" in thresholds and actual > thresholds["max"]:
            status = "FAIL"
            messages.append(f"{kpi_name}: {actual:.4f} > {thresholds['max']:.4f}")

    detail = "; ".join(messages) if messages else f"All {len(test_case['checks'])} checks passed"
    return {"name": test_case["name"], "status": status, "detail": detail, "kpis": kpis}


def main():
    print("=== PLECS Regression Test Suite ===\n")

    conn = PLECSConnector(
        CONFIG["connection"]["server_url"],
        CONFIG["connection"]["port"],
        CONFIG["connection"]["model_path"],
    )
    conn.connect()

    results = []
    for tc in CONFIG["test_cases"]:
        result = run_test_case(conn, tc)
        results.append(result)
        status_label = f"[{result['status']}]"
        print(f"  {status_label} {result['detail']}")

    # 生成报告
    total = len(results)
    passed = sum(1 for r in results if r["status"] == "PASS")
    failed = sum(1 for r in results if r["status"] == "FAIL")
    errors = sum(1 for r in results if r["status"] == "ERROR")

    print(f"\n=== Summary ===")
    print(f"Total: {total} | PASS: {passed} | FAIL: {failed} | ERROR: {errors}")

    # 写入 CSV 报告
    report_data = []
    for r in results:
        row = {"test_name": r["name"], "status": r["status"], "detail": r["detail"]}
        row.update(r.get("kpis", {}))
        report_data.append(row)
    pd.DataFrame(report_data).to_csv(CONFIG["report_csv"], index=False)
    print(f"Report saved to: {CONFIG['report_csv']}")

    return failed == 0 and errors == 0


if __name__ == "__main__":
    success = main()
    exit(0 if success else 1)
```

### 输出示例

```text
=== PLECS Regression Test Suite ===

--- Test: 满载效率 ---
  efficiency: 0.964
  ripple: 0.00085
  [PASS] All 2 checks passed

--- Test: 轻载效率 ---
  efficiency: 0.925
  ripple: 0.0012
  [PASS] All 2 checks passed

--- Test: 低压满载 ---
  efficiency: 0.945
  ripple: 0.0098
  [PASS] All 2 checks passed

=== Summary ===
Total: 3 | PASS: 3 | FAIL: 0 | ERROR: 0
```

### 集成到 CI/CD

脚本返回 0(全通过)或 1(有失败),可以集成到任何 CI/CD 流水线:

```yaml
# GitHub Actions 示例
- name: Run PLECS Regression Tests
  run: python regression_test.py
```

## 进阶技巧

### 断点续传

长时间运行的优化任务可能因意外中断。使用 `joblib` 保存 Optuna Study 的状态:

```python
import joblib


# 每完成一个 trial 保存状态
def save_callback(study, trial):
    joblib.dump(study, "optimize_checkpoint.pkl")


study.optimize(obj, n_trials=100, callbacks=[save_callback])
```

下次启动时恢复:

```python
try:
    study = joblib.load("optimize_checkpoint.pkl")
    print(f"Resumed with {len(study.trials)} completed trials.")
except FileNotFoundError:
    study = optuna.create_study(direction="minimize")
```

### 热启动

用已知的好参数作为初始猜测,加速优化收敛:

```python
study = optuna.create_study(direction="minimize")
study.enqueue_trial({"Final_Vset": 380, "Final_ps": 0.12})  # 热启动
study.optimize(obj, n_trials=50)
```

### 增量写入 CSV

防止仿真中途崩溃丢失结果(推荐 SKOptimize 模式使用):

```python
def append_to_csv(record, csv_path):
    """逐次追加写入,不丢失已完成的结果"""
    header = not os.path.exists(csv_path)
    pd.DataFrame([record]).to_csv(csv_path, mode="a", header=header, index=False)
```

## 总结

| 场景 | 核心技术 | 适用情况 |
|---|---|---|
| 参数扫描 | itertools.product + 串行/并行仿真 | 探索参数趋势、画 2D/3D 图 |
| Optuna 优化 | TPE 贝叶斯 + trial.suggest_float | 30~500 次仿真自动找到最优解 |
| 多目标优化 | 加权和 / NSGA-II Pareto | 多个矛盾指标同时优化 |
| 波形导出 | 单次仿真 + 完整时间序列 | 获取时域波形做 FFT 等分析 |
| 回归测试 | 测试用例 + 阈值检查 + 报告 | 设计迭代中确保指标不退步 |

所有脚本的通用模式:连接 PLECS → 定义参数空间 → 循环执行仿真 → 提取 KPI → 保存结果。掌握这一流程后,你可以根据实际需求自由组合和扩展。