告别手动整理!用Python脚本调用Eeyes实现自动化C段资产梳理
用Python脚本调用Eeyes实现自动化C段资产梳理在安全运维和DevSecOps领域资产梳理是基础但极其重要的工作。传统手动整理C段资产的方式不仅效率低下还容易遗漏关键信息。本文将介绍如何通过Python脚本调用Eeyes工具实现自动化C段资产梳理提升安全运维效率。1. 环境准备与工具介绍在开始自动化脚本开发前需要准备好基础环境。Eeyes是一款专注于从大量域名中提取真实IP并整理为C段的工具特别适合在红蓝对抗和外部攻击面管理(EASM)场景中使用。基础环境要求Python 3.6Eeyes工具可从官方仓库获取网络访问权限用于域名解析安装必要的Python依赖pip install requests subprocess32 pandasEeyes的主要功能特点支持从文件批量读取域名自动识别CDN并获取真实IP将IP整理为C段格式输出可生成日志文件便于后续分析2. Eeyes基础功能封装首先我们需要用Python封装Eeyes的基础功能使其可以通过脚本调用。这里我们使用subprocess模块来调用命令行工具。import subprocess import os class EeyesWrapper: def __init__(self, eeyes_path): self.eeyes_path eeyes_path def scan_domains_from_file(self, input_file, output_fileNone): 从文件读取域名进行扫描 cmd [self.eeyes_path, -l, input_file] if output_file: cmd.extend([-log, output_file]) try: result subprocess.run( cmd, capture_outputTrue, textTrue, checkTrue ) return result.stdout except subprocess.CalledProcessError as e: print(fEeyes执行失败: {e.stderr}) return None这个基础封装类提供了从文件读取域名进行扫描的功能。在实际使用中你可以根据需要扩展更多功能比如直接扫描单个域名或处理特殊格式的输入。3. 输出结果解析与处理Eeyes的输出结果需要进一步处理才能生成结构化的C段资产报告。下面是一个结果解析器的实现示例import re from collections import defaultdict import ipaddress class EeyesResultParser: def __init__(self): self.ip_pattern re.compile(r\b(?:\d{1,3}\.){3}\d{1,3}\b) def parse_to_c_blocks(self, eeyes_output): 将Eeyes输出解析为C段分类 ips self._extract_ips(eeyes_output) c_blocks defaultdict(list) for ip in ips: try: network ipaddress.IPv4Network(f{ip}/24, strictFalse) c_block str(network.network_address) c_blocks[c_block].append(ip) except ValueError: continue return c_blocks def _extract_ips(self, text): 从文本中提取所有IP地址 return self.ip_pattern.findall(text)使用这个解析器我们可以将Eeyes的输出转换为更有组织的C段资产数据# 使用示例 wrapper EeyesWrapper(path/to/eeyes) result wrapper.scan_domains_from_file(domains.txt) parser EeyesResultParser() c_blocks parser.parse_to_c_blocks(result) for c_block, ips in c_blocks.items(): print(fC段 {c_block}/24 包含IP: {, .join(ips)})4. 自动化流程设计与优化要实现完整的自动化资产梳理流程我们需要考虑以下几个关键点输入处理支持多种输入来源文件、API、数据库等扫描执行稳定调用Eeyes进行扫描结果解析提取并结构化C段信息报告生成输出易读的报告异常处理应对网络波动等异常情况完整自动化流程示例import time import json from datetime import datetime class CBlockAutomation: def __init__(self, eeyes_path): self.eeyes EeyesWrapper(eeyes_path) self.parser EeyesResultParser() def run_automation(self, input_source, output_formatjson): 执行完整的自动化流程 # 1. 准备输入 domains self._prepare_input(input_source) # 2. 执行扫描 scan_result self._execute_scan(domains) # 3. 解析结果 c_blocks self.parser.parse_to_c_blocks(scan_result) # 4. 生成报告 report self._generate_report(c_blocks, output_format) return report def _prepare_input(self, source): 准备输入数据 if isinstance(source, list): return source elif os.path.isfile(source): with open(source) as f: return [line.strip() for line in f if line.strip()] else: raise ValueError(不支持的输入类型) def _execute_scan(self, domains): 执行Eeyes扫描 temp_file ftemp_domains_{int(time.time())}.txt try: with open(temp_file, w) as f: f.write(\n.join(domains)) return self.eeyes.scan_domains_from_file(temp_file) finally: if os.path.exists(temp_file): os.remove(temp_file) def _generate_report(self, c_blocks, format): 生成报告 if format json: return json.dumps(c_blocks, indent2) elif format text: report [] for c_block, ips in c_blocks.items(): report.append(fC段 {c_block}/24 (共{len(ips)}个IP):) report.extend(f - {ip} for ip in sorted(ips)) report.append() return \n.join(report) else: raise ValueError(不支持的输出格式)5. 高级功能与实战技巧在实际使用中我们还需要考虑一些高级功能和优化技巧5.1 处理网络波动问题网络不稳定可能导致Eeyes识别率下降。我们可以实现重试机制def execute_scan_with_retry(self, domains, max_retries3): 带重试机制的扫描执行 for attempt in range(max_retries): try: result self._execute_scan(domains) if result and self._validate_result(result): return result except Exception as e: print(f扫描失败 (尝试 {attempt 1}/{max_retries}): {str(e)}) time.sleep(5 * (attempt 1)) raise RuntimeError(扫描失败达到最大重试次数) def _validate_result(self, result): 验证扫描结果是否有效 return len(self.parser._extract_ips(result)) 05.2 与资产管理系统集成将自动化脚本与现有资产管理系统集成可以进一步提升效率def upload_to_asset_management(self, c_blocks, api_endpoint): 将C段资产上传到资产管理系统 payload { timestamp: datetime.now().isoformat(), c_blocks: [ { network: f{c_block}/24, ips: ips, count: len(ips) } for c_block, ips in c_blocks.items() ] } response requests.post( api_endpoint, jsonpayload, headers{Content-Type: application/json} ) if response.status_code ! 200: raise RuntimeError(f上传失败: {response.text}) return response.json()5.3 定时任务与无人值守运行使用Python的schedule库可以实现定时自动运行import schedule def schedule_daily_scan(input_file, output_dir): 设置每日自动扫描任务 automation CBlockAutomation(path/to/eeyes) def job(): date_str datetime.now().strftime(%Y%m%d) output_file os.path.join(output_dir, fc_blocks_{date_str}.json) report automation.run_automation(input_file, json) with open(output_file, w) as f: f.write(report) print(f扫描完成结果已保存到 {output_file}) # 每天凌晨2点执行 schedule.every().day.at(02:00).do(job) while True: schedule.run_pending() time.sleep(60)6. 性能优化与最佳实践为了确保脚本在大规模资产梳理场景下的性能可以考虑以下优化措施批量处理将域名分批处理避免一次性处理过多导致内存问题并行扫描使用多进程加速扫描过程结果缓存缓存已扫描的域名结果避免重复工作日志记录详细记录操作日志便于问题排查并行扫描实现示例from multiprocessing import Pool def parallel_scan(domains, eeyes_path, workers4): 并行扫描域名 chunk_size (len(domains) workers - 1) // workers chunks [ domains[i:ichunk_size] for i in range(0, len(domains), chunk_size) ] with Pool(workers) as pool: results pool.starmap( scan_chunk, [(chunk, eeyes_path) for chunk in chunks] ) return \n.join(filter(None, results)) def scan_chunk(domains, eeyes_path): 扫描一批域名 temp_file ftemp_{os.getpid()}.txt try: with open(temp_file, w) as f: f.write(\n.join(domains)) wrapper EeyesWrapper(eeyes_path) return wrapper.scan_domains_from_file(temp_file) finally: if os.path.exists(temp_file): os.remove(temp_file)在实际项目中根据域名数量和服务器资源情况调整worker数量通常设置为CPU核心数的2-4倍效果最佳。