本帖最后由 tomcar 于 2021-8-6 07:43 编辑
5、HyperLogLog(粗略统计数据)
[Bash shell] 纯文本查看 复制代码 >pfadd codehole user1
>pfadd codehole user2
>pfadd codehole user3 user4 user5
>pfcount codehole #5
5.1、实现原理
插入一系列的随机数,记录随机数低位连续零位的最大值k,通过这个k就可以大致估计出这个随机数的数量。(工业实践)
n=2^k #约等于
k值的计算演变:最大值k的所有值求平均值-->最大值k的所有值求调和平均值(倒数的平均)
6、布隆过滤器(定位查看某个元素是否已经在这个集合中)
有误差:如果判断这个值存在,可能不存在。如果判断不存在,则肯定不存在。
[Bash shell] 纯文本查看 复制代码 >bf.add codehole user1
>bf.add codehole user2
>bf.exists codehole user1
>bf.madd codehole user4 user5 user6
>bf.mexists codehole user4 user5 user6
6.1、实现原理
hash算法为基础,把元素的值通过一种平均的hash算法算出一个hash值,然后对应到位数组中。
当判断一个值是否存在时,用同样的方式取得hash值,判断这个hash值是否存在。
如果不存在,则肯定不存在;如果存在,有可能是hash值一致的别的值。
6.2、注意事项
位数组相对越长,错误率越低;
位数组相对越长,hash函数需要的最佳数量也越多,影响计算效率;
如果实际数据数量超出位数组长度后,错误率会明显上升。
7、简单限流
可以用redis做一个简单的限流功能。
zset中用毫秒值作为score值,然后通过zset的zrangebyscore命令来判断值是否在范围之内,做到简单的限流工作。
代码实现:
[Java] 纯文本查看 复制代码 public class SimpleRateLimiter {
private Jedis jedis;
public SimpleRateLimiter(Jedis jedis) {
this.jedis = jedis;
}
public boolean isActionAllowed(String userId, String actionKey, int period, int maxCount) {
String key = String.format("hist:%s:%s", userId, actionKey);
long nowTs = System.currentTimeMillis(); // 毫秒时间戳
Pipeline pipe = jedis.pipelined();
pipe.multi(); // Redis Multi 命令用于标记一个事务块的开始。
pipe.zadd(key, nowTs, "" + nowTs); // 记录行为
pipe.zremrangeByScore(key, 0, nowTs - period * 1000); //移除时间窗口之前的行为记录,剩下的都是时间窗口之内给的
Response<Long> count = pipe.zcard(key); // 获取窗口内的行为数量
pipe.expire(key, period + 1); // 设置zset过期时间,避免冷用户持续占用内存 过期时间应该等于时间窗口的长度,再多宽限1s
pipe.exec(); // 批量执行
pipe.close();
return count.get() <= maxCount; // 比较数量是否超标
}
public static void main(String[] args) {
Jedis jedis = new Jedis();
SimpleRateLimiter limiter = new SimpleRateLimiter(jedis);
for(int i=0;i<20;i++) {
System.out.println(limiter.isActionAllowed("codehole", "reply", 60, 5));
}
}
}
8、漏斗限流
代码实现:
[Java] 纯文本查看 复制代码 public class FunnelRateLimiter {
static class Funnel {
int capacity; // 漏斗容量
float leakingRate; // 漏嘴流水速率
int leftQuota; // 漏斗剩余空间
long leakingTs; // 上一次漏水时间
public Funnel(int capacity, float leakingRate) {
this.capacity = capacity;
this.leakingRate = leakingRate;
this.leftQuota = capacity;
this.leakingTs = System.currentTimeMillis();
}
void makeSpace() {
long nowTs = System.currentTimeMillis();
long deltaTs = nowTs - leakingTs; // 距离上一次漏水过去了多久
int deltaQuota = (int) (deltaTs * leakingRate); // 又可以腾出不少空间
if (deltaQuota < 0) { // 间隔时间太长,整数数字过大溢出
this.leftQuota = capacity;
this.leakingTs = nowTs;
return;
}
if (deltaQuota < 1) { // 腾出空间太小,最小单位是 1
return;
}
this.leftQuota += deltaQuota; // 增加剩余空间
this.leakingTs = nowTs; // 记录漏水时间
if (this.leftQuota > this.capacity) { // 剩余空间不得高于容量
this.leftQuota = this.capacity;
}
}
boolean watering(int quota) {
makeSpace();
if (this.leftQuota >= quota) { // 判断剩余空间是否足够
this.leftQuota -= quota;
return true;
}
return false;
}
}
private Map<String, Funnel> funnels = new HashMap<>();
public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
String key = String.format("%s:%s", userId, actionKey);
Funnel funnel = funnels.get(key);
if (funnel == null) {
funnel = new Funnel(capacity, leakingRate);
funnels.put(key, funnel);
}
return funnel.watering(1); // 需要 1 个 quota
}
}
Redis 4.0 提供了一个限流Redis模块,它叫redis-cell。该模块使用了漏斗算法,并提供了原子的限流命令。
# codehole key
# 15 漏斗容量是15
# 30 operations / 60 seconds 漏水速率,每60秒最多30次
# 1 可选参数默认是1
[Bash shell] 纯文本查看 复制代码 >cl.throttle codehole:reply 15 30 60 1
1)(integer) 0 # 0 表示允许,1 表示拒绝
2)(integer) 15 # 漏斗容量 capacity
3)(integer) 14 # 漏斗剩余空间 left_quota
4)(integer) -1 # 如果拒绝了,需要多长时间后再试 (漏斗有空间了,单位秒)
5)(integer) 2 # 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)
在执行限流指令时,如果被拒绝了,就需要丢弃或重试。cl.throttle 指令考虑的非常周到,连重试时间都帮你算好了,直接取返回结果数组的第四个值进行 sleep 即可,如果不想阻塞线程,也可以异步定时任务来重试。 |