Lecture 19: Data Persistence and State Management

Master data persistence and state management techniques for Skills, so that you can implement user session persistence, data storage, and history record management.

1. Why Data Persistence is Needed

1.1 Typical Scenarios

| Scenario | Requirement | Solution |
| --- | --- | --- |
| Multi-turn dialogue | Remember information the user provided earlier | Session state management |
| User preferences | Remember the user's settings and preferences | User configuration storage |
| History records | View previous operation records | History data storage |
| Data analysis | Collect statistics on Skill usage | Logs and metrics storage |
| Breakpoint resume | Resume interrupted long-running tasks | Task state persistence |

1.2 Data Type Classification

Skill Data Types
├── Session data (temporary)
│   ├── Current dialogue context
│   ├── Temporary variables
│   └── Intermediate calculation results
├── User data (persistent)
│   ├── User configuration
│   ├── Usage preferences
│   └── Personal data
├── Global data (shared)
│   ├── Skill configuration
│   ├── Public knowledge base
│   └── Statistics data
└── Log data (audit)
    ├── Operation logs
    ├── Error logs
    └── Performance metrics
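
One way to make this classification concrete is to attach a lifetime and a visibility scope to each category. The sketch below is illustrative only: `DataCategory`, `StoragePolicy`, `storage_key`, and every retention value are assumptions made for this example, not part of any Skill API.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class DataCategory(Enum):
    SESSION = "session"  # temporary: dialogue context, temp variables, intermediate results
    USER = "user"        # persistent: configuration, preferences, personal data
    GLOBAL = "global"    # shared: Skill configuration, knowledge base, statistics
    LOG = "log"          # audit: operation logs, error logs, performance metrics


@dataclass(frozen=True)
class StoragePolicy:
    persistent: bool            # does the data survive the end of a session?
    shared: bool                # is the data visible across users?
    ttl_seconds: Optional[int]  # None means "keep until explicitly deleted"


# Hypothetical defaults; tune them for your own Skill.
POLICIES = {
    DataCategory.SESSION: StoragePolicy(persistent=False, shared=False, ttl_seconds=3600),
    DataCategory.USER: StoragePolicy(persistent=True, shared=False, ttl_seconds=None),
    DataCategory.GLOBAL: StoragePolicy(persistent=True, shared=True, ttl_seconds=None),
    DataCategory.LOG: StoragePolicy(persistent=True, shared=True, ttl_seconds=30 * 24 * 3600),
}


def storage_key(category: DataCategory, name: str, owner_id: Optional[str] = None) -> str:
    """Build a namespaced key; non-shared categories require an owner id."""
    policy = POLICIES[category]
    if not policy.shared:
        if owner_id is None:
            raise ValueError(f"{category.value} data needs an owner id")
        return f"{category.value}:{owner_id}:{name}"
    return f"{category.value}:{name}"
```

With these assumed policies, `storage_key(DataCategory.USER, "theme", "u1")` yields a per-user key, while global and log data share one namespace.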

2. Session State Management

2.1 Session Lifecycle

Session Lifecycle
┌──────────────────┐   ┌──────────────────┐   ┌──────────────────┐   ┌──────────────────┐
│      Start       │ → │   In Progress    │ → │      Paused      │ → │       End        │
└──────────────────┘   └──────────────────┘   └──────────────────┘   └──────────────────┘
         │                      │                      │                      │
         ▼                      ▼                      ▼                      ▼
   Create session          Update state            Save state         Clean up resources
  Initialize data       Record interaction      Wait for resume          Archive data
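
The lifecycle above can be sketched as a small state machine that rejects invalid transitions. This is a minimal illustration: the names `SessionState`, `TRANSITIONS`, and `SessionLifecycle` are hypothetical, and two transitions (resuming from Paused back to In Progress, and ending directly from In Progress) are assumptions that the diagram only implies.

```python
from enum import Enum


class SessionState(Enum):
    START = "start"
    IN_PROGRESS = "in_progress"
    PAUSED = "paused"
    END = "end"


# Allowed transitions. The diagram shows the forward path; resuming from
# PAUSED and ending directly from IN_PROGRESS are assumptions added here.
TRANSITIONS = {
    SessionState.START: {SessionState.IN_PROGRESS},
    SessionState.IN_PROGRESS: {SessionState.PAUSED, SessionState.END},
    SessionState.PAUSED: {SessionState.IN_PROGRESS, SessionState.END},
    SessionState.END: set(),
}


class SessionLifecycle:
    """Tracks the current lifecycle state and validates each change."""

    def __init__(self) -> None:
        self.state = SessionState.START

    def transition(self, target: SessionState) -> None:
        """Move to `target`, or raise ValueError if the move is not allowed."""
        if target not in TRANSITIONS[self.state]:
            raise ValueError(f"cannot go from {self.state.value} to {target.value}")
        self.state = target


lifecycle = SessionLifecycle()
lifecycle.transition(SessionState.IN_PROGRESS)  # session created, first interaction
lifecycle.transition(SessionState.PAUSED)       # state saved, waiting for resume
lifecycle.transition(SessionState.IN_PROGRESS)  # user came back
lifecycle.transition(SessionState.END)          # clean up and archive
```

Keeping the transition table as data makes it easy to audit which moves are legal and to hook the save or clean-up actions onto specific transitions.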

2.2 Session Management Implementation

import time
from typing import Dict, Any, Optional
from dataclasses import dataclass, field

@dataclass
class Session:
    """Session object"""
    session_id: str
    user_id: str
    # default_factory gives each instance its own timestamp and containers
    created_at: float = field(default_factory=time.time)
    last_active: float = field(default_factory=time.time)
    context: Dict[str, Any] = field(default_factory=dict)
    history: list = field(default_factory=list)

    def update(self, key: str, value: Any) -> None:
        """Update session data"""
        self.context[key] = value
        self.last_active = time.time()  # any write counts as activity

    def get(self, key: str, default=None) -> Any:
        """Get session data"""
        return self.context.get(key, default)

    def add_history(self, role: str, content: str) -> None:
        """Add dialogue history"""
        self.history.append({
            'role': role,
            'content': content,
            'timestamp': time.time()
        })
        self.last_active = time.time()

    def is_expired(self, timeout: int = 3600) -> bool:
        """Check if session is expired (timeout in seconds since last activity)"""
        return time.time() - self.last_active > timeout
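
The Session object above lives only in memory, so it is lost when the process exits. A minimal sketch of persisting it as JSON follows. It assumes that everything stored in `context` is JSON-serializable and that session IDs are safe to use as file names; the helpers `save_session` and `load_session` and the one-file-per-session layout are hypothetical choices for this example.

```python
import json
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional


@dataclass
class Session:
    """Trimmed copy of the Session fields above, so the snippet runs standalone."""
    session_id: str
    user_id: str
    created_at: float = field(default_factory=time.time)
    last_active: float = field(default_factory=time.time)
    context: Dict[str, Any] = field(default_factory=dict)
    history: List[dict] = field(default_factory=list)


def save_session(session: Session, directory: Path) -> Path:
    """Write the session to <directory>/<session_id>.json and return the path."""
    directory.mkdir(parents=True, exist_ok=True)
    path = directory / f"{session.session_id}.json"
    tmp = directory / f"{session.session_id}.json.tmp"
    # Write to a temporary file first, then rename it over the target,
    # so a crash mid-write does not leave a truncated session file.
    tmp.write_text(json.dumps(asdict(session), ensure_ascii=False), encoding="utf-8")
    tmp.replace(path)
    return path


def load_session(session_id: str, directory: Path) -> Optional[Session]:
    """Return the stored session, or None if no file exists for this id."""
    path = directory / f"{session_id}.json"
    if not path.exists():
        return None
    data = json.loads(path.read_text(encoding="utf-8"))
    return Session(**data)
```

A caller would typically run `save_session` when a session is paused and `load_session` when the user returns, then check expiry before reusing the restored state. A database or key-value store could replace the file layout without changing the calling code.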
