Reservoir Sampling(蓄水池抽样)可以简单的从大量流式数据中随机抽取指定数量的样本,并且保证样本被取到的概率相同,适合大数据分析场景。
基础算法
-
问题:如何从一个未知行数的文本文件中提取其中一行,并且保证每行被取到的概率相同?
-
隐含条件:
- 只能顺序读取每一行
- 内存有限,不能将整个文件加载到内存
- 假设总数有 N 行,其中每一行被返回的概率是
1/N
-
算法步骤:
import "math.rand"
func init() {
rand.Seed(12) // 初始化随机种子
}
var ret string // 当前采样
var i = 0 // 样本总数
// scan 扫描每一行,并返回到目前为止的采样
// 调用者可在任意时间选取样本,到选取的时间点为止,历史上的每个样本概率相同
func scan(line string) sample {
i++
r := rand.Float32() // 取得 [0, 1) 的随机数
if r < 1.0 / float32(i) {
ret = line
}
return ret
}
-
证明过程:
当前 i 当前 N 当前元素选中概率 P 元素 1 概率 P1 元素 2 概率 P2 元素 3 概率 P3 1 1 1 1
- - 2 2 1/2 1/2=1*(1/2)
1/2
- 3 3 1/3 1/3=1*(1/2)*(2/3)
1/3=(1/2)*(2/3)
1/3
可见,随着元素数量的增加,历史上每个元素被取得的概率相同,都是1/N
。用数学归纳法同样可以证明。
- 时间复杂度 O(n),空间复杂度 O(1)
蓄水池采样
蓄水池算法是在上面基础算法基础上增加一个条件:可以保留最大 K 个样本,保证 K 个样本中每个样本的概率是 1/N
import "math.rand"
func init() {
rand.Seed(12) // 初始化随机种子
}
const K = 10 // 保持的样本数
var ret []string // 当前采样
var i = 0 // 样本总数
// scan 扫描每一行,并返回到目前为止的采样
// 调用者可在任意时间选取样本,到选取的时间点为止,历史上的每个样本概率相同
func scan(line string) sample {
i++
if i <= K {
ret = append(ret, line)
return ret
}
r := rand.Float32() // 取得 [0, 1) 的随机数
if r < float32(K) / float32(i) {
// 随机替换其中一个元素
r1 := rand.Float32()
idx := int(float32(K)*r1)
ret[idx] = line
}
return ret
}
- 上面两步随机可以优化为一步:
r := rand.Float32() // 取得 [0, 1) 的随机数
idx := int(float32(i)*r)
if idx < K {
ret[idx] = line
}
扩展问题
-
如何实现分布式“蓄水池抽样”算法?
- 问:
- 有 M 个文本文件,交给 M 台机器分别读取
- 如何从全部文本中随机选择 K 行?
- 答:
-
- 在每台机器上,对每行样本生成一个随机值,选取其中随机值最大的 K 行
-
- 把 M 台结果汇总到一台机器上,按顺序,从第一步结果中遍历得到随机值最大的 K 行
-
- 分析:
- 如果某台主机分得的样本较多,那么某样本随机到较大值的概率就高,所以不用担心主机样本数分配不均的问题
- 问:
-
如何实现加权分布式“蓄水池抽样”算法?
- 问:
- 数据是有权重的,权重值可能为整数或浮点数
- 希望数据被选中的概率与权重成正比
- 答:
- 对于每个数据生成一个随机值 r
- 假设该样本权重值为 n,对 r 求 n 次方根结果为 r1
- 按照 r1 作为上面”分布式蓄水池抽样”的随机值进行处理
- 分析:
- 利用 n 次方根对随机值进行修正,n 越大被选中的概率越高,但增大影响有限,不至于让 n 值较小的样本没有机会被选中
- 问: