
I have a scheduling system that is horizontally scaled and stores shared state in a Redis key.

The purpose of the system is to implement something similar to classic rate limiting, but a bit different: I want to track the number of currently active requests and the sum of the "weights" of those requests (some requests are heavier than others in my case). The reason to use Redis is to keep the system's footprint on request latency minimal.

The shared state reflects the sum of the states of all individual replicas (both the active request count and the sum of the weights), meaning that each replica individually increments and decrements the shared state.
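To make the setup concrete, here is a minimal sketch of the per-replica updates. The key names `active_requests` and `weight_sum` are illustrative, and a tiny in-memory stand-in replaces a real client so the sketch runs anywhere; with redis-py the same `incrby` calls run atomically server-side via `INCRBY`.

```python
class FakeRedis:
    """Minimal in-memory stand-in for a Redis client (illustration only)."""
    def __init__(self):
        self.store = {}

    def incrby(self, key, amount):
        # Real Redis INCRBY is atomic; here we just emulate the arithmetic.
        self.store[key] = self.store.get(key, 0) + amount
        return self.store[key]

def begin_request(r, weight):
    r.incrby("active_requests", 1)
    r.incrby("weight_sum", weight)

def end_request(r, weight):
    r.incrby("active_requests", -1)
    r.incrby("weight_sum", -weight)

r = FakeRedis()
begin_request(r, 5)
begin_request(r, 3)
end_request(r, 5)   # first request finishes normally
print(r.store["active_requests"], r.store["weight_sum"])  # 1 3
```

The failure mode in the question is exactly the case where `end_request` never runs: the crashed replica's `+1` and `+weight` stay in the shared state forever.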

Is there a known solution for fault tolerance in this scenario? If a replica suddenly crashes, it will never "clear" its value from the shared state.

One possible solution is to develop another component which watches for replica failures and subtracts a crashed replica's contribution from the shared state.

The problem with this solution is that the scheduling system then depends on a completely separate external component to remain correct. Is there any other way I can model my shared state to be more resilient to such failures?

  • Can you edit your question to include more information about the problem? It is difficult to write an answer to this question with such vague information. Commented Feb 27, 2024 at 15:34
  • Agree with Greg - need more information to be able to offer suitable advice. Why are you summing individual replica state? Why do other replicas need to know? Bear in mind that you shouldn't be relying on a cache for essential system state. It should have a proper data layer backing it (e.g. a database) - which your jobs can refer to if the data is not in the cache.
    – Phil S
    Commented Feb 27, 2024 at 15:45
  • Sure, I added more context to the question, feel free to ask further questions. Also I'm open to using different technologies or solutions than the ones I stated. Commented Feb 27, 2024 at 17:28
  • Appreciate the edit. Still don’t understand “The problem with this solution is that the system relies upon external components to be hermetic.” What’s that mean? Commented Feb 27, 2024 at 20:51
  • What I meant is that the proposed solution seems weak, because it makes the scheduling system rely on a completely separate component (the one that listens for replica failures) to function correctly, while being agnostic to its existence. I might be wrong about this, but I think it would be better to design the scheduling system so it is fault tolerant on its own. Commented Feb 27, 2024 at 21:26

2 Answers


implement something similar to classic rate limiting

The shared state reflects the sum of the states of all individual replicas

Here is the problem statement I heard.

You have web client Request Types A, B, ..., Z, each having a weight reflecting the typical elapsed CPU + I/O time to service the request. For example A might take 400 msec and B might take 600 msec.

We see (client_id, server_id, request_type, "begin") events arriving, and similar "end" events arriving, via Kafka or a similar pub/sub system. So we increment a server metric or a client metric for "begin", and decrement it for "end". Each event is labeled with a unique GUID, which makes a good PK for an RDBMS that logs events.
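A sketch of the event shape just described; the GUID gives each event a natural primary key in the log table. The field names are illustrative, not from the original question.

```python
import uuid
from dataclasses import dataclass

@dataclass(frozen=True)
class RequestEvent:
    guid: str          # unique per event; a good PK for the log table
    client_id: str
    server_id: str
    request_type: str  # "A" .. "Z", each with a known weight
    phase: str         # "begin" or "end"

evt = RequestEvent(str(uuid.uuid4()), "client-7", "srv-2", "B", "begin")
print(evt.phase)  # begin
```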

And presumably we use that for making cloud scale-up decisions, and for rejecting or queueing certain client requests.

failure behavior

During ugly events like network partition or server reboot, we choose to "fail open", so we permissively accept all client requests.

terminology

I feel you're getting hung up on the term "replica", which suggests they are all similar to one another.

It sounds like what you want is to take a hash of client_id, modulo N servers, to decide which "shard" will exclusively serve that client. (This occasionally changes when servers come and go.)
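A minimal sketch of that assignment. Note the use of a stable hash rather than Python's built-in `hash()`, which is salted per process and so would give different routers different answers.

```python
import hashlib

def shard_for(client_id: str, num_servers: int) -> int:
    # Stable across processes and machines, unlike built-in hash().
    digest = hashlib.sha256(client_id.encode()).digest()
    return int.from_bytes(digest[:8], "big") % num_servers

# Every router computes the same shard for the same client:
s = shard_for("client-42", 5)
assert s == shard_for("client-42", 5)
print(s)
```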

Now each server can make purely local throttling decisions, without worrying about what their peers are up to.

eventually consistent

Servers log events to an RDBMS which survives reboot events.

Log records get replicated so all servers eventually learn about all history.

The duration of each event is from its "begin" to its "end" entry. If the "end" entry is missing then we impose some arbitrary timeout, so it ends after e.g. 60 seconds.
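A sketch of that duration rule, with the 60-second cap as an illustrative value:

```python
TIMEOUT = 60  # seconds; arbitrary cap for events with no "end" entry

def duration(begin_ts, end_ts, now):
    if end_ts is None:
        # No "end" was ever logged (e.g. the server crashed):
        # treat the event as finished after TIMEOUT at the latest.
        return min(now - begin_ts, TIMEOUT)
    return end_ts - begin_ts

print(duration(100, 104, now=500))   # 4  (normal request)
print(duration(100, None, now=500))  # 60 (no "end"; capped at TIMEOUT)
```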

The realtime metrics maintained by redis are simply a cache of recent log records. The redis TTL is a convenient way of making ancient records gracefully age out.

recovery

When routers and servers come and go, the system propagates "recent" log entries so they are visible to most servers. A simple SELECT query can be used to restore metrics maintained on redis.


EDIT

  1. Currently the scheduling system schedules requests to a few types of services, according to the user request, each type of service being independent and having its own "scheduling stats", which makes it a bit harder to implement your proposed solution.

I have no idea why you'd say that.

Combine requests for Type A .. Z into a single unified log stream (single RDBMS table, with an enum request_type column). Maintain Stat A through Stat Z in realtime, and rebuild them from the log when a reboot / recovery operation happens.

Sum up Stats A + B + C in realtime when making admission decisions if e.g. the aggregate load is meaningful in the Business Domain, perhaps because a server devoting CPU cycles to any of those request types predicts slower response times for any other requests that arrive.

Would it make sense in your opinion to deploy a separate scheduler for each type of service?

As explained, "no."

  1. Could you please clarify more the role of redis in your solution?

Events happen. There are two event sinks: the log and the redis metrics, which are essentially a cache that reflects recently logged activity. During normal operation the two are in sync. That is, each "begin" redis increment is paired with an "end" redis decrement, and the currently reported redis metric matches up with SELECT ... FROM log WHERE ... AND timestamp > sixty_seconds_ago.

To make scheduling decisions, normally you only consult redis. However, sometimes Bad Things happen, such as network partition or server failure / reboot. In which case we're left wondering: What is the Source of Truth? Simple, it's in the RDBMS. Quickly run a report with that SELECT query, re-populate the redis metrics from that, and continue on your merry way as though nothing had happened.

  1. Would you recommend implementing the hash mechanism using nginx?

Sure, that is a possibility. Typically in a High Availability situation people will use dedicated F5 load balancer equipment, roll their own HA proxies, contract with AWS for load balancing services, or do something similar. The key is that local health checks across the LAN keep the load balancer aware of server status, and upon noticing an unresponsive server the balancer will immediately stop sending it traffic. So e.g. if you had five servers and one goes belly up, the balancer might switch from hash(ip) % 5 to hash(ip) % 4 when assigning requests to a shard.

On a slower timescale, reports of "server died!" or "queues are big!" will trigger horizontal scale-out. Plan on it taking half a minute or a minute to spin up a new VM server if you didn't have any on standby.

Or should I use built-in Kubernetes components to do so?

Why are you asking random internet strangers these things? You know your own use case, cost-of-downtime, budget, and capabilities far better than I do, so you decide. K8s is a wonderful technology for scaling resilient services up and down. But it is complex, with many config knobs, and it offers less than 100% uptime. Sometimes hosts run out of RAM or suffer from config errors or versionitis, and simple restarts are not sufficient to restore user-visible service. Diagnosing the root cause can be complex. Only go the k8s route if you have sufficient in-house expertise to deal with the hiccups. Based on the current set of questions, I'm skeptical that k8s would be an excellent fit for your business needs.

Also, this means that each client can have only 1 replica to handle it, couldn't this become a bottleneck later on?

Again I have no idea why you'd say that. I hope you read the "modulo num_servers" part?

A tacit assumption here is that a single server can support the requests of at least K clients. If it's the other way around, then your question was far too vague. It did not occur to me that we'd expect to see a single client saturate the capacity of multiple servers, nor that we would wish to support such behavior.

Suppose you have a hundred concurrent clients visiting the site, and in steady state there are four servers. Now the load balancer is using hash(ip) % 4 when routing requests to servers. Then load increases a little, queue depth increases, and we cross a hysteresis threshold which triggers spinning up a fifth server. Everyone sees a "server five became healthy!" heartbeat and invalidates their stats, starting them anew from zero (or perhaps in your use case you prefer to let them age out in the usual way after one minute). The load balancer starts using hash(ip) % 5. Life is good, and settles down into a new steady state.

There can be a transition interval, so immediately after new server becomes available we roll a random number and 90% of the time we use "mod 4" and just 10% of the time we send the new guy traffic with "mod 5". Over the course of a minute or so we ramp up the new guy's traffic to 20% of offered load, 50%, 90%, then we stop using random, declare arrival at steady state, and just always use "mod 5".
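The cut-over can be sketched in a few lines; the ramp fractions are illustrative.

```python
import random

def route(client_hash: int, new_fraction: float, rng: random.Random) -> int:
    # With probability new_fraction, route on the new ring size (5 servers);
    # otherwise keep using the old one (4 servers).
    n = 5 if rng.random() < new_fraction else 4
    return client_hash % n

rng = random.Random(42)
# Early in the ramp you'd call route(h, 0.10, rng), then 0.20, 0.50, 0.90...
# At fraction 1.0 the transition is over: always "mod 5".
shards = [route(17, 1.0, rng) for _ in range(3)]
print(shards)  # [2, 2, 2] since 17 % 5 == 2 and fraction 1.0 always picks mod 5
```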

  • Thank you! A few follow up questions: 1. Currently the scheduling system schedules requests to a few types of services, according to the user request, each type of service being independent and having its own "scheduling stats", which makes it a bit harder to implement your proposed solution. Would it make sense in your opinion to deploy a separate scheduler for each type of service? 2. Could you please clarify more the role of redis in your solution? 3. Would you recommend implementing the hash mechanism using nginx? Or should I use built-in Kubernetes components to do so? Commented Feb 28, 2024 at 7:25
  • Also, this means that each client can have only 1 replica to handle it, couldn't this become a bottleneck later on? Commented Feb 28, 2024 at 8:31

If a replica suddenly crashes, it will never "clear" its value from the shared state.

One possible solution is to develop another component which watches for replica failures, and decreases from the shared state when a replica crashes.

This can be merged into the same component if, in addition to updating the shared key, each replica records a "last update time" somewhere world-readable when it does so. You can then have the central component check all the last update times and decide when a replica has gone "stale" and should have its contribution discarded.
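A common variant of this that avoids the central watcher entirely is to give each replica its own Redis key with a TTL: the replica refreshes its key on a heartbeat, and if it crashes the key simply expires, taking its contribution out of the sum automatically. The key names and the 10-second TTL below are illustrative, and a plain dict of (value, expiry) pairs stands in for Redis so the sketch runs without a server; a real implementation would use `SET key value EX 10` (redis-py: `r.set(key, value, ex=10)`) and sum the surviving keys with SCAN/MGET.

```python
TTL = 10.0  # seconds; illustrative heartbeat expiry

state = {}  # key -> (weight_sum, expires_at); stand-in for Redis keys with TTLs

def refresh(replica_id, weight_sum, now):
    # Each replica periodically re-publishes its own contribution.
    state[f"weights:{replica_id}"] = (weight_sum, now + TTL)

def total_weight(now):
    # Sum only unexpired keys, as Redis itself would after expiry.
    return sum(v for v, exp in state.values() if exp > now)

refresh("a", 7, now=0.0)
refresh("b", 5, now=0.0)
print(total_weight(now=1.0))   # 12 (both replicas alive)
print(total_weight(now=11.0))  # 0 (no heartbeats since t=0: both expired)
```

The trade-off is that the shared total lags a crashed replica's disappearance by up to one TTL, so pick a TTL short enough for your admission-control needs but comfortably longer than the heartbeat interval.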
