github star gitee star atomgit star PyPI Downloads AI 编程 AI 交流群

大家好,我是正在实战各种AI项目的程序员晚枫。

欢迎来到第三个实战项目!这次我们要解决一个让供应链同学头疼的问题:库存管理

库存太多,资金积压、仓储成本高;库存太少,断货丢单、客户流失。怎么找到平衡点?答案是:用数据预测未来销量,科学备货。


项目背景

需求场景

你是某零售公司的供应链分析师,面临这些痛点:

  1. 热销品经常断货,损失销售额
  2. 滞销品堆积如山,占用资金和仓库
  3. 采购凭经验拍脑袋,没有数据支撑

目标产出

建立销量预测模型,输出补货建议和安全库存策略。


准备数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# 生成模拟销售数据(带季节性和趋势)
np.random.seed(42)

def generate_sales_data(start_date='2022-01-01', periods=730):
"""生成带趋势和季节性的销售数据"""
dates = pd.date_range(start=start_date, periods=periods, freq='D')

# 基础销量
base_sales = 100

# 趋势:逐年增长15%
trend = np.linspace(0, 50, periods)

# 季节性:周末高、工作日低;夏季高、冬季低
seasonal = []
for i, date in enumerate(dates):
# 星期效应(周末+30%)
weekend_boost = 30 if date.weekday() >= 5 else 0
# 月份效应(夏季+20%,冬季-10%)
month_factor = {6: 20, 7: 25, 8: 20, 12: -10, 1: -15}.get(date.month, 0)
seasonal.append(weekend_boost + month_factor)

# 随机波动
noise = np.random.normal(0, 15, periods)

# 促销 spikes(每月一次)
promotions = np.zeros(periods)
promo_days = np.random.choice(periods, size=24, replace=False)
promotions[promo_days] = np.random.randint(50, 150, size=24)

# 组合所有因素
sales = base_sales + trend + np.array(seasonal) + noise + promotions
sales = np.maximum(sales, 10).astype(int) # 确保非负

df = pd.DataFrame({
'日期': dates,
'销量': sales,
'是否促销': promotions > 0
})

return df

# 生成数据
df = generate_sales_data()
df.to_csv('daily_sales.csv', index=False, encoding='utf-8-sig')
print(f"生成了 {len(df)} 天的销售数据")
print(f"平均日销量: {df['销量'].mean():.0f}")
print(f"总销量: {df['销量'].sum():,}")

数据探索与特征工程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class InventoryAnalyzer:
def __init__(self, data_path):
self.df = pd.read_csv(data_path, encoding='utf-8-sig')
self.df['日期'] = pd.to_datetime(self.df['日期'])
self.df.set_index('日期', inplace=True)
print(f"加载了 {len(self.df)} 天的数据")

def create_features(self):
"""创建时间特征"""
df = self.df.copy()

# 基础时间特征
df['年'] = df.index.year
df['月'] = df.index.month
df['日'] = df.index.day
df['星期'] = df.index.weekday
df['是否周末'] = (df['星期'] >= 5).astype(int)
df['季度'] = df.index.quarter
df['一年中的第几天'] = df.index.dayofyear

# 滞后特征(过去N天的销量)
for lag in [1, 7, 14, 30]:
df[f'销量_lag_{lag}'] = df['销量'].shift(lag)

# 滚动统计特征
for window in [7, 14, 30]:
df[f'销量_ma_{window}'] = df['销量'].shift(1).rolling(window=window).mean()
df[f'销量_std_{window}'] = df['销量'].shift(1).rolling(window=window).std()

# 同比环比
df['销量_同比'] = df['销量'].shift(365)
df['销量_环比'] = df['销量'].shift(30)

self.df_features = df.dropna()
return self.df_features

def analyze_patterns(self):
"""分析销售模式"""
print("\n【销售模式分析】")

# 按星期分析
weekday_avg = self.df.groupby(self.df.index.weekday)['销量'].mean()
print("\n星期平均销量:")
days = ['周一', '周二', '周三', '周四', '周五', '周六', '周日']
for i, day in enumerate(days):
print(f" {day}: {weekday_avg[i]:.0f}")

# 按月分析
monthly_avg = self.df.groupby(self.df.index.month)['销量'].mean()
print("\n月度平均销量:")
for month, avg in monthly_avg.items():
print(f" {month}月: {avg:.0f}")

# 促销效果
promo_effect = self.df.groupby('是否促销')['销量'].mean()
print(f"\n促销效果:")
print(f" 非促销日: {promo_effect[False]:.0f}")
print(f" 促销日: {promo_effect[True]:.0f}")
print(f" 提升幅度: {(promo_effect[True]/promo_effect[False]-1)*100:.1f}%")

销量预测模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error
import warnings
warnings.filterwarnings('ignore')

class SalesForecaster:
def __init__(self, analyzer):
self.analyzer = analyzer
self.models = {}

def prepare_data(self, forecast_days=30):
"""准备训练和测试数据"""
df = self.analyzer.df_features.copy()

# 特征列(排除原始销量和目标变量)
feature_cols = [col for col in df.columns if col not in ['销量', '是否促销']]

# 划分训练集和测试集
train_size = len(df) - forecast_days
self.train_df = df.iloc[:train_size]
self.test_df = df.iloc[train_size:]

self.X_train = self.train_df[feature_cols]
self.y_train = self.train_df['销量']
self.X_test = self.test_df[feature_cols]
self.y_test = self.test_df['销量']
self.feature_cols = feature_cols

print(f"训练集: {len(self.train_df)} 天")
print(f"测试集: {len(self.test_df)} 天")

return self.X_train, self.y_train, self.X_test, self.y_test

def train_models(self):
"""训练多个模型"""
# 线性回归(基线)
lr = LinearRegression()
lr.fit(self.X_train, self.y_train)
self.models['LinearRegression'] = lr

# 随机森林
rf = RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1)
rf.fit(self.X_train, self.y_train)
self.models['RandomForest'] = rf

print("\n模型训练完成")
return self.models

def evaluate(self):
"""评估模型性能"""
results = {}

print("\n【模型评估结果】")
print("-" * 50)

for name, model in self.models.items():
y_pred = model.predict(self.X_test)

mae = mean_absolute_error(self.y_test, y_pred)
rmse = np.sqrt(mean_squared_error(self.y_test, y_pred))
mape = np.mean(np.abs((self.y_test - y_pred) / self.y_test)) * 100

results[name] = {
'MAE': mae,
'RMSE': rmse,
'MAPE': mape,
'predictions': y_pred
}

print(f"\n{name}:")
print(f" MAE: {mae:.2f}")
print(f" RMSE: {rmse:.2f}")
print(f" MAPE: {mape:.2f}%")

self.results = results
return results

def get_feature_importance(self):
"""获取特征重要性(仅随机森林)"""
if 'RandomForest' in self.models:
rf = self.models['RandomForest']
importance = pd.DataFrame({
'feature': self.feature_cols,
'importance': rf.feature_importances_
}).sort_values('importance', ascending=False)

print("\n【特征重要性 Top 10】")
print(importance.head(10).to_string(index=False))
return importance

可视化分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import matplotlib.pyplot as plt

class InventoryVisualizer:
def __init__(self, analyzer, forecaster):
self.analyzer = analyzer
self.forecaster = forecaster
plt.style.use('seaborn-v0_8')

def plot_sales_trend(self):
"""销量趋势图"""
fig, axes = plt.subplots(2, 1, figsize=(14, 8))

# 完整趋势
axes[0].plot(self.analyzer.df.index, self.analyzer.df['销量'],
alpha=0.7, linewidth=0.8)
axes[0].set_title('Daily Sales Trend (Full History)', fontsize=14)
axes[0].set_ylabel('Sales Volume')
axes[0].grid(True, alpha=0.3)

# 最近90天细节
recent = self.analyzer.df.tail(90)
axes[1].plot(recent.index, recent['销量'], marker='o', markersize=3)
axes[1].set_title('Daily Sales Trend (Last 90 Days)', fontsize=14)
axes[1].set_ylabel('Sales Volume')
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('sales_trend.png', dpi=300, bbox_inches='tight')
plt.show()

def plot_seasonal_patterns(self):
"""季节性模式图"""
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# 星期模式
weekday_avg = self.analyzer.df.groupby(self.analyzer.df.index.weekday)['销量'].mean()
days = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
bars1 = axes[0].bar(days, weekday_avg.values, color='steelblue')
axes[0].set_title('Average Sales by Weekday', fontsize=14)
axes[0].set_ylabel('Average Sales')
for bar in bars1:
height = bar.get_height()
axes[0].text(bar.get_x() + bar.get_width()/2., height,
f'{height:.0f}', ha='center', va='bottom')

# 月度模式
monthly_avg = self.analyzer.df.groupby(self.analyzer.df.index.month)['销量'].mean()
months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
bars2 = axes[1].bar(months, monthly_avg.values, color='coral')
axes[1].set_title('Average Sales by Month', fontsize=14)
axes[1].set_ylabel('Average Sales')
axes[1].tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.savefig('seasonal_patterns.png', dpi=300, bbox_inches='tight')
plt.show()

def plot_forecast_comparison(self):
"""预测对比图"""
test_dates = self.forecaster.test_df.index
actual = self.forecaster.y_test.values

plt.figure(figsize=(14, 6))
plt.plot(test_dates, actual, label='Actual', marker='o', linewidth=2)

colors = ['orange', 'green']
for (name, result), color in zip(self.forecaster.results.items(), colors):
plt.plot(test_dates, result['predictions'],
label=f'{name} (MAPE: {result["MAPE"]:.1f}%)',
marker='s', alpha=0.7, color=color)

plt.title('Sales Forecast Comparison', fontsize=16)
plt.xlabel('Date')
plt.ylabel('Sales Volume')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('forecast_comparison.png', dpi=300, bbox_inches='tight')
plt.show()

库存策略建议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def generate_inventory_strategy(analyzer, forecaster, current_stock=500, lead_time=7):
"""
生成库存策略建议

参数:
current_stock: 当前库存量
lead_time: 补货提前期(天)
"""
# 使用最佳模型的预测结果
best_model = min(forecaster.results.items(), key=lambda x: x[1]['MAPE'])
predictions = best_model[1]['predictions']

# 计算关键指标
historical_sales = analyzer.df['销量']
avg_daily_sales = historical_sales.mean()
std_daily_sales = historical_sales.std()
max_daily_sales = historical_sales.max()

# 预测期总需求
forecast_total = predictions.sum()
forecast_avg = predictions.mean()

print("="*60)
print("【智能库存分析报告】")
print("="*60)

print(f"\n📊 历史销售概况:")
print(f" 平均日销量: {avg_daily_sales:.0f}")
print(f" 销量标准差: {std_daily_sales:.0f}")
print(f" 最高日销量: {max_daily_sales}")
print(f" 变异系数(CV): {std_daily_sales/avg_daily_sales:.2f}")

print(f"\n📈 未来{len(predictions)}天预测:")
print(f" 预测总需求: {forecast_total:.0f}")
print(f" 预测日均: {forecast_avg:.0f}")
print(f" 预测区间: {predictions.min():.0f} - {predictions.max():.0f}")

# 安全库存计算(95%服务水平)
z_score = 1.65 # 95%置信度
safety_stock = z_score * std_daily_sales * np.sqrt(lead_time)
reorder_point = forecast_avg * lead_time + safety_stock

print(f"\n📦 库存策略建议:")
print(f" 当前库存: {current_stock}")
print(f" 补货提前期: {lead_time}天")
print(f" 安全库存: {safety_stock:.0f} (95%服务水平)")
print(f" 再订货点: {reorder_point:.0f}")

# 库存状态判断
days_of_supply = current_stock / forecast_avg
print(f"\n⚠️ 库存健康度:")
print(f" 可供应天数: {days_of_supply:.1f}天")

if current_stock < reorder_point:
shortage = reorder_point - current_stock
print(f" 状态: 🔴 库存不足")
print(f" 建议: 立即补货 {shortage:.0f} 件")
elif days_of_supply > 30:
excess = current_stock - reorder_point
print(f" 状态: 🟡 库存偏高")
print(f" 建议: 暂停补货,考虑促销清库存")
else:
print(f" 状态: 🟢 库存健康")
print(f" 建议: 维持当前策略,持续监控")

# 补货计划
print(f"\n📝 未来7天补货计划:")
cumulative_demand = 0
for i, pred in enumerate(predictions[:7], 1):
cumulative_demand += pred
remaining = current_stock - cumulative_demand
status = "✅ 充足" if remaining > safety_stock else "⚠️ 需关注" if remaining > 0 else "🔴 缺货"
print(f" Day{i}: 预测需求{pred:.0f}, 预计剩余{remaining:.0f} {status}")

return {
'safety_stock': safety_stock,
'reorder_point': reorder_point,
'days_of_supply': days_of_supply,
'forecast': predictions
}

主程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def main():
"""主程序"""
print("="*60)
print("库存分析与销量预测系统")
print("="*60)

# 1. 加载和分析数据
print("\n[1/5] 加载销售数据...")
analyzer = InventoryAnalyzer('daily_sales.csv')

print("\n[2/5] 特征工程...")
analyzer.create_features()
analyzer.analyze_patterns()

# 2. 训练预测模型
print("\n[3/5] 训练预测模型...")
forecaster = SalesForecaster(analyzer)
forecaster.prepare_data(forecast_days=30)
forecaster.train_models()
forecaster.evaluate()
forecaster.get_feature_importance()

# 3. 可视化
print("\n[4/5] 生成可视化图表...")
visualizer = InventoryVisualizer(analyzer, forecaster)
visualizer.plot_sales_trend()
visualizer.plot_seasonal_patterns()
visualizer.plot_forecast_comparison()

# 4. 库存策略
print("\n[5/5] 生成库存策略...")
strategy = generate_inventory_strategy(analyzer, forecaster,
current_stock=800, lead_time=7)

# 导出预测结果
forecast_df = pd.DataFrame({
'日期': forecaster.test_df.index,
'实际销量': forecaster.y_test.values,
'预测销量': forecaster.results['RandomForest']['predictions'].round(0)
})
forecast_df.to_csv('sales_forecast.csv', index=False, encoding='utf-8-sig')
print("\n ✓ 预测结果已保存: sales_forecast.csv")

print("\n" + "="*60)
print("分析完成!")
print("="*60)

if __name__ == "__main__":
main()

项目总结

学到的技能

  • ✅ 时间序列特征工程
  • ✅ 销量预测建模
  • ✅ 安全库存计算
  • ✅ 库存健康度评估

核心公式

指标公式说明
安全库存Z × σ × √LZ是服务水平系数,σ是需求标准差,L是提前期
再订货点日均需求×L + 安全库存低于此值就触发补货
库存周转销售成本 / 平均库存越高越好

扩展方向

  • ARIMA/SARIMA等时序模型
  • Prophet处理节假日效应
  • 多SKU联合优化
  • 动态定价策略

进阶:简单销量预测方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 方法1:移动平均预测
def moving_avg_forecast(data, window=7, horizon=14):
last_avg = data[-window:].mean()
forecast = [last_avg] * horizon
return forecast

# 方法2:指数平滑
from statsmodels.tsa.holtwinters import ExponentialSmoothing

model = ExponentialSmoothing(daily_sales, trend='add', seasonal='add', seasonal_periods=7)
fitted = model.fit()
forecast = fitted.forecast(14)

# 方法3:Prophet(Facebook开源)
from prophet import Prophet

df_prophet = pd.DataFrame({'ds': dates, 'y': sales})
model = Prophet(yearly_seasonality=True, weekly_seasonality=True)
model.fit(df_prophet)
future = model.make_future_dataframe(periods=30)
forecast = model.predict(future)

安全库存计算

1
2
3
4
5
6
7
8
9
10
11
12
13
# 安全库存 = Z × σ × √L
import scipy.stats as stats

service_level = 0.95 # 95%服务水平
z_score = stats.norm.ppf(service_level) # 约1.65
lead_time = 7 # 补货周期7天
daily_std = daily_sales.std() # 日销量标准差

safety_stock = z_score * daily_std * np.sqrt(lead_time)
reorder_point = daily_sales.mean() * lead_time + safety_stock

print(f"安全库存: {safety_stock:.0f}件")
print(f"补货点: {reorder_point:.0f}件")

避坑指南

❌ 坑1:忽略季节性

1
2
3
4
5
6
# 错误:直接用全年平均来预测
avg_daily = sales.mean() # 忽略了季节波动

# 正确:考虑季节因子
monthly_avg = sales.groupby(sales.index.month).mean()
seasonal_factor = monthly_avg / monthly_avg.mean()

ABC分类法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# ABC库存分类
import pandas as pd
import numpy as np

# 模拟商品数据
np.random.seed(42)
products = pd.DataFrame({
'product_id': range(1, 501),
'annual_sales': np.random.exponential(10000, 500).round(0),
'unit_cost': np.random.uniform(10, 500, 500).round(2),
'stock': np.random.randint(0, 1000, 500)
})

# 计算年度销售额
products['annual_revenue'] = products['annual_sales'] * products['unit_cost']

# 按销售额降序排序
products = products.sort_values('annual_revenue', ascending=False)

# 计算累计占比
products['cum_revenue'] = products['annual_revenue'].cumsum()
products['cum_pct'] = products['cum_revenue'] / products['annual_revenue'].sum()

# ABC分类
products['abc_class'] = pd.cut(
products['cum_pct'],
bins=[0, 0.8, 0.95, 1.0],
labels=['A', 'B', 'C']
)

# 各类统计
abc_summary = products.groupby('abc_class').agg(
sku_count=('product_id', 'count'),
revenue=('annual_revenue', 'sum'),
avg_stock=('stock', 'mean')
)
abc_summary['sku_pct'] = abc_summary['sku_count'] / abc_summary['sku_count'].sum() * 100
abc_summary['revenue_pct'] = abc_summary['revenue'] / abc_summary['revenue'].sum() * 100

print("=== ABC分类结果 ===")
print(abc_summary.round(1))
# A类:20%的SKU贡献80%的收入 → 重点管理
# B类:30%的SKU贡献15%的收入 → 正常管理
# C类:50%的SKU贡献5%的收入 → 简化管理

库存预警系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def inventory_alert(products, lead_time=7, service_level=0.95):
'''库存预警系统'''
from scipy import stats

alerts = []
z = stats.norm.ppf(service_level)

for _, row in products.iterrows():
daily_demand = row['annual_sales'] / 365
safety_stock = z * np.sqrt(lead_time) * daily_demand * 0.3 # 假设变异系数30%
reorder_point = daily_demand * lead_time + safety_stock

if row['stock'] < safety_stock:
alerts.append({
'product_id': row['product_id'],
'current_stock': row['stock'],
'safety_stock': int(safety_stock),
'reorder_point': int(reorder_point),
'status': '🔴 缺货风险'
})
elif row['stock'] < reorder_point:
alerts.append({
'product_id': row['product_id'],
'current_stock': row['stock'],
'safety_stock': int(safety_stock),
'reorder_point': int(reorder_point),
'status': '🟡 需补货'
})

alert_df = pd.DataFrame(alerts)
if len(alert_df) > 0:
print(f"⚠️ 库存预警:{len(alert_df)}个商品需要关注")
print(alert_df)
else:
print("✅ 所有商品库存正常")

return alert_df

alerts = inventory_alert(products)

库存管理KPI

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 库存管理核心KPI计算
def inventory_kpi(products, sales, period_days=365):
'''计算库存管理KPI'''

# 1. 库存周转率
total_cogs = sales['cost'].sum()
avg_inventory = products['stock_value'].mean()
turnover_rate = total_cogs / avg_inventory if avg_inventory > 0 else 0

# 2. 库存周转天数
turnover_days = period_days / turnover_rate if turnover_rate > 0 else float('inf')

# 3. 缺货率
total_demand = sales['quantity'].sum()
stockout = sales[sales['fulfilled'] == False]['quantity'].sum()
stockout_rate = stockout / total_demand if total_demand > 0 else 0

# 4. 库存准确率
accuracy = 1 - abs(products['system_qty'] - products['actual_qty']).sum() / products['system_qty'].sum()

print("=== 库存管理KPI ===")
print(f"库存周转率: {turnover_rate:.2f}次/年")
print(f"库存周转天数: {turnover_days:.0f}天")
print(f"缺货率: {stockout_rate*100:.1f}%")
print(f"库存准确率: {accuracy*100:.1f}%")

return {
'turnover_rate': turnover_rate,
'turnover_days': turnover_days,
'stockout_rate': stockout_rate,
'accuracy': accuracy
}

库存分析完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import pandas as pd
import numpy as np
from scipy import stats

np.random.seed(42)
n = 500

# 模拟商品数据
products = pd.DataFrame({
'product_id': range(1, n+1),
'name': [f'商品{i:04d}' for i in range(1, n+1)],
'category': np.random.choice(['电子产品', '服装', '食品', '家居', '美妆'], n),
'price': np.random.uniform(20, 2000, n).round(2),
'cost': np.random.uniform(10, 1000, n).round(2),
'stock': np.random.randint(0, 500, n),
'daily_sales': np.random.poisson(10, n)
})

# ===== ABC分类 =====
products['revenue'] = products['daily_sales'] * products['price'] * 365
products = products.sort_values('revenue', ascending=False)
products['cum_revenue'] = products['revenue'].cumsum()
products['cum_pct'] = products['cum_revenue'] / products['revenue'].sum()
products['abc'] = pd.cut(products['cum_pct'], bins=[0, 0.8, 0.95, 1.0], labels=['A', 'B', 'C'])

print('=== ABC分类结果 ===')
abc_summary = products.groupby('abc').agg(
商品数=('product_id', 'count'),
收入=('revenue', 'sum')
)
abc_summary['商品占比'] = (abc_summary['商品数'] / n * 100).round(1)
abc_summary['收入占比'] = (abc_summary['收入'] / abc_summary['收入'].sum() * 100).round(1)
print(abc_summary)

# ===== 安全库存 =====
z = stats.norm.ppf(0.95) # 95%服务水平
products['safety_stock'] = (z * products['daily_sales'].std() * np.sqrt(7)).astype(int)
products['reorder_point'] = (products['daily_sales'].mean() * 7 + products['safety_stock']).astype(int)

# ===== 库存预警 =====
alert = products[products['stock'] < products['safety_stock']]
print(f'\n⚠️ 缺货风险商品: {len(alert)}个')
print(alert[['name', 'stock', 'safety_stock', 'reorder_point']].head(10))

库存优化完整方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import pandas as pd
import numpy as np
from scipy import stats

np.random.seed(42)
n = 500

# 商品基础数据
products = pd.DataFrame({
'product_id': range(1, n+1),
'name': [f'商品{i:04d}' for i in range(1, n+1)],
'category': np.random.choice(['电子产品', '服装', '食品', '家居'], n),
'unit_price': np.random.uniform(20, 2000, n).round(2),
'unit_cost': np.random.uniform(10, 1000, n).round(2),
'stock': np.random.randint(0, 500, n),
'lead_time_days': np.random.choice([3, 5, 7, 14, 21], n)
})

# 模拟30天销量
daily_sales = pd.DataFrame({
'product_id': np.repeat(range(1, n+1), 30),
'day': np.tile(range(30), n),
'quantity': np.random.poisson(5, n*30)
})

# ===== 1. ABC分类 =====
products['annual_revenue'] = daily_sales.groupby('product_id')['quantity'].sum().values * products['unit_price']
products = products.sort_values('annual_revenue', ascending=False)
products['cum_rev'] = products['annual_revenue'].cumsum()
products['cum_pct'] = products['cum_rev'] / products['annual_revenue'].sum()
products['abc'] = pd.cut(products['cum_pct'], [0, 0.8, 0.95, 1.0], labels=['A', 'B', 'C'])

# ===== 2. 安全库存计算 =====
daily_stats = daily_sales.groupby('product_id')['quantity'].agg(['mean', 'std'])
products = products.merge(daily_stats, on='product_id', how='left')

z_score = stats.norm.ppf(0.95) # 95%服务水平
products['safety_stock'] = (z_score * products['std'] * np.sqrt(products['lead_time_days'])).fillna(0).astype(int)
products['reorder_point'] = (products['mean'] * products['lead_time_days'] + products['safety_stock']).fillna(0).astype(int)
products['eoq'] = np.sqrt(2 * products['annual_revenue'] / (products['unit_cost'] * 0.2)).fillna(0).astype(int) # 经济订货量

# ===== 3. 库存健康度 =====
products['stock_days'] = (products['stock'] / products['mean']).fillna(0).round(0)
products['status'] = np.where(products['stock'] < products['safety_stock'], '缺货风险',
np.where(products['stock_days'] < 7, '需补货',
np.where(products['stock_days'] > 90, '库存积压', '正常')))

print('=== 库存健康度报告 ===')
print(products['status'].value_counts())

# ===== 4. 按ABC分类的库存策略 =====
for abc_class in ['A', 'B', 'C']:
group = products[products['abc'] == abc_class]
print(f'\n{abc_class}类商品:')
print(f' 商品数: {len(group)}')
print(f' 平均库存天数: {group["stock_days"].mean():.0f}天')
print(f' 缺货风险数: {(group["status"]=="缺货风险").sum()}')

销量预测方法对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from sklearn.metrics import mean_absolute_error

# 取一个商品的历史销量
product_sales = daily_sales[daily_sales['product_id'] == 1]
ts = product_sales.set_index('day')['quantity']

# 划分训练集和测试集
train = ts[:-7]
test = ts[-7:]

# 方法1:简单移动平均
ma_forecast = train.rolling(7).mean().iloc[-1]
ma_forecast = np.full(7, ma_forecast)

# 方法2:指数平滑
model = ExponentialSmoothing(train, trend='add', seasonal=None)
fitted = model.fit()
es_forecast = fitted.forecast(7)

# 方法3:Holt-Winters
model_hw = ExponentialSmoothing(train, trend='add', seasonal='add', seasonal_periods=7)
fitted_hw = model_hw.fit()
hw_forecast = fitted_hw.forecast(7)

# 评估
print('=== 预测方法对比 ===')
print(f'MAE - 移动平均: {mean_absolute_error(test, ma_forecast):.2f}')
print(f'MAE - 指数平滑: {mean_absolute_error(test, es_forecast):.2f}')
print(f'MAE - Holt-Winters: {mean_absolute_error(test, hw_forecast):.2f}')

下节预告

下一项目是竞品价格监控,学习如何抓取竞品数据、进行价格分析和制定定价策略。

👉 继续阅读:项目4-竞品价格监控与分析


💬 加入学习交流群

扫码加入Python学习交流群,和数千名同学一起进步:

👉 点击加入交流群

群里不定期分享:

  • 数据分析实战案例
  • Python学习资料
  • 求职面试经验
  • 行业最新动态

推荐:AI Python数据分析实战营

🎁 限时福利:送《利用Python进行数据分析》实体书

👉 点击了解详情


课程导航

上一篇: 项目2-用户行为分析与RFM模型

下一篇: 项目4-竞品价格监控与分析


PS:库存管理是供应链的核心。掌握预测方法,你就掌握了降本增效的钥匙。



📚 推荐教材

主教材《Excel+Python 飞速搞定数据分析与处理(图灵出品)》

💬 联系我

平台账号/链接
微信扫码加好友
微博@程序员晚枫
知乎@程序员晚枫
抖音@程序员晚枫
小红书@程序员晚枫
B 站Python 自动化办公社区

主营业务:AI 编程培训、企业内训、技术咨询

🎓 AI 编程实战课程

想系统学习 AI 编程?程序员晚枫的 AI 编程实战课 帮你从零上手!