now = time() volume = self.size() if volume >= self.leaky_rate: # 容量已满需要清理已漏出请求 pre_tick = now - 1# 上一个时间窗口, 默认单位为秒 item_count, remaining_time = self.inspect_expired_items(pre_tick) if item_count >= self.leaky_rate: # 上个周期请求已超限 if block: sleep(remaining_time) # 等到出现一个空位 print('Bucket Full, sleep {} seconds'.format(remaining_time)) returnself.acquire() # 再次尝试 returnFalse self._que[:volume-item_count] = [] # 清除上个周期数据 self._que.append(now) returnTrue
if __name__ == '__main__':
    # Demo: make ten non-blocking acquire attempts against a LeakyBucket
    # constructed with the argument 2, printing whether each attempt
    # succeeded or was skipped.
    rate_limiter = LeakyBucket(2)
    for i in range(10):
        outcome = 'succ' if rate_limiter.acquire(block=False) else 'skip'
        print(time(), outcome, i)
        # NOTE(review): the original formatting was lost; the pause is placed
        # at loop level (one pause per attempt) -- confirm against the
        # intended demo behaviour.
        sleep(0.2)
def consume(self, tokens, block=True):
    """Try to take ``tokens`` tokens out of the bucket.

    Args:
        tokens: Number of tokens requested. Must not exceed
            ``self.capacity``.
        block: When true and the bucket currently holds fewer than
            ``tokens``, sleep for ``deficit / self.fill_rate`` seconds
            before making the attempt.

    Returns:
        True if the tokens were deducted from the bucket, False if the
        bucket did not hold enough tokens at the time of the check.

    Raises:
        AssertionError: If ``tokens`` is greater than ``self.capacity``
            (only while assertions are enabled).
    """
    assert tokens <= self.capacity, \
        'Attempted to consume {} tokens from a bucket with capacity {}'.format(
            tokens, self.capacity)

    if block and tokens > self.tokens:
        # Wait long enough for the shortfall to accumulate at fill_rate.
        # NOTE(review): presumably ``self.tokens`` is a property that
        # refreshes ``self._tokens`` -- confirm against the class definition.
        deficit = tokens - self._tokens
        delay = deficit / self.fill_rate
        print('Have {} tokens, need {}; sleeping {} seconds'.format(
            self._tokens, tokens, delay))
        sleep(delay)

    # Re-check after the optional wait; only a single wait is performed, so
    # the attempt can still fail if the bucket remains short.
    if tokens <= self.tokens:
        self._tokens -= tokens
        return True
    return False