
Throughput limitations for state store committer #4525

Open
@gaffer01


NB: Current testing shows that throughput is acceptable for the parameters within which Sleeper is expected to operate.

The current state store is based on a transaction log stored in DynamoDB. To avoid contention when inserting the next item in the log, we apply inserts in a lambda that is triggered by an SQS FIFO queue. Updates to the state store are made asynchronously by sending transactions to this FIFO queue.

The table id is used as the message group id when transactions are put on the queue. This means that only one transaction for a Sleeper table can be processed at a time, which avoids contention. To reduce the number of transactions submitted to the log, some parts of the system create large batches of updates. For example, commits of compaction jobs are sent to a queue and batched up before being processed by a lambda. That lambda splits the messages by table and sends a separate message for each table, which reduces the usefulness of the batching.
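As a rough illustration of the splitting step described above, here is a minimal sketch in Python. The message shape (`table_id`, `job_id`) and the function names are assumptions made for this example and are not taken from the Sleeper code; `MessageGroupId` is the SQS FIFO parameter that the table id is used for.

```python
from collections import defaultdict


def split_commits_by_table(commits):
    """Group a mixed batch of commit messages into one list per table id."""
    by_table = defaultdict(list)
    for commit in commits:
        by_table[commit["table_id"]].append(commit)
    return dict(by_table)


def to_fifo_messages(commits):
    """Build one FIFO message per table, using the table id as the group id."""
    return [
        {"MessageGroupId": table_id, "commits": table_commits}
        for table_id, table_commits in split_commits_by_table(commits).items()
    ]


# Hypothetical batch of three compaction commits covering two tables.
batch = [
    {"table_id": "table-1", "job_id": "job-a"},
    {"table_id": "table-2", "job_id": "job-b"},
    {"table_id": "table-1", "job_id": "job-c"},
]
messages = to_fifo_messages(batch)
```

A batch of three commits becomes two messages here. The more tables that appear in one batch, the smaller each per-table message is, which is why the split weakens the batching.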

When an SQS FIFO queue triggers multiple lambda instances, with different message group ids being processed simultaneously, there is no stickiness in terms of which lambda instance processes a message for a table. For example, lambda 1 may process a message for table 1 whilst lambda 2 is processing a message for table 2. When these have finished, lambda 1 may receive a message for table 2, and lambda 2 may receive a message for table 1. Lambda 1 then has to go to the DynamoDB transaction log to update its view of the state for table 2, and lambda 2 has to update its view of the state for table 1. This reduces the overall throughput. See the following issue for more discussion of the throughput limits when there are multiple active tables:

If we were prepared to run cdk when adding a new table, then each table could have its own state store, commit queue and lambda. However, there may be limits on the number of tables that could be supported that way and it would mean that adding a table would take several minutes.
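To make the cost of non-sticky assignment concrete, the following toy model counts how many log entries an instance has to read to catch up before it can apply an update. It compares the alternating assignment from the example above with a dedicated instance per table. The model (one log entry per update, one read per missed entry) and all names are assumptions for illustration only.

```python
def count_catch_up_reads(assignments):
    """Count log entries read to catch up, given (instance, table) pairs in order.

    Each instance remembers the log length it last saw for each table. Before
    applying an update it reads every entry added since then by other instances.
    """
    log_length = {}  # table -> number of transactions in its log
    seen = {}        # (instance, table) -> log length last seen by that instance
    reads = 0
    for instance, table in assignments:
        current = log_length.get(table, 0)
        reads += current - seen.get((instance, table), 0)
        log_length[table] = current + 1
        seen[(instance, table)] = current + 1
    return reads


# One dedicated instance per table: nothing to catch up on.
sticky = [(1, "t1"), (2, "t2"), (1, "t1"), (2, "t2"), (1, "t1"), (2, "t2")]

# Instances swap tables after the first round, as in the example above.
non_sticky = [(1, "t1"), (2, "t2"), (1, "t2"), (2, "t1"), (1, "t1"), (2, "t2")]

sticky_reads = count_catch_up_reads(sticky)
non_sticky_reads = count_catch_up_reads(non_sticky)
```

In this model the dedicated assignment needs no catch-up reads, while the swapping assignment needs one for every update after the first round.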

Overall, there are several fundamental limits to the throughput of state store updates. Batching helps considerably, but larger batches take longer to process, so a limit remains however large the batches become.
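One way to see why batching cannot remove the limit is a simple cost model, sketched below. It assumes that applying a batch costs a fixed overhead plus a constant time per update. Both the model and the numbers are hypothetical and have not been measured on Sleeper.

```python
def updates_per_second(batch_size, overhead_s, per_update_s):
    """Throughput when a batch takes overhead_s + per_update_s * batch_size seconds."""
    return batch_size / (overhead_s + per_update_s * batch_size)


# Hypothetical costs: 100 ms fixed overhead per transaction, 1 ms per update.
overhead_s = 0.1
per_update_s = 0.001

small = updates_per_second(1, overhead_s, per_update_s)
large = updates_per_second(1000, overhead_s, per_update_s)
huge = updates_per_second(1_000_000, overhead_s, per_update_s)

# Under this model throughput never exceeds 1 / per_update_s.
ceiling = 1 / per_update_s
```

Under these assumptions, larger batches amortise the fixed overhead and raise throughput, but the gain flattens out as the per-update cost starts to dominate.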

There are several possible mitigations:

  • Instead of having 1 FIFO queue of state store transactions for all Sleeper tables, we could have N FIFO queues. Updates for a particular Sleeper table would always be sent to the same queue, e.g. by hashing the table id modulo N. Each queue would trigger its own lambda. All lambdas would write to the same DynamoDB table (which allows varying N over time). This would cause an increase in throughput for updates across multiple tables (at least where those tables do not hash to the same value modulo N). A similar idea would be to have multiple compaction commit queues, so that the batching of commits from them would be more likely to have messages for just one table. This would increase the size of the batches applied and therefore reduce the overall number of transactions that need to be applied.
  • Instead of processing the updates in lambdas, have a single long-running process that is responsible for updating the transaction log for all Sleeper tables. Internally this could pull a message off the FIFO queue and pass that to a thread that is responsible for a particular Sleeper table. This would help increase the throughput as there would no longer be the switching problem where one lambda instance receives an update for one table, and the next update for that table goes to a different lambda instance meaning that it needs to update its state from the transaction log. This is discussed separately here: Long running service maintaining a view of table state for queries #4215
  • Further increase of batching. We already perform significant batching of updates to the transaction log, e.g. the commit requests when compaction jobs finish. There may be possibilities of increasing the batching, although bigger batches take longer to process so this may only have limited impact.
  • Backpressure - if the commit queue has unprocessed messages for a sustained period then processes such as the creation of compaction jobs should probably wait before running. This may not be easy to achieve as SQS does not give very reliable estimates of the backlog on the queue.
  • Instead of having one transaction log per Sleeper table, we could split the partitions in the table up at a high level, e.g. split the partition tree at the level where there are 16 subtrees. Each of those would have its own transaction log. This poses its own challenges though, e.g. it becomes more expensive to retrieve the entire state of the table. This is discussed separately here: Table state partitioning #4395.
  • To avoid needing to put the updates through a FIFO queue we could insert the transactions directly into DynamoDB with a sort key that consists of the timestamp followed by a short unique id. If the timestamp was accurate then this would result in transactions in the correct order. Sleeper does not need two transactions that are made at almost the same time to be correctly ordered as some databases do to resolve conflicts; we just need the ordering to be roughly correct (e.g. we want a file to be added before a compaction job is assigned). In practice clocks should be accurate to roughly a millisecond, and AWS offers clocks which are accurate to a small number of microseconds (see https://aws.amazon.com/blogs/compute/its-about-time-microsecond-accurate-clocks-on-amazon-ec2-instances/). It is unclear how we would know when all transactions within a time window had been written.
  • Replace SQS FIFO with a queue that allows us to directly pull all the messages for a single Sleeper table. This would allow us to have one long-running process per Sleeper table, which could constantly pull the updates for that table.
  • Apply back pressure to parts of the system that are sending updates to the state store commit queue. This would cause other parts of the system to slow down. This might mean that they produced larger batch updates which would mean the overall throughput might increase. It might also mean that they could not keep up with the rate of updates they need to perform their function. This is discussed more in Failure handling / backpressure for state store updates #4396.
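The first mitigation in the list above, routing each Sleeper table to one of N FIFO queues, could be sketched as follows. The choice of SHA-256 and the function name are assumptions for this example. The important property is that the hash is stable across processes, which is why Python's built-in `hash()` (randomised per process for strings) is not used.

```python
import hashlib


def queue_index(table_id, num_queues):
    """Pick a queue for a table by hashing its id modulo the number of queues."""
    digest = hashlib.sha256(table_id.encode("utf-8")).digest()
    # Use the first 8 bytes of the digest as an unsigned integer.
    return int.from_bytes(digest[:8], "big") % num_queues


# Hypothetical table ids routed across 4 queues.
num_queues = 4
routing = {table_id: queue_index(table_id, num_queues)
           for table_id in ["table-1", "table-2", "table-3"]}
```

Every sender computes the same index for a given table id, so updates for one table always reach the same queue. Changing N changes the mapping, so messages already in flight under the old value of N would need to drain first. Handling that transition is outside this sketch.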

There are approaches that involve replacing the current state store with something that might allow greater throughput:

  • The current approach uses a FIFO SQS queue and a DynamoDB table in which entries for a Sleeper table are written with a sort key that is a counter. This effectively gives two ordered lists of updates: the FIFO queue holds an ordered list of transactions, and so does the DynamoDB table. The difference is that the FIFO queue only supports retrieving the next unprocessed message and cannot be used to retrieve updates from a certain point onwards. The DynamoDB table does support retrieving messages from a particular point onwards, but does not natively support insertion in order - this has to be performed by Sleeper with a conditional put. If we had a storage technology that supported storing data in order and reading data from given offsets, then we could simply send transactions to the storage without needing to go through a single point. We could still perform snapshotting and updates from the log as we do now. Kafka may meet these needs.
  • Instead of using a transaction log approach, use a store that supports transactions, snapshot isolation and a high update rate, e.g. PostgreSQL or FoundationDB.
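For readers unfamiliar with the counter-based log described in the first bullet above, the following in-memory sketch shows the two operations it relies on: a conditional put that only succeeds if the next number is free, and a read from a given number onwards. This is a toy stand-in rather than the Sleeper implementation or the DynamoDB API, and the class and method names are assumptions.

```python
class TransactionLog:
    """In-memory stand-in for a log keyed by (table id, transaction number)."""

    def __init__(self):
        self._items = {}

    def conditional_put(self, table_id, number, transaction):
        """Store the transaction only if no item exists at this key."""
        key = (table_id, number)
        if key in self._items:
            return False
        self._items[key] = transaction
        return True

    def read_from(self, table_id, start):
        """Return transactions numbered start or later, in order."""
        numbers = sorted(n for (t, n) in self._items if t == table_id and n >= start)
        return [self._items[(table_id, n)] for n in numbers]


def append(log, table_id, last_seen, transaction):
    """Try successive numbers until the conditional put succeeds.

    Simplification: a real committer would first read and apply the entries
    it had missed before retrying. This sketch just moves to the next number.
    """
    number = last_seen + 1
    while not log.conditional_put(table_id, number, transaction):
        number += 1
    return number


log = TransactionLog()
first = append(log, "table-1", 0, "add file")
# A writer with a stale view (still at 0) collides on 1 and lands on 2.
second = append(log, "table-1", 0, "assign job")
later = log.read_from("table-1", 2)
```

Each failed conditional put is a wasted round trip, which is the contention the FIFO queue is there to avoid. An append-only store that assigns offsets itself would remove the need for this retry loop.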


      Throughput limitations for state store committer · Issue #4525 · gchq/sleeper