Wan2.1-UMT5开发实战:Node.js构建视频生成任务管理后台
Wan2.1-UMT5开发实战Node.js构建视频生成任务管理后台最近在折腾AI视频生成发现Wan2.1-UMT5这类模型效果确实惊艳但每次手动调用、等待、下载结果流程太碎片化了。特别是当你想批量处理多个创意或者团队协作时这种“手工作坊”模式效率很低。于是我花了点时间用Node.js搭了一个轻量级的后台管理系统。核心目标很简单把视频生成任务管起来。用户能提交任务、看进度、拿结果后台能自动调度、排队、回调。今天就把这套方案的思路和关键代码分享出来如果你也在做类似的事情希望能给你一些参考。1. 项目目标与核心价值我们不是要做一个大而全的AI平台而是聚焦解决一个具体问题如何高效、可靠地管理来自星图GPU平台的Wan2.1-UMT5视频生成任务。想象一下这个场景你的产品需要为用户提供“文字/图片生成短视频”的功能。用户在前端提交一段描述或一张图你不可能让用户一直盯着页面等几分钟。后台需要默默接过这个任务交给GPU服务器去处理处理完了再通知用户。期间用户还能随时查看任务进行到哪一步了。这个后台系统的核心价值就体现在这里解耦前端与AI服务前端只需提交任务和查询状态复杂的生成、排队、重试逻辑由后台负责。提升用户体验异步处理用户无需等待任务完成后通过回调或界面通知。实现资源管理通过任务队列可以控制并发数避免压垮GPU服务也能实现优先级调度。流程可追溯每个任务的状态、参数、结果、耗时都有记录方便排查问题和数据分析。接下来我们就从环境搭建开始一步步实现它。2. 环境搭建与项目初始化工欲善其事必先利其器。我们先准备好开发环境。2.1 Node.js安装及环境配置首先确保你的机器上安装了Node.js。我推荐使用Node.js 18 LTS或更高版本它在稳定性和新特性上有一个不错的平衡。检查安装打开你的终端命令行输入以下命令node --version npm --version如果正确显示了版本号比如v18.20.0和10.7.0说明已经安装。如果没有可以去Node.js官网下载安装包或者使用nvmNode Version Manager来管理多个版本这对于开发非常方便。使用nvm安装推荐# 安装nvm具体命令请参考nvm官方GitHub仓库 curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.40.1/install.sh | bash # 安装Node.js 18 nvm install 18 nvm use 182.2 初始化项目与安装核心依赖创建一个新的项目目录并初始化我们的Node.js项目。mkdir wan-video-task-manager cd wan-video-task-manager npm init -y接下来安装我们需要的核心依赖包。这里我们用Express作为Web框架Bull作为任务队列。npm install express bull axios npm install -D nodemonexpress: 快速搭建Web API。bull: 基于Redis的快速、健壮的任务队列库完美支持我们的调度需求。axios: 用于向星图GPU平台的API发起HTTP请求。nodemon: 开发工具监听文件变化自动重启服务。此外我们还需要一个Redis服务器。Bull的所有队列数据都存储在Redis中。你可以通过Docker快速启动一个docker run -d -p 6379:6379 --name redis-stack redis/redis-stack:latest或者根据你的系统安装Redis。2.3 项目基础结构创建以下目录和文件让项目结构清晰起来wan-video-task-manager/ ├── src/ │ ├── app.js # Express应用主入口 │ ├── routes/ # 路由定义 │ │ └── tasks.js │ ├── queues/ # Bull队列定义与处理器 │ │ └── videoQueue.js │ ├── services/ # 业务逻辑服务 │ │ └── wanService.js # 调用星图GPU API的服务 │ ├── controllers/ # 控制器 │ │ └── taskController.js │ └── utils/ # 工具函数 ├── public/ # 静态文件前端页面 ├── .env # 环境变量记得加入.gitignore └── package.json在package.json的scripts里添加启动命令scripts: { start: node src/app.js, dev: nodemon src/app.js }3. 核心模块设计与实现环境准备好了我们来搭建系统的骨架和核心逻辑。3.1 构建Express.js API服务首先在src/app.js中创建基础的Express应用并定义最初的路由。const express require(express); const taskRoutes require(./routes/tasks); const path require(path); const app express(); const PORT process.env.PORT || 3000; // 中间件 app.use(express.json()); // 解析JSON请求体 app.use(express.urlencoded({ extended: true })); app.use(express.static(path.join(__dirname, ../public))); // 静态文件服务 // 路由 app.use(/api/tasks, taskRoutes); // 健康检查端点 app.get(/health, (req, res) { res.json({ status: OK, timestamp: new Date().toISOString() }); }); app.listen(PORT, () { console.log(任务管理后台服务已启动监听端口: ${PORT}); });3.2 集成Bull任务队列任务队列是系统的“中枢神经系统”。我们在src/queues/videoQueue.js中创建它。const Queue require(bull); const { createVideoTask } require(../services/wanService); // 创建视频生成队列 // 第一个参数是队列名第二个是Redis连接字符串 const videoQueue new Queue(wan-video-generation, { redis: { host: process.env.REDIS_HOST || 127.0.0.1, port: process.env.REDIS_PORT || 6379, }, defaultJobOptions: { attempts: 3, // 失败重试3次 backoff: { type: exponential, // 指数退避重试 delay: 5000, // 首次重试延迟5秒 }, removeOnComplete: 50, // 保留最近50个成功任务 removeOnFail: 100, // 保留最近100个失败任务 }, }); // 定义队列处理器当有任务进入队列时这个函数会被执行 videoQueue.process(async (job) { console.log(开始处理视频生成任务: ${job.id}); const { prompt, imageUrl, config } job.data; try { // 调用真正的视频生成服务 const result await createVideoTask(prompt, imageUrl, config); // 返回结果会被存储在job的返回值中 return { success: true, taskId: result.taskId, message: 任务已提交至GPU平台 }; } catch (error) { console.error(任务 ${job.id} 处理失败:, error); // 抛出错误Bull会根据配置进行重试 throw new Error(视频生成API调用失败: ${error.message}); } }); // 监听队列事件可选用于日志或通知 videoQueue.on(completed, (job, result) { console.log(任务 ${job.id} 已完成结果:, result); // 这里可以触发WebSocket通知、写入数据库等 }); videoQueue.on(failed, (job, err) { console.error(任务 ${job.id} 失败:, err.message); }); module.exports videoQueue;3.3 实现视频生成服务层这是与星图GPU平台交互的关键层。我们在src/services/wanService.js中模拟这个调用。const axios require(axios); // 假设的星图GPU平台Wan2.1-UMT5 API端点 const WAN_API_BASE process.env.WAN_API_BASE || https://api.example-gpu-platform.com/v1; const API_KEY process.env.WAN_API_KEY; // 从环境变量读取密钥 /** * 创建视频生成任务 * param {string} prompt - 文本描述 * param {string} [imageUrl] - 参考图片URL图生视频 * param {object} [config] - 额外配置如分辨率、时长 * returns {Promiseobject} 包含平台任务ID等信息 */ async function createVideoTask(prompt, imageUrl null, config {}) { const payload { model: Wan2.1-UMT5, prompt: prompt, ...config, }; if (imageUrl) { payload.image_url imageUrl; // 图生视频参数 } try { const response await axios.post(${WAN_API_BASE}/video/generate, payload, { headers: { Authorization: Bearer ${API_KEY}, Content-Type: application/json, }, timeout: 30000, // 30秒超时 }); // 假设平台返回 { task_id: xxx, status: pending, estimated_time: 60 } return { taskId: response.data.task_id, status: response.data.status, estimatedTime: response.data.estimated_time, }; } catch (error) { console.error(调用Wan视频生成API失败:, error.response?.data || error.message); throw new Error(API请求失败: ${error.response?.status || Network Error}); } } /** * 查询任务状态 * param {string} taskId - 平台返回的任务ID */ async function getTaskStatus(taskId) { try { const response await axios.get(${WAN_API_BASE}/tasks/${taskId}, { headers: { Authorization: Bearer ${API_KEY} }, }); return response.data; // 返回完整状态信息如 status, progress, result_video_url } catch (error) { console.error(查询任务 ${taskId} 状态失败:, error.message); throw error; } } module.exports { createVideoTask, getTaskStatus };4. 业务逻辑与API开发有了队列和服务现在我们来编写处理用户请求的控制器和路由。4.1 任务提交与状态查询API在src/controllers/taskController.js中我们创建处理HTTP请求的逻辑。const videoQueue require(../queues/videoQueue); const { getTaskStatus } require(../services/wanService); // 提交一个新的视频生成任务 exports.submitTask async (req, res) { const { prompt, imageUrl, config } req.body; if (!prompt !imageUrl) { return res.status(400).json({ error: 必须提供文本描述(prompt)或图片URL(imageUrl) }); } try { // 添加任务到Bull队列 const job await videoQueue.add({ prompt, imageUrl, config: config || {}, }); res.json({ success: true, message: 视频生成任务已提交至队列, jobId: job.id, // Bull队列的任务ID data: job.data, }); } catch (error) { console.error(提交任务到队列失败:, error); res.status(500).json({ error: 任务提交失败请稍后重试 }); } }; // 根据Bull的jobId查询队列任务状态 exports.getJobStatus async (req, res) { const { jobId } req.params; try { const job await videoQueue.getJob(jobId); if (!job) { return res.status(404).json({ error: 未找到该任务 }); } const state await job.getState(); // Bull的任务状态waiting, active, completed, failed, delayed const result await job.finished().catch(() null); // 获取任务结果如果已完成 let platformStatus null; // 如果任务已完成且包含平台taskId可以进一步查询平台上的最终状态如视频URL if (state completed result?.taskId) { try { platformStatus await getTaskStatus(result.taskId); } catch (err) { // 查询平台状态失败不影响返回队列状态 console.warn(查询平台任务状态失败: ${err.message}); } } res.json({ jobId, state, // Bull队列状态 progress: job.progress(), // 进度如果设置了 result, // 任务返回值 platformStatus, // 来自GPU平台的详细状态 timestamp: new Date(), }); } catch (error) { console.error(查询任务 ${jobId} 状态失败:, error); res.status(500).json({ error: 查询任务状态失败 }); } }; // 获取所有任务列表简单示例 exports.listJobs async (req, res) { try { const jobs await videoQueue.getJobs([waiting, active, completed, failed, delayed], 0, 50); const jobList await Promise.all( jobs.map(async (job) ({ id: job.id, state: await job.getState(), data: job.data, progress: job.progress(), })) ); res.json({ jobs: jobList }); } catch (error) { res.status(500).json({ error: 获取任务列表失败 }); } };然后在src/routes/tasks.js中定义路由。const express require(express); const router express.Router(); const taskController require(../controllers/taskController); // 提交视频生成任务 router.post(/submit, taskController.submitTask); // 查询特定任务状态 router.get(/status/:jobId, taskController.getJobStatus); // 获取任务列表 router.get(/list, taskController.listJobs); module.exports router;4.2 实现结果回调与进度更新在实际生产环境中GPU平台处理完成后通常会主动回调我们的服务。我们需要一个端点来接收这个回调。在src/routes/tasks.js中添加新路由// 接收GPU平台的任务完成回调 router.post(/callback/:jobId, async (req, res) { const { jobId } req.params; const callbackData req.body; // 平台回调的数据如 { status: success, video_url: ... } console.log(收到平台回调 for job ${jobId}:, callbackData); // 1. 根据jobId找到对应的Bull任务可能需要一个映射关系这里简化处理 // 2. 更新任务状态或触发后续操作如发送邮件、WebSocket通知 // 3. 可以将最终视频URL存入数据库 // 简单响应平台表示已成功接收 res.json({ received: true }); });为了让前端能实时看到进度我们可以结合WebSocket或Server-Sent Events (SSE)。这里以简单的轮询为例前端可以定期调用我们上面写的/api/tasks/status/:jobId接口。5. 基础用户界面开发后台API跑通了我们还需要一个简单的界面来操作和展示。在public/目录下创建一个index.html。!DOCTYPE html html langzh-CN head meta charsetUTF-8 meta nameviewport contentwidthdevice-width, initial-scale1.0 titleWan视频生成任务管理/title style body { font-family: sans-serif; margin: 2rem; } .container { max-width: 800px; margin: auto; } .form-group { margin-bottom: 1rem; } label { display: block; margin-bottom: 0.5rem; font-weight: bold;} textarea, input { width: 100%; padding: 0.5rem; box-sizing: border-box;} button { padding: 0.75rem 1.5rem; background: #007bff; color: white; border: none; border-radius: 4px; cursor: pointer;} button:disabled { background: #ccc; } #status { margin-top: 2rem; padding: 1rem; background: #f8f9fa; border-radius: 4px; white-space: pre-wrap;} .task-item { border-bottom: 1px solid #eee; padding: 0.5rem 0;} /style /head body div classcontainer h1Wan2.1-UMT5 视频生成任务提交/h1 form idtaskForm div classform-group label forprompt文本描述 */label textarea idprompt rows4 placeholder请输入你想要生成的视频描述.../textarea /div div classform-group label forimageUrl参考图片URL (可选图生视频)/label input typetext idimageUrl placeholderhttps://example.com/image.jpg /div button typesubmit idsubmitBtn提交生成任务/button /form div idresult/div h2任务列表/h2 button onclickfetchJobList()刷新列表/button div idjobList/div h2任务状态查询/h2 input typetext idqueryJobId placeholder输入任务ID button onclickqueryJobStatus()查询/button div idstatus/div /div script const API_BASE /api/tasks; document.getElementById(taskForm).onsubmit async (e) { e.preventDefault(); const submitBtn document.getElementById(submitBtn); submitBtn.disabled true; submitBtn.textContent 提交中...; const prompt document.getElementById(prompt).value; const imageUrl document.getElementById(imageUrl).value; try { const resp await fetch(${API_BASE}/submit, { method: POST, headers: { Content-Type: application/json }, body: JSON.stringify({ prompt, imageUrl }) }); const data await resp.json(); document.getElementById(result).innerHTML p提交成功任务ID: strong${data.jobId}/strong/p; fetchJobList(); // 刷新列表 } catch (error) { document.getElementById(result).innerHTML p stylecolor:red;提交失败: ${error.message}/p; } finally { submitBtn.disabled false; submitBtn.textContent 提交生成任务; } }; async function fetchJobList() { try { const resp await fetch(${API_BASE}/list); const data await resp.json(); const listHtml data.jobs.map(job div classtask-item divID: ${job.id}/div div状态: ${job.state}/div div描述: ${job.data.prompt?.substring(0, 50)}.../div /div ).join(); document.getElementById(jobList).innerHTML listHtml; } catch (error) { console.error(获取列表失败, error); } } async function queryJobStatus() { const jobId document.getElementById(queryJobId).value; if (!jobId) return; try { const resp await fetch(${API_BASE}/status/${jobId}); const data await resp.json(); document.getElementById(status).textContent JSON.stringify(data, null, 2); } catch (error) { document.getElementById(status).textContent 查询失败: ${error.message}; } } // 页面加载时获取一次列表 fetchJobList(); /script /body /html现在运行npm run dev访问http://localhost:3000你就能看到一个最简单的任务管理界面了。可以提交任务、查看列表和查询状态。6. 总结这套基于Node.js Express Bull的视频生成任务管理后台虽然代码量不大但已经具备了生产级应用的核心骨架。它成功地将异步、队列、状态管理这些复杂概念封装起来对外提供了简洁的API。实际使用中你还需要考虑更多细节比如用户认证与权限、任务结果视频URL的持久化存储如MySQL/PostgreSQL、更完善的错误处理与告警如接入Sentry、使用WebSocket实现真正的进度推送、以及将前端界面用Vue/React做得更美观。但最重要的是这个项目验证了思路的可行性。通过将AI能力服务化、任务流程化我们可以更专注在业务创新上而不是陷入每次手动调用的繁琐中。希望这个实战分享能帮你快速搭建起自己的AI任务调度系统。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。