六种限流的实现方式及代码示例 通俗易懂

2024-12-30 20:44:36   小编

在当今的互联网应用中,限流是一项重要的技术手段,用于保障系统的稳定性和可用性。以下将为您介绍六种限流的实现方式及代码示例,让您轻松理解。

1. 令牌桶算法

令牌桶算法的基本思想是按照固定的速率往桶中放入令牌,处理请求时从桶中获取令牌,如果桶中没有令牌则拒绝请求。

import time

class TokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate  # 令牌放入速率
        self.capacity = capacity  # 桶的容量
        self.tokens = capacity  # 当前令牌数
        self.last_time = time.time()

    def consume(self, tokens):
        now = time.time()
        self.tokens += (now - self.last_time) * self.rate
        self.tokens = min(self.tokens, self.capacity)
        self.last_time = now

        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

bucket = TokenBucket(10, 100)  # 每秒 10 个令牌,桶容量 100

2. 漏桶算法

漏桶算法的原理是水(请求)先进入到漏桶里,漏桶以一定的速度出水(处理请求),当水的流入速度过大时,多余的水直接溢出(拒绝请求)。

import time

class LeakyBucket:
    def __init__(self, rate):
        self.rate = rate  # 出水速率
        self.water = 0  # 当前水量
        self.last_time = time.time()

    def consume(self, volume):
        now = time.time()
        self.water = max(0, self.water - (now - self.last_time) * self.rate)
        self.last_time = now

        if self.water + volume <= 1:
            self.water += volume
            return True
        return False

bucket = LeakyBucket(10)  # 每秒处理 10 个请求

3. 计数器限流

通过维护一个计数器,在单位时间内统计请求数量,超过阈值则进行限流。

import time

class CounterLimiter:
    def __init__(self, limit, period):
        self.limit = limit  # 限制数量
        self.period = period  # 时间周期(秒)
        self.count = 0
        self.start_time = time.time()

    def consume(self):
        now = time.time()
        if now - self.start_time > self.period:
            self.count = 0
            self.start_time = now

        if self.count < self.limit:
            self.count += 1
            return True
        return False

limiter = CounterLimiter(100, 60)  # 每分钟 100 个请求

4. 滑动窗口限流

将时间划分为多个小窗口,通过滑动窗口来统计请求数量,实现更精确的限流。

import collections

class SlidingWindowLimiter:
    def __init__(self, limit, window_size, window_count):
        self.limit = limit
        self.window_size = window_size
        self.window_count = window_count
        self.windows = collections.deque([0] * window_count)
        self.current_index = 0

    def consume(self):
        total_count = sum(self.windows)
        if total_count < self.limit:
            self.windows[self.current_index] += 1
            self.current_index = (self.current_index + 1) % self.window_count
            return True
        return False

limiter = SlidingWindowLimiter(100, 10, 6)  # 每 60 秒,每 10 秒一个窗口,限制 100 个请求

5. 分布式限流

基于分布式系统,如 Redis 来实现限流。

import redis

class RedisLimiter:
    def __init__(self, redis_client, key, limit, period):
        self.redis_client = redis_client
        self.key = key
        self.limit = limit
        self.period = period

    def consume(self):
        count = self.redis_client.incr(self.key)
        if count == 1:
            self.redis_client.expire(self.key, self.period)

        if count <= self.limit:
            return True
        return False

redis_client = redis.Redis()
limiter = RedisLimiter(redis_client, 'my_limit_key', 100, 60)

6. 基于 Guava 限流库

如果使用 Java 语言,可以借助 Guava 提供的 RateLimiter 类实现限流。

import com.google.common.util.concurrent.RateLimiter;

public class GuavaLimiterExample {
    public static void main(String[] args) {
        RateLimiter limiter = RateLimiter.create(10.0);  // 每秒 10 个许可
        if (limiter.tryAcquire()) {
            // 处理请求
        } else {
            // 限流处理
        }
    }
}

以上就是六种常见的限流实现方式及代码示例,您可以根据实际需求选择适合的限流策略来保障系统的稳定运行。

TAGS: 限流技术 限流实现方式 代码示例讲解 通俗易懂介绍

欢迎使用万千站长工具!

Welcome to www.zzTool.com