Rate limit examples in Go

| 分类 编程语言  | 标签 go 

Example1: by tickers

// _[Rate limiting](http://en.wikipedia.org/wiki/Rate_limiting)_
// is an important mechanism for controlling resource
// utilization and maintaining quality of service. Go
// elegantly supports rate limiting with goroutines,
// channels, and [tickers](tickers).

package main

import "time"
import "fmt"

// main demonstrates two rate limiting schemes built from channels and
// tickers: a strict limiter that serves one request per 200ms, and a bursty
// limiter that lets up to 3 requests through immediately before falling back
// to the same steady rate.
func main() {

    // First we'll look at basic rate limiting. Suppose
    // we want to limit our handling of incoming requests.
    // We'll serve these requests off a channel of the
    // same name.
    requests := make(chan int, 5)
    for i := 1; i <= 5; i++ {
        requests <- i
    }
    close(requests)

    // This `limiter` ticker delivers a value on its channel
    // every 200 milliseconds. This is the regulator in
    // our rate limiting scheme. Unlike `time.Tick`, a ticker
    // created with `time.NewTicker` can be stopped once we
    // are done with it.
    limiter := time.NewTicker(200 * time.Millisecond)
    defer limiter.Stop()

    // By blocking on a receive from the `limiter` channel
    // before serving each request, we limit ourselves to
    // 1 request every 200 milliseconds.
    for req := range requests {
        <-limiter.C
        fmt.Println("request", req, time.Now())
    }

    // We may want to allow short bursts of requests in
    // our rate limiting scheme while preserving the
    // overall rate limit. We can accomplish this by
    // buffering our limiter channel. This `burstyLimiter`
    // channel will allow bursts of up to 3 events.
    burstyLimiter := make(chan time.Time, 3)

    // Fill up the channel to represent allowed bursting.
    for i := 0; i < 3; i++ {
        burstyLimiter <- time.Now()
    }

    // Every 200 milliseconds we'll try to add a new
    // value to `burstyLimiter`, up to its limit of 3.
    // Closing `done` tells the refill goroutine to exit,
    // even if it is blocked sending to a full channel.
    refill := time.NewTicker(200 * time.Millisecond)
    defer refill.Stop()
    done := make(chan struct{})
    defer close(done)
    go func() {
        for {
            select {
            case <-done:
                return
            case t := <-refill.C:
                select {
                case burstyLimiter <- t:
                case <-done:
                    return
                }
            }
        }
    }()

    // Now simulate 5 more incoming requests. The first
    // 3 of these will benefit from the burst capability
    // of `burstyLimiter`.
    burstyRequests := make(chan int, 5)
    for i := 1; i <= 5; i++ {
        burstyRequests <- i
    }
    close(burstyRequests)
    for req := range burstyRequests {
        <-burstyLimiter
        fmt.Println("request", req, time.Now())
    }
}

For more, see Go by Example: Rate Limiting.

Example2: juju/ratelimit

package main

import (
    "bytes"
    "fmt"
    "io"
    "time"

    "github.com/juju/ratelimit"
)

// main copies 1MB from an in-memory source to an in-memory buffer through a
// rate limited reader and prints how many bytes were copied and how long the
// copy took.
func main() {
    // Source holding 1MB
    src := bytes.NewReader(make([]byte, 1024*1024))
    // Destination
    dst := &bytes.Buffer{}

    // Bucket adding 100KB every second, holding max 100KB
    bucket := ratelimit.NewBucketWithRate(100*1024, 100*1024)

    start := time.Now()

    // Copy source to destination, but wrap our reader with rate limited one.
    // A failed copy is reported instead of being silently ignored.
    if _, err := io.Copy(dst, ratelimit.Reader(src, bucket)); err != nil {
        fmt.Println("copy failed:", err)
        return
    }

    fmt.Printf("Copied %d bytes in %s\n", dst.Len(), time.Since(start))
}
# go run test2.go 
Copied 1048576 bytes in 9.239732929s
  • ratelimit.Reader
// reader pairs an io.Reader with the bucket used to throttle it.
type reader struct {
	// r is the underlying source that Read delegates to.
	r      io.Reader
	// bucket is waited on for n tokens after each read of n bytes.
	bucket *Bucket
}

// Reader wraps r so that reads through the returned io.Reader are
// throttled by bucket, with every byte read costing one token.
func Reader(r io.Reader, bucket *Bucket) io.Reader {
	limited := new(reader)
	limited.r = r
	limited.bucket = bucket
	return limited
}

// Read reads from the underlying reader into buf and, when at least one
// byte was read, waits on the bucket for that many tokens before returning.
// The byte count and error from the underlying read are returned unchanged.
func (r *reader) Read(buf []byte) (int, error) {
	count, readErr := r.r.Read(buf)
	if count > 0 {
		// Pay for the bytes just read; one token per byte.
		r.bucket.Wait(int64(count))
	}
	return count, readErr
}

For more details, refer here.

Example3: time/rate

package main

import (
    "bytes"
    "fmt"
    "io"
    "time"

    "golang.org/x/time/rate"
)

// reader pairs an io.Reader with the rate.Limiter used to throttle it.
type reader struct {
    // r is the underlying source that Read delegates to.
    r      io.Reader
    // limiter is the limiter Read reserves n tokens from after reading n bytes.
    limiter *rate.Limiter
}

// NewReader wraps r so that reads through the returned io.Reader are
// throttled by the limiter l, with every byte read costing one token.
func NewReader(r io.Reader, l *rate.Limiter) io.Reader {
    limited := new(reader)
    limited.r = r
    limited.limiter = l
    return limited
}

// Read reads from the underlying reader into buf, then sleeps for as long as
// the limiter requires before the bytes just read may be released.
//
// A single reservation larger than the limiter's burst can never succeed, so
// the read is capped at the burst size up front. Without the cap, a caller
// that passes a large buffer (for example io.Copy into a bytes.Buffer, which
// reads through ReadFrom with its own buffer sizes) would consume bytes from
// the source and then fail the reservation, losing that data.
//
// It returns the number of bytes read and the error from the underlying
// reader. If the reservation still cannot be granted (e.g. a burst of zero),
// it returns 0 and an error.
func (r *reader) Read(buf []byte) (int, error) {
    // Never ask the source for more bytes than one reservation can cover.
    if burst := r.limiter.Burst(); burst > 0 && len(buf) > burst {
        buf = buf[:burst]
    }

    n, err := r.r.Read(buf)
    if n <= 0 {
        return n, err
    }

    now := time.Now()
    rv := r.limiter.ReserveN(now, n)
    if !rv.OK() {
        return 0, fmt.Errorf("Exceeds limiter's burst")
    }
    // Wait until the reserved tokens become available.
    time.Sleep(rv.DelayFrom(now))
    return n, err
}

// main copies 1MB from an in-memory source to an in-memory buffer through a
// reader throttled by a rate.Limiter and prints how many bytes were copied
// and how long the copy took.
func main() {
    // Source holding 1MB
    src := bytes.NewReader(make([]byte, 1024*1024))
    // Destination
    dst := &bytes.Buffer{}

    // Bucket adding 100KB every second, holding max 100KB
    limit := rate.NewLimiter(100*1024, 100*1024)

    start := time.Now()

    // Copy source to destination through the rate limited reader using a
    // fixed 10KB buffer, so each read stays below the limiter's burst.
    // io.Copy / io.CopyBuffer are not used here: bytes.Buffer implements
    // ReaderFrom, so the copy would be delegated to it and the buffer
    // passed to Read would no longer be the one chosen here.
    buf := make([]byte, 10*1024)
    r := NewReader(src, limit)
    for {
        n, err := r.Read(buf)
        // Keep any bytes returned, even when they arrive with an error.
        if n > 0 {
            dst.Write(buf[:n])
        }
        if err != nil {
            // io.EOF is the normal end of input; anything else is reported.
            if err != io.EOF {
                fmt.Println("read failed:", err)
            }
            break
        }
    }

    fmt.Printf("Copied %d bytes in %s\n", dst.Len(), time.Since(start))
}
$ go run test3.go 
Copied 1048576 bytes in 9.241212473s

值得注意的是，这里不能直接用io.Copy：bytes.Buffer实现了ReaderFrom，每次Read的时候，buf的长度是变化的，会导致len(buf)超过rate.Limiter.burst。对于这种情况，rv.DelayFrom(now)会返回InfDuration。

Breakpoint 2, main.(*reader).Read (r=0x82038e000, buf= []uint8 = {...}, ~r1=34899238912, ~r2=...)
    at /Users/yy/dev/go/src/github.com/hustcat/golangexample/ratelimit/test3.go:28
28      fmt.Printf("buf len=%d ", len(buf))
(gdb) bt
#0  main.(*reader).Read (r=0x82038e000, buf= []uint8 = {...}, ~r1=34899238912, ~r2=...)
    at /Users/yy/dev/go/src/github.com/hustcat/golangexample/ratelimit/test3.go:28
#1  0x000000000005a50f in bytes.(*Buffer).ReadFrom (b=0x820384000, r=..., n=0, err=...)
    at /usr/local/go/src/bytes/buffer.go:173
#2  0x00000000000728b0 in io.copyBuffer (dst=..., src=..., buf= []uint8 = {...}, written=0, err=...)
    at /usr/local/go/src/io/io.go:375
#3  0x00000000000726f1 in io.CopyBuffer (dst=..., src=..., buf= []uint8 = {...}, written=1003680, err=...)
    at /usr/local/go/src/io/io.go:362
#4  0x0000000000002925 in main.main ()
    at /Users/yy/dev/go/src/github.com/hustcat/golangexample/ratelimit/test3.go:58

上一篇     下一篇