基于Node.js调用EVA-02构建高并发文本处理API服务最近在做一个内容审核平台需要处理海量的用户生成文本。最开始用Python脚本一个个跑速度慢不说服务器还动不动就卡死。后来我们团队尝试用Node.js搭了个API服务把EVA-02模型包装起来效果出奇的好——单台机器每秒能处理上百个请求而且资源占用还特别低。如果你也在为文本处理的高并发问题头疼或者想给现有的AI服务加个高性能的网关这篇文章就是为你准备的。我会带你一步步搭建一个基于Node.js的EVA-02 API服务重点解决异步处理、请求队列这些实际问题让你能轻松应对互联网级别的流量冲击。1. 环境准备与快速上手在开始之前我们先看看需要准备些什么。整个过程其实不复杂跟着步骤走半小时内就能跑起来。1.1 基础环境搭建首先确保你的系统已经安装了Node.js。如果还没有可以去官网下载最新的LTS版本。安装完成后打开终端验证一下node --version npm --version看到版本号就说明安装成功了。接下来创建一个项目目录并初始化项目mkdir eva02-api-service cd eva02-api-service npm init -y1.2 核心依赖安装我们需要几个关键的包来构建服务。在项目目录下执行npm install express axios dotenv npm install --save-dev nodemon简单解释一下这些包的作用express用来快速搭建Web服务器和API接口axios处理HTTP请求调用后端的EVA-02服务dotenv管理环境变量比如API密钥、服务地址nodemon开发工具代码修改后自动重启服务1.3 项目结构规划一个好的项目结构能让后续开发轻松很多。我建议这样组织文件eva02-api-service/ ├── src/ │ ├── app.js # 主应用文件 │ ├── routes/ # 路由目录 │ │ └── api.js # API路由 │ ├── controllers/ # 控制器目录 │ │ └── evaController.js # 处理EVA-02请求 │ ├── services/ # 服务层 │ │ └── evaService.js # 调用EVA-02的核心逻辑 │ ├── utils/ # 工具函数 │ │ ├── queue.js # 请求队列管理 │ │ └── logger.js # 日志工具 │ └── config/ # 配置文件 │ └── index.js # 配置管理 ├── .env # 环境变量不要提交到Git ├── .gitignore # Git忽略文件 ├── package.json # 项目配置 └── README.md # 项目说明这个结构看起来有点多但别担心我们一步步来。先从最简单的开始。2. 搭建基础API服务我们先从最基础的Express服务开始确保一切能正常运行。2.1 创建基础服务器在src/app.js文件中写入以下代码const express require(express); const dotenv require(dotenv); // 加载环境变量 dotenv.config(); const app express(); const PORT process.env.PORT || 3000; // 中间件配置 app.use(express.json()); // 解析JSON请求体 app.use(express.urlencoded({ extended: true })); // 解析URL编码请求体 // 健康检查接口 app.get(/health, (req, res) { res.json({ status: healthy, timestamp: new Date().toISOString(), service: EVA-02 API Gateway }); }); // 启动服务器 app.listen(PORT, () { console.log( 服务已启动监听端口: ${PORT}); console.log( 健康检查地址: http://localhost:${PORT}/health); });在package.json的scripts部分添加启动命令{ scripts: { start: node src/app.js, dev: nodemon src/app.js } }现在运行npm run dev打开浏览器访问http://localhost:3000/health应该能看到健康状态信息。基础服务就搭建好了。2.2 添加EVA-02调用接口接下来创建处理EVA-02请求的核心逻辑。在src/services/evaService.js中const axios require(axios); class EvaService { constructor() { // EVA-02服务的基础地址从环境变量读取 this.baseURL process.env.EVA02_BASE_URL || http://localhost:8000; this.client axios.create({ baseURL: this.baseURL, timeout: 30000, // 30秒超时 }); } /** * 调用EVA-02进行文本处理 * param {string} text - 待处理的文本 * param {object} options - 处理选项 * returns {Promiseobject} 处理结果 */ async processText(text, options {}) { try { const response await this.client.post(/process, { text: text, ...options }); return { success: true, data: response.data, latency: response.headers[x-response-time] || unknown }; } catch (error) { console.error(EVA-02服务调用失败:, error.message); return { success: false, error: error.message, code: error.response?.status || 500 }; } } /** * 批量处理文本 * param {string[]} texts - 文本数组 * returns {Promiseobject[]} 批量处理结果 */ async batchProcess(texts) { const promises texts.map(text this.processText(text)); return Promise.all(promises); } } module.exports new EvaService();然后在src/controllers/evaController.js中创建控制器const evaService require(../services/evaService); class EvaController { async process(req, res) { const { text, options } req.body; if (!text) { return res.status(400).json({ error: 缺少必要参数: text }); } const result await evaService.processText(text, options); if (result.success) { res.json({ status: success, data: result.data, metadata: { latency: result.latency, timestamp: new Date().toISOString() } }); } else { res.status(500).json({ status: error, message: 文本处理失败, error: result.error }); } } async batchProcess(req, res) { const { texts } req.body; if (!Array.isArray(texts) || texts.length 0) { return res.status(400).json({ error: texts必须是非空数组 }); } // 限制批量处理的数量 const maxBatchSize parseInt(process.env.MAX_BATCH_SIZE) || 10; const textsToProcess texts.slice(0, maxBatchSize); const results await evaService.batchProcess(textsToProcess); res.json({ status: success, processed_count: results.length, results: results }); } } module.exports new EvaController();最后在src/routes/api.js中添加路由const express require(express); const evaController require(../controllers/evaController); const router express.Router(); // 单文本处理接口 router.post(/process, evaController.process); // 批量处理接口 router.post(/batch-process, evaController.batchProcess); module.exports router;更新src/app.js引入路由// 在健康检查接口后添加 const apiRoutes require(./routes/api); app.use(/api/v1, apiRoutes);现在你的API服务就有了两个核心接口POST /api/v1/process用于单文本处理POST /api/v1/batch-process用于批量处理。3. 应对高并发的关键设计基础服务搭好了但直接这样用在高并发场景下会出问题。想象一下如果同时有1000个请求过来每个都直接调用EVA-02后端服务肯定扛不住。我们需要一些机制来保护后端服务。3.1 实现请求队列管理当请求量突然增大时我们需要一个队列来缓冲请求避免瞬间压垮后端服务。在src/utils/queue.js中class RequestQueue { constructor(maxConcurrent 5) { this.queue []; this.processing new Set(); this.maxConcurrent maxConcurrent; this.isProcessing false; } /** * 添加任务到队列 * param {Function} task - 异步任务函数 * returns {Promise} 任务执行结果的Promise */ enqueue(task) { return new Promise((resolve, reject) { this.queue.push({ task, resolve, reject, addedAt: Date.now() }); this._processQueue(); }); } /** * 处理队列中的任务 */ async _processQueue() { // 如果已经在处理中或者队列为空或者达到并发上限则返回 if (this.isProcessing || this.queue.length 0 || this.processing.size this.maxConcurrent) { return; } this.isProcessing true; while (this.queue.length 0 this.processing.size this.maxConcurrent) { const item this.queue.shift(); this.processing.add(item); // 执行任务 item.task() .then(result { item.resolve(result); }) .catch(error { item.reject(error); }) .finally(() { this.processing.delete(item); this._processQueue(); }); } this.isProcessing false; } /** * 获取队列状态 */ getStatus() { return { queueLength: this.queue.length, processingCount: this.processing.size, maxConcurrent: this.maxConcurrent, waitingTime: this.queue.length 0 ? Date.now() - this.queue[0].addedAt : 0 }; } } // 创建全局队列实例 const evaQueue new RequestQueue( parseInt(process.env.MAX_CONCURRENT_REQUESTS) || 3 ); module.exports evaQueue;3.2 集成队列到服务层修改src/services/evaService.js使用队列来管理请求const axios require(axios); const evaQueue require(../utils/queue); class EvaService { constructor() { this.baseURL process.env.EVA02_BASE_URL || http://localhost:8000; this.client axios.create({ baseURL: this.baseURL, timeout: 30000, }); // 重试配置 this.maxRetries parseInt(process.env.MAX_RETRIES) || 3; this.retryDelay parseInt(process.env.RETRY_DELAY_MS) || 1000; } /** * 带重试机制的请求 */ async _requestWithRetry(text, options, retryCount 0) { try { const response await this.client.post(/process, { text: text, ...options }); return { success: true, data: response.data, latency: response.headers[x-response-time] || unknown, retries: retryCount }; } catch (error) { // 如果是可重试的错误并且重试次数未达上限 if (this._isRetryableError(error) retryCount this.maxRetries) { console.log(请求失败第${retryCount 1}次重试...); // 指数退避延迟 const delay this.retryDelay * Math.pow(2, retryCount); await new Promise(resolve setTimeout(resolve, delay)); return this._requestWithRetry(text, options, retryCount 1); } throw error; } } /** * 判断错误是否可重试 */ _isRetryableError(error) { // 网络错误、超时、5xx服务器错误可以重试 if (!error.response) return true; // 网络错误 if (error.code ECONNABORTED) return true; // 超时 if (error.response.status 500) return true; // 服务器错误 return false; } /** * 通过队列处理文本 */ async processText(text, options {}) { try { // 将请求加入队列 const result await evaQueue.enqueue(() this._requestWithRetry(text, options) ); return result; } catch (error) { console.error(队列处理失败:, error.message); return { success: false, error: error.message, code: error.response?.status || 500 }; } } /** * 获取队列状态 */ getQueueStatus() { return evaQueue.getStatus(); } } module.exports new EvaService();3.3 添加监控接口为了让运维更方便我们添加一个监控接口来查看队列状态。在控制器中添加async getQueueStatus(req, res) { const status evaService.getQueueStatus(); res.json({ status: success, data: status, timestamp: new Date().toISOString() }); }在路由中添加router.get(/queue-status, evaController.getQueueStatus);现在访问GET /api/v1/queue-status就能看到当前队列的状态了。4. 性能优化与生产环境配置基础功能都有了但要真正用在生产环境还需要一些优化和配置。4.1 添加速率限制防止单个用户或IP占用过多资源。安装express-rate-limitnpm install express-rate-limit在src/app.js中添加const rateLimit require(express-rate-limit); // API速率限制 const apiLimiter rateLimit({ windowMs: 15 * 60 * 1000, // 15分钟 max: 100, // 每个IP最多100个请求 message: { error: 请求过于频繁请稍后再试 }, standardHeaders: true, legacyHeaders: false, }); // 应用到API路由 app.use(/api/v1, apiLimiter);4.2 添加请求日志在src/utils/logger.js中创建简单的日志中间件const logger (req, res, next) { const startTime Date.now(); // 请求完成后记录日志 res.on(finish, () { const duration Date.now() - startTime; const logMessage ${new Date().toISOString()} | ${req.method} ${req.originalUrl} | ${res.statusCode} | ${duration}ms; console.log(logMessage); // 如果是慢请求记录警告 if (duration 1000) { console.warn(慢请求警告: ${logMessage}); } }); next(); }; module.exports logger;在src/app.js中使用const logger require(./utils/logger); app.use(logger);4.3 环境配置管理创建.env文件来管理配置# 服务器配置 PORT3000 NODE_ENVproduction # EVA-02服务配置 EVA02_BASE_URLhttp://your-eva02-service:8000 EVA02_API_KEYyour-api-key-here # 队列配置 MAX_CONCURRENT_REQUESTS5 MAX_BATCH_SIZE20 # 重试配置 MAX_RETRIES3 RETRY_DELAY_MS1000 # 速率限制 RATE_LIMIT_WINDOW_MS900000 # 15分钟 RATE_LIMIT_MAX_REQUESTS100创建src/config/index.js来统一管理配置require(dotenv).config(); module.exports { server: { port: process.env.PORT || 3000, nodeEnv: process.env.NODE_ENV || development }, eva02: { baseURL: process.env.EVA02_BASE_URL, apiKey: process.env.EVA02_API_KEY, timeout: parseInt(process.env.EVA02_TIMEOUT) || 30000 }, queue: { maxConcurrent: parseInt(process.env.MAX_CONCURRENT_REQUESTS) || 5, maxBatchSize: parseInt(process.env.MAX_BATCH_SIZE) || 10 }, retry: { maxRetries: parseInt(process.env.MAX_RETRIES) || 3, delayMs: parseInt(process.env.RETRY_DELAY_MS) || 1000 }, rateLimit: { windowMs: parseInt(process.env.RATE_LIMIT_WINDOW_MS) || 900000, maxRequests: parseInt(process.env.RATE_LIMIT_MAX_REQUESTS) || 100 } };4.4 添加健康检查端点在生产环境中健康检查很重要。更新健康检查接口app.get(/health, async (req, res) { const health { status: healthy, timestamp: new Date().toISOString(), service: EVA-02 API Gateway, uptime: process.uptime(), memory: process.memoryUsage(), // 检查EVA-02服务连接 dependencies: { eva02: checking... } }; try { // 尝试连接EVA-02服务 const evaService require(./services/evaService); const queueStatus evaService.getQueueStatus(); health.dependencies.eva02 connected; health.queue queueStatus; res.json(health); } catch (error) { health.status degraded; health.dependencies.eva02 disconnected; health.error error.message; res.status(503).json(health); } });5. 实际应用与测试服务搭建好了我们来实际测试一下效果。5.1 编写测试脚本创建test/load-test.js来模拟高并发场景const axios require(axios); const API_URL http://localhost:3000/api/v1/process; const CONCURRENT_REQUESTS 50; // 并发请求数 const TOTAL_REQUESTS 200; // 总请求数 const testTexts [ 这是一段需要处理的文本内容。, 今天天气真好适合出去散步。, 人工智能正在改变我们的生活和工作方式。, Node.js的高并发特性非常适合构建API网关。, EVA-02模型在文本处理方面表现出色。 ]; async function sendRequest(index) { const text testTexts[index % testTexts.length]; try { const startTime Date.now(); const response await axios.post(API_URL, { text: text, options: { task: classification, language: zh } }); const endTime Date.now(); return { success: true, duration: endTime - startTime, status: response.status }; } catch (error) { return { success: false, error: error.message, status: error.response?.status }; } } async function runLoadTest() { console.log(开始负载测试...); console.log(并发数: ${CONCURRENT_REQUESTS}, 总请求数: ${TOTAL_REQUESTS}); const results { total: 0, success: 0, failed: 0, durations: [] }; // 分批发送请求 for (let i 0; i TOTAL_REQUESTS; i CONCURRENT_REQUESTS) { const batchSize Math.min(CONCURRENT_REQUESTS, TOTAL_REQUESTS - i); const promises []; for (let j 0; j batchSize; j) { promises.push(sendRequest(i j)); } const batchResults await Promise.all(promises); batchResults.forEach(result { results.total; if (result.success) { results.success; results.durations.push(result.duration); } else { results.failed; } }); console.log(已处理: ${results.total}/${TOTAL_REQUESTS}); // 稍微等待一下避免过于密集 await new Promise(resolve setTimeout(resolve, 100)); } // 计算统计数据 const avgDuration results.durations.length 0 ? results.durations.reduce((a, b) a b, 0) / results.durations.length : 0; const maxDuration results.durations.length 0 ? Math.max(...results.durations) : 0; const minDuration results.durations.length 0 ? Math.min(...results.durations) : 0; console.log(\n测试结果:); console.log(总请求数: ${results.total}); console.log(成功: ${results.success}); console.log(失败: ${results.failed}); console.log(成功率: ${((results.success / results.total) * 100).toFixed(2)}%); console.log(平均响应时间: ${avgDuration.toFixed(2)}ms); console.log(最大响应时间: ${maxDuration}ms); console.log(最小响应时间: ${minDuration}ms); console.log(QPS: ${(results.success / (results.durations.reduce((a, b) a b, 0) / 1000)).toFixed(2)}); } runLoadTest().catch(console.error);5.2 监控队列状态在测试过程中可以同时监控队列状态# 在一个终端运行服务 npm run dev # 在另一个终端监控队列状态 watch -n 1 curl -s http://localhost:3000/api/v1/queue-status | python -m json.tool5.3 实际使用示例假设你有一个内容审核的需求可以这样调用APIconst axios require(axios); async function moderateContent(content) { try { const response await axios.post(http://your-api-service:3000/api/v1/process, { text: content, options: { task: content_moderation, strict_level: medium } }); if (response.data.status success) { const result response.data.data; if (result.is_safe) { console.log(内容安全可以发布); return { safe: true, score: result.confidence }; } else { console.log(内容不安全需要审核); console.log(风险类型:, result.risk_types); console.log(置信度:, result.confidence); return { safe: false, reasons: result.risk_types }; } } else { console.error(处理失败:, response.data.message); return { error: response.data.message }; } } catch (error) { console.error(API调用失败:, error.message); return { error: error.message }; } } // 使用示例 const userContent 这是一段用户生成的文本内容...; moderateContent(userContent).then(result { console.log(审核结果:, result); });6. 总结与建议实际用下来这套基于Node.js的EVA-02 API网关方案确实能很好地应对高并发场景。队列机制让后端服务不会因为突发流量而崩溃重试机制保证了服务的稳定性监控接口让运维变得简单很多。如果你打算在生产环境使用我建议再考虑几个方面。首先是缓存对于相同的文本处理请求可以加一层Redis缓存能大幅减少对EVA-02服务的调用。其次是监控告警可以集成Prometheus和Grafana实时监控队列长度、响应时间这些关键指标。还有就是多实例部署用Nginx做负载均衡进一步提高系统的可用性。部署的时候要注意环境变量的管理敏感信息一定要通过环境变量传递不要硬编码在代码里。日志也要做好分级开发环境可以多打一些调试日志生产环境主要记录错误和关键操作。这个方案最大的好处是灵活你可以根据实际需求调整队列的并发数、重试策略这些参数。如果后端EVA-02服务性能很强可以适当增加并发数如果服务不太稳定就多设置几次重试。我们团队用这个方案处理过单日千万级别的文本请求整体表现很稳定。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。