第20讲:安全与权限管理

第20讲:安全与权限管理

掌握 Skill 的安全与权限管理,保护用户数据安全,防范常见安全风险。

一、安全风险分析

1.1 常见安全风险

风险类型描述危害
数据泄露用户敏感信息被未授权访问隐私泄露、财产损失
注入攻击恶意代码通过输入执行系统被控制、数据被破坏
越权访问用户访问超出权限的资源数据被篡改、删除
会话劫持攻击者窃取用户会话冒充用户操作
API 滥用接口被恶意调用资源耗尽、服务中断

1.2 安全原则

1
2
3
4
5
6
7
8
9
10
11
安全设计原则
├── 最小权限原则
│ └── 只授予必要的权限
├── 纵深防御原则
│ └── 多层安全防护
├── 失败安全原则
│ └── 出错时默认拒绝
├── 完全仲裁原则
│ └── 每次访问都验证权限
└── 开放设计原则
└── 不依赖隐藏实现的安全

二、身份认证

2.1 认证方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from abc import ABC, abstractmethod
from typing import Optional
import hashlib
import secrets
import time

class Authenticator(ABC):
"""认证器基类"""

@abstractmethod
def authenticate(self, credentials: dict) -> Optional[str]:
"""认证,返回用户ID或None"""
pass

@abstractmethod
def verify_token(self, token: str) -> Optional[str]:
"""验证令牌,返回用户ID或None"""
pass

class APIKeyAuthenticator(Authenticator):
"""API Key 认证"""

def __init__(self, storage):
self.storage = storage

def generate_api_key(self, user_id: str) -> str:
"""生成 API Key"""
api_key = secrets.token_urlsafe(32)
hashed_key = hashlib.sha256(api_key.encode()).hexdigest()

# 存储哈希值
self.storage.save(f"apikey:{hashed_key}", {
'user_id': user_id,
'created_at': time.time(),
'last_used': time.time()
})

return api_key

def authenticate(self, credentials: dict) -> Optional[str]:
"""验证 API Key"""
api_key = credentials.get('api_key')
if not api_key:
return None

hashed_key = hashlib.sha256(api_key.encode()).hexdigest()
key_data = self.storage.load(f"apikey:{hashed_key}")

if key_data:
# 更新最后使用时间
key_data['last_used'] = time.time()
self.storage.save(f"apikey:{hashed_key}", key_data)
return key_data['user_id']

return None

def verify_token(self, token: str) -> Optional[str]:
"""API Key 即令牌"""
return self.authenticate({'api_key': token})

def revoke_api_key(self, api_key: str):
"""吊销 API Key"""
hashed_key = hashlib.sha256(api_key.encode()).hexdigest()
self.storage.delete(f"apikey:{hashed_key}")

class JWTAuthenticator(Authenticator):
"""JWT 认证"""

def __init__(self, secret_key: str, storage):
self.secret_key = secret_key
self.storage = storage

def generate_token(self, user_id: str, expires_in: int = 3600) -> str:
"""生成 JWT 令牌"""
import jwt

payload = {
'user_id': user_id,
'exp': time.time() + expires_in,
'iat': time.time()
}

return jwt.encode(payload, self.secret_key, algorithm='HS256')

def authenticate(self, credentials: dict) -> Optional[str]:
"""验证用户名密码(示例)"""
username = credentials.get('username')
password = credentials.get('password')

# 从存储验证
user_data = self.storage.load(f"user:{username}")
if user_data and self._verify_password(password, user_data['password_hash']):
return user_data['user_id']

return None

def verify_token(self, token: str) -> Optional[str]:
"""验证 JWT 令牌"""
import jwt

try:
payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
return payload.get('user_id')
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None

def _verify_password(self, password: str, password_hash: str) -> bool:
"""验证密码"""
import bcrypt
return bcrypt.checkpw(password.encode(), password_hash.encode())

2.2 多因素认证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class MFAAuthenticator:
"""多因素认证"""

def __init__(self, storage):
self.storage = storage

def setup_totp(self, user_id: str) -> str:
"""设置 TOTP(基于时间的一次性密码)"""
import pyotp

# 生成密钥
secret = pyotp.random_base32()

# 存储密钥
self.storage.save(f"mfa:{user_id}", {
'secret': secret,
'enabled': False # 需要验证一次后才启用
})

# 生成二维码 URI
totp = pyotp.TOTP(secret)
provisioning_uri = totp.provisioning_uri(
name=user_id,
issuer_name="AI Skill"
)

return provisioning_uri

def verify_totp(self, user_id: str, code: str) -> bool:
"""验证 TOTP 码"""
import pyotp

mfa_data = self.storage.load(f"mfa:{user_id}")
if not mfa_data:
return False

totp = pyotp.TOTP(mfa_data['secret'])

if totp.verify(code):
# 首次验证成功后启用 MFA
if not mfa_data['enabled']:
mfa_data['enabled'] = True
self.storage.save(f"mfa:{user_id}", mfa_data)
return True

return False

def generate_backup_codes(self, user_id: str, count: int = 10) -> list:
"""生成备用验证码"""
import secrets

codes = [secrets.token_hex(4) for _ in range(count)]

# 存储哈希后的备用码
hashed_codes = [hashlib.sha256(c.encode()).hexdigest() for c in codes]
self.storage.save(f"mfa:{user_id}:backup", hashed_codes)

return codes

三、权限控制

3.1 RBAC 权限模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from enum import Enum
from typing import List, Set

class Permission(Enum):
"""权限枚举"""
SKILL_USE = "skill:use"
SKILL_ADMIN = "skill:admin"
DATA_READ = "data:read"
DATA_WRITE = "data:write"
DATA_DELETE = "data:delete"
USER_MANAGE = "user:manage"
CONFIG_READ = "config:read"
CONFIG_WRITE = "config:write"

class Role:
"""角色"""

def __init__(self, name: str, permissions: Set[Permission]):
self.name = name
self.permissions = permissions

# 预定义角色
ROLES = {
'admin': Role('admin', {
Permission.SKILL_USE,
Permission.SKILL_ADMIN,
Permission.DATA_READ,
Permission.DATA_WRITE,
Permission.DATA_DELETE,
Permission.USER_MANAGE,
Permission.CONFIG_READ,
Permission.CONFIG_WRITE
}),
'user': Role('user', {
Permission.SKILL_USE,
Permission.DATA_READ,
Permission.DATA_WRITE
}),
'guest': Role('guest', {
Permission.SKILL_USE
})
}

class RBACManager:
"""RBAC 权限管理器"""

def __init__(self, storage):
self.storage = storage

def assign_role(self, user_id: str, role_name: str):
"""分配角色"""
if role_name not in ROLES:
raise ValueError(f"未知角色: {role_name}")

self.storage.save(f"role:{user_id}", role_name)

def get_user_role(self, user_id: str) -> Role:
"""获取用户角色"""
role_name = self.storage.load(f"role:{user_id}") or 'guest'
return ROLES.get(role_name, ROLES['guest'])

def check_permission(self, user_id: str, permission: Permission) -> bool:
"""检查权限"""
role = self.get_user_role(user_id)
return permission in role.permissions

def require_permission(self, permission: Permission):
"""权限装饰器"""
def decorator(func):
def wrapper(self, user_id: str, *args, **kwargs):
if not self.check_permission(user_id, permission):
raise PermissionError(f"缺少权限: {permission.value}")
return func(self, user_id, *args, **kwargs)
return wrapper
return decorator

3.2 资源级权限

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class ResourcePermissionManager:
"""资源级权限管理"""

def __init__(self, storage):
self.storage = storage

def grant_access(self, resource_id: str, user_id: str,
permissions: List[str]):
"""授予资源访问权限"""
key = f"acl:{resource_id}"
acl = self.storage.load(key) or {}

acl[user_id] = {
'permissions': permissions,
'granted_at': time.time()
}

self.storage.save(key, acl)

def revoke_access(self, resource_id: str, user_id: str):
"""撤销资源访问权限"""
key = f"acl:{resource_id}"
acl = self.storage.load(key) or {}

if user_id in acl:
del acl[user_id]
self.storage.save(key, acl)

def check_resource_access(self, resource_id: str, user_id: str,
action: str) -> bool:
"""检查资源访问权限"""
# 1. 检查资源所有者
owner = self.storage.load(f"owner:{resource_id}")
if owner == user_id:
return True

# 2. 检查 ACL
acl = self.storage.load(f"acl:{resource_id}") or {}
user_acl = acl.get(user_id, {})

return action in user_acl.get('permissions', [])

def set_owner(self, resource_id: str, user_id: str):
"""设置资源所有者"""
self.storage.save(f"owner:{resource_id}", user_id)

四、输入安全

4.1 输入验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import re
from typing import Any, Dict

class InputValidator:
"""输入验证器"""

@staticmethod
def validate_email(email: str) -> bool:
"""验证邮箱格式"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return bool(re.match(pattern, email))

@staticmethod
def validate_phone(phone: str) -> bool:
"""验证手机号"""
pattern = r'^1[3-9]\d{9}$'
return bool(re.match(pattern, phone))

@staticmethod
def validate_file_path(path: str) -> bool:
"""验证文件路径(防止目录遍历)"""
# 检查是否包含 .. 或绝对路径
dangerous_patterns = ['..', '~', '//']
return not any(p in path for p in dangerous_patterns)

@staticmethod
def sanitize_html(html: str) -> str:
"""清理 HTML(防止 XSS)"""
import bleach

allowed_tags = ['p', 'br', 'strong', 'em', 'u', 'h1', 'h2', 'h3']
allowed_attrs = {}

return bleach.clean(html, tags=allowed_tags, attributes=allowed_attrs)

@staticmethod
def validate_json(data: str) -> Dict:
"""验证并解析 JSON"""
import json

try:
parsed = json.loads(data)
if not isinstance(parsed, dict):
raise ValueError("JSON 必须是对象")
return parsed
except json.JSONDecodeError as e:
raise ValueError(f"无效的 JSON: {e}")

@staticmethod
def limit_string_length(text: str, max_length: int = 10000) -> str:
"""限制字符串长度"""
if len(text) > max_length:
raise ValueError(f"输入超过最大长度 {max_length}")
return text

@staticmethod
def validate_sql_injection(text: str) -> bool:
"""检查 SQL 注入风险"""
dangerous_keywords = [
'SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP',
'UNION', 'EXEC', 'EXECUTE', 'SCRIPT'
]
upper_text = text.upper()
return not any(kw in upper_text for kw in dangerous_keywords)

4.2 命令注入防护

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import subprocess
import shlex

class SafeCommandExecutor:
"""安全命令执行器"""

ALLOWED_COMMANDS = {
'ls': ['ls', '-la'],
'cat': ['cat'],
'grep': ['grep'],
'wc': ['wc', '-l']
}

@classmethod
def execute(cls, command: str, args: list = None) -> str:
"""安全执行命令"""
if command not in cls.ALLOWED_COMMANDS:
raise ValueError(f"不允许的命令: {command}")

# 构建命令
cmd = cls.ALLOWED_COMMANDS[command].copy()

if args:
# 验证参数
for arg in args:
if not cls._is_safe_arg(arg):
raise ValueError(f"不安全的参数: {arg}")
cmd.extend(args)

# 执行命令
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30
)
return result.stdout
except subprocess.TimeoutExpired:
raise TimeoutError("命令执行超时")

@staticmethod
def _is_safe_arg(arg: str) -> bool:
"""检查参数是否安全"""
dangerous_chars = [';', '&', '|', '`', '$', '(', ')', '{', '}']
return not any(c in arg for c in dangerous_chars)

五、数据安全

5.1 加密存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os

class DataEncryption:
"""数据加密"""

def __init__(self, master_key: str):
"""初始化加密器"""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=os.urandom(16),
iterations=100000
)
key = base64.urlsafe_b64encode(kdf.derive(master_key.encode()))
self.cipher = Fernet(key)

def encrypt(self, data: str) -> str:
"""加密数据"""
return self.cipher.encrypt(data.encode()).decode()

def decrypt(self, encrypted: str) -> str:
"""解密数据"""
return self.cipher.decrypt(encrypted.encode()).decode()

def hash_sensitive(self, data: str) -> str:
"""哈希敏感数据(不可逆)"""
import hashlib
return hashlib.sha256(data.encode()).hexdigest()

class SecureDataStore:
"""安全数据存储"""

def __init__(self, storage, encryption: DataEncryption):
self.storage = storage
self.encryption = encryption

def save_sensitive(self, key: str, data: str):
"""安全保存敏感数据"""
encrypted = self.encryption.encrypt(data)
self.storage.save(f"enc:{key}", encrypted)

def load_sensitive(self, key: str) -> str:
"""加载敏感数据"""
encrypted = self.storage.load(f"enc:{key}")
if encrypted:
return self.encryption.decrypt(encrypted)
return None

def save_hash(self, key: str, data: str):
"""保存哈希值(用于验证)"""
hashed = self.encryption.hash_sensitive(data)
self.storage.save(f"hash:{key}", hashed)

def verify_hash(self, key: str, data: str) -> bool:
"""验证哈希值"""
stored_hash = self.storage.load(f"hash:{key}")
if not stored_hash:
return False

data_hash = self.encryption.hash_sensitive(data)
return stored_hash == data_hash

六、安全审计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class SecurityAuditor:
"""安全审计"""

def __init__(self, storage):
self.storage = storage

def log_security_event(self, event_type: str, user_id: str,
details: dict, risk_level: str = 'low'):
"""记录安全事件"""
event = {
'timestamp': time.time(),
'type': event_type,
'user_id': user_id,
'details': details,
'risk_level': risk_level
}

# 存储事件
date = datetime.now().strftime('%Y-%m-%d')
key = f"security:{date}"

events = self.storage.load(key) or []
events.append(event)
self.storage.save(key, events)

# 高风险事件立即告警
if risk_level == 'high':
self._alert_security_team(event)

def _alert_security_team(self, event: dict):
"""告警安全团队"""
# 发送告警通知
pass

def detect_anomaly(self, user_id: str, action: str) -> bool:
"""检测异常行为"""
# 检查短时间内大量操作
recent_actions = self._get_recent_actions(user_id, minutes=5)

if len(recent_actions) > 100: # 5分钟内超过100次操作
self.log_security_event(
'anomaly_detected',
user_id,
{'action': action, 'count': len(recent_actions)},
'high'
)
return True

# 检查异地登录
# 检查非常规时间操作
# ...

return False

def _get_recent_actions(self, user_id: str, minutes: int) -> list:
"""获取最近操作"""
# 实现查询逻辑
pass

七、实战练习

练习 1:认证系统实现

实现一个完整的用户认证系统,支持:

  • API Key 认证
  • JWT 令牌认证
  • 多因素认证

练习 2:权限控制实现

为一个文件管理 Skill 实现 RBAC 权限控制:

  • 角色定义和分配
  • 资源级权限控制
  • 权限检查装饰器

练习 3:输入验证

实现全面的输入验证机制:

  • 各种数据类型验证
  • SQL 注入防护
  • XSS 防护

八、下节预告

下一讲我们将学习 性能优化与监控,包括:

  • 响应时间优化
  • 资源使用优化
  • 缓存策略
  • 监控告警

加入学习群

👉 加入AI编程学习交流群

点击加入


本讲是《AI Skills 从入门到实践》系列课程的第20讲。

🎓 AI 编程实战课程

想系统学习 AI 编程?程序员晚枫的 AI 编程实战课 帮你从零上手!