Preface

In the previous article, I explained that for complex traffic limiting scenarios you can implement your own traffic limiting policy by extending RedisRateLimiter.

Let’s assume your boss has assigned you a task. The specific requirements are as follows:

  • Perform traffic limiting for specific interfaces
  • The traffic limit can differ from one interface to another
  • The traffic limiting configuration can be adjusted dynamically and takes effect in real time

If you were given the task above, how would you design and implement it?

Everyone views problems from different perspectives, and naturally comes up with different solutions. Just as the saying goes, all roads lead to Rome, and the road that leads to the destination is a good one.

How to analyze requirements

Below is my implementation. It is for reference only, and experts can skip it.

Let's analyze each requirement in turn.

Requirement 1, “how to limit traffic for a specific interface”, was already covered in the previous article. The KeyResolver only needs to return the URI of the interface, so that traffic is limited per interface.

Requirement 2, “the traffic limit can differ from one interface to another”, obviously cannot be met through static configuration, because the replenishRate and burstCapacity values in the configuration are fixed. To make replenishRate and burstCapacity dynamic, we have to extend RedisRateLimiter.

We keep a configuration list that holds the traffic limiting values of each interface. With this configuration we can look up the limiting values of the requested interface.

Requirement 3, “the traffic limiting configuration can be adjusted dynamically and takes effect in real time”, is also fairly easy. Whether you store the configuration in a file, a database, or a cache, it takes effect in real time as long as you read it on every request. The performance cost of doing so, however, has to be considered.

  • Store it in a file: every read costs I/O, and above all the file is inconvenient to modify
  • Store it in a database: it can be modified through a web interface or directly in the database, but querying on every request performs poorly
  • Store it in a distributed cache (Redis): performance is better than with the database

By comparison, a cache is the best of these three options. Is there an even better one? Yes: combine it with a configuration center. I use my own configuration center (github.com/yinjihuan/s…)

The advantage of a configuration center is that it is designed to store configuration. The configuration is loaded when the project starts, and updates are pushed when it changes. Every read hits a local object, so performance is good.
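To make the "read from a local object" point concrete, here is a minimal sketch in plain Java. It is not the API of the configuration center used in this article: the class LocalLimitConfig and the callback onConfigPushed are hypothetical names, and the sketch only assumes that some client calls the callback whenever the server pushes a change.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical local holder for traffic limiting values. A real
 * configuration center client is assumed to call onConfigPushed()
 * when the server pushes a change.
 */
class LocalLimitConfig {

    // Read-only snapshot; swapped atomically on every push.
    private final AtomicReference<Map<String, Integer>> snapshot;

    LocalLimitConfig(Map<String, Integer> initial) {
        this.snapshot = new AtomicReference<>(copy(initial));
    }

    /** Called by the (assumed) config client when an update is pushed. */
    void onConfigPushed(Map<String, Integer> updated) {
        snapshot.set(copy(updated));
    }

    /** Hot path: an in-memory lookup with no file, database, or Redis I/O. */
    int get(String key, int fallback) {
        return snapshot.get().getOrDefault(key, fallback);
    }

    private static Map<String, Integer> copy(Map<String, Integer> source) {
        return Collections.unmodifiableMap(new HashMap<>(source));
    }
}
```

Because readers only ever see a complete snapshot, a push never exposes a half-updated map to a request that is being checked at the same moment.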

We can start coding after we have the solution, but have you thought about how to initialize the limiting values of so many interfaces? Manually add them one by one?

Different services may be maintained by different teams, or all of them by a single team. From the designer's point of view, the right to set the limits should be given to the developers of the interfaces: they decide how much concurrency each interface can withstand, and your responsibility is only to enforce the limit in the gateway. Of course, in a real company the limit is not necessarily decided by the developer; it is tuned based on load test results.

Enough talk. Let's start coding.

First we define our own RedisRateLimiter: copy the source code and modify it slightly. Only the core code is shown here.

public class CustomRedisRateLimiter extends AbstractRateLimiter<CustomRedisRateLimiter.Config>
		implements ApplicationContextAware {

	public static final String CONFIGURATION_PROPERTY_NAME = "custom-redis-rate-limiter";
	public static final String REDIS_SCRIPT_NAME = "redisRequestRateLimiterScript";
	public static final String REMAINING_HEADER = "X-RateLimit-Remaining";
	public static final String REPLENISH_RATE_HEADER = "X-RateLimit-Replenish-Rate";
	public static final String BURST_CAPACITY_HEADER = "X-RateLimit-Burst-Capacity";

	public CustomRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate, RedisScript<List<Long>> script,
			Validator validator) {
		super(Config.class, CONFIGURATION_PROPERTY_NAME, validator);
		this.redisTemplate = redisTemplate;
		this.script = script;
		initialized.compareAndSet(false, true);
	}

	public CustomRedisRateLimiter(int defaultReplenishRate, int defaultBurstCapacity) {
		super(Config.class, CONFIGURATION_PROPERTY_NAME, null);
		this.defaultConfig = new Config().setReplenishRate(defaultReplenishRate).setBurstCapacity(defaultBurstCapacity);
	}

	// Traffic limiting configuration
	private RateLimitConf rateLimitConf;

	@Override
	@SuppressWarnings("unchecked")
	public void setApplicationContext(ApplicationContext context) throws BeansException {
		// Load configuration
		this.rateLimitConf = context.getBean(RateLimitConf.class);
	}

	/**
	 * This uses a basic token bucket algorithm and relies on the fact that
	 * Redis scripts execute atomically. No other operations can run between
	 * fetching the count and writing the new count.
	 */
	@Override
	@SuppressWarnings("unchecked")
	public Mono<Response> isAllowed(String routeId, String id) {
		if (!this.initialized.get()) {
			throw new IllegalStateException("RedisRateLimiter is not initialized");
		}

		//Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig);
		
		if (rateLimitConf == null) {
			throw new IllegalArgumentException("No Configuration found for route " + routeId);
		}

		Map<String, Integer> routeConfig = rateLimitConf.getLimitMap();

		// Key format: service name.interface URI.type
		String replenishRateKey = routeId + "." + id + ".replenishRate";
		int replenishRate = routeConfig.get(replenishRateKey) == null ? routeConfig.get("default.replenishRate") : routeConfig.get(replenishRateKey);
		
		String burstCapacityKey = routeId + "." + id + ".burstCapacity";
		int burstCapacity = routeConfig.get(burstCapacityKey) == null ? routeConfig.get("default.burstCapacity") : routeConfig.get(burstCapacityKey);
			
		try {
			List<String> keys = getKeys(id);

			// The arguments to the LUA script. time() returns unixtime in
			// seconds.
			List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
					Instant.now().getEpochSecond() + "", "1");
			// allowed, tokens_left = redis.eval(SCRIPT, keys, args)
			Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
			// .log("redisratelimiter", Level.FINER);
			return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
					.reduce(new ArrayList<Long>(), (longs, l) -> {
						longs.addAll(l);
						return longs;
					}).map(results -> {
						boolean allowed = results.get(0) == 1L;
						Long tokensLeft = results.get(1);

						Response response = new Response(allowed, getHeaders(replenishRate, burstCapacity, tokensLeft));

						if (log.isDebugEnabled()) {
							log.debug("response: " + response);
						}
						return response;
					});
		} catch (Exception e) {
			/*
			 * We don't want a hard dependency on Redis to allow traffic. Make
			 * sure to set an alert so you know if this is happening too much.
			 * Stripe's observed failure rate is 0.01%.
			 */
			log.error("Error determining if user allowed from redis", e);
		}
		return Mono.just(new Response(true, getHeaders(replenishRate, burstCapacity, -1L)));
	}

	public HashMap<String, String> getHeaders(Integer replenishRate, Integer burstCapacity, Long tokensLeft) {
		HashMap<String, String> headers = new HashMap<>();
		headers.put(this.remainingHeader, tokensLeft.toString());
		headers.put(this.replenishRateHeader, String.valueOf(replenishRate));
		headers.put(this.burstCapacityHeader, String.valueOf(burstCapacity));
		return headers;
	}
}
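The isAllowed method passes replenishRate, burstCapacity, the current epoch second, and the number of requested tokens (1) to the Redis script. To show what a token bucket does with those four values, here is an in-memory sketch. It is an illustration of the algorithm only, not the gateway's Lua script; the class name TokenBucket and its methods are hypothetical, and the real state lives in Redis rather than in a Java object.

```java
/**
 * In-memory token bucket, for illustration only. The gateway keeps this
 * state in Redis and updates it atomically inside a script instead.
 */
class TokenBucket {

    private final long replenishRate;  // tokens added per second
    private final long burstCapacity;  // maximum tokens the bucket can hold
    private long tokens;               // tokens currently available
    private long lastRefreshed;        // epoch second of the last update

    TokenBucket(long replenishRate, long burstCapacity, long nowEpochSecond) {
        this.replenishRate = replenishRate;
        this.burstCapacity = burstCapacity;
        this.tokens = burstCapacity;   // a new bucket starts full
        this.lastRefreshed = nowEpochSecond;
    }

    /** Refills the bucket, then consumes the tokens if enough are available. */
    synchronized boolean tryAcquire(long nowEpochSecond, long requested) {
        long elapsed = Math.max(0, nowEpochSecond - lastRefreshed);
        tokens = Math.min(burstCapacity, tokens + elapsed * replenishRate);
        lastRefreshed = nowEpochSecond;
        if (tokens < requested) {
            return false;
        }
        tokens -= requested;
        return true;
    }

    synchronized long tokensLeft() {
        return tokens;
    }
}
```

With replenishRate=1 and burstCapacity=2, two requests in the same second are allowed, a third is rejected, and one more token becomes available a second later. This is why burstCapacity bounds short spikes while replenishRate bounds the sustained rate.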

We need to load our configuration class in setApplicationContext. The definition of the configuration class is as follows:

@CxytianDiConf(system = "fangjia-gateway")
public class RateLimitConf {
	// Traffic limiting configuration
	@ConfField(value = "limitMap")
	private Map<String, Integer> limitMap = new HashMap<String, Integer>(){{
		put("default.replenishRate", 100);
		put("default.burstCapacity", 1000);
	}};
	public void setLimitMap(Map<String, Integer> limitMap) {
		this.limitMap = limitMap;
	}
	public Map<String, Integer> getLimitMap() {
		return limitMap;
	}
}

The traffic limiting information of all interfaces is stored in the map and has default values. If no corresponding configuration is available, the default value is used to limit traffic on the interface.

In the isAllowed method, a key of the form “service name.interface URI.type” is built and used to fetch the corresponding value from the map.

The type part of the key distinguishes the replenishRate value from the burstCapacity value.
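The key format and the fallback to the default values can be tried in isolation with a small sketch. The class LimitLookup and the sample service name house-service are hypothetical and only serve the illustration; the key layout and the default.replenishRate / default.burstCapacity entries follow the code above.

```java
import java.util.HashMap;
import java.util.Map;

class LimitLookup {

    /** Builds a key of the form serviceName.interfaceUri.type. */
    static String key(String serviceName, String uri, String type) {
        return serviceName + "." + uri + "." + type;
    }

    /** Returns the interface-specific value, or the default.<type> value. */
    static int resolve(Map<String, Integer> limitMap, String serviceName, String uri, String type) {
        Integer specific = limitMap.get(key(serviceName, uri, type));
        if (specific != null) {
            return specific;
        }
        Integer fallback = limitMap.get("default." + type);
        if (fallback == null) {
            // Fail with a clear message instead of a NullPointerException on unboxing.
            throw new IllegalStateException("No default configured for " + type);
        }
        return fallback;
    }

    /** Sample map: the defaults plus one interface-specific replenishRate. */
    static Map<String, Integer> sampleLimits() {
        Map<String, Integer> limits = new HashMap<>();
        limits.put("default.replenishRate", 100);
        limits.put("default.burstCapacity", 1000);
        limits.put("house-service./house/data.replenishRate", 10);
        return limits;
    }
}
```

With the sample map, looking up replenishRate for /house/data on house-service yields the specific value 10, while burstCapacity for the same interface falls back to the default 1000.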

Next, configure CustomRedisRateLimiter:


@Bean
@Primary
public CustomRedisRateLimiter customRedisRateLimiter(
                  ReactiveRedisTemplate<String, String> redisTemplate,	 
                  @Qualifier(CustomRedisRateLimiter.REDIS_SCRIPT_NAME)  RedisScript<List<Long>> redisScript,
		          Validator validator) {
	return new CustomRedisRateLimiter(redisTemplate, redisScript, validator);
}

Now that the gateway logic has been implemented, we need to define a custom annotation in each service and then initialize the traffic limiting parameters to our configuration center.

Custom annotation

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ApiRateLimit {

	/**
	 * Replenish rate
	 * @return
	 */
	int replenishRate() default 100;

	/**
	 * Burst capacity
	 * @return
	 */
	int burstCapacity() default 1000;

}

Startup listener: read the annotations and initialize the configuration

/**
 * Initializes the APIs whose concurrency the API gateway needs to limit.
 * @author yinjihuan
 */
public class InitGatewayApiLimitRateListener implements ApplicationListener<ApplicationReadyEvent> {

	// Controller package path
	private String controllerPath;

	private RateLimitConf rateLimitConf;

	private ConfInit confInit;

	private String applicationName;

	public InitGatewayApiLimitRateListener(String controllerPath) {
		this.controllerPath = controllerPath;
	}

	@Override
	public void onApplicationEvent(ApplicationReadyEvent event) {
		this.rateLimitConf = event.getApplicationContext().getBean(RateLimitConf.class);
		this.confInit = event.getApplicationContext().getBean(ConfInit.class);
		this.applicationName = event.getApplicationContext().getEnvironment().getProperty("spring.application.name");
		try {
			initLimitRateAPI();
		} catch (Exception e) {
			throw new RuntimeException("Failed to initialize the APIs that require concurrency limits", e);
		}
	}

	/**
	 * Initializes the APIs that require concurrency limits.
	 * @throws IOException
	 * @throws ClassNotFoundException
	 */
	private void initLimitRateAPI() throws IOException, ClassNotFoundException {
		Map<String, Integer> limitMap = rateLimitConf.getLimitMap();
		ClasspathPackageScannerUtils scan = new ClasspathPackageScannerUtils(this.controllerPath);
		List<String> classList = scan.getFullyQualifiedClassNameList();
		for (String clazz : classList) {
			Class<?> clz = Class.forName(clazz);
			if (!clz.isAnnotationPresent(RestController.class)) {
				continue;
			}
			Method[] methods = clz.getDeclaredMethods();
			for (Method method : methods) {
				if (method.isAnnotationPresent(ApiRateLimit.class)) {
					ApiRateLimit apiRateLimit = method.getAnnotation(ApiRateLimit.class);
					String replenishRateKey = applicationName + "." + getApiUri(clz, method) + ".replenishRate";
					String burstCapacityKey = applicationName + "." + getApiUri(clz, method) + ".burstCapacity";
					limitMap.put(replenishRateKey, apiRateLimit.replenishRate());
					limitMap.put(burstCapacityKey, apiRateLimit.burstCapacity());
				}
			}
		}
		rateLimitConf.setLimitMap(limitMap);
		// Initialize the value to the configuration center
		confInit.init(rateLimitConf);
	}

	// Note: this assumes the controller class has a class-level @RequestMapping
	// and that every mapping annotation declares at least one value.
	private String getApiUri(Class<?> clz, Method method) {
		StringBuilder uri = new StringBuilder();
		uri.append(clz.getAnnotation(RequestMapping.class).value()[0]);
		if (method.isAnnotationPresent(GetMapping.class)) {
			uri.append(method.getAnnotation(GetMapping.class).value()[0]);
		} else if (method.isAnnotationPresent(PostMapping.class)) {
			uri.append(method.getAnnotation(PostMapping.class).value()[0]);
		} else if (method.isAnnotationPresent(RequestMapping.class)) {
			uri.append(method.getAnnotation(RequestMapping.class).value()[0]);
		}
		return uri.toString();
	}
}
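The reflection step of the listener can also be tried without Spring. The following sketch is a simplified stand-in, not the listener itself: the annotation Limit (with an extra path attribute replacing the Spring mapping annotations), the class DemoController, and the service name house-service are all hypothetical names introduced for the illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class LimitScanDemo {

    /** Stand-in for ApiRateLimit; the path attribute is an assumption. */
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Limit {
        String path();
        int replenishRate() default 100;
        int burstCapacity() default 1000;
    }

    /** Hypothetical controller; only annotated methods get registered. */
    static class DemoController {
        @Limit(path = "/house/data", replenishRate = 10, burstCapacity = 100)
        public String getData() {
            return "data";
        }

        public String unlimited() {
            return "free";
        }
    }

    /** Collects serviceName.path.type -> value for each annotated method. */
    static Map<String, Integer> scan(String serviceName, Class<?> controller) {
        Map<String, Integer> limits = new HashMap<>();
        for (Method method : controller.getDeclaredMethods()) {
            Limit limit = method.getAnnotation(Limit.class);
            if (limit == null) {
                continue; // method is not rate limited explicitly
            }
            String prefix = serviceName + "." + limit.path();
            limits.put(prefix + ".replenishRate", limit.replenishRate());
            limits.put(prefix + ".burstCapacity", limit.burstCapacity());
        }
        return limits;
    }
}
```

Calling LimitScanDemo.scan("house-service", LimitScanDemo.DemoController.class) produces two entries for getData and none for unlimited, which would then fall back to the default values in the gateway.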

Registering the listener

SpringApplication application = new SpringApplication(FshHouseServiceApplication.class);
application.addListeners(new InitGatewayApiLimitRateListener("com.fangjia.fsh.house.controller"));
context = application.run(args);

Finally, usage is simple: just add the annotation.

@ApiRateLimit(replenishRate=10, burstCapacity=100)
@GetMapping("/data")
public HouseInfo getData(@RequestParam("name") String name) {
	return new HouseInfo(1L, "Shanghai", "Hongkou", "East Body Community");
}

This is just one way to implement the idea; you may well have a better one.

I think it is best when developers do not have to worry about this kind of non-business functionality and it is handled at the framework level. Of course, it is still worth sharing how it works: knowing how to use it is good, and knowing how to use it while also understanding the principle is even better.