第19讲:数据持久化与状态管理

第19讲:数据持久化与状态管理

掌握 Skill 的数据持久化和状态管理技巧,实现用户会话保持、数据存储和历史记录管理。

一、为什么需要数据持久化

1.1 典型场景

场景需求解决方案
多轮对话记住用户之前提供的信息会话状态管理
用户偏好记住用户的设置和偏好用户配置存储
历史记录查看之前的操作记录历史数据存储
数据分析统计 Skill 使用情况日志和指标存储
断点续传长时间任务的中断恢复任务状态持久化

1.2 数据类型分类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Skill 数据类型
├── 会话数据(临时)
│ ├── 当前对话上下文
│ ├── 临时变量
│ └── 中间计算结果
├── 用户数据(持久)
│ ├── 用户配置
│ ├── 使用偏好
│ └── 个人数据
├── 全局数据(共享)
│ ├── Skill 配置
│ ├── 公共知识库
│ └── 统计数据
└── 日志数据(审计)
├── 操作日志
├── 错误日志
└── 性能指标

二、会话状态管理

2.1 会话生命周期

1
2
3
4
5
6
7
8
会话生命周期
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ 开始 │ → │ 进行中 │ → │ 暂停 │ → │ 结束 │
└─────────┘ └─────────┘ └─────────┘ └─────────┘
│ │ │ │
▼ ▼ ▼ ▼
创建会话 更新状态 保存状态 清理资源
初始化数据 记录交互 等待恢复 归档数据

2.2 会话管理实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import time
from typing import Dict, Any, Optional
from dataclasses import dataclass, field

@dataclass
class Session:
"""会话对象"""
session_id: str
user_id: str
created_at: float = field(default_factory=time.time)
last_active: float = field(default_factory=time.time)
context: Dict[str, Any] = field(default_factory=dict)
history: list = field(default_factory=list)

def update(self, key: str, value: Any):
"""更新会话数据"""
self.context[key] = value
self.last_active = time.time()

def get(self, key: str, default=None) -> Any:
"""获取会话数据"""
return self.context.get(key, default)

def add_history(self, role: str, content: str):
"""添加对话历史"""
self.history.append({
'role': role,
'content': content,
'timestamp': time.time()
})
self.last_active = time.time()

def is_expired(self, timeout: int = 3600) -> bool:
"""检查会话是否过期"""
return time.time() - self.last_active > timeout

class SessionManager:
"""会话管理器"""

def __init__(self, storage=None, timeout=3600):
self.sessions: Dict[str, Session] = {}
self.storage = storage # 持久化存储
self.timeout = timeout

def create_session(self, user_id: str) -> Session:
"""创建新会话"""
import uuid
session_id = str(uuid.uuid4())

session = Session(
session_id=session_id,
user_id=user_id
)

self.sessions[session_id] = session

# 持久化
if self.storage:
self.storage.save(f"session:{session_id}", {
'session_id': session_id,
'user_id': user_id,
'created_at': session.created_at,
'context': {}
})

return session

def get_session(self, session_id: str) -> Optional[Session]:
"""获取会话"""
session = self.sessions.get(session_id)

if session and session.is_expired(self.timeout):
# 会话过期,清理
self.close_session(session_id)
return None

# 从存储恢复
if not session and self.storage:
data = self.storage.load(f"session:{session_id}")
if data:
session = Session(
session_id=data['session_id'],
user_id=data['user_id'],
created_at=data['created_at'],
context=data.get('context', {})
)
self.sessions[session_id] = session

return session

def close_session(self, session_id: str):
"""关闭会话"""
if session_id in self.sessions:
session = self.sessions[session_id]

# 归档历史记录
if self.storage and session.history:
self.storage.save(
f"history:{session_id}",
session.history
)

del self.sessions[session_id]

# 清理存储
if self.storage:
self.storage.delete(f"session:{session_id}")

def cleanup_expired(self):
"""清理过期会话"""
expired = [
sid for sid, session in self.sessions.items()
if session.is_expired(self.timeout)
]
for sid in expired:
self.close_session(sid)

2.3 多轮对话示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class MultiTurnSkill:
"""支持多轮对话的 Skill"""

def __init__(self):
self.session_manager = SessionManager()
self.intent_handlers = {
'book_flight': self.handle_book_flight,
'query_order': self.handle_query_order
}

def process(self, user_input: str, session_id: str = None, user_id: str = None) -> Dict:
"""处理用户输入"""
# 获取或创建会话
session = self.get_or_create_session(session_id, user_id)

# 记录用户输入
session.add_history('user', user_input)

# 检查是否有进行中的任务
current_task = session.get('current_task')

if current_task:
# 继续之前的任务
response = self.continue_task(session, user_input)
else:
# 识别新意图
intent = self.classify_intent(user_input)
handler = self.intent_handlers.get(intent)

if handler:
response = handler(session, user_input)
else:
response = "抱歉,我不理解您的意思。"

# 记录助手回复
session.add_history('assistant', response)

return {
'response': response,
'session_id': session.session_id
}

def handle_book_flight(self, session: Session, user_input: str) -> str:
"""处理机票预订(多轮对话示例)"""
step = session.get('booking_step', 'start')

if step == 'start':
session.update('current_task', 'book_flight')
session.update('booking_step', 'ask_origin')
return "请问您要从哪个城市出发?"

elif step == 'ask_origin':
session.update('origin', user_input)
session.update('booking_step', 'ask_destination')
return f"好的,从{user_input}出发。请问您要去哪个城市?"

elif step == 'ask_destination':
session.update('destination', user_input)
session.update('booking_step', 'ask_date')
origin = session.get('origin')
return f"好的,从{origin}{user_input}。请问您计划什么时候出发?"

elif step == 'ask_date':
session.update('date', user_input)
# 完成信息收集,执行预订
result = self.book_flight(
session.get('origin'),
session.get('destination'),
user_input
)
# 清理任务状态
session.update('current_task', None)
session.update('booking_step', None)
return f"预订成功!{result}"

三、用户数据存储

3.1 用户配置管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class UserPreferenceManager:
"""用户偏好管理"""

def __init__(self, storage):
self.storage = storage
self.default_preferences = {
'language': 'zh-CN',
'timezone': 'Asia/Shanghai',
'notification': True,
'theme': 'light',
'excel_format': {
'font': '微软雅黑',
'font_size': 11,
'header_color': '4472C4'
}
}

def get_preferences(self, user_id: str) -> Dict:
"""获取用户偏好"""
key = f"user:{user_id}:preferences"
prefs = self.storage.load(key)

if not prefs:
prefs = self.default_preferences.copy()
self.storage.save(key, prefs)

return prefs

def update_preference(self, user_id: str, key: str, value: Any):
"""更新用户偏好"""
prefs = self.get_preferences(user_id)

# 支持嵌套更新
keys = key.split('.')
target = prefs
for k in keys[:-1]:
if k not in target:
target[k] = {}
target = target[k]
target[keys[-1]] = value

self.storage.save(f"user:{user_id}:preferences", prefs)

def reset_preferences(self, user_id: str):
"""重置用户偏好"""
self.storage.save(
f"user:{user_id}:preferences",
self.default_preferences.copy()
)

3.2 用户数据安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import hashlib
from cryptography.fernet import Fernet

class SecureStorage:
"""安全存储"""

def __init__(self, encryption_key: str):
self.cipher = Fernet(encryption_key.encode())

def encrypt(self, data: str) -> str:
"""加密数据"""
return self.cipher.encrypt(data.encode()).decode()

def decrypt(self, encrypted: str) -> str:
"""解密数据"""
return self.cipher.decrypt(encrypted.encode()).decode()

def hash_id(self, user_id: str) -> str:
"""哈希用户 ID"""
return hashlib.sha256(user_id.encode()).hexdigest()[:16]

def save_sensitive(self, user_id: str, key: str, data: str):
"""保存敏感数据"""
hashed_id = self.hash_id(user_id)
encrypted = self.encrypt(data)
# 存储加密后的数据
storage.save(f"sensitive:{hashed_id}:{key}", encrypted)

def load_sensitive(self, user_id: str, key: str) -> str:
"""加载敏感数据"""
hashed_id = self.hash_id(user_id)
encrypted = storage.load(f"sensitive:{hashed_id}:{key}")
return self.decrypt(encrypted) if encrypted else None

四、日志与监控

4.1 操作日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import logging
import json
from datetime import datetime

class SkillLogger:
"""Skill 日志记录器"""

def __init__(self, skill_name: str, storage=None):
self.skill_name = skill_name
self.storage = storage

# 配置日志
self.logger = logging.getLogger(skill_name)
self.logger.setLevel(logging.INFO)

def log_interaction(self, user_id: str, session_id: str,
input_text: str, output_text: str,
intent: str = None, duration_ms: int = None):
"""记录交互日志"""
log_entry = {
'timestamp': datetime.now().isoformat(),
'skill': self.skill_name,
'user_id': user_id,
'session_id': session_id,
'input': input_text[:1000], # 限制长度
'output': output_text[:1000],
'intent': intent,
'duration_ms': duration_ms
}

self.logger.info(json.dumps(log_entry, ensure_ascii=False))

# 持久化存储
if self.storage:
self._save_to_storage(log_entry)

def log_error(self, user_id: str, error: Exception, context: Dict = None):
"""记录错误日志"""
log_entry = {
'timestamp': datetime.now().isoformat(),
'skill': self.skill_name,
'user_id': user_id,
'error_type': type(error).__name__,
'error_message': str(error),
'context': context
}

self.logger.error(json.dumps(log_entry, ensure_ascii=False))

def _save_to_storage(self, log_entry: Dict):
"""保存到存储"""
date = datetime.now().strftime('%Y-%m-%d')
key = f"logs:{self.skill_name}:{date}"

# 追加到当日日志
existing = self.storage.load(key) or []
existing.append(log_entry)
self.storage.save(key, existing)

4.2 使用统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class UsageAnalytics:
"""使用统计分析"""

def __init__(self, storage):
self.storage = storage

def record_usage(self, skill_name: str, user_id: str,
intent: str, success: bool):
"""记录使用情况"""
today = datetime.now().strftime('%Y-%m-%d')

# 日统计
daily_key = f"stats:{skill_name}:daily:{today}"
daily_stats = self.storage.load(daily_key) or {
'total_calls': 0,
'unique_users': set(),
'intents': {},
'success_count': 0
}

daily_stats['total_calls'] += 1
daily_stats['unique_users'].add(user_id)
daily_stats['intents'][intent] = daily_stats['intents'].get(intent, 0) + 1
if success:
daily_stats['success_count'] += 1

self.storage.save(daily_key, daily_stats)

def get_daily_report(self, skill_name: str, date: str = None) -> Dict:
"""获取日报"""
if not date:
date = datetime.now().strftime('%Y-%m-%d')

key = f"stats:{skill_name}:daily:{date}"
stats = self.storage.load(key)

if not stats:
return None

return {
'date': date,
'total_calls': stats['total_calls'],
'unique_users': len(stats['unique_users']),
'success_rate': stats['success_count'] / stats['total_calls'],
'intent_distribution': stats['intents']
}

五、实战练习

练习 1:会话管理实现

为一个客服 Skill 实现完整的会话管理功能,支持:

  • 会话创建和恢复
  • 多轮对话状态保持
  • 会话过期清理

练习 2:用户偏好系统

实现一个用户偏好管理系统,支持:

  • 偏好设置存储
  • 默认配置管理
  • 偏好继承和覆盖

练习 3:日志分析

实现一个日志分析工具,可以:

  • 统计每日调用量
  • 分析热门意图
  • 计算成功率

六、下节预告

下一讲我们将学习 安全与权限管理,包括:

  • 身份认证
  • 权限控制
  • 数据加密
  • 安全审计

加入学习群

👉 加入AI编程学习交流群

点击加入


本讲是《AI Skills 从入门到实践》系列课程的第19讲。

🎓 AI 编程实战课程

想系统学习 AI 编程?程序员晚枫的 AI 编程实战课 帮你从零上手!