Internal flow control within message brokers

Implementing a quality message broker involves quite a bit of subtlety. Dealing with high-speed message publication, where the server, its downstream consumers, or both can become overloaded, is especially challenging.

These are problems I faced when I was putting together the first versions of the RabbitMQ server, and the same issues crop up in servers that speak the Syndicate network protocol, too.

I recently came up with a really interesting and apparently effective approach to internal flow control while working on syndicate-server.

The idea is to associate all activity with an account that records how much outstanding work the server has left to do to fully complete processing of that activity.

  • Actors reading from e.g. network sockets call an ensure_clear_funds function on their associated Accounts. This will suspend the reader until enough of the account’s “debt” has been “cleared”. This will in turn cause the TCP socket’s read buffers to fill up and the TCP window to close, which will throttle upstream senders.

  • Every time an actor (in particular, actors triggered by data from sockets) sends an event to some other actor, a LoanedItem is constructed which “borrows” some credit from the sending actor’s nominated account.

  • Every time a LoanedItem is completely processed (in the Rust implementation, when it is dropped), its cost is “repaid” to its associated account.

  • But crucially, when an actor is responding to an event by sending more events, it charges the sent events to the account associated with the triggering event. This lets the server automatically account for fan-out of events.1
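The scheme above can be sketched with blocking primitives from the Rust standard library. The names `Account`, `LoanedItem`, and `ensure_clear_funds` follow the description above, but the fields and signatures here are illustrative assumptions, not syndicate-server's actual (asynchronous) API:

```rust
use std::sync::{Arc, Condvar, Mutex};

// Tracks outstanding work ("debt") attributed to one ultimate cause.
struct Account {
    state: Mutex<isize>, // outstanding debt, in cost units
    cleared: Condvar,    // signalled whenever debt decreases
    limit: isize,        // threshold above which readers suspend
}

impl Account {
    fn new(limit: isize) -> Arc<Account> {
        Arc::new(Account { state: Mutex::new(0), cleared: Condvar::new(), limit })
    }

    // Borrow `cost` units: called when an event is charged to this account.
    fn borrow(&self, cost: isize) {
        *self.state.lock().unwrap() += cost;
    }

    // Repay `cost` units: called when a LoanedItem is dropped.
    fn repay(&self, cost: isize) {
        let mut debt = self.state.lock().unwrap();
        *debt -= cost;
        self.cleared.notify_all();
    }

    // Suspend the caller until outstanding debt falls below the limit.
    fn ensure_clear_funds(&self) {
        let mut debt = self.state.lock().unwrap();
        while *debt >= self.limit {
            debt = self.cleared.wait(debt).unwrap();
        }
    }
}

// An event charged against an account; repays its cost when dropped.
struct LoanedItem<T> {
    account: Arc<Account>,
    cost: isize,
    item: T,
}

impl<T> LoanedItem<T> {
    fn new(account: Arc<Account>, cost: isize, item: T) -> LoanedItem<T> {
        account.borrow(cost);
        LoanedItem { account, cost, item }
    }
}

impl<T> Drop for LoanedItem<T> {
    fn drop(&mut self) {
        self.account.repay(self.cost);
    }
}
```

The `Drop` implementation is what makes repayment automatic: however a `LoanedItem` leaves the system, its cost flows back to the account it was charged to. The fan-out rule from the last bullet corresponds to constructing each outbound `LoanedItem` with the `Arc<Account>` taken from the triggering event, rather than from the sending actor.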

Example. Imagine an actor A receiving publications from a TCP/IP socket. If it ever “owes” more than, say, 5 units of cost on its account, it stops reading from its socket until its debt decreases. Each message it forwards on to another actor costs it 1 unit. Say a given incoming message M is routed to a dataspace actor D (thereby charging A’s account 1 unit), where it results in nine outbound events M′ to peer actors O1···O9.

Then, when D receives M, 1 unit is repaid to A’s account. When D sends M′ on to each of O1···O9, 1 unit is charged to A’s account, resulting in a total of 9 units charged. At this point in time, A’s account has had net +1−1+9 = 9 units withdrawn from it as a result of M’s processing.

Imagine now that all of O1···O9 are busy with other work. Then, next time around A’s main loop, A notices that its outstanding debt is higher than its configured threshold, and stops reading from its socket. As each of O1···O9 eventually gets around to processing its copy of M′, it repays the associated 1 unit to A’s account.2 Eventually, A’s account drops below the threshold, A is woken up, and it resumes reading from its socket.  □

Anecdotally, this appears to work well. In experiments with producers sending as quickly as they can, the server throttles the producers and remains stable, even though its consumers cannot keep up with the producers’ unthrottled send rates.

It’ll be interesting to see how it does with other workloads!

  1. A corollary to this is that, for each event internal to the system, you can potentially identify the “ultimate cause” of the event: namely, the actor owning the associated account. 

  2. Of course, if O1, say, sends more events internally as a result of receiving M′, more units will be charged to A’s account!