Spring Cloud Gateway 分布式限流

前文我们介绍过通过Guava中的RateLimiter进行限流,但Guava中的工具只适用同一JVM中的操作,想要在分布式环境中进行接口限流我们还需要引入中间件,本文介绍基于Redis的接口限流。

Spring Cloud Gateway

Spring Cloud Gateway为Spring官方基于Spring 5.0、Spring Boot2.0和Project Reactor等技术开发的网关,限流作为网关应用中常用功能,Gateway也提供了一个基于Redis的分布式过滤器,简单配置,即可在网关层进行流量限制。

注:本文的Spring Cloud 版本为Greenwich.SR1

Spring Cloud Gateway的路由配置支持配置文件和Java代码配置,为了直观展示源码,本文使用Java代码编写路由。

  • RedisRateLimiter 基于Redis的过滤器
  • setBurstCapacity() 设置令牌桶的容量
  • setReplenishRate() 设置每秒可取的令牌数量
  • setKeyResolver(pathKeyResolver) 配置过滤key的策略,本文使用路径作为过滤的key
@Component
public class PathKeyResolver implements KeyResolver {
    @Override
    public Mono<String> resolve(ServerWebExchange exchange) {
        return Mono.just(exchange.getRequest().getPath().toString());
    }
}

@Configuration
public class RouteLocatorConfig {

    @Autowired
    private PathKeyResolver pathKeyResolver;

    @Bean
    public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) {
        return builder.routes()
                .route("admin", r -> r.path("/admin/**")
                        .filters(f -> f.requestRateLimiter()
                                .rateLimiter(RedisRateLimiter.class, c -> c.setBurstCapacity(1).setReplenishRate(1))
                                .configure(c -> c.setKeyResolver(pathKeyResolver))
                        )
                        .uri("lb://ADMIN")
                )
                .build();
    }

}

RouteLocator配置完成后限流器就可以正常使用了,每个路径符合/admin/**的请求都会经过RequestRateLimiterGatewayFilterFactoryapply()方法。

@Override
public GatewayFilter apply(Config config) {
   // 取过滤key的策略,我们这里使用上文自定义的 PathKeyResolver
   KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
   // 限流器,我们这里使用 RedisRateLimiter
   RateLimiter<Object> limiter = getOrDefault(config.rateLimiter,
         defaultRateLimiter);
   // 解析的key为空时是否拦截 默认拦截
   boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
   HttpStatusHolder emptyKeyStatus = HttpStatusHolder
         .parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));

   return (exchange, chain) -> {
      Route route = exchange
            .getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
	 // 根据我们写的过滤策略计算出key
      return resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> {
         if (EMPTY_KEY.equals(key)) {
            if (denyEmpty) {
               setResponseStatus(exchange, emptyKeyStatus);
               return exchange.getResponse().setComplete();
            }
            return chain.filter(exchange);
         }
         // key 不为空时走限流逻辑,这里使用的RateLimiter的isAllowed()方法
         return limiter.isAllowed(route.getId(), key).flatMap(response -> {

            for (Map.Entry<String, String> header : response.getHeaders()
                  .entrySet()) {
               exchange.getResponse().getHeaders().add(header.getKey(),
                     header.getValue());
            }
 
            if (response.isAllowed()) {
               return chain.filter(exchange);
            }

            setResponseStatus(exchange, config.getStatusCode());
            return exchange.getResponse().setComplete();
         });
      });
   };
}

resolver.resolve(exchange)实际调用RedisRateLimiter限流器的isAllowed方法

@Override
@SuppressWarnings("unchecked")
public Mono<Response> isAllowed(String routeId, String id) {
   if (!this.initialized.get()) {
      throw new IllegalStateException("RedisRateLimiter is not initialized");
   }

   Config routeConfig = loadConfiguration(routeId);

   // How many requests per second do you want a user to be allowed to do?
    // 每秒可取多少令牌
   int replenishRate = routeConfig.getReplenishRate();

   // How much bursting do you want to allow?
    // 令牌桶的容量
   int burstCapacity = routeConfig.getBurstCapacity();

   try {
       // 生成redis的key,request_rate_limiter.{id}.tokens和request_rate_limiter.{id}.timestamp
      List<String> keys = getKeys(id);

      // The arguments to the LUA script. time() returns unixtime in seconds.
      List<String> scriptArgs = Arrays.asList(replenishRate + "",
            burstCapacity + "", Instant.now().getEpochSecond() + "", "1");
      // allowed, tokens_left = redis.eval(SCRIPT, keys, args)
       // redis中支持lua脚本原子性执行,该脚本在Bean初始化时装配进来
      Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys,
            scriptArgs);
      // .log("redisratelimiter", Level.FINER);
      return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
            .reduce(new ArrayList<Long>(), (longs, l) -> {
               longs.addAll(l);
               return longs;
            }).map(results -> {
          		// 	根据lua脚本返回值判断是否允许通过
               boolean allowed = results.get(0) == 1L;
               Long tokensLeft = results.get(1);

               Response response = new Response(allowed,
                     getHeaders(routeConfig, tokensLeft));

               if (log.isDebugEnabled()) {
                  log.debug("response: " + response);
               }
               return response;
            });
   }
   catch (Exception e) {
      /*
       * We don't want a hard dependency on Redis to allow traffic. Make sure to set
       * an alert so you know if this is happening too much. Stripe's observed
       * failure rate is 0.01%.
       */
      log.error("Error determining if user allowed from redis", e);
   }
   return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
}

this.script为Bean初始化时装配进来,GatewayRedisAutoConfiguration#redisRequestRateLimiterScript将lua脚本从文件中读取并注册成Bean

@Bean
@SuppressWarnings("unchecked")
public RedisScript redisRequestRateLimiterScript() {
   DefaultRedisScript redisScript = new DefaultRedisScript<>();
   redisScript.setScriptSource(new ResourceScriptSource(
         new ClassPathResource("META-INF/scripts/request_rate_limiter.lua")));
   redisScript.setResultType(List.class);
   return redisScript;
}

request_rate_limiter.lua中的代码,与前文所讲的Guava中的RateLimiter实现大同小异

local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)

local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local fill_time = capacity/rate
-- 计算超时时间
local ttl = math.floor(fill_time*2)

--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)

local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
  last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)
-- 最后刷新时间
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)
-- 和上次请求的时间差
local delta = math.max(0, now-last_refreshed)
-- 计算令牌桶中token数量
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
-- 桶中令牌数量是否足够
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
  -- 桶中令牌被取走
  new_tokens = filled_tokens - requested
  allowed_num = 1
end

--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)
-- 剩余令牌数量
redis.call("setex", tokens_key, ttl, new_tokens)
-- 刷新时间设为当前
redis.call("setex", timestamp_key, ttl, now)

return { allowed_num, new_tokens }

Redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。本文使用Redisson提供的RRateLimiter实现分布式限流。

public class RRateLimiterDemo {
	
    private static RRateLimiter rateLimiter = RedisClient.INSTANCE.getRateLimiter("rate_limiter");

    static {
        rateLimiter.trySetRate(RateType.PER_CLIENT, 2, 1, RateIntervalUnit.SECONDS);
    }


    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(100);
        // 5个线程同时调用,模拟多个请求同时访问
        IntStream.rangeClosed(1, 5).forEach(e -> {
            executorService.submit(() -> acquire(e));
        });
    }

    private static void acquire(int i) {
        // acquire方法默认取一个令牌,桶中令牌不够的话会阻塞线程
        rateLimiter.acquire();
        System.out.printf("当前系统时间:%s\n", LocalDateTime.now());
    }
}

静态代码块中为设置限流器的速率,每秒生成两个令牌。下面是控制台打印的结果,可以看出来时间间隔和预想的一致。

该限流器也是通过lua脚本实现限流,实现逻辑与之前介绍的大同小异,我就不做过多展示了。

当前系统时间:2020-04-08T16:49:40.425
当前系统时间:2020-04-08T16:49:40.425
当前系统时间:2020-04-08T16:49:41.420
当前系统时间:2020-04-08T16:49:41.421
当前系统时间:2020-04-08T16:49:42.422

结语

Redis限流器不仅仅今天介绍的这两种,分布式限流器也不限于Redis,但通过这两篇介绍限流器的文章可以发现他们的原理都是相通的,我们了解其中的思想即可。


Spring Cloud Gateway 分布式限流
https://blog.yjll.blog/post/c7e397c2.html
作者
简斋
发布于
2020年3月25日
许可协议