How to build a rate-limiting API with Observables?

I would like to create a simple Calculator service that has a single method to add numbers. This Add method should be async and has to limit the number of concurrent calls being made at a given time. For instance, no more than 5 concurrent calls per second. If the rate limit is exceeded, the call should throw an exception.

The class should be like:

    public class RateLimitingCalculator
    {
        public async Task<int> Add(int a, int b)
        {
            //...
        }
    }

Any ideas? I would like to implement it with Reactive Extensions, but if another strategy is a better fit, I would go with that instead. Thanks!


I don't think using Rx makes sense here, unless you can rewrite your method into something like public IObservable<int> Add(IObservable<Tuple<int, int>> values), as suggested by Enigmativity in a comment.

What I would do is move the rate-limiting concern into a separate class. That way, your code could look something like this:

    public class RateLimitingCalculator
    {
        private RateLimiter rateLimiter = new RateLimiter(5, TimeSpan.FromSeconds(1));

        public async Task<int> Add(int a, int b)
        {
            rateLimiter.ThrowIfRateExceeded();

            //...
        }
    }

The implementation of RateLimiter depends on your exact requirements, but a very simple, not-thread-safe version could look like this:

    class RateLimiter
    {
        private readonly int rate;
        private readonly TimeSpan perTime;

        private DateTime secondStart = DateTime.MinValue;
        private int count = 0;

        public RateLimiter(int rate, TimeSpan perTime)
        {
            this.rate = rate;
            this.perTime = perTime;
        }

        public void ThrowIfRateExceeded()
        {
            var now = DateTime.UtcNow;

            // The current window has expired: start a new one and count this call.
            if (now - secondStart > perTime)
            {
                secondStart = now;
                count = 1;
                return;
            }

            // Still inside the window: reject once the quota is used up.
            if (count >= rate)
                throw new RateLimitExceededException();

            count++;
        }
    }
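
If Add can be called from several threads at once, the same fixed-window logic can be guarded with a lock. The following is only a minimal sketch under a few assumptions: the names ThreadSafeRateLimiter and Demo are made up for illustration, RateLimitExceededException is defined here because the answer uses it without defining it, and the optional clock parameter is an addition that exists only so the behaviour can be tested without waiting for real time to pass.

```csharp
using System;

// Hypothetical exception type: the answer throws it but does not define it.
public class RateLimitExceededException : Exception
{
    public RateLimitExceededException() : base("Rate limit exceeded.") { }
}

// Same fixed-window logic as RateLimiter, with all state changes under one lock
// so concurrent callers cannot race on the window start or the counter.
public class ThreadSafeRateLimiter
{
    private readonly object gate = new object();
    private readonly int rate;
    private readonly TimeSpan perTime;
    private readonly Func<DateTime> clock; // assumption: injectable clock for testing

    private DateTime windowStart = DateTime.MinValue;
    private int count;

    public ThreadSafeRateLimiter(int rate, TimeSpan perTime, Func<DateTime> clock = null)
    {
        this.rate = rate;
        this.perTime = perTime;
        this.clock = clock ?? (() => DateTime.UtcNow);
    }

    public void ThrowIfRateExceeded()
    {
        lock (gate)
        {
            var now = clock();

            // Window expired: open a new one and count this call.
            if (now - windowStart > perTime)
            {
                windowStart = now;
                count = 1;
                return;
            }

            // Quota for the current window is used up.
            if (count >= rate)
                throw new RateLimitExceededException();

            count++;
        }
    }
}

public static class Demo
{
    // Makes `calls` back-to-back calls against a 5-per-second limiter whose
    // clock is frozen, and returns how many of them were rejected.
    public static int CountRejected(int calls)
    {
        var frozen = new DateTime(2020, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        var limiter = new ThreadSafeRateLimiter(5, TimeSpan.FromSeconds(1), () => frozen);

        var rejected = 0;
        for (var i = 0; i < calls; i++)
        {
            try { limiter.ThrowIfRateExceeded(); }
            catch (RateLimitExceededException) { rejected++; }
        }
        return rejected;
    }
}
```

With the clock frozen, Demo.CountRejected(7) accepts the first five calls and rejects the remaining two. Note that a fixed window like this can still let through up to twice the rate across a window boundary; if that matters, a sliding window or token bucket would be a closer match, but that goes beyond what this sketch covers.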


