温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Guava中RateLimiter的实现原理是什么

发布时间:2021-06-24 17:19:10 来源:亿速云 阅读:164 作者:Leah 栏目:大数据

这期内容当中小编将会给大家带来有关Guava中RateLimiter的实现原理是什么,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

1、RateLimiter基本属性

Guava中RateLimiter的实现原理是什么

// RateLimiter属性
/**
  * The underlying timer; used both to measure elapsed time and sleep as necessary. A separate
  * object to facilitate testing.
  * 用来计时, RateLimiter将实例化的时间设置为0值, 后续都是相对时间
  */
private final SleepingStopwatch stopwatch;

// Can't be initialized in the constructor because mocks don't call the constructor.
// 锁, RateLimiter依赖于synchronized来控制并发, 限流器里面属性都没有加volatile修饰
private volatile @Nullable Object mutexDoNotUseDirectly;
// SmoothRateLimiter属性
/**
  * The currently stored permits.
  * 当前还有多少permits没有被使用, 被存下来的permits数量
  */
double storedPermits;

/**
  * The maximum number of stored permits.
  * 最大允许缓存的permits数量, 也就是storedPermits能达到的最大值
  */
double maxPermits;

/**
  * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
  * per second has a stable interval of 200ms.
  * 每隔多少时间产生一个permit
  */
double stableIntervalMicros;

/**
  * 下一次可以获取permits的时间, 这个时间也是相对于RateLimiter的构造时间的
  * 
  * nextFreeTicketMicros是一个很关键的属性.我们每次获取permits的时候,先拿storedPermits的值,
  * 如果够,storedPermits减去相应的值就可以了,如果不够,那么还需要将nextFreeTicketMicros往前推,
  * 表示我预占了接下来多少时间的量了.那么下一个请求来的时候,如果还没到nextFreeTicketMicros这个时间点,
  * 需要sleep到这个点再返回,当然也要将这个值再往前推
  */
private long nextFreeTicketMicros = 0L; // could be either in the past or future

2、SmoothBursty实现

2.1、构造方法

public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}

@VisibleForTesting
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    // SmoothBursty默认缓存最多1s的permits, 不可修改, 也就是说最多会缓存1 * permitsPerSecond这么多个permits到池中
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}

SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
    super(stopwatch);
    this.maxBurstSeconds = maxBurstSeconds;
}

public final void setRate(double permitsPerSecond) {
    // 加锁
    synchronized (mutex()) {
       doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
}

@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
    // 在关键节点, 会先更新下storedPermits到正确的值
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    // 每隔多少时间产生一个permit
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
}

void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    // 如果nextFreeTicketMicros已经过掉了, 想象一下很长时间没有再次调用limiter.acquire()的场景
    // 需要将nextFreeTicketMicros设置为当前时间, 重新计算下storedPermits
    if (nowMicros > nextFreeTicketMicros) {
      // 新生成的permits, 构造函数中进来时生成的newPermits为无限大
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      // 构造函数进来时maxPermits为0, 所以这里的storedPermits也是0
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
}

@Override
double coolDownIntervalMicros() {
    // 构造函数进来时, 此值为0.0
    return stableIntervalMicros;
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
    double oldMaxPermits = this.maxPermits;
    // maxPermits为1秒产生的permits
    maxPermits = maxBurstSeconds * permitsPerSecond;
    if (oldMaxPermits == Double.POSITIVE_INFINITY) {
      // if we don't special-case this, we would get storedPermits == NaN, below
      storedPermits = maxPermits;
    } else {
      // 等比例缩放storedPermits
      storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state
                                             : storedPermits * maxPermits / oldMaxPermits;
      }
}

2.2、acquire

// 常用api
@CanIgnoreReturnValue
public double acquire() {
    return acquire(1);
}

@CanIgnoreReturnValue
public double acquire(int permits) {
    // 预约, 如果当前不能直接获取到permits, 需要等待, 返回值表示需要sleep多久
    long microsToWait = reserve(permits);
    // sleep
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    // 返回sleep的时长
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
}

final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    // 返回当前线程为了拿这些许可需要睡眠多久, 如果立即可以拿到就不需要睡眠, 否则需要睡到nextFreeTicketMicros
    return max(momentAvailable - nowMicros, 0);
}

/**
   * 我们可以看到,获取permits的时候,其实是获取了两部分,一部分来自于存量storedPermits,存量不够的话,
   * 另一部分来自于预占未来的freshPermits.这里提一个关键点吧,我们看到,返回值是nextFreeTicketMicros
   * 的旧值,因为只要到这个时间点,就说明当次acquire可以成功返回了,而不管storedPermits够不够.
   * 如果storedPermits不够,会将nextFreeTicketMicros往前推一定的时间,预占了一定的量.
   */
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    // 这里做一次同步, 更新storePermits和nextFreeTicketMicros(如果需要)
    resync(nowMicros);
    // 刚刚已经更新过了
    long returnValue = nextFreeTicketMicros;
    // storedPermits可以使用多少个
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    // storePermits不够的部分
    double freshPermits = requiredPermits - storedPermitsToSpend;
    // 为了这不够的部分, 需要等待多久.
    // SmoothBursty中storedPermitsToWaitTime返回0, 直接就可以取
    // SmoothWarmingUp的实现中,由于需要预热,所以从storedPermits中取permits需要花费一定的时间
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);

    // 将nextFreeTicketMicros往前推
    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    // 减去被拿走的部分
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
}

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
    // SmoothBursty中对于已经存储下来的storedPermits可以直接获取到, 不需要等待
    return 0L;
}

2.3、tryAcquire

public boolean tryAcquire(Duration timeout) {
    return tryAcquire(1, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
    long timeoutMicros = max(unit.toMicros(timeout), 0);
    checkPermits(permits);
    long microsToWait;
    synchronized (mutex()) {
        long nowMicros = stopwatch.readMicros();
	// 判断一下超时时间内能不能获得到锁, 不能获得直接返回false
        if (!canAcquire(nowMicros, timeoutMicros)) {
            return false;
        } else {
	    // 可以获得的话再算下为了获得这个许可需要等待多长时间
	    // 这边上面已经分析过了
            microsToWait = reserveAndGetWaitLength(permits, nowMicros);
        }
    }
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return true;
}

private boolean canAcquire(long nowMicros, long timeoutMicros) {
    // 就是看下nowMicros + timeoutMicros >= nextFreeTicketMicros
    // 意思就是看下超时时间内有没有达到可以获取令牌的时间
    return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}

@Override
final long queryEarliestAvailable(long nowMicros) {
    return nextFreeTicketMicros;
}

2.4、setRate

public final void setRate(double permitsPerSecond) {
    synchronized (mutex()) {
       doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
}

@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
}

3、SmoothWarmingUp实现

3.1、构造方法

Guava中RateLimiter的实现原理是什么

// SmoothWamingUp实现
public static RateLimiter create(double permitsPerSecond, Duration warmupPeriod) {
    return create(permitsPerSecond, toNanosSaturated(warmupPeriod), TimeUnit.NANOSECONDS);
}

public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
    return create(permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());
}

@VisibleForTesting
static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit,
                          double coldFactor, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}

SmoothWarmingUp( SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
    super(stopwatch);
    // 为了达到从0到maxPermits花费warmupPeriodMicros的时间
    this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
    this.coldFactor = coldFactor;
}

3.2、setRate

public final void setRate(double permitsPerSecond) {
    synchronized (mutex()) {
       doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
}

@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
    // 实现和SmoothBursty一样
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
}

/**
  * 含义是storedPermits中每个permit的增长速度
  * 为了达到从 0 到 maxPermits 花费 warmupPeriodMicros 的时间
  */
@Override
double coolDownIntervalMicros() {
    return warmupPeriodMicros / maxPermits;
}

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
    double oldMaxPermits = maxPermits;
    // coldFactor是固定的3
    // 当达到maxPermits时, 此时系统处于最冷却的时候, 获取一个permit需要coldIntervalMicros
    // 而如果storedPermits < thresholPermits的时候, 只需要stableIntervalMicros
    double coldIntervalMicros = stableIntervalMicros * coldFactor;

    // https://www.fangzhipeng.com/springcloud/2019/08/20/ratelimit-guava-sentinel.html
    // https://blog.csdn.net/forezp/article/details/100060686
    // 梯形的面积等于2倍的长方形的面积
    thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;

    maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);

    // 计算斜线的斜率
    slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
    if (oldMaxPermits == Double.POSITIVE_INFINITY) {
      // if we don't special-case this, we would get storedPermits == NaN, below
      storedPermits = 0.0;
    } else {
      // 等比例缩放
      storedPermits = (oldMaxPermits == 0.0) ? maxPermits // initial state is cold
                                             : storedPermits * maxPermits / oldMaxPermits;
    }
}

Guava中RateLimiter的实现原理是什么

// storedPermits是上面两个图中最右侧那条绿色线, 表示RateLimiter已经存储了多少许可证, 那么获取storedPermits许可证
// 时应当也是从最右侧开始拿, 从右往左减许可证. 因此就会出现3种情况
// 1) storedPermits已经大于thresholdPermits, 而且所需的许可证permitsToTake右侧部分已经足够提供, 对应上图2
// 2) storedPermits已经大于thresholdPermits, 而且所需的许可证右侧不够, 还需要从左侧拿, 对应上图1
// 3) storedPermits小于thresholdPermits, 此时获取每个许可证所需的时间是固定的, 对应下面if逻辑返回false的情况
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
    double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
    long micros = 0;
    // 计算梯形部分的面积
    if (availablePermitsAboveThreshold > 0.0) {
      double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
      double length = permitsToTime(availablePermitsAboveThreshold)
                      + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
      micros = (long) (permitsAboveThresholdToTake * length / 2.0);
      permitsToTake -= permitsAboveThresholdToTake;
    }
    // 计算长方形部分的面积
    micros += (long) (stableIntervalMicros * permitsToTake);
    return micros;
}

上述就是小编为大家分享的Guava中RateLimiter的实现原理是什么了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI