JAMES LONG

How one word in PostgreSQL unlocked a 9x performance improvement

May 26, 2020

At the very heart of Actual is a custom syncing engine. Recently I implemented full end-to-end encryption (not released yet) and it inspired me to audit the performance of the whole process. In the future I'll blog more about using CRDTs for syncing, but for now I'd like to talk about a PostgreSQL feature that enabled a 9-10x performance improvement.

Actual is a completely local app, and syncing happens in the background (using CRDTs). This means the server is very simple: all it has to do is store and fetch "messages" for clients. The entire code for handling syncing is only ~200 lines of JavaScript.

We need to handle a lot of messages to keep syncing fast. In fact, while working on this something strange happened: a new user generated 169,000 messages in one day. That is an outlier by a huge margin. For comparison, importing 1000 transactions into the system generates about 6000 messages, which is reasonable but still more than the average number of messages per day per user. I believe this user was using the API to bulk import a lot of data (we have different APIs for that). Still, I thought, what if I made 169,000 my benchmark?

I tried pumping 169,000 messages through the system and broke the server. The request timed out, and the server was still crunching through messages, making everything else slow. I knew instantly what the problem was.

Messages are stored in PostgreSQL and the table looks like this:

CREATE TABLE messages_binary
  (timestamp TEXT,
   group_id TEXT,
   is_encrypted BOOLEAN,
   content bytea,
   PRIMARY KEY(timestamp, group_id));

It stores small binary blobs marked with a timestamp and a "sync group" they belong to.

The server was choking trying to insert so many rows. Unfortunately, we can't simply execute one query with a bunch of INSERT statements when adding messages. Our CRDTs have a few constraints:

  1. Messages can't ever be duplicated (they're identified by timestamp)
  2. We need to update a merkle trie depending on whether or not the message was added

Solving #1 is easy. Because timestamp (together with group_id) is the primary key, we can do INSERT INTO messages_binary (...) VALUES (...) ON CONFLICT DO NOTHING. The ON CONFLICT clause tells PostgreSQL to do nothing when there's a conflict, and duplicates conflict on the primary key.
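As a concrete sketch (with placeholder values), a single-message insert looks like this; if a row with the same (timestamp, group_id) already exists, the statement is silently skipped:

-- Placeholder values for illustration: if this (timestamp, group_id)
-- already exists, nothing is inserted and no error is raised.
INSERT INTO messages_binary (timestamp, group_id, is_encrypted, content)
VALUES ('1', 'group1', FALSE, '...')
ON CONFLICT DO NOTHING;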

A much bigger problem is #2. We need the result of the insert to know if a row was inserted or not. If it was inserted, we need to also update our merkle trie like this:

if (inserted) {
  trie = merkle.insert(trie, Timestamp.parse(msg.timestamp));
}

It's extremely important that each timestamp in the system only ever gets inserted into the merkle trie once. The trie is responsible for guaranteeing consistency across the system and maintains hashes of the content. If a timestamp isn't added exactly once, the hashes (and thus verification) are wrong.

The whole code for updating the database looks like this (using some abstractions over node-postgres):

await runQuery('BEGIN');

let trie = await getMerkle(runQuery, groupId);

for (let message of messages) {
  let timestamp = message.getTimestamp();
  let isEncrypted = message.getIsencrypted();
  let content = message.getContent();

  let { changes } = await runQuery(
    `INSERT INTO messages_binary (timestamp, group_id, is_encrypted, content)
       VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING`,
    [timestamp, groupId, isEncrypted, content]
  );

  if (changes === 1) {
    // Update the merkle trie
    trie = merkle.insert(trie, Timestamp.parse(timestamp));
  }
}

await runQuery(
  `INSERT INTO messages_merkles (group_id, merkle)
     VALUES ($1, $2)
     ON CONFLICT (group_id) DO UPDATE SET merkle = $2`,
  [groupId, JSON.stringify(trie)]
);

await runQuery('COMMIT');

This is mostly the real code; the only difference is that we also roll back the transaction on failure. It's extremely important that this happens in a transaction so that the messages and the merkle trie are updated atomically. Again, the merkle trie verifies the messages' content, and the two must always be in sync; the user will see sync errors if they are not.
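A minimal sketch of that error handling, assuming runQuery simply throws on failure, would look something like this:

try {
  await runQuery('BEGIN');
  // ...insert the messages and update the merkle trie as above...
  await runQuery('COMMIT');
} catch (err) {
  // Undo any partial work so the messages and the merkle trie
  // can never end up out of sync with each other.
  await runQuery('ROLLBACK');
  throw err;
}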

The problem is immediately clear: we are executing an INSERT query for each message individually. In our extreme case we are trying to execute 169,000 statements. PostgreSQL lives on a different server (but close by), and making that many network round trips alone is going to kill performance, not to mention the per-query overhead in PostgreSQL itself.

I knew this was slow, but I didn't realize how slow. Let's test with a more reasonable number of messages that actually finishes: 4000 messages take 6.9s to complete. That's just profiling the code above, without taking network transfer into account.

This is a huge UX issue. While this is processing, the user is sitting there watching the "sync" icon spin and spin and spin…

Back to the drawing board. What we need:

  • To execute as few queries as possible
  • To know which messages were added
  • To commit the messages and merkle trie updates atomically

We could check which messages already exist and filter them out, but that would require an expensive SELECT query (which would probably need to be broken up, since you wouldn't want to pass 169,000 parameters). Another idea was to tag inserted messages with a unique number and afterwards query for messages with that number, since only the newly added ones would have it.

The beauty of relational databases (compared to key-value) is they tend to have robust solutions for these kinds of problems. There had to be a way to do this because this pattern is not esoteric. The first thing I learned was how to insert multiple rows with a single INSERT statement:

-- At least in PostgreSQL, you can pass multiple rows to a single INSERT
INSERT INTO messages_binary (timestamp, group_id, content) VALUES
  ('1', 'group1', 'binary-blob1'),
  ('3', 'group1', 'binary-blobb6'),
  ('2', 'group1', 'binary-blobbbb');

This is better than concatenating multiple INSERT statements into one query because it's probably faster, and most importantly we have hope of getting back information about what happened.

Scouring the docs, I discovered the RETURNING clause of the INSERT statement. By default PostgreSQL doesn't return anything from an INSERT except the number of rows that changed, but if you do INSERT INTO table (value) VALUES (1) RETURNING id, it will return the id of the new row.

The big question was whether this did what I wanted: when using a multi-row INSERT with ON CONFLICT DO NOTHING, will it return the ids of only the rows that were actually inserted? I suspected it might return the ids of all the rows, even those that conflicted (and weren't inserted).

I wrote a quick script to test the behavior and: bingo. RETURNING does exactly what I want. Here's a test:

INSERT INTO messages_binary (timestamp, group_id, content) VALUES
  ('1', 'group5', '...'),
  ('2', 'group6', '...'),
  ('3', 'group7', '...')
ON CONFLICT DO NOTHING RETURNING timestamp;

When executing this query, if a message with timestamp 1 already exists, it will only insert 2 and 3, and it will return [{ timestamp: '2' }, { timestamp: '3' }]. Bingo bango bongo.

RETURNING allows me to reduce all of this work down to a single query. I can use the results to know exactly which messages were added and update the merkle trie appropriately.

The new code looks something like this. I'm still auditing the safety of the pg-promise helper:

// We use a helper from the `pg-promise` library to generate the
// multi-value INSERT statement. This will escape values.
// http://vitaly-t.github.io/pg-promise/helpers.html#.insert
let stmt = pgp.helpers.insert(
  messages.map(msg => ({
    timestamp: msg.getTimestamp(),
    group_id: groupId,
    is_encrypted: msg.getIsencrypted(),
    content: msg.getContent()
  })),
  ['timestamp', 'group_id', 'is_encrypted', 'content'],
  'messages_binary'
);

let { changes, rows } = await runQuery(
  stmt + ' ON CONFLICT DO NOTHING RETURNING timestamp'
);

rows.forEach(row => {
  trie = merkle.insert(trie, Timestamp.parse(row.timestamp));
});

// Write back the merkle trie…
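For completeness, the write-back elided at the end is the same messages_merkles upsert from the original code, still inside the same transaction:

await runQuery(
  `INSERT INTO messages_merkles (group_id, merkle)
     VALUES ($1, $2)
     ON CONFLICT (group_id) DO UPDATE SET merkle = $2`,
  [groupId, JSON.stringify(trie)]
);
await runQuery('COMMIT');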

Let's check out the results!

  • 4000 messages: 6.9s before, 0.75s after
  • 40000 messages: 59s before, 7.1s after

You read that right: previously it took 59 seconds to process 40000 messages, and now it takes only 7.1 seconds. Across both benchmarks that's roughly a 9x improvement!

Update: there was an error in the SQL generation that made each piece of data larger than it needed to be (the binary blob encoding was wrong). With that fixed, the generated INSERT statement is about 25% smaller, and 40000 messages are now processed in ~5 seconds.

What about 169,000?

You might be wondering what happened to our 169,000-message benchmark. Well, it turns out there's still an upper limit: this time we're hitting PostgreSQL's limits, and there isn't a quick fix.

When processing 169,000 messages, the first problem is that, well, node crashes. The pgp.helpers.insert helper from pg-promise crashes when passed that many items. I'm not exactly sure why, but it's not worth investigating because there are other problems.

First, 169,000 items requires an upload payload of 21MB. That's unacceptable, because the chance of such a request failing is too high.

If we scale the benchmark down to 100,000, we get a little further. The generated multi-value INSERT statement is a 72MB string, and trying to execute that massive query simply… hangs the whole server. I'm not sure where the problem is, or whether PostgreSQL settings could be tuned to handle it, but either way we can't handle something of this size.

The better solution is to page message syncing and put an upper limit on each request. A good limit seems to be 40,000 messages: at that size, the upload payload is 5MB and takes 7 seconds to process (it still generates a 30MB query string, which PostgreSQL happily processes!). To process 169,000 messages we'd send 5 requests, each with 40,000 messages (or whatever is left over). The total time to process all of them would be 169000 / 40000 * 7, or about 29.6 seconds. As long as we display the progress to the user, that's not bad for such a gigantic changeset.
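A sketch of that paging on the client might look like the following; sendBatch and onProgress are hypothetical stand-ins for the real request and progress-reporting code:

// Hypothetical sketch: send messages in pages so a single request
// never exceeds the 40,000-message limit.
const BATCH_SIZE = 40000;

async function syncInBatches(messages, sendBatch, onProgress) {
  for (let i = 0; i < messages.length; i += BATCH_SIZE) {
    await sendBatch(messages.slice(i, i + BATCH_SIZE));

    // Report how many messages have been processed so far
    onProgress(Math.min(i + BATCH_SIZE, messages.length), messages.length);
  }
}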

This is the worst-case scenario; we're not normally dealing with timeframes measured in seconds. The most common syncing operations involve 10-200 messages, which sync within 20ms. The extreme case is more like somebody hitting the API with thousands of changes per second and trying to sync later, which almost never happens, but we should be able to handle it if a user abuses the API.

One last improvement

Unrelated to the problem above, there's one last improvement I'd like to make. Since the merkle trie is stored in the database, the server needs to fetch it, change it, then store it back. That means we have to make sure no other connection changes the trie while we're working on it.

The current solution uses a blunt hammer to solve this: a mutex. The mutex locks per user around the syncing logic, so different users can sync concurrently, but if the same user syncs from multiple devices, those requests are serialized. This is necessary to avoid race conditions while updating the merkle trie (remember, it's extremely important that it stays intact).
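As an illustration (a simplified sketch, not the exact implementation), a per-user mutex can be as small as chaining promises keyed by user id:

// Sketch: syncs for different users run concurrently, but syncs for
// the same user are chained one after another.
let syncLocks = new Map();

function withUserLock(userId, fn) {
  let prev = syncLocks.get(userId) || Promise.resolve();
  let current = prev.then(() => fn());

  // Store a version that never rejects so one failed sync
  // doesn't block every later sync for that user.
  syncLocks.set(userId, current.catch(() => {}));
  return current;
}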

It looks like the Serializable isolation level for transactions might solve this. You start the transaction with BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, and PostgreSQL will abort one of the transactions if it detects a possible race condition between them. I'm not 100% sure it will work for my use case, where I read something and write it back later in the same transaction, but if it does, I'd simply restart any transaction that fails, and each syncing process would still end up serialized. I'd love to hear from you if you know anything about this.
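If it does work, the retry could look roughly like this sketch, assuming runQuery surfaces the underlying pg error (error code 40001 is PostgreSQL's serialization_failure):

// Sketch: retry the whole sync transaction when PostgreSQL detects a
// serialization conflict between concurrent syncs.
async function runSerializedSync(doSync) {
  while (true) {
    try {
      await runQuery('BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE');
      await doSync();
      await runQuery('COMMIT');
      return;
    } catch (err) {
      await runQuery('ROLLBACK');
      if (err.code !== '40001') {
        throw err; // a real error, not a serialization failure
      }
      // Otherwise loop around and retry the transaction
    }
  }
}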

Up next…

I haven't thrown the 169,000-message benchmark at the client side of syncing yet. The client does more work when syncing because a lot of other things, like undo, hook into the system, so there is still a lot to optimize there. I doubt it would handle 169,000 messages right now, but I'm sure it could handle 40,000. The idea of paging the sync into blocks of 40,000 would work well there too, and would make it easy to show the user how many messages have been processed so far.

No matter what, it's great to optimize for the extreme cases. The 9-10x improvement here trickles down to the far smaller cases that make up 95% of the requests. Now a request that took 100ms before will take ~10ms. Snappy!