這是golang 源碼中實現的限流器,是基于令牌桶算法的:
官方地址: golang.org/x/time/rate
github地址:github.com/golang/time/rate
r := rate.Every(100 * time.Millisecond) limit := rate.NewLimiter(r, 20) for { if limit.AllowN(time.Now(), 8) { log.Info("log:event happen") } else { log.Info("log:event not allow") } }
一秒內產生10 個令牌,桶的容量是20,當前時刻取8個token
源碼很簡單只有兩個文件:
rate.gorate_test.go
1,NewLimiter
// NewLimiter returns a new Limiter that allows events up to rate r and permits// bursts of at most b tokens.func NewLimiter(r Limit, b int) *Limiter { return &Limiter{ limit: r, burst: b, }}
簡單構造了一個limiter對象
type Limiter struct { mu sync.Mutex limit Limit burst int tokens float64 // last is the last time the limiter's tokens field was updated last time.Time // lastEvent is the latest time of a rate-limited event (past or future) lastEvent time.Time}
記錄了上一次分發token的時間,和上一次請求token的時間
func Every(interval time.Duration) Limit { if interval <= 0 { return Inf } return 1 / Limit(interval.Seconds())}
僅僅做了從時間間隔向頻率的轉換。
2,AllowN/Allow
// Allow is shorthand for AllowN(time.Now(), 1).func (lim *Limiter) Allow() bool { return lim.AllowN(time.Now(), 1)}// AllowN reports whether n events may happen at time now.// Use this method if you intend to drop / skip events that exceed the rate limit.// Otherwise use Reserve or Wait.func (lim *Limiter) AllowN(now time.Time, n int) bool { return lim.reserveN(now, n, 0).ok}
底層都是調用了reserveN函數,maxFutureReserve參數傳的是0
// reserveN is a helper method for AllowN, ReserveN, and WaitN.// maxFutureReserve specifies the maximum reservation wait duration allowed.// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { lim.mu.Lock() if lim.limit == Inf { lim.mu.Unlock() return Reservation{ ok: true, lim: lim, tokens: n, timeToAct: now, } } now, last, tokens := lim.advance(now) // Calculate the remaining number of tokens resulting from the request. tokens -= float64(n) // Calculate the wait duration var waitDuration time.Duration if tokens < 0 { waitDuration = lim.limit.durationFromTokens(-tokens) } // Decide result ok := n <= lim.burst && waitDuration <= maxFutureReserve // Prepare reservation r := Reservation{ ok: ok, lim: lim, limit: lim.limit, } if ok { r.tokens = n r.timeToAct = now.Add(waitDuration) } // update state if ok { lim.last = now lim.tokens = tokens lim.lastEvent = r.timeToAct } else { lim.last = last } lim.mu.Unlock() return r}
1,如果lim.limit == Inf,返回Reservation對象
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.// A Reservation may be canceled, which may enable the Limiter to permit additional events.type Reservation struct { ok bool lim *Limiter tokens int timeToAct time.Time // This is the Limit at reservation time, it can change later. limit Limit}
2,獲取當前時間,上一次產生token的時間和,產生的token
// advance calculates and returns an updated state for lim resulting from the passage of time.// lim is not changed.// advance requires that lim.mu is held.func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { last := lim.last if now.Before(last) { last = now } // Calculate the new number of tokens, due to time that passed. elapsed := now.Sub(last) delta := lim.limit.tokensFromDuration(elapsed) tokens := lim.tokens + delta if burst := float64(lim.burst); tokens > burst { tokens = burst } return now, last, tokens}
A,如果當前時間比上一次獲取token時間早(說明有請求在等待獲取token),那么更新當前時間為上一次獲取token時間(和上一個請求一起等)
B,計算從上一次獲取token到現在的時間間隔C,計算產生的token增量
delta := lim.limit.tokensFromDuration(elapsed)
type Limit float64// tokensFromDuration is a unit conversion function from a time duration to the number of tokens// which could be accumulated during that duration at a rate of limit tokens per second.func (limit Limit) tokensFromDuration(d time.Duration) float64 { return d.Seconds() * float64(limit)}
也就是時間間隔的秒數乘以每秒產生的token數量。
D,計算總的token數量
E,如果桶已經滿了,丟棄多余的token
3,扣減本次請求需要的token
4,如果token數不夠,計算需要等待的時間間隔
5,如果請求的token數量比桶的容量小,并且可以等待的時間大于需要等待的時間說明這個請求是合法的。
ok := n <= lim.burst && waitDuration <= maxFutureReserve
6,構造Reservation對象,存儲當前limiter對象到lim
7,如果請求合法,存儲當前請求需要的token數量和需要等待的時間(當前時間+等待時間間隔)
8,如果合法,更新當前limiter的上一次獲取token時間為當前時間,獲取的token數量為扣減后剩余的token數量,獲取token時間為將來能夠真正獲取token的時間點。
9,否則更新limiter的上一次獲取token時間為本次計算的上一次獲取token時間。
上面就是獲取token的所有代碼實現。
Limiter提供了三類方法供用戶消費Token,用戶可以每次消費一個Token,也可以一次性消費多個Token。
1,AllowN 方法表示,截止到某一時刻,目前桶中數目是否至少為 n 個,滿足則返回 true,同時從桶中消費 n 個 token。反之返回不消費 token,false。也就是前面介紹的方法。
func (lim *Limiter) Allow() boolfunc (lim *Limiter) AllowN(now time.Time, n int) bool
2,當使用 Wait 方法消費 token 時,如果此時桶內 token 數組不足 (小于 N),那么 Wait 方法將會阻塞一段時間,直至 token 滿足條件。如果充足則直接返回。
func (lim *Limiter) Wait(ctx context.Context) (err error)func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
// WaitN blocks until lim permits n events to happen.// It returns an error if n exceeds the Limiter's burst size, the Context is// canceled, or the expected wait time exceeds the Context's Deadline.// The burst limit is ignored if the rate limit is Inf.func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { lim.mu.Lock() burst := lim.burst limit := lim.limit lim.mu.Unlock() if n > burst && limit != Inf { return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst) } // Check if ctx is already cancelled select { case <-ctx.Done(): return ctx.Err() default: } // Determine wait limit now := time.Now() waitLimit := InfDuration if deadline, ok := ctx.Deadline(); ok { waitLimit = deadline.Sub(now) } // Reserve r := lim.reserveN(now, n, waitLimit) if !r.ok { return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) } // Wait if necessary delay := r.DelayFrom(now) if delay == 0 { return nil } t := time.NewTimer(delay) defer t.Stop() select { case <-t.C: // We can proceed. return nil case <-ctx.Done(): // Context was canceled before we could proceed. Cancel the // reservation, which may permit other events to proceed sooner. r.Cancel() return ctx.Err() }}
A,如果請求數量超出了桶的容量,直接報錯
B,通過ctx.Deadline()計算允許等待的時間間隔
C,調用r := lim.reserveN(now, n, waitLimit) 獲取Reserve對象
D,如果reserve對象表示不能成功(超出桶的容量,超出時間限制),返回錯誤
E,計算需要等待的時間,timeToAct表示能夠獲取token的時間。
// DelayFrom returns the duration for which the reservation holder must wait// before taking the reserved action. Zero duration means act immediately.// InfDuration means the limiter cannot grant the tokens requested in this// Reservation within the maximum wait time.func (r *Reservation) DelayFrom(now time.Time) time.Duration { if !r.ok { return InfDuration } delay := r.timeToAct.Sub(now) if delay < 0 { return 0 } return delay}
F,啟動定時器等待。
3,ReserveN 的用法就相對來說復雜一些,當調用完成后,無論 token 是否充足,都會返回一個 Reservation * 對象。
你可以調用該對象的 Delay() 方法,該方法返回了需要等待的時間。如果等待時間為 0,則說明不用等待。
必須等到等待時間之后,才能進行接下來的工作。
或者,如果不想等待,可以調用 Cancel() 方法,該方法會將 token 歸還。
func (lim *Limiter) Reserve() *Reservationfunc (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
這個方法比較原始直接返回Reserve對象,交給用戶處理
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation { r := lim.reserveN(now, n, InfDuration) return &r}
本文由 貴州做網站公司 整理發布,部分圖文來源于互聯網,如有侵權,請聯系我們刪除,謝謝!
網絡推廣與網站優化公司(網絡優化與推廣專家)作為數字營銷領域的核心服務提供方,其價值在于通過技術手段與策略規劃幫助企業提升線上曝光度、用戶轉化率及品牌影響力。這...
在當今數字化時代,公司網站已成為企業展示形象、傳遞信息和開展業務的重要平臺。然而,對于許多公司來說,網站建設的價格是一個關鍵考量因素。本文將圍繞“公司網站建設價...
在當今的數字化時代,企業網站已成為企業展示形象、吸引客戶和開展業務的重要平臺。然而,對于許多中小企業來說,高昂的網站建設費用可能會成為其發展的瓶頸。幸運的是,隨...
699美元換成人民幣多少?當前美元對人民幣的匯率是1美元=7.0942人民幣1人民幣=0.141美元699美元=4958.8458人民幣參考2019年4月21日的匯率,699美元=4685.5368人民幣;溫馨提示:匯率波動699美元合計人民幣多少?]首先,699是iPhone8的價格,對應中國銀行5888的價格。X是999美元,相當于8388元。關稅10%,增值稅17%。699*6.54*1.1...
東莞市虎門鎮郵政編碼怎么查?虎門鎮萬信漁區(虎門中醫院#)郵政編碼:523939虎門鎮萬信老漁港郵政編碼:523938虎門鎮沙角郵政編碼:523936虎門鎮東路郵編:523935虎門鎮燕崗:523933虎門鎮南門郵編:523932虎門鎮東風郵編:523931虎門鎮大寧郵政編碼:523930虎門鎮舒天郵政編碼:523929虎門鎮村長郵政編碼:523928虎門鎮巨岐郵政編碼:523927虎門鎮懷德郵政...
Vlog到底是什么?如何做好?vlog反正那就是一種生活的表達出,記錄信息,要做了也可以指導你賺錢啊。1.你要要有一部專業的vlog設備,如高品質的手機和手機穩定器。2.會依靠當下的手機軟件加字幕簡單的vlog,剪映就相當比較好3.要多仔細觀察網上的熱點,依靠網上的熱點來排出來要什么當下換算的vlog下面我的視頻里有更多的見解,是可以具體一點直接看。@西瓜VLOG@西瓜視頻創作中心@微頭條話題視頻...