限流器はバックエンドサービスにおいて非常に重要なコンポーネントであり、実際のビジネスシーンで多く使用されます。その設計はマイクロサービス、ゲートウェイ、およびいくつかのバックエンドサービスで頻繁に遭遇します。限流器の役割は、リクエストの速度を制限し、バックエンドの応答サービスを保護することで、サービスの過負荷によるサービス不可状態の発生を防ぐことです。
限流器の実装方法は多くあり、例えばトークンバケット、スライディングウィンドウ法、リーキーバケットなどがあります。
Golang ライブラリでは、公式に限流器の実装golang.org/x/time/rate
が提供されており、これはトークンバケットアルゴリズム(Token Bucket)に基づいて設計されています。
トークンバケットアルゴリズム#
トークンバケットの設計は比較的シンプルで、固定数量のアイスクリームを保存できる冷蔵庫のように理解できます。各リクエストはアイスクリームを取りに来る人と考えられ、1 回のリクエストでアイスクリームを 1 つだけ取ることができます。アイスクリームがなくなった場合はどうなるでしょうか?ここにはアイスクリームを補充する作業員がいて、彼は一定の頻度で冷蔵庫にアイスクリームを補充します。例えば、彼は 1 秒間に 10 個のアイスクリームを補充できるとします。ここからリクエスト応答の頻度が見えてきます。
トークンバケット設計概念:
- トークン:各リクエストはトークンを取得した後にのみアクセスを続けることができます;
- バケット:固定数量のトークンを格納できるバケットがあり、各バケットには設計された固定数量のトークンしか入れられません;
- 入バケット頻度:固定の頻度でバケットにトークンを追加し、トークンを追加する量はバケットの容量を超えてはいけません。
つまり、トークンバケット設計アルゴリズムに基づいてリクエストの速度を制限することで、リクエスト応答を制御可能にし、特に高い同時接続シーンでの突発的なトラフィックリクエストに対処できるようになります。バックエンドはリクエストに簡単に対応でき、具体的なサービスに到達する際には突発的なトラフィックリクエストはすでに限流されています。
具体的な設計#
限流器の定義#
type Limiter struct {
mu sync.Mutex // ミューテックス(排他ロック)
limit Limit // バケットに入れる頻度 float64 型
burst int // バケットのサイズ
tokens float64 // トークンの現在の残り数量
last time.Time // 最後にトークンを取得した時間
lastEvent time.Time // 最後の限流イベントの時間
}
limit、burst、および token はこの限流器の核心的なパラメータであり、リクエストの同時接続の大きさがここで実現されています。
トークンが発行された後は、Reservation 予約オブジェクトに保存されます:
type Reservation struct {
ok bool // 条件を満たしてトークンが割り当てられたか
lim *Limiter // トークンを送信する限流器
tokens int // 送信されたトークンの数量
timeToAct time.Time // トークン発行の条件を満たす時間
limit Limit // トークン発行速度
}
トークンの消費#
Limiter はユーザーがトークンを消費するための 3 種類のメソッドを提供します。ユーザーは毎回 1 つのトークンを消費することも、複数のトークンを一度に消費することもできます。それぞれのメソッドは、トークンが不足している場合の異なる対応手段を表します。
Wait、WaitN#
func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
ここで、Wait はWaitN(ctx, 1)
と同じであり、以下のメソッド紹介でも同様です。
Wait メソッドを使用してトークンを消費する際、バケット内のトークン配列が不足している場合(n 未満)、Wait メソッドは一定時間ブロックされ、トークンが条件を満たすまで待機します。十分な場合は直接戻ります。
Allow、AllowN#
func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool
AllowN メソッドは、現在の特定の時点までにバケット内の数が少なくとも n 個であるかどうかを示し、満たされていれば true を返し、同時にバケットから n 個のトークンを消費します。そうでなければトークンを消費せず、false を返します。
通常、リクエストの速度が速すぎる場合は、いくつかのリクエストを直接破棄します。
Reserve、ReserveN#
公式に提供されている限流器には、ブロッキング待機式の Wait もあれば、直接判断する Allow もあり、自己管理の予約式も提供されていますが、核心的な実装は以下の reserveN メソッドです。
func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
呼び出しが完了すると、トークンが十分であるかどうかにかかわらず、Reservation * オブジェクトが返されます。
このオブジェクトの Delay () メソッドを呼び出すことができ、このメソッドは待機する必要がある時間を返します。待機時間が 0 の場合、待機する必要はありません。
待機時間が終了するまで次の作業を行うことはできません。
また、待機したくない場合は Cancel () メソッドを呼び出すことができ、このメソッドはトークンを返却します。
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
// まず、放入頻度が無限大かどうかを判断します
// 無限大の場合、一時的に限流がないことを示します
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
}
// 現在の時刻までに
// 取得可能なトークンの数量と最後にトークンを取得した時間を取得します
now, last, tokens := lim.advance(now)
// トークンの数量を更新します
tokens -= float64(n)
// tokensが負数の場合、現在バケットにトークンがないことを示します
// 待機が必要であり、待機時間を計算します
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// 割り当て条件を満たすかどうかを計算します
// 1、割り当てるサイズがバケットのサイズを超えないこと
// 2、待機時間が設定された待機時間を超えないこと
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// 予約を前処理します
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
// 現在、割り当て条件を満たす場合
// 1、割り当てサイズを設定します
// 2、トークン発行の時間 = 現在の時間 + 待機時間
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// limiterの値を更新し、返します
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
lim.mu.Unlock()
return r
}
具体的な使用#
rate パッケージでは、限流器の使用が提供されており、limit(バケットに入れる頻度)と burst(バケットのサイズ)を指定するだけです。
func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{
limit: r, // バケットに入れる頻度
burst: b, // バケットのサイズ
}
}
ここでは、http API を使用してtime/rate
の強力さを簡単に検証します:
func main() {
r := rate.Every(1 * time.Millisecond)
limit := rate.NewLimiter(r, 10)
http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
if limit.Allow() {
fmt.Printf("リクエスト成功、現在の時間:%s\n", time.Now().Format("2006-01-02 15:04:05"))
} else {
fmt.Printf("リクエスト成功ですが、限流されました。。。\n")
}
})
_ = http.ListenAndServe(":8081", nil)
}
ここでは、バケットを毎ミリ秒に 1 回トークンを投入するように設定し、バケットの容量を 10 にし、http サービスを起動してバックエンド API をシミュレートします。
次に、ストレステストを行い、効果を確認します:
func GetApi() {
api := "http://localhost:8081/"
res, err := http.Get(api)
if err != nil {
panic(err)
}
defer res.Body.Close()
if res.StatusCode == http.StatusOK {
fmt.Printf("API取得成功\n")
}
}
func Benchmark_Main(b *testing.B) {
for i := 0; i < b.N; i++ {
GetApi()
}
}
効果は以下の通りです:
......
リクエスト成功、現在の時間:2020-08-24 14:26:52
リクエスト成功ですが、限流されました。。。
リクエスト成功ですが、限流されました。。。
リクエスト成功ですが、限流されました。。。
リクエスト成功ですが、限流されました。。。
リクエスト成功、現在の時間:2020-08-24 14:26:52
リクエスト成功ですが、限流されました。。。
リクエスト成功ですが、限流されました。。。
リクエスト成功ですが、限流されました。。。
リクエスト成功ですが、限流されました。。。
リクエスト成功ですが、限流されました。。。
......
ここで、AllowN メソッドを使用すると、トークンが生成されるまでトークンを消費できず、リクエストを続けることができます。残りのリクエストは破棄されます。もちろん、実際のビジネス処理では、より親切な方法でフロントエンドにフィードバックできます。
最初の数回のリクエストは成功するのは、サービスが起動した後、トークンバケットが初期化され、トークンがバケットに投入されるからですが、突発的なトラフィックリクエストが発生すると、トークンは予定された速度で生成されるため、トークンの供給が需要に追いつかない現象が明らかになります。
オープンソースリポジトリ#
現在、time/rate
は独立した限流器のオープンソースソリューションであり、興味のある方はこのプロジェクトにスターを付けていただけると幸いです。ありがとうございます。