Pushing Back

May 5, 2016

Over the last year I’ve become more and more convinced that possibly the most important feature of any queuing system is the ability to take action immediately upon the enqueuing of a new item, where the action can modify the queue and is based on the state of the queue itself. Most commonly this is referred to as back-pressure. But back-pressure can take several different forms, suited to different scenarios.

Probably the most familiar form of back-pressure is the one we get when using TCP sockets. TCP assumes an unreliable underlying communication mechanism and so has failure detection, in the form of timeouts, built in. A TCP sender will repeatedly transmit a message at intervals until it gets a suitable Ack from the destination, or some other response that indicates it should stop sending (e.g. connection closed or some other failure). Whilst I’m not familiar with how kernels implement this, my assumption is that there is a local buffer of finite size into which outbound messages are added. Once that buffer is full and not emptying (because no response is coming back from the destination), subsequent calls to send have to block the application because there is no more space in the local buffer. This is back-pressure.
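
To make the blocking flavour of back-pressure concrete, here is a tiny Go sketch using a bounded channel as a stand-in for the kernel’s send buffer. It is only an analogy (the real buffer is byte-oriented and governed by TCP’s windowing), and all the names are invented for illustration.

```go
package main

import "fmt"

func main() {
	// A small, bounded "send buffer": sends block once it is full.
	sendBuf := make(chan []byte, 4)

	// No consumer is draining sendBuf, so a fifth send would block the
	// sender indefinitely. That blocking is the back-pressure.
	for i := 0; i < 4; i++ {
		sendBuf <- []byte(fmt.Sprintf("packet %d", i))
		fmt.Println("enqueued packet", i)
	}
	fmt.Println("next send would block until the buffer drains")
}
```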

On the receiving side there is again a buffer, and once the kernel has received a packet and added it to that buffer it can send an Ack. If the buffer is full, presumably because the application is processing messages too slowly, then the kernel has no choice but to drop the packet, issue no Ack, and rely on the sender to resend it in due course, by which time, hopefully, items have been removed from the receiving buffer and the packet can be enqueued locally.
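
The receiving side’s behaviour is the complementary move: a non-blocking enqueue that silently drops when the buffer is full and withholds the Ack. Again a Go sketch, not a description of any real TCP stack; the names are mine.

```go
package main

import "fmt"

// deliver tries to enqueue a packet for the application. If the buffer is
// full the packet is dropped and no ack is sent: the remote sender's
// retransmit timer is what eventually gets the data through.
func deliver(recvBuf chan []byte, pkt []byte) (acked bool) {
	select {
	case recvBuf <- pkt:
		return true // would send an Ack here
	default:
		return false // buffer full: drop, stay silent, rely on a resend
	}
}

func main() {
	recvBuf := make(chan []byte, 2)
	for i := 0; i < 4; i++ {
		fmt.Printf("packet %d acked: %v\n", i, deliver(recvBuf, []byte("data")))
	}
}
```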

So in this scenario each of these buffers is a queue, and upon an enqueue the operation may either block if there’s no room (sending side), or the item (packet) may be dropped completely (receiving side). Dropping items because the receiving-side queue is too small causes many items to be re-transmitted, which hurts throughput and latency. Thus over time these buffers have often grown, which has led to its own problem, namely buffer bloat. If the buffers are too big then packets can sit in them for a long time. This can prevent back-pressure from working effectively: if your queue is growing in length rapidly then your queue consumer is obviously unable to keep up, and action needs to be taken. Now if you’ve built a sophisticated system which can share the queue between several consumers and add more CPUs dynamically to try and solve the problem, then maybe you can absorb the surge in workload. In lieu of that though, the only option available to you is to try and throttle your senders. If your senders become unresponsive because sends are blocking, then there’s a chance someone will notice and take some form of corrective action. Or, failing that, go away.

In some cases, the value of an item decreases the longer it sits in a queue, so a large buffer could contain a large number of valueless items. This could be because the user has got bored of waiting for the item to be processed and has moved on, or it could be that the item was something like a sensor reading with a known period of validity, which will soon be replaced by the next value anyway (though this scenario may be better served by UDP than TCP). If the item has already been in the queue longer than it would have taken the sender to time out and resend, then keeping it around may be sub-optimal: with O(1) enqueue and dequeue operations this may not matter, but if the queue’s operations are not constant-time then the very existence of that item makes the queue itself perform worse. In that case it’s likely better to drop the item, thus improving the performance of the queue, and rely on the sender re-sending.
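
One way to act on that is to stamp items on enqueue and discard anything stale at dequeue time, leaving the sender’s retransmit to cover the gap. A minimal Go sketch, with an arbitrary validity period and invented names:

```go
package main

import (
	"fmt"
	"time"
)

type item struct {
	enqueued time.Time
	value    string
}

// dequeueFresh pops items off the queue, silently dropping any that have
// outlived their validity; the sender is expected to resend if it cares.
func dequeueFresh(q chan item, validity time.Duration) (item, bool) {
	for {
		select {
		case it := <-q:
			if time.Since(it.enqueued) <= validity {
				return it, true
			}
			// stale: drop it and keep looking
		default:
			return item{}, false
		}
	}
}

func main() {
	q := make(chan item, 16)
	q <- item{enqueued: time.Now().Add(-2 * time.Second), value: "old sensor reading"}
	q <- item{enqueued: time.Now(), value: "fresh sensor reading"}
	if it, ok := dequeueFresh(q, time.Second); ok {
		fmt.Println("got:", it.value)
	}
}
```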

It occurs to me that a non-O(1) queue is a bit like the problem of trying to launch a rocket into space: you have to carry your fuel with you. To go higher you not only need more fuel, but you need yet more fuel to cope with the extra weight of carrying that fuel… RabbitMQ, for example, certainly does not do O(1) queuing: the longer the queue is, the more code each new message passes through while working out whether it needs to go to disk, and certain data structures each message can end up in have O(log n) behaviour at best. With such queues it’s very important to try and keep them empty: as soon as they become substantially non-empty (whatever that means), the queue itself can become the bottleneck in your pipeline and can fail to drive your consumers fast enough. Beyond a certain queue length, the CPU cost per message is so high that even a reduced rate of enqueuing can leave so little CPU left over that consumers cannot be saturated. In this scenario it ends up not mattering how many extra consumers and CPUs you throw at the queue; the queue itself will only start shrinking once you’ve massively throttled your senders. If you’re building a sophisticated system (perhaps built out of a collection of very small services), determining this inflection point in the behaviour of your queues is both very hard and fairly critical! You will want to figure out where this point is and make sure to monitor for it.
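
The crudest way to watch for that point is to sample queue lengths periodically and complain when they cross a threshold you have established empirically. A sketch in Go; the threshold and sampling interval are placeholders, not recommendations.

```go
package main

import (
	"fmt"
	"time"
)

// watchQueue samples a queue's length and complains when it crosses the
// threshold beyond which the queue itself starts becoming the bottleneck.
func watchQueue(length func() int, threshold int, every time.Duration, stop <-chan struct{}) {
	tick := time.NewTicker(every)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			if n := length(); n > threshold {
				fmt.Printf("queue length %d over threshold %d: throttle senders\n", n, threshold)
			}
		case <-stop:
			return
		}
	}
}

func main() {
	q := make(chan int, 1000)
	for i := 0; i < 900; i++ {
		q <- i
	}
	stop := make(chan struct{})
	go watchQueue(func() int { return len(q) }, 500, 10*time.Millisecond, stop)
	time.Sleep(50 * time.Millisecond)
	close(stop)
}
```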

The fact that in TCP the sender re-sends on a timeout is a genius design decision. It means that when overloaded, the receiver can do the least amount of work possible (drop the packet on the floor), consuming very little CPU (and importantly the amount of CPU consumed does not increase as the queue gets longer; though really that would be nonsensical anyway – the reason we’re dropping the packet is that the queue is of finite length and full), and instead can rely on the sender committing to spend its own CPU resources re-sending the item later on. You could view this as the sender giving a lease on its CPU to the receiver until such a time as the receiver is able to locally enqueue the packet, send an Ack and so relinquish that lease. It’s important to realise this only works if the sender and receiver do not share the same CPU.
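
The sender’s half of that bargain is a retransmit loop: send, wait for an Ack up to a timeout, try again, spending its own CPU on every retry. A toy Go sketch of the shape (real TCP does this in the kernel with adaptive timeouts and windows; every name here is invented):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// sendReliably keeps re-sending pkt until an ack arrives within the timeout,
// spending the *sender's* CPU on retries so the receiver can stay cheap.
func sendReliably(send func([]byte), acks <-chan struct{}, pkt []byte, timeout time.Duration, maxTries int) error {
	for try := 0; try < maxTries; try++ {
		send(pkt)
		select {
		case <-acks:
			return nil
		case <-time.After(timeout):
			fmt.Println("timeout, re-sending")
		}
	}
	return errors.New("gave up: receiver never acked")
}

func main() {
	acks := make(chan struct{}, 1)
	attempts := 0
	// A fake transport that only acks the third attempt.
	send := func(pkt []byte) {
		attempts++
		if attempts == 3 {
			acks <- struct{}{}
		}
	}
	fmt.Println(sendReliably(send, acks, []byte("hello"), 10*time.Millisecond, 5))
}
```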

It’s also worth noting that it’s important these buffers are bounded. Trivially, all buffers are bounded anyway as all computers have a finite amount of memory in them. But more importantly, you don’t want to allow a remote sender to deny memory resources to your application otherwise denial-of-service attacks would become trivially easy. So when you can’t trust the sending party, finite and small buffers (relative to the overall amount of memory in the system) are a good idea.

So this has all mainly been about disjoint senders and receivers, unreliable communication mechanisms, and untrusted senders. Are there other scenarios?

Well yes. In an actor programming language there are many differences. You’re probably running code you trust, so you don’t need to worry about your senders deliberately DoSing you. Your senders and receivers are also sharing the same CPU. And communication between actors is a reliable operation: unless someone is randomly unseating RAM in your computer, it’s probably “impossible” (to the extent that anything in a computer is ever “impossible”) for a message to get lost in transit. These differences radically alter how back-pressure should work, and the design of queues themselves.

Because communication is reliable, there is no need for timeouts and re-sending: you can be sure that the message will be received provided the receiver stays alive for long enough. I’ve never seen an actor programming language where there is the idea that a message can be lost in transit between two actors, but that doesn’t mean they don’t exist; I think working in such a language would be quite odd though. However, remember that even in the TCP case, the application code calls send once; it’s the TCP stack that takes care of re-transmits when necessary. You can think of this as a reliable synchronous transfer from the application to some other local agent which is going to do the sending over the network and the re-transmits. This transfer blocks if the agent has no spare capacity in its buffer.
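
In Go-ish terms, that hand-off might be a bounded channel between the application and a sending goroutine which owns the retransmission policy; the application’s “send” is just a local, possibly-blocking enqueue. A sketch, with all names invented:

```go
package main

import (
	"fmt"
	"time"
)

// startAgent owns the buffer and the (re)transmission policy. The application
// never retries; it only performs a reliable, possibly-blocking local enqueue.
func startAgent(transmit func(msg string)) chan<- string {
	buf := make(chan string, 8) // the agent's finite local buffer
	go func() {
		for msg := range buf {
			transmit(msg) // retransmits, timeouts etc. would live in here
		}
	}()
	return buf
}

func main() {
	outbound := startAgent(func(msg string) {
		time.Sleep(time.Millisecond) // pretend network work
		fmt.Println("sent:", msg)
	})
	for i := 0; i < 3; i++ {
		outbound <- fmt.Sprintf("message %d", i) // blocks only if buf is full
	}
	time.Sleep(10 * time.Millisecond)
}
```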

The other problem with dropping and re-sending is that once sender and receiver share the same CPU, re-sending is the worst possible thing you could do. The reason your queue is long is that your receiver can’t keep up with your sender. They’re sharing CPU anyway, so you want to maximise the amount of CPU available to your receiver. Dropping and re-sending is exactly the wrong thing to do here: the act of re-sending denies precious CPU cycles to your receiver.

So that seems to leave just two options for back-pressure. The first is the obvious one: queues are still bounded, and if you try to send to a full queue, you block until space is made available. I’m not a fan of that unless you take additional steps to analyse your code and can prove statically that it’s free from deadlocks. The danger is that two actors are simultaneously trying to send to each other and both their queues (often called mailboxes in actor languages) are full. Neither can now make progress, so they’re deadlocked. If either or both of the queues were not full then this would work just fine. My fear with languages which enforce finite-length queues (e.g. Go) is that unless you can prove there can never be any cycles in your messaging patterns, you can never be sure that you’ll avoid deadlocks. You are forced into picking some arbitrary limit for your queue length and it will always be wrong for someone. You probably can’t pick the biggest number possible because in certain languages (e.g. Go) this will allocate memory when the queue is created, not on demand. Then think about trying to offer support and debugging for a piece of software built on these sorts of queues: when the queues are at their fullest, when the system is under the greatest load, when it’s most important it doesn’t fail, it may deadlock on you. Reproduction of the bug is likely going to be a nightmare.
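
That failure mode is easy to reproduce with two goroutines and two full bounded channels; the following Go sketch is deliberately broken to show it:

```go
package main

import "fmt"

func main() {
	a := make(chan string, 1)
	b := make(chan string, 1)
	a <- "already full"
	b <- "already full"

	done := make(chan struct{})
	go func() { // actor A: wants to send to B before it next reads from a
		b <- "from A" // blocks: b is full
		fmt.Println(<-a)
		close(done)
	}()
	go func() { // actor B: wants to send to A before it next reads from b
		a <- "from B" // blocks: a is full
		fmt.Println(<-b)
	}()
	<-done // never happens: the runtime reports "all goroutines are asleep - deadlock!"
}
```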

If you want to solve this problem by just monitoring queue lengths that means you’re going to have to expose the length of every queue in the system. And don’t forget that the number of actors, and hence queues, is likely directly proportional to the number of connections to your service: it’s normal to spawn a new actor for each connection. Devops are going to love you! And then of course even if you do have all this monitoring in place, what course of action can anyone take if queues get dangerously long and you realise the probability of deadlock is high?

The other option for back-pressure in actor languages is to allow unbounded queues, so you avoid this source of deadlocks (you can always write your own deadlocks, but the language at least doesn’t help you), but you punish senders who send to long queues. That punishment takes the form of descheduling them: putting them to the back of the run queue. This is my preferred solution. When queues are long and the consumer cannot keep up, senders have CPU taken away from them, which then becomes available to the consumer so that hopefully the consumer can catch up. As already mentioned, the previous source of deadlock goes away, and this helps Devops too as there’s no need to monitor queue lengths; instead just monitor memory usage of the whole application. This form of punishment also cascades transitively: imagine an actor reading off a network socket and sending those messages on to other queues and actors. If that actor ends up sending one of those messages to a very long queue and so gets descheduled, that reduces the rate at which it can read off the network socket, which makes it more likely the receiving socket buffer fills up, which in turn will throttle the remote sender.
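
A very rough approximation of the idea in Go terms: an unbounded queue whose send never blocks, but where a sender that finds the queue beyond some threshold calls runtime.Gosched() to give up its time slice. The real thing belongs in a language runtime’s scheduler; this sketch only shows the shape, and all the names are invented.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// mailbox is an unbounded queue: sends never block and never deadlock,
// but a sender who finds the queue long is punished by being descheduled.
type mailbox struct {
	mu    sync.Mutex
	items []string
	slow  int // queue length beyond which senders get descheduled
}

func (m *mailbox) send(msg string) {
	m.mu.Lock()
	m.items = append(m.items, msg)
	long := len(m.items) > m.slow
	m.mu.Unlock()
	if long {
		runtime.Gosched() // yield: give the consumer a chance to catch up
	}
}

func (m *mailbox) receive() (string, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.items) == 0 {
		return "", false
	}
	msg := m.items[0]
	m.items = m.items[1:]
	return msg, true
}

func main() {
	mb := &mailbox{slow: 100}
	for i := 0; i < 200; i++ {
		mb.send(fmt.Sprintf("msg %d", i)) // later sends yield the CPU
	}
	msg, _ := mb.receive()
	fmt.Println("first message:", msg)
}
```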

Now, in the TCP case, with disjoint applications sending to each other, can they still deadlock due to full buffers? Yes, they certainly can, and you do need to watch out for that. If you happen to be working in an actor language, one way around that would be to have different actors for receiving messages off a socket and sending out of the socket. This is possibly very important for distributed databases and similar where you have multiple servers that all need to communicate with each other. In other scenarios you tend to have a client-server relationship where the client is essentially doing RPC of some form. In these cases deadlock due to full network buffers is much less likely because of the nature of the communication pattern.
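
A bare sketch of that split, using an in-memory connection for illustration and invented names: one goroutine reads, another writes, so a blocked write cannot stop inbound messages being drained.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
)

// handle splits a connection into an independent reader and writer, so a
// blocked write (the remote peer not draining) cannot stop us reading. That
// is one half of what is needed to avoid the mutual-full-buffer deadlock.
func handle(conn net.Conn, inbound chan<- string, outbound <-chan string) {
	go func() { // reader actor
		sc := bufio.NewScanner(conn)
		for sc.Scan() {
			inbound <- sc.Text()
		}
		close(inbound)
	}()
	go func() { // writer actor
		for msg := range outbound {
			fmt.Fprintln(conn, msg)
		}
	}()
}

func main() {
	client, server := net.Pipe() // in-memory connection for illustration
	inbound := make(chan string, 16)
	outbound := make(chan string, 16)
	handle(server, inbound, outbound)

	fmt.Fprintln(client, "hello from the client")
	fmt.Println("server read:", <-inbound)

	outbound <- "hello from the server"
	sc := bufio.NewScanner(client)
	sc.Scan()
	fmt.Println("client read:", sc.Text())
}
```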

Finally, consider how appropriate (or otherwise) TCP now is for your system made up of tiny services if those services are being deployed on some virtualized environment where they’re quite possibly sharing CPU. It could be that spending CPU doing re-sends when under heavy load is not the best use of that CPU.

4 Comments

  1. Ian Rogers says:

    In TCP there is also the issue of the buffers in the intervening routers. IPv4 at least used to have “ICMP Source Quench”, which receivers were responsible for sending out if their buffers got full, but that’s now been deprecated by RFC 6633 and replaced with Explicit Congestion Notification, RFC 3168 (which I’d never heard about until I went looking for the reference to source quenching 🙂 )

    For the shared-cpu pipeline case the “disruptor” ring-buffer pattern is useful. The queue length is finite and fixed and add and fetch are O(1), but the important part is that add is non-blocking and returns an error if the queue is full. So message drop is free and the publisher should do something smarter than a delay-retry. https://lmax-exchange.github.io/disruptor/
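
    For a flavour of the non-blocking add Ian describes, here is a minimal, single-threaded ring-buffer sketch in Go. It is not the Disruptor itself (no batching, no lock-free machinery), just the fixed-size, O(1), fail-on-full shape of the API, with invented names:

```go
package main

import "fmt"

// ring is a fixed-size, O(1) queue whose add never blocks: it reports
// failure when full and leaves the caller to decide what to do (drop,
// coalesce, retry later). Not concurrency-safe as written; the Disruptor
// supplies the lock-free machinery this sketch leaves out.
type ring struct {
	buf        []string
	head, tail int // head: next read slot, tail: next write slot
	size       int
}

func newRing(capacity int) *ring { return &ring{buf: make([]string, capacity)} }

func (r *ring) add(v string) bool {
	if r.size == len(r.buf) {
		return false // full: caller must drop or do something smarter
	}
	r.buf[r.tail] = v
	r.tail = (r.tail + 1) % len(r.buf)
	r.size++
	return true
}

func (r *ring) fetch() (string, bool) {
	if r.size == 0 {
		return "", false
	}
	v := r.buf[r.head]
	r.head = (r.head + 1) % len(r.buf)
	r.size--
	return v, true
}

func main() {
	r := newRing(2)
	fmt.Println(r.add("a"), r.add("b"), r.add("c")) // true true false
	fmt.Println(r.fetch())                          // a true
}
```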

  2. Matthew Sackman says:

    @ian. It would be interesting to see some big applications written using disruptor and to see what actions they take on send-fail. I wonder how similar it would be to e.g. disk-write failures, which tend to just be “I’m going to panic now”. I’d love to have a crib sheet or similar for what the pros and cons of each are and when to use each.

  3. Tino Adams says:

    As the name lmax-exchange disruptor suggests, and from what I understand from having watched some of Martin Thompson’s talks, the pattern and library are core to the LMAX trading platform, which I guess is a considerably large application.

    There are several videos and articles available online about LMAX that might address some of your questions. I found this post to be a good starting point: http://martinfowler.com/articles/lmax.html

  4. Rajiv Kurian says:

    We use ring buffers like the Disruptor project and all of them are fixed-size buffers. Ours is not an actor system but more of a pipelined multi-threaded system. Since the number of pipeline stages is much smaller than the number of actors in the system, analysis of such a system is definitely easier. But here are some of the things we’ve tried to do:

    1. Prefer a DAG of pipeline stages so there are no cycles and hence no chances of a deadlock. This is not possible all the time but if possible just makes life easier.

    2. If there are cyclical patterns then we try the following – have one or both of the writers in a cycle drop messages if the other’s ring buffer is full. This is a good match for a request-response type of scenario. The requester thread will retry (with a timer) if the responder thread’s buffer is full, but drops the message instead of blocking. There are questions around where the requester thread will stash the request in the meantime. A general solution is to back-pressure all the way to the client. Usually it is enough if one of the threads drops; the other can block and it will lead to back-pressure but no deadlocks. This is quite similar to TCP actually.

    We have tried two other techniques – neither is a full-blown solution but both help overall:

    a. When a sender thread finds that the receiver thread’s ring-buffer is full, it tries to process its own ring-buffer messages and then tries to enqueue messages again. The idea is that the receiver thread might itself be blocked on trying to send a message to the sender. So if you read your own queue it clears up some space to unblock the receiver, which means it can now go back to its run loop and process messages and thus clear up space on its queue. Again, as long as one sender does this it helps. This is not at all an easy strategy though, because processing of messages is often not re-entrant. For example, processing messages leads to enqueueing more messages. So the thread(s) need to be capable of stopping in the middle of an enqueue call, freezing the state (where? and what do we do if this bounded space for frozen stacks is full?), dequeuing messages and continuing again. This strategy only works as a way to temporarily ease up a potential deadlock, but is clearly not a full-blown solution. We do it only in the rare case that we know our specific actor/thread processing is re-entrant safe. Again the TCP analogy is: if you’re using non-blocking TCP and in your epoll loop you see that the write buffers are full, you register with the kernel so that it notifies you when the buffer has space again. Then you go back to your run loop and read from your sockets. So if you have two processes communicating via TCP and they fill up their corresponding out-buffers, they will still recover by reading their in-buffers.

    b. Another strategy that helps but does not solve the problem is to coalesce messages if possible. Again this is not a general tool. An example is a sender sending stock price updates to another thread. If the receiver’s buffer is full, the sender can aggregate messages locally in bounded space. It needs only constant space per symbol and can just replace the latest price locally. Additionally it needs a bit which says that this symbol needs to be flushed. When the receiver’s buffer has space again you flush the pending symbols and reset the bit. So the receivers end up getting the latest state, they just don’t see the intervening updates. Another example where you can use this kind of technique is a remote sync for time-series data. A sender thread processes time-series data points by storing them in in-memory tables and also sends them to a remote end for replication. Now the queue for this replication (perhaps a network buffer) might be full. The sender can still choose to store points in memory and keep checking for when there is space in the receiver’s buffer. When there is buffer space we don’t need to send the individual points all over again (we haven’t even buffered them locally). Instead we send a digest of the points. A digest of time-series data points can be a lot smaller than the individual points, since you can factor out common fields like the time-series ID, reduce timestamps to shorter offsets, do delta encoding on the numbers etc. This is generally applicable to any system where it is preferable to send real-time updates, but where in a buffer-full condition a digest of the updates can be sent instead, leading to the same final state on the receiver. It specifically helps if the digest can be a lot smaller than the sum of the individual updates. Another scenario where this helps is net-code for games. You send deltas when possible, but after a certain point replaying lost delta packets becomes useless and you’d rather just sync the state of your game (which you have maintained locally in finite space).
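
    A sketch of the price-update coalescing Rajiv describes, with invented names: constant space per symbol, latest value wins, and pending symbols flushed once the downstream buffer has room again.

```go
package main

import "fmt"

// coalescer keeps only the latest price per symbol while the downstream
// buffer is full, then flushes the pending symbols once space appears.
type coalescer struct {
	out     chan string        // the receiver's bounded buffer
	latest  map[string]float64 // constant space per symbol
	pending map[string]bool    // the "needs flushing" bit per symbol
}

func newCoalescer(out chan string) *coalescer {
	return &coalescer{out: out, latest: map[string]float64{}, pending: map[string]bool{}}
}

func (c *coalescer) send(symbol string, price float64) {
	c.latest[symbol] = price
	c.pending[symbol] = true
	c.flush()
}

func (c *coalescer) flush() {
	for sym := range c.pending {
		select {
		case c.out <- fmt.Sprintf("%s=%.2f", sym, c.latest[sym]):
			delete(c.pending, sym)
		default:
			return // still full: keep coalescing locally
		}
	}
}

func main() {
	out := make(chan string, 1)
	c := newCoalescer(out)
	c.send("ACME", 10.00) // fits in the buffer
	c.send("ACME", 10.05) // buffer full: coalesced locally
	c.send("ACME", 10.10) // replaces the pending 10.05
	fmt.Println(<-out)    // ACME=10.00
	c.flush()
	fmt.Println(<-out)    // ACME=10.10
}
```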

    We’ve found that these techniques, though not at all generic, do help a lot when working with finite buffers. And finite buffers have helped us with capacity planning and have improved average performance (no pointer-based data structures). We regularly test our solutions by killing threads involved in a cyclical communication graph to check that the system behaves as expected. This works because we have very few threads (num_cores usually), so it is easy to analyze their communication patterns.
