手把手教你用Python+MySQL搭建足球实时数据监控系统(附worldliveball源码解析)
从零构建足球赛事实时数据监控系统PythonMySQL实战指南足球数据分析正从传统的赛后复盘转向实时决策支持。想象一下当比赛进行到第63分钟你的系统突然弹出提示主队控球率下降12%客队射正率上升至43%这种实时洞察力正是现代足球技术团队和资深球迷梦寐以求的能力。本文将带您用Python和MySQL搭建一套专业级的赛事监控原型不仅能自动捕获关键比赛指标还能通过智能预警机制识别赛事转折点。1. 系统架构设计与技术选型任何数据系统的构建都需要从明确需求开始。我们的足球实时监控系统需要实现三个核心能力稳定获取数据、高效存储数据和智能分析数据。这对应着系统的三个主要模块数据采集层、数据存储层和业务逻辑层。在技术选型上Python因其丰富的爬虫生态如Requests、BeautifulSoup和数据处理库Pandas、NumPy成为采集层的不二之选。对于需要处理JavaScript渲染的页面可以配合Selenium或Playwright实现动态抓取。存储层选择MySQL关系型数据库主要考虑到成熟的事务支持确保数据完整性灵活的索引优化查询性能明确的表结构便于后期维护以下是基础环境配置清单# 创建Python虚拟环境 python -m venv soccer_analytics source soccer_analytics/bin/activate # Linux/Mac soccer_analytics\Scripts\activate # Windows # 安装核心依赖 pip install requests beautifulsoup4 pandas pymysql selenium对于需要监控的赛事数据源建议优先考虑以下三类官方数据接口如Sportradar、Opta等商业API需申请权限转播平台数据如ESPN、腾讯体育等网站的实时比分板块专业数据网站如FlashScore、SofaScore等聚合平台2. 数据采集模块实现2.1 网页数据抓取策略针对不同数据源我们需要采用不同的抓取策略。以抓取FlashScore的实时比分为例常规的静态页面抓取可能无法获取动态加载的内容。这时就需要使用浏览器自动化工具from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC def fetch_live_matches(): options webdriver.ChromeOptions() options.add_argument(--headless) # 无头模式 driver webdriver.Chrome(optionsoptions) try: driver.get(https://www.flashscore.com) WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.CLASS_NAME, event__match)) ) matches [] for match in driver.find_elements(By.CLASS_NAME, event__match): home_team match.find_element(By.CLASS_NAME, event__participant--home).text away_team match.find_element(By.CLASS_NAME, event__participant--away).text score match.find_element(By.CLASS_NAME, event__score).text matches.append(f{home_team} {score} {away_team}) return matches finally: driver.quit()2.2 反爬虫应对方案现代网站普遍采用反爬虫机制我们需要在代码中加入以下防护措施随机User-Agent轮换不同浏览器标识请求间隔控制避免高频访问触发封禁代理IP池分散请求来源IPCookie管理模拟真实用户会话import random import time from fake_useragent import UserAgent def get_random_headers(): ua UserAgent() return { User-Agent: ua.random, Accept-Language: en-US,en;q0.9, Referer: https://www.google.com/ } def safe_request(url, max_retry3): for attempt in range(max_retry): try: response requests.get( url, headersget_random_headers(), timeout10, proxies{https: get_random_proxy()} ) response.raise_for_status() return response except Exception as e: print(fAttempt {attempt1} failed: {str(e)}) time.sleep(random.uniform(1, 3)) return None3. 数据库设计与优化3.1 核心表结构设计合理的数据库设计是系统高效运转的基础。我们的足球监控系统主要需要以下几张表比赛基本信息表(matches)CREATE TABLE matches ( match_id INT AUTO_INCREMENT PRIMARY KEY, league_id INT NOT NULL, home_team VARCHAR(100) NOT NULL, away_team VARCHAR(100) NOT NULL, start_time DATETIME NOT NULL, venue VARCHAR(100), referee VARCHAR(50), FOREIGN KEY (league_id) REFERENCES leagues(league_id), INDEX idx_teams (home_team, away_team), INDEX idx_time (start_time) );实时数据快照表(match_stats)CREATE TABLE match_stats ( stat_id BIGINT AUTO_INCREMENT PRIMARY KEY, match_id INT NOT NULL, capture_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, home_score TINYINT NOT NULL DEFAULT 0, away_score TINYINT NOT NULL DEFAULT 0, home_possession DECIMAL(5,2) CHECK (home_possession BETWEEN 0 AND 100), away_possession DECIMAL(5,2) CHECK (away_possession BETWEEN 0 AND 100), home_shots_on_target TINYINT UNSIGNED, away_shots_on_target TINYINT UNSIGNED, home_yellow_cards TINYINT UNSIGNED, away_yellow_cards TINYINT UNSIGNED, FOREIGN KEY (match_id) REFERENCES matches(match_id), INDEX idx_match (match_id), INDEX idx_capture_time (capture_time) ) ENGINEInnoDB;3.2 数据库性能优化随着数据量增长我们需要考虑以下优化策略分区表按时间范围分区历史数据读写分离主库写从库读缓存层Redis缓存热点查询连接池使用PyMySQL的连接池管理import pymysql from pymysql import cursors from dbutils.pooled_db import PooledDB # 创建数据库连接池 db_pool PooledDB( creatorpymysql, maxconnections10, mincached2, hostlocalhost, usersoccer_user, passwordsecure_password, databasesoccer_analytics, cursorclasscursors.DictCursor ) def save_match_stats(stats_data): conn db_pool.connection() try: with conn.cursor() as cursor: sql INSERT INTO match_stats (match_id, home_score, away_score, home_possession, away_possession, home_shots_on_target, away_shots_on_target) VALUES (%s, %s, %s, %s, %s, %s, %s) cursor.execute(sql, ( stats_data[match_id], stats_data[home_score], stats_data[away_score], stats_data[home_possession], stats_data[away_possession], stats_data[home_shots], stats_data[away_shots] )) conn.commit() finally: conn.close()4. 实时分析与预警系统4.1 关键指标监控算法真正的价值不在于收集数据而在于从数据中发现模式。我们设计了几种核心算法来识别比赛关键时刻控球率突变检测def detect_possession_shift(match_id, window_size5, threshold0.15): 检测控球率显著变化 :param match_id: 比赛ID :param window_size: 滑动窗口大小 :param threshold: 变化阈值 :return: 是否检测到显著变化 conn db_pool.connection() try: with conn.cursor() as cursor: sql SELECT capture_time, home_possession FROM match_stats WHERE match_id %s ORDER BY capture_time DESC LIMIT %s cursor.execute(sql, (match_id, window_size)) records cursor.fetchall() if len(records) window_size: return False # 计算最近window_size次采样的标准差 possessions [r[home_possession] for r in records] std_dev np.std(possessions) return std_dev threshold finally: conn.close()4.2 多维度预警系统预警系统需要综合考虑多个指标我们可以使用加权评分机制指标权重触发条件评分公式控球率变化0.3标准差15%5*(std_dev - 0.15)/0.1射正率变化0.25最近3次采样上升20%4*(current - previous)/0.2比分变化0.2最近10分钟有进球直接赋值5分犯规频率0.15每分钟犯规0.3次3*(foul_rate - 0.3)/0.2换人事件0.1最近5分钟有换人直接赋值2分当综合评分超过阈值时触发预警def evaluate_match_alert(match_id): metrics { possession_shift: detect_possession_shift(match_id), shot_efficiency: check_shot_efficiency_change(match_id), goal_scored: check_recent_goals(match_id, minutes10), foul_rate: calculate_foul_rate(match_id), substitution: check_recent_substitutions(match_id, minutes5) } # 计算加权得分 score ( 0.3 * metrics[possession_shift] 0.25 * metrics[shot_efficiency] 0.2 * metrics[goal_scored] 0.15 * metrics[foul_rate] 0.1 * metrics[substitution] ) if score 3.5: # 预警阈值 trigger_alert(match_id, score, metrics)5. 系统部署与监控5.1 容器化部署方案现代系统部署首选Docker容器化方案以下是一个典型的docker-compose配置version: 3.8 services: scraper: build: ./scraper environment: - DB_HOSTmysql - REDIS_HOSTredis depends_on: - mysql - redis restart: unless-stopped mysql: image: mysql:8.0 environment: - MYSQL_ROOT_PASSWORDrootpass - MYSQL_DATABASEsoccer_analytics - MYSQL_USERsoccer_user - MYSQL_PASSWORDuserpass volumes: - mysql_data:/var/lib/mysql ports: - 3306:3306 redis: image: redis:6 ports: - 6379:6379 volumes: - redis_data:/data dashboard: build: ./dashboard ports: - 5000:5000 depends_on: - mysql - redis volumes: mysql_data: redis_data:5.2 日志监控与告警完善的日志系统是运维的基石。推荐使用ELK栈ElasticsearchLogstashKibana实现日志集中管理import logging from logging.handlers import RotatingFileHandler def setup_logger(name): logger logging.getLogger(name) logger.setLevel(logging.INFO) # 控制台输出 console_handler logging.StreamHandler() console_handler.setFormatter(logging.Formatter( %(asctime)s - %(name)s - %(levelname)s - %(message)s )) # 文件输出(最大100MB保留5个备份) file_handler RotatingFileHandler( soccer_monitor.log, maxBytes100*1024*1024, backupCount5 ) file_handler.setFormatter(logging.Formatter( %(asctime)s - %(name)s - %(levelname)s - %(message)s )) logger.addHandler(console_handler) logger.addHandler(file_handler) return logger在实际项目中我们还需要考虑数据备份策略。以下是推荐的备份方案组合全量备份每日凌晨执行mysqldump增量备份每小时备份binlog异地备份每周将备份文件同步到云存储# 每日全量备份脚本示例 #!/bin/bash BACKUP_DIR/opt/backups/mysql DATE$(date %Y%m%d) mysqldump -u backup_user -ppassword --all-databases \ --single-transaction \ --routines \ --triggers \ --events \ | gzip $BACKUP_DIR/full_backup_$DATE.sql.gz # 保留最近7天备份 find $BACKUP_DIR -name full_backup_*.sql.gz -mtime 7 -delete6. 前端可视化实现6.1 实时数据看板使用FlaskECharts构建轻量级可视化界面from flask import Flask, render_template import pymysql app Flask(__name__) app.route(/match/int:match_id) def match_dashboard(match_id): conn pymysql.connect(hostlocalhost, userweb_user, passwordwebpass, databasesoccer_analytics) try: with conn.cursor() as cursor: # 获取比赛基本信息 cursor.execute( SELECT m.*, l.name as league_name FROM matches m JOIN leagues l ON m.league_id l.league_id WHERE m.match_id %s , (match_id,)) match_info cursor.fetchone() # 获取最近20条统计数据 cursor.execute( SELECT * FROM match_stats WHERE match_id %s ORDER BY capture_time DESC LIMIT 20 , (match_id,)) stats cursor.fetchall() return render_template(match.html, matchmatch_info, statsstats) finally: conn.close()对应的HTML模板中使用ECharts展示数据趋势div idpossessionChart stylewidth: 600px;height:400px;/div script var chart echarts.init(document.getElementById(possessionChart)); chart.setOption({ title: { text: 控球率变化趋势 }, tooltip: { trigger: axis }, legend: { data: [主队控球率, 客队控球率] }, xAxis: { type: category, data: [{% for stat in stats|reverse %} {{ stat.capture_time.strftime(%H:%M) }}{% if not loop.last %},{% endif %} {% endfor %}] }, yAxis: { type: value, min: 0, max: 100 }, series: [ { name: 主队控球率, type: line, data: [{% for stat in stats|reverse %} {{ stat.home_possession|default(50) }}{% if not loop.last %},{% endif %} {% endfor %}] }, { name: 客队控球率, type: line, data: [{% for stat in stats|reverse %} {{ stat.away_possession|default(50) }}{% if not loop.last %},{% endif %} {% endfor %}] } ] }); /script6.2 移动端适配方案考虑到用户可能需要在现场查看数据我们需要确保界面在移动设备上表现良好响应式布局使用Bootstrap栅格系统触摸优化增大点击区域离线缓存Service Worker缓存关键资源推送通知Web Push API实现实时提醒// 注册Service Worker if (serviceWorker in navigator) { navigator.serviceWorker.register(/sw.js).then(registration { console.log(SW registered: , registration); // 请求通知权限 Notification.requestPermission().then(permission { if (permission granted) { console.log(Notification permission granted.); } }); }).catch(registrationError { console.log(SW registration failed: , registrationError); }); } // 处理推送通知 navigator.serviceWorker.addEventListener(message, event { if (event.data.type MATCH_ALERT) { new Notification(event.data.title, { body: event.data.message, icon: /static/icon-192x192.png }); } });7. 性能优化实战技巧7.1 数据库查询优化随着数据量增长我们需要特别注意查询性能添加适当索引在WHERE、JOIN、ORDER BY字段上创建索引**避免SELECT ***只查询需要的字段使用EXPLAIN分析找出慢查询瓶颈批量操作减少单条语句执行次数# 批量插入优化示例 def batch_insert_stats(stats_list): conn db_pool.connection() try: with conn.cursor() as cursor: # 构建批量插入SQL sql INSERT INTO match_stats (match_id, home_score, away_score, home_possession) VALUES (%s, %s, %s, %s) cursor.executemany(sql, [ (s[match_id], s[home_score], s[away_score], s[home_possession]) for s in stats_list ]) conn.commit() finally: conn.close()7.2 缓存策略实现Redis作为缓存层可以显著减轻数据库压力import redis import pickle from datetime import timedelta redis_client redis.Redis(hostlocalhost, port6379, db0) def get_match_stats_with_cache(match_id): cache_key fmatch_stats:{match_id} cached_data redis_client.get(cache_key) if cached_data: return pickle.loads(cached_data) # 缓存未命中查询数据库 conn db_pool.connection() try: with conn.cursor() as cursor: cursor.execute( SELECT * FROM match_stats WHERE match_id %s ORDER BY capture_time DESC LIMIT 20 , (match_id,)) stats cursor.fetchall() # 存入缓存有效期5分钟 redis_client.setex( cache_key, timedelta(minutes5), pickle.dumps(stats) ) return stats finally: conn.close()8. 安全防护措施8.1 常见Web安全防护系统安全不容忽视必须防范以下风险SQL注入使用参数化查询XSS攻击模板引擎自动转义CSRF攻击添加Token验证暴力破解登录失败限制from flask_wtf.csrf import CSRFProtect from werkzeug.security import generate_password_hash app Flask(__name__) app.config[SECRET_KEY] your-secret-key-here csrf CSRFProtect(app) # 用户密码安全存储 def create_user(username, password): hashed_pw generate_password_hash( password, methodpbkdf2:sha256, salt_length16 ) conn db_pool.connection() try: with conn.cursor() as cursor: cursor.execute( INSERT INTO users (username, password) VALUES (%s, %s), (username, hashed_pw) ) conn.commit() finally: conn.close()8.2 数据采集合规性在采集公开数据时需要注意robots.txt遵守网站的爬虫协议请求频率控制访问间隔数据使用仅用于个人研究版权声明注明数据来源def check_robots_txt(url): import urllib.robotparser rp urllib.robotparser.RobotFileParser() robots_url urllib.parse.urljoin(url, /robots.txt) rp.set_url(robots_url) rp.read() user_agent MySoccerBot/1.0 if not rp.can_fetch(user_agent, url): raise Exception(fScraping {url} is disallowed by robots.txt)9. 扩展功能与未来迭代9.1 机器学习模型集成进阶版本可以引入预测模型from sklearn.ensemble import RandomForestClassifier from joblib import dump, load import pandas as pd def train_goal_prediction_model(): conn db_pool.connection() try: # 获取历史数据 query SELECT home_possession, away_possession, home_shots_on_target, away_shots_on_target, home_yellow_cards, away_yellow_cards, CASE WHEN home_score 0 THEN 1 ELSE 0 END as home_goal, CASE WHEN away_score 0 THEN 1 ELSE 0 END as away_goal FROM match_stats WHERE capture_time NOW() - INTERVAL 6 MONTH df pd.read_sql(query, conn) # 特征工程 X df[[home_possession, away_possession, home_shots_on_target, away_shots_on_target, home_yellow_cards, away_yellow_cards]] y_home df[home_goal] y_away df[away_goal] # 训练模型 home_model RandomForestClassifier(n_estimators100) home_model.fit(X, y_home) away_model RandomForestClassifier(n_estimators100) away_model.fit(X, y_away) # 保存模型 dump(home_model, home_goal_model.joblib) dump(away_model, away_goal_model.joblib) finally: conn.close()9.2 多语言支持方案面向国际用户时可以使用Flask-Babel添加多语言支持from flask_babel import Babel, _ app Flask(__name__) app.config[BABEL_DEFAULT_LOCALE] en babel Babel(app) app.route(/match/int:match_id) def match_dashboard(match_id): return render_template(match.html, title_(Match Dashboard)) # 在模板中使用 h1{{ _(Live Match Analytics) }}/h1 p{{ _(Current possession percentage) }}: {{ home_possession }}%/p10. 项目组织与代码规范10.1 模块化项目结构良好的项目结构有助于长期维护soccer-analytics/ ├── scraper/ # 数据采集模块 │ ├── spiders/ # 各网站爬虫 │ ├── utils/ # 通用工具 │ └── __init__.py ├── analytics/ # 数据分析模块 │ ├── algorithms/ # 分析算法 │ └── models/ # 机器学习模型 ├── database/ # 数据库相关 │ ├── migrations/ # 数据库迁移脚本 │ └── models.py # ORM模型 ├── webapp/ # 前端应用 │ ├── static/ # 静态资源 │ ├── templates/ # Jinja2模板 │ └── views.py # 路由视图 ├── config.py # 配置文件 ├── requirements.txt # 依赖列表 └── README.md # 项目说明10.2 代码质量保障建议采用以下工具保证代码质量代码格式化black、isort静态检查pylint、flake8类型检查mypy单元测试pytest代码覆盖率coverage.py# 示例测试用例 import pytest from scraper.utils import safe_request def test_safe_request_success(requests_mock): requests_mock.get(https://example.com, textsuccess) response safe_request(https://example.com) assert response.text success def test_safe_request_retry(requests_mock): requests_mock.get(https://example.com, status_code500) response safe_request(https://example.com, max_retry3) assert response is None assert requests_mock.call_count 311. 实战经验分享在真实项目部署中有几个关键点需要特别注意IP封禁问题某次我们设置的请求间隔太短导致数据源网站封禁了服务器IP。解决方案是引入随机延迟和代理IP轮换机制将请求间隔控制在15-30秒之间。数据一致性挑战早期版本曾出现因网络波动导致的数据缺失。后来我们实现了断点续采功能记录最近成功采集的时间戳程序重启后从断点继续。内存泄漏排查长时间运行的爬虫容易出现内存泄漏。通过定期重启工作进程每天一次和使用memory_profiler工具定位问题代码最终解决了这个问题。监控盲区最初只关注了技术指标忽略了系统本身的健康状态。后来添加了Prometheus监控跟踪以下关键指标from prometheus_client import start_http_server, Gauge # 定义监控指标 SCRAPE_SUCCESS Gauge(scrape_success, Last scrape success status) MATCHES_MONITORED Gauge(matches_monitored, Number of active matches) ALERTS_TRIGGERED Gauge(alerts_triggered, Alerts in last hour) def start_monitoring(port8000): start_http_server(port)12. 调试与问题排查当系统出现异常时可以按照以下步骤排查检查日志首先查看应用日志和数据库慢查询日志验证数据流确认从采集到存储的整个链路资源监控检查CPU、内存、磁盘I/O使用情况网络诊断测试到目标网站和数据库的网络连接# 网络诊断工具函数示例 import socket import subprocess def check_network_connection(): tests [ (Database, localhost, 3306), (Target Website, www.flashscore.com, 80), (Redis, localhost, 6379) ] results [] for name, host, port in tests: try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(3) s.connect((host, port)) results.append(f{name}连接正常) except Exception as e: results.append(f{name}连接失败: {str(e)}) # 添加traceroute信息 try: trace subprocess.run( [traceroute, -w, 2, www.flashscore.com], capture_outputTrue, textTrue ) results.append(f\nTraceroute结果:\n{trace.stdout}) except Exception as e: results.append(f\nTraceroute失败: {str(e)}) return \n.join(results)13. 成本控制策略运行此类系统需要考虑的三大成本基础设施成本云服务器费用数据库存储费用网络带宽费用数据获取成本商业API调用费用代理IP服务费用维护成本开发人员时间投入系统监控运维成本成本优化方案对比表优化方向具体措施预期节省效果数据采集使用免费数据源智能缓存降低30-50%API成本服务器资源使用Spot实例自动伸缩降低40-70%计算成本存储方案冷热数据分离压缩存储降低60%存储成本网络带宽启用CDN缓存静态资源降低50%带宽成本# 智能缓存实现示例 def get_with_cache(url, expire_hours24): cache_key furl_cache:{hashlib.md5(url.encode()).hexdigest()} cached redis_client.get(cache_key) if cached: return pickle.loads(cached) # 未命中缓存实际请求 response safe_request(url) if response and response.status_code 200: data response.json() redis_client.setex( cache_key, timedelta(hoursexpire_hours), pickle.dumps(data) ) return data return None14. 用户权限管理对于需要多用户协作的场景必须设计完善的权限系统-- 权限系统表设计 CREATE TABLE users ( user_id INT AUTO_INCREMENT PRIMARY KEY, username VARCHAR(50) UNIQUE NOT NULL, password_hash VARCHAR(255) NOT NULL, email VARCHAR(100) UNIQUE, is_active BOOLEAN DEFAULT TRUE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE roles ( role_id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(50) UNIQUE NOT NULL, description TEXT ); CREATE TABLE user_roles ( user_id INT NOT NULL, role_id INT NOT NULL, PRIMARY KEY (user_id, role_id), FOREIGN KEY (user_id) REFERENCES users(user_id), FOREIGN KEY (role_id) REFERENCES roles(role_id) ); CREATE TABLE permissions ( permission_id INT AUTO_INCREMENT PRIMARY KEY, resource_type VARCHAR(50) NOT NULL, action VARCHAR(50) NOT NULL, UNIQUE KEY (resource_type, action) ); CREATE TABLE role_permissions ( role_id INT NOT NULL, permission_id INT NOT NULL, PRIMARY KEY (role_id, permission_id), FOREIGN KEY (role_id) REFERENCES roles(role_id), FOREIGN KEY (permission_id) REFERENCES permissions(permission_id) );对应的Python权限检查装饰器from functools import wraps from flask import abort, g def has_permission(resource, action): def decorator(f): wraps(f) def decorated_function(*args, **kwargs): if not current_user.is_authenticated: abort(401) # 检查用户是否有对应权限 conn db_pool.connection() try: with conn.cursor() as cursor: cursor.execute( SELECT 1 FROM role_permissions rp JOIN permissions p ON rp.permission_id p.permission_id JOIN user_roles ur ON rp.role_id ur.role_id WHERE ur.user_id %s AND p.resource_type %s AND p.action %s LIMIT 1 , (current_user.id, resource, action)) if not cursor.fetchone(): abort(403) finally: conn.close() return f(*args, **kwargs) return decorated_function return decorator # 使用示例 app.route(/admin/matches) has_permission(match, admin) def admin_matches(): # 只有有match资源admin权限的用户可以访问 pass15. 自动化运维方案15.1 自动化部署流水线使用GitHub Actions实现CI/CDname: Deploy Soccer Analytics on: push: branches: [ main ] pull_request: branches: [ main ] jobs: test: runs-on: ubuntu-latest steps: - uses: actions/checkoutv2 - name: Set up Python uses: actions/setup-pythonv2 with: python-version: 3.9 - name: Install dependencies run: | python -m pip install --upgrade pip pip install -r requirements.txt - name: Run tests run: | pytest --cov. deploy: needs: test runs-on: ubuntu-latest steps: - uses: actions/checkoutv2 - name: Docker login uses: docker/login-actionv1 with: username: ${{ secrets.DOCKER_HUB_USERNAME }} password: ${{ secrets.DOCKER_HUB_TOKEN }} - name: Build and push run: | docker-compose build docker-compose push - name: SSH deploy uses: appleboy/ssh-actionmaster with: host: ${{ secrets.SERVER_HOST }} username: ${{ secrets.SERVER_USER }} key: ${{ secrets.SERVER_SSH_KEY }} script: | cd /opt/soccer-analytics docker-compose pull docker-compose up -d15.2 监控告警配置使用PrometheusAlertmanager实现智能告警# prometheus.yml 片段 scrape_configs: - job_name: soccer-monitor static_configs: - targets: [scraper:8000, dashboard:8001] - job_name: mysql static_configs: - targets: [mysql:9104] - job_name: redis static_configs: - targets: [redis:9121] # alert.rules 示例 groups: - name: service-alerts rules: - alert: HighErrorRate expr: rate(scrape_errors_total[5m]) 0.1 for: 10m labels: severity: critical annotations: summary: High error rate on {{ $labels.instance }} description: Error rate is {{ $value }} - alert: DatabaseDown expr: mysql_up 0 for: 2m labels: severity: critical annotations: summary: MySQL database is down