Kubernetes36--限速器--RateLimiter
研究一下k8s中RateLimiter接口,可以实现限速功能,用于实现限速队列。type RateLimiter interface {// When gets an item and gets to decide how long that item should waitWhen(item interface{}) time.Duration// Forget indicat...
研究一下k8s中RateLimiter接口,可以实现限速功能,用于实现限速队列。
type RateLimiter interface {
// When gets an item and gets to decide how long that item should wait
When(item interface{}) time.Duration
// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
// or for success, we'll stop tracking it
Forget(item interface{})
// NumRequeues returns back how many failures the item has had
NumRequeues(item interface{}) int
}
when方法获取某个元素应该等待的时间
forget释放某个元素,不再监控该元素
numRequeues返回该方法由于失败重新入队的次数
默认的构造方法
// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
RateLimiter的类型
1.ItemExponentialFailureRateLimiter
// ItemExponentialFailureRateLimiter does a simple baseDelay*10^<num-failures> limit
// dealing with max failures and expiration are up to the caller
type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int
baseDelay time.Duration
maxDelay time.Duration
}
failures存储元素以及失败次数,baseDelay基本延时 maxDelay 最大延时
某一元素需要等待的延时时间计算time = min(maxDelay, baseDelay*power(2, failures[item])),与失败次数指数关系,但是不超过最大延时时间。随着失败次数增加说明该任务越来越难以完成,因此延时时间增加。
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}
2.ItemFastSlowRateLimiter
// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int
maxFastAttempts int
fastDelay time.Duration
slowDelay time.Duration
}
失败之后开始快速尝试,之后缓慢尝试
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
r.failures[item] = r.failures[item] + 1
if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}
return r.slowDelay
}
3.MaxOfRateLimiter
// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type MaxOfRateLimiter struct {
limiters []RateLimiter
}
遍历每一个RateLimiter返回最差结果
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
for _, limiter := range r.limiters {
curr := limiter.When(item)
if curr > ret {
ret = curr
}
}
return ret
}
4.BucketRateLimiter
// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type BucketRateLimiter struct {
*rate.Limiter
}
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
return r.Limiter.Reserve().Delay()
}
BucketRateLimiter使用rate limiter golang.org/x/time/rate/rate.go
type Limiter struct {
limit Limit
burst int
mu sync.Mutex
tokens float64
// last is the last time the limiter's tokens field was updated
last time.Time
// lastEvent is the latest time of a rate-limited event (past or future)
lastEvent time.Time
}
// A Limiter controls how frequently events are allowed to happen.
// It implements a "token bucket" of size b, initially full and refilled
// at rate r tokens per second.
// Informally, in any large enough time interval, the Limiter limits the
// rate to r tokens per second, with a maximum burst size of b events.
// As a special case, if r == Inf (the infinite rate), b is ignored.
Limiter由令牌桶算法实现,控制事件发生的频率。令牌桶大小b,令牌填充速度每秒r,将事件发生频率控制在r,最大等于桶大小b。
三个核心方法Allow Reverse Wait
// Each of the three methods consumes a single token.
// They differ in their behavior when no token is available.
// If no token is available, Allow returns false.
// If no token is available, Reserve returns a reservation for a future token
// and the amount of time the caller must wait before using it.
// If no token is available, Wait blocks until one can be obtained
// or its associated context.Context is canceled.
主要使用Wait方法,阻塞等待直到可以获得令牌
// Wait is shorthand for WaitN(ctx, 1).
func (lim *Limiter) Wait(ctx context.Context) (err error) {
return lim.WaitN(ctx, 1)
}
// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
// The burst limit is ignored if the rate limit is Inf.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
如果令牌数大于桶大小或者上下文关闭,直接返回
if n > lim.burst && lim.limit != Inf {
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
}
// Check if ctx is already cancelled
select {
case <-ctx.Done():
return ctx.Err()
default:
}
计算等待截止时间,默认为无穷大,考虑上下文截止时间,取较小值
// Determine wait limit
now := time.Now()
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(now)
}
计算在截止时间前是否能够获取N个令牌
// Reserve
r := lim.reserveN(now, n, waitLimit)
if !r.ok {
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
如果能够获取令牌,则执行等待时间
// Wait
t := time.NewTimer(r.DelayFrom(now))
defer t.Stop()
select {
case <-t.C:
// We can proceed.
return nil
case <-ctx.Done():
// Context was canceled before we could proceed. Cancel the
// reservation, which may permit other events to proceed sooner.
r.Cancel()
return ctx.Err()
}
reverseN方法 返回要想得到指定数量的令牌需要等待的时间
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
如果Limiter没有数量限制,则说明现在就可以立即获取指定数量的令牌,可以立即返回
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
}
根据上一次时间发生时间距离现在时间计算现在的令牌数
now, last, tokens := lim.advance(now)
令现在令牌数减去需要的令牌数,若当前令牌数小于0,说明令牌数目不够需要等待一定的时间
// Calculate the remaining number of tokens resulting from the request.
tokens -= float64(n)
// Calculate the wait duration
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// durationFromTokens is a unit conversion function from the number of tokens to the duration
// of time it takes to accumulate them at a rate of limit tokens per second.
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
seconds := tokens / float64(limit)
return time.Nanosecond * time.Duration(1e9*seconds)
}
构造Reversetion对象返回以及更新Limiter状态
// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// Update state
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
advance方法
// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
获取上一次事件发生的时间
last := lim.last
if now.Before(last) {
last = now
}
计算现在到上一次事件时间的时间差,该时间差应该小于令牌桶的填满时间,否则后面计算令牌数可能会溢出
// Avoid making delta overflow below when last is very old.
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last)
if elapsed > maxElapsed {
elapsed = maxElapsed
}
计算新的令牌数
// Calculate the new number of tokens, due to time that passed.
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
总结:
RateLimiter实现了对于队列元素的重试规则,when返回该元素下一次应该再次入队执行的时间,forget释放该元素不再监测,numrequests返回该元素已经失败重试的次数。
RateLimiter主要类型有四种:
BucketRateLimiter NewItemExponentialFailureRateLimiter ItemFastSlowRateLimiter MaxOfRateLimiter
主要行为表现在当某一元素失败后,下一次执行时间的规则不一致,NewItemExponentialFailureRateLimiter按指数延时间隔变化,ItemFastSlowRateLimiter先快后慢,MaxOfRateLimiter返回所有RateLimiter的最坏值,BucketRateLimiter令牌桶算法,可以控制在一定频率来执行。
更多推荐
所有评论(0)