第20讲:安全与权限管理 掌握 Skill 的安全与权限管理,保护用户数据安全,防范常见安全风险。
一、安全风险分析

1.1 常见安全风险

| 风险类型 | 描述 | 危害 |
| --- | --- | --- |
| 数据泄露 | 用户敏感信息被未授权访问 | 隐私泄露、财产损失 |
| 注入攻击 | 恶意代码通过输入执行 | 系统被控制、数据被破坏 |
| 越权访问 | 用户访问超出权限的资源 | 数据被篡改、删除 |
| 会话劫持 | 攻击者窃取用户会话 | 冒充用户操作 |
| API 滥用 | 接口被恶意调用 | 资源耗尽、服务中断 |
1.2 安全原则

安全设计原则
├── 最小权限原则
│   └── 只授予必要的权限
├── 纵深防御原则
│   └── 多层安全防护
├── 失败安全原则
│   └── 出错时默认拒绝
├── 完全仲裁原则
│   └── 每次访问都验证权限
└── 开放设计原则
    └── 不依赖隐藏实现的安全
# Section 2: Authentication
# 2.1 Authentication methods
from abc import ABC, abstractmethod
from typing import Optional
import hashlib
import secrets
import time


class Authenticator(ABC):
    """Abstract base class for authenticators."""

    @abstractmethod
    def authenticate(self, credentials: dict) -> Optional[str]:
        """Authenticate *credentials*; return the user ID, or None on failure."""

    @abstractmethod
    def verify_token(self, token: str) -> Optional[str]:
        """Verify *token*; return the user ID, or None if it is not valid."""


class APIKeyAuthenticator(Authenticator):
    """API-key authentication.

    Records are stored under the SHA-256 digest of the key, so the plaintext
    key is only ever returned once, by :meth:`generate_api_key`.

    ``storage`` is expected to provide ``save(key, value)``, ``load(key)``
    and ``delete(key)``; ``load`` is assumed to return a falsy value for an
    unknown key.
    """

    def __init__(self, storage):
        self.storage = storage

    @staticmethod
    def _storage_key(api_key: str) -> str:
        """Return the storage key under which *api_key*'s record lives."""
        digest = hashlib.sha256(api_key.encode()).hexdigest()
        return f"apikey:{digest}"

    def generate_api_key(self, user_id: str) -> str:
        """Create, register and return a new API key for *user_id*."""
        api_key = secrets.token_urlsafe(32)
        # Read the clock once so created_at and last_used start out equal.
        now = time.time()
        self.storage.save(self._storage_key(api_key), {
            'user_id': user_id,
            'created_at': now,
            'last_used': now,
        })
        return api_key

    def authenticate(self, credentials: dict) -> Optional[str]:
        """Validate ``credentials['api_key']``.

        Returns the owning user ID and refreshes the record's ``last_used``
        timestamp, or returns None when the key is missing, is not a string,
        or is unknown to the storage.
        """
        api_key = credentials.get('api_key')
        # A non-string key cannot be hashed; treat it as a failed login
        # instead of letting ``.encode()`` raise.
        if not api_key or not isinstance(api_key, str):
            return None

        storage_key = self._storage_key(api_key)
        key_data = self.storage.load(storage_key)
        if not key_data:
            return None

        key_data['last_used'] = time.time()
        self.storage.save(storage_key, key_data)
        return key_data['user_id']

    def verify_token(self, token: str) -> Optional[str]:
        """Treat *token* as an API key and authenticate it."""
        return self.authenticate({'api_key': token})

    def revoke_api_key(self, api_key: str):
        """Delete the stored record for *api_key*."""
        self.storage.delete(self._storage_key(api_key))


class JWTAuthenticator(Authenticator):
    """JWT authentication (HS256) backed by username/password login.

    Requires the third-party ``jwt`` (PyJWT) and ``bcrypt`` packages, which
    are imported lazily inside the methods that need them.
    """

    def __init__(self, secret_key: str, storage):
        self.secret_key = secret_key
        self.storage = storage

    def generate_token(self, user_id: str, expires_in: int = 3600) -> str:
        """Return a signed JWT for *user_id* expiring in *expires_in* seconds."""
        import jwt

        now = time.time()
        payload = {
            'user_id': user_id,
            'exp': now + expires_in,
            'iat': now,
        }
        return jwt.encode(payload, self.secret_key, algorithm='HS256')

    def authenticate(self, credentials: dict) -> Optional[str]:
        """Check a username/password pair (example implementation).

        Returns the user ID on success. Returns None when either field is
        missing, empty or not a string, when the user record is not found,
        or when the password does not match the stored bcrypt hash.
        """
        username = credentials.get('username')
        password = credentials.get('password')
        # Reject incomplete credentials up front: a missing password would
        # otherwise crash in ``password.encode()``.
        if not isinstance(username, str) or not isinstance(password, str):
            return None
        if not username or not password:
            return None

        user_data = self.storage.load(f"user:{username}")
        if user_data and self._verify_password(password, user_data['password_hash']):
            return user_data['user_id']
        return None

    def verify_token(self, token: str) -> Optional[str]:
        """Decode *token*; return its ``user_id`` claim, or None if invalid."""
        import jwt

        try:
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
        except jwt.InvalidTokenError:
            # ExpiredSignatureError is a subclass of InvalidTokenError, so
            # expired tokens are rejected here as well.
            return None
        return payload.get('user_id')

    def _verify_password(self, password: str, password_hash: str) -> bool:
        """Return True if *password* matches the bcrypt *password_hash*."""
        import bcrypt

        return bcrypt.checkpw(password.encode(), password_hash.encode())
# 2.2 Multi-factor authentication
class MFAAuthenticator:
    """Multi-factor authentication: TOTP plus single-use backup codes.

    TOTP support requires the third-party ``pyotp`` package, imported lazily.
    ``storage`` is expected to provide ``save(key, value)`` and ``load(key)``.
    """

    def __init__(self, storage):
        self.storage = storage

    @staticmethod
    def _hash_code(code: str) -> str:
        """Return the SHA-256 hex digest used to store a backup code."""
        import hashlib

        return hashlib.sha256(code.encode()).hexdigest()

    def setup_totp(self, user_id: str) -> str:
        """Create a TOTP (time-based one-time password) secret for *user_id*.

        The secret is stored with ``enabled`` set to False; it is switched on
        by the first successful :meth:`verify_totp`. Returns the provisioning
        URI for an authenticator app.
        """
        import pyotp

        secret = pyotp.random_base32()
        self.storage.save(f"mfa:{user_id}", {
            'secret': secret,
            'enabled': False,
        })

        totp = pyotp.TOTP(secret)
        return totp.provisioning_uri(name=user_id, issuer_name="AI Skill")

    def verify_totp(self, user_id: str, code: str) -> bool:
        """Return True if *code* is a valid TOTP code for *user_id*.

        Returns False when no MFA record exists. The first successful
        verification marks the record as enabled.
        """
        import pyotp

        mfa_data = self.storage.load(f"mfa:{user_id}")
        if not mfa_data:
            return False

        if not pyotp.TOTP(mfa_data['secret']).verify(code):
            return False

        if not mfa_data['enabled']:
            mfa_data['enabled'] = True
            self.storage.save(f"mfa:{user_id}", mfa_data)
        return True

    def generate_backup_codes(self, user_id: str, count: int = 10) -> list:
        """Generate *count* backup codes, store their hashes, return the codes.

        Only SHA-256 digests are stored; the plaintext codes are returned
        once. Any previously stored backup codes are replaced.
        """
        import secrets

        codes = [secrets.token_hex(4) for _ in range(count)]
        hashed_codes = [self._hash_code(c) for c in codes]
        self.storage.save(f"mfa:{user_id}:backup", hashed_codes)
        return codes

    def verify_backup_code(self, user_id: str, code: str) -> bool:
        """Consume a backup code; return True if it was valid.

        A matching code is removed from storage so it cannot be reused.
        """
        import hmac

        if not isinstance(code, str) or not code:
            return False

        key = f"mfa:{user_id}:backup"
        stored = self.storage.load(key) or []
        candidate = self._hash_code(code)
        # Constant-time comparison against every stored digest.
        remaining = [h for h in stored if not hmac.compare_digest(h, candidate)]
        if len(remaining) == len(stored):
            return False

        self.storage.save(key, remaining)
        return True
# Section 3: Access control
# 3.1 RBAC permission model
from enum import Enum
from functools import wraps
from typing import List, Set


class Permission(Enum):
    """Permissions that can be granted to a role."""

    SKILL_USE = "skill:use"
    SKILL_ADMIN = "skill:admin"

    DATA_READ = "data:read"
    DATA_WRITE = "data:write"
    DATA_DELETE = "data:delete"

    USER_MANAGE = "user:manage"

    CONFIG_READ = "config:read"
    CONFIG_WRITE = "config:write"


class Role:
    """A named set of permissions."""

    def __init__(self, name: str, permissions: Set[Permission]):
        self.name = name
        self.permissions = permissions


# Built-in roles, from most to least privileged.
ROLES = {
    'admin': Role('admin', {
        Permission.SKILL_USE, Permission.SKILL_ADMIN,
        Permission.DATA_READ, Permission.DATA_WRITE, Permission.DATA_DELETE,
        Permission.USER_MANAGE,
        Permission.CONFIG_READ, Permission.CONFIG_WRITE,
    }),
    'user': Role('user', {
        Permission.SKILL_USE,
        Permission.DATA_READ, Permission.DATA_WRITE,
    }),
    'guest': Role('guest', {
        Permission.SKILL_USE,
    }),
}


class RBACManager:
    """Role-based access control manager.

    ``storage`` is expected to provide ``save(key, value)`` and ``load(key)``.
    """

    def __init__(self, storage):
        self.storage = storage

    def assign_role(self, user_id: str, role_name: str):
        """Assign *role_name* to *user_id*.

        Raises:
            ValueError: if *role_name* is not a key of ``ROLES``.
        """
        if role_name not in ROLES:
            raise ValueError(f"未知角色: {role_name}")

        self.storage.save(f"role:{user_id}", role_name)

    def get_user_role(self, user_id: str) -> Role:
        """Return the user's role, falling back to ``guest``.

        The fallback applies both when nothing is stored for the user and
        when the stored name is not a known role.
        """
        role_name = self.storage.load(f"role:{user_id}") or 'guest'
        return ROLES.get(role_name, ROLES['guest'])

    def check_permission(self, user_id: str, permission: Permission) -> bool:
        """Return True if the user's role includes *permission*."""
        return permission in self.get_user_role(user_id).permissions

    def require_permission(self, permission: Permission):
        """Return a decorator enforcing *permission* on a method.

        The decorated callable must take ``(self, user_id, ...)``. The check
        is performed by this manager, so the decorated object does not need
        a ``check_permission`` method of its own.

        Raises (from the wrapped call):
            PermissionError: if the user lacks *permission*.
        """
        manager = self

        def decorator(func):
            @wraps(func)
            def wrapper(instance, user_id: str, *args, **kwargs):
                # ``instance`` is the decorated method's own ``self``; the
                # permission lookup must go through the manager instead.
                if not manager.check_permission(user_id, permission):
                    raise PermissionError(f"缺少权限: {permission.value}")
                return func(instance, user_id, *args, **kwargs)
            return wrapper
        return decorator
# 3.2 Resource-level permissions
import time
from typing import List


class ResourcePermissionManager:
    """Per-resource access control lists (ACLs) with an owner override.

    ``storage`` is expected to provide ``save(key, value)`` and ``load(key)``.
    ACLs are stored under ``acl:<resource_id>`` and owners under
    ``owner:<resource_id>``.
    """

    def __init__(self, storage):
        self.storage = storage

    def grant_access(self, resource_id: str, user_id: str,
                     permissions: List[str]):
        """Grant *user_id* the given *permissions* on *resource_id*.

        Replaces any permissions previously granted to that user.
        """
        key = f"acl:{resource_id}"
        acl = self.storage.load(key) or {}

        acl[user_id] = {
            # Copy so later mutation of the caller's list cannot silently
            # widen or narrow the stored grant.
            'permissions': list(permissions),
            'granted_at': time.time(),
        }

        self.storage.save(key, acl)

    def revoke_access(self, resource_id: str, user_id: str):
        """Remove every permission *user_id* holds on *resource_id*."""
        key = f"acl:{resource_id}"
        acl = self.storage.load(key) or {}

        if user_id in acl:
            del acl[user_id]
            self.storage.save(key, acl)

    def check_resource_access(self, resource_id: str, user_id: str,
                              action: str) -> bool:
        """Return True if *user_id* may perform *action* on *resource_id*.

        The resource owner is allowed every action; anyone else needs
        *action* listed in their ACL entry. A missing/empty *user_id* is
        always denied.
        """
        # Fail closed: without this guard an ownerless resource (owner is
        # None) would match an unauthenticated caller (user_id is None).
        if not user_id:
            return False

        owner = self.storage.load(f"owner:{resource_id}")
        if owner is not None and owner == user_id:
            return True

        acl = self.storage.load(f"acl:{resource_id}") or {}
        user_acl = acl.get(user_id, {})
        return action in user_acl.get('permissions', [])

    def set_owner(self, resource_id: str, user_id: str):
        """Record *user_id* as the owner of *resource_id*."""
        self.storage.save(f"owner:{resource_id}", user_id)
# Section 4: Input safety
# 4.1 Input validation
import re
from typing import Any, Dict


class InputValidator:
    """Static helpers for validating and sanitizing untrusted input."""

    # Patterns are applied with ``fullmatch`` so a trailing newline is not
    # accepted, and use ``[0-9]`` because ``\d`` also matches non-ASCII digits.
    _EMAIL_RE = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
    _PHONE_RE = re.compile(r'1[3-9][0-9]{9}')
    # Whole-word match avoids flagging ordinary words such as "selection".
    _SQL_KEYWORD_RE = re.compile(
        r'\b(?:SELECT|INSERT|UPDATE|DELETE|DROP|UNION|EXEC|EXECUTE|SCRIPT)\b'
    )
    _DANGEROUS_PATH_PARTS = ('..', '~', '//', '\x00')

    @staticmethod
    def validate_email(email: str) -> bool:
        """Return True if *email* has a plausible e-mail address format."""
        if not isinstance(email, str):
            return False
        return InputValidator._EMAIL_RE.fullmatch(email) is not None

    @staticmethod
    def validate_phone(phone: str) -> bool:
        """Return True if *phone* is an 11-digit mobile number (1[3-9]…)."""
        if not isinstance(phone, str):
            return False
        return InputValidator._PHONE_RE.fullmatch(phone) is not None

    @staticmethod
    def validate_file_path(path: str) -> bool:
        """Return True if *path* contains no directory-traversal markers.

        Rejects ``..``, ``~``, ``//`` and NUL bytes. This is a textual check
        only; it does not resolve the path or confine it to a base directory.
        """
        if not isinstance(path, str):
            return False
        return not any(part in path for part in InputValidator._DANGEROUS_PATH_PARTS)

    @staticmethod
    def sanitize_html(html: str) -> str:
        """Strip all but a small set of formatting tags (XSS protection).

        Requires the third-party ``bleach`` package. No attributes are kept.
        """
        import bleach

        allowed_tags = ['p', 'br', 'strong', 'em', 'u', 'h1', 'h2', 'h3']
        allowed_attrs = {}

        return bleach.clean(html, tags=allowed_tags, attributes=allowed_attrs)

    @staticmethod
    def validate_json(data: str) -> Dict:
        """Parse *data* as JSON and return it; the top level must be an object.

        Raises:
            ValueError: if *data* is not valid JSON or is not a JSON object.
        """
        import json

        try:
            parsed = json.loads(data)
        except json.JSONDecodeError as e:
            raise ValueError(f"无效的 JSON: {e}") from e

        if not isinstance(parsed, dict):
            raise ValueError("JSON 必须是对象")
        return parsed

    @staticmethod
    def limit_string_length(text: str, max_length: int = 10000) -> str:
        """Return *text* unchanged if it is at most *max_length* characters.

        Raises:
            ValueError: if *text* is longer than *max_length*.
        """
        if len(text) > max_length:
            raise ValueError(f"输入超过最大长度 {max_length}")
        return text

    @staticmethod
    def validate_sql_injection(text: str) -> bool:
        """Return True if *text* contains no SQL keyword as a whole word.

        This is a coarse, case-insensitive heuristic, not a substitute for
        parameterized queries.
        """
        if not isinstance(text, str):
            return False
        return InputValidator._SQL_KEYWORD_RE.search(text.upper()) is None
# 4.2 Command-injection protection
import subprocess
import shlex


class SafeCommandExecutor:
    """Run a small whitelist of commands without going through a shell."""

    # Command name -> fixed argv prefix. Caller arguments are appended.
    ALLOWED_COMMANDS = {
        'ls': ['ls', '-la'],
        'cat': ['cat'],
        'grep': ['grep'],
        'wc': ['wc', '-l'],
    }

    # Shell metacharacters plus NUL, which can never appear in an argv entry.
    _DANGEROUS_CHARS = (';', '&', '|', '`', '$', '(', ')', '{', '}', '\x00')

    @classmethod
    def execute(cls, command: str, args: list = None) -> str:
        """Run a whitelisted *command* with *args* and return its stdout.

        The command is executed as an argv list (no shell), with a 30-second
        timeout. The exit status is not checked.

        Raises:
            ValueError: if *command* is not whitelisted or an argument is
                rejected by :meth:`_is_safe_arg`.
            TimeoutError: if the command runs longer than 30 seconds.
        """
        if command not in cls.ALLOWED_COMMANDS:
            raise ValueError(f"不允许的命令: {command}")

        cmd = list(cls.ALLOWED_COMMANDS[command])

        # Validate every argument before any of them reaches the command.
        for arg in args or ():
            if not cls._is_safe_arg(arg):
                raise ValueError(f"不安全的参数: {arg}")
        cmd.extend(args or ())

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=30,
                # Without this, ``cat``/``grep`` given no file would read the
                # parent's stdin and block until the timeout fires.
                stdin=subprocess.DEVNULL,
            )
        except subprocess.TimeoutExpired as exc:
            raise TimeoutError("命令执行超时") from exc
        return result.stdout

    @staticmethod
    def _is_safe_arg(arg: str) -> bool:
        """Return True if *arg* is a string free of dangerous characters."""
        if not isinstance(arg, str):
            return False
        return not any(c in arg for c in SafeCommandExecutor._DANGEROUS_CHARS)
# Section 5: Data security
# 5.1 Encrypted storage
from typing import Optional
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os


class DataEncryption:
    """Symmetric encryption (Fernet) with a key derived from a master key.

    The Fernet key is derived with PBKDF2-HMAC-SHA256 from ``master_key`` and
    ``salt``. The same master key, salt and iteration count must be supplied
    again to decrypt data later, so persist ``self.salt`` alongside the data.
    """

    def __init__(self, master_key: str, salt: Optional[bytes] = None,
                 iterations: int = 100000):
        """Derive the encryption key.

        Args:
            master_key: Secret from which the key is derived.
            salt: KDF salt. When None, a fresh random 16-byte salt is
                generated; read it back from ``self.salt`` and store it,
                otherwise no later instance can decrypt this one's output.
            iterations: PBKDF2 iteration count.
        """
        if salt is None:
            salt = os.urandom(16)
        # Exposed so callers can persist it and recreate the same key.
        self.salt = salt

        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=iterations,
        )
        key = base64.urlsafe_b64encode(kdf.derive(master_key.encode()))
        self.cipher = Fernet(key)

    def encrypt(self, data: str) -> str:
        """Encrypt *data* and return the Fernet token as text."""
        return self.cipher.encrypt(data.encode()).decode()

    def decrypt(self, encrypted: str) -> str:
        """Decrypt a token produced by :meth:`encrypt`."""
        return self.cipher.decrypt(encrypted.encode()).decode()

    def hash_sensitive(self, data: str) -> str:
        """Return the (irreversible) SHA-256 hex digest of *data*.

        The digest is unsalted, so equal inputs always give equal digests.
        """
        import hashlib

        return hashlib.sha256(data.encode()).hexdigest()


class SecureDataStore:
    """Store values encrypted, or as hashes, on top of a plain storage.

    ``storage`` is expected to provide ``save(key, value)`` and ``load(key)``.
    """

    def __init__(self, storage, encryption: DataEncryption):
        self.storage = storage
        self.encryption = encryption

    def save_sensitive(self, key: str, data: str):
        """Encrypt *data* and store it under ``enc:<key>``."""
        encrypted = self.encryption.encrypt(data)
        self.storage.save(f"enc:{key}", encrypted)

    def load_sensitive(self, key: str) -> Optional[str]:
        """Return the decrypted value for *key*, or None if nothing is stored."""
        encrypted = self.storage.load(f"enc:{key}")
        if not encrypted:
            return None
        return self.encryption.decrypt(encrypted)

    def save_hash(self, key: str, data: str):
        """Store the hash of *data* under ``hash:<key>`` for later checks."""
        hashed = self.encryption.hash_sensitive(data)
        self.storage.save(f"hash:{key}", hashed)

    def verify_hash(self, key: str, data: str) -> bool:
        """Return True if *data* hashes to the value stored for *key*."""
        import hmac

        stored_hash = self.storage.load(f"hash:{key}")
        if not stored_hash or not isinstance(stored_hash, str):
            return False

        data_hash = self.encryption.hash_sensitive(data)
        # Constant-time comparison, so timing does not reveal how many
        # leading characters of the digest matched.
        return hmac.compare_digest(stored_hash, data_hash)
# Section 6: Security auditing
import time
from datetime import datetime


class SecurityAuditor:
    """Record security events and flag unusually high activity.

    ``storage`` is expected to provide ``save(key, value)`` and ``load(key)``.
    Events are appended to a per-day list stored under
    ``security:<YYYY-MM-DD>``.
    """

    def __init__(self, storage):
        self.storage = storage

    def log_security_event(self, event_type: str, user_id: str,
                           details: dict, risk_level: str = 'low'):
        """Append a security event to today's log.

        Events with ``risk_level == 'high'`` are also passed to
        :meth:`_alert_security_team`.
        """
        now = time.time()
        event = {
            'timestamp': now,
            'type': event_type,
            'user_id': user_id,
            'details': details,
            'risk_level': risk_level,
        }

        # Derive the day bucket from the same clock reading as the timestamp
        # so an event can never land in a bucket for a different day.
        date = datetime.fromtimestamp(now).strftime('%Y-%m-%d')
        key = f"security:{date}"

        events = self.storage.load(key) or []
        events.append(event)
        self.storage.save(key, events)

        if risk_level == 'high':
            self._alert_security_team(event)

    def _alert_security_team(self, event: dict):
        """Notify the security team about *event* (hook; no-op here)."""
        pass

    def detect_anomaly(self, user_id: str, action: str) -> bool:
        """Return True if the user performed over 100 actions in 5 minutes.

        When the threshold is exceeded, an ``anomaly_detected`` event is
        logged with high risk level.
        """
        # Tolerate a hook that returns None instead of an empty list.
        recent_actions = self._get_recent_actions(user_id, minutes=5) or []

        if len(recent_actions) <= 100:
            return False

        self.log_security_event(
            'anomaly_detected',
            user_id,
            {'action': action, 'count': len(recent_actions)},
            'high',
        )
        return True

    def _get_recent_actions(self, user_id: str, minutes: int) -> list:
        """Return the user's actions from the last *minutes* minutes.

        Placeholder hook: returns an empty list until a real action source
        is wired in by a subclass or a later implementation.
        """
        return []
七、实战练习 练习 1:认证系统实现 实现一个完整的用户认证系统,支持:
练习 2:权限控制实现 为一个文件管理 Skill 实现 RBAC 权限控制:
练习 3:输入验证 实现全面的输入验证机制:
八、下节预告 下一讲我们将学习 性能优化与监控 ,包括:
加入学习群 👉 加入AI编程学习交流群
本讲是《AI Skills 从入门到实践》系列课程的第20讲。
🎓 AI 编程实战课程 想系统学习 AI 编程?程序员晚枫的 AI 编程实战课 帮你从零上手!