蓄水池采样(Reservoir Sampling)算法
蓄水池采样是一种用于解决流式数据等概率随机抽样问题的算法。其核心目标是:在数据总量未知(或非常大)的情况下,仅通过一次遍历,以等概率抽取固定数量的样本。例如,从海量日志中随机抽取1000条记录进行分析,而日志是持续生成的,无法先存储全部数据再抽样。
问题描述
给定一个数据流,其长度 \(N\) 可能非常大或未知,要求设计一个算法,仅遍历一次数据,最终以 \(\frac{m}{N}\) 的概率(等概率)保留 \(m\) 个样本(\(m \le N\))。
算法思路演进
步骤1:最简单情况(m = 1)
当只需保留1个样本时,算法应保证每个元素被选中的概率均为 \(\frac{1}{N}\)。
- 初始化:选择第一个元素作为候选样本。
- 遍历第 \(i\) 个元素(\(i \ge 2\))时,以概率 \(\frac{1}{i}\) 用当前元素替换候选样本。
- 遍历结束后,候选样本即为随机抽样结果。
证明:
- 第 \(k\) 个元素最终被选中的概率 = 第 \(k\) 步被选中 × 后续所有步骤不被替换。
\[ P(k) = \frac{1}{k} \times \left(1 - \frac{1}{k+1}\right) \times \cdots \times \left(1 - \frac{1}{N}\right) = \frac{1}{k} \times \frac{k}{k+1} \times \cdots \times \frac{N-1}{N} = \frac{1}{N} \]
因此每个元素被选中的概率相等。
步骤2:推广到一般情况(m ≥ 1)
现在需要保留 \(m\) 个样本,称为“蓄水池”。
- 初始化:将数据流的前 \(m\) 个元素直接放入蓄水池。
- 遍历后续元素:
对于第 \(i\) 个元素(\(i > m\)):- 以概率 \(\frac{m}{i}\) 决定是否将其放入蓄水池。
- 若放入,则从蓄水池中随机等概率选择一个元素替换掉。
- 遍历结束后,蓄水池中的 \(m\) 个元素即为随机样本。
详细执行过程示例
设 \(m = 3\),数据流为 \([a, b, c, d, e, f]\)(N=6)。
- 初始化蓄水池:\([a, b, c]\)。
- 处理 \(d\)(i=4):
- 生成随机数 \(r \in [1, 4]\),若 \(r \le 3\) 则保留 \(d\)。假设命中,随机选择蓄水池中一个位置(如第2个)替换为 \(d\) → 蓄水池变为 \([a, d, c]\)。
- 处理 \(e\)(i=5):
- 以概率 \(\frac{3}{5}\) 决定是否保留。若保留,随机替换一个位置。
- 处理 \(f\)(i=6):同理。
最终蓄水池中的每个元素被选中的概率均为 \(\frac{3}{6} = \frac{1}{2}\)。
概率正确性证明
关键点:证明任意元素 \(x_k\)(第 \(k\) 个到达)最终出现在蓄水池中的概率为 \(\frac{m}{N}\)。
- 情况1:\(k \le m\)
初始时 \(x_k\) 在蓄水池中。
在后续步骤 \(i\)(\(i > m\))中,\(x_k\) 被替换的概率 = (第 \(i\) 个元素被保留的概率)×(在 \(m\) 个位置中正好选到 \(x_k\) 的概率)
\[ P_{\text{替换}} = \frac{m}{i} \times \frac{1}{m} = \frac{1}{i} \]
因此 \(x_k\) 不被替换的概率为 \(1 - \frac{1}{i}\)。
从 \(i = m+1\) 到 \(N\),累乘得:
\[ P(k) = 1 \times \prod_{i=m+1}^{N} \left(1 - \frac{1}{i}\right) = \frac{m}{N} \]
因为 \(\prod_{i=m+1}^{N} \frac{i-1}{i} = \frac{m}{N}\)。
- 情况2:\(k > m\)
\(x_k\) 需要在第 \(k\) 步被选中(概率 \(\frac{m}{k}\)),且之后不被替换(同情况1的推导):
\[ P(k) = \frac{m}{k} \times \prod_{i=k+1}^{N} \left(1 - \frac{1}{i}\right) = \frac{m}{k} \times \frac{k}{N} = \frac{m}{N} \]
综上,所有元素被选中的概率均等。
算法实现(Python)
import random
def reservoir_sampling(stream, m):
reservoir = []
for i, item in enumerate(stream):
if i < m:
reservoir.append(item)
else:
# 以 m/(i+1) 的概率保留当前元素
j = random.randint(0, i)
if j < m:
reservoir[j] = item
return reservoir
注意:random.randint(0, i) 生成 [0, i] 的整数,若 j < m 则替换,这等价于以概率 \(\frac{m}{i+1}\) 进行决策。
应用场景
- 大数据流随机抽样:日志分析、网络流量监控。
- 在线随机播放:从未知大小的播放列表中随机选取歌曲。
- 机器学习:从海量数据中随机选取子集进行模型训练。
扩展变种
- 带权重的蓄水池采样:每个元素有权重,按权重概率抽样。
- 分布式蓄水池采样:在多台机器上并行处理数据流,合并抽样结果。
这种算法巧妙地将概率动态调整,仅需 \(O(N)\) 时间与 \(O(m)\) 空间,是流式随机抽样的经典方法。