Rate Limiting in rxjs


I saw a question on StackOverflow recently that got me thinking. How do you rate limit HTTP calls with rxjs? Some APIs have quotas where you can only make a certain number of requests in a given time period, for example, 5 requests in 1 second. Throughout this post, I’ll stay with the 5 requests/second example. These types of quotas generally use sliding windows to count requests. So, if we want to make 10 requests, and we want to send all of them, we need to throttle at least some of them or we will hit the quota and get a nice but not so desirable error. How should we approach this? Read this blog post and find out more about rate limiting in rxjs.

What does rxjs offer?

Solving problems with rxjs feels like playing with LEGO. We connect operators to each other until they do what we want. Rxjs offers a lot of operators that could be useful for this scenario, but we need to be careful because some of them aren’t lossless. For example, the throttleTime operator lets one value through and then ignores every other value until the specified duration has passed; the ignored values are discarded. Discarding them is not helpful. We want to send all the requests; we just want some requests to be delayed a little.
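To make the lossiness concrete, here is a small sketch, assuming rxjs 6 or 7 import paths. Five values are emitted in one burst and piped through throttleTime with a 1-second duration; the variable names are made up for illustration.

```typescript
import { of } from 'rxjs';
import { throttleTime } from 'rxjs/operators';

// Five "requests" emitted synchronously, so they all fall into one window.
const requests$ = of(1, 2, 3, 4, 5);

// Collect whatever survives throttling.
const survivors: number[] = [];
requests$
  .pipe(throttleTime(1000))
  .subscribe((value) => survivors.push(value));

// Only the first value gets through; 2 to 5 are dropped, not delayed.
console.log(survivors);
```

For rate limiting HTTP calls this is the wrong trade-off: four of the five requests would never be sent.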

Delaying values

Luckily, rxjs has a delay operator that emits the given value after a specified amount of time. With this, we can calculate how much time a request needs to wait until it can be sent. The first request can be sent immediately. We can distribute the requests evenly in the time period by delaying the second request by 200ms, the third request by 400ms, and so on. To do this, we need to track the current delay: it increases whenever a request is sent and decreases as time passes. Then we can be sure that we won’t send more requests than we’re allowed. However, this is stricter than necessary: when we send only 3 requests at once, none of them needs to be delayed, because they all fit in the quota. We could send the requests in short bursts; we don’t necessarily need to distribute them evenly over time.
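The bookkeeping for even spacing can be sketched in plain TypeScript, without rxjs. The helper name evenlySpacedDelays and the sample arrival times are made up for illustration; it assumes arrival times are given in milliseconds and in ascending order.

```typescript
// Hypothetical helper: for each arrival time, compute how long the request
// has to wait so that consecutive requests are at least `spacingMs` apart.
function evenlySpacedDelays(arrivalTimesMs: number[], spacingMs: number): number[] {
  let nextFreeSlot = 0; // earliest time at which the next request may be sent
  return arrivalTimesMs.map((arrival) => {
    const sendAt = Math.max(arrival, nextFreeSlot);
    nextFreeSlot = sendAt + spacingMs; // sending a request pushes the slot forward
    return sendAt - arrival;           // time passing shrinks the remaining delay
  });
}

// 5 requests/second means one request every 200 ms.
const spacingMs = 1000 / 5;

// Three requests at once: the second and third wait 200 ms and 400 ms.
const burstDelays = evenlySpacedDelays([0, 0, 0], spacingMs);

// Three requests 500 ms apart: nobody waits.
const spreadDelays = evenlySpacedDelays([0, 500, 1000], spacingMs);

console.log(burstDelays, spreadDelays);
```

The burst case shows the drawback: three requests fit comfortably in a quota of five, yet two of them are held back.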

Using a token bucket

Another approach is to use a token bucket algorithm. Let’s say we have 5 tokens because we can send 5 requests. When we send a request, we consume a token. We can only send a request when we have a token. A consumed token can be reused after a specified duration; in our example, used tokens can be reused after 1 second. This way, when we have tokens, we can make short bursts, but we won’t be able to send requests over the quota.
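The token rule can also be simulated with plain timestamps before any rxjs is involved. This is only a sketch: the function name tokenBucketSendTimes and its parameters are hypothetical, and it assumes requests arrive in ascending time order and are served first come, first served.

```typescript
// Hypothetical simulation: given arrival times in ms, return the time at
// which each request can actually be sent under the token rule.
function tokenBucketSendTimes(
  arrivalsMs: number[],
  tokenCount: number,
  windowMs: number,
): number[] {
  const sendTimes: number[] = [];
  arrivalsMs.forEach((arrival, i) => {
    // Once all tokens have been used, request i has to wait for the token
    // consumed by request i - tokenCount, which comes back windowMs after
    // that request was sent.
    const earlier = i >= tokenCount ? sendTimes[i - tokenCount] : undefined;
    const tokenFreeAt = earlier === undefined ? arrival : earlier + windowMs;
    sendTimes.push(Math.max(arrival, tokenFreeAt));
  });
  return sendTimes;
}

// A burst of three requests fits in the quota, so nothing is delayed.
const burstOfThree = tokenBucketSendTimes([0, 0, 0], 5, 1000);

// Seven requests, 1 ms apart: the sixth and seventh wait for renewed tokens.
const sevenRequests = tokenBucketSendTimes([0, 1, 2, 3, 4, 5, 6], 5, 1000);

console.log(burstOfThree, sevenRequests);
```

Here the first five requests go out as they arrive, and the last two are sent at 1000 ms and 1001 ms, when the first two tokens become reusable.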

Creating a rate-limiting rxjs operator

Let’s create an rxjs operator that emits values based on the described token bucket algorithm. The outline of the operator should look something like this:
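One possible outline, as a sketch only. The operator name rateLimit and the parameter name count are assumptions; slidingWindowTime and scheduler follow the names used in this post. The body is a pass-through placeholder, not the real logic.

```typescript
import { asyncScheduler } from 'rxjs';
import type { MonoTypeOperatorFunction, SchedulerLike } from 'rxjs';

// Outline only: the signature is the interesting part here.
function rateLimit<T>(
  count: number,                             // values allowed per window
  slidingWindowTime: number,                 // window size in milliseconds
  scheduler: SchedulerLike = asyncScheduler, // replaceable in tests
): MonoTypeOperatorFunction<T> {
  // Placeholder: values pass through untouched until the token logic exists.
  return (source) => source;
}

// Usage would look like: source$.pipe(rateLimit(5, 1000))
const outlineOperator = rateLimit<number>(5, 1000);
```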

You can specify how many values can be emitted, and how big the sliding window is. The scheduler parameter is useful when testing the operator. Now we need tokens, and a way to consume and renew them.

So, we have a certain number of tokens. We need tokenChanged to know when a token is consumed or renewed. It is a BehaviorSubject, so it emits its latest value to anything that subscribes to it. The consumeToken function decreases the number of tokens and notifies every subscriber through tokenChanged; renewToken does the same, but increases the number of tokens. With the filter operator, we create the availableTokens Observable, which only emits when there are tokens we can use. Now we need to apply this to the values coming from the source Observable. So, for every value, we need to wait for an available token, then take 1 token and consume it, then emit the value, and after the slidingWindowTime has passed, renew the token. It looks like this:
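A minimal sketch of the token bookkeeping and the per-value pipeline, written from the description above rather than taken from the original snippet. The example values (2 tokens, a 1000 ms window) and the helper name whenTokenAvailable are assumptions, and a VirtualTimeScheduler stands in for real time so the demo finishes instantly.

```typescript
import { BehaviorSubject, timer, VirtualTimeScheduler } from 'rxjs';
import type { Observable } from 'rxjs';
import { filter, map, take } from 'rxjs/operators';

const count = 2;                 // example value: tokens per window
const slidingWindowTime = 1000;  // example value: window size in ms
const scheduler = new VirtualTimeScheduler(); // virtual clock for the demo

let tokens = count;
// Replays the latest token count to new subscribers and emits every change.
const tokenChanged = new BehaviorSubject(tokens);
const consumeToken = () => tokenChanged.next(--tokens);
const renewToken = () => tokenChanged.next(++tokens);
// The predicate reads the live `tokens` variable instead of the emitted
// number, so a stale notification cannot let a second waiter through.
const availableTokens = tokenChanged.pipe(filter(() => tokens > 0));

// Hypothetical helper: wait for a token, consume it, schedule its renewal,
// then emit the value.
function whenTokenAvailable<T>(value: T): Observable<T> {
  return availableTokens.pipe(
    take(1),
    map(() => {
      consumeToken();
      timer(slidingWindowTime, scheduler).subscribe(renewToken);
      return value;
    }),
  );
}

const emitted: string[] = [];
['a', 'b', 'c'].forEach((v) =>
  whenTokenAvailable(v).subscribe((x) => emitted.push(x)),
);
const beforeRenewal = emitted.slice(); // 'a' and 'b' got tokens right away
scheduler.flush();                     // run virtual time past the window
console.log(beforeRenewal, emitted);   // 'c' shows up only after the flush
```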

Here, the take(1) operator makes sure that we only consume 1 token. With the map operator, we can emit the value we want, consume a token, and set up a timer to renew that token. The result of this code snippet is an Observable that will eventually emit a single value. The one thing left to do is to collect these Observables into a single Observable. We can do this by using the mergeMap operator, which subscribes to our Observables and emits the values coming from them. With everything together, the operator looks like this:
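Put together, the operator might look like the sketch below. It follows the description in this post, but the name rateLimit, the parameter name count, and the usage at the bottom are assumptions. Note that the token state is created once per rateLimit call, so every subscription to the resulting pipeline shares the same tokens.

```typescript
import { asyncScheduler, BehaviorSubject, from, timer, VirtualTimeScheduler } from 'rxjs';
import type { MonoTypeOperatorFunction, SchedulerLike } from 'rxjs';
import { filter, map, mergeMap, take } from 'rxjs/operators';

function rateLimit<T>(
  count: number,
  slidingWindowTime: number,
  scheduler: SchedulerLike = asyncScheduler,
): MonoTypeOperatorFunction<T> {
  let tokens = count;
  const tokenChanged = new BehaviorSubject(tokens);
  const consumeToken = () => tokenChanged.next(--tokens);
  const renewToken = () => tokenChanged.next(++tokens);
  const availableTokens = tokenChanged.pipe(filter(() => tokens > 0));

  // Each source value becomes a small Observable that waits for a token;
  // mergeMap subscribes to all of them and forwards what they emit.
  return mergeMap((value: T) =>
    availableTokens.pipe(
      take(1),
      map(() => {
        consumeToken();
        timer(slidingWindowTime, scheduler).subscribe(renewToken);
        return value;
      }),
    ),
  );
}

// Usage sketch with a virtual clock: 7 values, 5 allowed per 1000 ms.
const virtualTime = new VirtualTimeScheduler();
const sent: number[] = [];
from([1, 2, 3, 4, 5, 6, 7])
  .pipe(rateLimit<number>(5, 1000, virtualTime))
  .subscribe((value) => sent.push(value));

const sentImmediately = sent.slice(); // the first five go out at once
virtualTime.flush();                  // let the renewal timers fire
console.log(sentImmediately, sent);
```

With real HTTP calls, the request itself would typically be started in a later operator, after rateLimit has released the value.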

Testing the rate limit operator

Unit testing of asynchronous code can be intimidating, but rxjs supports marble tests. A marble is a string like 'a--b', where a and b represent emitted values and each - represents the passage of time, so the example marble means that the Observable initially emits value a, and after some time it emits value b. Short and sweet. An example test would look like this:

Here, we create a TestScheduler to emulate the passage of time. We create a source marble and another marble to assert the result. We can see that the f and g values are expected to be emitted at a later point in time because they don’t fit in the window.
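A sketch of such a marble test, under a few assumptions: it uses TestScheduler.run from rxjs/testing, it includes a compact version of the token-bucket operator described in this post so the snippet runs on its own, and it replaces a test framework's assertion with a plain comparison. In run mode one marble character is 1 ms, so the source emits a to g between 0 ms and 6 ms.

```typescript
import { BehaviorSubject, timer } from 'rxjs';
import type { MonoTypeOperatorFunction, SchedulerLike } from 'rxjs';
import { filter, map, mergeMap, take } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

// Compact sketch of the operator under test (names are assumptions).
function rateLimit<T>(
  count: number,
  slidingWindowTime: number,
  scheduler: SchedulerLike,
): MonoTypeOperatorFunction<T> {
  let tokens = count;
  const tokenChanged = new BehaviorSubject(tokens);
  const availableTokens = tokenChanged.pipe(filter(() => tokens > 0));
  return mergeMap((value: T) =>
    availableTokens.pipe(
      take(1),
      map(() => {
        tokenChanged.next(--tokens);
        timer(slidingWindowTime, scheduler).subscribe(() => tokenChanged.next(++tokens));
        return value;
      }),
    ),
  );
}

// Records the outcome of each comparison the scheduler asks for.
const outcomes: boolean[] = [];
const testScheduler = new TestScheduler((actual, expected) => {
  // A real test would call the framework's deep-equality assertion here.
  const same = JSON.stringify(actual) === JSON.stringify(expected);
  outcomes.push(same);
  if (!same) throw new Error('Observable did not match the expected marble');
});

testScheduler.run(({ cold, expectObservable }) => {
  const source = cold<string>('abcdefg|');
  // a to e pass at 0 to 4 ms; f waits until 1000 ms, g until 1001 ms,
  // and the result completes together with g.
  const expected = 'abcde 995ms f(g|)';
  expectObservable(source.pipe(rateLimit<string>(5, 1000, testScheduler))).toBe(expected);
});
```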

Conclusions

Rxjs is a powerful way to handle asynchronous operations. Personally, I love the possibility of using the existing operators as building blocks to create more complex operators that satisfy my needs. I also like that it’s very easy to write tests for them with marbles. This is, of course, just my opinion. There are other, maybe better, solutions out there. Try it yourself. If you’d like to fiddle with the code, you can find it here.