后端一个常见且比较让人头疼的问题是服务限流,没有做好限流开始导致单个服务耗时增加,进而影响上下游服务,最终可能导致整个系统被拖垮。
限流的目的是通过对并发请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队 或等待、降级。 一般使用的限流算法有漏桶(Leaky Bucket)和令牌桶(Token Bucket)。
这里有个需要注意的点,这两种算法的名称和示意图都是为了便于理解,实现时并不需要一模一样。
漏桶算法 漏桶作为计量工具(The Leaky Bucket Algorithm as a Meter )时,可以用于流量整形 (Traffic Shaping ) 和流量控制 (Traffic Policing ),漏桶算法的描述如下。
一个固定容量的漏桶,按照常量固定速率流出水滴。
如果桶是空的, 则不需流出水滴。
可以以任意速率流入水滴到漏桶。
如果流入水滴超出了桶的容量, 则流入的水滴溢出了(被丢弃)。
漏桶的关键在于漏出速度恒定,超出的流量会被丢弃,最终请求看起来会是这样,峰值完全被砍掉了,过于粗暴了,适用的场景不多。
漏桶Python实现 网上常见的一种实现是用个队列直接存储请求来模拟漏桶,其实大可不必,内存空间占用大而且效率低。
其实只需要通过一个队列记录请求时间,结合漏桶漏出速率进行计算,然后移动窗口就可以实现漏桶了。
这里的实现对漏桶进行了简化,漏桶的容量恰好等于单位时间漏出的量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 from time import time, sleepfrom collections import dequeclass LeakyBucket (object) : def __init__ (self, leaky_rate) : self.leaky_rate = float(leaky_rate) self._que = [] def size (self) : return len(self._que) def inspect_expired_items (self, time: int) : """检查已经漏出的请求 args: time: 上一个时间窗口, 默认是秒 returns: item_count: 已漏出的请求个数 remaining_time: 第一个未漏出请求的剩余时间 """ volume = self.size() item_count, remaining_time = 0 , 0 for log_idx, log_item in enumerate(self._que): if log_item > time: item_count = volume - log_idx remaining_time = log_item - time break return item_count, remaining_time def acquire (self, block=True) : """ args: block: 是否阻塞直到可以请求 returns: - 是否可以请求 """ now = time() volume = self.size() if volume >= self.leaky_rate: pre_tick = now - 1 item_count, remaining_time = self.inspect_expired_items(pre_tick) if item_count >= self.leaky_rate: if block: sleep(remaining_time) print('Bucket Full, sleep {} seconds' .format(remaining_time)) return self.acquire() return False self._que[:volume-item_count] = [] self._que.append(now) return True if __name__ == '__main__' : rate_limiter = LeakyBucket(2 ) for i in range(10 ): if rate_limiter.acquire(block=False ): print(time(), 'succ' , i) else : print(time(), 'skip' , i) sleep(0.2 )
令牌桶 令牌桶算法,是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。
令牌桶算法的描述如下。
假设限制2r/s ,则按照500毫秒的固定速率往桶中添加令牌。
桶中最多存放b个令牌, 当桶满时,新添加的令牌被丢弃或拒绝。
当一个n个字节的数据包到达,将从桶中删除n个令牌,然后发送请求
如果桶中的令牌不足n个,则该数据包将被限流(要么丢弃, 要么在缓冲区等待)。
由于令牌放置速度恒定,取出速度不限,所以令牌桶的限流是有一定弹性的,能够接受请求的一定波动。
令牌桶Python实现 通过计算令牌桶容量和产生速率就可以实现令牌桶,并不需要真的实现“把令牌放桶里”和“取出令牌”。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 from time import time, sleepclass TokenBucket (object) : def __init__ (self, tokens, fill_rate) : self.capacity = float(tokens) self._tokens = float(tokens) self.fill_rate = float(fill_rate) self.timestamp = time() def consume (self, tokens, block=True) : assert tokens <= self.capacity, 'Attempted to consume {} tokens from a bucket with capacity {}' .format(tokens, self.capacity) if block and tokens > self.tokens: deficit = tokens - self._tokens delay = deficit / self.fill_rate print('Have {} tokens, need {}; sleeping {} seconds' .format(self._tokens, tokens, delay)) sleep(delay) if tokens <= self.tokens: self._tokens -= tokens return True else : return False @property def tokens (self) : if self._tokens < self.capacity: now = time() delta = self.fill_rate * (now - self.timestamp) self._tokens = min(self.capacity, self._tokens + delta) self.timestamp = now return self._tokens def rate_limit (data, bandwidth_or_burst, steady_state_bandwidth=None) : bandwidth = steady_state_bandwidth or bandwidth_or_burst rate_limiter = TokenBucket(bandwidth_or_burst, bandwidth) for thing in data: rate_limiter.consume(len(str(thing))) yield thing if __name__ == '__main__' : stream = rate_limit(range(10 ), 5 , 1 ) for i in stream: print(time(), i)
注意:以上算法都是单机单线程的实现,如果需要多个机器限流则需要将桶的状态通过redis等外部服务来存储。
令牌桶和漏桶算法对比
令牌桶是按照固定速率往桶中添加令牌 ,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时,则拒绝新的请求。
漏桶则是按照固定速率流出请求 ,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝。
令牌桶限制的是平均流入速率(允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,或4个令牌), 并允许一定程度的突发流量。
漏桶限制的是常量流出速率(即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2) , 从而平滑突发流入速率。
令牌桶允许一定程度的突发,而漏桶主要目的是平滑流入速率。
参考