六种限流实现及代码示例

2024-12-30 20:51:17   小编

六种限流实现及代码示例

在当今的互联网应用中,限流是一项非常重要的技术手段,用于保护系统免受突发流量的冲击,确保服务的稳定性和可用性。以下将介绍六种常见的限流实现方式,并提供相应的代码示例。

1. 令牌桶算法

令牌桶算法的基本思想是按照一定的速率往桶中放入令牌,请求到来时从桶中获取令牌,如果桶中没有令牌则拒绝请求。

import time

class TokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate  # 令牌放入速率
        self.capacity = capacity  # 桶的容量
        self.tokens = capacity  # 当前令牌数量
        self.last_time = time.time()

    def get_token(self):
        now = time.time()
        # 计算新产生的令牌数量
        new_tokens = (now - self.last_time) * self.rate
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_time = now

        if self.tokens >= 1:
            self.tokens -= 1
            return True
        else:
            return False

2. 漏桶算法

漏桶算法则是将请求放入桶中,以固定的速率处理请求。

class LeakyBucket {
    private int capacity;  // 桶的容量
    private int rate;  // 出水速率
    private int water;  // 当前水量
    private long lastTime;  // 上次处理时间

    public LeakyBucket(int capacity, int rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.water = 0;
        this.lastTime = System.currentTimeMillis();
    }

    public boolean processRequest() {
        long now = System.currentTimeMillis();
        int leaked = (int) ((now - lastTime) * rate / 1000);
        water = Math.max(0, water - leaked);
        lastTime = now;

        if (water + 1 <= capacity) {
            water++;
            return true;
        } else {
            return false;
        }
    }
}

3. 计数器限流

通过简单的计数器来限制单位时间内的请求数量。

import time

class CounterLimiter:
    def __init__(self, limit, period):
        self.limit = limit  # 限制数量
        self.period = period  # 时间周期(秒)
        self.count = 0
        self.start_time = time.time()

    def allow_request(self):
        now = time.time()
        if now - self.start_time > self.period:
            self.count = 0
            self.start_time = now

        if self.count < self.limit:
            self.count += 1
            return True
        else:
            return False

4. 滑动窗口限流

将时间划分为多个小窗口,通过统计滑动窗口内的请求数量来进行限流。

5. 分布式限流

利用分布式协调组件(如 Redis)来实现跨多个节点的限流。

import redis

class RedisLimiter:
    def __init__(self, redis_client, key, limit, period):
        self.redis_client = redis_client
        self.key = key
        self.limit = limit
        self.period = period

    def allow_request(self):
        count = self.redis_client.incr(self.key)
        if count == 1:
            self.redis_client.expire(self.key, self.period)

        if count <= self.limit:
            return True
        else:
            return False

6. 基于 Guava 限流库

Guava 提供了方便易用的限流工具类。

import com.google.common.util.concurrent.RateLimiter;

RateLimiter rateLimiter = RateLimiter.create(5);  // 每秒允许 5 个请求

if (rateLimiter.tryAcquire()) {
    // 处理请求
} else {
    // 限流处理
}

不同的限流算法适用于不同的场景,需要根据实际业务需求和系统特点选择合适的限流方式。

TAGS: 限流技术应用 限流实现方式 代码示例类型 限流效果评估

欢迎使用万千站长工具!

Welcome to www.zzTool.com