
第21讲:性能优化与监控
掌握 Skill 的性能优化和监控技巧,确保 Skill 高效稳定运行。
一、性能瓶颈分析
1.1 常见性能问题
| 问题类型 | 表现 | 原因 |
|---|
| 响应慢 | 用户等待时间长 | 同步处理、大文件操作 |
| 内存高 | 内存占用持续增长 | 内存泄漏、大对象缓存 |
| CPU 高 | 处理卡顿 | 复杂计算、循环处理 |
| 超时 | 请求超时失败 | 外部 API 慢、大文件处理 |
1.2 性能分析工具
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import time import functools import tracemalloc from typing import Callable
class PerformanceProfiler: """性能分析器""" @staticmethod def timer(func: Callable) -> Callable: """计时装饰器""" @functools.wraps(func) def wrapper(*args, **kwargs): start = time.perf_counter() result = func(*args, **kwargs) elapsed = time.perf_counter() - start print(f"{func.__name__} 耗时: {elapsed:.3f}s") return result return wrapper @staticmethod def memory_tracker(func: Callable) -> Callable: """内存追踪装饰器""" @functools.wraps(func) def wrapper(*args, **kwargs): tracemalloc.start() result = func(*args, **kwargs) current, peak = tracemalloc.get_traced_memory() tracemalloc.stop() print(f"{func.__name__} 内存使用: {current/1024/1024:.2f}MB, 峰值: {peak/1024/1024:.2f}MB") return result return wrapper
|
二、响应时间优化
2.1 异步处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import asyncio import aiohttp from concurrent.futures import ThreadPoolExecutor
class AsyncSkillProcessor: """异步处理器""" def __init__(self): self.executor = ThreadPoolExecutor(max_workers=10) async def process_async(self, tasks: list) -> list: """异步处理多个任务""" loop = asyncio.get_event_loop() futures = [ loop.run_in_executor(self.executor, task) for task in tasks ] results = await asyncio.gather(*futures, return_exceptions=True) return results async def fetch_urls(self, urls: list) -> list: """异步获取多个 URL""" async with aiohttp.ClientSession() as session: tasks = [self._fetch_one(session, url) for url in urls] return await asyncio.gather(*tasks) async def _fetch_one(self, session: aiohttp.ClientSession, url: str): """获取单个 URL""" async with session.get(url, timeout=30) as response: return await response.text()
|
2.2 流式处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| class StreamingProcessor: """流式处理器""" def process_large_file(self, file_path: str, chunk_size: int = 8192): """流式处理大文件""" with open(file_path, 'rb') as f: while True: chunk = f.read(chunk_size) if not chunk: break yield self.process_chunk(chunk) def process_chunk(self, chunk: bytes) -> bytes: """处理数据块""" return chunk
|
三、缓存策略
3.1 多级缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| import functools import hashlib import time from typing import Any, Optional
class CacheManager: """缓存管理器""" def __init__(self): self.memory_cache = {} self.cache_stats = {'hits': 0, 'misses': 0} def get(self, key: str) -> Optional[Any]: """获取缓存""" if key in self.memory_cache: entry = self.memory_cache[key] if entry['expiry'] > time.time(): self.cache_stats['hits'] += 1 return entry['value'] else: del self.memory_cache[key] self.cache_stats['misses'] += 1 return None def set(self, key: str, value: Any, ttl: int = 300): """设置缓存""" self.memory_cache[key] = { 'value': value, 'expiry': time.time() + ttl } def memoize(self, ttl: int = 300): """缓存装饰器""" def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): key = self._generate_key(func.__name__, args, kwargs) cached = self.get(key) if cached is not None: return cached result = func(*args, **kwargs) self.set(key, result, ttl) return result return wrapper return decorator def _generate_key(self, func_name: str, args: tuple, kwargs: dict) -> str: """生成缓存键""" key_data = f"{func_name}:{str(args)}:{str(kwargs)}" return hashlib.md5(key_data.encode()).hexdigest()
cache = CacheManager()
@cache.memoize(ttl=600) def expensive_operation(param: str) -> str: """耗时操作""" time.sleep(2) return f"Result for {param}"
|
四、监控告警
4.1 性能监控
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| import statistics from dataclasses import dataclass from typing import List
@dataclass class Metric: """指标数据""" timestamp: float name: str value: float labels: dict
class PerformanceMonitor: """性能监控器""" def __init__(self, storage): self.storage = storage self.metrics_buffer = [] def record(self, name: str, value: float, labels: dict = None): """记录指标""" metric = Metric( timestamp=time.time(), name=name, value=value, labels=labels or {} ) self.metrics_buffer.append(metric) if len(self.metrics_buffer) >= 100: self._flush_metrics() def _flush_metrics(self): """刷新指标到存储""" for metric in self.metrics_buffer: key = f"metrics:{metric.name}:{int(metric.timestamp)}" self.storage.save(key, { 'timestamp': metric.timestamp, 'value': metric.value, 'labels': metric.labels }) self.metrics_buffer.clear() def get_statistics(self, name: str, hours: int = 24) -> dict: """获取统计信息""" metrics = self._query_metrics(name, hours) if not metrics: return {} values = [m['value'] for m in metrics] return { 'count': len(values), 'mean': statistics.mean(values), 'median': statistics.median(values), 'min': min(values), 'max': max(values), 'p95': sorted(values)[int(len(values) * 0.95)], 'p99': sorted(values)[int(len(values) * 0.99)] }
|
五、实战练习
练习 1:性能分析
为一个 Skill 添加性能分析,找出瓶颈。
练习 2:缓存实现
实现一个缓存系统,提升响应速度。
练习 3:监控告警
搭建监控告警系统,及时发现性能问题。
六、下节预告
下一章我们将进入 综合项目实战,通过完整项目巩固所学知识。
加入学习群
👉 加入AI编程学习交流群

本讲是《AI Skills 从入门到实践》系列课程的第21讲。
🎓 AI 编程实战课程
想系统学习 AI 编程?程序员晚枫的 AI 编程实战课 帮你从零上手!