影刀RPA店群自动化教程Python协同流程模块化与依赖注入实战一个上货流程从登录到提交两百个步骤塞在一个文件里。拼多多店群自动化上架方案改一个选择器要在这个“意大利面条”里翻找十分钟还担心牵一发动全身。店群自动化做到中后期流程复杂度是指数级增长的。早期一个“上货”流程就是一条线性的影刀脚本后来逐渐加入了价格校验、图片审核、运费模板选择、活动报名检查……流程文件越来越长维护成本越来越高。更麻烦的是很多操作在不同流程中是重复的。“登录店铺”、“选择商品类目”、“上传图片”这些步骤在商品上架、商品编辑、活动报名等多个流程中都会出现。但因为是复制粘贴的一个步骤的优化需要同步修改四五个流程文件。我们后来将影刀RPA的流程进行了彻底的模块化拆分并用Python构建了一套依赖注入和流程组装框架让每一个流程都是“可组合的积木”。这篇文章就展开这套模块化体系的设计思想、工程实现和落地经验。TEMU店群如何管理运营一、把流程拆成原子模块从“脚本”到“组件”模块化的第一步是把长流程分解为短小、单一职责的子流程。拆分原则一个子流程只做一件事比如“登录”、“选择商品类目”、“上传主图”、“填写价格”。子流程独立可测试每个子流程可以单独运行和验证不依赖外部上下文。子流程有明确的输入和输出输入是参数JSON输出是结果JSON和状态码。早期我们拆分后的模块结构大概是这样的flows/ pdd/ login.flow select_category.flow upload_image.flow fill_price.flow submit_product.flow temu/ login.flow ... common/ wait_for_element.flow scroll_page.flow 但光有文件拆分还不够。 模块之间的调用关系、参数传递、错误处理如果在影刀内部硬编码依旧难以维护。 所以我们将“组装”的职责交给了Python调度层。 --- ## 二、流程组装的“配方”用配置描述流程 我们不直接在影刀里写死调用哪个子流程而是用一个JSON/YAML配置文件来描述整个业务流程的组合逻辑。 一个拼多多商品上架的“配方”示例如下 yaml flow_name: pdd_upload_product version: 3.2.0 modules: - module: pdd/login - id: login_step - params: - shop_id: {{ input.shop_id }} - on_failure: abort - module: common/navigate - id: nav_create - params: - url: /product/create - depends_on: [login_step] - module: pdd/select_category - id: select_cat - params: - category_id: {{ input.category_id }} - depends_on: [nav_create] - module: pdd/upload_image - id: upload_main_img - params: - image_url: {{ input.main_image }} - image_type: main - depends_on: [select_cat] - module: pdd/upload_image - id: upload_detail_imgs - params: - image_urls: {{ input.detail_images }} - image_type: detail - depends_on: [select_cat] - module: pdd/fill_price - id: fill_price_step - params: - price: {{ input.price }} - compare_price: {{ input.compare_price }} - depends_on: [select_cat] - module: pdd/submit_product - id: submit_step - params: - expect_status: onsale - depends_on: [upload_main_img, upload_detail_imgs, fill_price_step] - on_failure: retry_twice - 这个配置描述了整个上货流程由哪些模块组成模块之间的依赖关系depends_on以及每个模块的输入参数如何从上游或外部输入中获得。 Python调度引擎读取这个配方自动解析依赖生成执行计划。 --- ## 三、依赖注入让模块不关心参数从哪来 上面的配置中参数值使用了 {{ input.shop_id }} 这样的占位符。 Python引擎在执行前会将外部输入的数据注入到模块参数中模块本身不需要知道数据来源。 依赖注入分几个层级 - **外部输入**由调度层在创建任务时提供如 shop_id, product_data - - **上游模块输出**前一个模块的返回值可以作为后续模块的输入 - - **全局配置**店铺的公共配置如运费模板ID、默认仓库ID - - **环境变量**Worker相关的参数如临时目录路径 python import re from typing import Any, Dict class ParameterResolver: EXPR_PATTERN re.compile(r\{\{\s*(.?)\s*\}\}) def resolve(self, template: Any, context: Dict[str, Any]) - Any: if isinstance(template, str): return self.EXPR_PATTERN.sub( lambda m: str(self._resolve_expression(m.group(1), context)), template ) elif isinstance(template, dict): return {k: self.resolve(v, context) for k, v in template.items()} elif isinstance(template, list): return [self.resolve(item, context) for item in template] return template def _resolve_expression(self, expr: str, context: dict): parts expr.split(.) value context for part in parts: if isinstance(value, dict): value value.get(part, ) else: value getattr(value, part, ) return value 上下文在任务创建时初始化随着模块执行不断更新。 每个模块执行完毕后其输出会自动合并到上下文中供后续模块使用。 --- ## 四、流程组装引擎自动处理依赖和并行 Python引擎在接收到配方后先做两件事 1. 解析 depends_on 关系构建DAG有向无环图。 2. 2. 按依赖顺序执行无依赖关系的模块可以并行执行。 python import asyncio class FlowAssembler: def __init__(self, module_registry, resolver): self.registry module_registry self.resolver resolver def build_execution_plan(self, recipe: dict) - dict: modules recipe[modules] id_map {m[id]: m for m in modules} dep_graph {m[id]: set(m.get(depends_on, [])) for m in modules} # 拓扑排序 order self._topological_sort(dep_graph) return {order: order, id_map: id_map} async def execute(self, recipe: dict, input_context: dict): plan self.build_execution_plan(recipe) context {input: input_context, modules: {}} for batch in plan[order]: batch_tasks [] for module_id in batch: module_def plan[id_map][module_id] resolved_params self.resolver.resolve(module_def[params], context) batch_tasks.append(self._execute_module(module_def, resolved_params)) results await asyncio.gather(*batch_tasks, return_exceptionsTrue) for module_id, result in zip(batch, results): if isinstance(result, Exception): context[modules][module_id] {success: False, error: str(result)} else: context[modules][module_id] result return context def _topological_sort(self, dep_graph: dict) - list: in_degree {n: 0 for n in dep_graph} for deps in dep_graph.values(): for d in deps: in_degree[d] in_degree.get(d, 0) 1 # Kahn算法返回每层并行节点 from collections import deque batches [] queue deque([n for n, d in in_degree.items() if d 0]) while queue: batch list(queue) batches.append(batch) queue.clear() for n in batch: for dep in dep_graph.get(n, []): in_degree[dep] - 1 if in_degree[dep] 0: queue.append(dep) return batches 同一个配方在上例中upload_main_img、upload_detail_imgs 和 fill_price_step 之间没有依赖会被安排在同一批次并行执行缩短总时间。 --- ## 五、模块注册表与版本管理 随着模块增加需要一个注册表来管理所有可用模块及其版本。 python class ModuleRegistry: def __init__(self, db): self.db db async def register_module(self, name: str, version: str, flow_file_path: str, input_schema: dict, output_schema: dict, platform: str): await self.db.execute( INSERT INTO flow_modules (name, version, flow_path, input_schema, output_schema, platform, created_at) VALUES ($1, $2, $3, $4, $5, $6, NOW()), name, version, flow_file_path, json.dumps(input_schema), json.dumps(output_schema), platform ) async def resolve_module(self, module_name: str, platform: str, version: str None) - dict: if version: row await self.db.fetchrow( SELECT * FROM flow_modules WHERE name$1 AND platform$2 AND version$3, module_name, platform, version ) else: row await self.db.fetchrow( SELECT * FROM flow_modules WHERE name$1 AND platform$2 ORDER BY created_at DESC LIMIT 1, module_name, platform ) if not row: raise ModuleNotFoundError(fModule {module_name} for {platform} not found) return dict(row) 配方中引用的模块由注册表找到对应的影刀流程文件路径Worker拉取后执行。 --- ## 六、模块的独立测试与Mock 每个子流程模块都可以单独测试。 我们在沙箱测试环境中可以只运行一个模块传入模拟的输入参数验证输出是否符合预期。 对于依赖外部服务的模块如“上传图片”需要图片服务可用我们提供Mock模块来替换。 Mock模块有相同的接口输入输出Schema但用固定返回数据替代真实操作。 python class MockModule: def __init__(self, name, output): self.name name self.output output async def execute(self, params): return {success: True, mock: True, data: self.output} 在测试配方中可以通过配置将特定模块替换为Mock实现低成本的功能验证。 --- ## 七、模块复用的收益 模块化推行后我们看到了几个直接的收益 - **新流程开发时间**从平均2天降到半天。很多新业务流程只需编写一个新的配方复用现成的子流程模块。 - - **缺陷修复扩散**一个“登录”模块的bug修复后所有引用它的流程都自动修复不再需要逐个改。 - - **跨平台复用**虽然拼多多和TEMU的页面结构不同但“商品图片压缩”、“文本敏感词过滤”这类通用模块可以直接复用平台无关。 **真正的问题不在于“能不能用”而在于“改起来疼不疼”。** --- ## 八、与影刀子流程的配合 影刀RPA本身支持子流程调用我们利用这个能力将每个模块实现为一个独立的子流程。 主流程由配方生成其实是一个很薄的壳里面只有“调用子流程A”、“调用子流程B”的控制逻辑以及对返回结果的判断。 这样模块化不仅停留在Python调度层也贯彻到了影刀流程文件本身的结构上。 --- ## 九、踩坑与优化 **参数传递的序列化开销。** 模块之间通过JSON文件传递参数早期每个模块执行完都写一次磁盘影响性能。后来改为在Worker内存中维护上下文仅将必要数据序列化性能提升明显。 **模块契约管理。** 模块的输入输出Schema如果没有严格执行会出现“改了输出格式导致下游模块报错”的问题。我们用JSON Schema校验每个模块的输入和输出在测试阶段就能发现契约不一致。 **配方配置的校验。** 运营在配置配方时可能引用不存在的模块或者写错参数名。我们在配方保存时做静态校验确认所有引用的模块在注册表中存在且参数名与模块Input Schema匹配。 --- ## 十、写在最后 自动化流程的模块化不是简单的“把大文件拆成小文件”。 它是用工程化的手段——依赖注入、注册表、版本管理、契约校验——将自动化脚本变成可装配、可复用、可测试的软件组件。 当流程不再是一个几千步的巨型脚本而是一个优雅的配方和一组精巧的积木时整个团队的开发效率和信心都上了一个台阶。 模块化之后的自动化系统不再怕改需求。 因为你知道改一个模块二十个流程自动跟着变好而不是悄悄坏掉。 --- *作者林焱*